1
2 /*
3  * write-cluster management routines for LaFS
4  * fs/lafs/cluster.c
5  * Copyright (C) 2006-2009
6  * NeilBrown <neilb@suse.de>
7  * Released under the GPL, version 2
8  */
9
10 /*
11  * Blocks to be written to a cluster need to be sorted so as to minimise
12  * fragmentation and to keep the cluster head small.
13  * So that we can track the amount of space needed in the cluster head,
14  * we really need to keep the list sorted rather than just sort it
15  * before writeout.
16  * We do this using a linked list of blocks and an insertion sort.
17  * To improve the speed of insertion we keep a modified skiplist structure
18  * over the list.
19  * This is 'modified' in that it explicitly understands sequential
20  * runs within the list and doesn't keep extra skip-points within a run.
21  *
22  * The skip-points require memory allocation, which cannot be
23  * guaranteed here in the writeout path.  To handle this we
24  *  - preallocate some entries
25  *  - accept a less-efficient list if memory isn't available
26  *  - flush early if things get really bad.
27  *
28  * Skip point placement and runs.
29  * When an entry is added to the list, we place a skip-point over it
30  * based on the value of a random number, with a 1-in-4 chance of placement.
31  * The height of the skip point is also random - we keep rolling the dice,
32  * increasing height each time that we get 1-in-4.
33  * If the new entry is placed immediately after a skip point, and is
34  * consecutive with that skip point, the skip point is moved forward
35  * so that it will always be at the end of a run.
36  * When a new entry finds itself at the start or end of a run, a skip
37  * point is not added.
38  */
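/*
 * Illustrative sketch (not part of the original comment): a list of data
 * blocks at file addresses
 *   [ 5 6 7 8 ]  [ 20 ]  [ 31 32 ]
 * forms three runs.  A skip point, when one exists, sits over the last
 * entry of its run (8, 20 or 32 here), and appending 9 right after 8
 * simply moves that skip point forward rather than adding a new one.
 */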
39
40 /* A height of 8 with a fanout of 1-in-4 allows reasonable
41  * addressing for 2^16 entries, which is probably more than we need
42  */
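/* (Arithmetic behind that figure: with a 1-in-4 roll per level, roughly
 * one entry in 4^8 == 65536 == 2^16 reaches the full height of 8.)
 */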
43
44 #include        "lafs.h"
45 #include        <linux/crc32.h>
46 #include        <linux/random.h>
47 #include        <linux/slab.h>
48
49 static void cluster_flush(struct fs *fs, int cnum);
50
51 static void skip_discard(struct skippoint *sp)
52 {
53         while (sp) {
54                 struct skippoint *next = sp->next[0];
55                 kfree(sp);
56                 sp = next;
57         }
58 }
59
60 static int cmp_blk(struct block *a, struct block *b)
61 {
62         /* compare the filesys/inode/index/addr info
63          * of two blocks and return:
64          *  0 if equal
65          * +ve if a > b
66          * -1 if consecutive in order
67          * -2 if in same inode and in order
68          * <=-3 if different inodes and correct order
69          */
70
71         if (a->inode != b->inode) {
72                 struct inode *fsinoa = LAFSI(a->inode)->filesys;
73                 struct inode *fsinob = LAFSI(b->inode)->filesys;
74                 if (fsinoa->i_ino <
75                     fsinob->i_ino)
76                         return -3;
77                 if (fsinoa->i_ino >
78                     fsinob->i_ino)
79                         return 3;
80                 /* same filesystem */
81                 if (a->inode->i_ino <
82                     b->inode->i_ino)
83                         return -3;
84                 return 3;
85         }
86         /* Same inode.  If both data blocks they might be
87          * consecutive
88          */
89         if (!test_bit(B_Index, &a->flags) &&
90             !test_bit(B_Index, &b->flags)) {
91                 if (a->fileaddr == b->fileaddr)
92                         return 0;
93                 if (a->fileaddr + 1 == b->fileaddr)
94                         return -1;
95                 if (a->fileaddr < b->fileaddr)
96                         return -2;
97                 return 2;
98         }
99         /* at least one is an index block. Sort them before data */
100         if (!test_bit(B_Index, &a->flags))
101                 /* a is data, b is index, b first */
102                 return 2;
103
104         if (!test_bit(B_Index, &b->flags))
105                 return -2;
106
107         /* both are index blocks, use fileaddr or depth */
108         if (a->fileaddr < b->fileaddr)
109                 return -2;
110         if (a->fileaddr > b->fileaddr)
111                 return 2;
112         if (iblk(a)->depth < iblk(b)->depth)
113                 return -2;
114         /* equality really shouldn't be possible */
115         return 2;
116 }
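/*
 * Example of the convention above (hypothetical blocks): two data blocks
 * of the same inode at fileaddr 7 and 8 give cmp_blk(a, b) == -1
 * (consecutive), data blocks of the same inode that are merely in order
 * give -2, and blocks of different inodes give +/-3, which is what
 * cluster_insert() uses to distinguish runs, same-file neighbours and
 * unrelated blocks.
 */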
117
118 static struct block *skip_find(struct skippoint *head,
119                                struct list_head *list,
120                                struct block *target,
121                                struct skippoint *where)
122 {
123         /* Find the last block that precedes 'target'
124          * and return it.  If target will be the start,
125          * return NULL.
126          * 'where' gets filled in with the last skip point
127          * at each level which is before-or-at the found
128          * block.  This can be used for easy insertion.
129          */
130
131         int level;
132         struct block *b, *next;
133         for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
134                 while (head->next[level] &&
135                        cmp_blk(head->next[level]->b, target) < 0)
136                         head = head->next[level];
137                 where->next[level] = head;
138         }
139         if (head->b == NULL) {
140                 if (list_empty(list) ||
141                     cmp_blk(list_entry(list->next, struct block, lru),
142                             target) > 0)
143                         /* This goes at the start */
144                         return NULL;
145                 b = list_entry(list->next, struct block, lru);
146         } else
147                 b = head->b;
148         /* b is before target */
149         next = b;
150         list_for_each_entry_continue(next, list, lru) {
151                 if (cmp_blk(next, target) > 0)
152                         break;
153                 b = next;
154         }
155         /* b is the last element before target */
156         return b;
157 }
158
159 static int cluster_insert(struct skippoint *head,
160                           struct list_head *list,
161                           struct block *target,
162                           int avail)
163 {
164         /* Insert 'target' at an appropriate point in the
165          * list, creating a skippoint if appropriate.
166          * Return:
167          *  0 if part of a run
168          *  1 if not in a run, but adjacent to something in same file
169          *  2 otherwise
170          * However, if the value we would return would be more than
171          * 'avail', abort the insert and return -1.
172          */
173         int rv;
174         struct skippoint pos, *newpoint;
175         int level;
176         struct block *b;
177         int cmpbefore, cmpafter;
178         unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
179                                         sizeof(unsigned long))];
180         int height;
181
182         BUG_ON(!list_empty(&target->lru));
183         if (unlikely(list_empty(list))) {
184                 int height;
185                 /* handle this trivial case separately */
186                 if (avail < 2)
187                         return -1;
188                 list_add(&target->lru, list);
189                 head->b = NULL;
190                 for (height = 0; height < SKIP_MAX_HEIGHT; height++)
191                         head->next[height] = NULL;
192                 return 2;
193         }
194         b = skip_find(head, list, target, &pos);
195         if (b == NULL) {
196                 list_add(&target->lru, list);
197                 cmpbefore = 3;
198         } else {
199                 list_add(&target->lru, &b->lru);
200                 cmpbefore = cmp_blk(b, target);
201         }
202         if (target->lru.next  == list) /* new last */
203                 cmpafter = -3;
204         else
205                 cmpafter = cmp_blk(target, list_entry(target->lru.next,
206                                                       struct block,
207                                                       lru));
208         if (cmpbefore == -1) {
209                 /* adjacent with previous.  Possibly move skippoint */
210                 if (pos.next[0]->b == b) {
211                         pos.next[0]->b = target;
212                         return 0;
213                 }
214                 rv = 0;
215         } else if (cmpafter == -1)
216                 /* adjacent with next. No need to add a skippoint */
217                 return 0;
218         else if (cmpbefore == -2 || cmpafter == -2)
219                 /* same inodes as next or previous */
220                 rv = 1;
221         else
222                 rv = 2;
223
224         if (rv > avail) {
225                 /* Doesn't fit */
226                 list_del_init(&target->lru);
227                 return -1;
228         }
229         /* Now consider adding a skippoint here.  We want 2 bits
230          * per level of random data
231          */
232         get_random_bytes(rand, sizeof(rand));
233         height = 0;
234         while (height < SKIP_MAX_HEIGHT &&
235                test_bit(height*2, rand) &&
236                test_bit(height*2+1, rand))
237                 height++;
238         if (height == 0)
239                 return rv;
240
241         newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
242         if (!newpoint)
243                 return rv;
244         newpoint->b = target;
245         for (level = 0; level < height-1; level++) {
246                 newpoint->next[level] = pos.next[level];
247                 pos.next[level] = newpoint;
248         }
249         return rv;
250 }
251
252 /*-----------------------------------------------------------------------
253  * A segment is divided up in a slightly complicated way into
254  * tables, rows and columns.  This allows us to align writes with
255  * stripes in a raid4 array or similar.
256  * So we have some support routines to help track our way around
257  * the segment
258  */
259
260 static int seg_remainder(struct fs *fs, struct segpos *seg)
261 {
262         /* return the number of blocks from the start of segpos to the
263          * end of the segment.
264          * i.e. remaining rows in this table, plus remaining tables in
265          * this segment
266          */
267         struct fs_dev *dv = &fs->devs[seg->dev];
268         int rows = dv->rows_per_table - seg->st_row;
269
270         BUG_ON(seg->dev < 0);
271         rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
272         return rows * dv->width;
273 }
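/*
 * Worked example with hypothetical geometry: width == 4, rows_per_table
 * == 16, tables_per_seg == 8.  A segpos standing at st_table == 6,
 * st_row == 10 has (16 - 10) + 16 * (8 - 6 - 1) == 22 rows left in the
 * segment, so seg_remainder() returns 22 * 4 == 88 blocks.
 */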
274
275 static void seg_step(struct fs *fs, struct segpos *seg)
276 {
277         /* reposition this segpos to be immediately after its current end
278          * and make the 'current' point be the start.
279          * Size will be empty
280          */
281         seg->st_table = seg->nxt_table;
282         seg->st_row = seg->nxt_row;
283         seg->table = seg->st_table;
284         seg->row = seg->st_row;
285         seg->col = 0;
286 }
287
288 u32 lafs_seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
289 {
290         /* move the 'nxt' table/row to be 'size' blocks beyond
291          * current start. size will be rounded up to a multiple
292          * of width.
293          */
294         struct fs_dev *dv = &fs->devs[seg->dev];
295         u32 rv;
296         BUG_ON(seg->dev < 0);
297         size = (size + dv->width - 1) / dv->width;
298         rv = size * dv->width;
299         size += seg->st_row;
300         seg->nxt_table = seg->st_table + size / dv->rows_per_table;
301         seg->nxt_row = size % dv->rows_per_table;
302         return rv;
303 }
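/*
 * Worked example with hypothetical numbers: width == 4, rows_per_table
 * == 16, st_table == 0, st_row == 14.  A request for size == 10 blocks
 * rounds up to 3 rows and returns 12; 3 + 14 == 17 rows from the top of
 * the table leaves nxt_table == 1 and nxt_row == 1.
 */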
304
305 void lafs_seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
306 {
307         /* set seg to start at the given virtual address, and be
308          * of size 0
309          * 'addr' should always be at the start of a row, though
310          * as it might be read off storage, we need to be
311          * a little bit careful.
312          */
313         u32 offset;
314         u32 row, col, table;
315         struct fs_dev *dv;
316         virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
317         dv = &fs->devs[seg->dev];
318         col = offset / dv->stride;
319         row = offset % dv->stride;
320         table = col / dv->width;
321         col = col % dv->width;
322         BUG_ON(col);
323
324         seg->st_table = table;
325         seg->st_row = row;
326
327         seg->table = seg->nxt_table = seg->st_table;
328         seg->row = seg->nxt_row = seg->st_row;
329         seg->col = 0;
330 }
331
332 static u64 seg_addr(struct fs *fs, struct segpos *seg)
333 {
334         /* Return the virtual address of the blocks pointed
335          * to by 'seg'.
336          */
337         struct fs_dev *dv = &fs->devs[seg->dev];
338         u64 addr;
339
340         if (seg->dev < 0)
341                 /* Setting 'next' address for last cluster in
342                  * a cleaner segment */
343                 return (u64)-1;
344         if (seg->table > seg->nxt_table ||
345             (seg->table == seg->nxt_table &&
346              seg->row > seg->nxt_row))
347                 /* We have gone beyond the end of the cluster,
348                  * must be bad data during roll-forward
349                  */
350                 return (u64)-1;
351         addr = seg->col * dv->stride;
352         addr += seg->row;
353         addr += seg->table * dv->rows_per_table;
354         addr += seg->num * dv->segment_stride;
355         addr += dv->start;
356         return addr;
357 }
358
359 u64 lafs_seg_next(struct fs *fs, struct segpos *seg)
360 {
361         /* step forward one block, returning the address of
362          * the block stepped over
363          * The write cluster can have three sections:
364          * - the tail end of one table
365          * - a number of complete tables
366          * - the head of one table
367          *
368          * For each table or partial table we want to walk a full
369          * column at a time, then go to the next column.
370          * So we step to the next row.  If it is beyond the range of
371          * the current table, go to 'start' of next column, or to
372          * next table.
373          */
374         struct fs_dev *dv = &fs->devs[seg->dev];
375         u64 addr = seg_addr(fs, seg);
376
377         if (!(addr+1))
378                 /* Beyond end of cluster */
379                 return addr;
380
381         /* now step forward in column or table or seg */
382         seg->row++;
383         if (seg->row >= dv->rows_per_table ||
384             (seg->table == seg->nxt_table
385              && seg->row >= seg->nxt_row)) {
386                 seg->col++;
387                 if (seg->col >= dv->width) {
388                         seg->col = 0;
389                         seg->table++;
390                 }
391                 if (seg->table == seg->st_table)
392                         seg->row = seg->st_row;
393                 else
394                         seg->row = 0;
395         }
396         return addr;
397 }
398
399 static void cluster_reset(struct fs *fs, struct wc *wc)
400 {
401         if (wc->seg.dev < 0) {
402                 wc->remaining = 0;
403                 wc->chead_size = 0;
404                 return;
405         }
406         wc->remaining = seg_remainder(fs, &wc->seg);
407         wc->chead_blocks = 1;
408         wc->remaining--;
409         wc->cluster_space = fs->blocksize - sizeof(struct cluster_head);
410         wc->chead = page_address(wc->page[wc->pending_next]);
411         wc->chead_size = sizeof(struct cluster_head);
412 }
413
414 static void close_segment(struct fs *fs, int cnum)
415 {
416         /* Release old segment */
417         /* FIXME I need lafs_seg_{de,}ref for every snapshot,
418          * don't I ???
419          */
420         struct wc *wc = fs->wc + cnum;
421         if (wc->seg.dev >= 0) {
422                 if (cnum) {
423                         spin_lock(&fs->lock);
424                         fs->clean_reserved -= seg_remainder(fs, &wc->seg);
425                         spin_unlock(&fs->lock);
426                 }
427                 lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
428                 wc->seg.dev = -1;
429         }
430 }
431
432 void lafs_close_all_segments(struct fs *fs)
433 {
434         int cnum;
435         for (cnum = 0; cnum < WC_NUM; cnum++)
436                 close_segment(fs, cnum);
437 }
438
439 static int new_segment(struct fs *fs, int cnum)
440 {
441         /* new_segment can fail if cnum > 0 and there is no
442          * clean_reserved
443          */
444         struct wc *wc = fs->wc + cnum;
445         unsigned int dev;
446         u32 seg;
447
448         close_segment(fs, cnum);
449
450         if (cnum && fs->clean_reserved < fs->max_segment) {
451                 /* we have reached the end of the last cleaner
452                  * segment.  The remainder of clean_reserved
453                  * is of no value (if there even is any)
454                  */
455                 if (fs->clean_reserved) {
456                         spin_lock(&fs->alloc_lock);
457                         fs->free_blocks += fs->clean_reserved;
458                         fs->clean_reserved = 0;
459                         spin_unlock(&fs->alloc_lock);
460                 }
461                 return -ENOSPC;
462         }
463         /* This gets a reference on the 'segsum' */
464         lafs_free_get(fs, &dev, &seg, 0);
465         lafs_seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
466         BUG_ON(wc->seg.dev != dev);
467         BUG_ON(wc->seg.num != seg);
468
469         if (cnum == 0)
470                 /* wc mutex protects us here */
471                 fs->newsegments++;
472
473         return 0;
474 }
475
476 /*-------------------------------------------------------------------------
477  * lafs_cluster_allocate
478  * This adds a block to the writecluster list and possibly triggers a
479  * flush.
480  * The flush will assign a physical address to each block and schedule
481  * the actual write out.
482  *
483  * There can be multiple active write clusters, e.g. one for new data,
484  * one for cleaned data, one for defrag writes.
485  *
486  * It is here that the in-core inode is copied to the data block, if
487  * appropriate, and here that small files get copied into the inode
488  * thus avoiding writing a separate block for the file content.
489  *
490  * Roll-forward sees any data block as automatically extending the
491  * size of the file.  This requires that we write blocks in block-order
492  * as much as possible (which we would anyway) and that we inform
493  * roll-forward when a file ends mid-block.
494  *
495  * A block should be iolocked when being passed to lafs_cluster_allocate.
496  * When the IO is complete, it will be unlocked.
497  */
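/*
 * Minimal usage sketch (hypothetical caller, not taken from this file):
 *
 *	lafs_iolock_block(b);
 *	err = lafs_cluster_allocate(b, 0);	// cnum 0: ordinary data
 *
 * The cleaner and defrag paths pass a non-zero cnum instead so their
 * blocks go to their own write cluster.  The block is unlocked again by
 * the cluster code once the IO completes; -ENOSPC (only possible for a
 * cleaner cluster) means the block will have to go out with a checkpoint
 * instead.
 */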
498 static int flush_data_to_inode(struct block *b)
499 {
500         /* This is the first and only datablock in a file
501          * and it will fit in the inode. So put it there.
502          */
503
504         char *ibuf, *dbuf;
505         loff_t size = i_size_read(b->inode);
506         struct lafs_inode *lai = LAFSI(b->inode);
507         struct datablock *db;
508         struct fs *fs = fs_from_inode(b->inode);
509
510         lafs_iolock_block(&lai->iblock->b);
511         dprintk("FLUSHING single block into inode - s=%d %s\n",
512                 (int)size, strblk(b));
513         if (lai->iblock->depth > 1)
514                 /* Too much indexing still - truncate must be happening */
515                 goto give_up;
516         if (lai->iblock->depth == 1) {
517                 /* If there is nothing in this block we can still use it */
518                 if (lai->iblock->children.next != &b->siblings ||
519                     b->siblings.next != &lai->iblock->children ||
520                     lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
521                         goto give_up;
522         }
523         /* Need checkpoint lock to pin the dblock */
524         lafs_checkpoint_lock(fs);
525         if (test_and_clear_bit(B_Dirty, &b->flags)) {
526                 int credits = 1;
527                 /* Check size again, just in case it changed */
528                 size = i_size_read(b->inode);
529                 if (size + lai->metadata_size > fs->blocksize) {
530                         /* Lost a race - give up, restoring the Dirty bit */
531                         if (test_and_set_bit(B_Dirty, &b->flags))
532                                 if (test_and_set_bit(B_Credit, &b->flags))
533                                         lafs_space_return(fs, 1);
534                         lafs_checkpoint_unlock(fs);
535                         goto give_up;
536                 }
537                 if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
538                         credits--;
539                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
540                         credits++;
541                 if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
542                         credits--;
543                 lafs_space_return(fs, credits);
544                 LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
545                 set_bit(B_PinPending, &lai->dblock->b.flags);
546                 if (test_bit(B_Pinned, &b->flags))
547                         lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
548                 else
549                         lafs_pin_block(&lai->dblock->b);
550                 lafs_dirty_dblock(lai->dblock);
551         } else if (test_and_clear_bit(B_Realloc, &b->flags)) {
552                 int credits = 1;
553                 LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
554                          &lai->iblock->b);
555                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
556                         credits++;
557                 if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
558                         credits--;
559                 if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
560                         credits--;
561                 LAFS_BUG(credits < 0, b);
562                 lafs_space_return(fs, credits);
563         } else {
564                 printk("Wasn't dirty or realloc: %s\n", strblk(b));
565                 LAFS_BUG(1, b);
566         }
567         lafs_checkpoint_unlock(fs);
568         lai->depth = 0;
569         lai->iblock->depth = 0;
570         clear_bit(B_Valid, &lai->iblock->b.flags);
571
572         /* safe to reference ->dblock as b is a dirty child */
573         db = getdref(lai->dblock, MKREF(flush2db));
574         ibuf = map_dblock(db);
575         dbuf = map_dblock_2(dblk(b));
576
577         if (test_and_clear_bit(B_UnincCredit, &b->flags))
578                 lafs_space_return(fs, 1);
579
580         /* It is an awkward time to call lafs_inode_fillblock,
581          * so do this one little change manually
582          */
583         ((struct la_inode *)ibuf)->depth = 0;
584         memcpy(ibuf + lai->metadata_size,
585                dbuf, size);
586         memset(ibuf + lai->metadata_size + size,
587                0,
588                fs->blocksize - lai->metadata_size - size);
589         unmap_dblock_2(db, ibuf);
590         unmap_dblock(dblk(b), dbuf);
591         lafs_iounlock_block(&lai->iblock->b);
592
593         putdref(db, MKREF(flush2db));
594         lafs_refile(b, 0);
595         return 1;
596
597 give_up:
598         lafs_iounlock_block(&lai->iblock->b);
599         return 0;
600 }
601
602 int lafs_cluster_allocate(struct block *b, int cnum)
603 {
604         struct fs *fs = fs_from_inode(b->inode);
605         struct wc *wc = &fs->wc[cnum];
606         struct lafs_inode *lai;
607         loff_t size;
608         int used;
609         int err = 0;
610
611         LAFS_BUG(test_bit(B_Index, &b->flags) &&
612                  iblk(b)->uninc_table.pending_cnt > 0, b);
613
614         LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
615         lai = LAFSI(b->inode);
616
617         LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
618         LAFS_BUG(!fs->rolled, b);
619         LAFS_BUG(lai->flags & File_nonlogged &&
620                  !test_bit(B_Index, &b->flags), b);
621
622         /* We cannot change the physaddr of an uniblock in a
623          * different phase to the parent, else the parent will
624          * get the wrong address.
625          */
626         LAFS_BUG(test_bit(B_Index, &b->flags) &&
627                  test_bit(B_Uninc, &b->flags) &&
628                  !!test_bit(B_Phase1, &b->flags) !=
629                  !!test_bit(B_Phase1, &b->parent->b.flags), b);
630
631         size = i_size_read(b->inode);
632         if (!test_bit(B_Index, &b->flags) &&          /* it is a datablock */
633             b->fileaddr == 0 &&
634             b->parent == lai->iblock &&               /* No indexing */
635             lai->type >= TypeBase &&                  /* 'size' is meaningful */
636             size + lai->metadata_size <= fs->blocksize) {
637                 int success = flush_data_to_inode(b);
638                 if (success) {
639                         lafs_summary_update(fs, b->inode, b->physaddr, 0,
640                                             0, !!test_bit(B_Phase1, &b->flags),
641                                             test_bit(B_SegRef, &b->flags));
642                         b->physaddr = 0;
643                         lafs_iounlock_block(b);
644                         return 0;
645                 }
646                 /* There are conflicting blocks in the index tree */
647                 if (test_and_clear_bit(B_Realloc, &b->flags))
648                         lafs_space_return(fs, 1);
649                 if (!test_bit(B_Dirty, &b->flags) ||
650                     !test_bit(B_Pinned, &b->flags)) {
651                         /* It is safe to just leave this for later */
652                         lafs_iounlock_block(b);
653                         return 0;
654                 }
655                 /* We really need to write this, so use a full block. */
656                 printk("WARNING %s\n", strblk(b));
657                 WARN_ON(1);
658         }
659
660         /* The symbiosis between the datablock and the indexblock for an
661          * inode needs to be dealt with here.
662          * The data block will not normally be written until the InoIdx
663          * block is unpinned or phase-flipped.  However if it is, as the
664          * result of some 'sync' style operation, then we write it even
665          * though the index info might not be uptodate.  The rest of the
666  * though the index info might not be uptodate.  The data
667          * will be written when the InoIdx block is ready.
668          *
669          * When the InoIdx block is ready, we don't want to write it as
670  * there is nowhere for it to be incorporated into.  Rather we
671          * move the pinning and credits across to the Data block
672          * which will subsequently be written.
673          */
674
675         if (test_bit(B_InoIdx, &b->flags)) {
676                 struct block *b2 = &lai->dblock->b;
677                 int credits = 0;
678
679                 LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
680                 if (!test_bit(B_Pinned, &b2->flags)) {
681                         /* index block pinned, data block not,
682                          * carry the pinning across
683                          */
684                         if (b2->parent == NULL &&
685                             b->parent)
686                                 b2->parent =
687                                         getiref(b->parent, MKREF(child));
688                         LAFS_BUG(b->parent != b2->parent, b);
689                         set_bit(B_PinPending, &b2->flags);  // FIXME is this right?
690                         set_phase(b2, test_bit(B_Phase1,
691                                                &b->flags));
692                         lafs_refile(b2, 0);
693                 }
694
695                 if (cnum) {
696                         if (test_and_clear_bit(B_Realloc, &b->flags))
697                                 credits++;
698                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
699                 } else {
700                         if (test_and_clear_bit(B_Dirty, &b->flags))
701                                 credits++;
702                         if (test_and_clear_bit(B_Realloc, &b->flags))
703                                 credits++;
704                 }
705                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
706                         credits++;
707
708                 /* We always erase the InoIdx block before the
709                  * inode data block, so this cannot happen
710                  */
711                 LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);
712
713                 if (cnum == 0) {
714                         if (!test_and_set_bit(B_UnincCredit, &b2->flags))
715                                 if (!test_and_clear_bit(B_ICredit, &b2->flags))
716                                         credits--;
717
718                         if (!test_bit(B_Dirty, &b2->flags))
719                                 /* need some credits... */
720                                 if (!test_and_set_bit(B_Credit, &b2->flags))
721                                         credits--;
722                         lafs_dirty_dblock(dblk(b2));
723                 } else {
724                         if (!test_and_set_bit(B_UnincCredit, &b2->flags))
725                                 credits--;
726
727                         if (!test_and_set_bit(B_Realloc, &b2->flags))
728                                 credits--;
729                 }
730                 LAFS_BUG(credits < 0, b2);
731                 lafs_space_return(fs, credits);
732                 /* make sure 'dirty' status is registered */
733                 lafs_refile(b2, 0);
734                 lafs_iounlock_block(b);
735                 return 0;
736         }
737
738         if (!test_bit(B_Dirty, &b->flags) &&
739             !test_bit(B_Realloc, &b->flags)) {
740                 /* block just got truncated, so don't write it. */
741                 lafs_iounlock_block(b);
742                 return 0;
743         }
744
745         if (test_bit(B_EmptyIndex, &b->flags)) {
746                 int credits = 0;
747                 if (test_and_set_bit(B_Writeback, &b->flags))
748                         LAFS_BUG(1, b);
749                 lafs_iounlock_block(b);
750                 lafs_allocated_block(fs, b, 0);
751                 if (cnum) {
752                         if (test_and_clear_bit(B_Realloc, &b->flags))
753                                 credits++;
754                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
755                 } else {
756                         if (test_and_clear_bit(B_Dirty, &b->flags))
757                                 credits++;
758                         if (test_and_clear_bit(B_Realloc, &b->flags))
759                                 credits++;
760                 }
761                 lafs_writeback_done(b);
762                 lafs_space_return(fs, credits);
763                 return 0;
764         }
765
766         if (!test_bit(B_Index, &b->flags)) {
767                 struct inode *myi = rcu_my_inode(dblk(b));
768                 if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
769                         lafs_inode_fillblock(myi);
770                 rcu_iput(myi);
771         }
772
773         mutex_lock(&wc->lock);
774
775         /* The following check must be inside the mutex_lock
776          * to ensure exclusion between checkpoint and writepage.
777          * checkpoint must never see the block not on the
778          * leaf lru unless it is already in the cluster and so can
779          * be waited for.
780          */
781         if (!list_empty_careful(&b->lru)) {
782                 /* Someone is flushing this block before
783                  * checkpoint got to it - probably writepage.
784                  * Must remove it from the leaf lru so it can go
785                  * on the cluster list.
786                  */
787                 LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
788                 spin_lock(&fs->lock);
789                 if (!list_empty(&b->lru))
790                         list_del_init(&b->lru);
791                 spin_unlock(&fs->lock);
792         }
793
794         if (test_and_set_bit(B_Writeback, &b->flags))
795                 LAFS_BUG(1, b);
796         lafs_iounlock_block(b);
797
798         getref(b, MKREF(cluster)); /* we soon own a reference in the list */
799
800         for (;;) {
801                 int avail;
802                  * see how much room is in the cluster head.
803                  * The interesting values are:
804                  * none, one descriptor, one group_head and one descriptor
805                  * These are assigned values 0, 1, 2.
806                  */
807                 if (wc->cluster_space >= (sizeof(struct group_head) +
808                                           sizeof(struct descriptor)) ||
809                     (wc->remaining > 2 &&
810                      wc->chead_blocks < MAX_CHEAD_BLOCKS &&
811                      (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
812                         )
813                         avail = 2;
814                 else if (wc->cluster_space >= sizeof(struct descriptor))
815                         avail = 1;
816                 else
817                         avail = 0;
818                 if (wc->seg.dev < 0 || wc->remaining < 1)
819                         used = -1;
820                 else
821                         used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);
822
823                 if (used >= 0)
824                         break;
825                 else if (wc->slhead.b)
826                         cluster_flush(fs, cnum);
827                 else {
828                         err = new_segment(fs, cnum);
829                         if (err) {
830                                 /* No cleaner segments - this will have to go
831                                  * out with a checkpoint.
832                                  * Block is still 'dirty' - just clear
833                                  * writeback flag.
834                                  */
835                                 lafs_writeback_done(b);
836                                 putref(b, MKREF(cluster));
837                                 mutex_unlock(&wc->lock);
838                                 return -ENOSPC;
839                         }
840                         cluster_reset(fs, wc);
841                 }
842         }
843
844         if (used > 0)
845                 wc->cluster_space -= sizeof(struct descriptor);
846         if (used > 1)
847                 wc->cluster_space -= sizeof(struct group_head);
848         if (wc->cluster_space < 0) {
849                 /* need a new page */
850                 wc->chead_blocks++;
851                 wc->remaining--;
852                 wc->cluster_space += fs->blocksize;
853         }
854         wc->remaining--;
855         BUG_ON(wc->remaining < 0);
856         if (wc->remaining == 0)
857                 cluster_flush(fs, cnum);
858         mutex_unlock(&wc->lock);
859         return 0;
860 }
861
862 /*-------------------------------------------------------------------------
863  * Cluster head building.
864  * We build the cluster head bit by bit as we find blocks
865  * in the list.  These routines help.
866  */
867
868 static void cluster_addhead(struct wc *wc, struct inode *ino,
869                             struct group_head **headstart)
870 {
871         struct group_head *gh = (struct group_head *)((char *)wc->chead +
872                                                       wc->chead_size);
873         u16 tnf;
874         u64 tstamp;
875         dprintk("CLUSTER addhead %d\n", wc->chead_size);
876         *headstart = gh;
877
878         gh->inum = cpu_to_le32(ino->i_ino);
879         gh->fsnum = cpu_to_le32(LAFSI(ino)->filesys->i_ino);
880         if (LAFSI(ino)->type >= TypeBase) {
881                 tstamp = encode_time(&ino->i_mtime);
882                 if (tstamp != encode_time(&ino->i_ctime))
883                         tstamp = 0;
884         } else
885                 tstamp = 0;
886         gh->timestamp = cpu_to_le64(tstamp);
887
888         tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff))
889                 & 0x7fff;
890         if (wc->cnum)
891                 tnf |= 0x8000;
892         gh->truncatenum_and_flag = cpu_to_le16(tnf);
893         wc->chead_size += sizeof(struct group_head);
894 }
895
896 static void cluster_closehead(struct wc *wc,
897                               struct group_head *headstart)
898 {
899         int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);
900
901         dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
902         headstart->group_size_words = size / 4;
903 }
904
905 static void cluster_addmini(struct wc *wc, u32 addr, int offset,
906                             int size, const char *data,
907                             int size2, const char *data2)
908 {
909         /* if size2 !=0, then only
910          * (size-size2) is at 'data' and the rest is at 'data2'
911          */
912         struct miniblock *mb = ((struct miniblock *)
913                                 ((char *)wc->chead + wc->chead_size));
914
915         dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
916
917         mb->block_num = cpu_to_le32(addr);
918         mb->block_offset = cpu_to_le16(offset);
919         mb->length = cpu_to_le16(size + DescMiniOffset);
920         wc->chead_size += sizeof(struct miniblock);
921         memcpy(mb->data, data, size-size2);
922         if (size2)
923                 memcpy(mb->data + size-size2, data2, size2);
924         memset(mb->data+size, 0, (-size)&3);
925         wc->chead_size += ROUND_UP(size);
926 }
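/*
 * For example (hypothetical sizes, and assuming ROUND_UP() rounds to a
 * multiple of four, as the (-size) & 3 padding suggests): a 5-byte update
 * with size2 == 0 copies 5 bytes into mb->data, zeroes (-5) & 3 == 3
 * padding bytes, and advances chead_size by sizeof(struct miniblock)
 * plus 8.
 */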
927
928 static void cluster_adddesc(struct wc *wc, struct block *blk,
929                             struct descriptor **desc_start)
930 {
931         struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
932                                                       wc->chead_size);
933         *desc_start = dh;
934         dprintk("CLUSTER add_desc %d\n", wc->chead_size);
935         dh->block_num = cpu_to_le32(blk->fileaddr);
936         dh->block_cnt = cpu_to_le32(0);
937         dh->block_bytes = 0;
938         if (test_bit(B_Index, &blk->flags))
939                 dh->block_bytes = DescIndex;
940         wc->chead_size += sizeof(struct descriptor);
941 }
942
943 static void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
944                             struct block *b, int blkbits)
945 {
946         desc_start->block_cnt =
947                 cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
948         if (!test_bit(B_Index, &b->flags)) {
949                 if (LAFSI(b->inode)->type >= TypeBase) {
950                         u64 size = b->inode->i_size;
951                         if (size > ((loff_t)b->fileaddr << blkbits) &&
952                             size <= ((loff_t)(b->fileaddr + 1) << blkbits))
953                                 desc_start->block_bytes =
954                                         cpu_to_le32(size & ((1<<blkbits)-1));
955                         else
956                                 desc_start->block_bytes =
957                                         cpu_to_le32(1 << blkbits);
958                 } else
959                         desc_start->block_bytes = cpu_to_le32(1<<blkbits);
960         }
961 }
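/*
 * Worked example with hypothetical sizes: 4K blocks (blkbits == 12) and
 * i_size == 10000.  The descriptor entry covering fileaddr 2 records
 * block_bytes == 10000 & 4095 == 1808, while earlier blocks record a
 * full 4096 - which is how roll-forward learns that the file ends
 * mid-block.
 */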
962
963 /*------------------------------------------------------------------------
964  * Miniblocks/updates
965  * Updates are added to the cluster head immediately, causing a flush
966  * if there is insufficient room.
967  * This means that miniblocks always occur before block descriptors.
968  * miniblocks are only ever written to cluster-series 0.
969  */
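/*
 * Sketch of the update life-cycle as seen by a hypothetical caller
 * ('db', 'offset' and 'len' are the caller's own values):
 *
 *	struct update_handle uh;
 *	unsigned long long seq;
 *
 *	lafs_cluster_update_prepare(&uh, fs, len);
 *	if (lafs_cluster_update_pin(&uh) < 0)
 *		return -EAGAIN;		// no space for the update yet
 *	seq = lafs_cluster_update_commit(&uh, db, offset, len);
 *	// ...or lafs_cluster_update_abort(&uh) if it is abandoned.
 */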
970
971 int
972 lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
973                             int len)
974 {
975         /* make sure there is room for a 'len' update in the upcoming
976          * cluster.  Do nothing here.
977          */
978         uh->fs = fs;
979         uh->reserved = 0;
980         return 0;
981 }
982
983 int
984 lafs_cluster_update_pin(struct update_handle *uh)
985 {
986         /* last chance to bail out */
987         if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
988                 uh->reserved = 1;
989                 uh->fs->cluster_updates++;
990                 return 0;
991         }
992         return -EAGAIN;
993 }
994
995 static unsigned long long
996 lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
997                                 struct inode *ino, u32 addr,
998                                 int offset, int len, struct datablock *b,
999                                 const char *str1,
1000                                 int len2, const char *str2)
1001 {
1002         /* The data may be given either as datablock 'b' or as buffers str1/str2 - alternate layouts */
1003         /* flush 'len' bytes of data, at 'offset' of block 'addr' in 'ino'
1004          * in the next write cluster
1005          */
1006         struct wc *wc = &fs->wc[0];
1007         struct group_head *head_start;
1008         unsigned long long seq;
1009         char *mapping = NULL;
1010
1011         BUG_ON(!fs->rolled);
1012
1013         mutex_lock(&wc->lock);
1014         while (wc->cluster_space < (sizeof(struct group_head)+
1015                                     sizeof(struct descriptor) +
1016                                     ROUND_UP(len))) {
1017                 if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
1018                     (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
1019                         wc->chead_blocks++;
1020                         wc->remaining--;
1021                         wc->cluster_space += fs->blocksize;
1022                 } else
1023                         cluster_flush(fs, 0);
1024         }
1025
1026         fs->cluster_updates -= uh->reserved;
1027         lafs_space_return(fs, uh->reserved);
1028         uh->reserved = 0;
1029
1030         cluster_addhead(wc, ino, &head_start);
1031         if (b) {
1032                 mapping = map_dblock(b);
1033                 str1 = mapping + offset;
1034         }
1035         cluster_addmini(wc, addr, offset, len, str1, len2, str2);
1036         if (b)
1037                 unmap_dblock(b, mapping);
1038         cluster_closehead(wc, head_start);
1039         wc->cluster_space -= (sizeof(struct group_head)+
1040                               sizeof(struct descriptor) +
1041                               ROUND_UP(len));
1042         seq = wc->cluster_seq;
1043         mutex_unlock(&wc->lock);
1044         return seq;
1045 }
1046
1047 unsigned long long
1048 lafs_cluster_update_commit(struct update_handle *uh,
1049                            struct datablock *b,
1050                            int offset, int len)
1051 {
1052         /* OK here is the data to put in the next cluster. */
1053         struct fs *fs = fs_from_inode(b->b.inode);
1054         unsigned long long seq;
1055
1056         seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
1057                                               offset, len, b, NULL, 0, NULL);
1058
1059         return seq;
1060 }
1061
1062 unsigned long long
1063 lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
1064                                struct inode *ino, u32 addr,
1065                                int offset, int len, const char *str1,
1066                                int len2, const char *str2)
1067 {
1068         return lafs_cluster_update_commit_both(uh, fs,
1069                                                ino, addr,
1070                                                offset, len,
1071                                                NULL,
1072                                                str1, len2, str2);
1073 }
1074
1075 void
1076 lafs_cluster_update_abort(struct update_handle *uh)
1077 {
1078         /* Didn't want that cluster_update after all */
1079         uh->fs->cluster_updates -= uh->reserved;
1080         lafs_space_return(uh->fs, uh->reserved);
1081 }
1082
1083 int lafs_calc_cluster_csum(struct cluster_head *head)
1084 {
1085         unsigned int oldcsum = head->checksum;
1086         unsigned long csum;
1087
1088         head->checksum = 0;
1089         csum = crc32_le(0, (unsigned char *)head, le16_to_cpu(head->Hlength));
1090         head->checksum = oldcsum;
1091         return csum;
1092 }
1093
1094 /*------------------------------------------------------------------------
1095  * Cluster flushing.
1096  * Once we have a suitable number of blocks and updates in a writecluster
1097  * we need to write them out.
1098  * We first iterate over the block list building the clusterhead.
1099  * Then we finalise and write the clusterhead and iterate again
1100  * writing out each block.
1101  * We keep a biassed blockcount in the wc, and decrement it on write-completion.
1102  * Once the blockcount hits the bias the cluster is written.  Then when
1103  * the next (or next-plus-one) cluster head is written we remove the bias
1104  * and the data is deemed to be safe.
1105  *
1106  * Update blocks will already have been included in the write cluster.
1107  *
1108  * We reinitialise the write cluster once we are finished.
1109  */
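/*
 * Illustrative timeline of the bias described above: the cluster being
 * flushed has its pending_cnt biased to 1, completions of its block
 * writes bring the count back down towards that bias, and cluster_flush()
 * waits for an earlier cluster to fall back to exactly 1 before writing
 * the head that acts as its commit block.  Only once that later head has
 * been submitted is the final 1 removed, letting the count reach 0 so
 * that cluster_done() can acknowledge the blocks.
 */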
1110
1111 static void cluster_done(struct fs *fs, struct wc *wc)
1112 {
1113         /* This is called when one of the pending_cnts hits 0, indicating
1114          * that a write-cluster has been written and committed to storage.
1115          * We find all the pending blocks on a completed cluster and
1116          * acknowledge them.
1117          */
1118         int i;
1119         struct block *b, *tmp;
1120         struct list_head tofree;
1121
1122         INIT_LIST_HEAD(&tofree);
1123
1124         spin_lock(&fs->lock);
1125         for (i = 0; i < 4 ; i++)
1126                 if (atomic_read(&wc->pending_cnt[i]) == 0)
1127                         list_splice_init(&wc->pending_blocks[i], &tofree);
1128         spin_unlock(&fs->lock);
1129
1130         list_for_each_entry_safe(b, tmp, &tofree, lru) {
1131                 list_del_init(&b->lru);
1132                 lafs_writeback_done(b);
1133                 putref(b, MKREF(cluster));
1134         }
1135 }
1136
1137 void lafs_clusters_done(struct fs *fs)
1138 {
1139         int i;
1140         for (i = 0 ; i < WC_NUM; i++)
1141                 cluster_done(fs, &fs->wc[i]);
1142 }
1143
1144 void lafs_done_work(struct work_struct *ws)
1145 {
1146         struct fs *fs = container_of(ws, struct fs, done_work);
1147         lafs_clusters_done(fs);
1148 }
1149
1150 static void cluster_flush(struct fs *fs, int cnum)
1151 {
1152         struct wc *wc = &fs->wc[cnum];
1153         int i;
1154         struct block *b;
1155         struct inode *current_inode = NULL;
1156         u64 current_block = NoBlock;
1157         u64 head_addr[MAX_CHEAD_BLOCKS];
1158         struct descriptor *desc_start = NULL;
1159         struct group_head *head_start = NULL;
1160         struct segpos segend;
1161         int which;
1162         int wake;
1163         int vtype;
1164         int cluster_size;
1165
1166         if (!test_bit(CheckpointOpen, &fs->fsstate)) {
1167                 /* First cluster since a checkpoint, make sure
1168                  * we do another checkpoint within 30 seconds
1169                  */
1170                 fs->next_checkpoint = jiffies + 30 * HZ;
1171                 set_bit(CheckpointOpen, &fs->fsstate);
1172                 lafs_wake_thread(fs);
1173         }
1174         /* set the writecluster size as this will guide layout in
1175          * some cases
1176          */
1177
1178         if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
1179                 set_bit(SecondFlushNeeded, &fs->fsstate);
1180         else
1181                 clear_bit(SecondFlushNeeded, &fs->fsstate);
1182
1183         cluster_size = lafs_seg_setsize(fs, &wc->seg,
1184                                         seg_remainder(fs, &wc->seg)
1185                                         - wc->remaining);
1186
1187         /* find, and step over, address header block(s) */
1188         for (i = 0; i < wc->chead_blocks ; i++)
1189                 head_addr[i] = lafs_seg_next(fs, &wc->seg);
1190
1191         list_for_each_entry(b, &wc->clhead, lru) {
1192                 u64 addr;
1193                 if (b->inode != current_inode) {
1194                         /* need to create a new group_head */
1195                         desc_start = NULL;
1196                         if (head_start)
1197                                 cluster_closehead(wc, head_start);
1198                         cluster_addhead(wc, b->inode, &head_start);
1199                         current_inode = b->inode;
1200                         current_block = NoBlock;
1201                 }
1202                 if (desc_start == NULL || b->fileaddr != current_block+1 ||
1203                     test_bit(B_Index, &b->flags)) {
1204                         cluster_adddesc(wc, b, &desc_start);
1205                         current_block = b->fileaddr;
1206                 } else
1207                         current_block++;
1208                 cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
1209                 addr = lafs_seg_next(fs, &wc->seg);
1210                 BUG_ON(!addr || !(addr+1));
1211                 if (cnum && test_bit(B_Dirty, &b->flags))
1212                         /* We are cleaning but this block is now dirty.
1213                          * Don't waste the UnincCredit on recording the
1214                          * clean location as it will be written as
1215                          * dirty soon.  Just pin it to the current phase
1216                          * to ensure that it really does get written.
1217                          */
1218                         lafs_pin_block(b);
1219                 else
1220                         lafs_allocated_block(fs, b, addr);
1221         }
1222
1223         if (head_start)
1224                 cluster_closehead(wc, head_start);
1225         segend = wc->seg; /* We may write zeros from here */
1226         seg_step(fs, &wc->seg);
1227         wc->remaining = seg_remainder(fs, &wc->seg);
1228         /* Need to make sure our ->next_addr gets set properly
1229          * for non-cleaning segments
1230          */
1231         if (wc->remaining < 2) {
1232                 if (cnum == 0)
1233                         new_segment(fs, cnum);
1234                 else
1235                         close_segment(fs, cnum);
1236         }
1237
1238         /* Fill in the cluster header */
1239         strncpy(wc->chead->idtag, "LaFSHead", 8);
1240         wc->chead->flags = cpu_to_le32(0);
1241
1242         /* checkpoints only happen in cnum==0 */
1243         if (cnum == 0 && fs->checkpointing) {
1244                 spin_lock(&fs->lock);
1245                 wc->chead->flags = cpu_to_le32(fs->checkpointing);
1246                 if (fs->checkpointing & CH_CheckpointEnd) {
1247                         fs->checkpointing = 0;
1248                         clear_bit(CheckpointOpen, &fs->fsstate);
1249                 } else if (fs->checkpointing & CH_CheckpointStart) {
1250                         fs->checkpointcluster = head_addr[0];
1251                         fs->checkpointing &= ~CH_CheckpointStart;
1252                 }
1253                 spin_unlock(&fs->lock);
1254                 if (!fs->checkpointing)
1255                         wake_up(&fs->phase_wait);
1256         }
1257
1258         memcpy(wc->chead->uuid, fs->state->uuid, 16);
1259         wc->chead->seq = cpu_to_le64(wc->cluster_seq);
1260         wc->cluster_seq++;
1261         wc->chead->Hlength = cpu_to_le16(wc->chead_size);
1262         wc->chead->Clength = cpu_to_le16(cluster_size);
1263         spin_lock(&fs->lock);
1264         if (cnum)
1265                 fs->clean_reserved -= cluster_size;
1266         else
1267                 fs->free_blocks -= cluster_size;
1268         spin_unlock(&fs->lock);
1269         /* FIXME if this is just a header, no data blocks,
1270          * then use VerifyNull.  Alternately if there is
1271          * no-one waiting for a sync to complete (how do we
1272          * track that?) use VerifyNext2
1273          */
1274         vtype = VerifyNext;
1275         if (cnum)
1276                 vtype = VerifyNull;
1277         wc->pending_vfy_type[wc->pending_next] = vtype;
1278         wc->chead->verify_type = cpu_to_le16(vtype);
1279         memset(wc->chead->verify_data, 0, 16);
1280
1281         wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
1282         wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
1283         wc->prev_addr = head_addr[0];
1284         wc->chead->this_addr = cpu_to_le64(wc->prev_addr);
1285
1286         wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
1287
1288         /* We cannot write the header until any previous cluster
1289          * which uses the header as a commit block has been
1290          * fully written
1291          */
1292         which = (wc->pending_next+3)%4;
1293         dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1294                 atomic_read(&wc->pending_cnt[which]));
1295         if (wc->pending_vfy_type[which] == VerifyNext)
1296                 wait_event(wc->pending_wait,
1297                            atomic_read(&wc->pending_cnt[which]) == 1);
1298         which = (which+3) % 4;
1299         dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1300                 atomic_read(&wc->pending_cnt[which]));
1301         if (wc->pending_vfy_type[which] == VerifyNext2)
1302                 wait_event(wc->pending_wait,
1303                            atomic_read(&wc->pending_cnt[which]) == 1);
1304
1305         lafs_clusters_done(fs);
1306         dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
1307                 wc->pending_next, atomic_read(&wc->pending_cnt
1308                                               [wc->pending_next]));
1309         BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
1310         BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));
1311
1312         /* initialise pending count to 1.  This will be decremented
1313          * once all related blocks have been submitted for write.
1314          * This includes the blocks in the next 0, 1, or 2
1315          * write clusters.
1316          */
1317         atomic_inc(&wc->pending_cnt[wc->pending_next]);
1318
1319         /* Now write it all out.
1320          * Later we should possibly re-order the writes
1321          * for raid4 stripe-at-a-time
1322          */
1323         for (i = 0; i < wc->chead_blocks; i++)
1324                 lafs_write_head(fs,
1325                                 page_address(wc->page[wc->pending_next])
1326                                 + i * fs->blocksize,
1327                                 head_addr[i], wc);
1328
1329         while (!list_empty(&wc->clhead)) {
1330                 int credits = 0;
1331                 b = list_entry(wc->clhead.next, struct block, lru);
1332                 dprintk("flushing %s\n", strblk(b));
1333
1334                 if (test_and_clear_bit(B_Realloc, &b->flags))
1335                         credits++;
1336                 if (cnum == 0) {
1337                         if (test_and_clear_bit(B_Dirty, &b->flags))
1338                                 credits++;
1339                 }
1340
1341                 WARN_ON(credits == 0);
1342                 lafs_space_return(fs, credits);
1343                 spin_lock(&fs->lock);
1344                 list_del(&b->lru);
1345                 list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
1346                 spin_unlock(&fs->lock);
1347                 lafs_write_block(fs, b, wc);
1348                 lafs_refile(b, 0);
1349         }
1350         skip_discard(wc->slhead.next[0]);
1351         /* FIXME write out zeros to pad raid4 if needed */
1352
1353         /* Having submitted this request we need to remove the bias
1354          * from pending_cnt of this and previous clusters.
1355          */
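        /*
         * Three decrements follow, stepping backwards around the 4-entry
         * ring ((which+3)%4 is (which-1) mod 4 without relying on C's
         * '%' for negative operands):
         *   - this cluster's count, if it needs no later commit (VerifyNull);
         *   - the previous cluster's count, if it committed with VerifyNext;
         *   - the count two clusters back, if it committed with VerifyNext2.
         * Any count that reaches zero here means that cluster is finished,
         * so cluster_done() runs and waiters on pending_wait are woken.
         */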
1356         which = wc->pending_next;
1357         wake = 0;
1358         dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1359                 atomic_read(&wc->pending_cnt[which]));
1360         if (wc->pending_vfy_type[which] == VerifyNull)
1361                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1362                         wake = 1;
1363         which = (which+3) % 4;
1364         dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1365                 atomic_read(&wc->pending_cnt[which]));
1366         if (wc->pending_vfy_type[which] == VerifyNext)
1367                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1368                         wake = 1;
1369         which = (which+3) % 4;
1370         dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1371                 atomic_read(&wc->pending_cnt[which]));
1372         if (wc->pending_vfy_type[which] == VerifyNext2)
1373                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1374                         wake = 1;
1375         if (wake) {
1376                 cluster_done(fs, wc);
1377                 wake_up(&wc->pending_wait);
1378         }
1379         dprintk("D %d\n", wake);
1380
1381         lafs_write_flush(fs, wc);
1382
1383         wc->pending_next = (wc->pending_next+1) % 4;
1384         /* now re-initialise the cluster information */
1385         cluster_reset(fs, wc);
1386
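        /*
         * Before returning, wait for the slot that will be used next to
         * drain completely; with a ring of four slots this effectively
         * bounds how many write clusters can be in flight at once.
         */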
1387         wait_event(wc->pending_wait,
1388                    atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
1389         dprintk("cluster_flush end pending_next=%d cnt=%d\n",
1390                 wc->pending_next, atomic_read(&wc->pending_cnt
1391                                               [wc->pending_next]));
1392 }
1393
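/* Flush the given write cluster if there is anything that needs to go
 * out: queued blocks, a cluster head that has grown beyond the bare
 * header, or -- for cluster 0 -- a checkpoint end or an explicitly
 * requested flush.
 */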
1394 void lafs_cluster_flush(struct fs *fs, int cnum)
1395 {
1396         struct wc *wc = &fs->wc[cnum];
1397         dprintk("LAFS_cluster_flush %d\n", cnum);
1398         mutex_lock(&wc->lock);
1399         if (!list_empty(&wc->clhead)
1400             || wc->chead_size > sizeof(struct cluster_head)
1401             || (cnum == 0 &&
1402                 ((fs->checkpointing & CH_CheckpointEnd)
1403                  || test_bit(FlushNeeded, &fs->fsstate)
1404                  || test_bit(SecondFlushNeeded, &fs->fsstate)
1405                         )))
1406                 cluster_flush(fs, cnum);
1407         mutex_unlock(&wc->lock);
1408 }
1409
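/* Wait for every slot of every write cluster to quiesce.  Counts are
 * allowed to settle at 1 rather than 0: the remaining reference is
 * presumably the bias of a cluster that is still waiting for a later
 * header to commit it.
 */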
1410 void lafs_cluster_wait_all(struct fs *fs)
1411 {
1412         int i;
1413         for (i = 0; i < WC_NUM; i++) {
1414                 struct wc *wc = &fs->wc[i];
1415                 int j;
1416                 for (j = 0; j < 4; j++) {
1417                         wait_event(wc->pending_wait,
1418                                    atomic_read(&wc->pending_cnt[j]) <= 1);
1419                 }
1420         }
1421 }
1422
1423 /* The end_io function for a write to a cluster is one of
1424  * four for data blocks and four for headers, chosen by
1425  * which entry in the circular list the block belongs to.
1426  */
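/* In the completion path below, every finished bio drops a reference on
 * its own slot, and a finished header additionally decrements the
 * counts of the previous one or two slots when their verify type
 * (VerifyNext / VerifyNext2) says they were depending on a later
 * header.  Whenever a count reaches zero, done_work is scheduled,
 * presumably to retire the finished cluster.
 */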
1427 static void cluster_end_io(struct bio *bio, int err,
1428                    int which, int header)
1429 {
1430         /* FIXME how to handle errors? */
1431         struct wc *wc = bio->bi_private;
1432         struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
1433         int wake = 0;
1434         int done = 0;
1435
1436         dprintk("end_io err=%d which=%d header=%d\n",
1437                 err, which, header);
1438
1439         if (atomic_dec_and_test(&wc->pending_cnt[which]))
1440                 done++;
1441         if (atomic_read(&wc->pending_cnt[which]) == 1)
1442                 wake = 1;
1443         which = (which+3) % 4;
1444         if (header &&
1445             (wc->pending_vfy_type[which] == VerifyNext ||
1446              wc->pending_vfy_type[which] == VerifyNext2) &&
1447             atomic_dec_and_test(&wc->pending_cnt[which]))
1448                 done++;
1449         if (atomic_read(&wc->pending_cnt[which]) == 1)
1450                 wake = 1;
1451
1452         which = (which+3) % 4;
1453         if (header &&
1454             wc->pending_vfy_type[which] == VerifyNext2 &&
1455             atomic_dec_and_test(&wc->pending_cnt[which]))
1456                 done++;
1457         if (atomic_read(&wc->pending_cnt[which]) == 1)
1458                 wake = 1;
1459
1460         if (done)
1461                 schedule_work(&fs->done_work);
1462         if (done || wake) {
1463                 wake_up(&wc->pending_wait);
1464                 if (test_bit(FlushNeeded, &fs->fsstate) ||
1465                     test_bit(SecondFlushNeeded, &fs->fsstate))
1466                         lafs_wake_thread(fs);
1467         }
1468 }
1469
1470 static void cluster_endio_data_0(struct bio *bio, int err)
1471 {
1472         cluster_end_io(bio, err, 0, 0);
1473 }
1474
1475 static void cluster_endio_data_1(struct bio *bio, int err)
1476 {
1477         cluster_end_io(bio, err, 1, 0);
1478 }
1479
1480 static void cluster_endio_data_2(struct bio *bio, int err)
1481 {
1482         cluster_end_io(bio, err, 2, 0);
1483 }
1484
1485 static void cluster_endio_data_3(struct bio *bio, int err)
1486 {
1487         cluster_end_io(bio, err, 3, 0);
1488 }
1489
1490 static void cluster_endio_header_0(struct bio *bio, int err)
1491 {
1492         cluster_end_io(bio, err, 0, 1);
1493 }
1494
1495 static void cluster_endio_header_1(struct bio *bio, int err)
1496 {
1497         cluster_end_io(bio, err, 1, 1);
1498 }
1499
1500 static void cluster_endio_header_2(struct bio *bio, int err)
1501 {
1502         cluster_end_io(bio, err, 2, 1);
1503 }
1504
1505 static void cluster_endio_header_3(struct bio *bio, int err)
1506 {
1507         cluster_end_io(bio, err, 3, 1);
1508 }
1509
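/* The bio completion callbacks above take only the bio and an error
 * code, so the slot number and the header/data distinction have to be
 * baked into which function is installed as bi_end_io.  That is the
 * only reason the eight trivial wrappers exist; the chooser below maps
 * (which, header) to the matching one.
 */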
1510 bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
1511 {
1512         if (header)
1513                 if ((which&2) == 0)
1514                         if (which == 0)
1515                                 return cluster_endio_header_0;
1516                         else
1517                                 return cluster_endio_header_1;
1518                 else
1519                         if (which == 2)
1520                                 return cluster_endio_header_2;
1521                         else
1522                                 return cluster_endio_header_3;
1523         else
1524                 if ((which&2) == 0)
1525                         if (which == 0)
1526                                 return cluster_endio_data_0;
1527                         else
1528                                 return cluster_endio_data_1;
1529                 else
1530                         if (which == 2)
1531                                 return cluster_endio_data_2;
1532                         else
1533                                 return cluster_endio_data_3;
1534 }
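/* Hypothetical usage sketch (not taken from this file): a caller
 * building a bio for this write cluster might install the handler as
 *
 *      bio->bi_private = wc;
 *      bio->bi_end_io = lafs_cluster_endio_choose(wc->pending_next,
 *                                                 is_header);
 *
 * where 'is_header' stands for a caller-local flag, not a field that is
 * defined here.
 */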
1535
1536 int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
1537 {
1538         struct wc *wc = &fs->wc[cnum];
1539         int i;
1540
1541         wc->slhead.b = NULL;
1542         INIT_LIST_HEAD(&wc->clhead);
1543
1544         for (i = 0; i < 4 ; i++) {
1545                 wc->pending_vfy_type[i] = VerifyNull;
1546                 atomic_set(&wc->pending_cnt[i], 0);
1547                 wc->page[i] = alloc_page(GFP_KERNEL);
1548                 if (!wc->page[i])
1549                         break;
1550         }
1551         if (i < 4) {
1552                 while (i > 0) {
1553                         i--;
1554                         put_page(wc->page[i]);
1555                 }
1556                 return -ENOMEM;
1557         }
1558         wc->pending_next = 0;
1559         wc->cluster_seq = seq;
1560         wc->prev_addr = prev;
1561         wc->cnum = cnum;
1562
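        /*
         * A non-zero initial address is only expected for cluster 0
         * (hence the BUG_ON): the write cluster is positioned there via
         * lafs_seg_setpos() and cluster_reset(), and wc->remaining+1
         * blocks are credited back to fs->free_blocks -- presumably
         * because that space had not yet been counted as free.
         */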
1563         if (addr) {
1564                 lafs_seg_setpos(fs, &wc->seg, addr);
1565                 cluster_reset(fs, wc);
1566                 BUG_ON(cnum);
1567                 spin_lock(&fs->lock);
1568                 fs->free_blocks += wc->remaining+1;
1569                 spin_unlock(&fs->lock);
1570         }
1571
1572         return 0;
1573 }
1574
1575 void lafs_flush(struct datablock *b)
1576 {
1577         /* We need to write this block out and wait until
1578          * it has been written, so that we can update it
1579          * without risking corruption to the previous
1580          * snapshot.
1581          */
1582 }
1583
1584 int lafs_cluster_empty(struct fs *fs, int cnum)
1585 {
1586         return list_empty(&fs->wc[cnum].clhead);
1587 }
1588