1
2 /*
3  * write-cluster management routines for LaFS
4  * fs/lafs/cluster.c
5  * Copyright (C) 2006-2009
6  * NeilBrown <neilb@suse.de>
7  * Released under the GPL, version 2
8  */
9
10 /*
11  * Blocks to be written to a cluster need to be sorted so as to minimise
12  * fragmentation and to keep the cluster head small.
13  * So that we can track the amount of space needed in the cluster head,
14  * we really need to keep the list sorted rather than just sort it
15  * before writeout.
16  * We do this using a linked list of blocks and an insertion sort.
17  * To improve the speed of insertion we keep a modified skiplist structure
18  * over the list.
19  * This is 'modified' in that it explicitly understands sequential
20  * runs within the list and doesn't keep extra skip-points within a run.
21  *
22  * The skip-points require memory allocation, which cannot be
23  * guaranteed here in the writeout path.  To handle this we
24  *  - preallocate some entries
25  *  - accept a less-efficient list if memory isn't available
26  *  - flush early if things get really bad.
27  *
28  * Skip point placement and runs.
29  * When an entry is added to the list, we place a skip-point over it
30  * based on the value of a random number, with a 1-in-4 chance of placement.
31  * The height of the skip point is also random - we keep rolling the dice,
32  * increasing height each time that we get 1-in-4.
33  * If the new entry is placed immediately after a skip point, and is
34  * consecutive with that skip point, the skip point is moved forward
35  * so that it will always be at the end of a run.
36  * When a new entry finds itself at the start or end of a run, a skip
37  * point is not added.
38  */
39
40 /* A height of 8 with a fanout of 1-in-4 allows reasonable
41  * addressing for 2^16 entries, which is probably more than we need
42  */
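
/* Illustrative sketch (not part of LaFS): choosing a skip-point height
 * with the 1-in-4 scheme described above.  Two random bits are consumed
 * per level and the height keeps growing while both bits are set, capped
 * at 'max_height' (SKIP_MAX_HEIGHT in the real code, which draws its
 * bits from get_random_bytes() instead).  A result of 0 means "no skip
 * point".
 */
static inline int example_skip_height(unsigned long rnd, int max_height)
{
	int height = 0;

	while (height < max_height && (rnd & 3) == 3) {
		height++;
		rnd >>= 2;	/* consume two bits per level */
	}
	return height;
}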
43
44 #include        "lafs.h"
45 #include        <linux/random.h>
46 #include        <linux/slab.h>
47
48 static void cluster_flush(struct fs *fs, int cnum);
49
50 static void skip_discard(struct skippoint *sp)
51 {
52         while (sp) {
53                 struct skippoint *next = sp->next[0];
54                 kfree(sp);
55                 sp = next;
56         }
57 }
58
59 static int cmp_blk(struct block *a, struct block *b)
60 {
61         /* compare the filesys/inode/index/addr info
62          * of two blocks and return:
63          *  0 if equal
64          * +ve if a > b
65          * -1 if consecutive in order
66          * -2 if in same inode and in order
67          * <=-3 if different inodes and correct order
68          */
69
70         if (a->inode != b->inode) {
71                 struct inode *fsinoa = ino_from_sb(a->inode->i_sb);
72                 struct inode *fsinob = ino_from_sb(b->inode->i_sb);
73                 if (fsinoa->i_ino <
74                     fsinob->i_ino)
75                         return -3;
76                 if (fsinoa->i_ino >
77                     fsinob->i_ino)
78                         return 3;
79                 /* same filesystem */
80                 if (a->inode->i_ino <
81                     b->inode->i_ino)
82                         return -3;
83                 return 3;
84         }
85         /* Same inode.  If both are data blocks they might be
86          * consecutive
87          */
88         if (!test_bit(B_Index, &a->flags) &&
89             !test_bit(B_Index, &b->flags)) {
90                 if (a->fileaddr == b->fileaddr)
91                         return 0;
92                 if (a->fileaddr + 1 == b->fileaddr)
93                         return -1;
94                 if (a->fileaddr < b->fileaddr)
95                         return -2;
96                 return 2;
97         }
98         /* at least one is an index block. Sort them before data */
99         if (!test_bit(B_Index, &a->flags))
100                 /* a is data, b is index, b first */
101                 return 2;
102
103         if (!test_bit(B_Index, &b->flags))
104                 return -2;
105
106         /* both are index blocks, use fileaddr or depth */
107         if (a->fileaddr < b->fileaddr)
108                 return -2;
109         if (a->fileaddr > b->fileaddr)
110                 return 2;
111         if (iblk(a)->depth < iblk(b)->depth)
112                 return -2;
113         /* equality really shouldn't be possible */
114         return 2;
115 }
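
/* Illustrative decoder (sketch only, not used by LaFS): how a caller can
 * read the cmp_blk() convention above - the sign gives ordering, the
 * magnitude says how closely related the two blocks are.
 */
static inline int example_cmp_class(int cmp)
{
	if (cmp == 0)
		return 0;		/* same block */
	if (cmp == -1)
		return 1;		/* consecutive data blocks, in order */
	if (cmp == -2 || cmp == 2)
		return 2;		/* same inode, not consecutive */
	return 3;			/* different inodes, |cmp| >= 3 */
}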
116
117 static struct block *skip_find(struct skippoint *head,
118                                struct list_head *list,
119                                struct block *target,
120                                struct skippoint *where)
121 {
122         /* Find the last block that precedes 'target'
123          * and return it.  If target will be the start,
124          * return NULL.
125          * 'where' gets filled in with the last skip point
126          * at each level which is before-or-at the found
127          * block.  This can be used for easy insertion.
128          */
129
130         int level;
131         struct block *b, *next;
132         for (level = SKIP_MAX_HEIGHT-1; level >= 0 ; level--) {
133                 while (head->next[level] &&
134                        cmp_blk(head->next[level]->b, target) < 0)
135                         head = head->next[level];
136                 where->next[level] = head;
137         }
138         if (head->b == NULL) {
139                 if (list_empty(list) ||
140                     cmp_blk(list_entry(list->next, struct block, lru),
141                             target) > 0)
142                         /* This goes at the start */
143                         return NULL;
144                 b = list_entry(list->next, struct block, lru);
145         } else
146                 b = head->b;
147         /* b is before target */
148         next = b;
149         list_for_each_entry_continue(next, list, lru) {
150                 if (cmp_blk(next, target) > 0)
151                         break;
152                 b = next;
153         }
154         /* b is the last element before target */
155         return b;
156 }
157
158 static int cluster_insert(struct skippoint *head,
159                           struct list_head *list,
160                           struct block *target,
161                           int avail)
162 {
163         /* Insert 'target' at an appropriate point in the
164          * list, creating a skippoint if appropriate.
165          * Return:
166          *  0 if part of a run
167          *  1 if not in a run, but adjacent to something in same file
168          *  2 otherwise
169          * However, if the value we would return would be more than
170          * 'avail', abort the insert and return -1.
171          */
172         int rv;
173         struct skippoint pos, *newpoint;
174         int level;
175         struct block *b;
176         int cmpbefore, cmpafter;
177         unsigned long rand[DIV_ROUND_UP(SKIP_MAX_HEIGHT * 2 / 8 + 1,
178                                         sizeof(unsigned long))];
179         int height;
180
181         BUG_ON(!list_empty(&target->lru));
182         if (unlikely(list_empty(list))) {
183                 int height;
184                 /* handle this trivial case separately */
185                 if (avail < 2)
186                         return -1;
187                 list_add(&target->lru, list);
188                 head->b = NULL;
189                 for (height = 0; height < SKIP_MAX_HEIGHT; height++)
190                         head->next[height] = NULL;
191                 return 2;
192         }
193         b = skip_find(head, list, target, &pos);
194         if (b == NULL) {
195                 list_add(&target->lru, list);
196                 cmpbefore = 3;
197         } else {
198                 list_add(&target->lru, &b->lru);
199                 cmpbefore = cmp_blk(b, target);
200         }
201         if (target->lru.next  == list) /* new last */
202                 cmpafter = -3;
203         else
204                 cmpafter = cmp_blk(target, list_entry(target->lru.next,
205                                                       struct block,
206                                                       lru));
207         if (cmpbefore == -1) {
208                 /* adjacent with previous.  Possibly move skippoint */
209                 if (pos.next[0]->b == b) {
210                         pos.next[0]->b = target;
211                         return 0;
212                 }
213                 rv = 0;
214         } else if (cmpafter == -1)
215                 /* adjacent with next. No need to add a skippoint */
216                 return 0;
217         else if (cmpbefore == -2 || cmpafter == -2)
218                 /* same inodes as next or previous */
219                 rv = 1;
220         else
221                 rv = 2;
222
223         if (rv > avail) {
224                 /* Doesn't fit */
225                 list_del_init(&target->lru);
226                 return -1;
227         }
228         /* Now consider adding a skippoint here.  We want 2 bits
229          * per level of random data
230          */
231         get_random_bytes(rand, sizeof(rand));
232         height = 0;
233         while (height < SKIP_MAX_HEIGHT &&
234                test_bit(height*2, rand) &&
235                test_bit(height*2+1, rand))
236                 height++;
237         if (height == 0)
238                 return rv;
239
240         newpoint = kmalloc(sizeof(struct skippoint), GFP_NOFS);
241         if (!newpoint)
242                 return rv;
243         newpoint->b = target;
244         for (level = 0; level < height; level++) {
245                 newpoint->next[level] = pos.next[level]->next[level];
246                 pos.next[level]->next[level] = newpoint;
247         }
248         return rv;
249 }
250
251 /*-----------------------------------------------------------------------
252  * A segment is divided up in a slightly complicated way into
253  * tables, rows and columns.  This allows us to align writes with
254  * stripes in a raid4 array or similar.
255  * So we have some support routines to help track our way around
256  * the segment
257  */
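
/* Sketch (assumption, derived from seg_remainder() below rather than from
 * any definition outside this file): under this layout a segment holds
 * 'width' blocks per row, 'rows_per_table' rows per table and
 * 'tables_per_seg' tables, so its total block count is simply the product.
 */
static inline int example_segment_blocks(const struct fs_dev *dv)
{
	return dv->width * dv->rows_per_table * dv->tables_per_seg;
}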
258
259 static int seg_remainder(struct fs *fs, struct segpos *seg)
260 {
261         /* return the number of blocks from the start of segpos to the
262          * end of the segment.
263          * i.e. remaining rows in this table, plus remaining tables in
264          * this segment
265          */
266         struct fs_dev *dv = &fs->devs[seg->dev];
267         int rows = dv->rows_per_table - seg->st_row;
268
269         BUG_ON(seg->dev < 0);
270         rows += dv->rows_per_table * (dv->tables_per_seg - seg->st_table - 1);
271         return rows * dv->width;
272 }
273
274 static void seg_step(struct fs *fs, struct segpos *seg)
275 {
276         /* reposition this segpos to be immediately after its current end
277          * and make the 'current' point be the start.
278          * Size will be empty
279          */
280         seg->st_table = seg->nxt_table;
281         seg->st_row = seg->nxt_row;
282         seg->table = seg->st_table;
283         seg->row = seg->st_row;
284         seg->col = 0;
285 }
286
287 u32 lafs_seg_setsize(struct fs *fs, struct segpos *seg, u32 size)
288 {
289         /* move the 'nxt' table/row to be 'size' blocks beyond
290          * current start. size will be rounded up to a multiple
291          * of width.
292          */
293         struct fs_dev *dv = &fs->devs[seg->dev];
294         u32 rv;
295         BUG_ON(seg->dev < 0);
296         size = (size + dv->width - 1) / dv->width;
297         rv = size * dv->width;
298         size += seg->st_row;
299         seg->nxt_table = seg->st_table + size / dv->rows_per_table;
300         seg->nxt_row = size % dv->rows_per_table;
301         return rv;
302 }
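
/* Worked example (informal): with dv->width == 4 and dv->rows_per_table
 * == 8, lafs_seg_setsize(fs, seg, 10) rounds 10 blocks up to 3 full rows,
 * returns 12, and advances 'nxt' by 3 rows from st_row, spilling into the
 * next table if st_row + 3 >= 8.
 */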
303
304 void lafs_seg_setpos(struct fs *fs, struct segpos *seg, u64 addr)
305 {
306         /* set seg to start at the given virtual address, and be
307          * of size 0
308          * 'addr' should always be at the start of a row, though
309          * as it might be read off storage, we need to be
310          * a little bit careful.
311          */
312         u32 offset;
313         u32 row, col, table;
314         struct fs_dev *dv;
315         virttoseg(fs, addr, &seg->dev, &seg->num, &offset);
316         dv = &fs->devs[seg->dev];
317         col = offset / dv->stride;
318         row = offset % dv->stride;
319         table = col / dv->width;
320         col = col % dv->width;
321         BUG_ON(col);
322
323         seg->st_table = table;
324         seg->st_row = row;
325
326         seg->table = seg->nxt_table = seg->st_table;
327         seg->row = seg->nxt_row = seg->st_row;
328         seg->col = 0;
329 }
330
331 static u64 seg_addr(struct fs *fs, struct segpos *seg)
332 {
333         /* Return the virtual address of the blocks pointed
334          * to by 'seg'.
335          */
336         struct fs_dev *dv = &fs->devs[seg->dev];
337         u64 addr;
338
339         if (seg->dev < 0)
340                 /* Setting 'next' address for last cluster in
341                  * a cleaner segment */
342                 return 0;
343         if (seg->table > seg->nxt_table ||
344             (seg->table == seg->nxt_table &&
345              seg->row >= seg->nxt_row))
346                 /* We have gone beyond the end of the cluster,
347                  * must be bad data during roll-forward
348                  */
349                 return 0;
350         addr = seg->col * dv->stride;
351         addr += seg->row;
352         addr += seg->table * dv->rows_per_table;
353         addr += seg->num * dv->segment_stride;
354         addr += dv->start;
355         return addr;
356 }
357
358 u64 lafs_seg_next(struct fs *fs, struct segpos *seg)
359 {
360         /* step forward one block, returning the address of
361          * the block stepped over
362          * The write cluster can have three sections:
363          * - the tail end of one table
364          * - a number of complete tables
365          * - the head of one table
366          *
367          * For each table or partial table we want to walk a full
368          * column at a time, then go to the next column.
369          * So we step to the next row.  If it is beyond the range of
370          * the current table, go to 'start' of next column, or to
371          * next table.
372          */
373         struct fs_dev *dv = &fs->devs[seg->dev];
374         u64 addr = seg_addr(fs, seg);
375
376         if (!addr)
377                 /* Beyond end of cluster */
378                 return 0;
379
380         /* now step forward in column or table or seg */
381         seg->row++;
382         if (seg->row >= dv->rows_per_table ||
383             (seg->table == seg->nxt_table
384              && seg->row >= seg->nxt_row)) {
385                 seg->col++;
386                 if (seg->col >= dv->width) {
387                         seg->col = 0;
388                         seg->table++;
389                 }
390                 if (seg->table == seg->st_table)
391                         seg->row = seg->st_row;
392                 else
393                         seg->row = 0;
394         }
395         return addr;
396 }
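
/* Worked example (informal): with dv->width == 3 and dv->rows_per_table
 * == 4, and a cluster covering one whole table starting at row 0,
 * lafs_seg_next() hands out blocks a column at a time: (col 0, rows 0-3),
 * then (col 1, rows 0-3), then (col 2, rows 0-3), before moving on to the
 * next table.
 */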
397
398 static void cluster_reset(struct fs *fs, struct wc *wc)
399 {
400         if (wc->seg.dev < 0) {
401                 wc->remaining = 0;
402                 wc->chead_size = 0;
403                 return;
404         }
405         wc->remaining = seg_remainder(fs, &wc->seg);
406         wc->chead_blocks = 1;
407         wc->remaining--;
408         wc->cluster_space = fs->blocksize - sizeof(struct cluster_head);
409         wc->chead = page_address(wc->page[wc->pending_next]);
410         wc->chead_size = sizeof(struct cluster_head);
411 }
412
413 static void close_segment(struct fs *fs, int cnum)
414 {
415         /* Release old segment */
416         /* FIXME I need lafs_seg_{de,}ref for every snapshot,
417          * don't I ???
418          */
419         struct wc *wc = fs->wc + cnum;
420         if (wc->seg.dev >= 0) {
421                 if (cnum) {
422                         spin_lock(&fs->lock);
423                         fs->clean_reserved -= seg_remainder(fs, &wc->seg);
424                         spin_unlock(&fs->lock);
425                 }
426                 lafs_seg_forget(fs, wc->seg.dev, wc->seg.num);
427                 wc->seg.dev = -1;
428         }
429 }
430
431 void lafs_close_all_segments(struct fs *fs)
432 {
433         int cnum;
434         for (cnum = 0; cnum < WC_NUM; cnum++)
435                 close_segment(fs, cnum);
436 }
437
438 static int new_segment(struct fs *fs, int cnum)
439 {
440         /* new_segment can fail if cnum > 0 and there is no
441          * clean_reserved
442          */
443         struct wc *wc = fs->wc + cnum;
444         unsigned int dev;
445         u32 seg;
446
447         close_segment(fs, cnum);
448
449         if (cnum && fs->clean_reserved < fs->max_segment) {
450                 /* we have reached the end of the last cleaner
451                  * segment.  The remainder of clean_reserved
452                  * is of no value (if there even is any)
453                  */
454                 if (fs->clean_reserved) {
455                         spin_lock(&fs->alloc_lock);
456                         fs->free_blocks += fs->clean_reserved;
457                         fs->clean_reserved = 0;
458                         spin_unlock(&fs->alloc_lock);
459                 }
460                 return -ENOSPC;
461         }
462         /* This gets a reference on the 'segsum' */
463         lafs_free_get(fs, &dev, &seg, 0);
464         lafs_seg_setpos(fs, &wc->seg, segtovirt(fs, dev, seg));
465         BUG_ON(wc->seg.dev != dev);
466         BUG_ON(wc->seg.num != seg);
467
468         if (cnum == 0)
469                 /* wc mutex protects us here */
470                 fs->newsegments++;
471
472         return 0;
473 }
474
475 /*-------------------------------------------------------------------------
476  * lafs_cluster_allocate
477  * This adds a block to the writecluster list and possibly triggers a
478  * flush.
479  * The flush will assign a physical address to each block and schedule
480  * the actual write out.
481  *
482  * There can be multiple active write clusters. e.g one for new data,
483  * one for cleaned data, one for defrag writes.
484  *
485  * It is here that the in-core inode is copied to the data block, if
486  * appropriate, and here that small files get copied into the inode
487  * thus avoiding writing a separate block for the file content.
488  *
489  * Roll-forward sees any data block as automatically extending the
490  * size of the file.  This requires that we write blocks in block-order
491  * as much as possible (which we would anyway) and that we inform
492  * roll-forward when a file ends mid-block.
493  *
494  * A block should be iolocked when being passed to lafs_cluster_allocate.
495  * When the IO is complete, it will be unlocked.
496  */
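
/* Hypothetical caller sketch (not LaFS code): the calling convention
 * described above - the caller iolocks the block, and the cluster code
 * unlocks it either immediately (when nothing needs writing) or when the
 * IO completes.  cnum 0 is the normal (non-cleaner) write cluster.
 */
static inline int example_write_one_block(struct block *b)
{
	lafs_iolock_block(b);
	return lafs_cluster_allocate(b, 0);
}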
497 static int flush_data_to_inode(struct block *b)
498 {
499         /* This is the first and only datablock in a file
500          * and it will fit in the inode. So put it there.
501          */
502
503         char *ibuf, *dbuf;
504         loff_t size = i_size_read(b->inode);
505         struct lafs_inode *lai = LAFSI(b->inode);
506         struct datablock *db;
507         struct fs *fs = fs_from_inode(b->inode);
508
509         lafs_iolock_block(&lai->iblock->b);
510         dprintk("FLUSHING single block into inode - s=%d %s\n",
511                 (int)size, strblk(b));
512         if (lai->iblock->depth > 1)
513                 /* Too much indexing still - truncate must be happening */
514                 goto give_up;
515         if (lai->iblock->depth == 1) {
516                 /* If there is nothing in this block we can still use it */
517                 if (lai->iblock->children.next != &b->siblings ||
518                     b->siblings.next != &lai->iblock->children ||
519                     lafs_leaf_next(lai->iblock, 1) != 0xffffffff)
520                         goto give_up;
521         }
522         /* Need checkpoint lock to pin the dblock */
523         lafs_checkpoint_lock(fs);
524         if (test_and_clear_bit(B_Dirty, &b->flags)) {
525                 int credits = 1;
526                 /* Check size again, just in case it changed */
527                 size = i_size_read(b->inode);
528                 if (size + lai->metadata_size > fs->blocksize) {
529                         /* Lost a race; give up, restoring the Dirty bit */
530                         if (test_and_set_bit(B_Dirty, &b->flags))
531                                 if (test_and_set_bit(B_Credit, &b->flags))
532                                         lafs_space_return(fs, 1);
533                         lafs_checkpoint_unlock(fs);
534                         goto give_up;
535                 }
536                 if (!test_and_set_bit(B_Credit, &lai->dblock->b.flags))
537                         credits--;
538                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
539                         credits++;
540                 if (!test_and_set_bit(B_ICredit, &lai->dblock->b.flags))
541                         credits--;
542                 lafs_space_return(fs, credits);
543                 LAFS_BUG(!test_bit(B_SegRef, &lai->dblock->b.flags), &lai->dblock->b);
544                 set_bit(B_PinPending, &lai->dblock->b.flags);
545                 if (test_bit(B_Pinned, &b->flags))
546                         lafs_pin_block_ph(&lai->dblock->b, !!test_bit(B_Phase1, &b->flags));
547                 else
548                         lafs_pin_block(&lai->dblock->b);
549                 lafs_dirty_dblock(lai->dblock);
550         } else if (test_and_clear_bit(B_Realloc, &b->flags)) {
551                 int credits = 1;
552                 LAFS_BUG(!test_bit(B_Valid, &lai->iblock->b.flags),
553                          &lai->iblock->b);
554                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
555                         credits++;
556                 if (!test_and_set_bit(B_Realloc, &lai->iblock->b.flags))
557                         credits--;
558                 if (!test_and_set_bit(B_UnincCredit, &lai->iblock->b.flags))
559                         credits--;
560                 LAFS_BUG(credits < 0, b);
561                 lafs_space_return(fs, credits);
562         } else {
563                 printk("Wasn't dirty or realloc: %s\n", strblk(b));
564                 LAFS_BUG(1, b);
565         }
566         lafs_checkpoint_unlock(fs);
567         lai->depth = 0;
568         lai->iblock->depth = 0;
569         clear_bit(B_Valid, &lai->iblock->b.flags);
570
571         /* safe to reference ->dblock as b is a dirty child */
572         db = getdref(lai->dblock, MKREF(flush2db));
573         ibuf = map_dblock(db);
574         dbuf = map_dblock_2(dblk(b));
575
576         if (test_and_clear_bit(B_UnincCredit, &b->flags))
577                 lafs_space_return(fs, 1);
578
579         /* It is an awkward time to call lafs_inode_fillblock,
580          * so do this one little change manually
581          */
582         ((struct la_inode *)ibuf)->depth = 0;
583         memcpy(ibuf + lai->metadata_size,
584                dbuf, size);
585         memset(ibuf + lai->metadata_size + size,
586                0,
587                fs->blocksize - lai->metadata_size - size);
588         unmap_dblock_2(db, ibuf);
589         unmap_dblock(dblk(b), dbuf);
590         lafs_iounlock_block(&lai->iblock->b);
591
592         putdref(db, MKREF(flush2db));
593         lafs_refile(b, 0);
594         return 1;
595
596 give_up:
597         lafs_iounlock_block(&lai->iblock->b);
598         return 0;
599 }
600
601 int lafs_cluster_allocate(struct block *b, int cnum)
602 {
603         struct fs *fs = fs_from_inode(b->inode);
604         struct wc *wc = &fs->wc[cnum];
605         struct lafs_inode *lai;
606         loff_t size;
607         int used;
608         int err = 0;
609
610         LAFS_BUG(test_bit(B_Index, &b->flags) &&
611                  iblk(b)->uninc_table.pending_cnt > 0, b);
612
613         LAFS_BUG(!test_bit(B_IOLock, &b->flags), b);
614         lai = LAFSI(b->inode);
615
616         LAFS_BUG(!test_bit(B_Valid, &b->flags), b);
617         LAFS_BUG(!fs->rolled, b);
618         LAFS_BUG(lai->flags & File_nonlogged &&
619                  !test_bit(B_Index, &b->flags), b);
620
621         /* We cannot change the physaddr of an uniblock in a
622          * different phase to the parent, else the parent will
623          * get the wrong address.
624          */
625         LAFS_BUG(test_bit(B_Index, &b->flags) &&
626                  test_bit(B_Uninc, &b->flags) &&
627                  !!test_bit(B_Phase1, &b->flags) !=
628                  !!test_bit(B_Phase1, &b->parent->b.flags), b);
629
630         size = i_size_read(b->inode);
631         if (!test_bit(B_Index, &b->flags) &&          /* it is a datablock */
632             b->fileaddr == 0 &&
633             b->parent == lai->iblock &&               /* No indexing */
634             lai->type >= TypeBase &&                  /* 'size' is meaningful */
635             size + lai->metadata_size <= fs->blocksize) {
636                 int success = flush_data_to_inode(b);
637                 if (success) {
638                         lafs_summary_update(fs, b->inode, b->physaddr, 0,
639                                             0, !!test_bit(B_Phase1, &b->flags),
640                                             test_bit(B_SegRef, &b->flags));
641                         b->physaddr = 0;
642                         lafs_iounlock_block(b);
643                         return 0;
644                 }
645                 /* There are conflicting blocks in the index tree */
646                 if (test_and_clear_bit(B_Realloc, &b->flags))
647                         lafs_space_return(fs, 1);
648                 if (!test_bit(B_Dirty, &b->flags) ||
649                     !test_bit(B_Pinned, &b->flags)) {
650                         /* It is safe to just leave this for later */
651                         lafs_iounlock_block(b);
652                         return 0;
653                 }
654                 /* We really need to write this, so use a full block. */
655                 printk("WARNING %s\n", strblk(b));
656                 WARN_ON(1);
657         }
658
659         /* The symbiosis between the datablock and the indexblock for an
660          * inode needs to be dealt with here.
661          * The data block will not normally be written until the InoIdx
662          * block is unpinned or phase-flipped.  However if it is, as the
663          * result of some 'sync' style operation, then we write it even
664          * though the index info might not be uptodate.  The data
665          * content will be safe for roll-forward to pick up, and the rest
666          * will be written when the InoIdx block is ready.
667          *
668          * When the InoIdx block is ready, we don't want to write it as
669          * there is nowhere for it to be incorporated in to.  Rather we
670          * move the pinning and credits across to the Data block
671          * which will subsequently be written.
672          */
673
674         if (test_bit(B_InoIdx, &b->flags)) {
675                 struct block *b2 = &lai->dblock->b;
676                 int credits = 0;
677
678                 LAFS_BUG(!test_bit(B_Pinned, &b->flags), b);
679                 if (!test_bit(B_Pinned, &b2->flags)) {
680                         /* index block pinned, data block not,
681                          * carry the pinning across
682                          */
683                         if (b2->parent == NULL &&
684                             b->parent)
685                                 b2->parent =
686                                         getiref(b->parent, MKREF(child));
687                         LAFS_BUG(b->parent != b2->parent, b);
688                         set_bit(B_PinPending, &b2->flags);  // FIXME is this right?
689                         set_phase(b2, test_bit(B_Phase1,
690                                                &b->flags));
691                         lafs_refile(b2, 0);
692                 }
693
694                 if (cnum) {
695                         if (test_and_clear_bit(B_Realloc, &b->flags))
696                                 credits++;
697                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
698                 } else {
699                         if (test_and_clear_bit(B_Dirty, &b->flags))
700                                 credits++;
701                         if (test_and_clear_bit(B_Realloc, &b->flags))
702                                 credits++;
703                 }
704                 if (test_and_clear_bit(B_UnincCredit, &b->flags))
705                         credits++;
706
707                 /* We always erase the InoIdx block before the
708                  * inode data block, so this cannot happen
709                  */
710                 LAFS_BUG(!test_bit(B_Valid, &b2->flags), b2);
711
712                 if (cnum == 0) {
713                         if (!test_and_set_bit(B_UnincCredit, &b2->flags))
714                                 if (!test_and_clear_bit(B_ICredit, &b2->flags))
715                                         credits--;
716
717                         if (!test_bit(B_Dirty, &b2->flags))
718                                 /* need some credits... */
719                                 if (!test_and_set_bit(B_Credit, &b2->flags))
720                                         credits--;
721                         lafs_dirty_dblock(dblk(b2));
722                 } else {
723                         if (!test_and_set_bit(B_UnincCredit, &b2->flags))
724                                 credits--;
725
726                         if (!test_and_set_bit(B_Realloc, &b2->flags))
727                                 credits--;
728                 }
729                 LAFS_BUG(credits < 0, b2);
730                 lafs_space_return(fs, credits);
731                 /* make sure 'dirty' status is registered */
732                 lafs_refile(b2, 0);
733                 lafs_iounlock_block(b);
734                 return 0;
735         }
736
737         if (!test_bit(B_Dirty, &b->flags) &&
738             !test_bit(B_Realloc, &b->flags)) {
739                 /* block just got truncated, so don't write it. */
740                 lafs_iounlock_block(b);
741                 return 0;
742         }
743
744         if (test_bit(B_EmptyIndex, &b->flags)) {
745                 int credits = 0;
746                 if (test_and_set_bit(B_Writeback, &b->flags))
747                         LAFS_BUG(1, b);
748                 lafs_iounlock_block(b);
749                 lafs_allocated_block(fs, b, 0);
750                 if (cnum) {
751                         if (test_and_clear_bit(B_Realloc, &b->flags))
752                                 credits++;
753                         LAFS_BUG(test_bit(B_Dirty, &b->flags), b);
754                 } else {
755                         if (test_and_clear_bit(B_Dirty, &b->flags))
756                                 credits++;
757                         if (test_and_clear_bit(B_Realloc, &b->flags))
758                                 credits++;
759                 }
760                 lafs_writeback_done(b);
761                 lafs_space_return(fs, credits);
762                 return 0;
763         }
764
765         if (!test_bit(B_Index, &b->flags)) {
766                 struct inode *myi = rcu_my_inode(dblk(b));
767                 if (myi && test_and_clear_bit(I_Dirty, &LAFSI(myi)->iflags))
768                         lafs_inode_fillblock(myi);
769                 rcu_iput(myi);
770         }
771
772         mutex_lock(&wc->lock);
773
774         /* The following check must be inside the mutex_lock
775          * to ensure exclusion between checkpoint and writepage.
776          * checkpoint must never see the block not on the
777          * leaf lru unless it is already in the cluster and so can
778          * be waited for.
779          */
780         if (!list_empty_careful(&b->lru)) {
781                 /* Someone is flushing this block before
782                  * checkpoint got to it - probably writepage.
783                  * Must remove it from the leaf lru so it can go
784                  * on the cluster list.
785                  */
786                 LAFS_BUG(test_bit(B_OnFree, &b->flags), b);
787                 spin_lock(&fs->lock);
788                 if (!list_empty(&b->lru))
789                         list_del_init(&b->lru);
790                 spin_unlock(&fs->lock);
791         }
792
793         if (test_and_set_bit(B_Writeback, &b->flags))
794                 LAFS_BUG(1, b);
795         lafs_iounlock_block(b);
796
797         getref(b, MKREF(cluster)); /* we soon own a reference in the list */
798
799         for (;;) {
800                 int avail;
801                  * see how much room is left in the cluster head.
802                  * The interesting values are:
803                  * none, one descriptor, one grouphead and one descriptor
804                  * These are assigned values 0, 1, 2.
805                  */
806                 if (wc->cluster_space >= (sizeof(struct group_head) +
807                                           sizeof(struct descriptor)) ||
808                     (wc->remaining > 2 &&
809                      wc->chead_blocks < MAX_CHEAD_BLOCKS &&
810                      (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE)
811                         )
812                         avail = 2;
813                 else if (wc->cluster_space >= sizeof(struct descriptor))
814                         avail = 1;
815                 else
816                         avail = 0;
817                 if (wc->seg.dev < 0 || wc->remaining < 1)
818                         used = -1;
819                 else
820                         used = cluster_insert(&wc->slhead, &wc->clhead, b, avail);
821
822                 if (used >= 0)
823                         break;
824                 else if (wc->slhead.b)
825                         cluster_flush(fs, cnum);
826                 else {
827                         err = new_segment(fs, cnum);
828                         if (err) {
829                                 /* No cleaner segments - this will have to go
830                                  * out with a checkpoint.
831                                  * Block is still 'dirty' - just clear
832                                  * writeback flag.
833                                  */
834                                 lafs_writeback_done(b);
835                                 putref(b, MKREF(cluster));
836                                 mutex_unlock(&wc->lock);
837                                 return -ENOSPC;
838                         }
839                         cluster_reset(fs, wc);
840                 }
841         }
842
843         if (used > 0)
844                 wc->cluster_space -= sizeof(struct descriptor);
845         if (used > 1)
846                 wc->cluster_space -= sizeof(struct group_head);
847         if (wc->cluster_space < 0) {
848                 /* need a new page */
849                 wc->chead_blocks++;
850                 wc->remaining--;
851                 wc->cluster_space += fs->blocksize;
852         }
853         wc->remaining--;
854         BUG_ON(wc->remaining < 0);
855         if (wc->remaining == 0)
856                 cluster_flush(fs, cnum);
857         mutex_unlock(&wc->lock);
858         return 0;
859 }
860
861 /*-------------------------------------------------------------------------
862  * Cluster head building.
863  * We build the cluster head bit by bit as we find blocks
864  * in the list.  These routines help.
865  */
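
/* Layout sketch (informal, pieced together from the routines below and
 * not a formal specification): the cluster head starts with a struct
 * cluster_head followed by one group_head per inode, each group holding
 * the miniblocks and descriptors for that inode's blocks:
 *
 *   cluster_head
 *     group_head (inode A)
 *       miniblock(s) ...
 *       descriptor(s) ...
 *     group_head (inode B)
 *       descriptor(s) ...
 *
 * cluster_closehead() records each group's total size in 4-byte words.
 */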
866
867 static void cluster_addhead(struct wc *wc, struct inode *ino,
868                             struct group_head **headstart)
869 {
870         struct group_head *gh = (struct group_head *)((char *)wc->chead +
871                                                       wc->chead_size);
872         u16 tnf;
873         dprintk("CLUSTER addhead %d\n", wc->chead_size);
874         *headstart = gh;
875
876         gh->inum = cpu_to_le32(ino->i_ino);
877         gh->fsnum = cpu_to_le32(ino_from_sb(ino->i_sb)->i_ino);
878         tnf = ((ino->i_generation<<8) | (LAFSI(ino)->trunc_gen & 0xff))
879                 & 0x7fff;
880         if (wc->cnum)
881                 tnf |= 0x8000;
882         gh->truncatenum_and_flag = cpu_to_le16(tnf);
883         wc->chead_size += sizeof(struct group_head);
884 }
885
886 static void cluster_closehead(struct wc *wc,
887                               struct group_head *headstart)
888 {
889         int size = wc->chead_size - (((char *)headstart) - (char *)wc->chead);
890
891         dprintk("CLUSTER closehead %d %d\n", wc->chead_size, size);
892         headstart->group_size_words = size / 4;
893 }
894
895 static void cluster_addmini(struct wc *wc, u32 addr, int offset,
896                             int size, const char *data,
897                             int size2, const char *data2)
898 {
899         /* if size2 !=0, then only
900          * (size-size2) is at 'data' and the rest is at 'data2'
901          */
902         struct miniblock *mb = ((struct miniblock *)
903                                 ((char *)wc->chead + wc->chead_size));
904
905         dprintk("CLUSTER addmini %d %d\n", wc->chead_size, size);
906
907         mb->block_num = cpu_to_le32(addr);
908         mb->block_offset = cpu_to_le16(offset);
909         mb->length = cpu_to_le16(size + DescMiniOffset);
910         wc->chead_size += sizeof(struct miniblock);
911         memcpy(mb->data, data, size-size2);
912         if (size2)
913                 memcpy(mb->data + size-size2, data2, size2);
914         memset(mb->data+size, 0, (-size)&3);
915         wc->chead_size += ROUND_UP(size);
916 }
917
918 static void cluster_adddesc(struct wc *wc, struct block *blk,
919                             struct descriptor **desc_start)
920 {
921         struct descriptor *dh = (struct descriptor *)((char *)wc->chead +
922                                                       wc->chead_size);
923         *desc_start = dh;
924         dprintk("CLUSTER add_desc %d\n", wc->chead_size);
925         dh->block_num = cpu_to_le32(blk->fileaddr);
926         dh->block_cnt = cpu_to_le32(0);
927         dh->block_bytes = 0;
928         if (test_bit(B_Index, &blk->flags))
929                 dh->block_bytes = DescIndex;
930         wc->chead_size += sizeof(struct descriptor);
931 }
932
933 static void cluster_incdesc(struct wc *wc, struct descriptor *desc_start,
934                             struct block *b, int blkbits)
935 {
936         desc_start->block_cnt =
937                 cpu_to_le32(le32_to_cpu(desc_start->block_cnt)+1);
938         if (!test_bit(B_Index, &b->flags)) {
939                 if (LAFSI(b->inode)->type >= TypeBase) {
940                         u64 size = b->inode->i_size;
941                         if (size > ((loff_t)b->fileaddr << blkbits) &&
942                             size <= ((loff_t)(b->fileaddr + 1) << blkbits))
943                                 desc_start->block_bytes =
944                                         cpu_to_le32(size & ((1<<blkbits)-1));
945                         else
946                                 desc_start->block_bytes =
947                                         cpu_to_le32(1 << blkbits);
948                 } else
949                         desc_start->block_bytes = cpu_to_le32(1<<blkbits);
950         }
951 }
952
953 /*------------------------------------------------------------------------
954  * Miniblocks/updates
955  * Updates are added to the cluster head immediately, causing a flush
956  * if there is insufficient room.
957  * This means that miniblocks always occur before block descriptors.
958  * miniblocks are only ever written to cluster-series 0.
959  */
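
/* Hypothetical usage sketch (not LaFS code) of the update-handle life
 * cycle implemented below: prepare, then pin (which may fail with -EAGAIN
 * when no space can be reserved), then commit the bytes into the next
 * cluster head - or lafs_cluster_update_abort() if the update is
 * abandoned.
 */
static inline int example_log_small_update(struct fs *fs, struct datablock *db,
					   int offset, int len)
{
	struct update_handle uh;
	int err;

	lafs_cluster_update_prepare(&uh, fs, len);
	err = lafs_cluster_update_pin(&uh);
	if (err)
		return err;		/* -EAGAIN: try again later */
	lafs_cluster_update_commit(&uh, db, offset, len);
	return 0;
}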
960
961 int
962 lafs_cluster_update_prepare(struct update_handle *uh, struct fs *fs,
963                             int len)
964 {
965         /* make sure there is room for a 'len' update in the upcoming
966          * cluster.  Do nothing here.
967          */
968         uh->fs = fs;
969         uh->reserved = 0;
970         return 0;
971 }
972
973 int
974 lafs_cluster_update_pin(struct update_handle *uh)
975 {
976         /* last chance to bail out */
977         if (lafs_space_alloc(uh->fs, 1, ReleaseSpace)) {
978                 uh->reserved = 1;
979                 uh->fs->cluster_updates++;
980                 return 0;
981         }
982         return -EAGAIN;
983 }
984
985 static unsigned long long
986 lafs_cluster_update_commit_both(struct update_handle *uh, struct fs *fs,
987                                 struct inode *ino, u32 addr,
988                                 int offset, int len, struct datablock *b,
989                                 const char *str1,
990                                 int len2, const char *str2)
991 {
992         /* The data can come either from a datablock or from one or
993          * two buffers (str1/str2): flush 'len' bytes, at 'offset' of
994          * block 'addr' in 'ino', into the next write cluster.
995          */
996         struct wc *wc = &fs->wc[0];
997         struct group_head *head_start;
998         unsigned long long seq;
999         char *mapping = NULL;
1000
1001         BUG_ON(!fs->rolled);
1002
1003         mutex_lock(&wc->lock);
1004         while (wc->cluster_space < (sizeof(struct group_head)+
1005                                     sizeof(struct descriptor) +
1006                                     ROUND_UP(len))) {
1007                 if (wc->remaining > 0 && wc->chead_blocks < MAX_CHEAD_BLOCKS &&
1008                     (wc->chead_blocks+1) * fs->blocksize <= PAGE_SIZE) {
1009                         wc->chead_blocks++;
1010                         wc->remaining--;
1011                         wc->cluster_space += fs->blocksize;
1012                 } else
1013                         cluster_flush(fs, 0);
1014         }
1015
1016         fs->cluster_updates -= uh->reserved;
1017         lafs_space_return(fs, uh->reserved);
1018         uh->reserved = 0;
1019
1020         cluster_addhead(wc, ino, &head_start);
1021         if (b) {
1022                 mapping = map_dblock(b);
1023                 str1 = mapping + offset;
1024         }
1025         cluster_addmini(wc, addr, offset, len, str1, len2, str2);
1026         if (b)
1027                 unmap_dblock(b, mapping);
1028         cluster_closehead(wc, head_start);
1029         wc->cluster_space -= (sizeof(struct group_head)+
1030                               sizeof(struct descriptor) +
1031                               ROUND_UP(len));
1032         seq = wc->cluster_seq;
1033         mutex_unlock(&wc->lock);
1034         return seq;
1035 }
1036
1037 unsigned long long
1038 lafs_cluster_update_commit(struct update_handle *uh,
1039                            struct datablock *b,
1040                            int offset, int len)
1041 {
1042         /* OK here is the data to put in the next cluster. */
1043         struct fs *fs = fs_from_inode(b->b.inode);
1044         unsigned long long seq;
1045
1046         seq = lafs_cluster_update_commit_both(uh, fs, b->b.inode, b->b.fileaddr,
1047                                               offset, len, b, NULL, 0, NULL);
1048
1049         return seq;
1050 }
1051
1052 unsigned long long
1053 lafs_cluster_update_commit_buf(struct update_handle *uh, struct fs *fs,
1054                                struct inode *ino, u32 addr,
1055                                int offset, int len, const char *str1,
1056                                int len2, const char *str2)
1057 {
1058         return lafs_cluster_update_commit_both(uh, fs,
1059                                                ino, addr,
1060                                                offset, len,
1061                                                NULL,
1062                                                str1, len2, str2);
1063 }
1064
1065 void
1066 lafs_cluster_update_abort(struct update_handle *uh)
1067 {
1068         /* Didn't want that cluster_update after all */
1069         uh->fs->cluster_updates -= uh->reserved;
1070         lafs_space_return(uh->fs, uh->reserved);
1071 }
1072
1073 int lafs_calc_cluster_csum(struct cluster_head *head)
1074 {
1075         unsigned int oldcsum = head->checksum;
1076         unsigned long long newcsum = 0;
1077         unsigned long csum;
1078         int i;
1079         unsigned int *superc = (unsigned int *) head;
1080         head->checksum = 0;
1081
1082         for (i = 0; i < le16_to_cpu(head->Hlength)/4; i++)
1083                 newcsum += le32_to_cpu(superc[i]);
1084         csum = (newcsum & 0xffffffff) + (newcsum>>32);
1085         head->checksum = oldcsum;
1086         return cpu_to_le32(csum);
1087 }
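
/* Verification sketch (assumption, mirroring lafs_calc_cluster_csum()
 * above): a cluster head read back from storage is accepted when its
 * stored checksum matches a fresh calculation over Hlength bytes.
 */
static inline int example_head_csum_ok(struct cluster_head *head)
{
	return head->checksum == lafs_calc_cluster_csum(head);
}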
1088
1089 /*------------------------------------------------------------------------
1090  * Cluster flushing.
1091  * Once we have a suitable number of blocks and updates in a writecluster
1092  * we need to write them out.
1093  * We first iterate over the block list building the clusterhead.
1094  * Then we finalise and write the clusterhead and iterate again
1095  * writing out each block.
1096  * We keep a biassed blockcount in the wc, and decrement it on write-completion.
1097  * Once the blockcount hits the bias the cluster is written.  Then when
1098  * the next (or next-plus-one) cluster head is written we remove the bias
1099  * and the data is deemed to be safe.
1100  *
1101  * Update blocks will already have been included in the write cluster.
1102  *
1103  * We reinitialise the write cluster once we are finished.
1104  */
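
/* Worked example (informal) of the bias scheme described above:
 * pending_cnt for a cluster starts at the bias, rises as block writes are
 * submitted and falls as they complete.  When it is back down to the bias
 * all the data has hit storage; the bias itself is only removed once the
 * next (or next-plus-one) cluster head has been written, and
 * cluster_done() then acknowledges the blocks when the count reaches
 * zero.
 */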
1105
1106 static void cluster_done(struct fs *fs, struct wc *wc)
1107 {
1108         /* This is called when one of the pending_cnts hits 0, indicating
1109          * that a write-cluster has been written and committed to storage.
1110          * We find all the pending blocks on a completed cluster and
1111          * acknowledge them.
1112          */
1113         int i;
1114         struct block *b, *tmp;
1115         struct list_head tofree;
1116
1117         INIT_LIST_HEAD(&tofree);
1118
1119         spin_lock(&fs->lock);
1120         for (i = 0; i < 4 ; i++)
1121                 if (atomic_read(&wc->pending_cnt[i]) == 0)
1122                         list_splice_init(&wc->pending_blocks[i], &tofree);
1123         spin_unlock(&fs->lock);
1124
1125         list_for_each_entry_safe(b, tmp, &tofree, lru) {
1126                 list_del_init(&b->lru);
1127                 lafs_writeback_done(b);
1128                 putref(b, MKREF(cluster));
1129         }
1130 }
1131
1132 void lafs_clusters_done(struct fs *fs)
1133 {
1134         int i;
1135         for (i = 0 ; i < WC_NUM; i++)
1136                 cluster_done(fs, &fs->wc[i]);
1137 }
1138
1139 void lafs_done_work(struct work_struct *ws)
1140 {
1141         struct fs *fs = container_of(ws, struct fs, done_work);
1142         lafs_clusters_done(fs);
1143 }
1144
1145 static void cluster_flush(struct fs *fs, int cnum)
1146 {
1147         struct wc *wc = &fs->wc[cnum];
1148         int i;
1149         struct block *b;
1150         struct inode *current_inode = NULL;
1151         u64 current_block = NoBlock;
1152         u64 head_addr[MAX_CHEAD_BLOCKS];
1153         struct descriptor *desc_start = NULL;
1154         struct group_head *head_start = NULL;
1155         struct segpos segend;
1156         int which;
1157         int wake;
1158         int vtype;
1159         int cluster_size;
1160
1161         if (!test_bit(CheckpointOpen, &fs->fsstate)) {
1162                 /* First cluster since a checkpoint, make sure
1163                  * we do another checkpoint within 30 seconds
1164                  */
1165                 fs->next_checkpoint = jiffies + 30 * HZ;
1166                 set_bit(CheckpointOpen, &fs->fsstate);
1167                 lafs_wake_thread(fs);
1168         }
1169         /* set the writecluster size as this will guide layout in
1170          * some cases
1171          */
1172
1173         if (test_and_clear_bit(FlushNeeded, &fs->fsstate))
1174                 set_bit(SecondFlushNeeded, &fs->fsstate);
1175         else
1176                 clear_bit(SecondFlushNeeded, &fs->fsstate);
1177
1178         cluster_size = lafs_seg_setsize(fs, &wc->seg,
1179                                         seg_remainder(fs, &wc->seg)
1180                                         - wc->remaining);
1181
1182         /* find, and step over, address header block(s) */
1183         for (i = 0; i < wc->chead_blocks ; i++)
1184                 head_addr[i] = lafs_seg_next(fs, &wc->seg);
1185
1186         list_for_each_entry(b, &wc->clhead, lru) {
1187                 u64 addr;
1188                 if (b->inode != current_inode) {
1189                         /* need to create a new group_head */
1190                         desc_start = NULL;
1191                         if (head_start)
1192                                 cluster_closehead(wc, head_start);
1193                         cluster_addhead(wc, b->inode, &head_start);
1194                         current_inode = b->inode;
1195                         current_block = NoBlock;
1196                 }
1197                 if (desc_start == NULL || b->fileaddr != current_block+1 ||
1198                     test_bit(B_Index, &b->flags)) {
1199                         cluster_adddesc(wc, b, &desc_start);
1200                         current_block = b->fileaddr;
1201                 } else
1202                         current_block++;
1203                 cluster_incdesc(wc, desc_start, b, fs->blocksize_bits);
1204                 addr = lafs_seg_next(fs, &wc->seg);
1205                 BUG_ON(!addr);
1206                 if (cnum && test_bit(B_Dirty, &b->flags))
1207                         /* We are cleaning but this block is now dirty.
1208                          * Don't waste the UnincCredit on recording the
1209                          * clean location as it will be written as
1210                          * dirty soon.  Just pin it to the current phase
1211                          * to ensure that it really does get written.
1212                          */
1213                         lafs_pin_block(b);
1214                 else
1215                         lafs_allocated_block(fs, b, addr);
1216         }
1217
1218         if (head_start)
1219                 cluster_closehead(wc, head_start);
1220         segend = wc->seg; /* We may write zeros from here */
1221         seg_step(fs, &wc->seg);
1222         wc->remaining = seg_remainder(fs, &wc->seg);
1223         /* Need to make sure our ->next_addr gets set properly
1224          * for non-cleaning segments
1225          */
1226         if (wc->remaining < 2) {
1227                 if (cnum == 0)
1228                         new_segment(fs, cnum);
1229                 else
1230                         close_segment(fs, cnum);
1231         }
1232
1233         /* Fill in the cluster header */
1234         strncpy(wc->chead->idtag, "LaFSHead", 8);
1235         wc->chead->flags = cpu_to_le32(0);
1236
1237         /* checkpoints only happen in cnum==0 */
1238         if (cnum == 0 && fs->checkpointing) {
1239                 spin_lock(&fs->lock);
1240                 wc->chead->flags = cpu_to_le32(fs->checkpointing);
1241                 if (fs->checkpointing & CH_CheckpointEnd) {
1242                         fs->checkpointing = 0;
1243                         clear_bit(CheckpointOpen, &fs->fsstate);
1244                 } else if (fs->checkpointing & CH_CheckpointStart) {
1245                         fs->checkpointcluster = head_addr[0];
1246                         fs->checkpointing &= ~CH_CheckpointStart;
1247                 }
1248                 spin_unlock(&fs->lock);
1249                 if (!fs->checkpointing)
1250                         wake_up(&fs->phase_wait);
1251         }
1252
1253         memcpy(wc->chead->uuid, fs->state->uuid, 16);
1254         wc->chead->seq = cpu_to_le64(wc->cluster_seq);
1255         wc->cluster_seq++;
1256         wc->chead->Hlength = cpu_to_le16(wc->chead_size);
1257         wc->chead->Clength = cpu_to_le16(cluster_size);
1258         spin_lock(&fs->lock);
1259         if (cnum)
1260                 fs->clean_reserved -= cluster_size;
1261         else
1262                 fs->free_blocks -= cluster_size;
1263         spin_unlock(&fs->lock);
1264         /* FIXME if this is just a header, no data blocks,
1265          * then use VerifyNull.  Alternately if there is
1266          * no-one waiting for a sync to complete (how do we
1267          * track that?) use VerifyNext2
1268          */
1269         vtype = VerifyNext;
1270         if (cnum)
1271                 vtype = VerifyNull;
1272         wc->pending_vfy_type[wc->pending_next] = vtype;
1273         wc->chead->verify_type = cpu_to_le16(vtype);
1274         memset(wc->chead->verify_data, 0, 16);
1275
1276         wc->chead->next_addr = cpu_to_le64(seg_addr(fs, &wc->seg));
1277         wc->chead->prev_addr = cpu_to_le64(wc->prev_addr);
1278         wc->prev_addr = head_addr[0];
1279         wc->chead->this_addr = cpu_to_le64(wc->prev_addr);
1280
1281         wc->chead->checksum = lafs_calc_cluster_csum(wc->chead);
1282
1283         /* We cannot write the header until any previous cluster
1284          * which uses the header as a commit block has been
1285          * fully written
1286          */
1287         which = (wc->pending_next+3)%4;
1288         dprintk("AA which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1289                 atomic_read(&wc->pending_cnt[which]));
1290         if (wc->pending_vfy_type[which] == VerifyNext)
1291                 wait_event(wc->pending_wait,
1292                            atomic_read(&wc->pending_cnt[which]) == 1);
1293         which = (which+3) % 4;
1294         dprintk("AB which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1295                 atomic_read(&wc->pending_cnt[which]));
1296         if (wc->pending_vfy_type[which] == VerifyNext2)
1297                 wait_event(wc->pending_wait,
1298                            atomic_read(&wc->pending_cnt[which]) == 1);
1299
1300         lafs_clusters_done(fs);
1301         dprintk("cluster_flush pre-bug pending_next=%d cnt=%d\n",
1302                 wc->pending_next, atomic_read(&wc->pending_cnt
1303                                               [wc->pending_next]));
1304         BUG_ON(atomic_read(&wc->pending_cnt[wc->pending_next]) != 0);
1305         BUG_ON(!list_empty(&wc->pending_blocks[wc->pending_next]));
1306
1307         /* initialise pending count to 1.  This will be decremented
1308          * once all related blocks have been submitted for write.
1309          * This includes the blocks in the next 0, 1, or 2
1310          * write clusters.
1311          */
1312         atomic_inc(&wc->pending_cnt[wc->pending_next]);
1313
1314         /* Now write it all out.
1315          * Later we should possibly re-order the writes
1316          * for raid4 stripe-at-a-time
1317          */
1318         for (i = 0; i < wc->chead_blocks; i++)
1319                 lafs_write_head(fs,
1320                                 page_address(wc->page[wc->pending_next])
1321                                 + i * fs->blocksize,
1322                                 head_addr[i], wc);
1323
1324         while (!list_empty(&wc->clhead)) {
1325                 int credits = 0;
1326                 b = list_entry(wc->clhead.next, struct block, lru);
1327                 dprintk("flushing %s\n", strblk(b));
1328
1329                 if (test_and_clear_bit(B_Realloc, &b->flags))
1330                         credits++;
1331                 if (cnum == 0) {
1332                         if (test_and_clear_bit(B_Dirty, &b->flags))
1333                                 credits++;
1334                 }
1335
1336                 WARN_ON(credits == 0);
1337                 lafs_space_return(fs, credits);
1338                 spin_lock(&fs->lock);
1339                 list_del(&b->lru);
1340                 list_add(&b->lru, &wc->pending_blocks[(wc->pending_next+3)%4]);
1341                 spin_unlock(&fs->lock);
1342                 lafs_write_block(fs, b, wc);
1343                 lafs_refile(b, 0);
1344         }
1345         skip_discard(wc->slhead.next[0]);
1346         /* FIXME write out zeros to pad raid4 if needed */
1347
1348         /* Having submitted this request we need to remove the bias
1349          * from pending_cnt of previous clusters
1350          */
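        /* A cluster's bias is dropped once any later header it depends
         * on has been submitted: none is needed for VerifyNull, the next
         * header suffices for VerifyNext, and the one after that for
         * VerifyNext2.  If that was the last count - its own writes
         * having completed too - the cluster is finished and waiters
         * are woken.
         */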
1351         which = wc->pending_next;
1352         wake = 0;
1353         dprintk("A which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1354                 atomic_read(&wc->pending_cnt[which]));
1355         if (wc->pending_vfy_type[which] == VerifyNull)
1356                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1357                         wake = 1;
1358         which = (which+3) % 4;
1359         dprintk("B which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1360                 atomic_read(&wc->pending_cnt[which]));
1361         if (wc->pending_vfy_type[which] == VerifyNext)
1362                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1363                         wake = 1;
1364         which = (which+3) % 4;
1365         dprintk("C which=%d vt=%d pc=%d\n", which, wc->pending_vfy_type[which],
1366                 atomic_read(&wc->pending_cnt[which]));
1367         if (wc->pending_vfy_type[which] == VerifyNext2)
1368                 if (atomic_dec_and_test(&wc->pending_cnt[which]))
1369                         wake = 1;
1370         if (wake) {
1371                 cluster_done(fs, wc);
1372                 wake_up(&wc->pending_wait);
1373         }
1374         dprintk("D %d\n", wake);
1375
1376         lafs_write_flush(fs, wc);
1377
1378         wc->pending_next = (wc->pending_next+1) % 4;
1379         /* now re-initialise the cluster information */
1380         cluster_reset(fs, wc);
1381
1382         wait_event(wc->pending_wait,
1383                    atomic_read(&wc->pending_cnt[wc->pending_next]) == 0);
1384         dprintk("cluster_flush end pending_next=%d cnt=%d\n",
1385                 wc->pending_next, atomic_read(&wc->pending_cnt
1386                                               [wc->pending_next]));
1387 }
1388
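/* Flush the cluster only if there is something worth writing: queued
 * blocks, a header that has grown beyond the bare cluster_head, or, for
 * cluster 0, a checkpoint that is ending or an explicit flush request.
 */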
1389 void lafs_cluster_flush(struct fs *fs, int cnum)
1390 {
1391         struct wc *wc = &fs->wc[cnum];
1392         dprintk("LAFS_cluster_flush %d\n", cnum);
1393         mutex_lock(&wc->lock);
1394         if (!list_empty(&wc->clhead)
1395             || wc->chead_size > sizeof(struct cluster_head)
1396             || (cnum == 0 &&
1397                 ((fs->checkpointing & CH_CheckpointEnd)
1398                  || test_bit(FlushNeeded, &fs->fsstate)
1399                  || test_bit(SecondFlushNeeded, &fs->fsstate)
1400                         )))
1401                 cluster_flush(fs, cnum);
1402         mutex_unlock(&wc->lock);
1403 }
1404
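/* Wait until every slot of every write-cluster is down to at most its
 * bias count, i.e. all writes submitted so far have completed.
 */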
1405 void lafs_cluster_wait_all(struct fs *fs)
1406 {
1407         int i;
1408         for (i = 0; i < WC_NUM; i++) {
1409                 struct wc *wc = &fs->wc[i];
1410                 int j;
1411                 for (j = 0; j < 4; j++) {
1412                         wait_event(wc->pending_wait,
1413                                    atomic_read(&wc->pending_cnt[j]) <= 1);
1414                 }
1415         }
1416 }
1417
1418 /* The end_io function for a write to a cluster is one of eight:
1419  * a data and a header variant for each of the four slots in the
1420  * circular list, chosen by the slot the block belongs to.
1421  */
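/* All eight wrappers feed cluster_end_io(), which drops this slot's
 * count and, for a header write, also the counts of the one or two
 * earlier slots that were relying on this header as their commit block
 * (VerifyNext / VerifyNext2).
 */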
1422 static void cluster_end_io(struct bio *bio, int err,
1423                    int which, int header)
1424 {
1425         /* FIXME how to handle errors? */
1426         struct wc *wc = bio->bi_private;
1427         struct fs *fs = container_of(wc, struct fs, wc[wc->cnum]);
1428         int wake = 0;
1429         int done = 0;
1430
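        /* 'done' means some slot's count reached zero, so that cluster
         * is completely finished and the done work should run; 'wake'
         * means a count fell to its bias value of 1, which is the
         * condition cluster_flush and lafs_cluster_wait_all wait for.
         */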
1431         dprintk("end_io err=%d which=%d header=%d\n",
1432                 err, which, header);
1433
1434         if (atomic_dec_and_test(&wc->pending_cnt[which]))
1435                 done++;
1436         if (atomic_read(&wc->pending_cnt[which]) == 1)
1437                 wake = 1;
1438         which = (which+3) % 4;
1439         if (header &&
1440             (wc->pending_vfy_type[which] == VerifyNext ||
1441              wc->pending_vfy_type[which] == VerifyNext2) &&
1442             atomic_dec_and_test(&wc->pending_cnt[which]))
1443                 done++;
1444         if (atomic_read(&wc->pending_cnt[which]) == 1)
1445                 wake = 1;
1446
1447         which = (which+3) % 4;
1448         if (header &&
1449             wc->pending_vfy_type[which] == VerifyNext2 &&
1450             atomic_dec_and_test(&wc->pending_cnt[which]))
1451                 done++;
1452         if (atomic_read(&wc->pending_cnt[which]) == 1)
1453                 wake = 1;
1454
1455         if (done)
1456                 schedule_work(&fs->done_work);
1457         if (done || wake) {
1458                 wake_up(&wc->pending_wait);
1459                 if (test_bit(FlushNeeded, &fs->fsstate) ||
1460                     test_bit(SecondFlushNeeded, &fs->fsstate))
1461                         lafs_wake_thread(fs);
1462         }
1463 }
1464
1465 static void cluster_endio_data_0(struct bio *bio, int err)
1466 {
1467         cluster_end_io(bio, err, 0, 0);
1468 }
1469
1470 static void cluster_endio_data_1(struct bio *bio, int err)
1471 {
1472         cluster_end_io(bio, err, 1, 0);
1473 }
1474
1475 static void cluster_endio_data_2(struct bio *bio, int err)
1476 {
1477         cluster_end_io(bio, err, 2, 0);
1478 }
1479
1480 static void cluster_endio_data_3(struct bio *bio, int err)
1481 {
1482         cluster_end_io(bio, err, 3, 0);
1483 }
1484
1485 static void cluster_endio_header_0(struct bio *bio, int err)
1486 {
1487         cluster_end_io(bio, err, 0, 1);
1488 }
1489
1490 static void cluster_endio_header_1(struct bio *bio, int err)
1491 {
1492         cluster_end_io(bio, err, 1, 1);
1493 }
1494
1495 static void cluster_endio_header_2(struct bio *bio, int err)
1496 {
1497         cluster_end_io(bio, err, 2, 1);
1498 }
1499
1500 static void cluster_endio_header_3(struct bio *bio, int err)
1501 {
1502         cluster_end_io(bio, err, 3, 1);
1503 }
1504
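/* A bio completion callback receives no argument identifying the slot
 * or whether it was a header write, so each (slot, header) combination
 * gets its own small wrapper above; this helper picks the right one.
 */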
1505 bio_end_io_t *lafs_cluster_endio_choose(int which, int header)
1506 {
1507         if (header)
1508                 if ((which&2) == 0)
1509                         if (which == 0)
1510                                 return cluster_endio_header_0;
1511                         else
1512                                 return cluster_endio_header_1;
1513                 else
1514                         if (which == 2)
1515                                 return cluster_endio_header_2;
1516                         else
1517                                 return cluster_endio_header_3;
1518         else
1519                 if ((which&2) == 0)
1520                         if (which == 0)
1521                                 return cluster_endio_data_0;
1522                         else
1523                                 return cluster_endio_data_1;
1524                 else
1525                         if (which == 2)
1526                                 return cluster_endio_data_2;
1527                         else
1528                                 return cluster_endio_data_3;
1529 }
1530
1531 int lafs_cluster_init(struct fs *fs, int cnum, u64 addr, u64 prev, u64 seq)
1532 {
1533         struct wc *wc = &fs->wc[cnum];
1534         int i;
1535
1536         wc->slhead.b = NULL;
1537         INIT_LIST_HEAD(&wc->clhead);
1538
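        /* One page per slot of the 4-entry circular buffer holds that
         * slot's cluster head; if any allocation fails, drop the pages
         * already obtained and give up.
         */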
1539         for (i = 0; i < 4 ; i++) {
1540                 wc->pending_vfy_type[i] = VerifyNull;
1541                 atomic_set(&wc->pending_cnt[i], 0);
1542                 wc->page[i] = alloc_page(GFP_KERNEL);
1543                 if (!wc->page[i])
1544                         break;
1545         }
1546         if (i < 4) {
1547                 while (i > 0) {
1548                         i--;
1549                         put_page(wc->page[i]);
1550                 }
1551                 return -ENOMEM;
1552         }
1553         wc->pending_next = 0;
1554         wc->cluster_seq = seq;
1555         wc->prev_addr = prev;
1556         wc->cnum = cnum;
1557
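        /* Only the main write-cluster (cnum 0) may be given an initial
         * address; position it there and account the wc->remaining+1
         * blocks still available in this cluster as free.
         */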
1558         if (addr) {
1559                 lafs_seg_setpos(fs, &wc->seg, addr);
1560                 cluster_reset(fs, wc);
1561                 BUG_ON(cnum);
1562                 spin_lock(&fs->lock);
1563                 fs->free_blocks += wc->remaining+1;
1564                 spin_unlock(&fs->lock);
1565         }
1566
1567         return 0;
1568 }
1569
1570 void lafs_flush(struct datablock *b)
1571 {
1572         /* Need to write this block out and wait until
1573          * it has been written, so that we can then update
1574          * it without risking corruption of the previous
1575          * snapshot.
1576          */
1577 }
1578
1579 int lafs_cluster_empty(struct fs *fs, int cnum)
1580 {
1581         return list_empty(&fs->wc[cnum].clhead);
1582 }
1583