3 * write-cluster management routines for LaFS
5 * Copyright (C) 2006-2009
6 * NeilBrown <neilb@suse.de>
7 * Released under the GPL, version 2
11 * Blocks to be written to a cluster need to be sorted so as to minimise
12 * fragmentation and to keep the cluster head small.
13 * So that we can track the amount of space needed in the cluster head,
14 * we really need to keep the list sorted as we go, rather than just sort it at the end.
16 * We do this using a linked list of blocks and an insertion sort.
17 * To improve the speed of insertion we keep a modified skiplist structure
19 * This is 'modified' in that it explicitly understands sequential
20 * runs within the list and doesn't keep extra skip-points within a run.
22 * The skip-points require memory allocation, which cannot be
23 * guaranteed here in the writeout path. To handle this we
24 * - preallocate some entries
25 * - accept a less-efficient list if memory isn't available
26 * - flush early if things get really bad.
28 * Skip point placement and runs.
29 * When an entry is added to the list, we place a skip-point over it
30 * based on the value of a random number, with a 1-in-4 chance of placement.
31 * The height of the skip point is also random - we keep rolling the dice,
32 * increasing height each time that we get 1-in-4.
33 * If the new entry is placed immediately after a skip point, and is
34 * consecutive with that skip point, the skip point is moved forward
35 * so that it will always be at the end of a run.
36 * When a new entry finds itself at the start or end of a run, a skip
40 /* A height of 8 with a fanout of 1-in-4 allows reasonable
41 * addressing for 2^16 entries, which is probably more than we need
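 * (Each level of skip point fans out by a factor of 4, so a height of
 * 8 spans roughly 4^8 = 65536 = 2^16 entries.)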
45 #include <linux/random.h>
46 #include <linux/slab.h>
48 static void cluster_flush(struct fs *fs, int cnum);
50 static void skip_discard(struct skippoint *sp)
53 struct skippoint *next = sp->next[0];
59 static int cmp_blk(struct block *a, struct block *b)
61 /* compare the filesys/inode/index/addr info
62 * of two blocks and return:
65 * -1 if consecutive in order
66 * -2 if in same inode and in order
67 * <=-3 if different inodes and correct order
70 if (a->inode != b->inode) {
71 struct inode *fsinoa = ino_from_sb(a->inode->i_sb);
72 struct inode *fsinob = ino_from_sb(b->inode->i_sb);
85 /* Same inode. If both are data blocks they might be consecutive.
88 if (!test_bit(B_Index, &a->flags) &&
89 !test_bit(B_Index, &b->flags)) {
90 if (a->fileaddr == b->fileaddr)
92 if (a->fileaddr + 1 == b->fileaddr)
94 if (a->fileaddr < b->fileaddr)
98 /* at least one is an index block. Sort them before data */
99 if (!test_bit(B_Index, &a->flags))
100 /* a is data, b is index, b first */
103 if (!test_bit(B_Index, &b->flags))
106 /* both are index blocks, use fileaddr or depth */
107 if (a->fileaddr < b->fileaddr)
109 if (a->fileaddr > b->fileaddr)
111 if (iblk(a)->depth < iblk(b)->depth)
113 /* equality really shouldn't be possible */
117 static struct block *skip_find(struct skippoint *head,
118 struct list_head *list,
119 struct block *target,
120 struct skippoint *where)
122 /* Find the last block that precedes 'target'
123 * and return it. If target will be the start, return NULL.
125 * 'where' gets filled in with the last skip point
126 * at each level which is before-or-at the found
127 * block. This can be used for easy insertion.
131 struct block *b, *next;
132 for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
133 while (head->next[level] &&
134 cmp_blk(head->next[level]->b, target) < 0)
135 head = head->next[level];
136 where->next[level] = head;
138 if (head->b == NULL) {
139 if (list_empty(list) ||
140 cmp_blk(list_entry(list->next, struct block, lru),
142 /* This goes at the start */
144 b = list_entry(list->next, struct block, lru);
147 /* b is before target */
149 list_for_each_entry_continue(next, list, lru) {
150 if (cmp_blk(next, target) > 0)
154 /* b is the last element before target */
158 static int cluster_insert(struct skippoint *head,
159 struct list_head *list,
160 struct block *target,
163 /* Insert 'target' at an appropriate point in the
164 * list, creating a skippoint if appropriate.
167 * 1 if not in a run, but adjacent to something in same file
169 * However if the value we would return would be more than
170 * 'avail', abort the insert and return -1.
173 struct skippoint pos, *newpoint;
176 int cmpbefore, cmpafter;
177 unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
178 sizeof(unsigned long))];
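/* Note: 'rand' holds at least SKIP_MAX_HEIGHT * 2 random bits (two bits
 * per possible level), rounded up to whole unsigned longs; the
 * get_random_bytes() call below fills it in one go.
 */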
181 BUG_ON(!list_empty(&target->lru));
182 if (unlikely(list_empty(list))) {
184 /* handle this trivial case separately */
187 list_add(&target->lru, list);
189 for (height = 0; height < SKIP_MAX_HEIGHT; height++)
190 head->next[height] = NULL;
193 b = skip_find(head, list, target, &pos);
195 list_add(&target->lru, list);
198 list_add(&target->lru, &b->lru);
199 cmpbefore = cmp_blk(b, target);
201 if (target->lru.next == list) /* new last */
204 cmpafter = cmp_blk(target, list_entry(target->lru.next,
207 if (cmpbefore == -1) {
208 /* adjacent with previous. Possibly move skippoint */
209 if (pos.next[0]->b == b) {
210 pos.next[0]->b = target;
214 } else if (cmpafter == -1)
215 /* adjacent with next. No need to add a skippoint */
217 else if (cmpbefore == -2 || cmpafter == -2)
218 /* same inode as next or previous */
225 list_del_init(&target->lru);
228 /* Now consider adding a skippoint here. We want 2 bits
229 * per level of random data
231 get_random_bytes(rand, sizeof(rand));
233 while (height < SKIP_MAX_HEIGHT &&
234 test_bit(height*2, rand) &&
235 test_bit(height*2+1, rand))
240 newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
243 newpoint->b = target;
244 for (level = 0; level < height-1; level++) {
245 newpoint->next[level] = pos.next[level];
246 pos.next[level] = newpoint;
251 /*-----------------------------------------------------------------------
252 * A segment is divided up in a slightly complicated way into
253 * tables, rows and columns. This allows us to align writes with
254 * stripes in a raid4 array or similar.
255 * So we have some support routines to help track our way around a segment.
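 * For illustration (numbers invented purely to show the arithmetic that
 * seg_setpos() below performs): with stride = 128 and width = 4, block
 * offset 300 within a segment gives
 *	col = 300 / 128 = 2, row = 300 % 128 = 44,
 *	table = 2 / 4 = 0,   col = 2 % 4 = 2,
 * i.e. table 0, row 44, column 2.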
259 static int seg_remainder(struct fs *fs, struct segpos *seg)
261 /* return the number of blocks from the start of segpos to the
262 * end of the segment.
263 * i.e. remaining rows in this table, plus all rows in the remaining tables, times the width of a row.
266 struct fs_dev *dv = &fs->devs[seg->dev];
267 int rows = dv->rows_per_table - seg->st_row;
269 BUG_ON(seg->dev < 0);
270 rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
271 return rows * dv->width;
274 static void seg_step(struct fs *fs, struct segpos *seg)
276 /* reposition this segpos to be immediately after its current end
277 * and make the 'current' point be the start.
280 seg->st_table = seg->nxt_table;
281 seg->st_row = seg->nxt_row;
282 seg->table = seg->st_table;
283 seg->row = seg->nxt_row;
287 static u32 seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
289 /* move the 'nxt' table/row to be 'size' blocks beyond
290 * current start. size will be rounded up to a multiple of the row width (dv->width).
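 * (For example, with a width of 4 a request for 10 blocks becomes
 * (10 + 3) / 4 = 3 rows, so rv below is 3 * 4 = 12 blocks, and 'size'
 * is reused as a count of rows from that point on.)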
293 struct fs_dev *dv = &fs->devs[seg->dev];
295 BUG_ON(seg->dev < 0);
296 size = (size + dv->width - 1) / dv->width;
297 rv = size * dv->width;
299 seg->nxt_table = seg->st_table + size / dv->rows_per_table;
300 seg->nxt_row = size % dv->rows_per_table;
304 static void seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
306 /* set seg to start at the given virtual address, and be
308 * 'addr' should always be at the start of a row, though
309 * as it might be read off storage, we need to be
310 * a little bit careful.
315 virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
316 dv = &fs->devs[seg->dev];
317 col = offset / dv->stride;
318 row = offset % dv->stride;
319 table = col / dv->width;
320 col = col % dv->width;
323 seg->st_table = table;
326 seg->table = seg->nxt_table = seg->st_table;
327 seg->row = seg->nxt_row = seg->st_row;
331 static u64 seg_next(struct fs *fs, struct segpos *seg)
333 /* step forward one block, returning the address of
334 * the block stepped over
336 struct fs_dev *dv = &fs->devs[seg->dev];
338 u64 addr = seg->col * dv->stride;
340 addr += seg->table * (dv->stride); /* FIXME */
341 addr += seg->num * dv->segment_stride;
344 /* now step forward in column or table or seg */
346 if (seg->col >= dv->width) {
349 if (seg->row >= dv->rows_per_table) {
357 static u64 seg_addr(struct fs *fs, struct segpos *seg)
359 /* return the address of the block at the current position
360 * in the segment, without stepping past it.
362 struct fs_dev *dv = &fs->devs[seg->dev];
366 /* Setting 'next' address for last cluster in
367 * a cleaner segment */
369 addr = seg->col * dv->stride;
371 addr += seg->table * (dv->stride); /* FIXME */
372 addr += seg->num * dv->segment_stride;
377 static void cluster_reset(struct fs *fs, struct wc *wc)
379 int blocksize = fs->blocksize;
380 if (wc->seg.dev < 0) {
385 wc->remaining = seg_remainder(fs, &wc->seg);
386 wc->chead_blocks = 1;
388 wc->cluster_space = blocksize - sizeof(struct cluster_head);
389 wc->chead = page_address(wc->page[wc->pending_next]);
390 wc->chead_size = sizeof(struct cluster_head);
393 static void close_segment(struct fs *fs, int cnum)
395 /* Release old segment */
396 /* FIXME I need lafs_seg_{de,}ref for every snapshot,
399 struct wc *wc = fs->wc + cnum;
400 if (wc->seg.dev >= 0) {
402 spin_lock(&fs->lock);
403 fs->clean_reserved -= seg_remainder(fs, &wc->seg);
404 spin_unlock(&fs->lock);
406 lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
411 void lafs_close_all_segments(struct fs *fs)
414 for (cnum = 0; cnum < WC_NUM; cnum++)
415 close_segment(fs, cnum);
418 static int new_segment(struct fs *fs, int cnum)
420 /* new_segment can fail if cnum > 0 and there is no clean_reserved space left for another cleaner segment.
423 struct wc *wc = fs->wc + cnum;
427 close_segment(fs, cnum);
429 if (cnum && fs->clean_reserved < fs->max_segment) {
430 /* we have reached the end of the last cleaner
431 * segment. The remainder of clean_reserved
432 * is of no value (if there even is any)
434 if (fs->clean_reserved) {
435 spin_lock(&fs->alloc_lock);
436 fs->free_blocks += fs->clean_reserved;
437 fs->clean_reserved = 0;
438 spin_unlock(&fs->alloc_lock);
442 /* This gets a reference on the 'segsum' */
443 lafs_free_get(fs, &dev, &seg, 0);
444 seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
445 BUG_ON(wc->seg.dev != dev);
446 BUG_ON(wc->seg.num != seg);
449 /* wc mutex protects us here */
455 /*-------------------------------------------------------------------------
456 * lafs_cluster_allocate
457 * This adds a block to the writecluster list and possibly triggers a flush of the cluster.
459 * The flush will assign a physical address to each block and schedule
460 * the actual write out.
462 * There can be multiple active write clusters. e.g. one for new data,
463 * one for cleaned data, one for defrag writes.
465 * It is here that the in-core inode is copied to the data block, if
466 * appropriate, and here that small files get copied into the inode
467 * thus avoiding writing a separate block for the file content.
469 * Roll-forward sees any data block as automatically extending the
470 * size of the file. This requires that we write blocks in block-order
471 * as much as possible (which we would anyway) and that we inform
472 * roll-forward when a file ends mid-block.
474 * A block should be iolocked when being passed to lafs_cluster_allocate.
475 * When the IO is complete, it will be unlocked.
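/*
 * Illustrative only (not part of the driver): a minimal sketch of the
 * calling convention described above, assuming the caller already holds
 * a reference to a dirty datablock 'db'.  The name example_write_block
 * is hypothetical, and using cluster 0 for ordinary new data is an
 * assumption based on the comments above; error handling is omitted.
 */
#if 0
static void example_write_block(struct datablock *db)
{
	lafs_iolock_block(&db->b);	  /* must be iolocked on entry */
	lafs_cluster_allocate(&db->b, 0); /* unlocked again when IO completes */
}
#endif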
477 static int flush_data_to_inode(struct block *b)
479 /* This is the first and only datablock in a file
480 * and it will fit in the inode. So put it there.
484 loff_t size = i_size_read(b->inode);
485 struct lafs_inode *lai = LAFSI(b->inode);
486 struct datablock *db;
487 struct fs *fs = fs_from_inode(b->inode);
489 lafs_iolock_block(&lai->iblock->b);
490 dprintk("FLUSHING single block into inode - s=%d %s\n",
491 (int)size, strblk(b));
492 if (lai->iblock->depth > 1)
493 /* Too much indexing still - truncate must be happening */
495 if (lai->iblock->depth == 1) {
496 /* If there is nothing in this block we can still use it */
497 if (lai->iblock->children.next != &b->siblings ||
498 b->siblings.next != &lai->iblock->children ||
499 lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
502 /* Need checkpoint lock to pin the dblock */
503 lafs_checkpoint_lock(fs);
504 if (test_and_clear_bit(B_Dirty, &b->flags)) {
506 /* Check size again, just in case it changed */
507 size = i_size_read(b->inode);
508 if (size + lai->metadata_size > fs->blocksize) {
509 /* Lost a race, so give up, restoring the Dirty bit */
510 if (test_and_set_bit(B_Dirty, &b->flags))
511 if (test_and_set_bit(B_Credit, &b->flags))
512 lafs_space_return(fs, 1);
513 lafs_checkpoint_unlock(fs);
516 if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
518 if (test_and_clear_bit(B_UnincCredit, &b->flags))
520 if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
522 lafs_space_return(fs, credits);
523 LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
524 set_bit(B_PinPending, &lai->dblock->b.flags);
525 if (test_bit(B_Pinned, &b->flags))
526 lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
528 lafs_pin_block(&lai->dblock->b);
529 lafs_dirty_dblock(lai->dblock);
530 } else if (test_and_clear_bit(B_Realloc, &b->flags)) {
532 LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
534 if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
536 if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
537 if (!test_and_clear_bit(B_ICredit,
538 &lai->iblock->b.flags))
539 if (!test_and_clear_bit(B_UnincCredit, &b->flags))
540 LAFS_BUG(1, b); /* We needed an UnincCredit */
541 lafs_space_return(fs, credits);
543 printk("Wasn't dirty or realloc: %s\n", strblk(b));
546 lafs_checkpoint_unlock(fs);
548 lai->iblock->depth = 0;
549 clear_bit(B_Valid, &lai->iblock->b.flags);
551 /* safe to reference ->dblock as b is a dirty child */
552 db = getdref(lai->dblock, MKREF(flush2db));
553 ibuf = map_dblock(db);
554 dbuf = map_dblock_2(dblk(b));
556 if (test_and_clear_bit(B_UnincCredit, &b->flags))
557 lafs_space_return(fs, 1);
559 /* It is an awkward time to call lafs_inode_fillblock,
560 * so do this one little change manually
562 ((struct la_inode *)ibuf)->depth = 0;
563 memcpy(ibuf + lai->metadata_size,
565 memset(ibuf + lai->metadata_size + size,
567 fs->blocksize - lai->metadata_size - size);
568 unmap_dblock_2(db, ibuf);
569 unmap_dblock(dblk(b), dbuf);
570 lafs_iounlock_block(&lai->iblock->b);
572 putdref(db, MKREF(flush2db));
577 lafs_iounlock_block(&lai->iblock->b);
581 int lafs_cluster_allocate(struct block *b, int cnum)
583 struct fs *fs = fs_from_inode(b->inode);
584 struct wc *wc = &fs->wc[cnum];
585 struct lafs_inode *lai;
590 LAFS_BUG(test_bit(B_Index, &b->flags) &&
591 iblk(b)->uninc_table.pending_cnt > 0, b);
593 LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
594 lai = LAFSI(b->inode);
596 LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
597 LAFS_BUG(!fs->rolled, b);
598 LAFS_BUG(lai->flags & File_nonlogged &&
599 !test_bit(B_Index, &b->flags), b);
601 /* We cannot change the physaddr of an uninc block in a
602 * different phase to the parent, else the parent will
603 * get the wrong address.
605 LAFS_BUG(test_bit(B_Index, &b->flags) &&
606 test_bit(B_Uninc, &b->flags) &&
607 !!test_bit(B_Phase1, &b->flags) !=
608 !!test_bit(B_Phase1, &b->parent->b.flags), b);
610 size = i_size_read(b->inode);
611 if (!test_bit(B_Index, &b->flags) && /* it is a datablock */
613 b->parent == lai->iblock && /* No indexing */
614 lai->type >= TypeBase && /* 'size' is meaningful */
615 size + lai->metadata_size <= fs->blocksize) {
616 int success = flush_data_to_inode(b);
618 lafs_summary_update(fs, b->inode, b->physaddr, 0,
619 0, !!test_bit(B_Phase1, &b->flags),
620 test_bit(B_SegRef, &b->flags));
622 lafs_iounlock_block(b);
625 /* There are conflicting blocks in the index tree */
626 if (test_and_clear_bit(B_Realloc, &b->flags))
627 lafs_space_return(fs, 1);
628 if (!test_bit(B_Dirty, &b->flags) ||
629 !test_bit(B_Pinned, &b->flags)) {
630 /* It is safe to just leave this for later */
631 lafs_iounlock_block(b);
634 /* We really need to write this, so use a full block. */
635 printk("WARNING %s\n", strblk(b));
639 /* The symbiosis between the datablock and the indexblock for an
640 * inode needs to be dealt with here.
641 * The data block will not normally be written until the InoIdx
642 * block is unpinned or phase-flipped. However if it is, as the
643 * result of some 'sync' style operation, then we write it even
645 * though the index info might not be uptodate. The data
645 * content will be safe for roll-forward to pick up, and the rest
646 * will be written when the InoIdx block is ready.
648 * When the InoIdx block is ready, we don't want to write it as
649 * there is nowhere for it to be incorporated in to. Rather we
650 * move the pinning and credits across to the Data block
651 * which will subsequently be written.
654 if (test_bit(B_InoIdx, &b->flags)) {
655 struct block *b2 = &lai->dblock->b;
658 LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
659 if (!test_bit(B_Pinned, &b2->flags)) {
660 /* index block pinned, data block not,
661 * carry the pinning across
663 if (b2->parent == NULL &&
666 getiref(b->parent, MKREF(child));
667 LAFS_BUG(b->parent != b2->parent, b);
668 set_bit(B_PinPending, &b2->flags); // FIXME is this right?
669 set_phase(b2, test_bit(B_Phase1,
675 if (test_and_clear_bit(B_Realloc, &b->flags))
677 LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
679 if (test_and_clear_bit(B_Dirty, &b->flags))
681 if (test_and_clear_bit(B_Realloc, &b->flags))
684 if (test_and_clear_bit(B_UnincCredit, &b->flags))
687 /* We always erase the InoIdx block before the
688 * inode data block, so this cannot happen
690 LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);
692 if (!test_and_set_bit(B_UnincCredit, &b2->flags))
693 if (!test_and_clear_bit(B_ICredit, &b2->flags))
697 if (!test_bit(B_Dirty, &b2->flags))
698 /* need some credits... */
699 if (!test_and_set_bit(B_Credit, &b2->flags))
701 lafs_dirty_dblock(dblk(b2));
703 if (!test_and_set_bit(B_Realloc, &b2->flags))
706 LAFS_BUG(credits < 0, b2);
707 lafs_space_return(fs, credits);
708 /* make sure 'dirty' status is registered */
710 lafs_iounlock_block(b);
714 if (!test_bit(B_Dirty, &b->flags) &&
715 !test_bit(B_Realloc, &b->flags)) {
716 /* block just got truncated, so don't write it. */
717 lafs_iounlock_block(b);
721 if (test_bit(B_EmptyIndex, &b->flags)) {
723 if (test_and_set_bit(B_Writeback, &b->flags))
725 lafs_iounlock_block(b);
726 lafs_allocated_block(fs, b, 0);
728 if (test_and_clear_bit(B_Realloc, &b->flags))
730 LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
732 if (test_and_clear_bit(B_Dirty, &b->flags))
734 if (test_and_clear_bit(B_Realloc, &b->flags))
737 lafs_writeback_done(b);
738 lafs_space_return(fs, credits);
742 if (!test_bit(B_Index, &b->flags)) {
743 struct inode *myi = rcu_my_inode(dblk(b));
744 if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
745 lafs_inode_fillblock(myi);
749 mutex_lock(&wc->lock);
751 /* The following check must be inside the mutex_lock
752 * to ensure exclusion between checkpoint and writepage.
753 * checkpoint must never see the block not on the
754 * leaf lru unless it is already in the cluster and so can
757 if (!list_empty_careful(&b->lru)) {
758 /* Someone is flushing this block before
759 * checkpoint got to it - probably writepage.
760 * Must remove it from the leaf lru so it can go
761 * on the cluster list.
763 LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
764 spin_lock(&fs->lock);
765 if (!list_empty(&b->lru))
766 list_del_init(&b->lru);
767 spin_unlock(&fs->lock);
770 if (test_and_set_bit(B_Writeback, &b->flags))
772 lafs_iounlock_block(b);
774 getref(b, MKREF(cluster)); /* we soon own a reference in the list */
778 /* see how much room is left in the cluster head.
779 * The interesting amounts are:
780 * none, one descriptor, or one group_head plus one descriptor.
781 * These are assigned the values 0, 1 and 2 respectively.
783 if (wc->cluster_space >= (sizeof(struct group_head) +
784 sizeof(struct descriptor)) ||
785 (wc->remaining > 2 &&
786 wc->chead_blocks < MAX_CHEAD_BLOCKS &&
787 (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
790 else if (wc->cluster_space >= sizeof(struct descriptor))
794 if (wc->seg.dev < 0 || wc->remaining < 1)
797 used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);
801 else if (wc->slhead.b)
802 cluster_flush(fs, cnum);
804 err = new_segment(fs, cnum);
806 /* No cleaner segments - this will have to go
807 * out with a checkpoint.
808 * Block is still 'dirty' - just clear the writeback flag.
811 lafs_writeback_done(b);
812 putref(b, MKREF(cluster));
813 mutex_unlock(&wc->lock);
816 cluster_reset(fs, wc);
821 wc->cluster_space -= sizeof(struct descriptor);
823 wc->cluster_space -= sizeof(struct group_head);
824 if (wc->cluster_space < 0) {
825 /* need a new page */
828 wc->cluster_space += fs->blocksize;
831 BUG_ON(wc->remaining < 0);
832 if (wc->remaining == 0)
833 cluster_flush(fs, cnum);
834 mutex_unlock(&wc->lock);
838 /*-------------------------------------------------------------------------
839 * Cluster head building.
840 * We build the cluster head bit by bit as we find blocks
841 * in the list. These routines help.
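 * The resulting layout of a cluster head, pieced together from the
 * helpers below (a sketch, not a formal format description), is:
 *
 *	struct cluster_head
 *	struct group_head		(started by cluster_addhead)
 *		miniblock + data ...	(updates, padded to 4 bytes)
 *		descriptor ...		(whole blocks)
 *	struct group_head
 *		...
 *
 * cluster_closehead() records the size of each group, in words, in
 * group_size_words.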
844 static inline void cluster_addhead(struct wc *wc, struct inode *ino,
845 struct group_head **headstart)
847 struct group_head *gh = (struct group_head *)((char *)wc->chead +
850 dprintk("CLUSTER addhead %d\n", wc->chead_size);
853 gh->inum = cpu_to_le32(ino->i_ino);
854 gh->fsnum = cpu_to_le32(ino_from_sb(ino->i_sb)->i_ino);
855 tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff))
859 gh->truncatenum_and_flag = cpu_to_le16(tnf);
860 wc->chead_size += sizeof(struct group_head);
863 static inline void cluster_closehead(struct wc *wc,
864 struct group_head *headstart)
866 int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);
868 dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
869 headstart->group_size_words = size / 4;
872 static inline void cluster_addmini(struct wc *wc, u32 addr, int offset,
873 int size, const char *data,
874 int size2, const char *data2)
876 /* if size2 !=0, then only
877 * (size-size2) is at 'data' and the rest is at 'data2'
879 struct miniblock *mb = ((struct miniblock *)
880 ((char *)wc->chead + wc->chead_size));
882 dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
884 mb->block_num = cpu_to_le32(addr);
885 mb->block_offset = cpu_to_le16(offset);
886 mb->length = cpu_to_le16(size + DescMiniOffset);
887 wc->chead_size += sizeof(struct miniblock);
888 memcpy(mb->data, data, size-size2);
890 memcpy(mb->data + size-size2, data2, size2);
891 memset(mb->data+size, 0, (-size)&3);
892 wc->chead_size += ROUND_UP(size);
895 static inline void cluster_adddesc(struct wc *wc, struct block *blk,
896 struct descriptor **desc_start)
898 struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
901 dprintk("CLUSTER add_desc %d\n", wc->chead_size);
902 dh->block_num = cpu_to_le32(blk->fileaddr);
903 dh->block_cnt = cpu_to_le32(0);
905 if (test_bit(B_Index, &blk->flags))
906 dh->block_bytes = DescIndex;
907 wc->chead_size += sizeof(struct descriptor);
910 static inline void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
911 struct block *b, int blkbits)
913 desc_start->block_cnt =
914 cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
915 if (!test_bit(B_Index, &b->flags)) {
916 if (LAFSI(b->inode)->type >= TypeBase) {
917 u64 size = b->inode->i_size;
918 if (size > ((loff_t)b->fileaddr << blkbits) &&
919 size <= ((loff_t)(b->fileaddr + 1) << blkbits))
920 desc_start->block_bytes =
921 cpu_to_le32(size & ((1<<blkbits)-1));
923 desc_start->block_bytes =
924 cpu_to_le32(1 << blkbits);
926 desc_start->block_bytes = cpu_to_le32(1<<blkbits);
930 /*------------------------------------------------------------------------
932 * Updates are added to the cluster head immediately, causing a flush
933 * if there is insufficient room.
934 * This means that miniblocks always occur before block descriptors.
935 * miniblocks are only ever written to cluster-series 0.
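 * The life-cycle of an update, as far as it can be read from this file,
 * is roughly (the exact arguments to _prepare and the return conventions
 * of _pin are not visible in this extract, so treat this as indicative):
 *
 *	lafs_cluster_update_prepare(&uh, fs, len);	(reserve room)
 *	lafs_cluster_update_pin(&uh);			(commit to doing it)
 *	seq = lafs_cluster_update_commit(&uh, db, offset, len);
 * or, if the caller changes its mind after pinning:
 *	lafs_cluster_update_abort(&uh);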
939 lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
942 /* make sure there is room for a 'len' update in the upcoming
943 * cluster. Do nothing here.
951 lafs_cluster_update_pin(struct update_handle *uh)
953 /* last chance to bail out */
954 if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
956 uh->fs->cluster_updates++;
962 static unsigned long long
963 lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
964 struct inode *ino, u32 addr,
965 int offset, int len, struct datablock *b,
967 int len2, const char *str2)
969 /* There is the data - alternate layout */
970 /* flush 'len' bytes of data, at offset 'offset' of block 'addr' in 'ino',
971 * into the next write cluster
973 struct wc *wc = &fs->wc[0];
974 struct group_head *head_start;
975 unsigned long long seq;
976 char *mapping = NULL;
980 mutex_lock(&wc->lock);
981 while (wc->cluster_space < (sizeof(struct group_head)+
982 sizeof(struct descriptor) +
984 if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
985 (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
988 wc->cluster_space += fs->blocksize;
990 cluster_flush(fs, 0);
993 fs->cluster_updates -= uh->reserved;
994 lafs_space_return(fs, uh->reserved);
997 cluster_addhead(wc, ino, &head_start);
999 mapping = map_dblock(b);
1000 str1 = mapping + offset;
1002 cluster_addmini(wc, addr, offset, len, str1, len2, str2);
1004 unmap_dblock(b, mapping);
1005 cluster_closehead(wc, head_start);
1006 wc->cluster_space -= (sizeof(struct group_head)+
1007 sizeof(struct descriptor) +
1009 seq = wc->cluster_seq;
1010 mutex_unlock(&wc->lock);
1015 lafs_cluster_update_commit(struct update_handle *uh,
1016 struct datablock *b,
1017 int offset, int len)
1019 /* OK here is the data to put in the next cluster. */
1020 struct fs *fs = fs_from_inode(b->b.inode);
1021 unsigned long long seq;
1023 seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
1024 offset, len, b, NULL, 0, NULL);
1030 lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
1031 struct inode *ino, u32 addr,
1032 int offset, int len, const char *str1,
1033 int len2, const char *str2)
1035 return lafs_cluster_update_commit_both(uh, fs,
1043 lafs_cluster_update_abort(struct update_handle *uh)
1045 /* Didn't want that cluster_update after all */
1046 uh->fs->cluster_updates -= uh->reserved;
1047 lafs_space_return(uh->fs, uh->reserved);
1050 int lafs_calc_cluster_csum(struct cluster_head *head)
1052 unsigned int oldcsum = head->checksum;
1053 unsigned long long newcsum = 0;
1056 unsigned int *superc = (unsigned int *) head;
1059 for (i = 0; i < le16_to_cpu(head->Hlength)/4; i++)
1060 newcsum += le32_to_cpu(superc[i]);
1061 csum = (newcsum & 0xffffffff) + (newcsum>>32);
1062 head->checksum = oldcsum;
1063 return cpu_to_le32(csum);
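/*
 * On the write side the result is simply stored in the head just before
 * it is submitted (see cluster_flush() below):
 *	wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
 * The stored checksum is saved and restored around the summation, so a
 * reader can presumably verify a head by checking that recomputing the
 * sum over the first Hlength bytes reproduces the stored value (the read
 * side is not part of this file).
 */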
1066 /*------------------------------------------------------------------------
1068 * Once we have a suitable number of blocks and updates in a writecluster
1069 * we need to write them out.
1070 * We first iterate over the block list building the clusterhead.
1071 * Then we finalise and write the clusterhead and iterate again
1072 * writing out each block.
1073 * We keep a biased blockcount in the wc, and decrement it on write-completion.
1074 * Once the blockcount hits the bias the cluster is written. Then when
1075 * the next (or next-plus-one) cluster head is written we remove the bias
1076 * and the data is deemed to be safe.
1078 * Update blocks will already have been included in the write cluster.
1080 * We reinitialise the write cluster once we are finished.
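 * Putting the above together, the life-cycle of pending_cnt[n] for
 * write-cluster slot n (pieced together from cluster_flush(),
 * cluster_end_io() and cluster_done()) is:
 *  - cluster_flush() starts with the slot idle (count 0) and sets the
 *    count to 1: this is the bias mentioned above;
 *  - each bio submitted for the cluster bumps the count (presumably in
 *    lafs_write_block(), which is not part of this file) and each
 *    completion drops it again;
 *  - the bias itself is dropped once a later cluster head that acts as
 *    the commit record for this one has been submitted (or immediately,
 *    for a VerifyNull cluster);
 *  - when the count reaches 0, cluster_done() walks pending_blocks[n],
 *    calls lafs_writeback_done() and drops the per-block reference taken
 *    in lafs_cluster_allocate().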
1083 static void cluster_done(struct fs *fs, struct wc *wc)
1085 /* This is called when one of the pending_cnts hit 0, indicating
1086 * that a write-cluster has been written and committed to storage.
1087 * We find all the pending blocks on a completed cluster and
1091 struct block *b, *tmp;
1092 struct list_head tofree;
1094 INIT_LIST_HEAD(&tofree);
1096 spin_lock(&fs->lock);
1097 for (i = 0; i < 4 ; i++)
1098 if (atomic_read(&wc->pending_cnt[i]) == 0)
1099 list_splice_init(&wc->pending_blocks[i], &tofree);
1100 spin_unlock(&fs->lock);
1102 list_for_each_entry_safe(b, tmp, &tofree, lru) {
1103 list_del_init(&b->lru);
1104 lafs_writeback_done(b);
1105 putref(b, MKREF(cluster));
1109 void lafs_clusters_done(struct fs *fs)
1112 for (i = 0 ; i < WC_NUM; i++)
1113 cluster_done(fs, &fs->wc[i]);
1116 void lafs_done_work(struct work_struct *ws)
1118 struct fs *fs = container_of(ws, struct fs, done_work);
1119 lafs_clusters_done(fs);
1122 static void cluster_flush(struct fs *fs, int cnum)
1124 struct wc *wc = &fs->wc[cnum];
1127 struct inode *current_inode = NULL;
1128 u64 current_block = NoBlock;
1129 u64 head_addr[MAX_CHEAD_BLOCKS];
1130 struct descriptor *desc_start = NULL;
1131 struct group_head *head_start = NULL;
1132 struct segpos segend;
1139 if (!test_bit(CheckpointOpen, &fs->fsstate)) {
1140 /* First cluster since a checkpoint, make sure
1141 * we do another checkpoint within 30 seconds
1143 fs->next_checkpoint = jiffies + 30 * HZ;
1144 set_bit(CheckpointOpen, &fs->fsstate);
1145 lafs_wake_thread(fs);
1147 /* set the writecluster size as this will guide layout in
1151 if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
1152 set_bit(SecondFlushNeeded, &fs->fsstate);
1154 clear_bit(SecondFlushNeeded, &fs->fsstate);
1156 cluster_size = seg_setsize(fs, &wc->seg,
1157 seg_remainder(fs, &wc->seg) - wc->remaining);
1159 /* find, and step over, the address(es) of the cluster head block(s) */
1160 for (i = 0; i < wc->chead_blocks ; i++)
1161 head_addr[i] = seg_next(fs, &wc->seg);
1163 list_for_each_entry(b, &wc->clhead, lru) {
1165 if (b->inode != current_inode) {
1166 /* need to create a new group_head */
1169 cluster_closehead(wc, head_start);
1170 cluster_addhead(wc, b->inode, &head_start);
1171 current_inode = b->inode;
1172 current_block = NoBlock;
1174 if (desc_start == NULL || b->fileaddr != current_block+1 ||
1175 test_bit(B_Index, &b->flags)) {
1176 cluster_adddesc(wc, b, &desc_start);
1177 current_block = b->fileaddr;
1180 cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
1181 addr = seg_next(fs, &wc->seg);
1182 if (cnum && test_bit(B_Dirty, &b->flags))
1183 /* Were a cleaning but this block is now dirty.
1184 * Don't waste the UnincCredit on recording the
1185 * clean location as it will be written as
1186 * dirty soon. Just pin it to the current phase
1187 * to ensure that it really does get written.
1191 lafs_allocated_block(fs, b, addr);
1196 cluster_closehead(wc, head_start);
1197 segend = wc->seg; /* We may write zeros from here */
1198 seg_step(fs, &wc->seg);
1199 wc->remaining = seg_remainder(fs, &wc->seg);
1200 /* Need to make sure our ->next_addr gets set properly
1201 * for non-cleaning segments
1203 if (wc->remaining < 2) {
1205 new_segment(fs, cnum);
1207 close_segment(fs, cnum);
1210 /* Fill in the cluster header */
1211 strncpy(wc->chead->idtag, "LaFSHead", 8);
1213 wc->chead->flags = cpu_to_le32(0);
1215 /* checkpoints only happen in cnum==0 */
1216 if (fs->checkpointing) {
1217 spin_lock(&fs->lock);
1218 wc->chead->flags = cpu_to_le32(fs->checkpointing);
1219 if (fs->checkpointing & CH_CheckpointEnd) {
1220 fs->checkpointing = 0;
1221 clear_bit(CheckpointOpen, &fs->fsstate);
1222 } else if (fs->checkpointing & CH_CheckpointStart) {
1223 fs->checkpointcluster = head_addr[0];
1224 fs->checkpointing &= ~CH_CheckpointStart;
1226 spin_unlock(&fs->lock);
1227 if (!fs->checkpointing)
1228 wake_up(&fs->phase_wait);
1230 wc->chead->flags = 0;
1232 memcpy(wc->chead->uuid, fs->state->uuid, 16);
1233 wc->chead->seq = cpu_to_le64(wc->cluster_seq);
1235 wc->chead->Hlength = cpu_to_le16(wc->chead_size);
1236 wc->chead->Clength = cpu_to_le16(cluster_size);
1237 spin_lock(&fs->lock);
1239 fs->clean_reserved -= cluster_size;
1241 fs->free_blocks -= cluster_size;
1242 spin_unlock(&fs->lock);
1243 /* FIXME if this is just a header, no data blocks,
1244 * then use VerifyNull. Alternately if there is
1245 * no-one waiting for a sync to complete (how do we
1246 * track that?) use VerifyNext2
1251 wc->pending_vfy_type[wc->pending_next] = vtype;
1252 wc->chead->verify_type = cpu_to_le16(vtype);
1253 memset(wc->chead->verify_data, 0, 16);
1255 wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
1256 wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
1257 wc->prev_addr = head_addr[0];
1258 wc->chead->this_addr = cpu_to_le64(wc->prev_addr);
1260 wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
1262 /* We cannot write the header until any previous cluster
1263 * which uses the header as a commit block has been completely written.
1266 which = (wc->pending_next+3)%4;
1267 dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1268 atomic_read(&wc->pending_cnt[which]));
1269 if (wc->pending_vfy_type[which] == VerifyNext)
1270 wait_event(wc->pending_wait,
1271 atomic_read(&wc->pending_cnt[which]) == 1);
1272 which = (which+3) % 4;
1273 dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1274 atomic_read(&wc->pending_cnt[which]));
1275 if (wc->pending_vfy_type[which] == VerifyNext2)
1276 wait_event(wc->pending_wait,
1277 atomic_read(&wc->pending_cnt[which]) == 1);
1279 lafs_clusters_done(fs);
1280 dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
1281 wc->pending_next, atomic_read(&wc->pending_cnt
1282 [wc->pending_next]));
1283 BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
1284 BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));
1286 /* initialise pending count to 1. This will be decremented
1287 * once all related blocks have been submitted for write.
1288 * This includes the blocks in the next 0, 1, or 2 clusters.
1291 atomic_inc(&wc->pending_cnt[wc->pending_next]);
1293 /* Now write it all out.
1294 * Later we should possibly re-order the writes
1295 * for raid4 stripe-at-a-time
1297 for (i = 0; i < wc->chead_blocks; i++)
1299 page_address(wc->page[wc->pending_next])
1300 + i * fs->blocksize,
1303 while (!list_empty(&wc->clhead)) {
1305 b = list_entry(wc->clhead.next, struct block, lru);
1306 dprintk("flushing %s\n", strblk(b));
1308 if (test_and_clear_bit(B_Realloc, &b->flags))
1311 if (test_and_clear_bit(B_Dirty, &b->flags))
1315 WARN_ON(credits == 0);
1316 lafs_space_return(fs, credits);
1317 spin_lock(&fs->lock);
1319 list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
1320 spin_unlock(&fs->lock);
1321 lafs_write_block(fs, b, wc);
1324 skip_discard(wc->slhead.next[0]);
1325 /* FIXME write out zeros to pad raid4 if needed */
1327 /* Having submitted this request we need to remove the bias
1328 * from pending_cnt of previous clusters
1330 which = wc->pending_next;
1332 dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1333 atomic_read(&wc->pending_cnt[which]));
1334 if (wc->pending_vfy_type[which] == VerifyNull)
1335 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1337 which = (which+3) % 4;
1338 dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1339 atomic_read(&wc->pending_cnt[which]));
1340 if (wc->pending_vfy_type[which] == VerifyNext)
1341 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1343 which = (which+3) % 4;
1344 dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1345 atomic_read(&wc->pending_cnt[which]));
1346 if (wc->pending_vfy_type[which] == VerifyNext2)
1347 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1350 cluster_done(fs, wc);
1351 wake_up(&wc->pending_wait);
1353 dprintk("D %d\n", wake);
1355 lafs_write_flush(fs, wc);
1357 wc->pending_next = (wc->pending_next+1) % 4;
1358 /* now re-initialise the cluster information */
1359 cluster_reset(fs, wc);
1361 wait_event(wc->pending_wait,
1362 atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
1363 dprintk("cluster_flush end pending_next=%d cnt=%d\n",
1364 wc->pending_next, atomic_read(&wc->pending_cnt
1365 [wc->pending_next]));
1368 void lafs_cluster_flush(struct fs *fs, int cnum)
1370 struct wc *wc = &fs->wc[cnum];
1371 dprintk("LAFS_cluster_flush %d\n", cnum);
1372 mutex_lock(&wc->lock);
1373 if (!list_empty(&wc->clhead)
1374 || wc->chead_size > sizeof(struct cluster_head)
1376 ((fs->checkpointing & CH_CheckpointEnd)
1377 || test_bit(FlushNeeded, &fs->fsstate)
1378 || test_bit(SecondFlushNeeded, &fs->fsstate)
1380 cluster_flush(fs, cnum);
1381 mutex_unlock(&wc->lock);
1384 void lafs_cluster_wait_all(struct fs *fs)
1387 for (i = 0; i < WC_NUM; i++) {
1388 struct wc *wc = &fs->wc[i];
1390 for (j = 0; j < 4; j++) {
1391 wait_event(wc->pending_wait,
1392 atomic_read(&wc->pending_cnt[j]) <= 1);
1397 /* The end_io function for writes to a cluster is one
1398 * of 4 depending on which in the circular list the
1401 static void cluster_end_io(struct bio *bio, int err,
1402 int which, int header)
1404 /* FIXME how to handle errors? */
1405 struct wc *wc = bio->bi_private;
1406 struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
1410 dprintk("end_io err=%d which=%d header=%d\n",
1411 err, which, header);
1413 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1415 if (atomic_read(&wc->pending_cnt[which]) == 1)
1417 which = (which+3) % 4;
1419 (wc->pending_vfy_type[which] == VerifyNext ||
1420 wc->pending_vfy_type[which] == VerifyNext2) &&
1421 atomic_dec_and_test(&wc->pending_cnt[which]))
1423 if (atomic_read(&wc->pending_cnt[which]) == 1)
1426 which = (which+3) % 4;
1428 wc->pending_vfy_type[which] == VerifyNext2 &&
1429 atomic_dec_and_test(&wc->pending_cnt[which]))
1431 if (atomic_read(&wc->pending_cnt[which]) == 1)
1435 schedule_work(&fs->done_work);
1437 wake_up(&wc->pending_wait);
1438 if (test_bit(FlushNeeded, &fs->fsstate) ||
1439 test_bit(SecondFlushNeeded, &fs->fsstate))
1440 lafs_wake_thread(fs);
1444 static void cluster_endio_data_0(struct bio *bio, int err)
1446 cluster_end_io(bio, err, 0, 0);
1449 static void cluster_endio_data_1(struct bio *bio, int err)
1451 cluster_end_io(bio, err, 1, 0);
1454 static void cluster_endio_data_2(struct bio *bio, int err)
1456 cluster_end_io(bio, err, 2, 0);
1459 static void cluster_endio_data_3(struct bio *bio, int err)
1461 cluster_end_io(bio, err, 3, 0);
1464 static void cluster_endio_header_0(struct bio *bio, int err)
1466 cluster_end_io(bio, err, 0, 1);
1469 static void cluster_endio_header_1(struct bio *bio, int err)
1471 cluster_end_io(bio, err, 1, 1);
1474 static void cluster_endio_header_2(struct bio *bio, int err)
1476 cluster_end_io(bio, err, 2, 1);
1479 static void cluster_endio_header_3(struct bio *bio, int err)
1481 cluster_end_io(bio, err, 3, 1);
1484 bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
1489 return cluster_endio_header_0;
1491 return cluster_endio_header_1;
1494 return cluster_endio_header_2;
1496 return cluster_endio_header_3;
1500 return cluster_endio_data_0;
1502 return cluster_endio_data_1;
1505 return cluster_endio_data_2;
1507 return cluster_endio_data_3;
1510 int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
1512 struct wc *wc = &fs->wc[cnum];
1515 wc->slhead.b = NULL;
1516 INIT_LIST_HEAD(&wc->clhead);
1518 for (i = 0; i < 4 ; i++) {
1519 wc->pending_vfy_type[i] = VerifyNull;
1520 atomic_set(&wc->pending_cnt[i], 0);
1521 wc->page[i] = alloc_page(GFP_KERNEL);
1528 put_page(wc->page[i]);
1532 wc->pending_next = 0;
1533 wc->cluster_seq = seq;
1534 wc->prev_addr = prev;
1537 seg_setpos(fs, &wc->seg, addr);
1538 cluster_reset(fs, wc);
1540 spin_lock(&fs->lock);
1541 fs->free_blocks += wc->remaining+1;
1542 spin_unlock(&fs->lock);
1549 void lafs_flush(struct datablock *b)
1551 /* Need to write this block out and wait until
1552 * it has been written, so that we can update it
1553 * without risking corruption to previous snapshot.
1558 int lafs_cluster_empty(struct fs *fs, int cnum)
1560 return list_empty(&fs->wc[cnum].clhead);