1
2 /*
3  * write-cluster management routines for LaFS
4  * fs/lafs/cluster.c
5  * Copyright (C) 2006-2009
6  * NeilBrown <neilb@suse.de>
7  * Released under the GPL, version 2
8  */
9
10 /*
11  * Blocks to be written to a cluster need to be sorted so as to minimise
12  * fragmentation and to keep the cluster head small.
13  * So that we can track the amount of space needed in the cluster head,
14  * we really need to keep the list sorted rather than just sort it
15  * before writeout.
16  * We do this using a linked list of blocks and an insertion sort.
17  * To improve the speed of insertion we keep a modified skiplist structure
18  * over the list.
19  * This is 'modified' in that it explicitly understands sequential
20  * runs within the list and doesn't keep extra skip-points within a run.
21  *
22  * The skip-points require memory allocation, which cannot be
23  * guaranteed here in the writeout path.  To handle this we
24  *  - preallocate some entries
25  *  - accept a less-efficient list if memory isn't available
26  *  - flush early if things get really bad.
27  *
28  * Skip point placement and runs.
29  * When an entry is added to the list, we place a skip-point over it
30  * based on the value of a random number, with a 1-in-4 chance of placement.
31  * The height of the skip point is also random - we keep rolling the dice,
32  * increasing height each time that we get 1-in-4.
33  * If the new entry is placed immediately after a skip point, and is
34  * consecutive with that skip point, the skip point is moved forward
35  * so that it will always be at the end of a run.
36  * When a new entry finds itself at the start or end of a run, a skip
37  * point is not added.
38  */
39
40 /* A height of 8 with a fanout of 1-in-4 allows reasonable
41  * addressing for 2^16 entries, which is probably more than we need
42  */
43
44 #include        "lafs.h"
45 #include        <linux/random.h>
46 #include        <linux/slab.h>
47
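/*
 * Illustrative sketch only - not part of the original driver and not
 * called from anywhere.  It mirrors the dice-roll that cluster_insert()
 * below uses to choose the height of a new skip point: two random bits
 * per level give the 1-in-4 fanout described above, and a result of 0
 * means "don't add a skip point at all".
 */
static inline int skip_height_roll_example(void)
{
	unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
					sizeof(unsigned long))];
	int height = 0;

	get_random_bytes(rand, sizeof(rand));
	while (height < SKIP_MAX_HEIGHT &&
	       test_bit(height * 2, rand) &&
	       test_bit(height * 2 + 1, rand))
		height++;
	return height;
}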
48 static void cluster_flush(struct fs *fs, int cnum);
49
50 static void skip_discard(struct skippoint *sp)
51 {
52         while (sp) {
53                 struct skippoint *next = sp->next[0];
54                 kfree(sp);
55                 sp = next;
56         }
57 }
58
59 static int cmp_blk(struct block *a, struct block *b)
60 {
61         /* compare the filesys/inode/index/addr info
62          * of two blocks and return:
63          *  0 if equal
64          * +ve if a > b
65          * -1 if consecutive in order
66          * -2 if in same inode and in order
67          * <=-3 if different inodes and correct order
68          */
69
70         if (a->inode != b->inode) {
71                 struct inode *fsinoa = ino_from_sb(a->inode->i_sb);
72                 struct inode *fsinob = ino_from_sb(b->inode->i_sb);
73                 if (fsinoa->i_ino <
74                     fsinob->i_ino)
75                         return -3;
76                 if (fsinoa->i_ino >
77                     fsinob->i_ino)
78                         return 3;
79                 /* same filesystem */
80                 if (a->inode->i_ino <
81                     b->inode->i_ino)
82                         return -3;
83                 return 3;
84         }
85         /* Same inode.  If both are data blocks they might be
86          * consecutive
87          */
88         if (!test_bit(B_Index, &a->flags) &&
89             !test_bit(B_Index, &b->flags)) {
90                 if (a->fileaddr == b->fileaddr)
91                         return 0;
92                 if (a->fileaddr + 1 == b->fileaddr)
93                         return -1;
94                 if (a->fileaddr < b->fileaddr)
95                         return -2;
96                 return 2;
97         }
98         /* at least one is an index block. Sort them before data */
99         if (!test_bit(B_Index, &a->flags))
100                 /* a is data, b is index, b first */
101                 return 2;
102
103         if (!test_bit(B_Index, &b->flags))
104                 return -2;
105
106         /* both are index blocks, use fileaddr or depth */
107         if (a->fileaddr < b->fileaddr)
108                 return -2;
109         if (a->fileaddr > b->fileaddr)
110                 return 2;
111         if (iblk(a)->depth < iblk(b)->depth)
112                 return -2;
113         /* equality really shouldn't be possible */
114         return 2;
115 }
116
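/*
 * Worked example of the cmp_blk() convention (descriptive note, not
 * from the original source): for two data blocks of the same inode,
 * comparing fileaddr 7 against fileaddr 8 gives -1 (consecutive, i.e.
 * they form a run), 7 against 9 gives -2 (same inode, in order, but
 * not adjacent), and blocks of different inodes give -3 or 3 depending
 * on which order they should be written in.
 */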
117 static struct block *skip_find(struct skippoint *head,
118                                struct list_head *list,
119                                struct block *target,
120                                struct skippoint *where)
121 {
122         /* Find the last block that precedes 'target'
123          * and return it.  If target will be the start,
124          * return NULL.
125          * 'where' gets filled in with the last skip point
126          * at each level which is before-or-at the found
127          * block.  This can be used for easy insertion.
128          */
129
130         int level;
131         struct block *b, *next;
132         for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
133                 while (head->next[level] &&
134                        cmp_blk(head->next[level]->b, target) < 0)
135                         head = head->next[level];
136                 where->next[level] = head;
137         }
138         if (head->b == NULL) {
139                 if (list_empty(list) ||
140                     cmp_blk(list_entry(list->next, struct block, lru),
141                             target) > 0)
142                         /* This goes at the start */
143                         return NULL;
144                 b = list_entry(list->next, struct block, lru);
145         } else
146                 b = head->b;
147         /* b is before target */
148         next = b;
149         list_for_each_entry_continue(next, list, lru) {
150                 if (cmp_blk(next, target) > 0)
151                         break;
152                 b = next;
153         }
154         /* b is the last element before target */
155         return b;
156 }
157
158 static int cluster_insert(struct skippoint *head,
159                           struct list_head *list,
160                           struct block *target,
161                           int avail)
162 {
163         /* Insert 'target' at an appropriate point in the
164          * list, creating a skippoint if appropriate.
165          * Return:
166          *  0 if part of a run
167          *  1 if not in a run, but adjacent to something in same file
168          *  2 otherwise
169          * However if the value we would return would be more than
170          * 'avail' abort the insert and return -1;
171          */
172         int rv;
173         struct skippoint pos, *newpoint;
174         int level;
175         struct block *b;
176         int cmpbefore, cmpafter;
177         unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
178                                         sizeof(unsigned long))];
179         int height;
180
181         BUG_ON(!list_empty(&target->lru));
182         if (unlikely(list_empty(list))) {
183                 int height;
184                 /* handle this trivial case separately */
185                 if (avail < 2)
186                         return -1;
187                 list_add(&target->lru, list);
188                 head->b = NULL;
189                 for (height = 0; height < SKIP_MAX_HEIGHT; height++)
190                         head->next[height] = NULL;
191                 return 2;
192         }
193         b = skip_find(head, list, target, &pos);
194         if (b == NULL) {
195                 list_add(&target->lru, list);
196                 cmpbefore = 3;
197         } else {
198                 list_add(&target->lru, &b->lru);
199                 cmpbefore = cmp_blk(b, target);
200         }
201         if (target->lru.next  == list) /* new last */
202                 cmpafter = -3;
203         else
204                 cmpafter = cmp_blk(target, list_entry(target->lru.next,
205                                                       struct block,
206                                                       lru));
207         if (cmpbefore == -1) {
208                 /* adjacent with previous.  Possibly move skippoint */
209                 if (pos.next[0]->b == b) {
210                         pos.next[0]->b = target;
211                         return 0;
212                 }
213                 rv = 0;
214         } else if (cmpafter == -1)
215                 /* adjacent with next. No need to add a skippoint */
216                 return 0;
217         else if (cmpbefore == -2 || cmpafter == -2)
218                 /* same inodes as next or previous */
219                 rv = 1;
220         else
221                 rv = 2;
222
223         if (rv > avail) {
224                 /* Doesn't fit */
225                 list_del_init(&target->lru);
226                 return -1;
227         }
228         /* Now consider adding a skippoint here.  We want 2 bits
229          * per level of random data
230          */
231         get_random_bytes(rand, sizeof(rand));
232         height = 0;
233         while (height < SKIP_MAX_HEIGHT &&
234                test_bit(height*2, rand) &&
235                test_bit(height*2+1, rand))
236                 height++;
237         if (height == 0)
238                 return rv;
239
240         newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
241         if (!newpoint)
242                 return rv;
243         newpoint->b = target;
244         for (level = 0; level < height; level++) {
245                 newpoint->next[level] = pos.next[level]->next[level];
246                 pos.next[level]->next[level] = newpoint;
247         }
248         return rv;
249 }
250
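/*
 * Note (added for clarity): the value returned by cluster_insert() is
 * the amount of cluster-head space the new block costs, in the same
 * units as 'avail' - 0 when it simply extends an existing run, 1 when
 * a new struct descriptor is needed, and 2 when a new group_head is
 * needed as well.  lafs_cluster_allocate() below computes 'avail' from
 * wc->cluster_space on exactly this scale and charges the head space
 * according to the value returned here.
 */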
251 /*-----------------------------------------------------------------------
252  * A segment is divided up in a slightly complicated way into
253  * tables, rows and columns.  This allows us to align writes with
254  * stripes in a raid4 array or similar.
255  * So we have some support routines to help track our way around
256  * the segment
257  */
258
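/*
 * Worked example with made-up numbers (added for clarity): a device
 * with width 4, rows_per_table 16 and tables_per_seg 8 has
 * 8 * 16 * 4 = 512 blocks per segment.  If a segpos sits at
 * st_table = 2, st_row = 5, then seg_remainder() below finds
 * (16 - 5) + 16 * (8 - 2 - 1) = 91 remaining rows, i.e.
 * 91 * 4 = 364 blocks to the end of the segment.
 */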
259 static int seg_remainder(struct fs *fs, struct segpos *seg)
260 {
261         /* return the number of blocks from the start of segpos to the
262          * end of the segment.
263          * i.e. remaining rows in this table, plus remaining tables in
264          * this segment
265          */
266         struct fs_dev *dv = &fs->devs[seg->dev];
267         int rows = dv->rows_per_table - seg->st_row;
268
269         BUG_ON(seg->dev < 0);
270         rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
271         return rows * dv->width;
272 }
273
274 static void seg_step(struct fs *fs, struct segpos *seg)
275 {
276         /* reposition this segpos to be immediately after its current end
277          * and make the 'current' point be the start.
278          * The size will be zero.
279          */
280         seg->st_table = seg->nxt_table;
281         seg->st_row = seg->nxt_row;
282         seg->table = seg->st_table;
283         seg->row = seg->nxt_row;
284         seg->col = 0;
285 }
286
287 static u32 seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
288 {
289         /* move the 'nxt' table/row to be 'size' blocks beyond
290          * current start. size will be rounded up to a multiple
291          * of width.
292          */
293         struct fs_dev *dv = &fs->devs[seg->dev];
294         u32 rv;
295         BUG_ON(seg->dev < 0);
296         size = (size + dv->width - 1) / dv->width;
297         rv = size * dv->width;
298         size += seg->st_row;
299         seg->nxt_table = seg->st_table + size / dv->rows_per_table;
300         seg->nxt_row = size % dv->rows_per_table;
301         return rv;
302 }
303
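/*
 * Worked example with made-up numbers (added for clarity): with
 * width 4 and rows_per_table 16, asking seg_setsize() above for
 * size = 10 blocks rounds up to 3 rows, so it returns 12.  Starting
 * from st_table = 2, st_row = 5, those 3 rows end 5 + 3 = 8 rows into
 * the table, giving nxt_table = 2 + 8 / 16 = 2 and nxt_row = 8 % 16 = 8.
 */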
304 static void seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
305 {
306         /* set seg to start at the given virtual address, and be
307          * of size 0
308          * 'addr' should always be at the start of a row, though
309          * as it might be read off storage, we need to be
310          * a little bit careful.
311          */
312         u32 offset;
313         u32 row, col, table;
314         struct fs_dev *dv;
315         virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
316         dv = &fs->devs[seg->dev];
317         col = offset / dv->stride;
318         row = offset % dv->stride;
319         table = col / dv->width;
320         col = col % dv->width;
321         BUG_ON(col);
322
323         seg->st_table = table;
324         seg->st_row = row;
325
326         seg->table = seg->nxt_table = seg->st_table;
327         seg->row = seg->nxt_row = seg->st_row;
328         seg->col = 0;
329 }
330
331 static u64 seg_next(struct fs *fs, struct segpos *seg)
332 {
333         /* step forward one block, returning the address of
334          * the block stepped over
335          */
336         struct fs_dev *dv = &fs->devs[seg->dev];
337
338         u64 addr = seg->col * dv->stride;
339         addr += seg->row;
340         addr += seg->table * (dv->stride);  /* FIXME */
341         addr += seg->num * dv->segment_stride;
342         addr += dv->start;
343
344         /* now step forward in column or table or seg */
345         seg->col++;
346         if (seg->col >= dv->width) {
347                 seg->col = 0;
348                 seg->row++;
349                 if (seg->row >= dv->rows_per_table) {
350                         seg->row = 0;
351                         seg->table++;
352                 }
353         }
354         return addr;
355 }
356
357 static u64 seg_addr(struct fs *fs, struct segpos *seg)
358 {
359         /* return the address of the current position in the
360          * segment, without stepping forward.
361          */
362         struct fs_dev *dv = &fs->devs[seg->dev];
363         u64 addr;
364
365         if (seg->dev < 0)
366                 /* Setting 'next' address for last cluster in
367                  * a cleaner segment */
368                 return 0;
369         addr = seg->col * dv->stride;
370         addr += seg->row;
371         addr += seg->table * (dv->stride);  /* FIXME */
372         addr += seg->num * dv->segment_stride;
373         addr += dv->start;
374         return addr;
375 }
376
377 static void cluster_reset(struct fs *fs, struct wc *wc)
378 {
379         int blocksize = fs->blocksize;
380         if (wc->seg.dev < 0) {
381                 wc->remaining = 0;
382                 wc->chead_size = 0;
383                 return;
384         }
385         wc->remaining = seg_remainder(fs, &wc->seg);
386         wc->chead_blocks = 1;
387         wc->remaining--;
388         wc->cluster_space = blocksize - sizeof(struct cluster_head);
389         wc->chead = page_address(wc->page[wc->pending_next]);
390         wc->chead_size = sizeof(struct cluster_head);
391 }
392
393 static void close_segment(struct fs *fs, int cnum)
394 {
395         /* Release old segment */
396         /* FIXME I need lafs_seg_{de,}ref for every snapshot,
397          * don't I ???
398          */
399         struct wc *wc = fs->wc + cnum;
400         if (wc->seg.dev >= 0) {
401                 if (cnum) {
402                         spin_lock(&fs->lock);
403                         fs->clean_reserved -= seg_remainder(fs, &wc->seg);
404                         spin_unlock(&fs->lock);
405                 }
406                 lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
407                 wc->seg.dev = -1;
408         }
409 }
410
411 void lafs_close_all_segments(struct fs *fs)
412 {
413         int cnum;
414         for (cnum = 0; cnum < WC_NUM; cnum++)
415                 close_segment(fs, cnum);
416 }
417
418 static int new_segment(struct fs *fs, int cnum)
419 {
420         /* new_segment can fail if cnum > 0 and there is no
421          * clean_reserved
422          */
423         struct wc *wc = fs->wc + cnum;
424         unsigned int dev;
425         u32 seg;
426
427         close_segment(fs, cnum);
428
429         if (cnum && fs->clean_reserved < fs->max_segment) {
430                 /* we have reached the end of the last cleaner
431                  * segment.  The remainder of clean_reserved
432                  * is of no value (if there even is any)
433                  */
434                 if (fs->clean_reserved) {
435                         spin_lock(&fs->alloc_lock);
436                         fs->free_blocks += fs->clean_reserved;
437                         fs->clean_reserved = 0;
438                         spin_unlock(&fs->alloc_lock);
439                 }
440                 return -ENOSPC;
441         }
442         /* This gets a reference on the 'segsum' */
443         lafs_free_get(fs, &dev, &seg, 0);
444         seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
445         BUG_ON(wc->seg.dev != dev);
446         BUG_ON(wc->seg.num != seg);
447
448         if (cnum == 0)
449                 /* wc mutex protects us here */
450                 fs->newsegments++;
451
452         return 0;
453 }
454
455 /*-------------------------------------------------------------------------
456  * lafs_cluster_allocate
457  * This adds a block to the writecluster list and possibly triggers a
458  * flush.
459  * The flush will assign a physical address to each block and schedule
460  * the actual write out.
461  *
462  * There can be multiple active write clusters, e.g. one for new data,
463  * one for cleaned data, one for defrag writes.
464  *
465  * It is here that the in-core inode is copied to the data block, if
466  * appropriate, and here that small files get copied into the inode
467  * thus avoiding writing a separate block for the file content.
468  *
469  * Roll-forward sees any data block as automatically extending the
470  * size of the file.  This requires that we write blocks in block-order
471  * as much as possible (which we would anyway) and that we inform
472  * roll-forward when a file ends mid-block.
473  *
474  * A block should be iolocked when being passed to lafs_cluster_allocate.
475  * When the IO is complete, it will be unlocked.
476  */
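/*
 * Typical caller pattern, shown as an illustrative sketch only (the
 * choice of cnum and the handling of -ENOSPC depend on the caller):
 *
 *	lafs_iolock_block(b);
 *	err = lafs_cluster_allocate(b, 0);
 *
 * The block is handed over iolocked and lafs_cluster_allocate()
 * releases that lock on every path, whether the block is queued for
 * the cluster, handled specially, or rejected.
 */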
477 static int flush_data_to_inode(struct block *b)
478 {
479         /* This is the first and only datablock in a file
480          * and it will fit in the inode. So put it there.
481          */
482
483         char *ibuf, *dbuf;
484         loff_t size = i_size_read(b->inode);
485         struct lafs_inode *lai = LAFSI(b->inode);
486         struct datablock *db;
487         struct fs *fs = fs_from_inode(b->inode);
488
489         lafs_iolock_block(&lai->iblock->b);
490         dprintk("FLUSHING single block into inode - s=%d %s\n",
491                 (int)size, strblk(b));
492         if (lai->iblock->depth > 1)
493                 /* Too much indexing still - truncate must be happening */
494                 goto give_up;
495         if (lai->iblock->depth == 1) {
496                 /* If there is nothing in this block we can still use it */
497                 if (lai->iblock->children.next != &b->siblings ||
498                     b->siblings.next != &lai->iblock->children ||
499                     lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
500                         goto give_up;
501         }
502         /* Need checkpoint lock to pin the dblock */
503         lafs_checkpoint_lock(fs);
504         if (test_and_clear_bit(B_Dirty, &b->flags)) {
505                 int credits = 1;
506                 /* Check size again, just in case it changed */
507                 size = i_size_read(b->inode);
508                 if (size + lai->metadata_size > fs->blocksize) {
509                         /* Lost a race - give up, restoring the Dirty bit */
510                         if (test_and_set_bit(B_Dirty, &b->flags))
511                                 if (test_and_set_bit(B_Credit, &b->flags))
512                                         lafs_space_return(fs, 1);
513                         lafs_checkpoint_unlock(fs);
514                         goto give_up;
515                 }
516                 if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
517                         credits--;
518                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
519                         credits++;
520                 if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
521                         credits--;
522                 lafs_space_return(fs, credits);
523                 LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
524                 set_bit(B_PinPending, &lai->dblock->b.flags);
525                 if (test_bit(B_Pinned, &b->flags))
526                         lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
527                 else
528                         lafs_pin_block(&lai->dblock->b);
529                 lafs_dirty_dblock(lai->dblock);
530         } else if (test_and_clear_bit(B_Realloc, &b->flags)) {
531                 int credits = 1;
532                 LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
533                          &lai->iblock->b);
534                 if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
535                         credits--;
536                 if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
537                         if (!test_and_clear_bit(B_ICredit,
538                                                 &lai->iblock->b.flags))
539                                 if (!test_and_clear_bit(B_UnincCredit, &b->flags))
540                                         LAFS_BUG(1, b); /* We needed an UnincCredit */
541                 lafs_space_return(fs, credits);
542         } else {
543                 printk("Wasn't dirty or realloc: %s\n", strblk(b));
544                 LAFS_BUG(1, b);
545         }
546         lafs_checkpoint_unlock(fs);
547         lai->depth = 0;
548         lai->iblock->depth = 0;
549         clear_bit(B_Valid, &lai->iblock->b.flags);
550
551         /* safe to reference ->dblock as b is a dirty child */
552         db = getdref(lai->dblock, MKREF(flush2db));
553         ibuf = map_dblock(db);
554         dbuf = map_dblock_2(dblk(b));
555
556         if (test_and_clear_bit(B_UnincCredit, &b->flags))
557                 lafs_space_return(fs, 1);
558
559         /* It is an awkward time to call lafs_inode_fillblock,
560          * so do this one little change manually
561          */
562         ((struct la_inode *)ibuf)->depth = 0;
563         memcpy(ibuf + lai->metadata_size,
564                dbuf, size);
565         memset(ibuf + lai->metadata_size + size,
566                0,
567                fs->blocksize - lai->metadata_size - size);
568         unmap_dblock_2(db, ibuf);
569         unmap_dblock(dblk(b), dbuf);
570         lafs_iounlock_block(&lai->iblock->b);
571
572         putdref(db, MKREF(flush2db));
573         lafs_refile(b, 0);
574         return 1;
575
576 give_up:
577         lafs_iounlock_block(&lai->iblock->b);
578         return 0;
579 }
580
581 int lafs_cluster_allocate(struct block *b, int cnum)
582 {
583         struct fs *fs = fs_from_inode(b->inode);
584         struct wc *wc = &fs->wc[cnum];
585         struct lafs_inode *lai;
586         loff_t size;
587         int used;
588         int err = 0;
589
590         LAFS_BUG(test_bit(B_Index, &b->flags) &&
591                  iblk(b)->uninc_table.pending_cnt > 0, b);
592
593         LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
594         lai = LAFSI(b->inode);
595
596         LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
597         LAFS_BUG(!fs->rolled, b);
598         LAFS_BUG(lai->flags & File_nonlogged &&
599                  !test_bit(B_Index, &b->flags), b);
600
601         /* We cannot change the physaddr of an uniblock in a
602          * different phase to the parent, else the parent will
603          * get the wrong address.
604          */
605         LAFS_BUG(test_bit(B_Index, &b->flags) &&
606                  test_bit(B_Uninc, &b->flags) &&
607                  !!test_bit(B_Phase1, &b->flags) !=
608                  !!test_bit(B_Phase1, &b->parent->b.flags), b);
609
610         size = i_size_read(b->inode);
611         if (!test_bit(B_Index, &b->flags) &&          /* it is a datablock */
612             b->fileaddr == 0 &&
613             b->parent == lai->iblock &&               /* No indexing */
614             lai->type >= TypeBase &&                  /* 'size' is meaningful */
615             size + lai->metadata_size <= fs->blocksize) {
616                 int success = flush_data_to_inode(b);
617                 if (success) {
618                         lafs_summary_update(fs, b->inode, b->physaddr, 0,
619                                             0, !!test_bit(B_Phase1, &b->flags),
620                                             test_bit(B_SegRef, &b->flags));
621                         b->physaddr = 0;
622                         lafs_iounlock_block(b);
623                         return 0;
624                 }
625                 /* There are conflicting blocks in the index tree */
626                 if (test_and_clear_bit(B_Realloc, &b->flags))
627                         lafs_space_return(fs, 1);
628                 if (!test_bit(B_Dirty, &b->flags) ||
629                     !test_bit(B_Pinned, &b->flags)) {
630                         /* It is safe to just leave this for later */
631                         lafs_iounlock_block(b);
632                         return 0;
633                 }
634                 /* We really need to write this, so use a full block. */
635                 printk("WARNING %s\n", strblk(b));
636                 WARN_ON(1);
637         }
638
639         /* The symbiosis between the datablock and the indexblock for an
640          * inode needs to be dealt with here.
641          * The data block will not normally be written until the InoIdx
642          * block is unpinned or phase-flipped.  However if it is, as the
643          * result of some 'sync' style operation, then we write it even
644  * though the index info might not be uptodate.  The data
645  * content will be safe for roll-forward to pick up, and the rest
646  * will be written when the InoIdx block is ready.
647          *
648          * When the InoIdx block is ready, we don't want to write it as
649          * there is nowhere for it to be incorporated in to.  Rather we
650          * move the pinning and credits across to the Data block
651          * which will subsequently be written.
652          */
653
654         if (test_bit(B_InoIdx, &b->flags)) {
655                 struct block *b2 = &lai->dblock->b;
656                 int credits = 0;
657
658                 LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
659                 if (!test_bit(B_Pinned, &b2->flags)) {
660                         /* index block pinned, data block not,
661                          * carry the pinning across
662                          */
663                         if (b2->parent == NULL &&
664                             b->parent)
665                                 b2->parent =
666                                         getiref(b->parent, MKREF(child));
667                         LAFS_BUG(b->parent != b2->parent, b);
668                         set_bit(B_PinPending, &b2->flags);  // FIXME is this right?
669                         set_phase(b2, test_bit(B_Phase1,
670                                                &b->flags));
671                         lafs_refile(b2, 0);
672                 }
673
674                 if (cnum) {
675                         if (test_and_clear_bit(B_Realloc, &b->flags))
676                                 credits++;
677                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
678                 } else {
679                         if (test_and_clear_bit(B_Dirty, &b->flags))
680                                 credits++;
681                         if (test_and_clear_bit(B_Realloc, &b->flags))
682                                 credits++;
683                 }
684                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
685                         credits++;
686
687                 /* We always erase the InoIdx block before the
688                  * inode data block, so this cannot happen
689                  */
690                 LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);
691
692                 if (!test_and_set_bit(B_UnincCredit, &b2->flags))
693                         if (!test_and_clear_bit(B_ICredit, &b2->flags))
694                                 credits--;
695
696                 if (cnum == 0) {
697                         if (!test_bit(B_Dirty, &b2->flags))
698                                 /* need some credits... */
699                                 if (!test_and_set_bit(B_Credit, &b2->flags))
700                                         credits--;
701                         lafs_dirty_dblock(dblk(b2));
702                 } else {
703                         if (!test_and_set_bit(B_Realloc, &b2->flags))
704                                 credits--;
705                 }
706                 LAFS_BUG(credits < 0, b2);
707                 lafs_space_return(fs, credits);
708                 /* make sure 'dirty' status is registered */
709                 lafs_refile(b2, 0);
710                 lafs_iounlock_block(b);
711                 return 0;
712         }
713
714         if (!test_bit(B_Dirty, &b->flags) &&
715             !test_bit(B_Realloc, &b->flags)) {
716                 /* block just got truncated, so don't write it. */
717                 lafs_iounlock_block(b);
718                 return 0;
719         }
720
721         if (test_bit(B_EmptyIndex, &b->flags)) {
722                 int credits = 0;
723                 if (test_and_set_bit(B_Writeback, &b->flags))
724                         LAFS_BUG(1, b);
725                 lafs_iounlock_block(b);
726                 lafs_allocated_block(fs, b, 0);
727                 if (cnum) {
728                         if (test_and_clear_bit(B_Realloc, &b->flags))
729                                 credits++;
730                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
731                 } else {
732                         if (test_and_clear_bit(B_Dirty, &b->flags))
733                                 credits++;
734                         if (test_and_clear_bit(B_Realloc, &b->flags))
735                                 credits++;
736                 }
737                 lafs_writeback_done(b);
738                 lafs_space_return(fs, credits);
739                 return 0;
740         }
741
742         if (!test_bit(B_Index, &b->flags)) {
743                 struct inode *myi = rcu_my_inode(dblk(b));
744                 if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
745                         lafs_inode_fillblock(myi);
746                 rcu_iput(myi);
747         }
748
749         mutex_lock(&wc->lock);
750
751         /* The following check must be inside the mutex_lock
752          * to ensure exclusion between checkpoint and writepage.
753          * checkpoint must never see the block not on the
754          * leaf lru unless it is already in the cluster and so can
755          * be waited for.
756          */
757         if (!list_empty_careful(&b->lru)) {
758                 /* Someone is flushing this block before
759                  * checkpoint got to it - probably writepage.
760                  * Must remove it from the leaf lru so it can go
761                  * on the cluster list.
762                  */
763                 LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
764                 spin_lock(&fs->lock);
765                 if (!list_empty(&b->lru))
766                         list_del_init(&b->lru);
767                 spin_unlock(&fs->lock);
768         }
769
770         if (test_and_set_bit(B_Writeback, &b->flags))
771                 LAFS_BUG(1, b);
772         lafs_iounlock_block(b);
773
774         getref(b, MKREF(cluster)); /* we soon own a reference in the list */
775
776         for (;;) {
777                 int avail;
778                 /* see how much room is in the cluster.
779                  * The interesting values are:
780                  * none, one descriptor, one grouphead and one descriptor
781                  * These are assigned values 0, 1, 2.
782                  */
783                 if (wc->cluster_space >= (sizeof(struct group_head) +
784                                           sizeof(struct descriptor)) ||
785                     (wc->remaining > 2 &&
786                      wc->chead_blocks < MAX_CHEAD_BLOCKS &&
787                      (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
788                         )
789                         avail = 2;
790                 else if (wc->cluster_space >= sizeof(struct descriptor))
791                         avail = 1;
792                 else
793                         avail = 0;
794                 if (wc->seg.dev < 0 || wc->remaining < 1)
795                         used = -1;
796                 else
797                         used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);
798
799                 if (used >= 0)
800                         break;
801                 else if (wc->slhead.b)
802                         cluster_flush(fs, cnum);
803                 else {
804                         err = new_segment(fs, cnum);
805                         if (err) {
806                                 /* No cleaner segments - this will have to go
807                                  * out with a checkpoint.
808                                  * Block is still 'dirty' - just clear
809                                  * writeback flag.
810                                  */
811                                 lafs_writeback_done(b);
812                                 putref(b, MKREF(cluster));
813                                 mutex_unlock(&wc->lock);
814                                 return -ENOSPC;
815                         }
816                         cluster_reset(fs, wc);
817                 }
818         }
819
820         if (used > 0)
821                 wc->cluster_space -= sizeof(struct descriptor);
822         if (used > 1)
823                 wc->cluster_space -= sizeof(struct group_head);
824         if (wc->cluster_space < 0) {
825                 /* need a new page */
826                 wc->chead_blocks++;
827                 wc->remaining--;
828                 wc->cluster_space += fs->blocksize;
829         }
830         wc->remaining--;
831         BUG_ON(wc->remaining < 0);
832         if (wc->remaining == 0)
833                 cluster_flush(fs, cnum);
834         mutex_unlock(&wc->lock);
835         return 0;
836 }
837
838 /*-------------------------------------------------------------------------
839  * Cluster head building.
840  * We build the cluster head bit by bit as we find blocks
841  * in the list.  These routines help.
842  */
843
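/*
 * The helpers below are used by cluster_flush() roughly in this order
 * (illustrative sketch only):
 *
 *	cluster_addhead(wc, inode, &gh);       - start a group for an inode
 *	cluster_adddesc(wc, blk, &desc);       - start an extent descriptor
 *	cluster_incdesc(wc, desc, blk, bits);  - once per block in the run
 *	cluster_closehead(wc, gh);             - record the group's size
 *
 * A new descriptor is started whenever the blocks stop being
 * consecutive (or an index block appears), and a new group_head
 * whenever the inode changes.
 */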
844 static inline void cluster_addhead(struct wc *wc, struct inode *ino,
845                                    struct group_head **headstart)
846 {
847         struct group_head *gh = (struct group_head *)((char *)wc->chead +
848                                                       wc->chead_size);
849         u16 tnf;
850         dprintk("CLUSTER addhead %d\n", wc->chead_size);
851         *headstart = gh;
852
853         gh->inum = cpu_to_le32(ino->i_ino);
854         gh->fsnum = cpu_to_le32(ino_from_sb(ino->i_sb)->i_ino);
855         tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff))
856                 & 0x7fff;
857         if (wc->cnum)
858                 tnf |= 0x8000;
859         gh->truncatenum_and_flag = cpu_to_le16(tnf);
860         wc->chead_size += sizeof(struct group_head);
861 }
862
863 static inline void cluster_closehead(struct wc *wc,
864                                      struct group_head *headstart)
865 {
866         int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);
867
868         dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
869         headstart->group_size_words = size / 4;
870 }
871
872 static inline void cluster_addmini(struct wc *wc, u32 addr, int offset,
873                                    int size, const char *data,
874                                    int size2, const char *data2)
875 {
876         /* if size2 !=0, then only
877          * (size-size2) is at 'data' and the rest is at 'data2'
878          */
879         struct miniblock *mb = ((struct miniblock *)
880                                 ((char *)wc->chead + wc->chead_size));
881
882         dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
883
884         mb->block_num = cpu_to_le32(addr);
885         mb->block_offset = cpu_to_le16(offset);
886         mb->length = cpu_to_le16(size + DescMiniOffset);
887         wc->chead_size += sizeof(struct miniblock);
888         memcpy(mb->data, data, size-size2);
889         if (size2)
890                 memcpy(mb->data + size-size2, data2, size2);
891         memset(mb->data+size, 0, (-size)&3);
892         wc->chead_size += ROUND_UP(size);
893 }
894
895 static inline void cluster_adddesc(struct wc *wc, struct block *blk,
896                                    struct descriptor **desc_start)
897 {
898         struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
899                                                       wc->chead_size);
900         *desc_start = dh;
901         dprintk("CLUSTER add_desc %d\n", wc->chead_size);
902         dh->block_num = cpu_to_le32(blk->fileaddr);
903         dh->block_cnt = cpu_to_le32(0);
904         dh->block_bytes = 0;
905         if (test_bit(B_Index, &blk->flags))
906                 dh->block_bytes = DescIndex;
907         wc->chead_size += sizeof(struct descriptor);
908 }
909
910 static inline void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
911                                    struct block *b, int blkbits)
912 {
913         desc_start->block_cnt =
914                 cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
915         if (!test_bit(B_Index, &b->flags)) {
916                 if (LAFSI(b->inode)->type >= TypeBase) {
917                         u64 size = b->inode->i_size;
918                         if (size > ((loff_t)b->fileaddr << blkbits) &&
919                             size <= ((loff_t)(b->fileaddr + 1) << blkbits))
920                                 desc_start->block_bytes =
921                                         cpu_to_le32(size & ((1<<blkbits)-1));
922                         else
923                                 desc_start->block_bytes =
924                                         cpu_to_le32(1 << blkbits);
925                 } else
926                         desc_start->block_bytes = cpu_to_le32(1<<blkbits);
927         }
928 }
929
930 /*------------------------------------------------------------------------
931  * Miniblocks/updates
932  * Updates are added to the cluster head immediately, causing a flush
933  * if there is insufficient room.
934  * This means that miniblocks always occur before block descriptors.
935  * miniblocks are only ever written to cluster-series 0.
936  */
937
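/*
 * Lifecycle of an update, as an illustrative sketch only (error
 * handling abbreviated; 'buf' is a hypothetical caller buffer):
 *
 *	struct update_handle uh;
 *
 *	lafs_cluster_update_prepare(&uh, fs, len);
 *	if (lafs_cluster_update_pin(&uh) == 0)
 *		seq = lafs_cluster_update_commit_buf(&uh, fs, ino, addr,
 *						     offset, len, buf,
 *						     0, NULL);
 *
 * lafs_cluster_update_abort() undoes a successful _pin() if the
 * update turns out not to be wanted after all.
 */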
938 int
939 lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
940                             int len)
941 {
942         /* make sure there is room for a 'len' update in the upcoming
943          * cluster.  Nothing needs to be done here.
944          */
945         uh->fs = fs;
946         uh->reserved = 0;
947         return 0;
948 }
949
950 int
951 lafs_cluster_update_pin(struct update_handle *uh)
952 {
953         /* last chance to bail out */
954         if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
955                 uh->reserved = 1;
956                 uh->fs->cluster_updates++;
957                 return 0;
958         }
959         return -EAGAIN;
960 }
961
962 static unsigned long long
963 lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
964                                 struct inode *ino, u32 addr,
965                                 int offset, int len, struct datablock *b,
966                                 const char *str1,
967                                 int len2, const char *str2)
968 {
969         /* The data is supplied either in datablock 'b' or in buffers
970          * str1/str2 (alternate layouts).  Flush 'len' bytes at 'offset'
971          * of block 'addr' in 'ino' into the next write cluster.
972          */
973         struct wc *wc = &fs->wc[0];
974         struct group_head *head_start;
975         unsigned long long seq;
976         char *mapping = NULL;
977
978         BUG_ON(!fs->rolled);
979
980         mutex_lock(&wc->lock);
981         while (wc->cluster_space < (sizeof(struct group_head)+
982                                     sizeof(struct descriptor) +
983                                     ROUND_UP(len))) {
984                 if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
985                     (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
986                         wc->chead_blocks++;
987                         wc->remaining--;
988                         wc->cluster_space += fs->blocksize;
989                 } else
990                         cluster_flush(fs, 0);
991         }
992
993         fs->cluster_updates -= uh->reserved;
994         lafs_space_return(fs, uh->reserved);
995         uh->reserved = 0;
996
997         cluster_addhead(wc, ino, &head_start);
998         if (b) {
999                 mapping = map_dblock(b);
1000                 str1 = mapping + offset;
1001         }
1002         cluster_addmini(wc, addr, offset, len, str1, len2, str2);
1003         if (b)
1004                 unmap_dblock(b, mapping);
1005         cluster_closehead(wc, head_start);
1006         wc->cluster_space -= (sizeof(struct group_head)+
1007                               sizeof(struct descriptor) +
1008                               ROUND_UP(len));
1009         seq = wc->cluster_seq;
1010         mutex_unlock(&wc->lock);
1011         return seq;
1012 }
1013
1014 unsigned long long
1015 lafs_cluster_update_commit(struct update_handle *uh,
1016                            struct datablock *b,
1017                            int offset, int len)
1018 {
1019         /* OK here is the data to put in the next cluster. */
1020         struct fs *fs = fs_from_inode(b->b.inode);
1021         unsigned long long seq;
1022
1023         seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
1024                                               offset, len, b, NULL, 0, NULL);
1025
1026         return seq;
1027 }
1028
1029 unsigned long long
1030 lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
1031                                struct inode *ino, u32 addr,
1032                                int offset, int len, const char *str1,
1033                                int len2, const char *str2)
1034 {
1035         return lafs_cluster_update_commit_both(uh, fs,
1036                                                ino, addr,
1037                                                offset, len,
1038                                                NULL,
1039                                                str1, len2, str2);
1040 }
1041
1042 void
1043 lafs_cluster_update_abort(struct update_handle *uh)
1044 {
1045         /* Didn't want that cluster_update after all */
1046         uh->fs->cluster_updates -= uh->reserved;
1047         lafs_space_return(uh->fs, uh->reserved);
1048 }
1049
1050 int lafs_calc_cluster_csum(struct cluster_head *head)
1051 {
1052         unsigned int oldcsum = head->checksum;
1053         unsigned long long newcsum = 0;
1054         unsigned long csum;
1055         int i;
1056         unsigned int *superc = (unsigned int *) head;
1057         head->checksum = 0;
1058
1059         for (i = 0; i < le16_to_cpu(head->Hlength)/4; i++)
1060                 newcsum += le32_to_cpu(superc[i]);
1061         csum = (newcsum & 0xffffffff) + (newcsum>>32);
1062         head->checksum = oldcsum;
1063         return cpu_to_le32(csum);
1064 }
1065
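/*
 * Note (added for clarity): the checksum is a folded 32-bit sum of the
 * header words, computed with the checksum field itself treated as
 * zero and the top half of the 64-bit running sum folded back in once.
 * A reader can therefore verify a cluster head it has just read by
 * calling lafs_calc_cluster_csum() on it and comparing the result with
 * the stored ->checksum field.
 */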
1066 /*------------------------------------------------------------------------
1067  * Cluster flushing.
1068  * Once we have a suitable number of blocks and updates in a writecluster
1069  * we need to write them out.
1070  * We first iterate over the block list building the clusterhead.
1071  * Then we finalise and write the clusterhead and iterate again
1072  * writing out each block.
1073  * We keep a biased blockcount in the wc, and decrement it on write-completion.
1074  * Once the blockcount hits the bias the cluster is written.  Then when
1075  * the next (or next-plus-one) cluster head is written we remove the bias
1076  * and the data is deemed to be safe.
1077  *
1078  * Update blocks will already have been included in the write cluster.
1079  *
1080  * We reinitialise the write cluster once we are finished.
1081  */
1082
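/*
 * Rough timeline of pending_cnt for one write-cluster slot
 * (illustrative; it assumes lafs_write_head/lafs_write_block account
 * each submitted write against the current slot):
 *
 *	0      slot idle
 *	1      cluster_flush() adds the bias before submitting writes
 *	1 + n  while n writes for this cluster are still in flight
 *	1      all of this cluster's writes have completed
 *	0      the next (or next-plus-one) cluster head has been
 *	       written, the bias is removed and cluster_done() can
 *	       acknowledge the blocks
 */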
1083 static void cluster_done(struct fs *fs, struct wc *wc)
1084 {
1085         /* This is called when one of the pending_cnts hits 0, indicating
1086          * that a write-cluster has been written and committed to storage.
1087          * We find all the pending blocks on a completed cluster and
1088          * acknowledge them.
1089          */
1090         int i;
1091         struct block *b, *tmp;
1092         struct list_head tofree;
1093
1094         INIT_LIST_HEAD(&tofree);
1095
1096         spin_lock(&fs->lock);
1097         for (i = 0; i < 4 ; i++)
1098                 if (atomic_read(&wc->pending_cnt[i]) == 0)
1099                         list_splice_init(&wc->pending_blocks[i], &tofree);
1100         spin_unlock(&fs->lock);
1101
1102         list_for_each_entry_safe(b, tmp, &tofree, lru) {
1103                 list_del_init(&b->lru);
1104                 lafs_writeback_done(b);
1105                 putref(b, MKREF(cluster));
1106         }
1107 }
1108
1109 void lafs_clusters_done(struct fs *fs)
1110 {
1111         int i;
1112         for (i = 0 ; i < WC_NUM; i++)
1113                 cluster_done(fs, &fs->wc[i]);
1114 }
1115
1116 void lafs_done_work(struct work_struct *ws)
1117 {
1118         struct fs *fs = container_of(ws, struct fs, done_work);
1119         lafs_clusters_done(fs);
1120 }
1121
1122 static void cluster_flush(struct fs *fs, int cnum)
1123 {
1124         struct wc *wc = &fs->wc[cnum];
1125         int i;
1126         struct block *b;
1127         struct inode *current_inode = NULL;
1128         u64 current_block = NoBlock;
1129         u64 head_addr[MAX_CHEAD_BLOCKS];
1130         struct descriptor *desc_start = NULL;
1131         struct group_head *head_start = NULL;
1132         struct segpos segend;
1133         int which;
1134         int wake;
1135         int blocks = 0;
1136         int vtype;
1137         int cluster_size;
1138
1139         if (!test_bit(CheckpointOpen, &fs->fsstate)) {
1140                 /* First cluster since a checkpoint, make sure
1141                  * we do another checkpoint within 30 seconds
1142                  */
1143                 fs->next_checkpoint = jiffies + 30 * HZ;
1144                 set_bit(CheckpointOpen, &fs->fsstate);
1145                 lafs_wake_thread(fs);
1146         }
1147         /* set the writecluster size as this will guide layout in
1148          * some cases
1149          */
1150
1151         if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
1152                 set_bit(SecondFlushNeeded, &fs->fsstate);
1153         else
1154                 clear_bit(SecondFlushNeeded, &fs->fsstate);
1155
1156         cluster_size = seg_setsize(fs, &wc->seg,
1157                                    seg_remainder(fs, &wc->seg) - wc->remaining);
1158
1159         /* find, and step over, address header block(s) */
1160         for (i = 0; i < wc->chead_blocks ; i++)
1161                 head_addr[i] = seg_next(fs, &wc->seg);
1162
1163         list_for_each_entry(b, &wc->clhead, lru) {
1164                 u64 addr;
1165                 if (b->inode != current_inode) {
1166                         /* need to create a new group_head */
1167                         desc_start = NULL;
1168                         if (head_start)
1169                                 cluster_closehead(wc, head_start);
1170                         cluster_addhead(wc, b->inode, &head_start);
1171                         current_inode = b->inode;
1172                         current_block = NoBlock;
1173                 }
1174                 if (desc_start == NULL || b->fileaddr != current_block+1 ||
1175                     test_bit(B_Index, &b->flags)) {
1176                         cluster_adddesc(wc, b, &desc_start);
1177                         current_block = b->fileaddr;
1178                 } else
1179                         current_block++;
1180                 cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
1181                 addr = seg_next(fs, &wc->seg);
1182                 if (cnum && test_bit(B_Dirty, &b->flags))
1183                         /* We were cleaning but this block is now dirty.
1184                          * Don't waste the UnincCredit on recording the
1185                          * clean location as it will be written as
1186                          * dirty soon.  Just pin it to the current phase
1187                          * to ensure that it really does get written.
1188                          */
1189                         lafs_pin_block(b);
1190                 else
1191                         lafs_allocated_block(fs, b, addr);
1192                 blocks++;
1193         }
1194
1195         if (head_start)
1196                 cluster_closehead(wc, head_start);
1197         segend = wc->seg; /* We may write zeros from here */
1198         seg_step(fs, &wc->seg);
1199         wc->remaining = seg_remainder(fs, &wc->seg);
1200         /* Need to make sure our ->next_addr gets set properly
1201          * for non-cleaning segments
1202          */
1203         if (wc->remaining < 2) {
1204                 if (cnum == 0)
1205                         new_segment(fs, cnum);
1206                 else
1207                         close_segment(fs, cnum);
1208         }
1209
1210         /* Fill in the cluster header */
1211         strncpy(wc->chead->idtag, "LaFSHead", 8);
1212         if (cnum)
1213                 wc->chead->flags = cpu_to_le32(0);
1214         else {
1215                 /* checkpoints only happen in cnum==0 */
1216                 if (fs->checkpointing) {
1217                         spin_lock(&fs->lock);
1218                         wc->chead->flags = cpu_to_le32(fs->checkpointing);
1219                         if (fs->checkpointing & CH_CheckpointEnd) {
1220                                 fs->checkpointing = 0;
1221                                 clear_bit(CheckpointOpen, &fs->fsstate);
1222                         } else if (fs->checkpointing & CH_CheckpointStart) {
1223                                 fs->checkpointcluster = head_addr[0];
1224                                 fs->checkpointing &= ~CH_CheckpointStart;
1225                         }
1226                         spin_unlock(&fs->lock);
1227                         if (!fs->checkpointing)
1228                                 wake_up(&fs->phase_wait);
1229                 } else
1230                         wc->chead->flags = 0;
1231         }
1232         memcpy(wc->chead->uuid, fs->state->uuid, 16);
1233         wc->chead->seq = cpu_to_le64(wc->cluster_seq);
1234         wc->cluster_seq++;
1235         wc->chead->Hlength = cpu_to_le16(wc->chead_size);
1236         wc->chead->Clength = cpu_to_le16(cluster_size);
1237         spin_lock(&fs->lock);
1238         if (cnum)
1239                 fs->clean_reserved -= cluster_size;
1240         else
1241                 fs->free_blocks -= cluster_size;
1242         spin_unlock(&fs->lock);
1243         /* FIXME if this is just a header, no data blocks,
1244          * then use VerifyNull.  Alternately if there is
1245          * no-one waiting for a sync to complete (how do we
1246          * track that?) use VerifyNext2
1247          */
1248         vtype = VerifyNext;
1249         if (cnum)
1250                 vtype = VerifyNull;
1251         wc->pending_vfy_type[wc->pending_next] = vtype;
1252         wc->chead->verify_type = cpu_to_le16(vtype);
1253         memset(wc->chead->verify_data, 0, 16);
1254
1255         wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
1256         wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
1257         wc->prev_addr = head_addr[0];
1258         wc->chead->this_addr = cpu_to_le64(wc->prev_addr);
1259
1260         wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
1261
1262         /* We cannot write the header until any previous cluster
1263          * which uses the header as a commit block has been
1264          * fully written
1265          */
1266         which = (wc->pending_next+3)%4;
1267         dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1268                 atomic_read(&wc->pending_cnt[which]));
1269         if (wc->pending_vfy_type[which] == VerifyNext)
1270                 wait_event(wc->pending_wait,
1271                            atomic_read(&wc->pending_cnt[which]) == 1);
1272         which = (which+3) % 4;
1273         dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1274                 atomic_read(&wc->pending_cnt[which]));
1275         if (wc->pending_vfy_type[which] == VerifyNext2)
1276                 wait_event(wc->pending_wait,
1277                            atomic_read(&wc->pending_cnt[which]) == 1);
1278
1279         lafs_clusters_done(fs);
1280         dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
1281                 wc->pending_next, atomic_read(&wc->pending_cnt
1282                                               [wc->pending_next]));
1283         BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
1284         BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));
1285
1286         /* initialise pending count to 1.  This will be decremented
1287          * once all related blocks have been submitted for write.
1288          * This includes the blocks in the next 0, 1, or 2
1289          * write clusters.
1290          */
1291         atomic_inc(&wc->pending_cnt[wc->pending_next]);
1292
1293         /* Now write it all out.
1294          * Later we should possibly re-order the writes
1295          * for raid4 stripe-at-a-time
1296          */
1297         for (i = 0; i < wc->chead_blocks; i++)
1298                 lafs_write_head(fs,
1299                                 page_address(wc->page[wc->pending_next])
1300                                 + i * fs->blocksize,
1301                                 head_addr[i], wc);
1302
1303         while (!list_empty(&wc->clhead)) {
1304                 int credits = 0;
1305                 b = list_entry(wc->clhead.next, struct block, lru);
1306                 dprintk("flushing %s\n", strblk(b));
1307
1308                 if (test_and_clear_bit(B_Realloc, &b->flags))
1309                         credits++;
1310                 if (cnum == 0) {
1311                         if (test_and_clear_bit(B_Dirty, &b->flags))
1312                                 credits++;
1313                 }
1314
1315                 WARN_ON(credits == 0);
1316                 lafs_space_return(fs, credits);
1317                 spin_lock(&fs->lock);
1318                 list_del(&b->lru);
1319                 list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
1320                 spin_unlock(&fs->lock);
1321                 lafs_write_block(fs, b, wc);
1322                 lafs_refile(b, 0);
1323         }
1324         skip_discard(wc->slhead.next[0]);
1325         /* FIXME write out zeros to pad raid4 if needed */
1326
1327         /* Having submitted this request we need to remove the bias
1328          * from pending_cnt of previous clusters
1329          */
1330         which = wc->pending_next;
1331         wake = 0;
1332         dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1333                 atomic_read(&wc->pending_cnt[which]));
1334         if (wc->pending_vfy_type[which] == VerifyNull)
1335                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1336                         wake = 1;
1337         which = (which+3) % 4;
1338         dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1339                 atomic_read(&wc->pending_cnt[which]));
1340         if (wc->pending_vfy_type[which] == VerifyNext)
1341                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1342                         wake = 1;
1343         which = (which+3) % 4;
1344         dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1345                 atomic_read(&wc->pending_cnt[which]));
1346         if (wc->pending_vfy_type[which] == VerifyNext2)
1347                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1348                         wake = 1;
1349         if (wake) {
1350                 cluster_done(fs, wc);
1351                 wake_up(&wc->pending_wait);
1352         }
1353         dprintk("D %d\n", wake);
1354
1355         lafs_write_flush(fs, wc);
1356
1357         wc->pending_next = (wc->pending_next+1) % 4;
1358         /* now re-initialise the cluster information */
1359         cluster_reset(fs, wc);
1360
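             /* Wait until the slot we have just advanced to has completely
              * finished with its previous contents, so its page and pending
              * list can safely be reused for the next cluster.
              */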
1361         wait_event(wc->pending_wait,
1362                    atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
1363         dprintk("cluster_flush end pending_next=%d cnt=%d\n",
1364                 wc->pending_next, atomic_read(&wc->pending_cnt
1365                                               [wc->pending_next]));
1366 }
1367
1368 void lafs_cluster_flush(struct fs *fs, int cnum)
1369 {
1370         struct wc *wc = &fs->wc[cnum];
1371         dprintk("LAFS_cluster_flush %d\n", cnum);
1372         mutex_lock(&wc->lock);
1373         if (!list_empty(&wc->clhead)
1374             || wc->chead_size > sizeof(struct cluster_head)
1375             || (cnum == 0 &&
1376                 ((fs->checkpointing & CH_CheckpointEnd)
1377                  || test_bit(FlushNeeded, &fs->fsstate)
1378                  || test_bit(SecondFlushNeeded, &fs->fsstate)
1379                         )))
1380                 cluster_flush(fs, cnum);
1381         mutex_unlock(&wc->lock);
1382 }
1383
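     /* Wait until every write cluster has finished all of its writes.
      * A pending count of 1 means that only the verify bias remains,
      * so all of the I/O itself has completed.
      */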
1384 void lafs_cluster_wait_all(struct fs *fs)
1385 {
1386         int i;
1387         for (i = 0; i < WC_NUM; i++) {
1388                 struct wc *wc = &fs->wc[i];
1389                 int j;
1390                 for (j = 0; j < 4; j++) {
1391                         wait_event(wc->pending_wait,
1392                                    atomic_read(&wc->pending_cnt[j]) <= 1);
1393                 }
1394         }
1395 }
1396
1397 /* The end_io function for writes to a cluster is one of
1398  * eight: one per slot in the circular list, with separate
1399  * variants for data blocks and cluster-head blocks.
1400  */
1401 static void cluster_end_io(struct bio *bio, int err,
1402                    int which, int header)
1403 {
1404         /* FIXME how to handle errors? */
1405         struct wc *wc = bio->bi_private;
1406         struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
1407         int wake = 0;
1408         int done = 0;
1409
1410         dprintk("end_io err=%d which=%d header=%d\n",
1411                 err, which, header);
1412
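             /* Drop this bio's reference on its own slot.  If this was a
              * cluster-head write, also drop a reference from the previous
              * slot if its verify type is VerifyNext or VerifyNext2, and
              * from the slot before that if its type is VerifyNext2.
              */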
1413         if (atomic_dec_and_test(&wc->pending_cnt[which]))
1414                 done++;
1415         if (atomic_read(&wc->pending_cnt[which]) == 1)
1416                 wake = 1;
1417         which = (which+3) % 4;
1418         if (header &&
1419             (wc->pending_vfy_type[which] == VerifyNext ||
1420              wc->pending_vfy_type[which] == VerifyNext2) &&
1421             atomic_dec_and_test(&wc->pending_cnt[which]))
1422                 done++;
1423         if (atomic_read(&wc->pending_cnt[which]) == 1)
1424                 wake = 1;
1425
1426         which = (which+3) % 4;
1427         if (header &&
1428             wc->pending_vfy_type[which] == VerifyNext2 &&
1429             atomic_dec_and_test(&wc->pending_cnt[which]))
1430                 done++;
1431         if (atomic_read(&wc->pending_cnt[which]) == 1)
1432                 wake = 1;
1433
1434         if (done)
1435                 schedule_work(&fs->done_work);
1436         if (done || wake) {
1437                 wake_up(&wc->pending_wait);
1438                 if (test_bit(FlushNeeded, &fs->fsstate) ||
1439                     test_bit(SecondFlushNeeded, &fs->fsstate))
1440                         lafs_wake_thread(fs);
1441         }
1442 }
1443
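     /* bio completion handlers only receive the bio and an error code,
      * so the slot number and the data/header distinction are encoded
      * in the identity of the handler via these trivial wrappers.
      */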
1444 static void cluster_endio_data_0(struct bio *bio, int err)
1445 {
1446         cluster_end_io(bio, err, 0, 0);
1447 }
1448
1449 static void cluster_endio_data_1(struct bio *bio, int err)
1450 {
1451         cluster_end_io(bio, err, 1, 0);
1452 }
1453
1454 static void cluster_endio_data_2(struct bio *bio, int err)
1455 {
1456         cluster_end_io(bio, err, 2, 0);
1457 }
1458
1459 static void cluster_endio_data_3(struct bio *bio, int err)
1460 {
1461         cluster_end_io(bio, err, 3, 0);
1462 }
1463
1464 static void cluster_endio_header_0(struct bio *bio, int err)
1465 {
1466         cluster_end_io(bio, err, 0, 1);
1467 }
1468
1469 static void cluster_endio_header_1(struct bio *bio, int err)
1470 {
1471         cluster_end_io(bio, err, 1, 1);
1472 }
1473
1474 static void cluster_endio_header_2(struct bio *bio, int err)
1475 {
1476         cluster_end_io(bio, err, 2, 1);
1477 }
1478
1479 static void cluster_endio_header_3(struct bio *bio, int err)
1480 {
1481         cluster_end_io(bio, err, 3, 1);
1482 }
1483
1484 bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
1485 {
1486         if (header)
1487                 if ((which&2) == 0)
1488                         if (which == 0)
1489                                 return cluster_endio_header_0;
1490                         else
1491                                 return cluster_endio_header_1;
1492                 else
1493                         if (which == 2)
1494                                 return cluster_endio_header_2;
1495                         else
1496                                 return cluster_endio_header_3;
1497         else
1498                 if ((which&2) == 0)
1499                         if (which == 0)
1500                                 return cluster_endio_data_0;
1501                         else
1502                                 return cluster_endio_data_1;
1503                 else
1504                         if (which == 2)
1505                                 return cluster_endio_data_2;
1506                         else
1507                                 return cluster_endio_data_3;
1508 }
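
     /* A purely illustrative alternative, not used here: assuming 'which'
      * is always in the range 0-3, the same choice could be made with a
      * small lookup table, e.g.
      *
      *     static bio_end_io_t * const endio_table[2][4] = {
      *             { cluster_endio_data_0, cluster_endio_data_1,
      *               cluster_endio_data_2, cluster_endio_data_3 },
      *             { cluster_endio_header_0, cluster_endio_header_1,
      *               cluster_endio_header_2, cluster_endio_header_3 },
      *     };
      *     return endio_table[!!header][which & 3];
      */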
1509
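     /* Set up write cluster 'cnum': allocate the four per-slot pages used
      * for cluster heads and reset the pending state.  A non-zero 'addr'
      * supplies the initial segment position and is only expected for
      * cluster 0.
      */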
1510 int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
1511 {
1512         struct wc *wc = &fs->wc[cnum];
1513         int i;
1514
1515         wc->slhead.b = NULL;
1516         INIT_LIST_HEAD(&wc->clhead);
1517
1518         for (i = 0; i < 4 ; i++) {
1519                 wc->pending_vfy_type[i] = VerifyNull;
1520                 atomic_set(&wc->pending_cnt[i], 0);
1521                 wc->page[i] = alloc_page(GFP_KERNEL);
1522                 if (!wc->page[i])
1523                         break;
1524         }
1525         if (i < 4) {
1526                 while (i > 0) {
1527                         i--;
1528                         put_page(wc->page[i]);
1529                 }
1530                 return -ENOMEM;
1531         }
1532         wc->pending_next = 0;
1533         wc->cluster_seq = seq;
1534         wc->prev_addr = prev;
1535
1536         if (addr) {
1537                 seg_setpos(fs, &wc->seg, addr);
1538                 cluster_reset(fs, wc);
1539                 BUG_ON(cnum);
1540                 spin_lock(&fs->lock);
1541                 fs->free_blocks += wc->remaining+1;
1542                 spin_unlock(&fs->lock);
1543         }
1544         wc->cnum = cnum;
1545
1546         return 0;
1547 }
1548
1549 void lafs_flush(struct datablock *b)
1550 {
1551         /* Need to write this block out and wait until
1552          * it has been written, so that we can update it
1553          * without risking corruption of the previous snapshot.
1554          * Not yet implemented.
1555          */
1556 }
1557
1558 int lafs_cluster_empty(struct fs *fs, int cnum)
1559 {
1560         return list_empty(&fs->wc[cnum].clhead);
1561 }
1562