3 * write-cluster management routines for LaFS
5 * Copyright (C) 2006-2009
6 * NeilBrown <neilb@suse.de>
7 * Released under the GPL, version 2
11 * Blocks to be written to a cluster need to be sorted so as to minimise
12 * fragmentation and to keep the cluster head small.
13 * So that we can track the amount of space needed in the cluster head,
14 * we really need to keep the list sorted rather than just sort it
16 * We do this using a linked list of blocks and an insertion sort.
17 * To improve the speed of insertion we keep a modified skiplist structure
19 * This is 'modified' in that it explicitly understands sequential
20 * runs within the list and doesn't keep extra skip-points within a run.
22 * The skip-points require memory allocation, which cannot be
23 * guaranteed here in the writeout path. To handle this we
24 * - preallocate some entries
25 * - accept a less-efficient list if memory isn't available
26 * - flush early if things get really bad.
28 * Skip point placement and runs.
29 * When an entry is added to the list, we place a skip-point over it
30 * based on the value of a random number, with a 1-in-4 chance of placement.
31 * The height of the skip point is also random - we keep rolling the dice,
32 * increasing height each time that we get 1-in-4.
33 * If the new entry is placed immediately after a skip point, and is
34 * consecutive with that skip point, the skip point is moved forward
35 * so that it will always be at the end of a run.
36 * When a new entry finds itself at the start or end of a run, a skip
40 /* A height of 8 with a fanout of 1-in-4 allows reasonable
41 * addressing for 2^16 entries, which is probably more than we need
45 #include <linux/random.h>
46 #include <linux/slab.h>
48 static void cluster_flush(struct fs *fs, int cnum);
50 static void skip_discard(struct skippoint *sp)
53 struct skippoint *next = sp->next[0];
59 static int cmp_blk(struct block *a, struct block *b)
61 /* compare the filesys/inode/index/addr info
62 * of two blocks and return:
65 * -1 if consecutive in order
66 * -2 if in same inode and in order
67 * <=-3 if different inodes and correct order
70 if (a->inode != b->inode) {
71 struct inode *fsinoa = ino_from_sb(a->inode->i_sb);
72 struct inode *fsinob = ino_from_sb(b->inode->i_sb);
85 /* Same inode. If both data blocks they might be
88 if (!test_bit(B_Index, &a->flags) &&
89 !test_bit(B_Index, &b->flags)) {
90 if (a->fileaddr == b->fileaddr)
92 if (a->fileaddr + 1 == b->fileaddr)
94 if (a->fileaddr < b->fileaddr)
98 /* at least one is an index block. Sort them before data */
99 if (!test_bit(B_Index, &a->flags))
100 /* a is data, b is index, b first */
103 if (!test_bit(B_Index, &b->flags))
106 /* both are index blocks, use fileaddr or depth */
107 if (a->fileaddr < b->fileaddr)
109 if (a->fileaddr > b->fileaddr)
111 if (iblk(a)->depth < iblk(b)->depth)
113 /* equality really shouldn't be possible */
117 static struct block *skip_find(struct skippoint *head,
118 struct list_head *list,
119 struct block *target,
120 struct skippoint *where)
122 /* Find the last block that precedes 'target'
123 * and return it. If target will be the start,
125 * 'where' gets filled in with the last skip point
126 * at each level which is before-or-at the found
127 * block. This can be used for easy insertion.
131 struct block *b, *next;
132 for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
133 while (head->next[level] &&
134 cmp_blk(head->next[level]->b, target) < 0)
135 head = head->next[level];
136 where->next[level] = head;
138 if (head->b == NULL) {
139 if (list_empty(list) ||
140 cmp_blk(list_entry(list->next, struct block, lru),
142 /* This goes at the start */
144 b = list_entry(list->next, struct block, lru);
147 /* b is before target */
149 list_for_each_entry_continue(next, list, lru) {
150 if (cmp_blk(next, target) > 0)
154 /* b is the last element before target */
158 static int cluster_insert(struct skippoint *head,
159 struct list_head *list,
160 struct block *target,
163 /* Insert 'target' at an appropriate point in the
164 * list, creating a skippoint if appropriate.
167 * 1 if not in a run, but adjacent to something in same file
169 * However if the value we would return would be more than
170 * 'avail' abort the insert and return -1;
173 struct skippoint pos, *newpoint;
176 int cmpbefore, cmpafter;
177 unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
178 sizeof(unsigned long))];
181 BUG_ON(!list_empty(&target->lru));
182 if (unlikely(list_empty(list))) {
184 /* handle this trivial case separately */
187 list_add(&target->lru, list);
189 for (height = 0; height < SKIP_MAX_HEIGHT; height++)
190 head->next[height] = NULL;
193 b = skip_find(head, list, target, &pos);
195 list_add(&target->lru, list);
198 list_add(&target->lru, &b->lru);
199 cmpbefore = cmp_blk(b, target);
201 if (target->lru.next == list) /* new last */
204 cmpafter = cmp_blk(target, list_entry(target->lru.next,
207 if (cmpbefore == -1) {
208 /* adjacent with previous. Possibly move skippoint */
209 if (pos.next[0]->b == b) {
210 pos.next[0]->b = target;
214 } else if (cmpafter == -1)
215 /* adjacent with next. No need to add a skippoint */
217 else if (cmpbefore == -2 || cmpafter == -2)
218 /* same inodes as next or previous */
225 list_del_init(&target->lru);
228 /* Now consider adding a skippoint here. We want 2 bits
229 * per level of random data
231 get_random_bytes(rand, sizeof(rand));
233 while (height < SKIP_MAX_HEIGHT &&
234 test_bit(height*2, rand) &&
235 test_bit(height*2+1, rand))
240 newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
243 newpoint->b = target;
244 for (level = 0; level < height-1; level++) {
245 newpoint->next[level] = pos.next[level];
246 pos.next[level] = newpoint;
251 /*-----------------------------------------------------------------------
252 * A segment is divided up in a slightly complicated way into
253 * tables, rows and columns. This allows us to align writes with
254 * stripes in a raid4 array or similar.
255 * So we have some support routines to help track our way around
259 static int seg_remainder(struct fs *fs, struct segpos *seg)
261 /* return the number of blocks from the start of segpos to the
262 * end of the segment.
263 * i.e. remaining rows in this table, plus remaining tables in
266 struct fs_dev *dv = &fs->devs[seg->dev];
267 int rows = dv->rows_per_table - seg->st_row;
269 BUG_ON(seg->dev < 0);
270 rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
271 return rows * dv->width;
274 static void seg_step(struct fs *fs, struct segpos *seg)
276 /* reposition this segpos to be immediately after its current end
277 * and make the 'current' point be the start.
280 seg->st_table = seg->nxt_table;
281 seg->st_row = seg->nxt_row;
282 seg->table = seg->st_table;
283 seg->row = seg->st_row;
287 u32 lafs_seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
289 /* move the 'nxt' table/row to be 'size' blocks beyond
290 * current start. size will be rounded up to a multiple
293 struct fs_dev *dv = &fs->devs[seg->dev];
295 BUG_ON(seg->dev < 0);
296 size = (size + dv->width - 1) / dv->width;
297 rv = size * dv->width;
299 seg->nxt_table = seg->st_table + size / dv->rows_per_table;
300 seg->nxt_row = size % dv->rows_per_table;
304 void lafs_seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
306 /* set seg to start at the given virtual address, and be
308 * 'addr' should always be at the start of a row, though
309 * as it might be read off storage, we need to be
310 * a little bit careful.
315 virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
316 dv = &fs->devs[seg->dev];
317 col = offset / dv->stride;
318 row = offset % dv->stride;
319 table = col / dv->width;
320 col = col % dv->width;
323 seg->st_table = table;
326 seg->table = seg->nxt_table = seg->st_table;
327 seg->row = seg->nxt_row = seg->st_row;
331 static u64 seg_addr(struct fs *fs, struct segpos *seg)
333 /* Return the virtual address of the blocks pointed
336 struct fs_dev *dv = &fs->devs[seg->dev];
340 /* Setting 'next' address for last cluster in
341 * a cleaner segment */
343 if (seg->table > seg->nxt_table ||
344 (seg->table == seg->nxt_table &&
345 seg->row >= seg->nxt_row))
346 /* We have gone beyond the end of the cluster,
347 * must be bad data during roll-forward
350 addr = seg->col * dv->stride;
352 addr += seg->table * dv->rows_per_table;
353 addr += seg->num * dv->segment_stride;
358 u64 lafs_seg_next(struct fs *fs, struct segpos *seg)
360 /* step forward one block, returning the address of
361 * the block stepped over
362 * The write cluster can have three sections:
363 * - the tail end of one table
364 * - a number of complete tables
365 * - the head of one table
367 * For each table or partial table we want to walk a full
368 * column at a time, then go to the next column.
369 * So we step to the next row. If it is beyond the range of
370 * the current table, go to 'start' of next column, or to
373 struct fs_dev *dv = &fs->devs[seg->dev];
374 u64 addr = seg_addr(fs, seg);
377 /* Beyond end of cluster */
380 /* now step forward in column or table or seg */
382 if (seg->row >= dv->rows_per_table ||
383 (seg->table == seg->nxt_table
384 && seg->row >= seg->nxt_row)) {
386 if (seg->col >= dv->width) {
390 if (seg->table == seg->st_table)
391 seg->row = seg->st_row;
398 static void cluster_reset(struct fs *fs, struct wc *wc)
400 if (wc->seg.dev < 0) {
405 wc->remaining = seg_remainder(fs, &wc->seg);
406 wc->chead_blocks = 1;
408 wc->cluster_space = fs->blocksize - sizeof(struct cluster_head);
409 wc->chead = page_address(wc->page[wc->pending_next]);
410 wc->chead_size = sizeof(struct cluster_head);
413 static void close_segment(struct fs *fs, int cnum)
415 /* Release old segment */
416 /* FIXME I need lafs_seg_{de,}ref for every snapshot,
419 struct wc *wc = fs->wc + cnum;
420 if (wc->seg.dev >= 0) {
422 spin_lock(&fs->lock);
423 fs->clean_reserved -= seg_remainder(fs, &wc->seg);
424 spin_unlock(&fs->lock);
426 lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
431 void lafs_close_all_segments(struct fs *fs)
434 for (cnum = 0; cnum < WC_NUM; cnum++)
435 close_segment(fs, cnum);
438 static int new_segment(struct fs *fs, int cnum)
440 /* new_segment can fail if cnum > 0 and there is no
443 struct wc *wc = fs->wc + cnum;
447 close_segment(fs, cnum);
449 if (cnum && fs->clean_reserved < fs->max_segment) {
450 /* we have reached the end of the last cleaner
451 * segment. The remainder of clean_reserved
452 * is of no value (if there even is any)
454 if (fs->clean_reserved) {
455 spin_lock(&fs->alloc_lock);
456 fs->free_blocks += fs->clean_reserved;
457 fs->clean_reserved = 0;
458 spin_unlock(&fs->alloc_lock);
462 /* This gets a reference on the 'segsum' */
463 lafs_free_get(fs, &dev, &seg, 0);
464 lafs_seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
465 BUG_ON(wc->seg.dev != dev);
466 BUG_ON(wc->seg.num != seg);
469 /* wc mutex protects us here */
475 /*-------------------------------------------------------------------------
476 * lafs_cluster_allocate
477 * This adds a block to the writecluster list and possibly triggers a
479 * The flush will assign a physical address to each block and schedule
480 * the actual write out.
482 * There can be multiple active write clusters. e.g one for new data,
483 * one for cleaned data, one for defrag writes.
485 * It is here that the in-core inode is copied to the data block, if
486 * appropriate, and here that small files get copied into the inode
487 * thus avoiding writing a separate block for the file content.
489 * Roll-forward sees any data block as automatically extending the
490 * size of the file. This requires that we write blocks in block-order
491 * as much as possible (which we would anyway) and that we inform
492 * roll-forward when a file ends mid-block.
494 * A block should be iolocked when being passed to lafs_cluster_allocate.
495 * When the IO is complete, it will be unlocked.
497 static int flush_data_to_inode(struct block *b)
499 /* This is the first and only datablock in a file
500 * and it will fit in the inode. So put it there.
504 loff_t size = i_size_read(b->inode);
505 struct lafs_inode *lai = LAFSI(b->inode);
506 struct datablock *db;
507 struct fs *fs = fs_from_inode(b->inode);
509 lafs_iolock_block(&lai->iblock->b);
510 dprintk("FLUSHING single block into inode - s=%d %s\n",
511 (int)size, strblk(b));
512 if (lai->iblock->depth > 1)
513 /* Too much indexing still - truncate must be happening */
515 if (lai->iblock->depth == 1) {
516 /* If there is nothing in this block we can still use it */
517 if (lai->iblock->children.next != &b->siblings ||
518 b->siblings.next != &lai->iblock->children ||
519 lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
522 /* Need checkpoint lock to pin the dblock */
523 lafs_checkpoint_lock(fs);
524 if (test_and_clear_bit(B_Dirty, &b->flags)) {
526 /* Check size again, just in case it changed */
527 size = i_size_read(b->inode);
528 if (size + lai->metadata_size > fs->blocksize) {
529 /* Lost a race, better give up, restoring Dirty bit */
530 if (test_and_set_bit(B_Dirty, &b->flags))
531 if (test_and_set_bit(B_Credit, &b->flags))
532 lafs_space_return(fs, 1);
533 lafs_checkpoint_unlock(fs);
536 if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
538 if (test_and_clear_bit(B_UnincCredit, &b->flags))
540 if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
542 lafs_space_return(fs, credits);
543 LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
544 set_bit(B_PinPending, &lai->dblock->b.flags);
545 if (test_bit(B_Pinned, &b->flags))
546 lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
548 lafs_pin_block(&lai->dblock->b);
549 lafs_dirty_dblock(lai->dblock);
550 } else if (test_and_clear_bit(B_Realloc, &b->flags)) {
552 LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
554 if (test_and_clear_bit(B_UnincCredit, &b->flags))
556 if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
558 if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
560 LAFS_BUG(credits < 0, b);
561 lafs_space_return(fs, credits);
563 printk("Wasn't dirty or realloc: %s\n", strblk(b));
566 lafs_checkpoint_unlock(fs);
568 lai->iblock->depth = 0;
569 clear_bit(B_Valid, &lai->iblock->b.flags);
571 /* safe to reference ->dblock as b is a dirty child */
572 db = getdref(lai->dblock, MKREF(flush2db));
573 ibuf = map_dblock(db);
574 dbuf = map_dblock_2(dblk(b));
576 if (test_and_clear_bit(B_UnincCredit, &b->flags))
577 lafs_space_return(fs, 1);
579 /* It is an awkward time to call lafs_inode_fillblock,
580 * so do this one little change manually
582 ((struct la_inode *)ibuf)->depth = 0;
583 memcpy(ibuf + lai->metadata_size,
585 memset(ibuf + lai->metadata_size + size,
587 fs->blocksize - lai->metadata_size - size);
588 unmap_dblock_2(db, ibuf);
589 unmap_dblock(dblk(b), dbuf);
590 lafs_iounlock_block(&lai->iblock->b);
592 putdref(db, MKREF(flush2db));
597 lafs_iounlock_block(&lai->iblock->b);
601 int lafs_cluster_allocate(struct block *b, int cnum)
603 struct fs *fs = fs_from_inode(b->inode);
604 struct wc *wc = &fs->wc[cnum];
605 struct lafs_inode *lai;
610 LAFS_BUG(test_bit(B_Index, &b->flags) &&
611 iblk(b)->uninc_table.pending_cnt > 0, b);
613 LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
614 lai = LAFSI(b->inode);
616 LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
617 LAFS_BUG(!fs->rolled, b);
618 LAFS_BUG(lai->flags & File_nonlogged &&
619 !test_bit(B_Index, &b->flags), b);
621 /* We cannot change the physaddr of a B_Uninc index block in a
622 * different phase to the parent, else the parent will
623 * get the wrong address.
625 LAFS_BUG(test_bit(B_Index, &b->flags) &&
626 test_bit(B_Uninc, &b->flags) &&
627 !!test_bit(B_Phase1, &b->flags) !=
628 !!test_bit(B_Phase1, &b->parent->b.flags), b);
630 size = i_size_read(b->inode);
631 if (!test_bit(B_Index, &b->flags) && /* it is a datablock */
633 b->parent == lai->iblock && /* No indexing */
634 lai->type >= TypeBase && /* 'size' is meaningful */
635 size + lai->metadata_size <= fs->blocksize) {
636 int success = flush_data_to_inode(b);
638 lafs_summary_update(fs, b->inode, b->physaddr, 0,
639 0, !!test_bit(B_Phase1, &b->flags),
640 test_bit(B_SegRef, &b->flags));
642 lafs_iounlock_block(b);
645 /* There are conflicting blocks in the index tree */
646 if (test_and_clear_bit(B_Realloc, &b->flags))
647 lafs_space_return(fs, 1);
648 if (!test_bit(B_Dirty, &b->flags) ||
649 !test_bit(B_Pinned, &b->flags)) {
650 /* It is safe to just leave this for later */
651 lafs_iounlock_block(b);
654 /* We really need to write this, so use a full block. */
655 printk("WARNING %s\n", strblk(b));
659 /* The symbiosis between the datablock and the indexblock for an
660 * inode needs to be dealt with here.
661 * The data block will not normally be written until the InoIdx
662 * block is unpinned or phase-flipped. However if it is, as the
663 * result of some 'sync' style operation, then we write it even
664 * though the index info might not be uptodate. The rest of the
665 * content will be safe for roll-forward to pick up, and the rest
666 * will be written when the InoIdx block is ready.
668 * When the InoIdx block is ready, we don't want to write it as
669 * there is nowhere for it to be incorporated in to. Rather we
670 * move the pinning and credits across to the Data block
671 * which will subsequently be written.
674 if (test_bit(B_InoIdx, &b->flags)) {
675 struct block *b2 = &lai->dblock->b;
678 LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
679 if (!test_bit(B_Pinned, &b2->flags)) {
680 /* index block pinned, data block not,
681 * carry the pinning across
683 if (b2->parent == NULL &&
686 getiref(b->parent, MKREF(child));
687 LAFS_BUG(b->parent != b2->parent, b);
688 set_bit(B_PinPending, &b2->flags); // FIXME is this right?
689 set_phase(b2, test_bit(B_Phase1,
695 if (test_and_clear_bit(B_Realloc, &b->flags))
697 LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
699 if (test_and_clear_bit(B_Dirty, &b->flags))
701 if (test_and_clear_bit(B_Realloc, &b->flags))
704 if (test_and_clear_bit(B_UnincCredit, &b->flags))
707 /* We always erase the InoIdx block before the
708 * inode data block, so this cannot happen
710 LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);
713 if (!test_and_set_bit(B_UnincCredit, &b2->flags))
714 if (!test_and_clear_bit(B_ICredit, &b2->flags))
717 if (!test_bit(B_Dirty, &b2->flags))
718 /* need some credits... */
719 if (!test_and_set_bit(B_Credit, &b2->flags))
721 lafs_dirty_dblock(dblk(b2));
723 if (!test_and_set_bit(B_UnincCredit, &b2->flags))
726 if (!test_and_set_bit(B_Realloc, &b2->flags))
729 LAFS_BUG(credits < 0, b2);
730 lafs_space_return(fs, credits);
731 /* make sure 'dirty' status is registered */
733 lafs_iounlock_block(b);
737 if (!test_bit(B_Dirty, &b->flags) &&
738 !test_bit(B_Realloc, &b->flags)) {
739 /* block just got truncated, so don't write it. */
740 lafs_iounlock_block(b);
744 if (test_bit(B_EmptyIndex, &b->flags)) {
746 if (test_and_set_bit(B_Writeback, &b->flags))
748 lafs_iounlock_block(b);
749 lafs_allocated_block(fs, b, 0);
751 if (test_and_clear_bit(B_Realloc, &b->flags))
753 LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
755 if (test_and_clear_bit(B_Dirty, &b->flags))
757 if (test_and_clear_bit(B_Realloc, &b->flags))
760 lafs_writeback_done(b);
761 lafs_space_return(fs, credits);
765 if (!test_bit(B_Index, &b->flags)) {
766 struct inode *myi = rcu_my_inode(dblk(b));
767 if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
768 lafs_inode_fillblock(myi);
772 mutex_lock(&wc->lock);
774 /* The following check must be inside the mutex_lock
775 * to ensure exclusion between checkpoint and writepage.
776 * checkpoint must never see the block not on the
777 * leaf lru unless it is already in the cluster and so can
780 if (!list_empty_careful(&b->lru)) {
781 /* Someone is flushing this block before
782 * checkpoint got to it - probably writepage.
783 * Must remove it from the leaf lru so it can go
784 * on the cluster list.
786 LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
787 spin_lock(&fs->lock);
788 if (!list_empty(&b->lru))
789 list_del_init(&b->lru);
790 spin_unlock(&fs->lock);
793 if (test_and_set_bit(B_Writeback, &b->flags))
795 lafs_iounlock_block(b);
797 getref(b, MKREF(cluster)); /* we soon own a reference in the list */
801 /* see how much room is in cluster.
802 * The interesting values are:
803 * none, one descriptor, one grouphead and one descriptor
804 * These are assigned values 0, 1, 2.
806 if (wc->cluster_space >= (sizeof(struct group_head) +
807 sizeof(struct descriptor)) ||
808 (wc->remaining > 2 &&
809 wc->chead_blocks < MAX_CHEAD_BLOCKS &&
810 (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
813 else if (wc->cluster_space >= sizeof(struct descriptor))
817 if (wc->seg.dev < 0 || wc->remaining < 1)
820 used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);
824 else if (wc->slhead.b)
825 cluster_flush(fs, cnum);
827 err = new_segment(fs, cnum);
829 /* No cleaner segments - this will have to go
830 * out with a checkpoint.
831 * Block is still 'dirty' - just clear
834 lafs_writeback_done(b);
835 putref(b, MKREF(cluster));
836 mutex_unlock(&wc->lock);
839 cluster_reset(fs, wc);
844 wc->cluster_space -= sizeof(struct descriptor);
846 wc->cluster_space -= sizeof(struct group_head);
847 if (wc->cluster_space < 0) {
848 /* need a new page */
851 wc->cluster_space += fs->blocksize;
854 BUG_ON(wc->remaining < 0);
855 if (wc->remaining == 0)
856 cluster_flush(fs, cnum);
857 mutex_unlock(&wc->lock);
861 /*-------------------------------------------------------------------------
862 * Cluster head building.
863 * We build the cluster head bit by bit as we find blocks
864 * in the list. These routines help.
867 static void cluster_addhead(struct wc *wc, struct inode *ino,
868 struct group_head **headstart)
870 struct group_head *gh = (struct group_head *)((char *)wc->chead +
873 dprintk("CLUSTER addhead %d\n", wc->chead_size);
876 gh->inum = cpu_to_le32(ino->i_ino);
877 gh->fsnum = cpu_to_le32(ino_from_sb(ino->i_sb)->i_ino);
878 tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff))
882 gh->truncatenum_and_flag = cpu_to_le16(tnf);
883 wc->chead_size += sizeof(struct group_head);
886 static void cluster_closehead(struct wc *wc,
887 struct group_head *headstart)
889 int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);
891 dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
892 headstart->group_size_words = size / 4;
895 static void cluster_addmini(struct wc *wc, u32 addr, int offset,
896 int size, const char *data,
897 int size2, const char *data2)
899 /* if size2 !=0, then only
900 * (size-size2) is at 'data' and the rest is at 'data2'
902 struct miniblock *mb = ((struct miniblock *)
903 ((char *)wc->chead + wc->chead_size));
905 dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
907 mb->block_num = cpu_to_le32(addr);
908 mb->block_offset = cpu_to_le16(offset);
909 mb->length = cpu_to_le16(size + DescMiniOffset);
910 wc->chead_size += sizeof(struct miniblock);
911 memcpy(mb->data, data, size-size2);
913 memcpy(mb->data + size-size2, data2, size2);
914 memset(mb->data+size, 0, (-size)&3);
915 wc->chead_size += ROUND_UP(size);
918 static void cluster_adddesc(struct wc *wc, struct block *blk,
919 struct descriptor **desc_start)
921 struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
924 dprintk("CLUSTER add_desc %d\n", wc->chead_size);
925 dh->block_num = cpu_to_le32(blk->fileaddr);
926 dh->block_cnt = cpu_to_le32(0);
928 if (test_bit(B_Index, &blk->flags))
929 dh->block_bytes = DescIndex;
930 wc->chead_size += sizeof(struct descriptor);
933 static void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
934 struct block *b, int blkbits)
936 desc_start->block_cnt =
937 cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
938 if (!test_bit(B_Index, &b->flags)) {
939 if (LAFSI(b->inode)->type >= TypeBase) {
940 u64 size = b->inode->i_size;
941 if (size > ((loff_t)b->fileaddr << blkbits) &&
942 size <= ((loff_t)(b->fileaddr + 1) << blkbits))
943 desc_start->block_bytes =
944 cpu_to_le32(size & ((1<<blkbits)-1));
946 desc_start->block_bytes =
947 cpu_to_le32(1 << blkbits);
949 desc_start->block_bytes = cpu_to_le32(1<<blkbits);
953 /*------------------------------------------------------------------------
955 * Updates are added to the cluster head immediately, causing a flush
956 * if there is insufficient room.
957 * This means that miniblocks always occur before block descriptors.
958 * miniblocks are only ever written to cluster-series 0.
962 lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
965 /* make sure there is room for a 'len' update in upcoming
966 * cluster. Do nothing here.
974 lafs_cluster_update_pin(struct update_handle *uh)
976 /* last chance to bail out */
977 if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
979 uh->fs->cluster_updates++;
985 static unsigned long long
986 lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
987 struct inode *ino, u32 addr,
988 int offset, int len, struct datablock *b,
990 int len2, const char *str2)
992 /* There is the data - alternate layout */
993 /* flush 'size' bytes of data, at 'offset' of bnum in ino
994 * in the next write cluster
996 struct wc *wc = &fs->wc[0];
997 struct group_head *head_start;
998 unsigned long long seq;
999 char *mapping = NULL;
1001 BUG_ON(!fs->rolled);
1003 mutex_lock(&wc->lock);
1004 while (wc->cluster_space < (sizeof(struct group_head)+
1005 sizeof(struct descriptor) +
1007 if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
1008 (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
1011 wc->cluster_space += fs->blocksize;
1013 cluster_flush(fs, 0);
1016 fs->cluster_updates -= uh->reserved;
1017 lafs_space_return(fs, uh->reserved);
1020 cluster_addhead(wc, ino, &head_start);
1022 mapping = map_dblock(b);
1023 str1 = mapping + offset;
1025 cluster_addmini(wc, addr, offset, len, str1, len2, str2);
1027 unmap_dblock(b, mapping);
1028 cluster_closehead(wc, head_start);
1029 wc->cluster_space -= (sizeof(struct group_head)+
1030 sizeof(struct descriptor) +
1032 seq = wc->cluster_seq;
1033 mutex_unlock(&wc->lock);
1038 lafs_cluster_update_commit(struct update_handle *uh,
1039 struct datablock *b,
1040 int offset, int len)
1042 /* OK here is the data to put in the next cluster. */
1043 struct fs *fs = fs_from_inode(b->b.inode);
1044 unsigned long long seq;
1046 seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
1047 offset, len, b, NULL, 0, NULL);
1053 lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
1054 struct inode *ino, u32 addr,
1055 int offset, int len, const char *str1,
1056 int len2, const char *str2)
1058 return lafs_cluster_update_commit_both(uh, fs,
1066 lafs_cluster_update_abort(struct update_handle *uh)
1068 /* Didn't want that cluster_update after all */
1069 uh->fs->cluster_updates -= uh->reserved;
1070 lafs_space_return(uh->fs, uh->reserved);
1073 int lafs_calc_cluster_csum(struct cluster_head *head)
1075 unsigned int oldcsum = head->checksum;
1076 unsigned long long newcsum = 0;
1079 unsigned int *superc = (unsigned int *) head;
1082 for (i = 0; i < le16_to_cpu(head->Hlength)/4; i++)
1083 newcsum += le32_to_cpu(superc[i]);
1084 csum = (newcsum & 0xffffffff) + (newcsum>>32);
1085 head->checksum = oldcsum;
1086 return cpu_to_le32(csum);
1089 /*------------------------------------------------------------------------
1091 * Once we have a suitable number of blocks and updates in a writecluster
1092 * we need to write them out.
1093 * We first iterate over the block list building the clusterhead.
1094 * Then we finalise and write the clusterhead and iterate again
1095 * writing out each block.
1096 * We keep a biassed blockcount in the wc, and decrement it on write-completion.
1097 * Once the blockcount hits the bias the cluster is written. Then when
1098 * the next (or next-plus-one) cluster head is written we remove the bias
1099 * and the data is deemed to be safe.
1101 * Update blocks will already have been included in the write cluster.
1103 * We reinitialise the write cluster once we are finished.
1106 static void cluster_done(struct fs *fs, struct wc *wc)
1108 /* This is called when one of the pending_cnts hits 0, indicating
1109 * that a write-cluster has been written and committed to storage.
1110 * We find all the pending blocks on a completed cluster and
1114 struct block *b, *tmp;
1115 struct list_head tofree;
1117 INIT_LIST_HEAD(&tofree);
1119 spin_lock(&fs->lock);
1120 for (i = 0; i < 4 ; i++)
1121 if (atomic_read(&wc->pending_cnt[i]) == 0)
1122 list_splice_init(&wc->pending_blocks[i], &tofree);
1123 spin_unlock(&fs->lock);
1125 list_for_each_entry_safe(b, tmp, &tofree, lru) {
1126 list_del_init(&b->lru);
1127 lafs_writeback_done(b);
1128 putref(b, MKREF(cluster));
1132 void lafs_clusters_done(struct fs *fs)
1135 for (i = 0 ; i < WC_NUM; i++)
1136 cluster_done(fs, &fs->wc[i]);
1139 void lafs_done_work(struct work_struct *ws)
1141 struct fs *fs = container_of(ws, struct fs, done_work);
1142 lafs_clusters_done(fs);
1145 static void cluster_flush(struct fs *fs, int cnum)
1147 struct wc *wc = &fs->wc[cnum];
1150 struct inode *current_inode = NULL;
1151 u64 current_block = NoBlock;
1152 u64 head_addr[MAX_CHEAD_BLOCKS];
1153 struct descriptor *desc_start = NULL;
1154 struct group_head *head_start = NULL;
1155 struct segpos segend;
1161 if (!test_bit(CheckpointOpen, &fs->fsstate)) {
1162 /* First cluster since a checkpoint, make sure
1163 * we do another checkpoint within 30 seconds
1165 fs->next_checkpoint = jiffies + 30 * HZ;
1166 set_bit(CheckpointOpen, &fs->fsstate);
1167 lafs_wake_thread(fs);
1169 /* set the writecluster size as this will guide layout in
1173 if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
1174 set_bit(SecondFlushNeeded, &fs->fsstate);
1176 clear_bit(SecondFlushNeeded, &fs->fsstate);
1178 cluster_size = lafs_seg_setsize(fs, &wc->seg,
1179 seg_remainder(fs, &wc->seg)
1182 /* find, and step over, address header block(s) */
1183 for (i = 0; i < wc->chead_blocks ; i++)
1184 head_addr[i] = lafs_seg_next(fs, &wc->seg);
1186 list_for_each_entry(b, &wc->clhead, lru) {
1188 if (b->inode != current_inode) {
1189 /* need to create a new group_head */
1192 cluster_closehead(wc, head_start);
1193 cluster_addhead(wc, b->inode, &head_start);
1194 current_inode = b->inode;
1195 current_block = NoBlock;
1197 if (desc_start == NULL || b->fileaddr != current_block+1 ||
1198 test_bit(B_Index, &b->flags)) {
1199 cluster_adddesc(wc, b, &desc_start);
1200 current_block = b->fileaddr;
1203 cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
1204 addr = lafs_seg_next(fs, &wc->seg);
1206 if (cnum && test_bit(B_Dirty, &b->flags))
1207 /* We are cleaning but this block is now dirty.
1208 * Don't waste the UnincCredit on recording the
1209 * clean location as it will be written as
1210 * dirty soon. Just pin it to the current phase
1211 * to ensure that it really does get written.
1215 lafs_allocated_block(fs, b, addr);
1219 cluster_closehead(wc, head_start);
1220 segend = wc->seg; /* We may write zeros from here */
1221 seg_step(fs, &wc->seg);
1222 wc->remaining = seg_remainder(fs, &wc->seg);
1223 /* Need to make sure our ->next_addr gets set properly
1224 * for non-cleaning segments
1226 if (wc->remaining < 2) {
1228 new_segment(fs, cnum);
1230 close_segment(fs, cnum);
1233 /* Fill in the cluster header */
1234 strncpy(wc->chead->idtag, "LaFSHead", 8);
1235 wc->chead->flags = cpu_to_le32(0);
1237 /* checkpoints only happen in cnum==0 */
1238 if (cnum == 0 && fs->checkpointing) {
1239 spin_lock(&fs->lock);
1240 wc->chead->flags = cpu_to_le32(fs->checkpointing);
1241 if (fs->checkpointing & CH_CheckpointEnd) {
1242 fs->checkpointing = 0;
1243 clear_bit(CheckpointOpen, &fs->fsstate);
1244 } else if (fs->checkpointing & CH_CheckpointStart) {
1245 fs->checkpointcluster = head_addr[0];
1246 fs->checkpointing &= ~CH_CheckpointStart;
1248 spin_unlock(&fs->lock);
1249 if (!fs->checkpointing)
1250 wake_up(&fs->phase_wait);
1253 memcpy(wc->chead->uuid, fs->state->uuid, 16);
1254 wc->chead->seq = cpu_to_le64(wc->cluster_seq);
1256 wc->chead->Hlength = cpu_to_le16(wc->chead_size);
1257 wc->chead->Clength = cpu_to_le16(cluster_size);
1258 spin_lock(&fs->lock);
1260 fs->clean_reserved -= cluster_size;
1262 fs->free_blocks -= cluster_size;
1263 spin_unlock(&fs->lock);
1264 /* FIXME if this is just a header, no data blocks,
1265 * then use VerifyNull. Alternately if there is
1266 * no-one waiting for a sync to complete (how do we
1267 * track that?) use VerifyNext2
1272 wc->pending_vfy_type[wc->pending_next] = vtype;
1273 wc->chead->verify_type = cpu_to_le16(vtype);
1274 memset(wc->chead->verify_data, 0, 16);
1276 wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
1277 wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
1278 wc->prev_addr = head_addr[0];
1279 wc->chead->this_addr = cpu_to_le64(wc->prev_addr);
1281 wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
1283 /* We cannot write the header until any previous cluster
1284 * which uses the header as a commit block has been
1287 which = (wc->pending_next+3)%4;
1288 dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1289 atomic_read(&wc->pending_cnt[which]));
1290 if (wc->pending_vfy_type[which] == VerifyNext)
1291 wait_event(wc->pending_wait,
1292 atomic_read(&wc->pending_cnt[which]) == 1);
1293 which = (which+3) % 4;
1294 dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1295 atomic_read(&wc->pending_cnt[which]));
1296 if (wc->pending_vfy_type[which] == VerifyNext2)
1297 wait_event(wc->pending_wait,
1298 atomic_read(&wc->pending_cnt[which]) == 1);
1300 lafs_clusters_done(fs);
1301 dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
1302 wc->pending_next, atomic_read(&wc->pending_cnt
1303 [wc->pending_next]));
1304 BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
1305 BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));
1307 /* initialise pending count to 1. This will be decremented
1308 * once all related blocks have been submitted for write.
1309 * This includes the blocks in the next 0, 1, or 2
1312 atomic_inc(&wc->pending_cnt[wc->pending_next]);
1314 /* Now write it all out.
1315 * Later we should possibly re-order the writes
1316 * for raid4 stripe-at-a-time
1318 for (i = 0; i < wc->chead_blocks; i++)
1320 page_address(wc->page[wc->pending_next])
1321 + i * fs->blocksize,
1324 while (!list_empty(&wc->clhead)) {
1326 b = list_entry(wc->clhead.next, struct block, lru);
1327 dprintk("flushing %s\n", strblk(b));
1329 if (test_and_clear_bit(B_Realloc, &b->flags))
1332 if (test_and_clear_bit(B_Dirty, &b->flags))
1336 WARN_ON(credits == 0);
1337 lafs_space_return(fs, credits);
1338 spin_lock(&fs->lock);
1340 list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
1341 spin_unlock(&fs->lock);
1342 lafs_write_block(fs, b, wc);
1345 skip_discard(wc->slhead.next[0]);
1346 /* FIXME write out zeros to pad raid4 if needed */
1348 /* Having submitted this request we need to remove the bias
1349 * from pending_cnt of previous clusters
1351 which = wc->pending_next;
1353 dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1354 atomic_read(&wc->pending_cnt[which]));
1355 if (wc->pending_vfy_type[which] == VerifyNull)
1356 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1358 which = (which+3) % 4;
1359 dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1360 atomic_read(&wc->pending_cnt[which]));
1361 if (wc->pending_vfy_type[which] == VerifyNext)
1362 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1364 which = (which+3) % 4;
1365 dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1366 atomic_read(&wc->pending_cnt[which]));
1367 if (wc->pending_vfy_type[which] == VerifyNext2)
1368 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1371 cluster_done(fs, wc);
1372 wake_up(&wc->pending_wait);
1374 dprintk("D %d\n", wake);
1376 lafs_write_flush(fs, wc);
1378 wc->pending_next = (wc->pending_next+1) % 4;
1379 /* now re-initialise the cluster information */
1380 cluster_reset(fs, wc);
1382 wait_event(wc->pending_wait,
1383 atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
1384 dprintk("cluster_flush end pending_next=%d cnt=%d\n",
1385 wc->pending_next, atomic_read(&wc->pending_cnt
1386 [wc->pending_next]));
/* Flush write-cluster 'cnum' of 'fs' if there is something to write.
 * wc->lock is taken before the test and released after the call to
 * cluster_flush(), so the decision and the flush happen under the
 * same mutex.
 * NOTE(review): this listing omits some lines of the function
 * (braces and part of the condition) - confirm against the full
 * source before relying on the exact grouping of the tests below.
 */
1389 void lafs_cluster_flush(struct fs *fs, int cnum)
1391 struct wc *wc = &fs->wc[cnum];
1392 dprintk("LAFS_cluster_flush %d\n", cnum);
1393 mutex_lock(&wc->lock);
/* A flush is wanted when blocks are queued on ->clhead, or when
 * ->chead_size exceeds a bare struct cluster_head.
 */
1394 if (!list_empty(&wc->clhead)
1395 || wc->chead_size > sizeof(struct cluster_head)
/* The remaining tests look at the CH_CheckpointEnd bit of
 * fs->checkpointing and the FlushNeeded / SecondFlushNeeded
 * fsstate bits; how this group is joined to the tests above is
 * not visible here - TODO confirm.
 */
1397 ((fs->checkpointing & CH_CheckpointEnd)
1398 || test_bit(FlushNeeded, &fs->fsstate)
1399 || test_bit(SecondFlushNeeded, &fs->fsstate)
1401 cluster_flush(fs, cnum);
1402 mutex_unlock(&wc->lock);
/* Wait for outstanding cluster writes on every write-cluster of 'fs'.
 * For each of the WC_NUM entries in fs->wc, and for each of its 4
 * pending slots, sleep on ->pending_wait until that slot's
 * ->pending_cnt is no greater than 1.
 * NOTE(review): a remaining count of 1 presumably corresponds to the
 * initial bias that cluster_flush() adds to ->pending_cnt - confirm.
 */
1405 void lafs_cluster_wait_all(struct fs *fs)
1408 for (i = 0; i < WC_NUM; i++) {
1409 struct wc *wc = &fs->wc[i];
1411 for (j = 0; j < 4; j++) {
1412 wait_event(wc->pending_wait,
1413 atomic_read(&wc->pending_cnt[j]) <= 1);
1418 /* The end_io function for writes to a cluster is one
1419 * of 4 depending on which in the circular list the
/* Common completion handler behind the cluster_endio_* callbacks.
 *  bio:    the completed bio; its ->bi_private is the struct wc.
 *  err:    completion status; in the lines visible here it is only
 *          printed by dprintk.
 *  which:  index (0..3) of the pending slot the bio belongs to.
 *  header: passed by the callbacks as 1 (header) or 0 (data); in the
 *          lines visible here it is only printed by dprintk.
 * NOTE(review): this listing omits several lines of the function
 * (the statements controlled by the 'if's below and the start of two
 * conditions) - confirm details against the full source.
 */
1422 static void cluster_end_io(struct bio *bio, int err,
1423 int which, int header)
1425 /* FIXME how to handle errors? */
1426 struct wc *wc = bio->bi_private;
/* Recover the owning fs from the wc, which is element wc->cnum
 * of the fs->wc[] array.
 */
1427 struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
1431 dprintk("end_io err=%d which=%d header=%d\n",
1432 err, which, header);
/* Drop this bio's reference on its own slot. */
1434 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1436 if (atomic_read(&wc->pending_cnt[which]) == 1)
/* (which+3) % 4 steps back one slot in the 4-entry ring. */
1438 which = (which+3) % 4;
/* The previous slot is also decremented when its verify type is
 * VerifyNext or VerifyNext2.
 */
1440 (wc->pending_vfy_type[which] == VerifyNext ||
1441 wc->pending_vfy_type[which] == VerifyNext2) &&
1442 atomic_dec_and_test(&wc->pending_cnt[which]))
1444 if (atomic_read(&wc->pending_cnt[which]) == 1)
/* Step back once more: two slots before the bio's own. */
1447 which = (which+3) % 4;
/* That slot is only decremented when its type is VerifyNext2. */
1449 wc->pending_vfy_type[which] == VerifyNext2 &&
1450 atomic_dec_and_test(&wc->pending_cnt[which]))
1452 if (atomic_read(&wc->pending_cnt[which]) == 1)
1456 schedule_work(&fs->done_work);
1458 wake_up(&wc->pending_wait);
/* Wake the filesystem thread if a flush has been requested. */
1459 if (test_bit(FlushNeeded, &fs->fsstate) ||
1460 test_bit(SecondFlushNeeded, &fs->fsstate))
1461 lafs_wake_thread(fs);
/* Completion callback, data variant, slot 0:
 * forwards to cluster_end_io() with which=0, header=0.
 */
1465 static void cluster_endio_data_0(struct bio *bio, int err)
1467 cluster_end_io(bio, err, 0, 0);
/* Completion callback, data variant, slot 1:
 * forwards to cluster_end_io() with which=1, header=0.
 */
1470 static void cluster_endio_data_1(struct bio *bio, int err)
1472 cluster_end_io(bio, err, 1, 0);
/* Completion callback, data variant, slot 2:
 * forwards to cluster_end_io() with which=2, header=0.
 */
1475 static void cluster_endio_data_2(struct bio *bio, int err)
1477 cluster_end_io(bio, err, 2, 0);
/* Completion callback, data variant, slot 3:
 * forwards to cluster_end_io() with which=3, header=0.
 */
1480 static void cluster_endio_data_3(struct bio *bio, int err)
1482 cluster_end_io(bio, err, 3, 0);
/* Completion callback, header variant, slot 0:
 * forwards to cluster_end_io() with which=0, header=1.
 */
1485 static void cluster_endio_header_0(struct bio *bio, int err)
1487 cluster_end_io(bio, err, 0, 1);
/* Completion callback, header variant, slot 1:
 * forwards to cluster_end_io() with which=1, header=1.
 */
1490 static void cluster_endio_header_1(struct bio *bio, int err)
1492 cluster_end_io(bio, err, 1, 1);
/* Completion callback, header variant, slot 2:
 * forwards to cluster_end_io() with which=2, header=1.
 */
1495 static void cluster_endio_header_2(struct bio *bio, int err)
1497 cluster_end_io(bio, err, 2, 1);
/* Completion callback, header variant, slot 3:
 * forwards to cluster_end_io() with which=3, header=1.
 */
1500 static void cluster_endio_header_3(struct bio *bio, int err)
1502 cluster_end_io(bio, err, 3, 1);
/* Return one of the eight cluster_endio_{header,data}_{0..3}
 * completion callbacks.
 * NOTE(review): the tests that choose between the returns are not
 * part of this listing.  Presumably 'header' selects the header or
 * data family and 'which' selects the slot number - confirm against
 * the full source, including what happens for 'which' outside 0..3.
 */
1505 bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
1510 return cluster_endio_header_0;
1512 return cluster_endio_header_1;
1515 return cluster_endio_header_2;
1517 return cluster_endio_header_3;
1521 return cluster_endio_data_0;
1523 return cluster_endio_data_1;
1526 return cluster_endio_data_2;
1528 return cluster_endio_data_3;
/* Initialise write-cluster 'cnum' of 'fs'.
 *  addr: position handed to lafs_seg_setpos() for wc->seg.
 *  prev: stored as wc->prev_addr.
 *  seq:  stored as wc->cluster_seq.
 * NOTE(review): the return statements and the allocation-failure
 * handling are not fully visible in this listing - confirm the
 * return convention against the full source.
 */
1531 int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
1533 struct wc *wc = &fs->wc[cnum];
/* Start with no block in the skiplist head and an empty block list. */
1536 wc->slhead.b = NULL;
1537 INIT_LIST_HEAD(&wc->clhead);
/* Each of the 4 pending slots starts as VerifyNull with a zero
 * pending count and gets one page from alloc_page().
 */
1539 for (i = 0; i < 4 ; i++) {
1540 wc->pending_vfy_type[i] = VerifyNull;
1541 atomic_set(&wc->pending_cnt[i], 0);
1542 wc->page[i] = alloc_page(GFP_KERNEL);
/* NOTE(review): presumably part of unwinding earlier pages when an
 * allocation fails; the surrounding lines are not visible here.
 */
1549 put_page(wc->page[i]);
1553 wc->pending_next = 0;
1554 wc->cluster_seq = seq;
1555 wc->prev_addr = prev;
1559 lafs_seg_setpos(fs, &wc->seg, addr);
1560 cluster_reset(fs, wc);
/* Add wc->remaining+1 blocks to fs->free_blocks, under fs->lock. */
1562 spin_lock(&fs->lock);
1563 fs->free_blocks += wc->remaining+1;
1564 spin_unlock(&fs->lock);
/* lafs_flush: write out datablock 'b' and wait for completion.
 * NOTE(review): only the explanatory comment below is visible in
 * this listing; no statements of the body are shown, so whether the
 * function currently does any work cannot be told from here.
 */
1570 void lafs_flush(struct datablock *b)
1572 /* Need to write this block out and wait until
1573 * it has been written, so that we can update it
1574 * without risking corruption to previous snapshot.
/* Report whether write-cluster 'cnum' of 'fs' has no blocks queued:
 * returns the result of list_empty() on that cluster's ->clhead list.
 */
1579 int lafs_cluster_empty(struct fs *fs, int cnum)
1581 return list_empty(&fs->wc[cnum].clhead);