 * write-cluster management routines for LaFS
 * Copyright (C) 2006-2009
 * NeilBrown <neilb@suse.de>
 * Released under the GPL, version 2
 * Blocks to be written to a cluster need to be sorted so as to minimise
 * fragmentation and to keep the cluster head small.
 * So that we can track the amount of space needed in the cluster head,
 * we really need to keep the list sorted rather than just sort it
 * at the end.
 * We do this using a linked list of blocks and an insertion sort.
 * To improve the speed of insertion we keep a modified skiplist structure
 * over the list.
 * This is 'modified' in that it explicitly understands sequential
 * runs within the list and doesn't keep extra skip-points within a run.
 *
 * The skip-points require memory allocation, which cannot be
 * guaranteed here in the writeout path. To handle this we
 * - preallocate some entries
 * - accept a less-efficient list if memory isn't available
 * - flush early if things get really bad.
 * Skip point placement and runs.
 * When an entry is added to the list, we place a skip-point over it
 * based on the value of a random number, with a 1-in-4 chance of placement.
 * The height of the skip point is also random - we keep rolling the dice,
 * increasing the height each time that we get 1-in-4.
 * If the new entry is placed immediately after a skip point, and is
 * consecutive with that skip point, the skip point is moved forward
 * so that it will always be at the end of a run.
 * When a new entry finds itself at the start or end of a run, a skip
/* A height of 8 with a fanout of 1-in-4 allows reasonable
 * addressing for 2^16 entries, which is probably more than we need.
 */
#define SKIP_MAX_HEIGHT 8
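/*
 * Illustrative sketch (not part of the original source): the shape of
 * the dice-rolling that cluster_insert() below uses to pick a skip
 * point height.  Each extra level survives with probability 1/4, so
 * 8 levels index roughly 4^8 = 2^16 entries.  random32() is used here
 * purely for illustration.
 *
 *	int pick_height(void)
 *	{
 *		int height = 1;
 *		while (height < SKIP_MAX_HEIGHT &&
 *		       (random32() & 3) == 3)
 *			height++;
 *		return height;	// 1 means "no skip point"
 *	}
 */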
#include <linux/crc32.h>
#include <linux/random.h>
#include <linux/slab.h>
static void cluster_flush(struct fs *fs, int cnum);
static void skip_discard(struct skippoint *sp)
{
	while (sp) {
		struct skippoint *next = sp->next[0];
		kfree(sp);
		sp = next;
	}
}
static int cmp_blk(struct block *a, struct block *b)
{
	/* compare the filesys/inode/index/addr info
	 * of two blocks and return:
	 *   0 if they are the same block
	 *  -1 if consecutive in order
	 *  -2 if in same inode and in order
	 * <=-3 if different inodes and correct order
	 * positive equivalents for the reverse orderings
	 */
	if (a->inode != b->inode) {
		struct inode *fsinoa = LAFSI(a->inode)->filesys;
		struct inode *fsinob = LAFSI(b->inode)->filesys;

		/* order by filesystem, then by inode number */
		if (fsinoa != fsinob)
			return fsinoa->i_ino < fsinob->i_ino ? -3 : 3;
		return a->inode->i_ino < b->inode->i_ino ? -3 : 3;
	}
	/* Same inode. If both are data blocks they might be
	 * consecutive.
	 */
	if (!test_bit(B_Index, &a->flags) &&
	    !test_bit(B_Index, &b->flags)) {
		if (a->fileaddr == b->fileaddr)
			return 0;
		if (a->fileaddr + 1 == b->fileaddr)
			return -1;
		if (a->fileaddr < b->fileaddr)
			return -2;
		return 2;
	}
	/* at least one is an index block. Sort them before data */
	if (!test_bit(B_Index, &a->flags))
		/* a is data, b is index, b comes first */
		return 2;
	if (!test_bit(B_Index, &b->flags))
		/* a is index, b is data, a comes first */
		return -2;
	/* both are index blocks, use fileaddr or depth */
	if (a->fileaddr < b->fileaddr)
		return -2;
	if (a->fileaddr > b->fileaddr)
		return 2;
	if (iblk(a)->depth < iblk(b)->depth)
		return -2;
	/* equality really shouldn't be possible */
	return 2;
}
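/*
 * Illustrative note (not part of the original source): the graded
 * negative returns let a caller classify a neighbour with one
 * comparison.  The helper names below are hypothetical:
 *
 *	int c = cmp_blk(prev, new);
 *	if (c == -1)
 *		extend_run();		// same file, consecutive address
 *	else if (c == -2)
 *		add_descriptor();	// same file, new run
 *	else if (c <= -3)
 *		add_group_head();	// a new file starts here
 */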
static struct block *skip_find(struct skippoint *head,
			       struct list_head *list,
			       struct block *target,
			       struct skippoint *where)
{
	/* Find the last block that precedes 'target'
	 * and return it.  If target will be the start,
	 * return NULL.
	 * 'where' gets filled in with the last skip point
	 * at each level which is before-or-at the found
	 * block. This can be used for easy insertion.
	 */
	int level;
	struct block *b, *next;

	for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
		while (head->next[level] &&
		       cmp_blk(head->next[level]->b, target) < 0)
			head = head->next[level];
		where->next[level] = head;
	}
	if (head->b == NULL) {
		if (list_empty(list) ||
		    cmp_blk(list_entry(list->next, struct block, lru),
			    target) > 0)
			/* This goes at the start */
			return NULL;
		b = list_entry(list->next, struct block, lru);
	} else
		b = head->b;

	/* b is before target */
	next = b;
	list_for_each_entry_continue(next, list, lru) {
		if (cmp_blk(next, target) > 0)
			break;
		b = next;
	}
	/* b is the last element before target */
	return b;
}
static int cluster_insert(struct skippoint *head,
			  struct list_head *list,
			  struct block *target,
			  int avail)
{
	/* Insert 'target' at an appropriate point in the
	 * list, creating a skippoint if appropriate.
	 * Return the amount of cluster-head space needed:
	 * 0 if the block simply extends an existing run,
	 * 1 if not in a run, but adjacent to something in same file,
	 * 2 if it starts a new file and needs a group_head as well.
	 * However if the value we would return would be more than
	 * 'avail', abort the insert and return -1.
	 */
	struct skippoint pos, *newpoint;
	struct block *b;
	int rv;
	int height, level;
	int cmpbefore, cmpafter;
	unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
					sizeof(unsigned long))];

	BUG_ON(!list_empty(&target->lru));
	if (unlikely(list_empty(list))) {
		/* handle this trivial case separately */
		list_add(&target->lru, list);
		for (height = 0; height < SKIP_MAX_HEIGHT; height++)
			head->next[height] = NULL;
		return 2;
	}
	b = skip_find(head, list, target, &pos);
	if (b == NULL) {
		/* goes at the head of the list */
		list_add(&target->lru, list);
		cmpbefore = 3;
	} else {
		list_add(&target->lru, &b->lru);
		cmpbefore = cmp_blk(b, target);
	}
	if (target->lru.next == list) /* new last */
		cmpafter = -3;
	else
		cmpafter = cmp_blk(target, list_entry(target->lru.next,
						      struct block,
						      lru));
	if (cmpbefore == -1) {
		/* adjacent with previous. Possibly move skippoint */
		if (pos.next[0]->b == b)
			pos.next[0]->b = target;
		return 0;
	} else if (cmpafter == -1)
		/* adjacent with next. No need to add a skippoint */
		return 0;
	else if (cmpbefore == -2 || cmpafter == -2)
		/* same inode as next or previous */
		rv = 1;
	else
		rv = 2;
	if (rv > avail) {
		/* over budget: undo the insert */
		list_del_init(&target->lru);
		return -1;
	}

	/* Now consider adding a skippoint here. We want 2 bits
	 * per level of random data
	 */
	get_random_bytes(rand, sizeof(rand));
	height = 1;
	while (height < SKIP_MAX_HEIGHT &&
	       test_bit(height*2, rand) &&
	       test_bit(height*2+1, rand))
		height++;
	if (height == 1)
		return rv;

	newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
	if (!newpoint)
		/* tolerate a less-efficient list */
		return rv;
	newpoint->b = target;
	for (level = 0; level < height-1; level++) {
		newpoint->next[level] = pos.next[level];
		pos.next[level] = newpoint;
	}
	return rv;
}
/*-----------------------------------------------------------------------
 * A segment is divided up in a slightly complicated way into
 * tables, rows and columns. This allows us to align writes with
 * stripes in a raid4 array or similar.
 * So we have some support routines to help track our way around
 * the segment.
 */
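/*
 * Illustrative example (not part of the original source): with
 * width = 4 columns and stride = 1024, a block offset within a
 * segment decomposes exactly as lafs_seg_setpos() below computes:
 *
 *	col   = offset / stride;	// device column
 *	row   = offset % stride;	// row within the column
 *	table = col / width;		// group of 'width' columns
 *	col   = col % width;
 *
 * One full row then spans every column of a table, which is what lets
 * a row of writes line up with a raid4/5 stripe.
 */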
static int seg_remainder(struct fs *fs, struct segpos *seg)
{
	/* return the number of blocks from the start of segpos to the
	 * end of the segment.
	 * i.e. remaining rows in this table, plus remaining tables in
	 * this segment, all times the width.
	 */
	struct fs_dev *dv = &fs->devs[seg->dev];
	int rows = dv->rows_per_table - seg->st_row;

	BUG_ON(seg->dev < 0);
	rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
	return rows * dv->width;
}
static void seg_step(struct fs *fs, struct segpos *seg)
{
	/* reposition this segpos to be immediately after its current end
	 * and make the 'current' point be the start.
	 */
	seg->st_table = seg->nxt_table;
	seg->st_row = seg->nxt_row;
	seg->table = seg->st_table;
	seg->row = seg->st_row;
}
u32 lafs_seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
{
	/* move the 'nxt' table/row to be 'size' blocks beyond
	 * current start. size will be rounded up to a multiple
	 * of the width.
	 */
	struct fs_dev *dv = &fs->devs[seg->dev];
	u32 rv;

	BUG_ON(seg->dev < 0);
	size = (size + dv->width - 1) / dv->width;
	rv = size * dv->width;
	size += seg->st_row;
	seg->nxt_table = seg->st_table + size / dv->rows_per_table;
	seg->nxt_row = size % dv->rows_per_table;
	return rv;
}
void lafs_seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
{
	/* set seg to start at the given virtual address, and be
	 * of zero length.
	 * 'addr' should always be at the start of a row, though
	 * as it might be read off storage, we need to be
	 * a little bit careful.
	 */
	u32 offset, row, col, table;
	struct fs_dev *dv;

	virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
	dv = &fs->devs[seg->dev];
	col = offset / dv->stride;
	row = offset % dv->stride;
	table = col / dv->width;
	col = col % dv->width;

	seg->col = col;
	seg->st_table = table;
	seg->st_row = row;
	seg->table = seg->nxt_table = seg->st_table;
	seg->row = seg->nxt_row = seg->st_row;
}
static u64 seg_addr(struct fs *fs, struct segpos *seg)
{
	/* Return the virtual address of the block pointed
	 * to by this segpos.
	 */
	struct fs_dev *dv = &fs->devs[seg->dev];
	u64 addr;

	if (seg->dev < 0)
		/* Setting 'next' address for last cluster in
		 * a cleaner segment */
		return 0;
	if (seg->table > seg->nxt_table ||
	    (seg->table == seg->nxt_table &&
	     seg->row > seg->nxt_row))
		/* We have gone beyond the end of the cluster,
		 * must be bad data during roll-forward
		 */
		return 0;
	addr = seg->col * dv->stride;
	addr += seg->row;
	addr += seg->table * dv->rows_per_table;
	addr += seg->num * dv->segment_stride;
	return addr;
}
u64 lafs_seg_next(struct fs *fs, struct segpos *seg)
{
	/* step forward one block, returning the address of
	 * the block stepped over.
	 * The write cluster can have three sections:
	 * - the tail end of one table
	 * - a number of complete tables
	 * - the head of one table
	 *
	 * For each table or partial table we want to take a full
	 * column at a time, then go to the next column.
	 * So we step to the next row. If it is beyond the range of
	 * the current table, go to 'start' of next column, or to
	 * the next table.
	 */
	struct fs_dev *dv = &fs->devs[seg->dev];
	u64 addr = seg_addr(fs, seg);

	if (!addr)
		/* Beyond end of cluster */
		return 0;

	/* now step forward in column or table or seg */
	seg->row++;
	if (seg->row >= dv->rows_per_table ||
	    (seg->table == seg->nxt_table
	     && seg->row >= seg->nxt_row)) {
		seg->col++;
		if (seg->col >= dv->width) {
			seg->col = 0;
			seg->table++;
		}
		if (seg->table == seg->st_table)
			seg->row = seg->st_row;
		else
			seg->row = 0;
	}
	return addr;
}
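/*
 * Illustrative call pattern (not part of the original source), as used
 * by cluster_flush() further down:
 *
 *	cluster_size = lafs_seg_setsize(fs, &wc->seg, nblocks);
 *	for (i = 0; i < nblocks; i++)
 *		addr = lafs_seg_next(fs, &wc->seg);
 *
 * Successive addresses run down one on-device column before moving to
 * the next, so each device sees sequential writes within a table.
 */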
static void cluster_reset(struct fs *fs, struct wc *wc)
{
	if (wc->seg.dev < 0) {
		/* no segment is open */
		wc->remaining = 0;
		wc->cluster_space = 0;
		return;
	}
	wc->remaining = seg_remainder(fs, &wc->seg);
	wc->chead_blocks = 1;
	wc->remaining--;
	wc->cluster_space = fs->blocksize - sizeof(struct cluster_head);
	wc->chead = page_address(wc->page[wc->pending_next]);
	wc->chead_size = sizeof(struct cluster_head);
}
static void close_segment(struct fs *fs, int cnum)
{
	/* Release old segment */
	/* FIXME I need lafs_seg_{de,}ref for every snapshot
	 */
	struct wc *wc = fs->wc + cnum;

	if (wc->seg.dev >= 0) {
		if (cnum) {
			spin_lock(&fs->lock);
			fs->clean_reserved -= seg_remainder(fs, &wc->seg);
			spin_unlock(&fs->lock);
		}
		lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
		wc->seg.dev = -1;
	}
}

void lafs_close_all_segments(struct fs *fs)
{
	int cnum;

	for (cnum = 0; cnum < WC_NUM; cnum++)
		close_segment(fs, cnum);
}
static int new_segment(struct fs *fs, int cnum)
{
	/* new_segment can fail if cnum > 0 and there is no
	 * more clean_reserved space.
	 */
	struct wc *wc = fs->wc + cnum;
	int dev;
	u32 seg;

	close_segment(fs, cnum);

	if (cnum && fs->clean_reserved < fs->max_segment) {
		/* we have reached the end of the last cleaner
		 * segment. The remainder of clean_reserved
		 * is of no value (if there even is any)
		 */
		if (fs->clean_reserved) {
			spin_lock(&fs->alloc_lock);
			fs->free_blocks += fs->clean_reserved;
			fs->clean_reserved = 0;
			spin_unlock(&fs->alloc_lock);
		}
		return -ENOSPC;
	}
	/* This gets a reference on the 'segsum' */
	lafs_free_get(fs, &dev, &seg, 0);
	lafs_seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
	BUG_ON(wc->seg.dev != dev);
	BUG_ON(wc->seg.num != seg);

	/* wc mutex protects us here */
	return 0;
}
/*-------------------------------------------------------------------------
 * lafs_cluster_allocate
 * This adds a block to the writecluster list and possibly triggers a
 * flush.
 * The flush will assign a physical address to each block and schedule
 * the actual write out.
 *
 * There can be multiple active write clusters. e.g. one for new data,
 * one for cleaned data, one for defrag writes.
 *
 * It is here that the in-core inode is copied to the data block, if
 * appropriate, and here that small files get copied into the inode
 * thus avoiding writing a separate block for the file content.
 *
 * Roll-forward sees any data block as automatically extending the
 * size of the file. This requires that we write blocks in block-order
 * as much as possible (which we would anyway) and that we inform
 * roll-forward when a file ends mid-block.
 *
 * A block should be iolocked when being passed to lafs_cluster_allocate.
 * When the IO is complete, it will be unlocked.
 */
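/*
 * Illustrative call pattern (not part of the original source):
 *
 *	lafs_iolock_block(b);
 *	// ... block is dirty and holds the needed credits ...
 *	lafs_cluster_allocate(b, 0);	// cnum 0: ordinary new data
 *
 * The block is unlocked and B_Writeback set inside
 * lafs_cluster_allocate(); lafs_writeback_done() runs once the
 * cluster it joined has committed.
 */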
static int flush_data_to_inode(struct block *b)
{
	/* This is the first and only datablock in a file
	 * and it will fit in the inode. So put it there.
	 */
	char *ibuf, *dbuf;
	loff_t size = i_size_read(b->inode);
	struct lafs_inode *lai = LAFSI(b->inode);
	struct datablock *db;
	struct fs *fs = fs_from_inode(b->inode);

	lafs_iolock_block(&lai->iblock->b);
	dprintk("FLUSHING single block into inode - s=%d %s\n",
		(int)size, strblk(b));
	if (lai->iblock->depth > 1)
		/* Too much indexing still - truncate must be happening */
		goto give_up;
	if (lai->iblock->depth == 1) {
		/* If there is nothing in this block we can still use it */
		if (lai->iblock->children.next != &b->siblings ||
		    b->siblings.next != &lai->iblock->children ||
		    lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
			goto give_up;
	}
	/* Need checkpoint lock to pin the dblock */
	lafs_checkpoint_lock(fs);
	if (test_and_clear_bit(B_Dirty, &b->flags)) {
		int credits = 1;

		/* Check size again, just in case it changed */
		size = i_size_read(b->inode);
		if (size + lai->metadata_size > fs->blocksize) {
			/* Lost a race: give up, restoring the Dirty bit */
			if (test_and_set_bit(B_Dirty, &b->flags))
				if (test_and_set_bit(B_Credit, &b->flags))
					lafs_space_return(fs, 1);
			lafs_checkpoint_unlock(fs);
			goto give_up;
		}
		if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
			credits--;
		if (test_and_clear_bit(B_UnincCredit, &b->flags))
			credits++;
		if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
			credits--;
		lafs_space_return(fs, credits);
		LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
		set_bit(B_PinPending, &lai->dblock->b.flags);
		if (test_bit(B_Pinned, &b->flags))
			lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
		else
			lafs_pin_block(&lai->dblock->b);
		lafs_dirty_dblock(lai->dblock);
	} else if (test_and_clear_bit(B_Realloc, &b->flags)) {
		int credits = 1;

		LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
			 &lai->iblock->b);
		if (test_and_clear_bit(B_UnincCredit, &b->flags))
			credits++;
		if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
			credits--;
		if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
			credits--;
		LAFS_BUG(credits < 0, b);
		lafs_space_return(fs, credits);
	} else {
		printk("Wasn't dirty or realloc: %s\n", strblk(b));
		LAFS_BUG(1, b);
	}
	lafs_checkpoint_unlock(fs);

	lai->iblock->depth = 0;
	clear_bit(B_Valid, &lai->iblock->b.flags);

	/* safe to reference ->dblock as b is a dirty child */
	db = getdref(lai->dblock, MKREF(flush2db));
	ibuf = map_dblock(db);
	dbuf = map_dblock_2(dblk(b));

	if (test_and_clear_bit(B_UnincCredit, &b->flags))
		lafs_space_return(fs, 1);

	/* It is an awkward time to call lafs_inode_fillblock,
	 * so do this one little change manually
	 */
	((struct la_inode *)ibuf)->depth = 0;
	memcpy(ibuf + lai->metadata_size, dbuf, size);
	memset(ibuf + lai->metadata_size + size, 0,
	       fs->blocksize - lai->metadata_size - size);
	unmap_dblock_2(db, ibuf);
	unmap_dblock(dblk(b), dbuf);
	lafs_iounlock_block(&lai->iblock->b);

	putdref(db, MKREF(flush2db));
	return 1;

give_up:
	lafs_iounlock_block(&lai->iblock->b);
	return 0;
}
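/*
 * Illustrative layout (not part of the original source) of an inode
 * block after flush_data_to_inode() has embedded a small file:
 *
 *	0                   metadata_size               blocksize
 *	| struct la_inode...| file data (i_size bytes) | zeroes |
 *
 * A file fits inline whenever i_size + metadata_size <= blocksize,
 * which is exactly the test lafs_cluster_allocate() makes below.
 */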
int lafs_cluster_allocate(struct block *b, int cnum)
{
	struct fs *fs = fs_from_inode(b->inode);
	struct wc *wc = &fs->wc[cnum];
	struct lafs_inode *lai;
	loff_t size;
	int used, avail;
	int err = 0;

	LAFS_BUG(test_bit(B_Index, &b->flags) &&
		 iblk(b)->uninc_table.pending_cnt > 0, b);
	LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
	lai = LAFSI(b->inode);

	LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
	LAFS_BUG(!fs->rolled, b);
	LAFS_BUG(lai->flags & File_nonlogged &&
		 !test_bit(B_Index, &b->flags), b);

	/* We cannot change the physaddr of a uniblock in a
	 * different phase to the parent, else the parent will
	 * get the wrong address.
	 */
	LAFS_BUG(test_bit(B_Index, &b->flags) &&
		 test_bit(B_Uninc, &b->flags) &&
		 !!test_bit(B_Phase1, &b->flags) !=
		 !!test_bit(B_Phase1, &b->parent->b.flags), b);

	size = i_size_read(b->inode);
	if (!test_bit(B_Index, &b->flags) &&	/* it is a datablock */
	    b->fileaddr == 0 &&			/* first block of the file */
	    b->parent == lai->iblock &&		/* No indexing */
	    lai->type >= TypeBase &&		/* 'size' is meaningful */
	    size + lai->metadata_size <= fs->blocksize) {
		int success = flush_data_to_inode(b);
		if (success) {
			lafs_summary_update(fs, b->inode, b->physaddr, 0,
					    0, !!test_bit(B_Phase1, &b->flags),
					    test_bit(B_SegRef, &b->flags));
			b->physaddr = 0;
			lafs_iounlock_block(b);
			return 0;
		}
		/* There are conflicting blocks in the index tree */
		if (test_and_clear_bit(B_Realloc, &b->flags))
			lafs_space_return(fs, 1);
		if (!test_bit(B_Dirty, &b->flags) ||
		    !test_bit(B_Pinned, &b->flags)) {
			/* It is safe to just leave this for later */
			lafs_iounlock_block(b);
			return 0;
		}
		/* We really need to write this, so use a full block. */
		printk("WARNING %s\n", strblk(b));
	}

	/* The symbiosis between the datablock and the indexblock for an
	 * inode needs to be dealt with here.
	 * The data block will not normally be written until the InoIdx
	 * block is unpinned or phase-flipped. However if it is, as the
	 * result of some 'sync' style operation, then we write it even
	 * though the index info might not be uptodate. The current
	 * content will be safe for roll-forward to pick up, and the rest
	 * will be written when the InoIdx block is ready.
	 *
	 * When the InoIdx block is ready, we don't want to write it as
	 * there is nowhere for it to be incorporated into. Rather we
	 * move the pinning and credits across to the Data block
	 * which will subsequently be written.
	 */
	if (test_bit(B_InoIdx, &b->flags)) {
		struct block *b2 = &lai->dblock->b;
		int credits = 0;

		LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
		if (!test_bit(B_Pinned, &b2->flags)) {
			/* index block pinned, data block not,
			 * carry the pinning across
			 */
			if (b2->parent == NULL &&
			    b->parent)
				b2->parent =
					getiref(b->parent, MKREF(child));
			LAFS_BUG(b->parent != b2->parent, b);
			set_bit(B_PinPending, &b2->flags); // FIXME is this right?
			set_phase(b2, test_bit(B_Phase1,
					       &b->flags));
		}

		if (cnum) {
			if (test_and_clear_bit(B_Realloc, &b->flags))
				credits++;
			LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
		} else {
			if (test_and_clear_bit(B_Dirty, &b->flags))
				credits++;
			if (test_and_clear_bit(B_Realloc, &b->flags))
				credits++;
		}
		if (test_and_clear_bit(B_UnincCredit, &b->flags))
			credits++;

		/* We always erase the InoIdx block before the
		 * inode data block, so this cannot happen
		 */
		LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);

		if (!test_and_set_bit(B_UnincCredit, &b2->flags))
			if (!test_and_clear_bit(B_ICredit, &b2->flags))
				credits--;

		if (!test_bit(B_Dirty, &b2->flags)) {
			/* need some credits... */
			if (!test_and_set_bit(B_Credit, &b2->flags))
				credits--;
			lafs_dirty_dblock(dblk(b2));
		}
		if (!test_and_set_bit(B_UnincCredit, &b2->flags))
			credits--;
		if (!test_and_set_bit(B_Realloc, &b2->flags))
			credits--;

		LAFS_BUG(credits < 0, b2);
		lafs_space_return(fs, credits);
		/* make sure 'dirty' status is registered */
		lafs_iounlock_block(b);
		return 0;
	}

	if (!test_bit(B_Dirty, &b->flags) &&
	    !test_bit(B_Realloc, &b->flags)) {
		/* block just got truncated, so don't write it. */
		lafs_iounlock_block(b);
		return 0;
	}

	if (test_bit(B_EmptyIndex, &b->flags)) {
		int credits = 0;

		if (test_and_set_bit(B_Writeback, &b->flags))
			LAFS_BUG(1, b);
		lafs_iounlock_block(b);
		lafs_allocated_block(fs, b, 0);
		if (cnum) {
			if (test_and_clear_bit(B_Realloc, &b->flags))
				credits++;
			LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
		} else {
			if (test_and_clear_bit(B_Dirty, &b->flags))
				credits++;
			if (test_and_clear_bit(B_Realloc, &b->flags))
				credits++;
		}
		lafs_writeback_done(b);
		lafs_space_return(fs, credits);
		return 0;
	}

	if (!test_bit(B_Index, &b->flags)) {
		struct inode *myi = rcu_my_inode(dblk(b));

		if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
			lafs_inode_fillblock(myi);
		rcu_iput(myi);
	}

	mutex_lock(&wc->lock);

	/* The following check must be inside the mutex_lock
	 * to ensure exclusion between checkpoint and writepage.
	 * checkpoint must never see the block not on the
	 * leaf lru unless it is already in the cluster and so can
	 * be waited for.
	 */
	if (!list_empty_careful(&b->lru)) {
		/* Someone is flushing this block before
		 * checkpoint got to it - probably writepage.
		 * Must remove it from the leaf lru so it can go
		 * on the cluster list.
		 */
		LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
		spin_lock(&fs->lock);
		if (!list_empty(&b->lru))
			list_del_init(&b->lru);
		spin_unlock(&fs->lock);
	}

	if (test_and_set_bit(B_Writeback, &b->flags))
		LAFS_BUG(1, b);
	lafs_iounlock_block(b);

	getref(b, MKREF(cluster)); /* we soon own a reference in the list */

retry:
	/* See how much room is in the cluster.
	 * The interesting values are:
	 * none, one descriptor, one group_head and one descriptor.
	 * These are assigned values 0, 1, 2.
	 */
	if (wc->cluster_space >= (sizeof(struct group_head) +
				  sizeof(struct descriptor)) ||
	    (wc->remaining > 2 &&
	     wc->chead_blocks < MAX_CHEAD_BLOCKS &&
	     (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
		)
		avail = 2;
	else if (wc->cluster_space >= sizeof(struct descriptor))
		avail = 1;
	else
		avail = 0;

	if (wc->seg.dev < 0 || wc->remaining < 1)
		used = -1;
	else
		used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);

	if (used < 0) {
		/* Didn't fit: flush the current cluster, or open
		 * a new segment, then try again.
		 */
		if (wc->slhead.b)
			cluster_flush(fs, cnum);
		else {
			err = new_segment(fs, cnum);
			if (err) {
				/* No cleaner segments - this will have to go
				 * out with a checkpoint.
				 * Block is still 'dirty' - just clear
				 * the writeback flag.
				 */
				lafs_writeback_done(b);
				putref(b, MKREF(cluster));
				mutex_unlock(&wc->lock);
				return err;
			}
			cluster_reset(fs, wc);
		}
		goto retry;
	}

	if (used >= 1)
		wc->cluster_space -= sizeof(struct descriptor);
	if (used >= 2)
		wc->cluster_space -= sizeof(struct group_head);
	if (wc->cluster_space < 0) {
		/* need a new page for the cluster head */
		wc->chead_blocks++;
		wc->remaining--;
		wc->cluster_space += fs->blocksize;
	}

	BUG_ON(wc->remaining < 0);
	if (wc->remaining == 0)
		cluster_flush(fs, cnum);
	mutex_unlock(&wc->lock);
	return 0;
}
/*-------------------------------------------------------------------------
 * Cluster head building.
 * We build the cluster head bit by bit as we find blocks
 * in the list. These routines help.
 */
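/*
 * Illustrative layout (not part of the original source) of a cluster
 * head as these helpers build it:
 *
 *	struct cluster_head		fixed header
 *	struct group_head (inode A)	cluster_addhead()
 *	  miniblocks for A		cluster_addmini()
 *	  descriptors for A		cluster_adddesc()/cluster_incdesc()
 *	struct group_head (inode B)
 *	  ...
 *
 * cluster_closehead() records each group's size in 32-bit words so a
 * reader can skip whole groups at a time.
 */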
static void cluster_addhead(struct wc *wc, struct inode *ino,
			    struct group_head **headstart)
{
	struct group_head *gh = (struct group_head *)((char *)wc->chead +
						      wc->chead_size);
	u16 tnf;
	u64 tstamp;

	dprintk("CLUSTER addhead %d\n", wc->chead_size);
	*headstart = gh;

	gh->inum = cpu_to_le32(ino->i_ino);
	gh->fsnum = cpu_to_le32(LAFSI(ino)->filesys->i_ino);
	if (LAFSI(ino)->type >= TypeBase) {
		tstamp = encode_time(&ino->i_mtime);
		if (tstamp != encode_time(&ino->i_ctime))
			/* only record a timestamp when it is unambiguous */
			tstamp = 0;
	} else
		tstamp = 0;
	gh->timestamp = cpu_to_le64(tstamp);

	tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff));
	gh->truncatenum_and_flag = cpu_to_le16(tnf);
	wc->chead_size += sizeof(struct group_head);
}
static void cluster_closehead(struct wc *wc,
			      struct group_head *headstart)
{
	int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);

	dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
	headstart->group_size_words = cpu_to_le16(size / 4);
}
static void cluster_addmini(struct wc *wc, u32 addr, int offset,
			    int size, const char *data,
			    int size2, const char *data2)
{
	/* if size2 != 0, then only
	 * (size-size2) is at 'data' and the rest is at 'data2'
	 */
	struct miniblock *mb = ((struct miniblock *)
				((char *)wc->chead + wc->chead_size));

	dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
	mb->block_num = cpu_to_le32(addr);
	mb->block_offset = cpu_to_le16(offset);
	mb->length = cpu_to_le16(size + DescMiniOffset);
	wc->chead_size += sizeof(struct miniblock);
	memcpy(mb->data, data, size-size2);
	if (size2)
		memcpy(mb->data + size-size2, data2, size2);
	memset(mb->data+size, 0, (-size)&3);
	wc->chead_size += ROUND_UP(size);
}
static void cluster_adddesc(struct wc *wc, struct block *blk,
			    struct descriptor **desc_start)
{
	struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
						      wc->chead_size);

	dprintk("CLUSTER add_desc %d\n", wc->chead_size);
	*desc_start = dh;
	dh->block_num = cpu_to_le32(blk->fileaddr);
	dh->block_cnt = cpu_to_le32(0);
	dh->block_bytes = 0;
	if (test_bit(B_Index, &blk->flags))
		dh->block_bytes = DescIndex;
	wc->chead_size += sizeof(struct descriptor);
}
static void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
			    struct block *b, int blkbits)
{
	desc_start->block_cnt =
		cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
	if (!test_bit(B_Index, &b->flags)) {
		if (LAFSI(b->inode)->type >= TypeBase) {
			u64 size = b->inode->i_size;
			if (size > ((loff_t)b->fileaddr << blkbits) &&
			    size <= ((loff_t)(b->fileaddr + 1) << blkbits))
				desc_start->block_bytes =
					cpu_to_le32(size & ((1<<blkbits)-1));
			else
				desc_start->block_bytes =
					cpu_to_le32(1 << blkbits);
		} else
			desc_start->block_bytes = cpu_to_le32(1<<blkbits);
	}
}
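/*
 * Worked example (not part of the original source): with 1k blocks
 * (blkbits = 10) and i_size = 2500, the data block at fileaddr 2
 * covers bytes 2048-3071, so it records
 * block_bytes = 2500 & 1023 = 452, telling roll-forward that the file
 * ends mid-block; every earlier block records a full 1024.
 */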
/*------------------------------------------------------------------------
 * Cluster updates.
 * Updates are added to the cluster head immediately, causing a flush
 * if there is insufficient room.
 * This means that miniblocks always occur before block descriptors.
 * miniblocks are only ever written to cluster-series 0.
 */

int
lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
			    int len)
{
	/* make sure there is room for a 'len' update in the upcoming
	 * cluster. Do nothing here.
	 */
	uh->fs = fs;
	uh->reserved = 0;
	return 0;
}

int
lafs_cluster_update_pin(struct update_handle *uh)
{
	/* last chance to bail out */
	if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
		uh->reserved = 1;
		uh->fs->cluster_updates++;
		return 0;
	}
	return -EAGAIN;
}
static unsigned long long
lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
				struct inode *ino, u32 addr,
				int offset, int len, struct datablock *b,
				const char *str1,
				int len2, const char *str2)
{
	/* Here is the data - possibly in two parts.
	 * Flush 'len' bytes of it, at 'offset' of block 'addr' in
	 * 'ino', into the next write cluster.
	 */
	struct wc *wc = &fs->wc[0];
	struct group_head *head_start;
	unsigned long long seq;
	char *mapping = NULL;

	BUG_ON(!fs->rolled);

	mutex_lock(&wc->lock);
	while (wc->cluster_space < (sizeof(struct group_head)+
				    sizeof(struct descriptor) +
				    ROUND_UP(len))) {
		if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
		    (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
			wc->chead_blocks++;
			wc->remaining--;
			wc->cluster_space += fs->blocksize;
		} else
			cluster_flush(fs, 0);
	}

	fs->cluster_updates -= uh->reserved;
	lafs_space_return(fs, uh->reserved);
	uh->reserved = 0;

	cluster_addhead(wc, ino, &head_start);
	if (b) {
		mapping = map_dblock(b);
		str1 = mapping + offset;
	}
	cluster_addmini(wc, addr, offset, len, str1, len2, str2);
	if (b)
		unmap_dblock(b, mapping);
	cluster_closehead(wc, head_start);
	wc->cluster_space -= (sizeof(struct group_head)+
			      sizeof(struct descriptor) +
			      ROUND_UP(len));
	seq = wc->cluster_seq;
	mutex_unlock(&wc->lock);
	return seq;
}
unsigned long long
lafs_cluster_update_commit(struct update_handle *uh,
			   struct datablock *b,
			   int offset, int len)
{
	/* OK, here is the data to put in the next cluster. */
	struct fs *fs = fs_from_inode(b->b.inode);
	unsigned long long seq;

	seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
					      offset, len, b, NULL, 0, NULL);
	return seq;
}

unsigned long long
lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
			       struct inode *ino, u32 addr,
			       int offset, int len, const char *str1,
			       int len2, const char *str2)
{
	return lafs_cluster_update_commit_both(uh, fs, ino, addr,
					       offset, len, NULL,
					       str1, len2, str2);
}

void
lafs_cluster_update_abort(struct update_handle *uh)
{
	/* Didn't want that cluster_update after all */
	uh->fs->cluster_updates -= uh->reserved;
	lafs_space_return(uh->fs, uh->reserved);
}
int lafs_calc_cluster_csum(struct cluster_head *head)
{
	unsigned int oldcsum = head->checksum;
	unsigned int csum;

	head->checksum = 0;
	csum = crc32_le(0, (unsigned char *)head, le16_to_cpu(head->Hlength));
	head->checksum = oldcsum;
	return csum;
}
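/*
 * Illustrative check (not part of the original source): verification
 * recomputes the sum the same way, so
 *
 *	ok = (head->checksum == lafs_calc_cluster_csum(head));
 *
 * works because the checksum field is zeroed while crc32 runs over the
 * first Hlength bytes, then restored.
 */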
/*------------------------------------------------------------------------
 * Cluster flushing.
 * Once we have a suitable number of blocks and updates in a writecluster
 * we need to write them out.
 * We first iterate over the block list building the clusterhead.
 * Then we finalise and write the clusterhead and iterate again
 * writing out each block.
 * We keep a biased blockcount in the wc, and decrement it on
 * write-completion. Once the blockcount hits the bias the cluster
 * is written. Then when the next (or next-plus-one) cluster head is
 * written we remove the bias and the data is deemed to be safe.
 *
 * Update blocks will already have been included in the write cluster.
 *
 * We reinitialise the write cluster once we are finished.
 */
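/*
 * Illustrative timeline (not part of the original source) for a
 * cluster N written with VerifyNext:
 *
 *	pending_cnt[N] = 1 (bias) + number of data blocks in N
 *	data writes complete:         pending_cnt[N] drops to 1
 *	header of cluster N+1 lands:  pending_cnt[N] drops to 0
 *	cluster_done() then releases the blocks parked on
 *	pending_blocks[N]
 *
 * With VerifyNext2 the bias is only removed by the header after next,
 * so two later cluster heads must reach storage first.
 */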
static void cluster_done(struct fs *fs, struct wc *wc)
{
	/* This is called when one of the pending_cnts hits 0, indicating
	 * that a write-cluster has been written and committed to storage.
	 * We find all the pending blocks on a completed cluster and
	 * release them.
	 */
	int i;
	struct block *b, *tmp;
	struct list_head tofree;

	INIT_LIST_HEAD(&tofree);

	spin_lock(&fs->lock);
	for (i = 0; i < 4 ; i++)
		if (atomic_read(&wc->pending_cnt[i]) == 0)
			list_splice_init(&wc->pending_blocks[i], &tofree);
	spin_unlock(&fs->lock);

	list_for_each_entry_safe(b, tmp, &tofree, lru) {
		list_del_init(&b->lru);
		lafs_writeback_done(b);
		putref(b, MKREF(cluster));
	}
}

void lafs_clusters_done(struct fs *fs)
{
	int i;

	for (i = 0 ; i < WC_NUM; i++)
		cluster_done(fs, &fs->wc[i]);
}

void lafs_done_work(struct work_struct *ws)
{
	struct fs *fs = container_of(ws, struct fs, done_work);

	lafs_clusters_done(fs);
}
static void cluster_flush(struct fs *fs, int cnum)
{
	struct wc *wc = &fs->wc[cnum];
	int i;
	struct block *b;
	struct inode *current_inode = NULL;
	u64 current_block = NoBlock;
	u64 head_addr[MAX_CHEAD_BLOCKS];
	struct descriptor *desc_start = NULL;
	struct group_head *head_start = NULL;
	struct segpos segend;
	int which;
	int wake;
	int vtype;
	int cluster_size;
	u64 addr;

	if (!test_bit(CheckpointOpen, &fs->fsstate)) {
		/* First cluster since a checkpoint, make sure
		 * we do another checkpoint within 30 seconds
		 */
		fs->next_checkpoint = jiffies + 30 * HZ;
		set_bit(CheckpointOpen, &fs->fsstate);
		lafs_wake_thread(fs);
	}
	/* set the writecluster size as this will guide layout in
	 * the segment.
	 */
	if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
		set_bit(SecondFlushNeeded, &fs->fsstate);
	else
		clear_bit(SecondFlushNeeded, &fs->fsstate);

	cluster_size = lafs_seg_setsize(fs, &wc->seg,
					seg_remainder(fs, &wc->seg)
					- wc->remaining);

	/* find, and step over, the address of the header block(s) */
	for (i = 0; i < wc->chead_blocks ; i++)
		head_addr[i] = lafs_seg_next(fs, &wc->seg);

	list_for_each_entry(b, &wc->clhead, lru) {
		if (b->inode != current_inode) {
			/* need to create a new group_head */
			desc_start = NULL;
			if (head_start)
				cluster_closehead(wc, head_start);
			cluster_addhead(wc, b->inode, &head_start);
			current_inode = b->inode;
			current_block = NoBlock;
		}
		if (desc_start == NULL || b->fileaddr != current_block+1 ||
		    test_bit(B_Index, &b->flags)) {
			cluster_adddesc(wc, b, &desc_start);
			current_block = b->fileaddr;
		} else
			current_block = b->fileaddr;
		cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
		addr = lafs_seg_next(fs, &wc->seg);
		BUG_ON(!addr || !(addr+1));
		if (cnum && test_bit(B_Dirty, &b->flags))
			/* We are cleaning but this block is now dirty.
			 * Don't waste the UnincCredit on recording the
			 * clean location as it will be written as
			 * dirty soon. Just pin it to the current phase
			 * to ensure that it really does get written.
			 */
			lafs_pin_block(b);
		else
			lafs_allocated_block(fs, b, addr);
	}

	if (head_start)
		cluster_closehead(wc, head_start);
	segend = wc->seg; /* We may write zeros from here */
	seg_step(fs, &wc->seg);
	wc->remaining = seg_remainder(fs, &wc->seg);

	/* Need to make sure our ->next_addr gets set properly
	 * for non-cleaning segments.
	 */
	if (wc->remaining < 2) {
		if (cnum == 0)
			new_segment(fs, cnum);
		else
			close_segment(fs, cnum);
	}

	/* Fill in the cluster header */
	strncpy(wc->chead->idtag, "LaFSHead", 8);
	wc->chead->flags = cpu_to_le32(0);

	/* checkpoints only happen in cnum==0 */
	if (cnum == 0 && fs->checkpointing) {
		spin_lock(&fs->lock);
		wc->chead->flags = cpu_to_le32(fs->checkpointing);
		if (fs->checkpointing & CH_CheckpointEnd) {
			fs->checkpointing = 0;
			clear_bit(CheckpointOpen, &fs->fsstate);
		} else if (fs->checkpointing & CH_CheckpointStart) {
			fs->checkpointcluster = head_addr[0];
			fs->checkpointing &= ~CH_CheckpointStart;
		}
		spin_unlock(&fs->lock);
		if (!fs->checkpointing)
			wake_up(&fs->phase_wait);
	}

	memcpy(wc->chead->uuid, fs->state->uuid, 16);
	wc->chead->seq = cpu_to_le64(wc->cluster_seq);
	wc->cluster_seq++;
	wc->chead->Hlength = cpu_to_le16(wc->chead_size);
	wc->chead->Clength = cpu_to_le16(cluster_size);
	spin_lock(&fs->lock);
	if (cnum)
		fs->clean_reserved -= cluster_size;
	else
		fs->free_blocks -= cluster_size;
	spin_unlock(&fs->lock);

	/* FIXME if this is just a header, no data blocks,
	 * then use VerifyNull. Alternately if there is
	 * no-one waiting for a sync to complete (how do we
	 * track that?) use VerifyNext2.
	 */
	vtype = VerifyNext;
	wc->pending_vfy_type[wc->pending_next] = vtype;
	wc->chead->verify_type = cpu_to_le16(vtype);
	memset(wc->chead->verify_data, 0, 16);

	wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
	wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
	wc->prev_addr = head_addr[0];
	wc->chead->this_addr = cpu_to_le64(wc->prev_addr);

	wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);

	/* We cannot write the header until any previous cluster
	 * which uses this header as a commit block has been
	 * fully written.
	 */
	which = (wc->pending_next+3)%4;
	dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
		atomic_read(&wc->pending_cnt[which]));
	if (wc->pending_vfy_type[which] == VerifyNext)
		wait_event(wc->pending_wait,
			   atomic_read(&wc->pending_cnt[which]) == 1);
	which = (which+3) % 4;
	dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
		atomic_read(&wc->pending_cnt[which]));
	if (wc->pending_vfy_type[which] == VerifyNext2)
		wait_event(wc->pending_wait,
			   atomic_read(&wc->pending_cnt[which]) == 1);

	lafs_clusters_done(fs);
	dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
		wc->pending_next, atomic_read(&wc->pending_cnt
					      [wc->pending_next]));
	BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
	BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));

	/* initialise pending count to 1. This will be decremented
	 * once all related blocks have been submitted for write.
	 * This includes the blocks in the next 0, 1, or 2
	 * clusters.
	 */
	atomic_inc(&wc->pending_cnt[wc->pending_next]);

	/* Now write it all out.
	 * Later we should possibly re-order the writes
	 * for raid4 stripe-at-a-time.
	 */
	for (i = 0; i < wc->chead_blocks; i++)
		lafs_write_head(fs,
				page_address(wc->page[wc->pending_next])
				+ i * fs->blocksize,
				head_addr[i], wc);

	while (!list_empty(&wc->clhead)) {
		int credits = 0;

		b = list_entry(wc->clhead.next, struct block, lru);
		dprintk("flushing %s\n", strblk(b));

		if (test_and_clear_bit(B_Realloc, &b->flags))
			credits++;
		if (test_and_clear_bit(B_Dirty, &b->flags))
			credits++;

		WARN_ON(credits == 0);
		lafs_space_return(fs, credits);

		spin_lock(&fs->lock);
		list_del(&b->lru);
		list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
		spin_unlock(&fs->lock);

		lafs_write_block(fs, b, wc);
	}
	skip_discard(wc->slhead.next[0]);
	/* FIXME write out zeros to pad raid4 if needed */

	/* Having submitted this request we need to remove the bias
	 * from pending_cnt of previous clusters.
	 */
	which = wc->pending_next;
	wake = 0;
	dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
		atomic_read(&wc->pending_cnt[which]));
	if (wc->pending_vfy_type[which] == VerifyNull)
		if (atomic_dec_and_test(&wc->pending_cnt[which]))
			wake = 1;
	which = (which+3) % 4;
	dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
		atomic_read(&wc->pending_cnt[which]));
	if (wc->pending_vfy_type[which] == VerifyNext)
		if (atomic_dec_and_test(&wc->pending_cnt[which]))
			wake = 1;
	which = (which+3) % 4;
	dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
		atomic_read(&wc->pending_cnt[which]));
	if (wc->pending_vfy_type[which] == VerifyNext2)
		if (atomic_dec_and_test(&wc->pending_cnt[which]))
			wake = 1;
	if (wake) {
		cluster_done(fs, wc);
		wake_up(&wc->pending_wait);
	}
	dprintk("D %d\n", wake);

	lafs_write_flush(fs, wc);

	wc->pending_next = (wc->pending_next+1) % 4;
	/* now re-initialise the cluster information */
	cluster_reset(fs, wc);

	wait_event(wc->pending_wait,
		   atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
	dprintk("cluster_flush end pending_next=%d cnt=%d\n",
		wc->pending_next, atomic_read(&wc->pending_cnt
					      [wc->pending_next]));
}
void lafs_cluster_flush(struct fs *fs, int cnum)
{
	struct wc *wc = &fs->wc[cnum];

	dprintk("LAFS_cluster_flush %d\n", cnum);
	mutex_lock(&wc->lock);
	if (!list_empty(&wc->clhead)
	    || wc->chead_size > sizeof(struct cluster_head)
	    || (cnum == 0 &&
		((fs->checkpointing & CH_CheckpointEnd)
		 || test_bit(FlushNeeded, &fs->fsstate)
		 || test_bit(SecondFlushNeeded, &fs->fsstate)
			)))
		cluster_flush(fs, cnum);
	mutex_unlock(&wc->lock);
}

void lafs_cluster_wait_all(struct fs *fs)
{
	int i;

	for (i = 0; i < WC_NUM; i++) {
		struct wc *wc = &fs->wc[i];
		int j;

		for (j = 0; j < 4; j++) {
			wait_event(wc->pending_wait,
				   atomic_read(&wc->pending_cnt[j]) <= 1);
		}
	}
}
/* The end_io function for writes to a cluster is one
 * of 4 depending on which in the circular list the
 * cluster is.
 */
static void cluster_end_io(struct bio *bio, int err,
			   int which, int header)
{
	/* FIXME how to handle errors? */
	struct wc *wc = bio->bi_private;
	struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
	int done = 0;
	int wake = 0;

	dprintk("end_io err=%d which=%d header=%d\n",
		err, which, header);

	if (atomic_dec_and_test(&wc->pending_cnt[which]))
		done++;
	if (atomic_read(&wc->pending_cnt[which]) == 1)
		wake = 1;
	which = (which+3) % 4;
	if (header &&
	    (wc->pending_vfy_type[which] == VerifyNext ||
	     wc->pending_vfy_type[which] == VerifyNext2) &&
	    atomic_dec_and_test(&wc->pending_cnt[which]))
		done++;
	if (atomic_read(&wc->pending_cnt[which]) == 1)
		wake = 1;
	which = (which+3) % 4;
	if (header &&
	    wc->pending_vfy_type[which] == VerifyNext2 &&
	    atomic_dec_and_test(&wc->pending_cnt[which]))
		done++;
	if (atomic_read(&wc->pending_cnt[which]) == 1)
		wake = 1;

	if (done)
		schedule_work(&fs->done_work);
	if (wake) {
		wake_up(&wc->pending_wait);
		if (test_bit(FlushNeeded, &fs->fsstate) ||
		    test_bit(SecondFlushNeeded, &fs->fsstate))
			lafs_wake_thread(fs);
	}
}
static void cluster_endio_data_0(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 0, 0);
}

static void cluster_endio_data_1(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 1, 0);
}

static void cluster_endio_data_2(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 2, 0);
}

static void cluster_endio_data_3(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 3, 0);
}

static void cluster_endio_header_0(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 0, 1);
}

static void cluster_endio_header_1(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 1, 1);
}

static void cluster_endio_header_2(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 2, 1);
}

static void cluster_endio_header_3(struct bio *bio, int err)
{
	cluster_end_io(bio, err, 3, 1);
}

bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
{
	if (header) {
		if (which == 0)
			return cluster_endio_header_0;
		if (which == 1)
			return cluster_endio_header_1;
		if (which == 2)
			return cluster_endio_header_2;
		return cluster_endio_header_3;
	}
	if (which == 0)
		return cluster_endio_data_0;
	if (which == 1)
		return cluster_endio_data_1;
	if (which == 2)
		return cluster_endio_data_2;
	return cluster_endio_data_3;
}
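/*
 * Illustrative note (not part of the original source): bio completion
 * callbacks carry no extra arguments, so one small wrapper per
 * (slot, header-or-data) pair encodes both values statically:
 *
 *	bio->bi_end_io = lafs_cluster_endio_choose(wc->pending_next,
 *						   header);
 *	bio->bi_private = wc;
 */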
int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
{
	struct wc *wc = &fs->wc[cnum];
	int i;

	wc->slhead.b = NULL;
	INIT_LIST_HEAD(&wc->clhead);

	for (i = 0; i < 4 ; i++) {
		wc->pending_vfy_type[i] = VerifyNull;
		atomic_set(&wc->pending_cnt[i], 0);
		INIT_LIST_HEAD(&wc->pending_blocks[i]);
		wc->page[i] = alloc_page(GFP_KERNEL);
		if (!wc->page[i]) {
			while (i > 0)
				put_page(wc->page[--i]);
			return -ENOMEM;
		}
	}
	wc->pending_next = 0;
	wc->cluster_seq = seq;
	wc->prev_addr = prev;
	wc->cnum = cnum;
	wc->seg.dev = -1;

	lafs_seg_setpos(fs, &wc->seg, addr);
	cluster_reset(fs, wc);

	spin_lock(&fs->lock);
	fs->free_blocks += wc->remaining+1;
	spin_unlock(&fs->lock);

	return 0;
}
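/*
 * Illustrative note (not part of the original source): at mount time
 * each write cluster would be initialised from the most recent valid
 * cluster head found during roll-forward, conceptually:
 *
 *	err = lafs_cluster_init(fs, cnum, next_addr, this_addr, seq + 1);
 *
 * where next_addr, this_addr and seq are taken from that head.
 */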
void lafs_flush(struct datablock *b)
{
	/* Need to write this block out and wait until
	 * it has been written, so that we can update it
	 * without risking corruption to the previous snapshot.
	 */
}

int lafs_cluster_empty(struct fs *fs, int cnum)
{
	return list_empty(&fs->wc[cnum].clhead);
}