author:    2025-02-28 11:20:22 +0100
committer: 2025-02-28 11:20:22 +0100
commit:    d8e164accbfd8c089ede5ce07c70234a6a5ff2d1 (patch)
tree:      019910863e9445ff19297ff2864cad727dde928f
parent:    Linux 6.14-rc1 (diff)
parent:    ceph: fix generic/421 test failure (diff)
Merge patch series "ceph: fix generic/421 test failure"
Viacheslav Dubeyko <slava@dubeyko.com> says:
From: Viacheslav Dubeyko <Slava.Dubeyko@ibm.com>
The generic/421 test fails to finish because of the following issue:
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.894678] INFO: task kworker/u48:0:11 blocked for more than 122 seconds.
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895403] Not tainted 6.13.0-rc5+ #1
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.895867] "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896633] task:kworker/u48:0 state:D stack:0 pid:11 tgid:11 ppid:2 flags:0x00004000
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.896641] Workqueue: writeback wb_workfn (flush-ceph-24)
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897614] Call Trace:
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897620] <TASK>
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897629] __schedule+0x443/0x16b0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897637] schedule+0x2b/0x140
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897640] io_schedule+0x4c/0x80
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897643] folio_wait_bit_common+0x11b/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897646] ? _raw_spin_unlock_irq+0xe/0x50
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897652] ? __pfx_wake_page_function+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897655] __folio_lock+0x17/0x30
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897658] ceph_writepages_start+0xca9/0x1fb0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897663] ? fsnotify_remove_queued_event+0x2f/0x40
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897668] do_writepages+0xd2/0x240
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897672] __writeback_single_inode+0x44/0x350
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897675] writeback_sb_inodes+0x25c/0x550
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897680] wb_writeback+0x89/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897683] ? finish_task_switch.isra.0+0x97/0x310
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897687] wb_workfn+0xb5/0x410
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897689] process_one_work+0x188/0x3d0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897692] worker_thread+0x2b5/0x3c0
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897694] ? __pfx_worker_thread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897696] kthread+0xe1/0x120
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897699] ? __pfx_kthread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897701] ret_from_fork+0x43/0x70
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897705] ? __pfx_kthread+0x10/0x10
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897707] ret_from_fork_asm+0x1a/0x30
Jan 3 14:25:27 ceph-testing-0001 kernel: [ 369.897711] </TASK>
There are several issues here:
(1) ceph_kill_sb() doesn't wait for the flushing of all dirty
folios/pages to finish because of the racy nature of
mdsc->stopping_blockers. As a result, mdsc->stopping becomes
CEPH_MDSC_STOPPING_FLUSHED too early.
(2) ceph_inc_osd_stopping_blocker(fsc->mdsc) then fails to increment
mdsc->stopping_blockers. As a result, folios/pages that are already
locked are never unlocked, and the logic tries to lock the same page
a second time (see the excerpt after this list).
(3) The folio_batch of dirty pages found by filemap_get_folios_tag()
is not processed properly, so some dirty pages are simply never
processed and dirty folios/pages remain after unmount anyway.
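The second issue can be seen directly in the pre-patch code quoted in
the diff below: folios are locked first, and the stopping blocker is
only taken at request-submission time, so a failure there bails out
with pages still locked (simplified excerpt of the old flow, error
paths elided):

	lock_page(page);	/* first page locked */
	...
	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
		rc = -EIO;
		goto release_folios;	/* locked pages stay locked */
	}

The next writeback pass then sleeps forever in __folio_lock(), which
is exactly the hung kworker in the log above.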
This patch series refactors the ceph_writepages_start() method and
fixes the issues by means of (condensed into a sketch after this
list):
(1) introducing a dirty_folios counter and a flush_end_wq wait queue
in struct ceph_mds_client;
(2) making ceph_dirty_folio() increment the dirty_folios counter;
(3) making writepages_finish() decrement the dirty_folios counter and
wake up all waiters on the queue when the counter drops to zero or
below;
(4) adding logic to the ceph_kill_sb() method that checks the
dirty_folios counter and waits if it is bigger than zero;
(5) calling ceph_inc_osd_stopping_blocker() at the beginning of
ceph_writepages_start() and ceph_dec_osd_stopping_blocker() at its
end, to resolve the racy nature of mdsc->stopping_blockers.
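Condensed, the new accounting scheme looks like this (a sketch
assembled from the hunks in the diff below, not a standalone listing):

	/* ceph_dirty_folio() */
	atomic64_inc(&mdsc->dirty_folios);

	/* writepages_finish() */
	if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) {
		wake_up_all(&mdsc->flush_end_wq);
		WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0);
	}

	/* ceph_kill_sb() */
	if (atomic64_read(&mdsc->dirty_folios) > 0) {
		long timeleft = wait_event_killable_timeout(mdsc->flush_end_wq,
				atomic64_read(&mdsc->dirty_folios) <= 0,
				fsc->client->options->mount_timeout);
		if (!timeleft)		/* timed out */
			pr_warn_client(cl, "umount timed out, %ld\n", timeleft);
		else if (timeleft < 0)	/* killed */
			pr_warn_client(cl, "umount was killed, %ld\n", timeleft);
	}

With the series applied, the test passes: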
sudo ./check generic/421
FSTYP -- ceph
PLATFORM -- Linux/x86_64 ceph-testing-0001 6.13.0+ #137 SMP PREEMPT_DYNAMIC Mon Feb 3 20:30:08 UTC 2025
MKFS_OPTIONS -- 127.0.0.1:40551:/scratch
MOUNT_OPTIONS -- -o name=fs,secret=<secret>,ms_mode=crc,nowsync,copyfrom 127.0.0.1:40551:/scratch /mnt/scratch
generic/421 7s ... 4s
Ran: generic/421
Passed all 1 tests
* patches from https://lore.kernel.org/r/20250205000249.123054-1-slava@dubeyko.com:
ceph: fix generic/421 test failure
ceph: introduce ceph_submit_write() method
ceph: introduce ceph_process_folio_batch() method
ceph: extend ceph_writeback_ctl for ceph_writepages_start() refactoring
Link: https://lore.kernel.org/r/20250205000249.123054-1-slava@dubeyko.com
Signed-off-by: Christian Brauner <brauner@kernel.org>
-rw-r--r--   fs/ceph/addr.c       | 1110
-rw-r--r--   fs/ceph/mds_client.c |    2
-rw-r--r--   fs/ceph/mds_client.h |    3
-rw-r--r--   fs/ceph/super.c      |   11
4 files changed, 746 insertions, 380 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index f5224a566b69..d82ce4867fca 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -82,6 +82,7 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
 {
 	struct inode *inode = mapping->host;
 	struct ceph_client *cl = ceph_inode_to_client(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 	struct ceph_inode_info *ci;
 	struct ceph_snap_context *snapc;
 
@@ -92,6 +93,8 @@ static bool ceph_dirty_folio(struct address_space *mapping, struct folio *folio)
 		return false;
 	}
 
+	atomic64_inc(&mdsc->dirty_folios);
+
 	ci = ceph_inode(inode);
 
 	/* dirty the head */
@@ -568,7 +571,36 @@ struct ceph_writeback_ctl
 	u64 truncate_size;
 	u32 truncate_seq;
 	bool size_stable;
+
 	bool head_snapc;
+	struct ceph_snap_context *snapc;
+	struct ceph_snap_context *last_snapc;
+
+	bool done;
+	bool should_loop;
+	bool range_whole;
+	pgoff_t start_index;
+	pgoff_t index;
+	pgoff_t end;
+	xa_mark_t tag;
+
+	pgoff_t strip_unit_end;
+	unsigned int wsize;
+	unsigned int nr_folios;
+	unsigned int max_pages;
+	unsigned int locked_pages;
+
+	int op_idx;
+	int num_ops;
+	u64 offset;
+	u64 len;
+
+	struct folio_batch fbatch;
+	unsigned int processed_in_fbatch;
+
+	bool from_pool;
+	struct page **pages;
+	struct page **data_pages;
 };
 
 /*
@@ -865,6 +897,7 @@ static void writepages_finish(struct ceph_osd_request *req)
 	struct ceph_snap_context *snapc = req->r_snapc;
 	struct address_space *mapping = inode->i_mapping;
 	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 	unsigned int len = 0;
 	bool remove_page;
 
@@ -920,6 +953,12 @@ static void writepages_finish(struct ceph_osd_request *req)
 			ceph_put_snap_context(detach_page_private(page));
 		end_page_writeback(page);
+
+		if (atomic64_dec_return(&mdsc->dirty_folios) <= 0) {
+			wake_up_all(&mdsc->flush_end_wq);
+			WARN_ON(atomic64_read(&mdsc->dirty_folios) < 0);
+		}
+
 		doutc(cl, "unlocking %p\n", page);
 
 		if (remove_page)
@@ -949,36 +988,13 @@ static void writepages_finish(struct ceph_osd_request *req)
 	ceph_dec_osd_stopping_blocker(fsc->mdsc);
 }
 
-/*
- * initiate async writeback
- */
-static int ceph_writepages_start(struct address_space *mapping,
-				 struct writeback_control *wbc)
+static inline
+bool is_forced_umount(struct address_space *mapping)
 {
 	struct inode *inode = mapping->host;
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
 	struct ceph_client *cl = fsc->client;
-	struct ceph_vino vino = ceph_vino(inode);
-	pgoff_t index, start_index, end = -1;
-	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
-	struct folio_batch fbatch;
-	int rc = 0;
-	unsigned int wsize = i_blocksize(inode);
-	struct ceph_osd_request *req = NULL;
-	struct ceph_writeback_ctl ceph_wbc;
-	bool should_loop, range_whole = false;
-	bool done = false;
-	bool caching = ceph_is_cache_enabled(inode);
-	xa_mark_t tag;
-
-	if (wbc->sync_mode == WB_SYNC_NONE &&
-	    fsc->write_congested)
-		return 0;
-
-	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
-	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
-	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
 	if (ceph_inode_is_shutdown(inode)) {
 		if (ci->i_wrbuffer_ref > 0) {
@@ -987,388 +1003,739 @@ static int ceph_writepages_start(struct address_space *mapping,
 				    ceph_vinop(inode), ceph_ino(inode));
 		}
 		mapping_set_error(mapping, -EIO);
-		return -EIO; /* we're in a forced umount, don't write! */
+		return true;
 	}
+
+	return false;
+}
+
+static inline
+unsigned int ceph_define_write_size(struct address_space *mapping)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	unsigned int wsize = i_blocksize(inode);
 
 	if (fsc->mount_options->wsize < wsize)
 		wsize = fsc->mount_options->wsize;
 
-	folio_batch_init(&fbatch);
+	return wsize;
+}
+
+static inline
+void ceph_folio_batch_init(struct ceph_writeback_ctl *ceph_wbc)
+{
+	folio_batch_init(&ceph_wbc->fbatch);
+	ceph_wbc->processed_in_fbatch = 0;
+}
+
+static inline
+void ceph_folio_batch_reinit(struct ceph_writeback_ctl *ceph_wbc)
+{
+	folio_batch_release(&ceph_wbc->fbatch);
+	ceph_folio_batch_init(ceph_wbc);
+}
+
+static inline
+void ceph_init_writeback_ctl(struct address_space *mapping,
+			     struct writeback_control *wbc,
+			     struct ceph_writeback_ctl *ceph_wbc)
+{
+	ceph_wbc->snapc = NULL;
+	ceph_wbc->last_snapc = NULL;
+
+	ceph_wbc->strip_unit_end = 0;
+	ceph_wbc->wsize = ceph_define_write_size(mapping);
 
-	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
-	index = start_index;
+	ceph_wbc->nr_folios = 0;
+	ceph_wbc->max_pages = 0;
+	ceph_wbc->locked_pages = 0;
+
+	ceph_wbc->done = false;
+	ceph_wbc->should_loop = false;
+	ceph_wbc->range_whole = false;
+
+	ceph_wbc->start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
+	ceph_wbc->index = ceph_wbc->start_index;
+	ceph_wbc->end = -1;
 
 	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages) {
-		tag = PAGECACHE_TAG_TOWRITE;
+		ceph_wbc->tag = PAGECACHE_TAG_TOWRITE;
 	} else {
-		tag = PAGECACHE_TAG_DIRTY;
+		ceph_wbc->tag = PAGECACHE_TAG_DIRTY;
 	}
-retry:
+
+	ceph_wbc->op_idx = -1;
+	ceph_wbc->num_ops = 0;
+	ceph_wbc->offset = 0;
+	ceph_wbc->len = 0;
+	ceph_wbc->from_pool = false;
+
+	ceph_folio_batch_init(ceph_wbc);
+
+	ceph_wbc->pages = NULL;
+	ceph_wbc->data_pages = NULL;
+}
+
+static inline
+int ceph_define_writeback_range(struct address_space *mapping,
+				struct writeback_control *wbc,
+				struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+
 	/* find oldest snap context with dirty data */
-	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
-	if (!snapc) {
+	ceph_wbc->snapc = get_oldest_context(inode, ceph_wbc, NULL);
+	if (!ceph_wbc->snapc) {
 		/* hmm, why does writepages get called when there
 		   is no dirty data? */
 		doutc(cl, " no snap context with dirty data?\n");
-		goto out;
+		return -ENODATA;
 	}
-	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n", snapc,
-	      snapc->seq, snapc->num_snaps);
 
-	should_loop = false;
-	if (ceph_wbc.head_snapc && snapc != last_snapc) {
+	doutc(cl, " oldest snapc is %p seq %lld (%d snaps)\n",
+	      ceph_wbc->snapc, ceph_wbc->snapc->seq,
+	      ceph_wbc->snapc->num_snaps);
+
+	ceph_wbc->should_loop = false;
+
+	if (ceph_wbc->head_snapc && ceph_wbc->snapc != ceph_wbc->last_snapc) {
 		/* where to start/end? */
 		if (wbc->range_cyclic) {
-			index = start_index;
-			end = -1;
-			if (index > 0)
-				should_loop = true;
-			doutc(cl, " cyclic, start at %lu\n", index);
+			ceph_wbc->index = ceph_wbc->start_index;
+			ceph_wbc->end = -1;
+			if (ceph_wbc->index > 0)
+				ceph_wbc->should_loop = true;
+			doutc(cl, " cyclic, start at %lu\n", ceph_wbc->index);
 		} else {
-			index = wbc->range_start >> PAGE_SHIFT;
-			end = wbc->range_end >> PAGE_SHIFT;
+			ceph_wbc->index = wbc->range_start >> PAGE_SHIFT;
+			ceph_wbc->end = wbc->range_end >> PAGE_SHIFT;
 			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
-				range_whole = true;
-			doutc(cl, " not cyclic, %lu to %lu\n", index, end);
+				ceph_wbc->range_whole = true;
+			doutc(cl, " not cyclic, %lu to %lu\n",
+			      ceph_wbc->index, ceph_wbc->end);
 		}
-	} else if (!ceph_wbc.head_snapc) {
+	} else if (!ceph_wbc->head_snapc) {
 		/* Do not respect wbc->range_{start,end}. Dirty pages
 		 * in that range can be associated with newer snapc.
 		 * They are not writeable until we write all dirty pages
 		 * associated with 'snapc' get written */
-		if (index > 0)
-			should_loop = true;
+		if (ceph_wbc->index > 0)
+			ceph_wbc->should_loop = true;
 		doutc(cl, " non-head snapc, range whole\n");
 	}
 
-	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
-		tag_pages_for_writeback(mapping, index, end);
+	ceph_put_snap_context(ceph_wbc->last_snapc);
+	ceph_wbc->last_snapc = ceph_wbc->snapc;
 
-	ceph_put_snap_context(last_snapc);
-	last_snapc = snapc;
+	return 0;
+}
 
-	while (!done && index <= end) {
-		int num_ops = 0, op_idx;
-		unsigned i, nr_folios, max_pages, locked_pages = 0;
-		struct page **pages = NULL, **data_pages;
-		struct page *page;
-		pgoff_t strip_unit_end = 0;
-		u64 offset = 0, len = 0;
-		bool from_pool = false;
+static inline
+bool has_writeback_done(struct ceph_writeback_ctl *ceph_wbc)
+{
+	return ceph_wbc->done && ceph_wbc->index > ceph_wbc->end;
+}
 
-		max_pages = wsize >> PAGE_SHIFT;
+static inline
+bool can_next_page_be_processed(struct ceph_writeback_ctl *ceph_wbc,
+				unsigned index)
+{
+	return index < ceph_wbc->nr_folios &&
+	       ceph_wbc->locked_pages < ceph_wbc->max_pages;
+}
 
-get_more_pages:
-		nr_folios = filemap_get_folios_tag(mapping, &index,
-						   end, tag, &fbatch);
-		doutc(cl, "pagevec_lookup_range_tag got %d\n", nr_folios);
-		if (!nr_folios && !locked_pages)
-			break;
-		for (i = 0; i < nr_folios && locked_pages < max_pages; i++) {
-			struct folio *folio = fbatch.folios[i];
+static
+int ceph_check_page_before_write(struct address_space *mapping,
+				 struct writeback_control *wbc,
+				 struct ceph_writeback_ctl *ceph_wbc,
+				 struct folio *folio)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct ceph_snap_context *pgsnapc;
+	struct page *page = &folio->page;
 
-			page = &folio->page;
-			doutc(cl, "? %p idx %lu\n", page, page->index);
-			if (locked_pages == 0)
-				lock_page(page); /* first page */
-			else if (!trylock_page(page))
-				break;
+	/* only dirty pages, or our accounting breaks */
+	if (unlikely(!PageDirty(page)) || unlikely(page->mapping != mapping)) {
+		doutc(cl, "!dirty or !mapping %p\n", page);
+		return -ENODATA;
+	}
 
-			/* only dirty pages, or our accounting breaks */
-			if (unlikely(!PageDirty(page)) ||
-			    unlikely(page->mapping != mapping)) {
-				doutc(cl, "!dirty or !mapping %p\n", page);
-				unlock_page(page);
-				continue;
-			}
-			/* only if matching snap context */
-			pgsnapc = page_snap_context(page);
-			if (pgsnapc != snapc) {
-				doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
-				      pgsnapc, pgsnapc->seq, snapc, snapc->seq);
-				if (!should_loop &&
-				    !ceph_wbc.head_snapc &&
-				    wbc->sync_mode != WB_SYNC_NONE)
-					should_loop = true;
-				unlock_page(page);
-				continue;
-			}
-			if (page_offset(page) >= ceph_wbc.i_size) {
-				doutc(cl, "folio at %lu beyond eof %llu\n",
-				      folio->index, ceph_wbc.i_size);
-				if ((ceph_wbc.size_stable ||
-				    folio_pos(folio) >= i_size_read(inode)) &&
-				    folio_clear_dirty_for_io(folio))
-					folio_invalidate(folio, 0,
-							folio_size(folio));
-				folio_unlock(folio);
-				continue;
+	/* only if matching snap context */
+	pgsnapc = page_snap_context(page);
+	if (pgsnapc != ceph_wbc->snapc) {
+		doutc(cl, "page snapc %p %lld != oldest %p %lld\n",
+		      pgsnapc, pgsnapc->seq,
+		      ceph_wbc->snapc, ceph_wbc->snapc->seq);
+
+		if (!ceph_wbc->should_loop && !ceph_wbc->head_snapc &&
+		    wbc->sync_mode != WB_SYNC_NONE)
+			ceph_wbc->should_loop = true;
+
+		return -ENODATA;
+	}
+
+	if (page_offset(page) >= ceph_wbc->i_size) {
+		doutc(cl, "folio at %lu beyond eof %llu\n",
+		      folio->index, ceph_wbc->i_size);
+
+		if ((ceph_wbc->size_stable ||
+		    folio_pos(folio) >= i_size_read(inode)) &&
+		    folio_clear_dirty_for_io(folio))
+			folio_invalidate(folio, 0, folio_size(folio));
+
+		return -ENODATA;
+	}
+
+	if (ceph_wbc->strip_unit_end &&
+	    (page->index > ceph_wbc->strip_unit_end)) {
+		doutc(cl, "end of strip unit %p\n", page);
+		return -E2BIG;
+	}
+
+	return 0;
+}
+
+static inline
+void __ceph_allocate_page_array(struct ceph_writeback_ctl *ceph_wbc,
+				unsigned int max_pages)
+{
+	ceph_wbc->pages = kmalloc_array(max_pages,
+					sizeof(*ceph_wbc->pages),
+					GFP_NOFS);
+	if (!ceph_wbc->pages) {
+		ceph_wbc->from_pool = true;
+		ceph_wbc->pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
+		BUG_ON(!ceph_wbc->pages);
+	}
+}
+
+static inline
+void ceph_allocate_page_array(struct address_space *mapping,
+			      struct ceph_writeback_ctl *ceph_wbc,
+			      struct page *page)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	u64 objnum;
+	u64 objoff;
+	u32 xlen;
+
+	/* prepare async write request */
+	ceph_wbc->offset = (u64)page_offset(page);
+	ceph_calc_file_object_mapping(&ci->i_layout,
+				      ceph_wbc->offset, ceph_wbc->wsize,
+				      &objnum, &objoff, &xlen);
+
+	ceph_wbc->num_ops = 1;
+	ceph_wbc->strip_unit_end = page->index + ((xlen - 1) >> PAGE_SHIFT);
+
+	BUG_ON(ceph_wbc->pages);
+	ceph_wbc->max_pages = calc_pages_for(0, (u64)xlen);
+	__ceph_allocate_page_array(ceph_wbc, ceph_wbc->max_pages);
+
+	ceph_wbc->len = 0;
+}
+
+static inline
+bool is_page_index_contiguous(struct ceph_writeback_ctl *ceph_wbc,
+			      struct page *page)
+{
+	return page->index == (ceph_wbc->offset + ceph_wbc->len) >> PAGE_SHIFT;
+}
+
+static inline
+bool is_num_ops_too_big(struct ceph_writeback_ctl *ceph_wbc)
+{
+	return ceph_wbc->num_ops >=
+		(ceph_wbc->from_pool ? CEPH_OSD_SLAB_OPS : CEPH_OSD_MAX_OPS);
+}
+
+static inline
+bool is_write_congestion_happened(struct ceph_fs_client *fsc)
+{
+	return atomic_long_inc_return(&fsc->writeback_count) >
+		CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb);
+}
+
+static inline
+int ceph_move_dirty_page_in_page_array(struct address_space *mapping,
+				       struct writeback_control *wbc,
+				       struct ceph_writeback_ctl *ceph_wbc,
+				       struct page *page)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct page **pages = ceph_wbc->pages;
+	unsigned int index = ceph_wbc->locked_pages;
+	gfp_t gfp_flags = ceph_wbc->locked_pages ? GFP_NOWAIT : GFP_NOFS;
+
+	if (IS_ENCRYPTED(inode)) {
+		pages[index] = fscrypt_encrypt_pagecache_blocks(page,
+								PAGE_SIZE,
+								0,
+								gfp_flags);
+		if (IS_ERR(pages[index])) {
+			if (PTR_ERR(pages[index]) == -EINVAL) {
+				pr_err_client(cl, "inode->i_blkbits=%hhu\n",
+					      inode->i_blkbits);
 			}
-			if (strip_unit_end && (page->index > strip_unit_end)) {
-				doutc(cl, "end of strip unit %p\n", page);
+
+			/* better not fail on first page! */
+			BUG_ON(ceph_wbc->locked_pages == 0);
+
+			pages[index] = NULL;
+			return PTR_ERR(pages[index]);
+		}
+	} else {
+		pages[index] = page;
+	}
+
+	ceph_wbc->locked_pages++;
+
+	return 0;
+}
+
+static
+int ceph_process_folio_batch(struct address_space *mapping,
+			     struct writeback_control *wbc,
+			     struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct folio *folio = NULL;
+	struct page *page = NULL;
+	unsigned i;
+	int rc = 0;
+
+	for (i = 0; can_next_page_be_processed(ceph_wbc, i); i++) {
+		folio = ceph_wbc->fbatch.folios[i];
+
+		if (!folio)
+			continue;
+
+		page = &folio->page;
+
+		doutc(cl, "? %p idx %lu, folio_test_writeback %#x, "
+		      "folio_test_dirty %#x, folio_test_locked %#x\n",
+		      page, page->index, folio_test_writeback(folio),
+		      folio_test_dirty(folio),
+		      folio_test_locked(folio));
+
+		if (folio_test_writeback(folio) ||
+		    folio_test_private_2(folio) /* [DEPRECATED] */) {
+			doutc(cl, "waiting on writeback %p\n", folio);
+			folio_wait_writeback(folio);
+			folio_wait_private_2(folio); /* [DEPRECATED] */
+			continue;
+		}
+
+		if (ceph_wbc->locked_pages == 0)
+			lock_page(page); /* first page */
+		else if (!trylock_page(page))
+			break;
+
+		rc = ceph_check_page_before_write(mapping, wbc,
+						  ceph_wbc, folio);
+		if (rc == -ENODATA) {
+			rc = 0;
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			continue;
+		} else if (rc == -E2BIG) {
+			rc = 0;
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			break;
+		}
+
+		if (!clear_page_dirty_for_io(page)) {
+			doutc(cl, "%p !clear_page_dirty_for_io\n", page);
+			unlock_page(page);
+			ceph_wbc->fbatch.folios[i] = NULL;
+			continue;
+		}
+
+		/*
+		 * We have something to write. If this is
+		 * the first locked page this time through,
+		 * calculate max possible write size and
+		 * allocate a page array
+		 */
+		if (ceph_wbc->locked_pages == 0) {
+			ceph_allocate_page_array(mapping, ceph_wbc, page);
+		} else if (!is_page_index_contiguous(ceph_wbc, page)) {
+			if (is_num_ops_too_big(ceph_wbc)) {
+				redirty_page_for_writepage(wbc, page);
 				unlock_page(page);
 				break;
 			}
-			if (folio_test_writeback(folio) ||
-			    folio_test_private_2(folio) /* [DEPRECATED] */) {
-				if (wbc->sync_mode == WB_SYNC_NONE) {
-					doutc(cl, "%p under writeback\n", folio);
-					folio_unlock(folio);
-					continue;
-				}
-				doutc(cl, "waiting on writeback %p\n", folio);
-				folio_wait_writeback(folio);
-				folio_wait_private_2(folio); /* [DEPRECATED] */
-			}
-			if (!clear_page_dirty_for_io(page)) {
-				doutc(cl, "%p !clear_page_dirty_for_io\n", page);
-				unlock_page(page);
-				continue;
-			}
+
+			ceph_wbc->num_ops++;
+			ceph_wbc->offset = (u64)page_offset(page);
+			ceph_wbc->len = 0;
+		}
 
-			/*
-			 * We have something to write. If this is
-			 * the first locked page this time through,
-			 * calculate max possinle write size and
-			 * allocate a page array
-			 */
-			if (locked_pages == 0) {
-				u64 objnum;
-				u64 objoff;
-				u32 xlen;
-
-				/* prepare async write request */
-				offset = (u64)page_offset(page);
-				ceph_calc_file_object_mapping(&ci->i_layout,
-							      offset, wsize,
-							      &objnum, &objoff,
-							      &xlen);
-				len = xlen;
-
-				num_ops = 1;
-				strip_unit_end = page->index +
-					((len - 1) >> PAGE_SHIFT);
-
-				BUG_ON(pages);
-				max_pages = calc_pages_for(0, (u64)len);
-				pages = kmalloc_array(max_pages,
-						      sizeof(*pages),
-						      GFP_NOFS);
-				if (!pages) {
-					from_pool = true;
-					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-					BUG_ON(!pages);
-				}
-
-				len = 0;
-			} else if (page->index !=
-				   (offset + len) >> PAGE_SHIFT) {
-				if (num_ops >= (from_pool ? CEPH_OSD_SLAB_OPS :
-							    CEPH_OSD_MAX_OPS)) {
-					redirty_page_for_writepage(wbc, page);
-					unlock_page(page);
-					break;
-				}
-
-				num_ops++;
-				offset = (u64)page_offset(page);
-				len = 0;
-			}
+		/* note position of first page in fbatch */
+		doutc(cl, "%llx.%llx will write page %p idx %lu\n",
+		      ceph_vinop(inode), page, page->index);
 
-			/* note position of first page in fbatch */
-			doutc(cl, "%llx.%llx will write page %p idx %lu\n",
-			      ceph_vinop(inode), page, page->index);
-
-			if (atomic_long_inc_return(&fsc->writeback_count) >
-			    CONGESTION_ON_THRESH(
-				    fsc->mount_options->congestion_kb))
-				fsc->write_congested = true;
-
-			if (IS_ENCRYPTED(inode)) {
-				pages[locked_pages] =
-					fscrypt_encrypt_pagecache_blocks(page,
-								PAGE_SIZE, 0,
-								locked_pages ? GFP_NOWAIT : GFP_NOFS);
-				if (IS_ERR(pages[locked_pages])) {
-					if (PTR_ERR(pages[locked_pages]) == -EINVAL)
-						pr_err_client(cl,
-							"inode->i_blkbits=%hhu\n",
-							inode->i_blkbits);
-					/* better not fail on first page! */
-					BUG_ON(locked_pages == 0);
-					pages[locked_pages] = NULL;
-					redirty_page_for_writepage(wbc, page);
-					unlock_page(page);
-					break;
-				}
-				++locked_pages;
-			} else {
-				pages[locked_pages++] = page;
-			}
+		fsc->write_congested = is_write_congestion_happened(fsc);
 
-			fbatch.folios[i] = NULL;
-			len += thp_size(page);
+		rc = ceph_move_dirty_page_in_page_array(mapping, wbc,
+							ceph_wbc, page);
+		if (rc) {
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
+			break;
 		}
 
-		/* did we get anything? */
-		if (!locked_pages)
-			goto release_folios;
-		if (i) {
-			unsigned j, n = 0;
-			/* shift unused page to beginning of fbatch */
-			for (j = 0; j < nr_folios; j++) {
-				if (!fbatch.folios[j])
-					continue;
-				if (n < j)
-					fbatch.folios[n] = fbatch.folios[j];
-				n++;
-			}
-			fbatch.nr = n;
+		ceph_wbc->fbatch.folios[i] = NULL;
+		ceph_wbc->len += thp_size(page);
+	}
 
-			if (nr_folios && i == nr_folios &&
-			    locked_pages < max_pages) {
-				doutc(cl, "reached end fbatch, trying for more\n");
-				folio_batch_release(&fbatch);
-				goto get_more_pages;
-			}
+	ceph_wbc->processed_in_fbatch = i;
+
+	return rc;
+}
+
+static inline
+void ceph_shift_unused_folios_left(struct folio_batch *fbatch)
+{
+	unsigned j, n = 0;
+
+	/* shift unused page to beginning of fbatch */
+	for (j = 0; j < folio_batch_count(fbatch); j++) {
+		if (!fbatch->folios[j])
+			continue;
+
+		if (n < j) {
+			fbatch->folios[n] = fbatch->folios[j];
 		}
+
+		n++;
+	}
+
+	fbatch->nr = n;
+}
+
+static
+int ceph_submit_write(struct address_space *mapping,
+		      struct writeback_control *wbc,
+		      struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct ceph_vino vino = ceph_vino(inode);
+	struct ceph_osd_request *req = NULL;
+	struct page *page = NULL;
+	bool caching = ceph_is_cache_enabled(inode);
+	u64 offset;
+	u64 len;
+	unsigned i;
+
 new_request:
-		offset = ceph_fscrypt_page_offset(pages[0]);
-		len = wsize;
+	offset = ceph_fscrypt_page_offset(ceph_wbc->pages[0]);
+	len = ceph_wbc->wsize;
+	req = ceph_osdc_new_request(&fsc->client->osdc,
+				    &ci->i_layout, vino,
+				    offset, &len, 0, ceph_wbc->num_ops,
+				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
+				    ceph_wbc->snapc, ceph_wbc->truncate_seq,
+				    ceph_wbc->truncate_size, false);
+	if (IS_ERR(req)) {
 		req = ceph_osdc_new_request(&fsc->client->osdc,
-					&ci->i_layout, vino,
-					offset, &len, 0, num_ops,
-					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
-					snapc, ceph_wbc.truncate_seq,
-					ceph_wbc.truncate_size, false);
-		if (IS_ERR(req)) {
-			req = ceph_osdc_new_request(&fsc->client->osdc,
-						&ci->i_layout, vino,
-						offset, &len, 0,
-						min(num_ops,
-						    CEPH_OSD_SLAB_OPS),
-						CEPH_OSD_OP_WRITE,
-						CEPH_OSD_FLAG_WRITE,
-						snapc, ceph_wbc.truncate_seq,
-						ceph_wbc.truncate_size, true);
-			BUG_ON(IS_ERR(req));
+					    &ci->i_layout, vino,
+					    offset, &len, 0,
+					    min(ceph_wbc->num_ops,
+						CEPH_OSD_SLAB_OPS),
+					    CEPH_OSD_OP_WRITE,
+					    CEPH_OSD_FLAG_WRITE,
+					    ceph_wbc->snapc,
+					    ceph_wbc->truncate_seq,
+					    ceph_wbc->truncate_size,
+					    true);
+		BUG_ON(IS_ERR(req));
+	}
+
+	page = ceph_wbc->pages[ceph_wbc->locked_pages - 1];
+	BUG_ON(len < ceph_fscrypt_page_offset(page) + thp_size(page) - offset);
+
+	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
+		for (i = 0; i < folio_batch_count(&ceph_wbc->fbatch); i++) {
+			struct folio *folio = ceph_wbc->fbatch.folios[i];
+
+			if (!folio)
+				continue;
+
+			page = &folio->page;
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
 		}
-		BUG_ON(len < ceph_fscrypt_page_offset(pages[locked_pages - 1]) +
-			     thp_size(pages[locked_pages - 1]) - offset);
 
-		if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
-			rc = -EIO;
-			goto release_folios;
+		for (i = 0; i < ceph_wbc->locked_pages; i++) {
+			page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+
+			if (!page)
+				continue;
+
+			redirty_page_for_writepage(wbc, page);
+			unlock_page(page);
 		}
-		req->r_callback = writepages_finish;
-		req->r_inode = inode;
-
-		/* Format the osd request message and submit the write */
-		len = 0;
-		data_pages = pages;
-		op_idx = 0;
-		for (i = 0; i < locked_pages; i++) {
-			struct page *page = ceph_fscrypt_pagecache_page(pages[i]);
-
-			u64 cur_offset = page_offset(page);
-			/*
-			 * Discontinuity in page range? Ceph can handle that by just passing
-			 * multiple extents in the write op.
-			 */
-			if (offset + len != cur_offset) {
-				/* If it's full, stop here */
-				if (op_idx + 1 == req->r_num_ops)
-					break;
-
-				/* Kick off an fscache write with what we have so far. */
-				ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-				/* Start a new extent */
-				osd_req_op_extent_dup_last(req, op_idx,
-							   cur_offset - offset);
-				doutc(cl, "got pages at %llu~%llu\n", offset,
-				      len);
-				osd_req_op_extent_osd_data_pages(req, op_idx,
-							data_pages, len, 0,
-							from_pool, false);
-				osd_req_op_extent_update(req, op_idx, len);
-
-				len = 0;
-				offset = cur_offset;
-				data_pages = pages + i;
-				op_idx++;
-			}
-			set_page_writeback(page);
-			if (caching)
-				ceph_set_page_fscache(page);
-			len += thp_size(page);
+
+		ceph_osdc_put_request(req);
+		return -EIO;
+	}
+
+	req->r_callback = writepages_finish;
+	req->r_inode = inode;
+
+	/* Format the osd request message and submit the write */
+	len = 0;
+	ceph_wbc->data_pages = ceph_wbc->pages;
+	ceph_wbc->op_idx = 0;
+	for (i = 0; i < ceph_wbc->locked_pages; i++) {
+		u64 cur_offset;
+
+		page = ceph_fscrypt_pagecache_page(ceph_wbc->pages[i]);
+		cur_offset = page_offset(page);
+
+		/*
+		 * Discontinuity in page range? Ceph can handle that by just passing
+		 * multiple extents in the write op.
+		 */
+		if (offset + len != cur_offset) {
+			/* If it's full, stop here */
+			if (ceph_wbc->op_idx + 1 == req->r_num_ops)
+				break;
+
+			/* Kick off an fscache write with what we have so far. */
+			ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+			/* Start a new extent */
+			osd_req_op_extent_dup_last(req, ceph_wbc->op_idx,
+						   cur_offset - offset);
+
+			doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+			osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+							 ceph_wbc->data_pages,
+							 len, 0,
+							 ceph_wbc->from_pool,
+							 false);
+			osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+			len = 0;
+			offset = cur_offset;
+			ceph_wbc->data_pages = ceph_wbc->pages + i;
+			ceph_wbc->op_idx++;
 		}
-		ceph_fscache_write_to_cache(inode, offset, len, caching);
-
-		if (ceph_wbc.size_stable) {
-			len = min(len, ceph_wbc.i_size - offset);
-		} else if (i == locked_pages) {
-			/* writepages_finish() clears writeback pages
-			 * according to the data length, so make sure
-			 * data length covers all locked pages */
-			u64 min_len = len + 1 - thp_size(page);
-			len = get_writepages_data_length(inode, pages[i - 1],
-							 offset);
-			len = max(len, min_len);
+
+		set_page_writeback(page);
+
+		if (caching)
+			ceph_set_page_fscache(page);
+
+		len += thp_size(page);
+	}
+
+	ceph_fscache_write_to_cache(inode, offset, len, caching);
+
+	if (ceph_wbc->size_stable) {
+		len = min(len, ceph_wbc->i_size - offset);
+	} else if (i == ceph_wbc->locked_pages) {
+		/* writepages_finish() clears writeback pages
+		 * according to the data length, so make sure
+		 * data length covers all locked pages */
+		u64 min_len = len + 1 - thp_size(page);
+		len = get_writepages_data_length(inode,
+						 ceph_wbc->pages[i - 1],
+						 offset);
+		len = max(len, min_len);
+	}
+
+	if (IS_ENCRYPTED(inode))
+		len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
+
+	doutc(cl, "got pages at %llu~%llu\n", offset, len);
+
+	if (IS_ENCRYPTED(inode) &&
+	    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK)) {
+		pr_warn_client(cl,
+			       "bad encrypted write offset=%lld len=%llu\n",
+			       offset, len);
+	}
+
+	osd_req_op_extent_osd_data_pages(req, ceph_wbc->op_idx,
+					 ceph_wbc->data_pages, len,
+					 0, ceph_wbc->from_pool, false);
+	osd_req_op_extent_update(req, ceph_wbc->op_idx, len);
+
+	BUG_ON(ceph_wbc->op_idx + 1 != req->r_num_ops);
+
+	ceph_wbc->from_pool = false;
+	if (i < ceph_wbc->locked_pages) {
+		BUG_ON(ceph_wbc->num_ops <= req->r_num_ops);
+		ceph_wbc->num_ops -= req->r_num_ops;
+		ceph_wbc->locked_pages -= i;
+
+		/* allocate new pages array for next request */
+		ceph_wbc->data_pages = ceph_wbc->pages;
+		__ceph_allocate_page_array(ceph_wbc, ceph_wbc->locked_pages);
+		memcpy(ceph_wbc->pages, ceph_wbc->data_pages + i,
+		       ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+		memset(ceph_wbc->data_pages + i, 0,
+		       ceph_wbc->locked_pages * sizeof(*ceph_wbc->pages));
+	} else {
+		BUG_ON(ceph_wbc->num_ops != req->r_num_ops);
+		/* request message now owns the pages array */
+		ceph_wbc->pages = NULL;
+	}
+
+	req->r_mtime = inode_get_mtime(inode);
+	ceph_osdc_start_request(&fsc->client->osdc, req);
+	req = NULL;
+
+	wbc->nr_to_write -= i;
+	if (ceph_wbc->pages)
+		goto new_request;
+
+	return 0;
+}
+
+static
+void ceph_wait_until_current_writes_complete(struct address_space *mapping,
+					     struct writeback_control *wbc,
+					     struct ceph_writeback_ctl *ceph_wbc)
+{
+	struct page *page;
+	unsigned i, nr;
+
+	if (wbc->sync_mode != WB_SYNC_NONE &&
+	    ceph_wbc->start_index == 0 && /* all dirty pages were checked */
+	    !ceph_wbc->head_snapc) {
+		ceph_wbc->index = 0;
+
+		while ((ceph_wbc->index <= ceph_wbc->end) &&
+		       (nr = filemap_get_folios_tag(mapping,
+						    &ceph_wbc->index,
+						    (pgoff_t)-1,
+						    PAGECACHE_TAG_WRITEBACK,
+						    &ceph_wbc->fbatch))) {
+			for (i = 0; i < nr; i++) {
+				page = &ceph_wbc->fbatch.folios[i]->page;
+				if (page_snap_context(page) != ceph_wbc->snapc)
+					continue;
+				wait_on_page_writeback(page);
+			}
+
+			folio_batch_release(&ceph_wbc->fbatch);
+			cond_resched();
 		}
-		if (IS_ENCRYPTED(inode))
-			len = round_up(len, CEPH_FSCRYPT_BLOCK_SIZE);
+	}
+}
 
-		doutc(cl, "got pages at %llu~%llu\n", offset, len);
+/*
+ * initiate async writeback
+ */
+static int ceph_writepages_start(struct address_space *mapping,
+				 struct writeback_control *wbc)
+{
+	struct inode *inode = mapping->host;
+	struct ceph_fs_client *fsc = ceph_inode_to_fs_client(inode);
+	struct ceph_client *cl = fsc->client;
+	struct ceph_writeback_ctl ceph_wbc;
+	int rc = 0;
 
-		if (IS_ENCRYPTED(inode) &&
-		    ((offset | len) & ~CEPH_FSCRYPT_BLOCK_MASK))
-			pr_warn_client(cl,
-				"bad encrypted write offset=%lld len=%llu\n",
-				offset, len);
-
-		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
-						 0, from_pool, false);
-		osd_req_op_extent_update(req, op_idx, len);
-
-		BUG_ON(op_idx + 1 != req->r_num_ops);
-
-		from_pool = false;
-		if (i < locked_pages) {
-			BUG_ON(num_ops <= req->r_num_ops);
-			num_ops -= req->r_num_ops;
-			locked_pages -= i;
-
-			/* allocate new pages array for next request */
-			data_pages = pages;
-			pages = kmalloc_array(locked_pages, sizeof(*pages),
-					      GFP_NOFS);
-			if (!pages) {
-				from_pool = true;
-				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
-				BUG_ON(!pages);
+	if (wbc->sync_mode == WB_SYNC_NONE && fsc->write_congested)
+		return 0;
+
+	doutc(cl, "%llx.%llx (mode=%s)\n", ceph_vinop(inode),
+	      wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
+	      (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
+
+	if (is_forced_umount(mapping)) {
+		/* we're in a forced umount, don't write! */
+		return -EIO;
+	}
+
+	ceph_init_writeback_ctl(mapping, wbc, &ceph_wbc);
+
+	if (!ceph_inc_osd_stopping_blocker(fsc->mdsc)) {
+		rc = -EIO;
+		goto out;
+	}
+
+retry:
+	rc = ceph_define_writeback_range(mapping, wbc, &ceph_wbc);
+	if (rc == -ENODATA) {
+		/* hmm, why does writepages get called when there
+		   is no dirty data? */
+		rc = 0;
+		goto dec_osd_stopping_blocker;
+	}
+
+	if (wbc->sync_mode == WB_SYNC_ALL || wbc->tagged_writepages)
+		tag_pages_for_writeback(mapping, ceph_wbc.index, ceph_wbc.end);
+
+	while (!has_writeback_done(&ceph_wbc)) {
+		ceph_wbc.locked_pages = 0;
+		ceph_wbc.max_pages = ceph_wbc.wsize >> PAGE_SHIFT;
+
+get_more_pages:
+		ceph_folio_batch_reinit(&ceph_wbc);
+
+		ceph_wbc.nr_folios = filemap_get_folios_tag(mapping,
+							    &ceph_wbc.index,
+							    ceph_wbc.end,
+							    ceph_wbc.tag,
+							    &ceph_wbc.fbatch);
+		doutc(cl, "pagevec_lookup_range_tag for tag %#x got %d\n",
+		      ceph_wbc.tag, ceph_wbc.nr_folios);
+
+		if (!ceph_wbc.nr_folios && !ceph_wbc.locked_pages)
+			break;
+
+process_folio_batch:
+		rc = ceph_process_folio_batch(mapping, wbc, &ceph_wbc);
+		if (rc)
+			goto release_folios;
+
+		/* did we get anything? */
+		if (!ceph_wbc.locked_pages)
+			goto release_folios;
+
+		if (ceph_wbc.processed_in_fbatch) {
+			ceph_shift_unused_folios_left(&ceph_wbc.fbatch);
+
+			if (folio_batch_count(&ceph_wbc.fbatch) == 0 &&
+			    ceph_wbc.locked_pages < ceph_wbc.max_pages) {
+				doutc(cl, "reached end fbatch, trying for more\n");
+				goto get_more_pages;
 			}
-			memcpy(pages, data_pages + i,
-			       locked_pages * sizeof(*pages));
-			memset(data_pages + i, 0,
-			       locked_pages * sizeof(*pages));
-		} else {
-			BUG_ON(num_ops != req->r_num_ops);
-			index = pages[i - 1]->index + 1;
-			/* request message now owns the pages array */
-			pages = NULL;
 		}
 
-		req->r_mtime = inode_get_mtime(inode);
-		ceph_osdc_start_request(&fsc->client->osdc, req);
-		req = NULL;
+		rc = ceph_submit_write(mapping, wbc, &ceph_wbc);
+		if (rc)
+			goto release_folios;
 
-		wbc->nr_to_write -= i;
-		if (pages)
-			goto new_request;
+		ceph_wbc.locked_pages = 0;
+		ceph_wbc.strip_unit_end = 0;
+
+		if (folio_batch_count(&ceph_wbc.fbatch) > 0) {
+			ceph_wbc.nr_folios =
+				folio_batch_count(&ceph_wbc.fbatch);
+			goto process_folio_batch;
+		}
 
 		/*
 		 * We stop writing back only if we are not doing
@@ -1377,61 +1744,44 @@ new_request:
 		 * we tagged for writeback prior to entering this loop.
 		 */
 		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
-			done = true;
+			ceph_wbc.done = true;
 
release_folios:
 		doutc(cl, "folio_batch release on %d folios (%p)\n",
-		      (int)fbatch.nr, fbatch.nr ? fbatch.folios[0] : NULL);
-		folio_batch_release(&fbatch);
+		      (int)ceph_wbc.fbatch.nr,
+		      ceph_wbc.fbatch.nr ? ceph_wbc.fbatch.folios[0] : NULL);
+		folio_batch_release(&ceph_wbc.fbatch);
 	}
 
-	if (should_loop && !done) {
+	if (ceph_wbc.should_loop && !ceph_wbc.done) {
 		/* more to do; loop back to beginning of file */
 		doutc(cl, "looping back to beginning of file\n");
-		end = start_index - 1; /* OK even when start_index == 0 */
+		/* OK even when start_index == 0 */
+		ceph_wbc.end = ceph_wbc.start_index - 1;
 
 		/* to write dirty pages associated with next snapc,
 		 * we need to wait until current writes complete */
-		if (wbc->sync_mode != WB_SYNC_NONE &&
-		    start_index == 0 && /* all dirty pages were checked */
-		    !ceph_wbc.head_snapc) {
-			struct page *page;
-			unsigned i, nr;
-			index = 0;
-			while ((index <= end) &&
-			       (nr = filemap_get_folios_tag(mapping, &index,
-						(pgoff_t)-1,
-						PAGECACHE_TAG_WRITEBACK,
-						&fbatch))) {
-				for (i = 0; i < nr; i++) {
-					page = &fbatch.folios[i]->page;
-					if (page_snap_context(page) != snapc)
-						continue;
-					wait_on_page_writeback(page);
-				}
-				folio_batch_release(&fbatch);
-				cond_resched();
-			}
-		}
+		ceph_wait_until_current_writes_complete(mapping, wbc, &ceph_wbc);
 
-		start_index = 0;
-		index = 0;
+		ceph_wbc.start_index = 0;
+		ceph_wbc.index = 0;
 		goto retry;
 	}
 
-	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
-		mapping->writeback_index = index;
+	if (wbc->range_cyclic || (ceph_wbc.range_whole && wbc->nr_to_write > 0))
+		mapping->writeback_index = ceph_wbc.index;
+
+dec_osd_stopping_blocker:
+	ceph_dec_osd_stopping_blocker(fsc->mdsc);
 
out:
-	ceph_osdc_put_request(req);
-	ceph_put_snap_context(last_snapc);
+	ceph_put_snap_context(ceph_wbc.last_snapc);
 	doutc(cl, "%llx.%llx dend - startone, rc = %d\n", ceph_vinop(inode),
 	      rc);
+
 	return rc;
 }
-
-
 /*
  * See if a given @snapc is either writeable, or already written.
  */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 54b3421501e9..230e0c3f341f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -5489,6 +5489,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
 	spin_lock_init(&mdsc->stopping_lock);
 	atomic_set(&mdsc->stopping_blockers, 0);
 	init_completion(&mdsc->stopping_waiter);
+	atomic64_set(&mdsc->dirty_folios, 0);
+	init_waitqueue_head(&mdsc->flush_end_wq);
 	init_waitqueue_head(&mdsc->session_close_wq);
 	INIT_LIST_HEAD(&mdsc->waiting_for_map);
 	mdsc->quotarealms_inodes = RB_ROOT;
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 7c9fee9e80d4..3e2a6fa7c19a 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -458,6 +458,9 @@ struct ceph_mds_client {
 	atomic_t		stopping_blockers;
 	struct completion	stopping_waiter;
 
+	atomic64_t		dirty_folios;
+	wait_queue_head_t	flush_end_wq;
+
 	atomic64_t		quotarealms_count; /* # realms with quota */
 	/*
 	 * We keep a list of inodes we don't see in the mountpoint but that we
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 4344e1f11806..f3951253e393 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -1563,6 +1563,17 @@ static void ceph_kill_sb(struct super_block *s)
 	 */
 	sync_filesystem(s);
 
+	if (atomic64_read(&mdsc->dirty_folios) > 0) {
+		wait_queue_head_t *wq = &mdsc->flush_end_wq;
+		long timeleft = wait_event_killable_timeout(*wq,
+					atomic64_read(&mdsc->dirty_folios) <= 0,
+					fsc->client->options->mount_timeout);
+		if (!timeleft) /* timed out */
+			pr_warn_client(cl, "umount timed out, %ld\n", timeleft);
+		else if (timeleft < 0) /* killed */
+			pr_warn_client(cl, "umount was killed, %ld\n", timeleft);
+	}
+
 	spin_lock(&mdsc->stopping_lock);
 	mdsc->stopping = CEPH_MDSC_STOPPING_FLUSHING;
 	wait = !!atomic_read(&mdsc->stopping_blockers);