From 144cba1493fdd6e3e1980e439a31df877831ebcd Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 27 Apr 2015 11:09:54 +0800 Subject: libceph: allow setting osd_req_op's flags Signed-off-by: Yan, Zheng Reviewed-by: Alex Elder --- fs/ceph/addr.c | 3 ++- fs/ceph/file.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index e162bcd105ee..feeaf3b65fa0 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -884,7 +884,8 @@ get_more_pages: } if (do_sync) - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, + CEPH_OSD_OP_STARTSYNC, 0); req->r_callback = writepages_finish; req->r_inode = inode; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3b6b522b4b31..777eb21d556f 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -614,7 +614,7 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) break; } - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC); + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); n = iov_iter_get_pages_alloc(from, &pages, len, &start); if (unlikely(n < 0)) { -- cgit v1.2.3-59-g8ed1b From 10183a69551f76702ac68bc74a437b25419c6de0 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 27 Apr 2015 15:33:28 +0800 Subject: ceph: check OSD caps before read/write Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 203 +++++++++++++++++++++++++++++++++++++++++++++++++++ fs/ceph/caps.c | 4 + fs/ceph/inode.c | 3 + fs/ceph/mds_client.c | 4 + fs/ceph/mds_client.h | 9 +++ fs/ceph/super.c | 15 +++- fs/ceph/super.h | 17 +++-- 7 files changed, 249 insertions(+), 6 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index feeaf3b65fa0..b96027727248 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1598,3 +1598,206 @@ int ceph_mmap(struct file *file, struct vm_area_struct *vma) vma->vm_ops = &ceph_vmops; return 0; } + +enum { + POOL_READ = 1, + POOL_WRITE = 2, +}; + +static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) +{ + struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode); + struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_osd_request *rd_req = NULL, *wr_req = NULL; + struct rb_node **p, *parent; + struct ceph_pool_perm *perm; + struct page **pages; + int err = 0, err2 = 0, have = 0; + + down_read(&mdsc->pool_perm_rwsem); + p = &mdsc->pool_perm_tree.rb_node; + while (*p) { + perm = rb_entry(*p, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + up_read(&mdsc->pool_perm_rwsem); + if (*p) + goto out; + + dout("__ceph_pool_perm_get pool %u no perm cached\n", pool); + + down_write(&mdsc->pool_perm_rwsem); + parent = NULL; + while (*p) { + parent = *p; + perm = rb_entry(parent, struct ceph_pool_perm, node); + if (pool < perm->pool) + p = &(*p)->rb_left; + else if (pool > perm->pool) + p = &(*p)->rb_right; + else { + have = perm->perm; + break; + } + } + if (*p) { + up_write(&mdsc->pool_perm_rwsem); + goto out; + } + + rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ci->i_snap_realm->cached_context, + 1, false, GFP_NOFS); + if (!rd_req) { + err = -ENOMEM; + goto out_unlock; + } + + rd_req->r_flags = CEPH_OSD_FLAG_READ; + osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0); + rd_req->r_base_oloc.pool = pool; + snprintf(rd_req->r_base_oid.name, sizeof(rd_req->r_base_oid.name), + "%llx.00000000", ci->i_vino.ino); + rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); + + wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, + ci->i_snap_realm->cached_context, + 1, false, GFP_NOFS); + if (!wr_req) { + err = -ENOMEM; + goto out_unlock; + } + + wr_req->r_flags = CEPH_OSD_FLAG_WRITE | + CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK; + osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL); + wr_req->r_base_oloc.pool = pool; + wr_req->r_base_oid = rd_req->r_base_oid; + + /* one page should be large enough for STAT data */ + pages = ceph_alloc_page_vector(1, GFP_KERNEL); + if (IS_ERR(pages)) { + err = PTR_ERR(pages); + goto out_unlock; + } + + osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE, + 0, false, true); + ceph_osdc_build_request(rd_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false); + + ceph_osdc_build_request(wr_req, 0, NULL, CEPH_NOSNAP, + &ci->vfs_inode.i_mtime); + err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false); + + if (!err) + err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req); + if (!err2) + err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req); + + if (err >= 0 || err == -ENOENT) + have |= POOL_READ; + else if (err != -EPERM) + goto out_unlock; + + if (err2 == 0 || err2 == -EEXIST) + have |= POOL_WRITE; + else if (err2 != -EPERM) { + err = err2; + goto out_unlock; + } + + perm = kmalloc(sizeof(*perm), GFP_NOFS); + if (!perm) { + err = -ENOMEM; + goto out_unlock; + } + + perm->pool = pool; + perm->perm = have; + rb_link_node(&perm->node, parent, p); + rb_insert_color(&perm->node, &mdsc->pool_perm_tree); + err = 0; +out_unlock: + up_write(&mdsc->pool_perm_rwsem); + + if (rd_req) + ceph_osdc_put_request(rd_req); + if (wr_req) + ceph_osdc_put_request(wr_req); +out: + if (!err) + err = have; + dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err); + return err; +} + +int ceph_pool_perm_check(struct ceph_inode_info *ci, int need) +{ + u32 pool; + int ret, flags; + + if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode), + NOPOOLPERM)) + return 0; + + spin_lock(&ci->i_ceph_lock); + flags = ci->i_ceph_flags; + pool = ceph_file_layout_pg_pool(ci->i_layout); + spin_unlock(&ci->i_ceph_lock); +check: + if (flags & CEPH_I_POOL_PERM) { + if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) { + dout("ceph_pool_perm_check pool %u no read perm\n", + pool); + return -EPERM; + } + if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) { + dout("ceph_pool_perm_check pool %u no write perm\n", + pool); + return -EPERM; + } + return 0; + } + + ret = __ceph_pool_perm_get(ci, pool); + if (ret < 0) + return ret; + + flags = CEPH_I_POOL_PERM; + if (ret & POOL_READ) + flags |= CEPH_I_POOL_RD; + if (ret & POOL_WRITE) + flags |= CEPH_I_POOL_WR; + + spin_lock(&ci->i_ceph_lock); + if (pool == ceph_file_layout_pg_pool(ci->i_layout)) { + ci->i_ceph_flags = flags; + } else { + pool = ceph_file_layout_pg_pool(ci->i_layout); + flags = ci->i_ceph_flags; + } + spin_unlock(&ci->i_ceph_lock); + goto check; +} + +void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc) +{ + struct ceph_pool_perm *perm; + struct rb_node *n; + + while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) { + n = rb_first(&mdsc->pool_perm_tree); + perm = rb_entry(n, struct ceph_pool_perm, node); + rb_erase(n, &mdsc->pool_perm_tree); + kfree(perm); + } +} diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index be5ea6af8366..7c8e93aeb01e 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2233,6 +2233,10 @@ int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, { int _got, check_max, ret, err = 0; + ret = ceph_pool_perm_check(ci, need); + if (ret < 0) + return ret; + retry: if (endoff > 0) check_max_size(&ci->vfs_inode, endoff); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e876e1944519..1a68c0e38a52 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -753,7 +753,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, if (new_version || (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) { + if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool) + ci->i_ceph_flags &= ~CEPH_I_POOL_PERM; ci->i_layout = info->layout; + queue_trunc = ceph_fill_file_size(inode, issued, le32_to_cpu(info->truncate_seq), le64_to_cpu(info->truncate_size), diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 84f37f34f9aa..f125e06dacb8 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -3414,6 +3414,9 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) ceph_caps_init(mdsc); ceph_adjust_min_caps(mdsc, fsc->min_caps); + init_rwsem(&mdsc->pool_perm_rwsem); + mdsc->pool_perm_tree = RB_ROOT; + return 0; } @@ -3607,6 +3610,7 @@ static void ceph_mdsc_stop(struct ceph_mds_client *mdsc) ceph_mdsmap_destroy(mdsc->mdsmap); kfree(mdsc->sessions); ceph_caps_finalize(mdsc); + ceph_pool_perm_destroy(mdsc); } void ceph_mdsc_destroy(struct ceph_fs_client *fsc) diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 1875b5d985c6..d474141c034a 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -260,6 +260,12 @@ struct ceph_mds_request { int r_num_caps; }; +struct ceph_pool_perm { + struct rb_node node; + u32 pool; + int perm; +}; + /* * mds client state */ @@ -328,6 +334,9 @@ struct ceph_mds_client { spinlock_t dentry_lru_lock; struct list_head dentry_lru; int num_dentry; + + struct rw_semaphore pool_perm_rwsem; + struct rb_root pool_perm_tree; }; extern const char *ceph_mds_op_name(int op); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 4e9905374078..9a5350030af8 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -134,10 +134,12 @@ enum { Opt_noino32, Opt_fscache, Opt_nofscache, + Opt_poolperm, + Opt_nopoolperm, #ifdef CONFIG_CEPH_FS_POSIX_ACL Opt_acl, #endif - Opt_noacl + Opt_noacl, }; static match_table_t fsopt_tokens = { @@ -165,6 +167,8 @@ static match_table_t fsopt_tokens = { {Opt_noino32, "noino32"}, {Opt_fscache, "fsc"}, {Opt_nofscache, "nofsc"}, + {Opt_poolperm, "poolperm"}, + {Opt_nopoolperm, "nopoolperm"}, #ifdef CONFIG_CEPH_FS_POSIX_ACL {Opt_acl, "acl"}, #endif @@ -268,6 +272,13 @@ static int parse_fsopt_token(char *c, void *private) case Opt_nofscache: fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE; break; + case Opt_poolperm: + fsopt->flags &= ~CEPH_MOUNT_OPT_NOPOOLPERM; + printk ("pool perm"); + break; + case Opt_nopoolperm: + fsopt->flags |= CEPH_MOUNT_OPT_NOPOOLPERM; + break; #ifdef CONFIG_CEPH_FS_POSIX_ACL case Opt_acl: fsopt->sb_flags |= MS_POSIXACL; @@ -436,6 +447,8 @@ static int ceph_show_options(struct seq_file *m, struct dentry *root) seq_puts(m, ",nodcache"); if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) seq_puts(m, ",fsc"); + if (fsopt->flags & CEPH_MOUNT_OPT_NOPOOLPERM) + seq_puts(m, ",nopoolperm"); #ifdef CONFIG_CEPH_FS_POSIX_ACL if (fsopt->sb_flags & MS_POSIXACL) diff --git a/fs/ceph/super.h b/fs/ceph/super.h index fa20e1318939..18b917c7feb4 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -35,6 +35,7 @@ #define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */ #define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */ #define CEPH_MOUNT_OPT_FSCACHE (1<<10) /* use fscache */ +#define CEPH_MOUNT_OPT_NOPOOLPERM (1<<11) /* no pool permission check */ #define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES | \ CEPH_MOUNT_OPT_DCACHE) @@ -438,10 +439,14 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, /* * Ceph inode. */ -#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */ -#define CEPH_I_NODELAY 4 /* do not delay cap release */ -#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */ -#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */ +#define CEPH_I_DIR_ORDERED (1 << 0) /* dentries in dir are ordered */ +#define CEPH_I_NODELAY (1 << 1) /* do not delay cap release */ +#define CEPH_I_FLUSH (1 << 2) /* do not delay flush of dirty metadata */ +#define CEPH_I_NOFLUSH (1 << 3) /* do not flush dirty caps */ +#define CEPH_I_POOL_PERM (1 << 4) /* pool rd/wr bits are valid */ +#define CEPH_I_POOL_RD (1 << 5) /* can read from pool */ +#define CEPH_I_POOL_WR (1 << 6) /* can write to pool */ + static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, int release_count, int ordered_count) @@ -879,6 +884,9 @@ extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode); /* addr.c */ extern const struct address_space_operations ceph_aops; extern int ceph_mmap(struct file *file, struct vm_area_struct *vma); +extern int ceph_uninline_data(struct file *filp, struct page *locked_page); +extern int ceph_pool_perm_check(struct ceph_inode_info *ci, int need); +extern void ceph_pool_perm_destroy(struct ceph_mds_client* mdsc); /* file.c */ extern const struct file_operations ceph_file_fops; @@ -890,7 +898,6 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry, extern int ceph_release(struct inode *inode, struct file *filp); extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page, char *data, size_t len); -int ceph_uninline_data(struct file *filp, struct page *locked_page); /* dir.c */ extern const struct file_operations ceph_dir_fops; extern const struct file_operations ceph_snapdir_fops; -- cgit v1.2.3-59-g8ed1b From 7b06a826e7c52d77ce801e5960ecf0338eafe886 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 1 May 2015 10:03:40 +0800 Subject: ceph: use empty snap context for uninline_data and get_pool_perm Cached_context in ceph_snap_realm is directly accessed by uninline_data() and get_pool_perm(). This is racy in theory. both uninline_data() and get_pool_perm() do not modify existing object, they only create new object. So we can pass the empty snap context to them. Unlike cached_context in ceph_snap_realm, we do not need to protect the empty snap context. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 9 ++++----- fs/ceph/snap.c | 18 +++++++++--------- fs/ceph/super.h | 1 + 3 files changed, 14 insertions(+), 14 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index b96027727248..ccc4325aa5c5 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1510,8 +1510,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 0, 1, CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, - 0, 0, false); + ceph_empty_snapc, 0, 0, false); if (IS_ERR(req)) { err = PTR_ERR(req); goto out; @@ -1529,7 +1528,7 @@ int ceph_uninline_data(struct file *filp, struct page *locked_page) ceph_vino(inode), 0, &len, 1, 3, CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_ONDISK | CEPH_OSD_FLAG_WRITE, - ci->i_snap_realm->cached_context, + ceph_empty_snapc, ci->i_truncate_seq, ci->i_truncate_size, false); if (IS_ERR(req)) { @@ -1653,7 +1652,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) } rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, - ci->i_snap_realm->cached_context, + ceph_empty_snapc, 1, false, GFP_NOFS); if (!rd_req) { err = -ENOMEM; @@ -1668,7 +1667,7 @@ static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool) rd_req->r_base_oid.name_len = strlen(rd_req->r_base_oid.name); wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, - ci->i_snap_realm->cached_context, + ceph_empty_snapc, 1, false, GFP_NOFS); if (!wr_req) { err = -ENOMEM; diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index a97e39f09ba6..b2a945345d2b 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -296,7 +296,7 @@ static int cmpu64_rev(const void *a, const void *b) } -static struct ceph_snap_context *empty_snapc; +struct ceph_snap_context *ceph_empty_snapc; /* * build the snap context for a given realm. @@ -338,9 +338,9 @@ static int build_snap_context(struct ceph_snap_realm *realm) return 0; } - if (num == 0 && realm->seq == empty_snapc->seq) { - ceph_get_snap_context(empty_snapc); - snapc = empty_snapc; + if (num == 0 && realm->seq == ceph_empty_snapc->seq) { + ceph_get_snap_context(ceph_empty_snapc); + snapc = ceph_empty_snapc; goto done; } @@ -482,7 +482,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); kfree(capsnap); - } else if (ci->i_snap_realm->cached_context == empty_snapc) { + } else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) { dout("queue_cap_snap %p empty snapc\n", inode); kfree(capsnap); } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| @@ -964,14 +964,14 @@ out: int __init ceph_snap_init(void) { - empty_snapc = ceph_create_snap_context(0, GFP_NOFS); - if (!empty_snapc) + ceph_empty_snapc = ceph_create_snap_context(0, GFP_NOFS); + if (!ceph_empty_snapc) return -ENOMEM; - empty_snapc->seq = 1; + ceph_empty_snapc->seq = 1; return 0; } void ceph_snap_exit(void) { - ceph_put_snap_context(empty_snapc); + ceph_put_snap_context(ceph_empty_snapc); } diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 18b917c7feb4..b182fd7499d9 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -692,6 +692,7 @@ static inline int default_congestion_kb(void) /* snap.c */ +extern struct ceph_snap_context *ceph_empty_snapc; struct ceph_snap_realm *ceph_lookup_snap_realm(struct ceph_mds_client *mdsc, u64 ino); extern void ceph_get_snap_realm(struct ceph_mds_client *mdsc, -- cgit v1.2.3-59-g8ed1b From 5dda377cf0a6bd43f64a3c1efb670d7c668e7b29 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 30 Apr 2015 14:40:54 +0800 Subject: ceph: set i_head_snapc when getting CEPH_CAP_FILE_WR reference In most cases that snap context is needed, we are holding reference of CEPH_CAP_FILE_WR. So we can set ceph inode's i_head_snapc when getting the CEPH_CAP_FILE_WR reference, and make codes get snap context from i_head_snapc. This makes the code simpler. Another benefit of this change is that we can handle snap notification more elegantly. Especially when snap context is updated while someone else is doing write. The old queue cap_snap code may set cap_snap's context to ether the old context or the new snap context, depending on if i_head_snapc is set. The new queue capp_snap code always set cap_snap's context to the old snap context. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 37 ++++++-------- fs/ceph/caps.c | 155 ++++++++++++++++++++++++++++++++++++++++----------------- fs/ceph/file.c | 31 +++++++++--- fs/ceph/snap.c | 126 ++++++++++++++++++++++++---------------------- 4 files changed, 212 insertions(+), 137 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index ccc4325aa5c5..5f53ac0d9d7c 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page) inode = mapping->host; ci = ceph_inode(inode); - /* - * Note that we're grabbing a snapc ref here without holding - * any locks! - */ - snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context); - /* dirty the head */ spin_lock(&ci->i_ceph_lock); - if (ci->i_head_snapc == NULL) - ci->i_head_snapc = ceph_get_snap_context(snapc); - ++ci->i_wrbuffer_ref_head; + BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + capsnap->dirty_pages++; + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + ++ci->i_wrbuffer_ref_head; + } if (ci->i_wrbuffer_ref == 0) ihold(inode); ++ci->i_wrbuffer_ref; @@ -1033,7 +1037,6 @@ static int ceph_update_writeable_page(struct file *file, { struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; loff_t page_off = pos & PAGE_CACHE_MASK; int pos_in_page = pos & ~PAGE_CACHE_MASK; int end_in_page = pos_in_page + len; @@ -1045,10 +1048,6 @@ retry_locked: /* writepages currently holds page lock, but if we change that later, */ wait_on_page_writeback(page); - /* check snap context */ - BUG_ON(!ci->i_snap_realm); - down_read(&mdsc->snap_rwsem); - BUG_ON(!ci->i_snap_realm->cached_context); snapc = page_snap_context(page); if (snapc && snapc != ci->i_head_snapc) { /* @@ -1056,7 +1055,6 @@ retry_locked: * context! is it writeable now? */ oldest = get_oldest_context(inode, NULL); - up_read(&mdsc->snap_rwsem); if (snapc->seq > oldest->seq) { ceph_put_snap_context(oldest); @@ -1113,7 +1111,6 @@ retry_locked: } /* we need to read it. */ - up_read(&mdsc->snap_rwsem); r = readpage_nounlock(file, page); if (r < 0) goto fail_nosnap; @@ -1158,16 +1155,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping, /* * we don't do anything in here that simple_write_end doesn't do - * except adjust dirty page accounting and drop read lock on - * mdsc->snap_rwsem. + * except adjust dirty page accounting */ static int ceph_write_end(struct file *file, struct address_space *mapping, loff_t pos, unsigned len, unsigned copied, struct page *page, void *fsdata) { struct inode *inode = file_inode(file); - struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_mds_client *mdsc = fsc->mdsc; unsigned from = pos & (PAGE_CACHE_SIZE - 1); int check_cap = 0; @@ -1189,7 +1183,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping, set_page_dirty(page); unlock_page(page); - up_read(&mdsc->snap_rwsem); page_cache_release(page); if (check_cap) @@ -1315,7 +1308,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; - struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; struct page *page = vmf->page; loff_t off = page_offset(page); loff_t size = i_size_read(inode); @@ -1374,7 +1366,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) if (ret == 0) { /* success. we'll keep the page locked. */ set_page_dirty(page); - up_read(&mdsc->snap_rwsem); ret = VM_FAULT_LOCKED; } else { if (ret == -ENOMEM) diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 7c8e93aeb01e..feb8ec92f1b4 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -2073,7 +2073,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, * * Protected by i_ceph_lock. */ -static void __take_cap_refs(struct ceph_inode_info *ci, int got) +static void __take_cap_refs(struct ceph_inode_info *ci, int got, + bool snap_rwsem_locked) { if (got & CEPH_CAP_PIN) ci->i_pin_ref++; @@ -2081,8 +2082,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) ci->i_rd_ref++; if (got & CEPH_CAP_FILE_CACHE) ci->i_rdcache_ref++; - if (got & CEPH_CAP_FILE_WR) + if (got & CEPH_CAP_FILE_WR) { + if (ci->i_wr_ref == 0 && !ci->i_head_snapc) { + BUG_ON(!snap_rwsem_locked); + ci->i_head_snapc = ceph_get_snap_context( + ci->i_snap_realm->cached_context); + } ci->i_wr_ref++; + } if (got & CEPH_CAP_FILE_BUFFER) { if (ci->i_wb_ref == 0) ihold(&ci->vfs_inode); @@ -2100,16 +2107,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got) * requested from the MDS. */ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, - loff_t endoff, int *got, int *check_max, int *err) + loff_t endoff, bool nonblock, int *got, int *err) { struct inode *inode = &ci->vfs_inode; + struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc; int ret = 0; int have, implemented; int file_wanted; + bool snap_rwsem_locked = false; dout("get_cap_refs %p need %s want %s\n", inode, ceph_cap_string(need), ceph_cap_string(want)); +again: spin_lock(&ci->i_ceph_lock); /* make sure file is actually open */ @@ -2125,6 +2135,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, /* finish pending truncate */ while (ci->i_truncate_pending) { spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) { + up_read(&mdsc->snap_rwsem); + snap_rwsem_locked = false; + } __ceph_do_pending_vmtruncate(inode); spin_lock(&ci->i_ceph_lock); } @@ -2136,7 +2150,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, dout("get_cap_refs %p endoff %llu > maxsize %llu\n", inode, endoff, ci->i_max_size); if (endoff > ci->i_requested_max_size) { - *check_max = 1; + *err = -EAGAIN; ret = 1; } goto out_unlock; @@ -2164,8 +2178,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, inode, ceph_cap_string(have), ceph_cap_string(not), ceph_cap_string(revoking)); if ((revoking & not) == 0) { + if (!snap_rwsem_locked && + !ci->i_head_snapc && + (need & CEPH_CAP_FILE_WR)) { + if (!down_read_trylock(&mdsc->snap_rwsem)) { + /* + * we can not call down_read() when + * task isn't in TASK_RUNNING state + */ + if (nonblock) { + *err = -EAGAIN; + ret = 1; + goto out_unlock; + } + + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + snap_rwsem_locked = true; + goto again; + } + snap_rwsem_locked = true; + } *got = need | (have & want); - __take_cap_refs(ci, *got); + __take_cap_refs(ci, *got, true); ret = 1; } } else { @@ -2189,6 +2224,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want, } out_unlock: spin_unlock(&ci->i_ceph_lock); + if (snap_rwsem_locked) + up_read(&mdsc->snap_rwsem); dout("get_cap_refs %p ret %d got %s\n", inode, ret, ceph_cap_string(*got)); @@ -2231,54 +2268,70 @@ static void check_max_size(struct inode *inode, loff_t endoff) int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, loff_t endoff, int *got, struct page **pinned_page) { - int _got, check_max, ret, err = 0; + int _got, ret, err = 0; ret = ceph_pool_perm_check(ci, need); if (ret < 0) return ret; -retry: - if (endoff > 0) - check_max_size(&ci->vfs_inode, endoff); - _got = 0; - check_max = 0; - ret = wait_event_interruptible(ci->i_cap_wq, - try_get_cap_refs(ci, need, want, endoff, - &_got, &check_max, &err)); - if (err) - ret = err; - if (ret < 0) - return ret; + while (true) { + if (endoff > 0) + check_max_size(&ci->vfs_inode, endoff); - if (check_max) - goto retry; + err = 0; + _got = 0; + ret = try_get_cap_refs(ci, need, want, endoff, + false, &_got, &err); + if (ret) { + if (err == -EAGAIN) + continue; + if (err < 0) + return err; + } else { + ret = wait_event_interruptible(ci->i_cap_wq, + try_get_cap_refs(ci, need, want, endoff, + true, &_got, &err)); + if (err == -EAGAIN) + continue; + if (err < 0) + ret = err; + if (ret < 0) + return ret; + } - if (ci->i_inline_version != CEPH_INLINE_NONE && - (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && - i_size_read(&ci->vfs_inode) > 0) { - struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0); - if (page) { - if (PageUptodate(page)) { - *pinned_page = page; - goto out; + if (ci->i_inline_version != CEPH_INLINE_NONE && + (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) && + i_size_read(&ci->vfs_inode) > 0) { + struct page *page = + find_get_page(ci->vfs_inode.i_mapping, 0); + if (page) { + if (PageUptodate(page)) { + *pinned_page = page; + break; + } + page_cache_release(page); } - page_cache_release(page); - } - /* - * drop cap refs first because getattr while holding - * caps refs can cause deadlock. - */ - ceph_put_cap_refs(ci, _got); - _got = 0; + /* + * drop cap refs first because getattr while + * holding * caps refs can cause deadlock. + */ + ceph_put_cap_refs(ci, _got); + _got = 0; - /* getattr request will bring inline data into page cache */ - ret = __ceph_do_getattr(&ci->vfs_inode, NULL, - CEPH_STAT_CAP_INLINE_DATA, true); - if (ret < 0) - return ret; - goto retry; + /* + * getattr request will bring inline data into + * page cache + */ + ret = __ceph_do_getattr(&ci->vfs_inode, NULL, + CEPH_STAT_CAP_INLINE_DATA, + true); + if (ret < 0) + return ret; + continue; + } + break; } -out: + *got = _got; return 0; } @@ -2290,7 +2343,7 @@ out: void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) { spin_lock(&ci->i_ceph_lock); - __take_cap_refs(ci, caps); + __take_cap_refs(ci, caps, false); spin_unlock(&ci->i_ceph_lock); } @@ -2341,6 +2394,13 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) wake = 1; } } + if (ci->i_wrbuffer_ref_head == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { + BUG_ON(!ci->i_head_snapc); + ceph_put_snap_context(ci->i_head_snapc); + ci->i_head_snapc = NULL; + } /* see comment in __ceph_remove_cap() */ if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) drop_inode_snap_realm(ci); @@ -2384,7 +2444,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, if (ci->i_head_snapc == snapc) { ci->i_wrbuffer_ref_head -= nr; if (ci->i_wrbuffer_ref_head == 0 && - ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) { + ci->i_wr_ref == 0 && + ci->i_dirty_caps == 0 && + ci->i_flushing_caps == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; @@ -2775,7 +2837,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, dout(" inode %p now clean\n", inode); BUG_ON(!list_empty(&ci->i_dirty_item)); drop = 1; - if (ci->i_wrbuffer_ref_head == 0) { + if (ci->i_wr_ref == 0 && + ci->i_wrbuffer_ref_head == 0) { BUG_ON(!ci->i_head_snapc); ceph_put_snap_context(ci->i_head_snapc); ci->i_head_snapc = NULL; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 777eb21d556f..0a76a370d798 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -557,13 +557,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) * objects, rollback on failure, etc.) */ static ssize_t -ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) +ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, + struct ceph_snap_context *snapc) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; @@ -600,7 +600,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) size_t start; ssize_t n; - snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, pos, &len, 0, @@ -674,13 +673,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) * objects, rollback on failure, etc.) */ static ssize_t -ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) +ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, + struct ceph_snap_context *snapc) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_fs_client *fsc = ceph_inode_to_client(inode); - struct ceph_snap_context *snapc; struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; @@ -717,7 +716,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos) size_t left; int n; - snapc = ci->i_snap_realm->cached_context; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, pos, &len, 0, 1, @@ -996,14 +994,30 @@ retry_snap: if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 || (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) { + struct ceph_snap_context *snapc; struct iov_iter data; mutex_unlock(&inode->i_mutex); + + spin_lock(&ci->i_ceph_lock); + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + snapc = ceph_get_snap_context(capsnap->context); + } else { + BUG_ON(!ci->i_head_snapc); + snapc = ceph_get_snap_context(ci->i_head_snapc); + } + spin_unlock(&ci->i_ceph_lock); + /* we might need to revert back to that point */ data = *from; if (iocb->ki_flags & IOCB_DIRECT) - written = ceph_sync_direct_write(iocb, &data, pos); + written = ceph_sync_direct_write(iocb, &data, pos, + snapc); else - written = ceph_sync_write(iocb, &data, pos); + written = ceph_sync_write(iocb, &data, pos, snapc); if (written == -EOLDSNAPC) { dout("aio_write %p %llx.%llx %llu~%u" "got EOLDSNAPC, retrying\n", @@ -1014,6 +1028,7 @@ retry_snap: } if (written > 0) iov_iter_advance(from, written); + ceph_put_snap_context(snapc); } else { loff_t old_size = inode->i_size; /* diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index b2a945345d2b..5bfdab9a465e 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -455,6 +455,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; struct ceph_cap_snap *capsnap; + struct ceph_snap_context *old_snapc; int used, dirty; capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); @@ -467,6 +468,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) used = __ceph_caps_used(ci); dirty = __ceph_caps_dirty(ci); + old_snapc = ci->i_head_snapc; + /* * If there is a write in progress, treat that as a dirty Fw, * even though it hasn't completed yet; by the time we finish @@ -481,76 +484,79 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) writes in progress now were started before the previous cap_snap. lucky us. */ dout("queue_cap_snap %p already pending\n", inode); - kfree(capsnap); - } else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) { + goto update_snapc; + } + if (ci->i_snap_realm->cached_context == ceph_empty_snapc) { dout("queue_cap_snap %p empty snapc\n", inode); - kfree(capsnap); - } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| - CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) { - struct ceph_snap_context *snapc = ci->i_head_snapc; + goto update_snapc; + } + if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| + CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) { + dout("queue_cap_snap %p nothing dirty|writing\n", inode); + goto update_snapc; + } - /* - * if we are a sync write, we may need to go to the snaprealm - * to get the current snapc. - */ - if (!snapc) - snapc = ci->i_snap_realm->cached_context; + BUG_ON(!old_snapc); - dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", - inode, capsnap, snapc, ceph_cap_string(dirty)); - ihold(inode); + dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", + inode, capsnap, old_snapc, ceph_cap_string(dirty)); + ihold(inode); - atomic_set(&capsnap->nref, 1); - capsnap->ci = ci; - INIT_LIST_HEAD(&capsnap->ci_item); - INIT_LIST_HEAD(&capsnap->flushing_item); - - capsnap->follows = snapc->seq; - capsnap->issued = __ceph_caps_issued(ci, NULL); - capsnap->dirty = dirty; - - capsnap->mode = inode->i_mode; - capsnap->uid = inode->i_uid; - capsnap->gid = inode->i_gid; - - if (dirty & CEPH_CAP_XATTR_EXCL) { - __ceph_build_xattrs_blob(ci); - capsnap->xattr_blob = - ceph_buffer_get(ci->i_xattrs.blob); - capsnap->xattr_version = ci->i_xattrs.version; - } else { - capsnap->xattr_blob = NULL; - capsnap->xattr_version = 0; - } + atomic_set(&capsnap->nref, 1); + capsnap->ci = ci; + INIT_LIST_HEAD(&capsnap->ci_item); + INIT_LIST_HEAD(&capsnap->flushing_item); - capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + capsnap->follows = old_snapc->seq; + capsnap->issued = __ceph_caps_issued(ci, NULL); + capsnap->dirty = dirty; - /* dirty page count moved from _head to this cap_snap; - all subsequent writes page dirties occur _after_ this - snapshot. */ - capsnap->dirty_pages = ci->i_wrbuffer_ref_head; - ci->i_wrbuffer_ref_head = 0; - capsnap->context = snapc; - ci->i_head_snapc = - ceph_get_snap_context(ci->i_snap_realm->cached_context); - dout(" new snapc is %p\n", ci->i_head_snapc); - list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); - - if (used & CEPH_CAP_FILE_WR) { - dout("queue_cap_snap %p cap_snap %p snapc %p" - " seq %llu used WR, now pending\n", inode, - capsnap, snapc, snapc->seq); - capsnap->writing = 1; - } else { - /* note mtime, size NOW. */ - __ceph_finish_cap_snap(ci, capsnap); - } + capsnap->mode = inode->i_mode; + capsnap->uid = inode->i_uid; + capsnap->gid = inode->i_gid; + + if (dirty & CEPH_CAP_XATTR_EXCL) { + __ceph_build_xattrs_blob(ci); + capsnap->xattr_blob = + ceph_buffer_get(ci->i_xattrs.blob); + capsnap->xattr_version = ci->i_xattrs.version; } else { - dout("queue_cap_snap %p nothing dirty|writing\n", inode); - kfree(capsnap); + capsnap->xattr_blob = NULL; + capsnap->xattr_version = 0; } + capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE; + + /* dirty page count moved from _head to this cap_snap; + all subsequent writes page dirties occur _after_ this + snapshot. */ + capsnap->dirty_pages = ci->i_wrbuffer_ref_head; + ci->i_wrbuffer_ref_head = 0; + capsnap->context = old_snapc; + list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps); + old_snapc = NULL; + + if (used & CEPH_CAP_FILE_WR) { + dout("queue_cap_snap %p cap_snap %p snapc %p" + " seq %llu used WR, now pending\n", inode, + capsnap, old_snapc, old_snapc->seq); + capsnap->writing = 1; + } else { + /* note mtime, size NOW. */ + __ceph_finish_cap_snap(ci, capsnap); + } + capsnap = NULL; + +update_snapc: + if (ci->i_head_snapc) { + ci->i_head_snapc = ceph_get_snap_context( + ci->i_snap_realm->cached_context); + dout(" new snapc is %p\n", ci->i_head_snapc); + } spin_unlock(&ci->i_ceph_lock); + + kfree(capsnap); + ceph_put_snap_context(old_snapc); } /* -- cgit v1.2.3-59-g8ed1b From 860560904962d08fd38666207c910065fe53e074 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 1 May 2015 16:57:16 +0800 Subject: ceph: avoid sending unnessesary FLUSHSNAP message when a snap notification contains no new snapshot, we can avoid sending FLUSHSNAP message to MDS. But we still need to create cap_snap in some case because it's required by write path and page writeback path Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 69 +++++++++++++++++++++++++++++++-------------------------- fs/ceph/snap.c | 49 ++++++++++++++++++++++++++++++---------- fs/ceph/super.h | 5 +++-- 3 files changed, 78 insertions(+), 45 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index feb8ec92f1b4..f1dbcae7c75c 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1297,11 +1297,8 @@ retry: if (capsnap->dirty_pages || capsnap->writing) break; - /* - * if cap writeback already occurred, we should have dropped - * the capsnap in ceph_put_wrbuffer_cap_refs. - */ - BUG_ON(capsnap->dirty == 0); + /* should be removed by ceph_try_drop_cap_snap() */ + BUG_ON(!capsnap->need_flush); /* pick mds, take s_mutex */ if (ci->i_auth_cap == NULL) { @@ -2347,6 +2344,27 @@ void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps) spin_unlock(&ci->i_ceph_lock); } + +/* + * drop cap_snap that is not associated with any snapshot. + * we don't need to send FLUSHSNAP message for it. + */ +static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap) +{ + if (!capsnap->need_flush && + !capsnap->writing && !capsnap->dirty_pages) { + + dout("dropping cap_snap %p follows %llu\n", + capsnap, capsnap->follows); + ceph_put_snap_context(capsnap->context); + list_del(&capsnap->ci_item); + list_del(&capsnap->flushing_item); + ceph_put_cap_snap(capsnap); + return 1; + } + return 0; +} + /* * Release cap refs. * @@ -2360,7 +2378,6 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) { struct inode *inode = &ci->vfs_inode; int last = 0, put = 0, flushsnaps = 0, wake = 0; - struct ceph_cap_snap *capsnap; spin_lock(&ci->i_ceph_lock); if (had & CEPH_CAP_PIN) @@ -2382,17 +2399,17 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) if (had & CEPH_CAP_FILE_WR) if (--ci->i_wr_ref == 0) { last++; - if (!list_empty(&ci->i_cap_snaps)) { - capsnap = list_first_entry(&ci->i_cap_snaps, - struct ceph_cap_snap, - ci_item); - if (capsnap->writing) { - capsnap->writing = 0; - flushsnaps = - __ceph_finish_cap_snap(ci, - capsnap); - wake = 1; - } + if (__ceph_have_pending_cap_snap(ci)) { + struct ceph_cap_snap *capsnap = + list_last_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, + ci_item); + capsnap->writing = 0; + if (ceph_try_drop_cap_snap(capsnap)) + put++; + else if (__ceph_finish_cap_snap(ci, capsnap)) + flushsnaps = 1; + wake = 1; } if (ci->i_wrbuffer_ref_head == 0 && ci->i_dirty_caps == 0 && @@ -2416,7 +2433,7 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had) ceph_flush_snaps(ci); if (wake) wake_up_all(&ci->i_cap_wq); - if (put) + while (put-- > 0) iput(inode); } @@ -2467,25 +2484,15 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr, capsnap->dirty_pages -= nr; if (capsnap->dirty_pages == 0) { complete_capsnap = 1; - if (capsnap->dirty == 0) - /* cap writeback completed before we created - * the cap_snap; no FLUSHSNAP is needed */ - drop_capsnap = 1; + drop_capsnap = ceph_try_drop_cap_snap(capsnap); } dout("put_wrbuffer_cap_refs on %p cap_snap %p " - " snap %lld %d/%d -> %d/%d %s%s%s\n", + " snap %lld %d/%d -> %d/%d %s%s\n", inode, capsnap, capsnap->context->seq, ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr, ci->i_wrbuffer_ref, capsnap->dirty_pages, last ? " (wrbuffer last)" : "", - complete_capsnap ? " (complete capsnap)" : "", - drop_capsnap ? " (drop capsnap)" : ""); - if (drop_capsnap) { - ceph_put_snap_context(capsnap->context); - list_del(&capsnap->ci_item); - list_del(&capsnap->flushing_item); - ceph_put_cap_snap(capsnap); - } + complete_capsnap ? " (complete capsnap)" : ""); } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index 5bfdab9a465e..ba708017d60b 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -436,6 +436,14 @@ static int dup_array(u64 **dst, __le64 *src, u32 num) return 0; } +static bool has_new_snaps(struct ceph_snap_context *o, + struct ceph_snap_context *n) +{ + if (n->num_snaps == 0) + return false; + /* snaps are in descending order */ + return n->snaps[0] > o->seq; +} /* * When a snapshot is applied, the size/mtime inode metadata is queued @@ -455,7 +463,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) { struct inode *inode = &ci->vfs_inode; struct ceph_cap_snap *capsnap; - struct ceph_snap_context *old_snapc; + struct ceph_snap_context *old_snapc, *new_snapc; int used, dirty; capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS); @@ -469,6 +477,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) dirty = __ceph_caps_dirty(ci); old_snapc = ci->i_head_snapc; + new_snapc = ci->i_snap_realm->cached_context; /* * If there is a write in progress, treat that as a dirty Fw, @@ -486,20 +495,37 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) dout("queue_cap_snap %p already pending\n", inode); goto update_snapc; } - if (ci->i_snap_realm->cached_context == ceph_empty_snapc) { - dout("queue_cap_snap %p empty snapc\n", inode); - goto update_snapc; - } - if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL| - CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) { + if (ci->i_wrbuffer_ref_head == 0 && + !(dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR))) { dout("queue_cap_snap %p nothing dirty|writing\n", inode); goto update_snapc; } BUG_ON(!old_snapc); - dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n", - inode, capsnap, old_snapc, ceph_cap_string(dirty)); + /* + * There is no need to send FLUSHSNAP message to MDS if there is + * no new snapshot. But when there is dirty pages or on-going + * writes, we still need to create cap_snap. cap_snap is needed + * by the write path and page writeback path. + * + * also see ceph_try_drop_cap_snap() + */ + if (has_new_snaps(old_snapc, new_snapc)) { + if (dirty & (CEPH_CAP_ANY_EXCL|CEPH_CAP_FILE_WR)) + capsnap->need_flush = true; + } else { + if (!(used & CEPH_CAP_FILE_WR) && + ci->i_wrbuffer_ref_head == 0) { + dout("queue_cap_snap %p " + "no new_snap|dirty_page|writing\n", inode); + goto update_snapc; + } + } + + dout("queue_cap_snap %p cap_snap %p queuing under %p %s %s\n", + inode, capsnap, old_snapc, ceph_cap_string(dirty), + capsnap->need_flush ? "" : "no_flush"); ihold(inode); atomic_set(&capsnap->nref, 1); @@ -549,9 +575,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci) update_snapc: if (ci->i_head_snapc) { - ci->i_head_snapc = ceph_get_snap_context( - ci->i_snap_realm->cached_context); - dout(" new snapc is %p\n", ci->i_head_snapc); + ci->i_head_snapc = ceph_get_snap_context(new_snapc); + dout(" new snapc is %p\n", new_snapc); } spin_unlock(&ci->i_ceph_lock); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index b182fd7499d9..4ef1ae92c2a6 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -164,6 +164,7 @@ struct ceph_cap_snap { int writing; /* a sync write is still in progress */ int dirty_pages; /* dirty pages awaiting writeback */ bool inline_data; + bool need_flush; }; static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) @@ -719,8 +720,8 @@ extern void ceph_snap_exit(void); static inline bool __ceph_have_pending_cap_snap(struct ceph_inode_info *ci) { return !list_empty(&ci->i_cap_snaps) && - list_entry(ci->i_cap_snaps.prev, struct ceph_cap_snap, - ci_item)->writing; + list_last_entry(&ci->i_cap_snaps, struct ceph_cap_snap, + ci_item)->writing; } /* inode.c */ -- cgit v1.2.3-59-g8ed1b From 604d1b0245b97738cde4341944ad93edff4b2827 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 1 May 2015 17:49:16 +0800 Subject: ceph: take snap_rwsem when accessing snap realm's cached_context When ceph inode's i_head_snapc is NULL, __ceph_mark_dirty_caps() accesses snap realm's cached_context. So we need take read lock of snap_rwsem. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 4 +++- fs/ceph/inode.c | 15 +++++++++++++++ fs/ceph/xattr.c | 45 +++++++++++++++++++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 7 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index f1dbcae7c75c..900c05fd77d8 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1413,9 +1413,11 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { - if (!ci->i_head_snapc) + if (!ci->i_head_snapc) { + WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( ci->i_snap_realm->cached_context); + } dout(" inode %p now dirty snapc %p auth cap %p\n", &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap); BUG_ON(!list_empty(&ci->i_dirty_item)); diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1a68c0e38a52..1c991df276c9 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -1727,6 +1727,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) int mask = 0; int err = 0; int inode_dirty_flags = 0; + bool lock_snap_rwsem = false; if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; @@ -1742,6 +1743,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); + + if (!ci->i_head_snapc && + (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + issued = __ceph_caps_issued(ci, NULL); + } + } + dout("setattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ia_valid & ATTR_UID) { @@ -1890,6 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) release &= issued; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (inode_dirty_flags) __mark_inode_dirty(inode, inode_dirty_flags); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index cd7ffad4041d..c6f7d9b82085 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -911,6 +911,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; int issued; int err; int dirty = 0; @@ -920,6 +921,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, char *newval = NULL; struct ceph_inode_xattr *xattr = NULL; int required_blob_size; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -951,9 +953,20 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued)); __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, name_len, val_len); @@ -966,7 +979,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -984,6 +997,8 @@ retry: } spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); return err; @@ -991,6 +1006,8 @@ retry: do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: kfree(newname); @@ -1044,10 +1061,12 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) struct inode *inode = d_inode(dentry); struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; int issued; int err; int required_blob_size; int dirty; + bool lock_snap_rwsem = false; if (!ceph_is_valid_xattr(name)) return -EOPNOTSUPP; @@ -1064,10 +1083,21 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); - dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); - if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL)) goto do_sync; + + if (!lock_snap_rwsem && !ci->i_head_snapc) { + lock_snap_rwsem = true; + if (!down_read_trylock(&mdsc->snap_rwsem)) { + spin_unlock(&ci->i_ceph_lock); + down_read(&mdsc->snap_rwsem); + spin_lock(&ci->i_ceph_lock); + goto retry; + } + } + + dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued)); + __build_xattrs(inode); required_blob_size = __get_required_blob_size(ci, 0, 0); @@ -1080,7 +1110,7 @@ retry: dout(" preaallocating new blob size=%d\n", required_blob_size); blob = ceph_buffer_new(required_blob_size, GFP_NOFS); if (!blob) - goto out; + goto do_sync_unlocked; spin_lock(&ci->i_ceph_lock); if (ci->i_xattrs.prealloc_blob) ceph_buffer_put(ci->i_xattrs.prealloc_blob); @@ -1094,14 +1124,17 @@ retry: ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; spin_unlock(&ci->i_ceph_lock); + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: + if (lock_snap_rwsem) + up_read(&mdsc->snap_rwsem); err = ceph_send_removexattr(dentry, name); -out: return err; } -- cgit v1.2.3-59-g8ed1b From 622f3e250f498976ad4cbae6f2be5cb359ded4f5 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 7 May 2015 10:59:47 +0800 Subject: ceph: don't trim auth cap when there are cap snaps Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index f125e06dacb8..88010f9a254d 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1371,7 +1371,8 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) inode, cap, ceph_cap_string(mine), ceph_cap_string(oissued), ceph_cap_string(used), ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { - if (ci->i_dirty_caps | ci->i_flushing_caps) + if (ci->i_dirty_caps || ci->i_flushing_caps || + !list_empty(&ci->i_cap_snaps)) goto out; if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; -- cgit v1.2.3-59-g8ed1b From affbc19a68f9966ad65a773db405f78e2bafc07b Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 5 May 2015 21:22:13 +0800 Subject: ceph: make sure syncfs flushes all cap snaps Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 18 ++++++----- fs/ceph/mds_client.c | 86 +++++++++++++++++++++++++++++++++++++--------------- fs/ceph/mds_client.h | 1 + fs/ceph/snap.c | 2 ++ 4 files changed, 76 insertions(+), 31 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 900c05fd77d8..bbd969e16a01 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1259,14 +1259,14 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * asynchronously back to the MDS once sync writes complete and dirty * data is written out. * - * Unless @again is true, skip cap_snaps that were already sent to + * Unless @kick is true, skip cap_snaps that were already sent to * the MDS (i.e., during this session). * * Called under i_ceph_lock. Takes s_mutex as needed. */ void __ceph_flush_snaps(struct ceph_inode_info *ci, struct ceph_mds_session **psession, - int again) + int kick) __releases(ci->i_ceph_lock) __acquires(ci->i_ceph_lock) { @@ -1307,7 +1307,7 @@ retry: } /* only flush each capsnap once */ - if (!again && !list_empty(&capsnap->flushing_item)) { + if (!kick && !list_empty(&capsnap->flushing_item)) { dout("already flushed %p, skipping\n", capsnap); continue; } @@ -1317,6 +1317,9 @@ retry: if (session && session->s_mds != mds) { dout("oops, wrong session %p mutex\n", session); + if (kick) + goto out; + mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); session = NULL; @@ -1342,10 +1345,9 @@ retry: capsnap->flush_tid = ++ci->i_cap_flush_last_tid; atomic_inc(&capsnap->nref); - if (!list_empty(&capsnap->flushing_item)) - list_del_init(&capsnap->flushing_item); - list_add_tail(&capsnap->flushing_item, - &session->s_cap_snaps_flushing); + if (list_empty(&capsnap->flushing_item)) + list_add_tail(&capsnap->flushing_item, + &session->s_cap_snaps_flushing); spin_unlock(&ci->i_ceph_lock); dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n", @@ -2876,6 +2878,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, struct ceph_mds_session *session) { struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; u64 follows = le64_to_cpu(m->snap_follows); struct ceph_cap_snap *capsnap; int drop = 0; @@ -2899,6 +2902,7 @@ static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid, list_del(&capsnap->ci_item); list_del(&capsnap->flushing_item); ceph_put_cap_snap(capsnap); + wake_up_all(&mdsc->cap_flushing_wq); drop = 1; break; } else { diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 88010f9a254d..2bb9264b9225 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1488,17 +1488,22 @@ out_unlocked: return err; } -static int check_cap_flush(struct inode *inode, u64 want_flush_seq) +static int check_cap_flush(struct ceph_inode_info *ci, + u64 want_flush_seq, u64 want_snap_seq) { - struct ceph_inode_info *ci = ceph_inode(inode); - int ret; + int ret1 = 1, ret2 = 1; spin_lock(&ci->i_ceph_lock); - if (ci->i_flushing_caps) - ret = ci->i_cap_flush_seq >= want_flush_seq; - else - ret = 1; + if (want_flush_seq > 0 && ci->i_flushing_caps) + ret1 = ci->i_cap_flush_seq >= want_flush_seq; + + if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&ci->i_cap_snaps, + struct ceph_cap_snap, ci_item); + ret2 = capsnap->follows >= want_snap_seq; + } spin_unlock(&ci->i_ceph_lock); - return ret; + return ret1 && ret2; } /* @@ -1506,45 +1511,72 @@ static int check_cap_flush(struct inode *inode, u64 want_flush_seq) * * returns true if we've flushed through want_flush_seq */ -static void wait_caps_flush(struct ceph_mds_client *mdsc, u64 want_flush_seq) +static void wait_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_seq, u64 want_snap_seq) { int mds; dout("check_cap_flush want %lld\n", want_flush_seq); mutex_lock(&mdsc->mutex); - for (mds = 0; mds < mdsc->max_sessions; mds++) { + for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; - struct inode *inode = NULL; + struct inode *inode1 = NULL, *inode2 = NULL; - if (!session) + if (!session) { + mds++; continue; + } get_session(session); mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); if (!list_empty(&session->s_cap_flushing)) { struct ceph_inode_info *ci = - list_entry(session->s_cap_flushing.next, - struct ceph_inode_info, - i_flushing_item); + list_first_entry(&session->s_cap_flushing, + struct ceph_inode_info, + i_flushing_item); - if (!check_cap_flush(&ci->vfs_inode, want_flush_seq)) { + if (!check_cap_flush(ci, want_flush_seq, 0)) { dout("check_cap_flush still flushing %p " "seq %lld <= %lld to mds%d\n", &ci->vfs_inode, ci->i_cap_flush_seq, - want_flush_seq, session->s_mds); - inode = igrab(&ci->vfs_inode); + want_flush_seq, mds); + inode1 = igrab(&ci->vfs_inode); + } + } + if (!list_empty(&session->s_cap_snaps_flushing)) { + struct ceph_cap_snap *capsnap = + list_first_entry(&session->s_cap_snaps_flushing, + struct ceph_cap_snap, + flushing_item); + struct ceph_inode_info *ci = capsnap->ci; + if (!check_cap_flush(ci, 0, want_snap_seq)) { + dout("check_cap_flush still flushing snap %p " + "follows %lld <= %lld to mds%d\n", + &ci->vfs_inode, capsnap->follows, + want_snap_seq, mds); + inode2 = igrab(&ci->vfs_inode); } } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); - if (inode) { + if (inode1) { wait_event(mdsc->cap_flushing_wq, - check_cap_flush(inode, want_flush_seq)); - iput(inode); + check_cap_flush(ceph_inode(inode1), + want_flush_seq, 0)); + iput(inode1); + } + if (inode2) { + wait_event(mdsc->cap_flushing_wq, + check_cap_flush(ceph_inode(inode2), + 0, want_snap_seq)); + iput(inode2); } + if (!inode1 && !inode2) + mds++; + mutex_lock(&mdsc->mutex); } @@ -3391,6 +3423,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) atomic_set(&mdsc->num_sessions, 0); mdsc->max_sessions = 0; mdsc->stopping = 0; + mdsc->last_snap_seq = 0; init_rwsem(&mdsc->snap_rwsem); mdsc->snap_realms = RB_ROOT; INIT_LIST_HEAD(&mdsc->snap_empty); @@ -3517,7 +3550,7 @@ restart: void ceph_mdsc_sync(struct ceph_mds_client *mdsc) { - u64 want_tid, want_flush; + u64 want_tid, want_flush, want_snap; if (mdsc->fsc->mount_state == CEPH_MOUNT_SHUTDOWN) return; @@ -3532,10 +3565,15 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) want_flush = mdsc->cap_flush_seq; spin_unlock(&mdsc->cap_dirty_lock); - dout("sync want tid %lld flush_seq %lld\n", want_tid, want_flush); + down_read(&mdsc->snap_rwsem); + want_snap = mdsc->last_snap_seq; + up_read(&mdsc->snap_rwsem); + + dout("sync want tid %lld flush_seq %lld snap_seq %lld\n", + want_tid, want_flush, want_snap); wait_unsafe_requests(mdsc, want_tid); - wait_caps_flush(mdsc, want_flush); + wait_caps_flush(mdsc, want_flush, want_snap); } /* diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index d474141c034a..bf24d88cfeb2 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -290,6 +290,7 @@ struct ceph_mds_client { * references (implying they contain no inodes with caps) that * should be destroyed. */ + u64 last_snap_seq; struct rw_semaphore snap_rwsem; struct rb_root snap_realms; struct list_head snap_empty; diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c index ba708017d60b..233d906aec02 100644 --- a/fs/ceph/snap.c +++ b/fs/ceph/snap.c @@ -730,6 +730,8 @@ more: /* queue realm for cap_snap creation */ list_add(&realm->dirty_item, &dirty_realms); + if (realm->seq > mdsc->last_snap_seq) + mdsc->last_snap_seq = realm->seq; invalidate = 1; } else if (!realm->cached_context) { -- cgit v1.2.3-59-g8ed1b From 745a8e3bccbc6adae69a98ddc525e529aa44636e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 14 May 2015 17:22:42 +0800 Subject: ceph: don't pre-allocate space for cap release messages Previously we pre-allocate cap release messages for each caps. This wastes lots of memory when there are large amount of caps. This patch make the code not pre-allocate the cap release messages. Instead, we add the corresponding ceph_cap struct to a list when releasing a cap. Later when flush cap releases is needed, we allocate the cap release messages dynamically. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 85 +++++++------------- fs/ceph/mds_client.c | 223 ++++++++++++++++++++------------------------------- fs/ceph/mds_client.h | 3 - fs/ceph/super.h | 20 +++-- 4 files changed, 129 insertions(+), 202 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index bbd969e16a01..245ca381a6dc 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -926,16 +926,6 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) /* remove from session list */ spin_lock(&session->s_cap_lock); - /* - * s_cap_reconnect is protected by s_cap_lock. no one changes - * s_cap_gen while session is in the reconnect state. - */ - if (queue_release && - (!session->s_cap_reconnect || - cap->cap_gen == session->s_cap_gen)) - __queue_cap_release(session, ci->i_vino.ino, cap->cap_id, - cap->mseq, cap->issue_seq); - if (session->s_cap_iterator == cap) { /* not yet, we are iterating over this very cap */ dout("__ceph_remove_cap delaying %p removal from session %p\n", @@ -948,6 +938,25 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) } /* protect backpointer with s_cap_lock: see iterate_session_caps */ cap->ci = NULL; + + /* + * s_cap_reconnect is protected by s_cap_lock. no one changes + * s_cap_gen while session is in the reconnect state. + */ + if (queue_release && + (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) { + cap->queue_release = 1; + if (removed) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + removed = 0; + } + } else { + cap->queue_release = 0; + } + cap->cap_ino = ci->i_vino.ino; + spin_unlock(&session->s_cap_lock); /* remove from inode list */ @@ -1053,44 +1062,6 @@ static int send_cap_msg(struct ceph_mds_session *session, return 0; } -void __queue_cap_release(struct ceph_mds_session *session, - u64 ino, u64 cap_id, u32 migrate_seq, - u32 issue_seq) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - struct ceph_mds_cap_item *item; - - BUG_ON(!session->s_num_cap_releases); - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - - dout(" adding %llx release to mds%d msg %p (%d left)\n", - ino, session->s_mds, msg, session->s_num_cap_releases); - - BUG_ON(msg->front.iov_len + sizeof(*item) > PAGE_CACHE_SIZE); - head = msg->front.iov_base; - le32_add_cpu(&head->num, 1); - item = msg->front.iov_base + msg->front.iov_len; - item->ino = cpu_to_le64(ino); - item->cap_id = cpu_to_le64(cap_id); - item->migrate_seq = cpu_to_le32(migrate_seq); - item->seq = cpu_to_le32(issue_seq); - - session->s_num_cap_releases--; - - msg->front.iov_len += sizeof(*item); - if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { - dout(" release msg %p full\n", msg); - list_move_tail(&msg->list_head, &session->s_cap_releases_done); - } else { - dout(" release msg %p at %d/%d (%d)\n", msg, - (int)le32_to_cpu(head->num), - (int)CEPH_CAPS_PER_RELEASE, - (int)msg->front.iov_len); - } -} - /* * Queue cap releases when an inode is dropped from our cache. Since * inode is about to be destroyed, there is no need for i_ceph_lock. @@ -3051,7 +3022,6 @@ retry: mutex_lock_nested(&session->s_mutex, SINGLE_DEPTH_NESTING); } - ceph_add_cap_releases(mdsc, tsession); new_cap = ceph_get_cap(mdsc, NULL); } else { WARN_ON(1); @@ -3247,16 +3217,20 @@ void ceph_handle_caps(struct ceph_mds_session *session, dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq, (unsigned)seq); - if (op == CEPH_CAP_OP_IMPORT) - ceph_add_cap_releases(mdsc, session); - if (!inode) { dout(" i don't have ino %llx\n", vino.ino); if (op == CEPH_CAP_OP_IMPORT) { + cap = ceph_get_cap(mdsc, NULL); + cap->cap_ino = vino.ino; + cap->queue_release = 1; + cap->cap_id = cap_id; + cap->mseq = mseq; + cap->seq = seq; spin_lock(&session->s_cap_lock); - __queue_cap_release(session, vino.ino, cap_id, - mseq, seq); + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; spin_unlock(&session->s_cap_lock); } goto flush_cap_releases; @@ -3332,11 +3306,10 @@ void ceph_handle_caps(struct ceph_mds_session *session, flush_cap_releases: /* - * send any full release message to try to move things + * send any cap release message to try to move things * along for the mds (who clearly thinks we still have this * cap). */ - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); done: diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 2bb9264b9225..76eb14489bfa 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -458,7 +458,6 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, s->s_cap_reconnect = 0; s->s_cap_iterator = NULL; INIT_LIST_HEAD(&s->s_cap_releases); - INIT_LIST_HEAD(&s->s_cap_releases_done); INIT_LIST_HEAD(&s->s_cap_flushing); INIT_LIST_HEAD(&s->s_cap_snaps_flushing); @@ -998,27 +997,25 @@ void ceph_mdsc_open_export_target_sessions(struct ceph_mds_client *mdsc, * session caps */ -/* - * Free preallocated cap messages assigned to this session - */ -static void cleanup_cap_releases(struct ceph_mds_session *session) +/* caller holds s_cap_lock, we drop it */ +static void cleanup_cap_releases(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) + __releases(session->s_cap_lock) { - struct ceph_msg *msg; + LIST_HEAD(tmp_list); + list_splice_init(&session->s_cap_releases, &tmp_list); + session->s_num_cap_releases = 0; + spin_unlock(&session->s_cap_lock); - spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); - } - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - ceph_msg_put(msg); + dout("cleanup_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + struct ceph_cap *cap; + /* zero out the in-progress message */ + cap = list_first_entry(&tmp_list, + struct ceph_cap, session_caps); + list_del(&cap->session_caps); + ceph_put_cap(mdsc, cap); } - spin_unlock(&session->s_cap_lock); } static void cleanup_session_requests(struct ceph_mds_client *mdsc, @@ -1095,10 +1092,16 @@ static int iterate_session_caps(struct ceph_mds_session *session, dout("iterate_session_caps finishing cap %p removal\n", cap); BUG_ON(cap->session != session); + cap->session = NULL; list_del_init(&cap->session_caps); session->s_nr_caps--; - cap->session = NULL; - old_cap = cap; /* put_cap it w/o locks held */ + if (cap->queue_release) { + list_add_tail(&cap->session_caps, + &session->s_cap_releases); + session->s_num_cap_releases++; + } else { + old_cap = cap; /* put_cap it w/o locks held */ + } } if (ret < 0) goto out; @@ -1191,11 +1194,12 @@ static void remove_session_caps(struct ceph_mds_session *session) spin_lock(&session->s_cap_lock); } } - spin_unlock(&session->s_cap_lock); + + // drop cap expires and unlock s_cap_lock + cleanup_cap_releases(session->s_mdsc, session); BUG_ON(session->s_nr_caps > 0); BUG_ON(!list_empty(&session->s_cap_flushing)); - cleanup_cap_releases(session); } /* @@ -1418,76 +1422,10 @@ static int trim_caps(struct ceph_mds_client *mdsc, session->s_trim_caps = 0; } - ceph_add_cap_releases(mdsc, session); ceph_send_cap_releases(mdsc, session); return 0; } -/* - * Allocate cap_release messages. If there is a partially full message - * in the queue, try to allocate enough to cover it's remainder, so that - * we can send it immediately. - * - * Called under s_mutex. - */ -int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg, *partial = NULL; - struct ceph_mds_cap_release *head; - int err = -ENOMEM; - int extra = mdsc->fsc->mount_options->cap_release_safety; - int num; - - dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds, - extra); - - spin_lock(&session->s_cap_lock); - - if (!list_empty(&session->s_cap_releases)) { - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, - list_head); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - if (num) { - dout(" partial %p with (%d/%d)\n", msg, num, - (int)CEPH_CAPS_PER_RELEASE); - extra += CEPH_CAPS_PER_RELEASE - num; - partial = msg; - } - } - while (session->s_num_cap_releases < session->s_nr_caps + extra) { - spin_unlock(&session->s_cap_lock); - msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE, - GFP_NOFS, false); - if (!msg) - goto out_unlocked; - dout("add_cap_releases %p msg %p now %d\n", session, msg, - (int)msg->front.iov_len); - head = msg->front.iov_base; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - spin_lock(&session->s_cap_lock); - list_add(&msg->list_head, &session->s_cap_releases); - session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE; - } - - if (partial) { - head = partial->front.iov_base; - num = le32_to_cpu(head->num); - dout(" queueing partial %p with %d/%d\n", partial, num, - (int)CEPH_CAPS_PER_RELEASE); - list_move_tail(&partial->list_head, - &session->s_cap_releases_done); - session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num; - } - err = 0; - spin_unlock(&session->s_cap_lock); -out_unlocked: - return err; -} - static int check_cap_flush(struct ceph_inode_info *ci, u64 want_flush_seq, u64 want_snap_seq) { @@ -1590,60 +1528,74 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { - struct ceph_msg *msg; + struct ceph_msg *msg = NULL; + struct ceph_mds_cap_release *head; + struct ceph_mds_cap_item *item; + struct ceph_cap *cap; + LIST_HEAD(tmp_list); + int num_cap_releases; - dout("send_cap_releases mds%d\n", session->s_mds); spin_lock(&session->s_cap_lock); - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); - spin_unlock(&session->s_cap_lock); - msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); - dout("send_cap_releases mds%d %p\n", session->s_mds, msg); - ceph_con_send(&session->s_con, msg); - spin_lock(&session->s_cap_lock); - } +again: + list_splice_init(&session->s_cap_releases, &tmp_list); + num_cap_releases = session->s_num_cap_releases; + session->s_num_cap_releases = 0; spin_unlock(&session->s_cap_lock); -} - -static void discard_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session) -{ - struct ceph_msg *msg; - struct ceph_mds_cap_release *head; - unsigned num; - dout("discard_cap_releases mds%d\n", session->s_mds); + while (!list_empty(&tmp_list)) { + if (!msg) { + msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, + PAGE_CACHE_SIZE, GFP_NOFS, false); + if (!msg) + goto out_err; + head = msg->front.iov_base; + head->num = cpu_to_le32(0); + msg->front.iov_len = sizeof(*head); + } + cap = list_first_entry(&tmp_list, struct ceph_cap, + session_caps); + list_del(&cap->session_caps); + num_cap_releases--; - if (!list_empty(&session->s_cap_releases)) { - /* zero out the in-progress message */ - msg = list_first_entry(&session->s_cap_releases, - struct ceph_msg, list_head); head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", - session->s_mds, msg, num); - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - session->s_num_cap_releases += num; + le32_add_cpu(&head->num, 1); + item = msg->front.iov_base + msg->front.iov_len; + item->ino = cpu_to_le64(cap->cap_ino); + item->cap_id = cpu_to_le64(cap->cap_id); + item->migrate_seq = cpu_to_le32(cap->mseq); + item->seq = cpu_to_le32(cap->issue_seq); + msg->front.iov_len += sizeof(*item); + + ceph_put_cap(mdsc, cap); + + if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); + msg = NULL; + } } - /* requeue completed messages */ - while (!list_empty(&session->s_cap_releases_done)) { - msg = list_first_entry(&session->s_cap_releases_done, - struct ceph_msg, list_head); - list_del_init(&msg->list_head); + BUG_ON(num_cap_releases != 0); - head = msg->front.iov_base; - num = le32_to_cpu(head->num); - dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, - num); - session->s_num_cap_releases += num; - head->num = cpu_to_le32(0); - msg->front.iov_len = sizeof(*head); - list_add(&msg->list_head, &session->s_cap_releases); + spin_lock(&session->s_cap_lock); + if (!list_empty(&session->s_cap_releases)) + goto again; + spin_unlock(&session->s_cap_lock); + + if (msg) { + msg->hdr.front_len = cpu_to_le32(msg->front.iov_len); + dout("send_cap_releases mds%d %p\n", session->s_mds, msg); + ceph_con_send(&session->s_con, msg); } + return; +out_err: + pr_err("send_cap_releases mds%d, failed to allocate message\n", + session->s_mds); + spin_lock(&session->s_cap_lock); + list_splice(&tmp_list, &session->s_cap_releases); + session->s_num_cap_releases += num_cap_releases; + spin_unlock(&session->s_cap_lock); } /* @@ -2529,7 +2481,6 @@ out_err: } mutex_unlock(&mdsc->mutex); - ceph_add_cap_releases(mdsc, req->r_session); mutex_unlock(&session->s_mutex); /* kick calling process */ @@ -2921,8 +2872,7 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, */ session->s_cap_reconnect = 1; /* drop old cap expires; we're about to reestablish that state */ - discard_cap_releases(mdsc, session); - spin_unlock(&session->s_cap_lock); + cleanup_cap_releases(mdsc, session); /* trim unused caps to reduce MDS's cache rejoin time */ if (mdsc->fsc->sb->s_root) @@ -3385,7 +3335,6 @@ static void delayed_work(struct work_struct *work) send_renew_caps(mdsc, s); else ceph_con_keepalive(&s->s_con); - ceph_add_cap_releases(mdsc, s); if (s->s_state == CEPH_MDS_SESSION_OPEN || s->s_state == CEPH_MDS_SESSION_HUNG) ceph_send_cap_releases(mdsc, s); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index bf24d88cfeb2..294fa23a7df6 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -139,7 +139,6 @@ struct ceph_mds_session { int s_cap_reconnect; int s_readonly; struct list_head s_cap_releases; /* waiting cap_release messages */ - struct list_head s_cap_releases_done; /* ready to send */ struct ceph_cap *s_cap_iterator; /* protected by mutex */ @@ -389,8 +388,6 @@ static inline void ceph_mdsc_put_request(struct ceph_mds_request *req) kref_put(&req->r_kref, ceph_mdsc_release_request); } -extern int ceph_add_cap_releases(struct ceph_mds_client *mdsc, - struct ceph_mds_session *session); extern void ceph_send_cap_releases(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 4ef1ae92c2a6..c4961353d058 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -122,11 +122,21 @@ struct ceph_cap { struct rb_node ci_node; /* per-ci cap tree */ struct ceph_mds_session *session; struct list_head session_caps; /* per-session caplist */ - int mds; u64 cap_id; /* unique cap id (mds provided) */ - int issued; /* latest, from the mds */ - int implemented; /* implemented superset of issued (for revocation) */ - int mds_wanted; + union { + /* in-use caps */ + struct { + int issued; /* latest, from the mds */ + int implemented; /* implemented superset of + issued (for revocation) */ + int mds, mds_wanted; + }; + /* caps to release */ + struct { + u64 cap_ino; + int queue_release; + }; + }; u32 seq, issue_seq, mseq; u32 cap_gen; /* active/stale cycle */ unsigned long last_used; @@ -845,8 +855,6 @@ extern void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap); extern int ceph_is_any_caps(struct inode *inode); -extern void __queue_cap_release(struct ceph_mds_session *session, u64 ino, - u64 cap_id, u32 migrate_seq, u32 issue_seq); extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, -- cgit v1.2.3-59-g8ed1b From e8a7b8b12b13831467c6158c1e82801e25b5dd98 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 19 May 2015 18:54:40 +0800 Subject: ceph: exclude setfilelock requests when calculating oldest tid setfilelock requests can block for a long time, which can prevent client from advancing its oldest tid. Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 30 +++++++++++++++++++++++------- fs/ceph/mds_client.h | 2 ++ 2 files changed, 25 insertions(+), 7 deletions(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 76eb14489bfa..69a36f40517f 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -628,6 +628,9 @@ static void __register_request(struct ceph_mds_client *mdsc, req->r_uid = current_fsuid(); req->r_gid = current_fsgid(); + if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK) + mdsc->oldest_tid = req->r_tid; + if (dir) { struct ceph_inode_info *ci = ceph_inode(dir); @@ -643,6 +646,21 @@ static void __unregister_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req) { dout("__unregister_request %p tid %lld\n", req, req->r_tid); + + if (req->r_tid == mdsc->oldest_tid) { + struct rb_node *p = rb_next(&req->r_node); + mdsc->oldest_tid = 0; + while (p) { + struct ceph_mds_request *next_req = + rb_entry(p, struct ceph_mds_request, r_node); + if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) { + mdsc->oldest_tid = next_req->r_tid; + break; + } + p = rb_next(p); + } + } + rb_erase(&req->r_node, &mdsc->request_tree); RB_CLEAR_NODE(&req->r_node); @@ -1682,13 +1700,9 @@ static struct ceph_mds_request *__get_oldest_req(struct ceph_mds_client *mdsc) struct ceph_mds_request, r_node); } -static u64 __get_oldest_tid(struct ceph_mds_client *mdsc) +static inline u64 __get_oldest_tid(struct ceph_mds_client *mdsc) { - struct ceph_mds_request *req = __get_oldest_req(mdsc); - - if (req) - return req->r_tid; - return 0; + return mdsc->oldest_tid; } /* @@ -3378,6 +3392,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_empty); spin_lock_init(&mdsc->snap_empty_lock); mdsc->last_tid = 0; + mdsc->oldest_tid = 0; mdsc->request_tree = RB_ROOT; INIT_DELAYED_WORK(&mdsc->delayed_work, delayed_work); mdsc->last_renew_caps = jiffies; @@ -3471,7 +3486,8 @@ restart: nextreq = rb_entry(n, struct ceph_mds_request, r_node); else nextreq = NULL; - if ((req->r_op & CEPH_MDS_OP_WRITE)) { + if (req->r_op != CEPH_MDS_OP_SETFILELOCK && + (req->r_op & CEPH_MDS_OP_WRITE)) { /* write op */ ceph_mdsc_get_request(req); if (nextreq) diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 294fa23a7df6..2ef799961ebb 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -296,6 +296,8 @@ struct ceph_mds_client { spinlock_t snap_empty_lock; /* protect snap_empty */ u64 last_tid; /* most recent mds request */ + u64 oldest_tid; /* oldest incomplete mds request, + excluding setfilelock requests */ struct rb_root request_tree; /* pending mds requests */ struct delayed_work delayed_work; /* delayed work */ unsigned long last_renew_caps; /* last time we renewed our caps */ -- cgit v1.2.3-59-g8ed1b From a319bf56a617354e62cf5f774d2ca4e1a8a3bff3 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Fri, 15 May 2015 12:02:17 +0300 Subject: libceph: store timeouts in jiffies, verify user input There are currently three libceph-level timeouts that the user can specify on mount: mount_timeout, osd_idle_ttl and osdkeepalive. All of these are in seconds and no checking is done on user input: negative values are accepted, we multiply them all by HZ which may or may not overflow, arbitrarily large jiffies then get added together, etc. There is also a bug in the way mount_timeout=0 is handled. It's supposed to mean "infinite timeout", but that's not how wait.h APIs treat it and so __ceph_open_session() for example will busy loop without much chance of being interrupted if none of ceph-mons are there. Fix all this by verifying user input, storing timeouts capped by msecs_to_jiffies() in jiffies and using the new ceph_timeout_jiffies() helper for all user-specified waits to handle infinite timeouts correctly. Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder --- drivers/block/rbd.c | 5 +++-- fs/ceph/dir.c | 4 ++-- fs/ceph/mds_client.c | 12 ++++++------ fs/ceph/mds_client.h | 2 +- fs/ceph/super.c | 2 +- include/linux/ceph/libceph.h | 17 +++++++++++------ net/ceph/ceph_common.c | 41 +++++++++++++++++++++++++++++++---------- net/ceph/mon_client.c | 11 +++++++++-- net/ceph/osd_client.c | 15 +++++++-------- 9 files changed, 71 insertions(+), 38 deletions(-) (limited to 'fs') diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c index 349115ae3bc2..992683b6b299 100644 --- a/drivers/block/rbd.c +++ b/drivers/block/rbd.c @@ -4963,8 +4963,8 @@ out_err: */ static int rbd_add_get_pool_id(struct rbd_client *rbdc, const char *pool_name) { + struct ceph_options *opts = rbdc->client->options; u64 newest_epoch; - unsigned long timeout = rbdc->client->options->mount_timeout * HZ; int tries = 0; int ret; @@ -4979,7 +4979,8 @@ again: if (rbdc->client->osdc.osdmap->epoch < newest_epoch) { ceph_monc_request_next_osdmap(&rbdc->client->monc); (void) ceph_monc_wait_osdmap(&rbdc->client->monc, - newest_epoch, timeout); + newest_epoch, + opts->mount_timeout); goto again; } else { /* the osdmap we have is new enough */ diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 4248307fea90..173dd4b58c71 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1259,8 +1259,8 @@ static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end, inode, req->r_tid, last_tid); if (req->r_timeout) { unsigned long time_left = wait_for_completion_timeout( - &req->r_safe_completion, - req->r_timeout); + &req->r_safe_completion, + ceph_timeout_jiffies(req->r_timeout)); if (time_left > 0) ret = 0; else diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 69a36f40517f..0b0e0a9a81c0 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2268,7 +2268,8 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, dout("do_request waiting\n"); if (req->r_timeout) { err = (long)wait_for_completion_killable_timeout( - &req->r_completion, req->r_timeout); + &req->r_completion, + ceph_timeout_jiffies(req->r_timeout)); if (err == 0) err = -EIO; } else if (req->r_wait_for_completion) { @@ -3424,8 +3425,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) */ static void wait_requests(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_request *req; - struct ceph_fs_client *fsc = mdsc->fsc; mutex_lock(&mdsc->mutex); if (__get_oldest_req(mdsc)) { @@ -3433,7 +3434,7 @@ static void wait_requests(struct ceph_mds_client *mdsc) dout("wait_requests waiting for requests\n"); wait_for_completion_timeout(&mdsc->safe_umount_waiters, - fsc->client->options->mount_timeout * HZ); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining requests */ mutex_lock(&mdsc->mutex); @@ -3556,10 +3557,9 @@ static bool done_closing_sessions(struct ceph_mds_client *mdsc) */ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) { + struct ceph_options *opts = mdsc->fsc->client->options; struct ceph_mds_session *session; int i; - struct ceph_fs_client *fsc = mdsc->fsc; - unsigned long timeout = fsc->client->options->mount_timeout * HZ; dout("close_sessions\n"); @@ -3580,7 +3580,7 @@ void ceph_mdsc_close_sessions(struct ceph_mds_client *mdsc) dout("waiting for sessions to close\n"); wait_event_timeout(mdsc->session_close_wq, done_closing_sessions(mdsc), - timeout); + ceph_timeout_jiffies(opts->mount_timeout)); /* tear down remaining sessions */ mutex_lock(&mdsc->mutex); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 2ef799961ebb..509d6822e9b1 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -227,7 +227,7 @@ struct ceph_mds_request { int r_err; bool r_aborted; - unsigned long r_timeout; /* optional. jiffies */ + unsigned long r_timeout; /* optional. jiffies, 0 is "wait forever" */ unsigned long r_started; /* start time to measure timeout against */ unsigned long r_request_started; /* start time for mds request only, used to measure lease durations */ diff --git a/fs/ceph/super.c b/fs/ceph/super.c index 9a5350030af8..edeb83c43112 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -742,7 +742,7 @@ static struct dentry *open_root_dentry(struct ceph_fs_client *fsc, req->r_ino1.ino = CEPH_INO_ROOT; req->r_ino1.snap = CEPH_NOSNAP; req->r_started = started; - req->r_timeout = fsc->client->options->mount_timeout * HZ; + req->r_timeout = fsc->client->options->mount_timeout; req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE); req->r_num_caps = 2; err = ceph_mdsc_do_request(mdsc, NULL, req); diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index 85ae9a889a3f..d73a569f9bf5 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -43,9 +43,9 @@ struct ceph_options { int flags; struct ceph_fsid fsid; struct ceph_entity_addr my_addr; - int mount_timeout; - int osd_idle_ttl; - int osd_keepalive_timeout; + unsigned long mount_timeout; /* jiffies */ + unsigned long osd_idle_ttl; /* jiffies */ + unsigned long osd_keepalive_timeout; /* jiffies */ /* * any type that can't be simply compared or doesn't need need @@ -63,9 +63,9 @@ struct ceph_options { /* * defaults */ -#define CEPH_MOUNT_TIMEOUT_DEFAULT 60 -#define CEPH_OSD_KEEPALIVE_DEFAULT 5 -#define CEPH_OSD_IDLE_TTL_DEFAULT 60 +#define CEPH_MOUNT_TIMEOUT_DEFAULT msecs_to_jiffies(60 * 1000) +#define CEPH_OSD_KEEPALIVE_DEFAULT msecs_to_jiffies(5 * 1000) +#define CEPH_OSD_IDLE_TTL_DEFAULT msecs_to_jiffies(60 * 1000) #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024) #define CEPH_MSG_MAX_MIDDLE_LEN (16*1024*1024) @@ -93,6 +93,11 @@ enum { CEPH_MOUNT_SHUTDOWN, }; +static inline unsigned long ceph_timeout_jiffies(unsigned long timeout) +{ + return timeout ?: MAX_SCHEDULE_TIMEOUT; +} + struct ceph_mds_client; /* diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c index 79e8f71aef5b..a80e91c2c9a3 100644 --- a/net/ceph/ceph_common.c +++ b/net/ceph/ceph_common.c @@ -352,8 +352,8 @@ ceph_parse_options(char *options, const char *dev_name, /* start with defaults */ opt->flags = CEPH_OPT_DEFAULT; opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT; - opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */ - opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */ + opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; + opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* get mon ip(s) */ /* ip1[:port1][,ip2[:port2]...] */ @@ -439,13 +439,32 @@ ceph_parse_options(char *options, const char *dev_name, pr_warn("ignoring deprecated osdtimeout option\n"); break; case Opt_osdkeepalivetimeout: - opt->osd_keepalive_timeout = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osdkeepalive out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_keepalive_timeout = + msecs_to_jiffies(intval * 1000); break; case Opt_osd_idle_ttl: - opt->osd_idle_ttl = intval; + /* 0 isn't well defined right now, reject it */ + if (intval < 1 || intval > INT_MAX / 1000) { + pr_err("osd_idle_ttl out of range\n"); + err = -EINVAL; + goto out; + } + opt->osd_idle_ttl = msecs_to_jiffies(intval * 1000); break; case Opt_mount_timeout: - opt->mount_timeout = intval; + /* 0 is "wait forever" (i.e. infinite timeout) */ + if (intval < 0 || intval > INT_MAX / 1000) { + pr_err("mount_timeout out of range\n"); + err = -EINVAL; + goto out; + } + opt->mount_timeout = msecs_to_jiffies(intval * 1000); break; case Opt_share: @@ -512,12 +531,14 @@ int ceph_print_client_options(struct seq_file *m, struct ceph_client *client) seq_puts(m, "notcp_nodelay,"); if (opt->mount_timeout != CEPH_MOUNT_TIMEOUT_DEFAULT) - seq_printf(m, "mount_timeout=%d,", opt->mount_timeout); + seq_printf(m, "mount_timeout=%d,", + jiffies_to_msecs(opt->mount_timeout) / 1000); if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT) - seq_printf(m, "osd_idle_ttl=%d,", opt->osd_idle_ttl); + seq_printf(m, "osd_idle_ttl=%d,", + jiffies_to_msecs(opt->osd_idle_ttl) / 1000); if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT) seq_printf(m, "osdkeepalivetimeout=%d,", - opt->osd_keepalive_timeout); + jiffies_to_msecs(opt->osd_keepalive_timeout) / 1000); /* drop redundant comma */ if (m->count != pos) @@ -627,7 +648,7 @@ static int have_mon_and_osd_map(struct ceph_client *client) int __ceph_open_session(struct ceph_client *client, unsigned long started) { int err; - unsigned long timeout = client->options->mount_timeout * HZ; + unsigned long timeout = client->options->mount_timeout; /* open session, and wait for mon and osd maps */ err = ceph_monc_open_session(&client->monc); @@ -643,7 +664,7 @@ int __ceph_open_session(struct ceph_client *client, unsigned long started) dout("mount waiting for mon_map\n"); err = wait_event_interruptible_timeout(client->auth_wq, have_mon_and_osd_map(client) || (client->auth_err < 0), - timeout); + ceph_timeout_jiffies(timeout)); if (err == -EINTR || err == -ERESTARTSYS) return err; if (client->auth_err < 0) diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c index 2b3cf05e87b0..0da3bdc116f7 100644 --- a/net/ceph/mon_client.c +++ b/net/ceph/mon_client.c @@ -298,6 +298,12 @@ void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc) } EXPORT_SYMBOL(ceph_monc_request_next_osdmap); +/* + * Wait for an osdmap with a given epoch. + * + * @epoch: epoch to wait for + * @timeout: in jiffies, 0 means "wait forever" + */ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, unsigned long timeout) { @@ -308,11 +314,12 @@ int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch, while (monc->have_osdmap < epoch) { mutex_unlock(&monc->mutex); - if (timeout != 0 && time_after_eq(jiffies, started + timeout)) + if (timeout && time_after_eq(jiffies, started + timeout)) return -ETIMEDOUT; ret = wait_event_interruptible_timeout(monc->client->auth_wq, - monc->have_osdmap >= epoch, timeout); + monc->have_osdmap >= epoch, + ceph_timeout_jiffies(timeout)); if (ret < 0) return ret; diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c index 4cb4fab46e4f..50033677c0fa 100644 --- a/net/ceph/osd_client.c +++ b/net/ceph/osd_client.c @@ -1097,7 +1097,7 @@ static void __move_osd_to_lru(struct ceph_osd_client *osdc, BUG_ON(!list_empty(&osd->o_osd_lru)); list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); - osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ; + osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl; } static void maybe_move_osd_to_lru(struct ceph_osd_client *osdc, @@ -1208,7 +1208,7 @@ static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o) static void __schedule_osd_timeout(struct ceph_osd_client *osdc) { schedule_delayed_work(&osdc->timeout_work, - osdc->client->options->osd_keepalive_timeout * HZ); + osdc->client->options->osd_keepalive_timeout); } static void __cancel_osd_timeout(struct ceph_osd_client *osdc) @@ -1576,10 +1576,9 @@ static void handle_timeout(struct work_struct *work) { struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, timeout_work.work); + struct ceph_options *opts = osdc->client->options; struct ceph_osd_request *req; struct ceph_osd *osd; - unsigned long keepalive = - osdc->client->options->osd_keepalive_timeout * HZ; struct list_head slow_osds; dout("timeout\n"); down_read(&osdc->map_sem); @@ -1595,7 +1594,8 @@ static void handle_timeout(struct work_struct *work) */ INIT_LIST_HEAD(&slow_osds); list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) { - if (time_before(jiffies, req->r_stamp + keepalive)) + if (time_before(jiffies, + req->r_stamp + opts->osd_keepalive_timeout)) break; osd = req->r_osd; @@ -1622,8 +1622,7 @@ static void handle_osds_timeout(struct work_struct *work) struct ceph_osd_client *osdc = container_of(work, struct ceph_osd_client, osds_timeout_work.work); - unsigned long delay = - osdc->client->options->osd_idle_ttl * HZ >> 2; + unsigned long delay = osdc->client->options->osd_idle_ttl / 4; dout("osds timeout\n"); down_read(&osdc->map_sem); @@ -2628,7 +2627,7 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client) osdc->event_count = 0; schedule_delayed_work(&osdc->osds_timeout_work, - round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ)); + round_jiffies_relative(osdc->client->options->osd_idle_ttl)); err = -ENOMEM; osdc->req_mempool = mempool_create_kmalloc_pool(10, -- cgit v1.2.3-59-g8ed1b From 5be73034771c8f18b241f1974803865a4de2cad1 Mon Sep 17 00:00:00 2001 From: Ilya Dryomov Date: Tue, 19 May 2015 12:05:38 +0300 Subject: ceph: simplify two mount_timeout sites No need to bifurcate wait now that we've got ceph_timeout_jiffies(). Signed-off-by: Ilya Dryomov Reviewed-by: Alex Elder Reviewed-by: Yan, Zheng --- fs/ceph/dir.c | 14 ++++---------- fs/ceph/mds_client.c | 18 ++++++++++-------- 2 files changed, 14 insertions(+), 18 deletions(-) (limited to 'fs') diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 173dd4b58c71..3dec27e36417 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1257,17 +1257,11 @@ static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end, dout("dir_fsync %p wait on tid %llu (until %llu)\n", inode, req->r_tid, last_tid); - if (req->r_timeout) { - unsigned long time_left = wait_for_completion_timeout( - &req->r_safe_completion, + ret = !wait_for_completion_timeout(&req->r_safe_completion, ceph_timeout_jiffies(req->r_timeout)); - if (time_left > 0) - ret = 0; - else - ret = -EIO; /* timed out */ - } else { - wait_for_completion(&req->r_safe_completion); - } + if (ret) + ret = -EIO; /* timed out */ + ceph_mdsc_put_request(req); spin_lock(&ci->i_unsafe_lock); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 0b0e0a9a81c0..5be2d287a26c 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2266,16 +2266,18 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc, /* wait */ mutex_unlock(&mdsc->mutex); dout("do_request waiting\n"); - if (req->r_timeout) { - err = (long)wait_for_completion_killable_timeout( - &req->r_completion, - ceph_timeout_jiffies(req->r_timeout)); - if (err == 0) - err = -EIO; - } else if (req->r_wait_for_completion) { + if (!req->r_timeout && req->r_wait_for_completion) { err = req->r_wait_for_completion(mdsc, req); } else { - err = wait_for_completion_killable(&req->r_completion); + long timeleft = wait_for_completion_killable_timeout( + &req->r_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (timeleft > 0) + err = 0; + else if (!timeleft) + err = -EIO; /* timed out */ + else + err = timeleft; /* killed */ } dout("do_request waited, got %d\n", err); mutex_lock(&mdsc->mutex); -- cgit v1.2.3-59-g8ed1b From 3e0708b990f7e46d87d47b3b06de322490f2f2ee Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Fri, 22 May 2015 16:38:02 +0800 Subject: ceph: ratelimit warn messages for MDS closes session Signed-off-by: Yan, Zheng --- fs/ceph/mds_client.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 5be2d287a26c..8080d486a991 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -8,6 +8,7 @@ #include #include #include +#include #include "super.h" #include "mds_client.h" @@ -1048,7 +1049,8 @@ static void cleanup_session_requests(struct ceph_mds_client *mdsc, req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item); list_del_init(&req->r_unsafe_item); - pr_info(" dropping unsafe request %llu\n", req->r_tid); + pr_warn_ratelimited(" dropping unsafe request %llu\n", + req->r_tid); __unregister_request(mdsc, req); } /* zero r_attempts, so kick_requests() will re-send requests */ @@ -1152,7 +1154,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item)) { - pr_info(" dropping dirty %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty %s state for %p %lld\n", ceph_cap_string(ci->i_dirty_caps), inode, ceph_ino(inode)); ci->i_dirty_caps = 0; @@ -1160,7 +1163,8 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } if (!list_empty(&ci->i_flushing_item)) { - pr_info(" dropping dirty+flushing %s state for %p %lld\n", + pr_warn_ratelimited( + " dropping dirty+flushing %s state for %p %lld\n", ceph_cap_string(ci->i_flushing_caps), inode, ceph_ino(inode)); ci->i_flushing_caps = 0; -- cgit v1.2.3-59-g8ed1b From 41445999aeec1f0fdf196ab55b2c770473b2ea01 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Mon, 25 May 2015 17:36:42 +0800 Subject: ceph: don't include used caps in cap_wanted when copying files to cephfs, file data may stay in page cache after corresponding file is closed. Cached data use Fc capability. If we include Fc capability in cap_wanted, MDS will treat files with cached data as open files, and journal them in an EOpen event when trimming log segment. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 245ca381a6dc..a80a899e5c41 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1525,13 +1525,13 @@ retry: retry_locked: file_wanted = __ceph_caps_file_wanted(ci); used = __ceph_caps_used(ci); - want = file_wanted | used; issued = __ceph_caps_issued(ci, &implemented); revoking = implemented & ~issued; - retain = want | CEPH_CAP_PIN; + want = file_wanted; + retain = file_wanted | used | CEPH_CAP_PIN; if (!mdsc->stopping && inode->i_nlink > 0) { - if (want) { + if (file_wanted) { retain |= CEPH_CAP_ANY; /* be greedy */ } else if (S_ISDIR(inode->i_mode) && (issued & CEPH_CAP_FILE_SHARED) && -- cgit v1.2.3-59-g8ed1b From 89b52fe14de4d703ba837a7418bb4cd286dcc87f Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 27 May 2015 09:59:48 +0800 Subject: ceph: fix flushing caps Current ceph_fsync() only flushes dirty caps and wait for them to be flushed. It doesn't wait for caps that has already been flushing. This patch makes ceph_fsync() wait for pending flushing caps too. Besides, this patch also makes caps_are_flushed() peroperly handle tid wrapping. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index a80a899e5c41..e9b03b51b874 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1097,8 +1097,7 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing, - unsigned *pflush_tid) + int op, int used, int want, int retain, int flushing) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1170,8 +1169,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, * first ack clean Ax. */ flush_tid = ++ci->i_cap_flush_last_tid; - if (pflush_tid) - *pflush_tid = flush_tid; dout(" cap_flush_tid %d\n", (int)flush_tid); for (i = 0; i < CEPH_CAP_BITS; i++) if (flushing & (1 << i)) @@ -1724,7 +1721,7 @@ ack: /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, NULL); + want, retain, flushing); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1753,12 +1750,12 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, unsigned *flush_tid) +static int try_flush_caps(struct inode *inode, u16 flush_tid[]) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - int flushing = 0; struct ceph_mds_session *session = NULL; + int flushing = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1787,17 +1784,19 @@ retry: /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - cap->issued | cap->implemented, flushing, - flush_tid); - if (!delayed) - goto out_unlocked; + cap->issued | cap->implemented, flushing); spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); + if (delayed) + __cap_delay_requeue(mdsc, ci); } + + flushing = ci->i_flushing_caps; + if (flushing) + memcpy(flush_tid, ci->i_cap_flush_tid, + sizeof(ci->i_cap_flush_tid)); out: spin_unlock(&ci->i_ceph_lock); -out_unlocked: if (session) mutex_unlock(&session->s_mutex); return flushing; @@ -1806,19 +1805,22 @@ out_unlocked: /* * Return true if we've flushed caps through the given flush_tid. */ -static int caps_are_flushed(struct inode *inode, unsigned tid) +static int caps_are_flushed(struct inode *inode, u16 flush_tid[]) { struct ceph_inode_info *ci = ceph_inode(inode); int i, ret = 1; spin_lock(&ci->i_ceph_lock); - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((ci->i_flushing_caps & (1 << i)) && - ci->i_cap_flush_tid[i] <= tid) { + for (i = 0; i < CEPH_CAP_BITS; i++) { + if (!(ci->i_flushing_caps & (1 << i))) + continue; + // tid only has 16 bits. we need to handle wrapping + if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) { /* still flushing this bit */ ret = 0; break; } + } spin_unlock(&ci->i_ceph_lock); return ret; } @@ -1871,7 +1873,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u16 flush_tid[CEPH_CAP_BITS]; int ret; int dirty; @@ -1883,7 +1885,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) return ret; mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, &flush_tid); + dirty = try_flush_caps(inode, flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); /* @@ -1892,7 +1894,6 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) * wait for that) */ if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { - dout("fsync waiting for flush_tid %u\n", flush_tid); ret = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); } @@ -1911,14 +1912,14 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); - unsigned flush_tid; + u16 flush_tid[CEPH_CAP_BITS]; int err = 0; int dirty; int wait = wbc->sync_mode == WB_SYNC_ALL; dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, &flush_tid); + dirty = try_flush_caps(inode, flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -1988,7 +1989,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); + ci->i_flushing_caps); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2027,7 +2028,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, - ci->i_flushing_caps, NULL); + ci->i_flushing_caps); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); -- cgit v1.2.3-59-g8ed1b From da819c8150c5b6e6a6a21ee41135b88f6cd18c3e Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 27 May 2015 11:19:34 +0800 Subject: ceph: fix directory fsync fsync() on directory should flush dirty caps and wait for any uncommitted directory opertions to commit. But ceph_dir_fsync() only waits for uncommitted directory opertions. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- fs/ceph/dir.c | 56 +------------------------------------------- 2 files changed, 65 insertions(+), 64 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index e9b03b51b874..dc988337f841 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1838,13 +1838,16 @@ static void sync_write_wait(struct inode *inode) struct ceph_osd_request *req; u64 last_tid; + if (!S_ISREG(inode->i_mode)) + return; + spin_lock(&ci->i_unsafe_lock); if (list_empty(head)) goto out; /* set upper bound as _last_ entry in chain */ - req = list_entry(head->prev, struct ceph_osd_request, - r_unsafe_item); + req = list_last_entry(head, struct ceph_osd_request, + r_unsafe_item); last_tid = req->r_tid; do { @@ -1862,13 +1865,59 @@ static void sync_write_wait(struct inode *inode) */ if (list_empty(head)) break; - req = list_entry(head->next, struct ceph_osd_request, - r_unsafe_item); + req = list_first_entry(head, struct ceph_osd_request, + r_unsafe_item); } while (req->r_tid < last_tid); out: spin_unlock(&ci->i_unsafe_lock); } +/* + * wait for any uncommitted directory operations to commit. + */ +static int unsafe_dirop_wait(struct inode *inode) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + struct list_head *head = &ci->i_unsafe_dirops; + struct ceph_mds_request *req; + u64 last_tid; + int ret = 0; + + if (!S_ISDIR(inode->i_mode)) + return 0; + + spin_lock(&ci->i_unsafe_lock); + if (list_empty(head)) + goto out; + + req = list_last_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + last_tid = req->r_tid; + + do { + ceph_mdsc_get_request(req); + spin_unlock(&ci->i_unsafe_lock); + + dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n", + inode, req->r_tid, last_tid); + ret = !wait_for_completion_timeout(&req->r_safe_completion, + ceph_timeout_jiffies(req->r_timeout)); + if (ret) + ret = -EIO; /* timed out */ + + ceph_mdsc_put_request(req); + + spin_lock(&ci->i_unsafe_lock); + if (ret || list_empty(head)) + break; + req = list_first_entry(head, struct ceph_mds_request, + r_unsafe_dir_item); + } while (req->r_tid < last_tid); +out: + spin_unlock(&ci->i_unsafe_lock); + return ret; +} + int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; @@ -1882,24 +1931,30 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) ret = filemap_write_and_wait_range(inode->i_mapping, start, end); if (ret < 0) - return ret; + goto out; + + if (datasync) + goto out; + mutex_lock(&inode->i_mutex); dirty = try_flush_caps(inode, flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); + ret = unsafe_dirop_wait(inode); + /* * only wait on non-file metadata writeback (the mds * can recover size and mtime, so we don't need to * wait for that) */ - if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { + if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { ret = wait_event_interruptible(ci->i_cap_wq, - caps_are_flushed(inode, flush_tid)); + caps_are_flushed(inode, flush_tid)); } - - dout("fsync %p%s done\n", inode, datasync ? " datasync" : ""); mutex_unlock(&inode->i_mutex); +out: + dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret); return ret; } diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 3dec27e36417..424e23138c59 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -1223,60 +1223,6 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, return size - left; } -/* - * an fsync() on a dir will wait for any uncommitted directory - * operations to commit. - */ -static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end, - int datasync) -{ - struct inode *inode = file_inode(file); - struct ceph_inode_info *ci = ceph_inode(inode); - struct list_head *head = &ci->i_unsafe_dirops; - struct ceph_mds_request *req; - u64 last_tid; - int ret = 0; - - dout("dir_fsync %p\n", inode); - ret = filemap_write_and_wait_range(inode->i_mapping, start, end); - if (ret) - return ret; - mutex_lock(&inode->i_mutex); - - spin_lock(&ci->i_unsafe_lock); - if (list_empty(head)) - goto out; - - req = list_entry(head->prev, - struct ceph_mds_request, r_unsafe_dir_item); - last_tid = req->r_tid; - - do { - ceph_mdsc_get_request(req); - spin_unlock(&ci->i_unsafe_lock); - - dout("dir_fsync %p wait on tid %llu (until %llu)\n", - inode, req->r_tid, last_tid); - ret = !wait_for_completion_timeout(&req->r_safe_completion, - ceph_timeout_jiffies(req->r_timeout)); - if (ret) - ret = -EIO; /* timed out */ - - ceph_mdsc_put_request(req); - - spin_lock(&ci->i_unsafe_lock); - if (ret || list_empty(head)) - break; - req = list_entry(head->next, - struct ceph_mds_request, r_unsafe_dir_item); - } while (req->r_tid < last_tid); -out: - spin_unlock(&ci->i_unsafe_lock); - mutex_unlock(&inode->i_mutex); - - return ret; -} - /* * We maintain a private dentry LRU. * @@ -1347,7 +1293,7 @@ const struct file_operations ceph_dir_fops = { .open = ceph_open, .release = ceph_release, .unlocked_ioctl = ceph_ioctl, - .fsync = ceph_dir_fsync, + .fsync = ceph_fsync, }; const struct file_operations ceph_snapdir_fops = { -- cgit v1.2.3-59-g8ed1b From 553adfd941f8ca622965ef809553d918ea039929 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 9 Jun 2015 15:48:57 +0800 Subject: ceph: track pending caps flushing accurately Previously we do not trace accurate TID for flushing caps. when MDS failovers, we have no choice but to re-send all flushing caps with a new TID. This can cause problem because MDS can has already flushed some caps and has issued the same caps to other client. The re-sent cap flush has a new TID, which makes MDS unable to detect if it has already processed the cap flush. This patch adds code to track pending caps flushing accurately. When re-sending cap flush is needed, we use its original flush TID. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 245 +++++++++++++++++++++++++++++++++------------------ fs/ceph/inode.c | 3 +- fs/ceph/mds_client.c | 20 +++++ fs/ceph/mds_client.h | 1 + fs/ceph/super.h | 11 ++- 5 files changed, 192 insertions(+), 88 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dc988337f841..9a25f8d66fbc 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1097,7 +1097,8 @@ void ceph_queue_caps_release(struct inode *inode) * caller should hold snap_rwsem (read), s_mutex. */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, - int op, int used, int want, int retain, int flushing) + int op, int used, int want, int retain, int flushing, + u64 flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1115,8 +1116,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, u64 xattr_version = 0; struct ceph_buffer *xattr_blob = NULL; int delayed = 0; - u64 flush_tid = 0; - int i; int ret; bool inline_data; @@ -1160,24 +1159,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, cap->implemented &= cap->issued | used; cap->mds_wanted = want; - if (flushing) { - /* - * assign a tid for flush operations so we can avoid - * flush1 -> dirty1 -> flush2 -> flushack1 -> mark - * clean type races. track latest tid for every bit - * so we can handle flush AxFw, flush Fw, and have the - * first ack clean Ax. - */ - flush_tid = ++ci->i_cap_flush_last_tid; - dout(" cap_flush_tid %d\n", (int)flush_tid); - for (i = 0; i < CEPH_CAP_BITS; i++) - if (flushing & (1 << i)) - ci->i_cap_flush_tid[i] = flush_tid; - - follows = ci->i_head_snapc->seq; - } else { - follows = 0; - } + follows = flushing ? ci->i_head_snapc->seq : 0; keep = cap->implemented; seq = cap->seq; @@ -1311,7 +1293,10 @@ retry: goto retry; } - capsnap->flush_tid = ++ci->i_cap_flush_last_tid; + spin_lock(&mdsc->cap_dirty_lock); + capsnap->flush_tid = ++mdsc->last_cap_flush_tid; + spin_unlock(&mdsc->cap_dirty_lock); + atomic_inc(&capsnap->nref); if (list_empty(&capsnap->flushing_item)) list_add_tail(&capsnap->flushing_item, @@ -1407,6 +1392,29 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) return dirty; } +static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &ci->i_cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, i_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->i_node, parent, p); + rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1414,10 +1422,12 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) * Called under i_ceph_lock. */ static int __mark_caps_flushing(struct inode *inode, - struct ceph_mds_session *session) + struct ceph_mds_session *session, + u64 *flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); + struct ceph_cap_flush *cf; int flushing; BUG_ON(ci->i_dirty_caps == 0); @@ -1432,9 +1442,14 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); + cf = kmalloc(sizeof(*cf), GFP_ATOMIC); + cf->caps = flushing; + spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); + cf->tid = ++mdsc->last_cap_flush_tid; + if (list_empty(&ci->i_flushing_item)) { ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); @@ -1448,6 +1463,9 @@ static int __mark_caps_flushing(struct inode *inode, } spin_unlock(&mdsc->cap_dirty_lock); + __add_cap_flushing_to_inode(ci, cf); + + *flush_tid = cf->tid; return flushing; } @@ -1493,6 +1511,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; + u64 flush_tid; int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; @@ -1711,17 +1730,20 @@ ack: took_snap_rwsem = 1; } - if (cap == ci->i_auth_cap && ci->i_dirty_caps) - flushing = __mark_caps_flushing(inode, session); - else + if (cap == ci->i_auth_cap && ci->i_dirty_caps) { + flushing = __mark_caps_flushing(inode, session, + &flush_tid); + } else { flushing = 0; + flush_tid = 0; + } mds = cap->mds; /* remember mds, so we don't repeat */ sent++; /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing); + want, retain, flushing, flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1750,12 +1772,13 @@ ack: /* * Try to flush dirty caps back to the auth mds. */ -static int try_flush_caps(struct inode *inode, u16 flush_tid[]) +static int try_flush_caps(struct inode *inode, u64 *ptid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_session *session = NULL; int flushing = 0; + u64 flush_tid = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1780,46 +1803,52 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session); + flushing = __mark_caps_flushing(inode, session, &flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, - cap->issued | cap->implemented, flushing); + (cap->issued | cap->implemented), + flushing, flush_tid); - spin_lock(&ci->i_ceph_lock); - if (delayed) + if (delayed) { + spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); + spin_unlock(&ci->i_ceph_lock); + } + } else { + struct rb_node *n = rb_last(&ci->i_cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, i_node); + flush_tid = cf->tid; + } + flushing = ci->i_flushing_caps; + spin_unlock(&ci->i_ceph_lock); } - - flushing = ci->i_flushing_caps; - if (flushing) - memcpy(flush_tid, ci->i_cap_flush_tid, - sizeof(ci->i_cap_flush_tid)); out: - spin_unlock(&ci->i_ceph_lock); if (session) mutex_unlock(&session->s_mutex); + + *ptid = flush_tid; return flushing; } /* * Return true if we've flushed caps through the given flush_tid. */ -static int caps_are_flushed(struct inode *inode, u16 flush_tid[]) +static int caps_are_flushed(struct inode *inode, u64 flush_tid) { struct ceph_inode_info *ci = ceph_inode(inode); - int i, ret = 1; + struct ceph_cap_flush *cf; + struct rb_node *n; + int ret = 1; spin_lock(&ci->i_ceph_lock); - for (i = 0; i < CEPH_CAP_BITS; i++) { - if (!(ci->i_flushing_caps & (1 << i))) - continue; - // tid only has 16 bits. we need to handle wrapping - if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) { - /* still flushing this bit */ + n = rb_first(&ci->i_cap_flush_tree); + if (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid <= flush_tid) ret = 0; - break; - } } spin_unlock(&ci->i_ceph_lock); return ret; @@ -1922,7 +1951,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) { struct inode *inode = file->f_mapping->host; struct ceph_inode_info *ci = ceph_inode(inode); - u16 flush_tid[CEPH_CAP_BITS]; + u64 flush_tid; int ret; int dirty; @@ -1938,7 +1967,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) mutex_lock(&inode->i_mutex); - dirty = try_flush_caps(inode, flush_tid); + dirty = try_flush_caps(inode, &flush_tid); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); ret = unsafe_dirop_wait(inode); @@ -1967,14 +1996,14 @@ out: int ceph_write_inode(struct inode *inode, struct writeback_control *wbc) { struct ceph_inode_info *ci = ceph_inode(inode); - u16 flush_tid[CEPH_CAP_BITS]; + u64 flush_tid; int err = 0; int dirty; int wait = wbc->sync_mode == WB_SYNC_ALL; dout("write_inode %p wait=%d\n", inode, wait); if (wait) { - dirty = try_flush_caps(inode, flush_tid); + dirty = try_flush_caps(inode, &flush_tid); if (dirty) err = wait_event_interruptible(ci->i_cap_wq, caps_are_flushed(inode, flush_tid)); @@ -2022,6 +2051,51 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, } } +static int __kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session, + struct ceph_inode_info *ci) +{ + struct inode *inode = &ci->vfs_inode; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + int delayed = 0; + u64 first_tid = 0; + + while (true) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", inode, + cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + break; + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + if (cf->tid >= first_tid) + break; + } + if (!n) { + spin_unlock(&ci->i_ceph_lock); + break; + } + + cf = rb_entry(n, struct ceph_cap_flush, i_node); + first_tid = cf->tid + 1; + + dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, + cap, cf->tid, ceph_cap_string(cf->caps)); + delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, + __ceph_caps_used(ci), + __ceph_caps_wanted(ci), + cap->issued | cap->implemented, + cf->caps, cf->tid); + } + return delayed; +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2031,28 +2105,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - struct inode *inode = &ci->vfs_inode; - struct ceph_cap *cap; - int delayed = 0; - - spin_lock(&ci->i_ceph_lock); - cap = ci->i_auth_cap; - if (cap && cap->session == session) { - dout("kick_flushing_caps %p cap %p %s\n", inode, - cap, ceph_cap_string(ci->i_flushing_caps)); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps); - if (delayed) { - spin_lock(&ci->i_ceph_lock); - __cap_delay_requeue(mdsc, ci); - spin_unlock(&ci->i_ceph_lock); - } - } else { - pr_err("%p auth cap %p not mds%d ???\n", inode, - cap, session->s_mds); + int delayed = __kick_flushing_caps(mdsc, session, ci); + if (delayed) { + spin_lock(&ci->i_ceph_lock); + __cap_delay_requeue(mdsc, ci); spin_unlock(&ci->i_ceph_lock); } } @@ -2064,7 +2120,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_cap *cap; - int delayed = 0; spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; @@ -2074,16 +2129,16 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, __ceph_flush_snaps(ci, &session, 1); if (ci->i_flushing_caps) { + int delayed; + spin_lock(&mdsc->cap_dirty_lock); list_move_tail(&ci->i_flushing_item, &cap->session->s_cap_flushing); spin_unlock(&mdsc->cap_dirty_lock); - delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, - __ceph_caps_used(ci), - __ceph_caps_wanted(ci), - cap->issued | cap->implemented, - ci->i_flushing_caps); + spin_unlock(&ci->i_ceph_lock); + + delayed = __kick_flushing_caps(mdsc, session, ci); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2836,16 +2891,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + struct ceph_cap_flush *cf; + struct rb_node *n; + LIST_HEAD(to_remove); unsigned seq = le32_to_cpu(m->seq); int dirty = le32_to_cpu(m->dirty); int cleaned = 0; int drop = 0; - int i; - for (i = 0; i < CEPH_CAP_BITS; i++) - if ((dirty & (1 << i)) && - (u16)flush_tid == ci->i_cap_flush_tid[i]) - cleaned |= 1 << i; + n = rb_first(&ci->i_cap_flush_tree); + while (n) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + n = rb_next(&cf->i_node); + if (cf->tid == flush_tid) + cleaned = cf->caps; + if (cf->tid <= flush_tid) { + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add_tail(&cf->list, &to_remove); + } else { + cleaned &= ~cf->caps; + if (!cleaned) + break; + } + } dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s," " flushing %s -> %s\n", @@ -2890,6 +2958,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, out: spin_unlock(&ci->i_ceph_lock); + + while (!list_empty(&to_remove)) { + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + kfree(cf); + } if (drop) iput(inode); } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 1c991df276c9..6d3f19db8c8a 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -417,8 +417,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); ci->i_cap_flush_seq = 0; - ci->i_cap_flush_last_tid = 0; - memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid)); + ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; ci->i_hold_caps_max = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 8080d486a991..839901f51512 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1142,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg) { struct ceph_inode_info *ci = ceph_inode(inode); + LIST_HEAD(to_remove); int drop = 0; dout("removing cap %p, ci is %p, inode is %p\n", @@ -1149,9 +1150,19 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, spin_lock(&ci->i_ceph_lock); __ceph_remove_cap(cap, false); if (!ci->i_auth_cap) { + struct ceph_cap_flush *cf; struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; + while (true) { + struct rb_node *n = rb_first(&ci->i_cap_flush_tree); + if (!n) + break; + cf = rb_entry(n, struct ceph_cap_flush, i_node); + rb_erase(&cf->i_node, &ci->i_cap_flush_tree); + list_add(&cf->list, &to_remove); + } + spin_lock(&mdsc->cap_dirty_lock); if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( @@ -1173,8 +1184,16 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, drop = 1; } spin_unlock(&mdsc->cap_dirty_lock); + } spin_unlock(&ci->i_ceph_lock); + while (!list_empty(&to_remove)) { + struct ceph_cap_flush *cf; + cf = list_first_entry(&to_remove, + struct ceph_cap_flush, list); + list_del(&cf->list); + kfree(cf); + } while (drop--) iput(inode); return 0; @@ -3408,6 +3427,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); mdsc->cap_flush_seq = 0; + mdsc->last_cap_flush_tid = 1; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 509d6822e9b1..19f6084203f0 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -307,6 +307,7 @@ struct ceph_mds_client { spinlock_t snap_flush_lock; u64 cap_flush_seq; + u64 last_cap_flush_tid; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index c4961353d058..cc597f52e046 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -186,6 +186,15 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) } } +struct ceph_cap_flush { + u64 tid; + int caps; + union { + struct rb_node i_node; + struct list_head list; + }; +}; + /* * The frag tree describes how a directory is fragmented, potentially across * multiple metadata servers. It is also used to indicate points where @@ -299,7 +308,7 @@ struct ceph_inode_info { /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ - u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS]; + struct rb_root i_cap_flush_tree; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ unsigned long i_hold_caps_max; /* jiffies */ -- cgit v1.2.3-59-g8ed1b From 8310b08913eca8aee98744c9aff1ec0d1f603b19 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 9 Jun 2015 17:20:12 +0800 Subject: ceph: track pending caps flushing globally So we know TID of the oldest pending caps flushing. Later patch will send this information to MDS, so that MDS can trim its completed caps flush list. Tracking pending caps flushing globally also simplifies syncfs code. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 50 +++++++++++++++++++++++----- fs/ceph/inode.c | 1 - fs/ceph/mds_client.c | 93 +++++++++++++++++++++++++++------------------------- fs/ceph/mds_client.h | 2 +- fs/ceph/super.h | 2 +- 5 files changed, 91 insertions(+), 57 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 9a25f8d66fbc..0295048724d2 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1415,6 +1415,29 @@ static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci, rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree); } +static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, + struct ceph_cap_flush *cf) +{ + struct rb_node **p = &mdsc->cap_flush_tree.rb_node; + struct rb_node *parent = NULL; + struct ceph_cap_flush *other = NULL; + + while (*p) { + parent = *p; + other = rb_entry(parent, struct ceph_cap_flush, g_node); + + if (cf->tid < other->tid) + p = &(*p)->rb_left; + else if (cf->tid > other->tid) + p = &(*p)->rb_right; + else + BUG(); + } + + rb_link_node(&cf->g_node, parent, p); + rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1449,17 +1472,16 @@ static int __mark_caps_flushing(struct inode *inode, list_del_init(&ci->i_dirty_item); cf->tid = ++mdsc->last_cap_flush_tid; + __add_cap_flushing_to_mdsc(mdsc, cf); if (list_empty(&ci->i_flushing_item)) { - ci->i_cap_flush_seq = ++mdsc->cap_flush_seq; list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); mdsc->num_cap_flushing++; - dout(" inode %p now flushing seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing tid %llu\n", inode, cf->tid); } else { list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing); - dout(" inode %p now flushing (more) seq %lld\n", inode, - ci->i_cap_flush_seq); + dout(" inode %p now flushing (more) tid %llu\n", + inode, cf->tid); } spin_unlock(&mdsc->cap_dirty_lock); @@ -2123,8 +2145,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, spin_lock(&ci->i_ceph_lock); cap = ci->i_auth_cap; - dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode, - ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq); + dout("kick_flushing_inode_caps %p flushing %s\n", inode, + ceph_cap_string(ci->i_flushing_caps)); __ceph_flush_snaps(ci, &session, 1); @@ -2921,12 +2943,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps), ceph_cap_string(ci->i_flushing_caps & ~cleaned)); - if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned)) + if (list_empty(&to_remove) && !cleaned) goto out; ci->i_flushing_caps &= ~cleaned; spin_lock(&mdsc->cap_dirty_lock); + + if (!list_empty(&to_remove)) { + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (!cf || cf->tid > flush_tid) + wake_up_all(&mdsc->cap_flushing_wq); + } + if (ci->i_flushing_caps == 0) { list_del_init(&ci->i_flushing_item); if (!list_empty(&session->s_cap_flushing)) @@ -2936,7 +2969,6 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid, struct ceph_inode_info, i_flushing_item)->vfs_inode); mdsc->num_cap_flushing--; - wake_up_all(&mdsc->cap_flushing_wq); dout(" inode %p now !flushing\n", inode); if (ci->i_dirty_caps == 0) { diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 6d3f19db8c8a..3326302f5884 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -416,7 +416,6 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); - ci->i_cap_flush_seq = 0; ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 839901f51512..31f6a78caa0a 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1164,6 +1164,10 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_lock(&mdsc->cap_dirty_lock); + + list_for_each_entry(cf, &to_remove, list) + rb_erase(&cf->g_node, &mdsc->cap_flush_tree); + if (!list_empty(&ci->i_dirty_item)) { pr_warn_ratelimited( " dropping dirty %s state for %p %lld\n", @@ -1467,39 +1471,56 @@ static int trim_caps(struct ceph_mds_client *mdsc, return 0; } -static int check_cap_flush(struct ceph_inode_info *ci, - u64 want_flush_seq, u64 want_snap_seq) +static int check_capsnap_flush(struct ceph_inode_info *ci, + u64 want_snap_seq) { - int ret1 = 1, ret2 = 1; + int ret = 1; spin_lock(&ci->i_ceph_lock); - if (want_flush_seq > 0 && ci->i_flushing_caps) - ret1 = ci->i_cap_flush_seq >= want_flush_seq; - if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) { struct ceph_cap_snap *capsnap = list_first_entry(&ci->i_cap_snaps, struct ceph_cap_snap, ci_item); - ret2 = capsnap->follows >= want_snap_seq; + ret = capsnap->follows >= want_snap_seq; } spin_unlock(&ci->i_ceph_lock); - return ret1 && ret2; + return ret; +} + +static int check_caps_flush(struct ceph_mds_client *mdsc, + u64 want_flush_tid) +{ + struct rb_node *n; + struct ceph_cap_flush *cf; + int ret = 1; + + spin_lock(&mdsc->cap_dirty_lock); + n = rb_first(&mdsc->cap_flush_tree); + cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL; + if (cf && cf->tid <= want_flush_tid) { + dout("check_caps_flush still flushing tid %llu <= %llu\n", + cf->tid, want_flush_tid); + ret = 0; + } + spin_unlock(&mdsc->cap_dirty_lock); + return ret; } /* * flush all dirty inode data to disk. * - * returns true if we've flushed through want_flush_seq + * returns true if we've flushed through want_flush_tid */ static void wait_caps_flush(struct ceph_mds_client *mdsc, - u64 want_flush_seq, u64 want_snap_seq) + u64 want_flush_tid, u64 want_snap_seq) { int mds; - dout("check_cap_flush want %lld\n", want_flush_seq); + dout("check_caps_flush want %llu snap want %llu\n", + want_flush_tid, want_snap_seq); mutex_lock(&mdsc->mutex); for (mds = 0; mds < mdsc->max_sessions; ) { struct ceph_mds_session *session = mdsc->sessions[mds]; - struct inode *inode1 = NULL, *inode2 = NULL; + struct inode *inode = NULL; if (!session) { mds++; @@ -1509,58 +1530,40 @@ static void wait_caps_flush(struct ceph_mds_client *mdsc, mutex_unlock(&mdsc->mutex); mutex_lock(&session->s_mutex); - if (!list_empty(&session->s_cap_flushing)) { - struct ceph_inode_info *ci = - list_first_entry(&session->s_cap_flushing, - struct ceph_inode_info, - i_flushing_item); - - if (!check_cap_flush(ci, want_flush_seq, 0)) { - dout("check_cap_flush still flushing %p " - "seq %lld <= %lld to mds%d\n", - &ci->vfs_inode, ci->i_cap_flush_seq, - want_flush_seq, mds); - inode1 = igrab(&ci->vfs_inode); - } - } if (!list_empty(&session->s_cap_snaps_flushing)) { struct ceph_cap_snap *capsnap = list_first_entry(&session->s_cap_snaps_flushing, struct ceph_cap_snap, flushing_item); struct ceph_inode_info *ci = capsnap->ci; - if (!check_cap_flush(ci, 0, want_snap_seq)) { + if (!check_capsnap_flush(ci, want_snap_seq)) { dout("check_cap_flush still flushing snap %p " "follows %lld <= %lld to mds%d\n", &ci->vfs_inode, capsnap->follows, want_snap_seq, mds); - inode2 = igrab(&ci->vfs_inode); + inode = igrab(&ci->vfs_inode); } } mutex_unlock(&session->s_mutex); ceph_put_mds_session(session); - if (inode1) { - wait_event(mdsc->cap_flushing_wq, - check_cap_flush(ceph_inode(inode1), - want_flush_seq, 0)); - iput(inode1); - } - if (inode2) { + if (inode) { wait_event(mdsc->cap_flushing_wq, - check_cap_flush(ceph_inode(inode2), - 0, want_snap_seq)); - iput(inode2); - } - - if (!inode1 && !inode2) + check_capsnap_flush(ceph_inode(inode), + want_snap_seq)); + iput(inode); + } else { mds++; + } mutex_lock(&mdsc->mutex); } - mutex_unlock(&mdsc->mutex); - dout("check_cap_flush ok, flushed thru %lld\n", want_flush_seq); + + wait_event(mdsc->cap_flushing_wq, + check_caps_flush(mdsc, want_flush_tid)); + + dout("check_caps_flush ok, flushed thru %llu\n", want_flush_tid); } /* @@ -3426,8 +3429,8 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc) spin_lock_init(&mdsc->cap_delay_lock); INIT_LIST_HEAD(&mdsc->snap_flush_list); spin_lock_init(&mdsc->snap_flush_lock); - mdsc->cap_flush_seq = 0; mdsc->last_cap_flush_tid = 1; + mdsc->cap_flush_tree = RB_ROOT; INIT_LIST_HEAD(&mdsc->cap_dirty); INIT_LIST_HEAD(&mdsc->cap_dirty_migrating); mdsc->num_cap_flushing = 0; @@ -3554,7 +3557,7 @@ void ceph_mdsc_sync(struct ceph_mds_client *mdsc) ceph_flush_dirty_caps(mdsc); spin_lock(&mdsc->cap_dirty_lock); - want_flush = mdsc->cap_flush_seq; + want_flush = mdsc->last_cap_flush_tid; spin_unlock(&mdsc->cap_dirty_lock); down_read(&mdsc->snap_rwsem); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 19f6084203f0..470be4eb25f3 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -306,8 +306,8 @@ struct ceph_mds_client { struct list_head snap_flush_list; /* cap_snaps ready to flush */ spinlock_t snap_flush_lock; - u64 cap_flush_seq; u64 last_cap_flush_tid; + struct rb_root cap_flush_tree; struct list_head cap_dirty; /* inodes with dirty caps */ struct list_head cap_dirty_migrating; /* ...that are migration... */ int num_cap_flushing; /* # caps we are flushing */ diff --git a/fs/ceph/super.h b/fs/ceph/super.h index cc597f52e046..94d91471165f 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,6 +189,7 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; + struct rb_node g_node; union { struct rb_node i_node; struct list_head list; @@ -304,7 +305,6 @@ struct ceph_inode_info { struct ceph_cap *i_auth_cap; /* authoritative cap, if any */ unsigned i_dirty_caps, i_flushing_caps; /* mask of dirtied fields */ struct list_head i_dirty_item, i_flushing_item; - u64 i_cap_flush_seq; /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ -- cgit v1.2.3-59-g8ed1b From a2971c8ccb9bd7677a6c43cdbed9aacfef5e9f26 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Jun 2015 11:09:32 +0800 Subject: ceph: send TID of the oldest pending caps flush to MDS According to this information, MDS can trim its completed caps flush list (which is used to detect duplicated cap flush). Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 67 ++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 49 insertions(+), 18 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 0295048724d2..420272788e01 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -986,8 +986,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release) static int send_cap_msg(struct ceph_mds_session *session, u64 ino, u64 cid, int op, int caps, int wanted, int dirty, - u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq, - u64 size, u64 max_size, + u32 seq, u64 flush_tid, u64 oldest_flush_tid, + u32 issue_seq, u32 mseq, u64 size, u64 max_size, struct timespec *mtime, struct timespec *atime, u64 time_warp_seq, kuid_t uid, kgid_t gid, umode_t mode, @@ -1001,20 +1001,23 @@ static int send_cap_msg(struct ceph_mds_session *session, size_t extra_len; dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s" - " seq %u/%u mseq %u follows %lld size %llu/%llu" + " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu" " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op), cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted), ceph_cap_string(dirty), - seq, issue_seq, mseq, follows, size, max_size, + seq, issue_seq, flush_tid, oldest_flush_tid, + mseq, follows, size, max_size, xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0); - /* flock buffer size + inline version + inline data size */ - extra_len = 4 + 8 + 4; + /* flock buffer size + inline version + inline data size + + * osd_epoch_barrier + oldest_flush_tid */ + extra_len = 4 + 8 + 4 + 4 + 8; msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len, GFP_NOFS, false); if (!msg) return -ENOMEM; + msg->hdr.version = cpu_to_le16(6); msg->hdr.tid = cpu_to_le64(flush_tid); fc = msg->front.iov_base; @@ -1050,6 +1053,10 @@ static int send_cap_msg(struct ceph_mds_session *session, ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE); /* inline data size */ ceph_encode_32(&p, 0); + /* osd_epoch_barrier */ + ceph_encode_32(&p, 0); + /* oldest_flush_tid */ + ceph_encode_64(&p, oldest_flush_tid); fc->xattr_version = cpu_to_le64(xattr_version); if (xattrs_buf) { @@ -1098,7 +1105,7 @@ void ceph_queue_caps_release(struct inode *inode) */ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, int op, int used, int want, int retain, int flushing, - u64 flush_tid) + u64 flush_tid, u64 oldest_flush_tid) __releases(cap->ci->i_ceph_lock) { struct ceph_inode_info *ci = cap->ci; @@ -1187,7 +1194,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap, spin_unlock(&ci->i_ceph_lock); ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id, - op, keep, want, flushing, seq, flush_tid, issue_seq, mseq, + op, keep, want, flushing, seq, + flush_tid, oldest_flush_tid, issue_seq, mseq, size, max_size, &mtime, &atime, time_warp_seq, uid, gid, mode, xattr_version, xattr_blob, follows, inline_data); @@ -1307,8 +1315,8 @@ retry: inode, capsnap, capsnap->follows, capsnap->flush_tid); send_cap_msg(session, ceph_vino(inode).ino, 0, CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0, - capsnap->dirty, 0, capsnap->flush_tid, 0, mseq, - capsnap->size, 0, + capsnap->dirty, 0, capsnap->flush_tid, 0, + 0, mseq, capsnap->size, 0, &capsnap->mtime, &capsnap->atime, capsnap->time_warp_seq, capsnap->uid, capsnap->gid, capsnap->mode, @@ -1438,6 +1446,17 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); } +static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) +{ + struct rb_node *n = rb_first(&mdsc->cap_flush_tree); + if (n) { + struct ceph_cap_flush *cf = + rb_entry(n, struct ceph_cap_flush, g_node); + return cf->tid; + } + return 0; +} + /* * Add dirty inode to the flushing list. Assigned a seq number so we * can wait for caps to flush without starving. @@ -1446,7 +1465,7 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, */ static int __mark_caps_flushing(struct inode *inode, struct ceph_mds_session *session, - u64 *flush_tid) + u64 *flush_tid, u64 *oldest_flush_tid) { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); @@ -1473,6 +1492,7 @@ static int __mark_caps_flushing(struct inode *inode, cf->tid = ++mdsc->last_cap_flush_tid; __add_cap_flushing_to_mdsc(mdsc, cf); + *oldest_flush_tid = __get_oldest_flush_tid(mdsc); if (list_empty(&ci->i_flushing_item)) { list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing); @@ -1533,7 +1553,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags, struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; - u64 flush_tid; + u64 flush_tid, oldest_flush_tid; int file_wanted, used, cap_used; int took_snap_rwsem = 0; /* true if mdsc->snap_rwsem held */ int issued, implemented, want, retain, revoking, flushing = 0; @@ -1754,10 +1774,14 @@ ack: if (cap == ci->i_auth_cap && ci->i_dirty_caps) { flushing = __mark_caps_flushing(inode, session, - &flush_tid); + &flush_tid, + &oldest_flush_tid); } else { flushing = 0; flush_tid = 0; + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); } mds = cap->mds; /* remember mds, so we don't repeat */ @@ -1765,7 +1789,8 @@ ack: /* __send_cap drops i_ceph_lock */ delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used, - want, retain, flushing, flush_tid); + want, retain, flushing, + flush_tid, oldest_flush_tid); goto retry; /* retake i_ceph_lock and restart our cap scan. */ } @@ -1800,7 +1825,7 @@ static int try_flush_caps(struct inode *inode, u64 *ptid) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_session *session = NULL; int flushing = 0; - u64 flush_tid = 0; + u64 flush_tid = 0, oldest_flush_tid = 0; retry: spin_lock(&ci->i_ceph_lock); @@ -1825,12 +1850,13 @@ retry: if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) goto out; - flushing = __mark_caps_flushing(inode, session, &flush_tid); + flushing = __mark_caps_flushing(inode, session, &flush_tid, + &oldest_flush_tid); /* __send_cap drops i_ceph_lock */ delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want, (cap->issued | cap->implemented), - flushing, flush_tid); + flushing, flush_tid, oldest_flush_tid); if (delayed) { spin_lock(&ci->i_ceph_lock); @@ -2083,6 +2109,11 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, struct rb_node *n; int delayed = 0; u64 first_tid = 0; + u64 oldest_flush_tid; + + spin_lock(&mdsc->cap_dirty_lock); + oldest_flush_tid = __get_oldest_flush_tid(mdsc); + spin_unlock(&mdsc->cap_dirty_lock); while (true) { spin_lock(&ci->i_ceph_lock); @@ -2113,7 +2144,7 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, __ceph_caps_used(ci), __ceph_caps_wanted(ci), cap->issued | cap->implemented, - cf->caps, cf->tid); + cf->caps, cf->tid, oldest_flush_tid); } return delayed; } -- cgit v1.2.3-59-g8ed1b From e548e9b93d3e565e42b938a99804114565be1f81 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Jun 2015 15:17:56 +0800 Subject: ceph: re-send flushing caps (which are revoked) in reconnect stage if flushing caps were revoked, we should re-send the cap flush in client reconnect stage. This guarantees that MDS processes the cap flush message before issuing the flushing caps to other client. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 57 ++++++++++++++++++++++++++++++++++++++++++++++++---- fs/ceph/mds_client.c | 3 +++ fs/ceph/super.h | 7 +++++-- 3 files changed, 61 insertions(+), 6 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 420272788e01..69a16044ec41 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1486,6 +1486,7 @@ static int __mark_caps_flushing(struct inode *inode, cf = kmalloc(sizeof(*cf), GFP_ATOMIC); cf->caps = flushing; + cf->kick = false; spin_lock(&mdsc->cap_dirty_lock); list_del_init(&ci->i_dirty_item); @@ -2101,7 +2102,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, static int __kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, - struct ceph_inode_info *ci) + struct ceph_inode_info *ci, + bool kick_all) { struct inode *inode = &ci->vfs_inode; struct ceph_cap *cap; @@ -2127,7 +2129,9 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { cf = rb_entry(n, struct ceph_cap_flush, i_node); - if (cf->tid >= first_tid) + if (cf->tid < first_tid) + continue; + if (kick_all || cf->kick) break; } if (!n) { @@ -2136,6 +2140,8 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, } cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = false; + first_tid = cf->tid + 1; dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, @@ -2149,6 +2155,49 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, return delayed; } +void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session) +{ + struct ceph_inode_info *ci; + struct ceph_cap *cap; + struct ceph_cap_flush *cf; + struct rb_node *n; + + dout("early_kick_flushing_caps mds%d\n", session->s_mds); + list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { + spin_lock(&ci->i_ceph_lock); + cap = ci->i_auth_cap; + if (!(cap && cap->session == session)) { + pr_err("%p auth cap %p not mds%d ???\n", + &ci->vfs_inode, cap, session->s_mds); + spin_unlock(&ci->i_ceph_lock); + continue; + } + + + /* + * if flushing caps were revoked, we re-send the cap flush + * in client reconnect stage. This guarantees MDS * processes + * the cap flush message before issuing the flushing caps to + * other client. + */ + if ((cap->issued & ci->i_flushing_caps) != + ci->i_flushing_caps) { + spin_unlock(&ci->i_ceph_lock); + if (!__kick_flushing_caps(mdsc, session, ci, true)) + continue; + spin_lock(&ci->i_ceph_lock); + } + + for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { + cf = rb_entry(n, struct ceph_cap_flush, i_node); + cf->kick = true; + } + + spin_unlock(&ci->i_ceph_lock); + } +} + void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session) { @@ -2158,7 +2207,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, dout("kick_flushing_caps mds%d\n", session->s_mds); list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { - int delayed = __kick_flushing_caps(mdsc, session, ci); + int delayed = __kick_flushing_caps(mdsc, session, ci, false); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); @@ -2191,7 +2240,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, spin_unlock(&ci->i_ceph_lock); - delayed = __kick_flushing_caps(mdsc, session, ci); + delayed = __kick_flushing_caps(mdsc, session, ci, true); if (delayed) { spin_lock(&ci->i_ceph_lock); __cap_delay_requeue(mdsc, ci); diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 31f6a78caa0a..89e4305a94d4 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -2982,6 +2982,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, reply->hdr.data_len = cpu_to_le32(pagelist->length); ceph_msg_data_add_pagelist(reply, pagelist); + + ceph_early_kick_flushing_caps(mdsc, session); + ceph_con_send(&session->s_con, reply); mutex_unlock(&session->s_mutex); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 94d91471165f..e7f13f742357 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -189,9 +189,10 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) struct ceph_cap_flush { u64 tid; int caps; - struct rb_node g_node; + bool kick; + struct rb_node g_node; // global union { - struct rb_node i_node; + struct rb_node i_node; // inode struct list_head list; }; }; @@ -868,6 +869,8 @@ extern void ceph_queue_caps_release(struct inode *inode); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync); +extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc, + struct ceph_mds_session *session); extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session); extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, -- cgit v1.2.3-59-g8ed1b From f66fd9f0952187d274c13c136b74548f792c1925 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 10 Jun 2015 17:26:13 +0800 Subject: ceph: pre-allocate data structure that tracks caps flushing Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 19 +++++++++++++++---- fs/ceph/caps.c | 26 ++++++++++++++++++++++---- fs/ceph/file.c | 18 ++++++++++++++++-- fs/ceph/inode.c | 15 +++++++++++++-- fs/ceph/mds_client.c | 6 +++++- fs/ceph/super.c | 8 ++++++++ fs/ceph/super.h | 6 +++++- fs/ceph/xattr.c | 20 ++++++++++++++++++-- include/linux/ceph/libceph.h | 1 + 9 files changed, 103 insertions(+), 16 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 5f53ac0d9d7c..7edf3c49e661 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -1308,12 +1308,17 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) struct inode *inode = file_inode(vma->vm_file); struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_file_info *fi = vma->vm_file->private_data; + struct ceph_cap_flush *prealloc_cf; struct page *page = vmf->page; loff_t off = page_offset(page); loff_t size = i_size_read(inode); size_t len; int want, got, ret; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return VM_FAULT_SIGBUS; + if (ci->i_inline_version != CEPH_INLINE_NONE) { struct page *locked_page = NULL; if (off == 0) { @@ -1323,8 +1328,10 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) ret = ceph_uninline_data(vma->vm_file, locked_page); if (locked_page) unlock_page(locked_page); - if (ret < 0) - return VM_FAULT_SIGBUS; + if (ret < 0) { + ret = VM_FAULT_SIGBUS; + goto out_free; + } } if (off + PAGE_CACHE_SIZE <= size) @@ -1346,7 +1353,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf) break; if (ret != -ERESTARTSYS) { WARN_ON(1); - return VM_FAULT_SIGBUS; + ret = VM_FAULT_SIGBUS; + goto out_free; } } dout("page_mkwrite %p %llu~%zd got cap refs on %s\n", @@ -1381,7 +1389,8 @@ out: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1390,6 +1399,8 @@ out: dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %d\n", inode, off, len, ceph_cap_string(got), ret); ceph_put_cap_refs(ci, got); +out_free: + ceph_free_cap_flush(prealloc_cf); return ret; } diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index 69a16044ec41..dd7b20adf1d4 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -1356,7 +1356,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci) * Caller is then responsible for calling __mark_inode_dirty with the * returned flags value. */ -int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) +int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf) { struct ceph_mds_client *mdsc = ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc; @@ -1376,6 +1377,9 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ceph_cap_string(was | mask)); ci->i_dirty_caps |= mask; if (was == 0) { + WARN_ON_ONCE(ci->i_prealloc_cap_flush); + swap(ci->i_prealloc_cap_flush, *pcf); + if (!ci->i_head_snapc) { WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem)); ci->i_head_snapc = ceph_get_snap_context( @@ -1391,6 +1395,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask) ihold(inode); dirty |= I_DIRTY_SYNC; } + } else { + WARN_ON_ONCE(!ci->i_prealloc_cap_flush); } BUG_ON(list_empty(&ci->i_dirty_item)); if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) && @@ -1446,6 +1452,17 @@ static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc, rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree); } +struct ceph_cap_flush *ceph_alloc_cap_flush(void) +{ + return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL); +} + +void ceph_free_cap_flush(struct ceph_cap_flush *cf) +{ + if (cf) + kmem_cache_free(ceph_cap_flush_cachep, cf); +} + static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc) { struct rb_node *n = rb_first(&mdsc->cap_flush_tree); @@ -1469,11 +1486,12 @@ static int __mark_caps_flushing(struct inode *inode, { struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc; struct ceph_inode_info *ci = ceph_inode(inode); - struct ceph_cap_flush *cf; + struct ceph_cap_flush *cf = NULL; int flushing; BUG_ON(ci->i_dirty_caps == 0); BUG_ON(list_empty(&ci->i_dirty_item)); + BUG_ON(!ci->i_prealloc_cap_flush); flushing = ci->i_dirty_caps; dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n", @@ -1484,7 +1502,7 @@ static int __mark_caps_flushing(struct inode *inode, ci->i_dirty_caps = 0; dout(" inode %p now !dirty\n", inode); - cf = kmalloc(sizeof(*cf), GFP_ATOMIC); + swap(cf, ci->i_prealloc_cap_flush); cf->caps = flushing; cf->kick = false; @@ -3075,7 +3093,7 @@ out: cf = list_first_entry(&to_remove, struct ceph_cap_flush, list); list_del(&cf->list); - kfree(cf); + ceph_free_cap_flush(cf); } if (drop) iput(inode); diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 0a76a370d798..8a4eb4d21d3c 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -939,6 +939,7 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_sb_to_client(inode->i_sb)->client->osdc; + struct ceph_cap_flush *prealloc_cf; ssize_t count, written = 0; int err, want, got; loff_t pos; @@ -946,6 +947,10 @@ static ssize_t ceph_write_iter(struct kiocb *iocb, struct iov_iter *from) if (ceph_snap(inode) != CEPH_NOSNAP) return -EROFS; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); /* We can write back this queue in page reclaim */ @@ -1050,7 +1055,8 @@ retry_snap: int dirty; spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1074,6 +1080,7 @@ retry_snap: out: mutex_unlock(&inode->i_mutex); out_unlocked: + ceph_free_cap_flush(prealloc_cf); current->backing_dev_info = NULL; return written ? written : err; } @@ -1270,6 +1277,7 @@ static long ceph_fallocate(struct file *file, int mode, struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_osd_client *osdc = &ceph_inode_to_client(inode)->client->osdc; + struct ceph_cap_flush *prealloc_cf; int want, got = 0; int dirty; int ret = 0; @@ -1282,6 +1290,10 @@ static long ceph_fallocate(struct file *file, int mode, if (!S_ISREG(inode->i_mode)) return -EOPNOTSUPP; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + mutex_lock(&inode->i_mutex); if (ceph_snap(inode) != CEPH_NOSNAP) { @@ -1328,7 +1340,8 @@ static long ceph_fallocate(struct file *file, int mode, if (!ret) { spin_lock(&ci->i_ceph_lock); ci->i_inline_version = CEPH_INLINE_NONE; - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &prealloc_cf); spin_unlock(&ci->i_ceph_lock); if (dirty) __mark_inode_dirty(inode, dirty); @@ -1337,6 +1350,7 @@ static long ceph_fallocate(struct file *file, int mode, ceph_put_cap_refs(ci, got); unlock: mutex_unlock(&inode->i_mutex); + ceph_free_cap_flush(prealloc_cf); return ret; } diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index 3326302f5884..e86d1a4efc46 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -416,6 +416,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_flushing_caps = 0; INIT_LIST_HEAD(&ci->i_dirty_item); INIT_LIST_HEAD(&ci->i_flushing_item); + ci->i_prealloc_cap_flush = NULL; ci->i_cap_flush_tree = RB_ROOT; init_waitqueue_head(&ci->i_cap_wq); ci->i_hold_caps_min = 0; @@ -1720,6 +1721,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) const unsigned int ia_valid = attr->ia_valid; struct ceph_mds_request *req; struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf; int issued; int release = 0, dirtied = 0; int mask = 0; @@ -1734,10 +1736,16 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) if (err != 0) return err; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETATTR, USE_AUTH_MDS); - if (IS_ERR(req)) + if (IS_ERR(req)) { + ceph_free_cap_flush(prealloc_cf); return PTR_ERR(req); + } spin_lock(&ci->i_ceph_lock); issued = __ceph_caps_issued(ci, NULL); @@ -1895,7 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) dout("setattr %p ATTR_FILE ... hrm!\n", inode); if (dirtied) { - inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied); + inode_dirty_flags = __ceph_mark_dirty_caps(ci, dirtied, + &prealloc_cf); inode->i_ctime = CURRENT_TIME; } @@ -1927,9 +1936,11 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr) ceph_mdsc_put_request(req); if (mask & CEPH_SETATTR_SIZE) __ceph_do_pending_vmtruncate(inode); + ceph_free_cap_flush(prealloc_cf); return err; out_put: ceph_mdsc_put_request(req); + ceph_free_cap_flush(prealloc_cf); return err; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 89e4305a94d4..8d73fe9d488b 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1189,6 +1189,10 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, } spin_unlock(&mdsc->cap_dirty_lock); + if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) { + list_add(&ci->i_prealloc_cap_flush->list, &to_remove); + ci->i_prealloc_cap_flush = NULL; + } } spin_unlock(&ci->i_ceph_lock); while (!list_empty(&to_remove)) { @@ -1196,7 +1200,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap, cf = list_first_entry(&to_remove, struct ceph_cap_flush, list); list_del(&cf->list); - kfree(cf); + ceph_free_cap_flush(cf); } while (drop--) iput(inode); diff --git a/fs/ceph/super.c b/fs/ceph/super.c index edeb83c43112..d1c833c321b9 100644 --- a/fs/ceph/super.c +++ b/fs/ceph/super.c @@ -622,6 +622,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc) */ struct kmem_cache *ceph_inode_cachep; struct kmem_cache *ceph_cap_cachep; +struct kmem_cache *ceph_cap_flush_cachep; struct kmem_cache *ceph_dentry_cachep; struct kmem_cache *ceph_file_cachep; @@ -647,6 +648,10 @@ static int __init init_caches(void) SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); if (ceph_cap_cachep == NULL) goto bad_cap; + ceph_cap_flush_cachep = KMEM_CACHE(ceph_cap_flush, + SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); + if (ceph_cap_flush_cachep == NULL) + goto bad_cap_flush; ceph_dentry_cachep = KMEM_CACHE(ceph_dentry_info, SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD); @@ -665,6 +670,8 @@ static int __init init_caches(void) bad_file: kmem_cache_destroy(ceph_dentry_cachep); bad_dentry: + kmem_cache_destroy(ceph_cap_flush_cachep); +bad_cap_flush: kmem_cache_destroy(ceph_cap_cachep); bad_cap: kmem_cache_destroy(ceph_inode_cachep); @@ -681,6 +688,7 @@ static void destroy_caches(void) kmem_cache_destroy(ceph_inode_cachep); kmem_cache_destroy(ceph_cap_cachep); + kmem_cache_destroy(ceph_cap_flush_cachep); kmem_cache_destroy(ceph_dentry_cachep); kmem_cache_destroy(ceph_file_cachep); diff --git a/fs/ceph/super.h b/fs/ceph/super.h index e7f13f742357..4415e977d72b 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -309,6 +309,7 @@ struct ceph_inode_info { /* we need to track cap writeback on a per-cap-bit basis, to allow * overlapping, pipelined cap flushes to the mds. we can probably * reduce the tid to 8 bits if we're concerned about inode size. */ + struct ceph_cap_flush *i_prealloc_cap_flush; struct rb_root i_cap_flush_tree; wait_queue_head_t i_cap_wq; /* threads waiting on a capability */ unsigned long i_hold_caps_min; /* jiffies */ @@ -578,7 +579,10 @@ static inline int __ceph_caps_dirty(struct ceph_inode_info *ci) { return ci->i_dirty_caps | ci->i_flushing_caps; } -extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask); +extern struct ceph_cap_flush *ceph_alloc_cap_flush(void); +extern void ceph_free_cap_flush(struct ceph_cap_flush *cf); +extern int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask, + struct ceph_cap_flush **pcf); extern int __ceph_caps_revoking_other(struct ceph_inode_info *ci, struct ceph_cap *ocap, int mask); diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c index c6f7d9b82085..819163d8313b 100644 --- a/fs/ceph/xattr.c +++ b/fs/ceph/xattr.c @@ -912,6 +912,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int dirty = 0; @@ -950,6 +951,10 @@ int __ceph_setxattr(struct dentry *dentry, const char *name, if (!xattr) goto out; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + goto out; + spin_lock(&ci->i_ceph_lock); retry: issued = __ceph_caps_issued(ci, NULL); @@ -991,7 +996,8 @@ retry: flags, value ? 1 : -1, &xattr); if (!err) { - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; } @@ -1001,6 +1007,7 @@ retry: up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: @@ -1010,6 +1017,7 @@ do_sync_unlocked: up_read(&mdsc->snap_rwsem); err = ceph_sync_setxattr(dentry, name, value, size, flags); out: + ceph_free_cap_flush(prealloc_cf); kfree(newname); kfree(newval); kfree(xattr); @@ -1062,6 +1070,7 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) struct ceph_vxattr *vxattr; struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc; + struct ceph_cap_flush *prealloc_cf = NULL; int issued; int err; int required_blob_size; @@ -1079,6 +1088,10 @@ int __ceph_removexattr(struct dentry *dentry, const char *name) if (!strncmp(name, XATTR_CEPH_PREFIX, XATTR_CEPH_PREFIX_LEN)) goto do_sync_unlocked; + prealloc_cf = ceph_alloc_cap_flush(); + if (!prealloc_cf) + return -ENOMEM; + err = -ENOMEM; spin_lock(&ci->i_ceph_lock); retry: @@ -1120,7 +1133,8 @@ retry: err = __remove_xattr_by_name(ceph_inode(inode), name); - dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL); + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL, + &prealloc_cf); ci->i_xattrs.dirty = true; inode->i_ctime = CURRENT_TIME; spin_unlock(&ci->i_ceph_lock); @@ -1128,12 +1142,14 @@ retry: up_read(&mdsc->snap_rwsem); if (dirty) __mark_inode_dirty(inode, dirty); + ceph_free_cap_flush(prealloc_cf); return err; do_sync: spin_unlock(&ci->i_ceph_lock); do_sync_unlocked: if (lock_snap_rwsem) up_read(&mdsc->snap_rwsem); + ceph_free_cap_flush(prealloc_cf); err = ceph_send_removexattr(dentry, name); return err; } diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h index d73a569f9bf5..9ebee53d3bf5 100644 --- a/include/linux/ceph/libceph.h +++ b/include/linux/ceph/libceph.h @@ -174,6 +174,7 @@ static inline int calc_pages_for(u64 off, u64 len) extern struct kmem_cache *ceph_inode_cachep; extern struct kmem_cache *ceph_cap_cachep; +extern struct kmem_cache *ceph_cap_flush_cachep; extern struct kmem_cache *ceph_dentry_cachep; extern struct kmem_cache *ceph_file_cachep; -- cgit v1.2.3-59-g8ed1b From 687265e5a885d6308f5d73e738efe3c2674fa218 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Sat, 13 Jun 2015 17:27:05 +0800 Subject: ceph: switch some GFP_NOFS memory allocation to GFP_KERNEL GFP_NOFS memory allocation is required for page writeback path. But there is no need to use GFP_NOFS in syscall path and readpage path Signed-off-by: Yan, Zheng --- fs/ceph/acl.c | 4 ++-- fs/ceph/addr.c | 4 ++-- fs/ceph/dir.c | 10 +++++----- fs/ceph/file.c | 8 ++++---- fs/ceph/mds_client.c | 3 ++- 5 files changed, 15 insertions(+), 14 deletions(-) (limited to 'fs') diff --git a/fs/ceph/acl.c b/fs/ceph/acl.c index 64fa248343f6..8f84646f10e9 100644 --- a/fs/ceph/acl.c +++ b/fs/ceph/acl.c @@ -187,10 +187,10 @@ int ceph_pre_init_acls(struct inode *dir, umode_t *mode, val_size2 = posix_acl_xattr_size(default_acl->a_count); err = -ENOMEM; - tmp_buf = kmalloc(max(val_size1, val_size2), GFP_NOFS); + tmp_buf = kmalloc(max(val_size1, val_size2), GFP_KERNEL); if (!tmp_buf) goto out_err; - pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_NOFS); + pagelist = kmalloc(sizeof(struct ceph_pagelist), GFP_KERNEL); if (!pagelist) goto out_err; ceph_pagelist_init(pagelist); diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 7edf3c49e661..97102038fe03 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -350,7 +350,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) /* build page vector */ nr_pages = calc_pages_for(0, len); - pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS); + pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL); ret = -ENOMEM; if (!pages) goto out; @@ -362,7 +362,7 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max) dout("start_read %p adding %p idx %lu\n", inode, page, page->index); if (add_to_page_cache_lru(page, &inode->i_data, page->index, - GFP_NOFS)) { + GFP_KERNEL)) { ceph_fscache_uncache_page(inode, page); page_cache_release(page); dout("start_read %p add_to_page_cache failed %p\n", diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index 424e23138c59..b99f2ff8189d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -38,7 +38,7 @@ int ceph_init_dentry(struct dentry *dentry) if (dentry->d_fsdata) return 0; - di = kmem_cache_alloc(ceph_dentry_cachep, GFP_NOFS | __GFP_ZERO); + di = kmem_cache_alloc(ceph_dentry_cachep, GFP_KERNEL | __GFP_ZERO); if (!di) return -ENOMEM; /* oh well */ @@ -231,7 +231,7 @@ static int note_last_dentry(struct ceph_file_info *fi, const char *name, int len) { kfree(fi->last_name); - fi->last_name = kmalloc(len+1, GFP_NOFS); + fi->last_name = kmalloc(len+1, GFP_KERNEL); if (!fi->last_name) return -ENOMEM; memcpy(fi->last_name, name, len); @@ -342,7 +342,7 @@ more: req->r_direct_hash = ceph_frag_value(frag); req->r_direct_is_hash = true; if (fi->last_name) { - req->r_path2 = kstrdup(fi->last_name, GFP_NOFS); + req->r_path2 = kstrdup(fi->last_name, GFP_KERNEL); if (!req->r_path2) { ceph_mdsc_put_request(req); return -ENOMEM; @@ -764,7 +764,7 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, err = PTR_ERR(req); goto out; } - req->r_path2 = kstrdup(dest, GFP_NOFS); + req->r_path2 = kstrdup(dest, GFP_KERNEL); if (!req->r_path2) { err = -ENOMEM; ceph_mdsc_put_request(req); @@ -1189,7 +1189,7 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size, return -EISDIR; if (!cf->dir_info) { - cf->dir_info = kmalloc(bufsize, GFP_NOFS); + cf->dir_info = kmalloc(bufsize, GFP_KERNEL); if (!cf->dir_info) return -ENOMEM; cf->dir_info_len = diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 8a4eb4d21d3c..424b5b540207 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -89,7 +89,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) case S_IFDIR: dout("init_file %p %p 0%o (regular)\n", inode, file, inode->i_mode); - cf = kmem_cache_alloc(ceph_file_cachep, GFP_NOFS | __GFP_ZERO); + cf = kmem_cache_alloc(ceph_file_cachep, GFP_KERNEL | __GFP_ZERO); if (cf == NULL) { ceph_put_fmode(ceph_inode(inode), fmode); /* clean up */ return -ENOMEM; @@ -483,7 +483,7 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, } } else { num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) return PTR_ERR(pages); ret = striped_read(inode, off, len, pages, @@ -734,7 +734,7 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, */ num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT; - pages = ceph_alloc_page_vector(num_pages, GFP_NOFS); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); if (IS_ERR(pages)) { ret = PTR_ERR(pages); goto out; @@ -858,7 +858,7 @@ again: struct page *page = NULL; loff_t i_size; if (retry_op == READ_INLINE) { - page = __page_cache_alloc(GFP_NOFS); + page = __page_cache_alloc(GFP_KERNEL); if (!page) return -ENOMEM; } diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c index 8d73fe9d488b..6aa07af67603 100644 --- a/fs/ceph/mds_client.c +++ b/fs/ceph/mds_client.c @@ -1668,7 +1668,8 @@ int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req, order = get_order(size * num_entries); while (order >= 0) { - rinfo->dir_in = (void*)__get_free_pages(GFP_NOFS | __GFP_NOWARN, + rinfo->dir_in = (void*)__get_free_pages(GFP_KERNEL | + __GFP_NOWARN, order); if (rinfo->dir_in) break; -- cgit v1.2.3-59-g8ed1b From fdd4e15838e59c394a1ec4963b57c22c12608685 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Tue, 16 Jun 2015 20:48:56 +0800 Subject: ceph: rework dcache readdir Previously our dcache readdir code relies on that child dentries in directory dentry's d_subdir list are sorted by dentry's offset in descending order. When adding dentries to the dcache, if a dentry already exists, our readdir code moves it to head of directory dentry's d_subdir list. This design relies on dcache internals. Al Viro suggests using ncpfs's approach: keeping array of pointers to dentries in page cache of directory inode. the validity of those pointers are presented by directory inode's complete and ordered flags. When a dentry gets pruned, we clear directory inode's complete flag in the d_prune() callback. Before moving a dentry to other directory, we clear the ordered flag for both old and new directory. Signed-off-by: Yan, Zheng --- fs/ceph/caps.c | 14 ++- fs/ceph/dir.c | 313 ++++++++++++++++++++++++++------------------------- fs/ceph/file.c | 2 +- fs/ceph/inode.c | 118 ++++++++++++++----- fs/ceph/mds_client.h | 3 + fs/ceph/super.h | 60 +++++----- 6 files changed, 295 insertions(+), 215 deletions(-) (limited to 'fs') diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c index dd7b20adf1d4..dc10c9dd36c1 100644 --- a/fs/ceph/caps.c +++ b/fs/ceph/caps.c @@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci) used |= CEPH_CAP_PIN; if (ci->i_rd_ref) used |= CEPH_CAP_FILE_RD; - if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages) + if (ci->i_rdcache_ref || + (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */ + ci->vfs_inode.i_data.nrpages)) used |= CEPH_CAP_FILE_CACHE; if (ci->i_wr_ref) used |= CEPH_CAP_FILE_WR; @@ -1651,9 +1653,10 @@ retry_locked: * If we fail, it's because pages are locked.... try again later. */ if ((!is_delayed || mdsc->stopping) && - ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ - inode->i_data.nrpages && /* have cached pages */ - (file_wanted == 0 || /* no open files */ + !S_ISDIR(inode->i_mode) && /* ignore readdir cache */ + ci->i_wrbuffer_ref == 0 && /* no dirty pages... */ + inode->i_data.nrpages && /* have cached pages */ + (file_wanted == 0 || /* no open files */ (revoking & (CEPH_CAP_FILE_CACHE| CEPH_CAP_FILE_LAZYIO))) && /* or revoking cache */ !tried_invalidate) { @@ -2805,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc, * try to invalidate (once). (If there are dirty buffers, we * will invalidate _after_ writeback.) */ - if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && + if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */ + ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) && (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 && !ci->i_wrbuffer_ref) { if (try_nonblocking_invalidate(inode)) { diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index b99f2ff8189d..9314b4ea2375 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -106,6 +106,27 @@ static int fpos_cmp(loff_t l, loff_t r) return (int)(fpos_off(l) - fpos_off(r)); } +/* + * make note of the last dentry we read, so we can + * continue at the same lexicographical point, + * regardless of what dir changes take place on the + * server. + */ +static int note_last_dentry(struct ceph_file_info *fi, const char *name, + int len, unsigned next_offset) +{ + char *buf = kmalloc(len+1, GFP_KERNEL); + if (!buf) + return -ENOMEM; + kfree(fi->last_name); + fi->last_name = buf; + memcpy(fi->last_name, name, len); + fi->last_name[len] = 0; + fi->next_offset = next_offset; + dout("note_last_dentry '%s'\n", fi->last_name); + return 0; +} + /* * When possible, we try to satisfy a readdir by peeking at the * dcache. We make this work by carefully ordering dentries on @@ -123,123 +144,113 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, struct ceph_file_info *fi = file->private_data; struct dentry *parent = file->f_path.dentry; struct inode *dir = d_inode(parent); - struct list_head *p; - struct dentry *dentry, *last; + struct dentry *dentry, *last = NULL; struct ceph_dentry_info *di; + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry *); int err = 0; + loff_t ptr_pos = 0; + struct ceph_readdir_cache_control cache_ctl = {}; - /* claim ref on last dentry we returned */ - last = fi->dentry; - fi->dentry = NULL; - - dout("__dcache_readdir %p v%u at %llu (last %p)\n", - dir, shared_gen, ctx->pos, last); - - spin_lock(&parent->d_lock); + dout("__dcache_readdir %p v%u at %llu\n", dir, shared_gen, ctx->pos); - /* start at beginning? */ - if (ctx->pos == 2 || last == NULL || - fpos_cmp(ctx->pos, ceph_dentry(last)->offset) < 0) { - if (list_empty(&parent->d_subdirs)) - goto out_unlock; - p = parent->d_subdirs.prev; - dout(" initial p %p/%p\n", p->prev, p->next); - } else { - p = last->d_child.prev; + /* we can calculate cache index for the first dirfrag */ + if (ceph_frag_is_leftmost(fpos_frag(ctx->pos))) { + cache_ctl.index = fpos_off(ctx->pos) - 2; + BUG_ON(cache_ctl.index < 0); + ptr_pos = cache_ctl.index * sizeof(struct dentry *); } -more: - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - while (1) { - dout(" p %p/%p %s d_subdirs %p/%p\n", p->prev, p->next, - d_unhashed(dentry) ? "!hashed" : "hashed", - parent->d_subdirs.prev, parent->d_subdirs.next); - if (p == &parent->d_subdirs) { + while (true) { + pgoff_t pgoff; + bool emit_dentry; + + if (ptr_pos >= i_size_read(dir)) { fi->flags |= CEPH_F_ATEND; - goto out_unlock; + err = 0; + break; } - spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); + + err = -EAGAIN; + pgoff = ptr_pos >> PAGE_CACHE_SHIFT; + if (!cache_ctl.page || pgoff != page_index(cache_ctl.page)) { + ceph_readdir_cache_release(&cache_ctl); + cache_ctl.page = find_lock_page(&dir->i_data, pgoff); + if (!cache_ctl.page) { + dout(" page %lu not found\n", pgoff); + break; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(cache_ctl.page); + cache_ctl.dentries = kmap(cache_ctl.page); + } + + rcu_read_lock(); + spin_lock(&parent->d_lock); + /* check i_size again here, because empty directory can be + * marked as complete while not holding the i_mutex. */ + if (ceph_dir_is_complete_ordered(dir) && + ptr_pos < i_size_read(dir)) + dentry = cache_ctl.dentries[cache_ctl.index % nsize]; + else + dentry = NULL; + spin_unlock(&parent->d_lock); + if (dentry && !lockref_get_not_dead(&dentry->d_lockref)) + dentry = NULL; + rcu_read_unlock(); + if (!dentry) + break; + + emit_dentry = false; + di = ceph_dentry(dentry); + spin_lock(&dentry->d_lock); if (di->lease_shared_gen == shared_gen && - !d_unhashed(dentry) && d_really_is_positive(dentry) && + d_really_is_positive(dentry) && ceph_snap(d_inode(dentry)) != CEPH_SNAPDIR && ceph_ino(d_inode(dentry)) != CEPH_INO_CEPH && - fpos_cmp(ctx->pos, di->offset) <= 0) - break; - dout(" skipping %p %pd at %llu (%llu)%s%s\n", dentry, - dentry, di->offset, - ctx->pos, d_unhashed(dentry) ? " unhashed" : "", - !d_inode(dentry) ? " null" : ""); + fpos_cmp(ctx->pos, di->offset) <= 0) { + emit_dentry = true; + } spin_unlock(&dentry->d_lock); - p = p->prev; - dentry = list_entry(p, struct dentry, d_child); - di = ceph_dentry(dentry); - } - - dget_dlock(dentry); - spin_unlock(&dentry->d_lock); - spin_unlock(&parent->d_lock); - /* make sure a dentry wasn't dropped while we didn't have parent lock */ - if (!ceph_dir_is_complete_ordered(dir)) { - dout(" lost dir complete on %p; falling back to mds\n", dir); - dput(dentry); - err = -EAGAIN; - goto out; - } + if (emit_dentry) { + dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, + dentry, dentry, d_inode(dentry)); + ctx->pos = di->offset; + if (!dir_emit(ctx, dentry->d_name.name, + dentry->d_name.len, + ceph_translate_ino(dentry->d_sb, + d_inode(dentry)->i_ino), + d_inode(dentry)->i_mode >> 12)) { + dput(dentry); + err = 0; + break; + } + ctx->pos++; - dout(" %llu (%llu) dentry %p %pd %p\n", di->offset, ctx->pos, - dentry, dentry, d_inode(dentry)); - if (!dir_emit(ctx, dentry->d_name.name, - dentry->d_name.len, - ceph_translate_ino(dentry->d_sb, d_inode(dentry)->i_ino), - d_inode(dentry)->i_mode >> 12)) { - if (last) { - /* remember our position */ - fi->dentry = last; - fi->next_offset = fpos_off(di->offset); + if (last) + dput(last); + last = dentry; + } else { + dput(dentry); } - dput(dentry); - return 0; - } - - ctx->pos = di->offset + 1; - - if (last) - dput(last); - last = dentry; - - spin_lock(&parent->d_lock); - p = p->prev; /* advance to next dentry */ - goto more; -out_unlock: - spin_unlock(&parent->d_lock); -out: - if (last) + cache_ctl.index++; + ptr_pos += sizeof(struct dentry *); + } + ceph_readdir_cache_release(&cache_ctl); + if (last) { + int ret; + di = ceph_dentry(last); + ret = note_last_dentry(fi, last->d_name.name, last->d_name.len, + fpos_off(di->offset) + 1); + if (ret < 0) + err = ret; dput(last); + } return err; } -/* - * make note of the last dentry we read, so we can - * continue at the same lexicographical point, - * regardless of what dir changes take place on the - * server. - */ -static int note_last_dentry(struct ceph_file_info *fi, const char *name, - int len) -{ - kfree(fi->last_name); - fi->last_name = kmalloc(len+1, GFP_KERNEL); - if (!fi->last_name) - return -ENOMEM; - memcpy(fi->last_name, name, len); - fi->last_name[len] = 0; - dout("note_last_dentry '%s'\n", fi->last_name); - return 0; -} - static int ceph_readdir(struct file *file, struct dir_context *ctx) { struct ceph_file_info *fi = file->private_data; @@ -280,8 +291,7 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); - if ((ctx->pos == 2 || fi->dentry) && - ceph_test_mount_opt(fsc, DCACHE) && + if (ceph_test_mount_opt(fsc, DCACHE) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete_ordered(ci) && @@ -296,24 +306,8 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) } else { spin_unlock(&ci->i_ceph_lock); } - if (fi->dentry) { - err = note_last_dentry(fi, fi->dentry->d_name.name, - fi->dentry->d_name.len); - if (err) - return err; - dput(fi->dentry); - fi->dentry = NULL; - } /* proceed with a normal readdir */ - - if (ctx->pos == 2) { - /* note dir version at start of readdir so we can tell - * if any dentries get dropped */ - fi->dir_release_count = atomic_read(&ci->i_release_count); - fi->dir_ordered_count = ci->i_ordered_count; - } - more: /* do we have the correct frag content buffered? */ if (fi->frag != frag || fi->last_readdir == NULL) { @@ -348,6 +342,9 @@ more: return -ENOMEM; } } + req->r_dir_release_cnt = fi->dir_release_count; + req->r_dir_ordered_cnt = fi->dir_ordered_count; + req->r_readdir_cache_idx = fi->readdir_cache_idx; req->r_readdir_offset = fi->next_offset; req->r_args.readdir.frag = cpu_to_le32(frag); @@ -364,26 +361,38 @@ more: (int)req->r_reply_info.dir_end, (int)req->r_reply_info.dir_complete); - if (!req->r_did_prepopulate) { - dout("readdir !did_prepopulate"); - /* preclude from marking dir complete */ - fi->dir_release_count--; - } /* note next offset and last dentry name */ rinfo = &req->r_reply_info; if (le32_to_cpu(rinfo->dir_dir->frag) != frag) { frag = le32_to_cpu(rinfo->dir_dir->frag); - if (ceph_frag_is_leftmost(frag)) - fi->next_offset = 2; - else - fi->next_offset = 0; - off = fi->next_offset; + off = req->r_readdir_offset; + fi->next_offset = off; } + fi->frag = frag; fi->offset = fi->next_offset; fi->last_readdir = req; + if (req->r_did_prepopulate) { + fi->readdir_cache_idx = req->r_readdir_cache_idx; + if (fi->readdir_cache_idx < 0) { + /* preclude from marking dir ordered */ + fi->dir_ordered_count = 0; + } else if (ceph_frag_is_leftmost(frag) && off == 2) { + /* note dir version at start of readdir so + * we can tell if any dentries get dropped */ + fi->dir_release_count = req->r_dir_release_cnt; + fi->dir_ordered_count = req->r_dir_ordered_cnt; + } + } else { + dout("readdir !did_prepopulate"); + /* disable readdir cache */ + fi->readdir_cache_idx = -1; + /* preclude from marking dir complete */ + fi->dir_release_count = 0; + } + if (req->r_reply_info.dir_end) { kfree(fi->last_name); fi->last_name = NULL; @@ -394,10 +403,10 @@ more: } else { err = note_last_dentry(fi, rinfo->dir_dname[rinfo->dir_nr-1], - rinfo->dir_dname_len[rinfo->dir_nr-1]); + rinfo->dir_dname_len[rinfo->dir_nr-1], + fi->next_offset + rinfo->dir_nr); if (err) return err; - fi->next_offset += rinfo->dir_nr; } } @@ -453,16 +462,22 @@ more: * were released during the whole readdir, and we should have * the complete dir contents in our cache. */ - spin_lock(&ci->i_ceph_lock); - if (atomic_read(&ci->i_release_count) == fi->dir_release_count) { - if (ci->i_ordered_count == fi->dir_ordered_count) + if (atomic64_read(&ci->i_release_count) == fi->dir_release_count) { + spin_lock(&ci->i_ceph_lock); + if (fi->dir_ordered_count == atomic64_read(&ci->i_ordered_count)) { dout(" marking %p complete and ordered\n", inode); - else + /* use i_size to track number of entries in + * readdir cache */ + BUG_ON(fi->readdir_cache_idx < 0); + i_size_write(inode, fi->readdir_cache_idx * + sizeof(struct dentry*)); + } else { dout(" marking %p complete\n", inode); + } __ceph_dir_set_complete(ci, fi->dir_release_count, fi->dir_ordered_count); + spin_unlock(&ci->i_ceph_lock); } - spin_unlock(&ci->i_ceph_lock); dout("readdir %p file %p done.\n", inode, file); return 0; @@ -476,14 +491,12 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag) } kfree(fi->last_name); fi->last_name = NULL; + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; if (ceph_frag_is_leftmost(frag)) fi->next_offset = 2; /* compensate for . and .. */ else fi->next_offset = 0; - if (fi->dentry) { - dput(fi->dentry); - fi->dentry = NULL; - } fi->flags &= ~CEPH_F_ATEND; } @@ -497,13 +510,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) mutex_lock(&inode->i_mutex); retval = -EINVAL; switch (whence) { - case SEEK_END: - offset += inode->i_size + 2; /* FIXME */ - break; case SEEK_CUR: offset += file->f_pos; case SEEK_SET: break; + case SEEK_END: + retval = -EOPNOTSUPP; default: goto out; } @@ -516,20 +528,18 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) } retval = offset; - /* - * discard buffered readdir content on seekdir(0), or - * seek to new frag, or seek prior to current chunk. - */ if (offset == 0 || fpos_frag(offset) != fi->frag || fpos_off(offset) < fi->offset) { + /* discard buffered readdir content on seekdir(0), or + * seek to new frag, or seek prior to current chunk */ dout("dir_llseek dropping %p content\n", file); reset_readdir(fi, fpos_frag(offset)); + } else if (fpos_cmp(offset, old_offset) > 0) { + /* reset dir_release_count if we did a forward seek */ + fi->dir_release_count = 0; + fi->readdir_cache_idx = -1; } - - /* bump dir_release_count if we did a forward seek */ - if (fpos_cmp(offset, old_offset) > 0) - fi->dir_release_count--; } out: mutex_unlock(&inode->i_mutex); @@ -985,16 +995,15 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, * to do it here. */ + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_complete(old_dir); + ceph_dir_clear_complete(new_dir); + d_move(old_dentry, new_dentry); /* ensure target dentry is invalidated, despite rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(new_dentry); - - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_complete(old_dir); - ceph_dir_clear_complete(new_dir); - } ceph_mdsc_put_request(req); return err; diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 424b5b540207..faf92095e105 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -96,6 +96,7 @@ static int ceph_init_file(struct inode *inode, struct file *file, int fmode) } cf->fmode = fmode; cf->next_offset = 2; + cf->readdir_cache_idx = -1; file->private_data = cf; BUG_ON(inode->i_fop->release != ceph_release); break; @@ -324,7 +325,6 @@ int ceph_release(struct inode *inode, struct file *file) ceph_mdsc_put_request(cf->last_readdir); kfree(cf->last_name); kfree(cf->dir_info); - dput(cf->dentry); kmem_cache_free(ceph_file_cachep, cf); /* wake up anyone waiting for caps on this inode */ diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c index e86d1a4efc46..2a6d93befbae 100644 --- a/fs/ceph/inode.c +++ b/fs/ceph/inode.c @@ -390,9 +390,10 @@ struct inode *ceph_alloc_inode(struct super_block *sb) ci->i_inline_version = 0; ci->i_time_warp_seq = 0; ci->i_ceph_flags = 0; - ci->i_ordered_count = 0; - atomic_set(&ci->i_release_count, 1); - atomic_set(&ci->i_complete_count, 0); + atomic64_set(&ci->i_ordered_count, 1); + atomic64_set(&ci->i_release_count, 1); + atomic64_set(&ci->i_complete_seq[0], 0); + atomic64_set(&ci->i_complete_seq[1], 0); ci->i_symlink = NULL; memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout)); @@ -860,9 +861,10 @@ static int fill_inode(struct inode *inode, struct page *locked_page, (issued & CEPH_CAP_FILE_EXCL) == 0 && !__ceph_dir_is_complete(ci)) { dout(" marking %p complete (empty)\n", inode); + i_size_write(inode, 0); __ceph_dir_set_complete(ci, - atomic_read(&ci->i_release_count), - ci->i_ordered_count); + atomic64_read(&ci->i_release_count), + atomic64_read(&ci->i_ordered_count)); } wake = true; @@ -1214,6 +1216,10 @@ retry_lookup: dout("fill_trace doing d_move %p -> %p\n", req->r_old_dentry, dn); + /* d_move screws up sibling dentries' offsets */ + ceph_dir_clear_ordered(dir); + ceph_dir_clear_ordered(olddir); + d_move(req->r_old_dentry, dn); dout(" src %p '%pd' dst %p '%pd'\n", req->r_old_dentry, @@ -1224,10 +1230,6 @@ retry_lookup: rehashing bug in vfs_rename_dir */ ceph_invalidate_dentry_lease(dn); - /* d_move screws up sibling dentries' offsets */ - ceph_dir_clear_ordered(dir); - ceph_dir_clear_ordered(olddir); - dout("dn %p gets new offset %lld\n", req->r_old_dentry, ceph_dentry(req->r_old_dentry)->offset); @@ -1335,6 +1337,49 @@ static int readdir_prepopulate_inodes_only(struct ceph_mds_request *req, return err; } +void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl) +{ + if (ctl->page) { + kunmap(ctl->page); + page_cache_release(ctl->page); + ctl->page = NULL; + } +} + +static int fill_readdir_cache(struct inode *dir, struct dentry *dn, + struct ceph_readdir_cache_control *ctl, + struct ceph_mds_request *req) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + unsigned nsize = PAGE_CACHE_SIZE / sizeof(struct dentry*); + unsigned idx = ctl->index % nsize; + pgoff_t pgoff = ctl->index / nsize; + + if (!ctl->page || pgoff != page_index(ctl->page)) { + ceph_readdir_cache_release(ctl); + ctl->page = grab_cache_page(&dir->i_data, pgoff); + if (!ctl->page) { + ctl->index = -1; + return -ENOMEM; + } + /* reading/filling the cache are serialized by + * i_mutex, no need to use page lock */ + unlock_page(ctl->page); + ctl->dentries = kmap(ctl->page); + } + + if (req->r_dir_release_cnt == atomic64_read(&ci->i_release_count) && + req->r_dir_ordered_cnt == atomic64_read(&ci->i_ordered_count)) { + dout("readdir cache dn %p idx %d\n", dn, ctl->index); + ctl->dentries[idx] = dn; + ctl->index++; + } else { + dout("disable readdir cache\n"); + ctl->index = -1; + } + return 0; +} + int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct ceph_mds_session *session) { @@ -1347,8 +1392,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, struct inode *snapdir = NULL; struct ceph_mds_request_head *rhead = req->r_request->front.iov_base; struct ceph_dentry_info *di; - u64 r_readdir_offset = req->r_readdir_offset; u32 frag = le32_to_cpu(rhead->args.readdir.frag); + struct ceph_readdir_cache_control cache_ctl = {}; + + if (req->r_aborted) + return readdir_prepopulate_inodes_only(req, session); if (rinfo->dir_dir && le32_to_cpu(rinfo->dir_dir->frag) != frag) { @@ -1356,14 +1404,11 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, frag, le32_to_cpu(rinfo->dir_dir->frag)); frag = le32_to_cpu(rinfo->dir_dir->frag); if (ceph_frag_is_leftmost(frag)) - r_readdir_offset = 2; + req->r_readdir_offset = 2; else - r_readdir_offset = 0; + req->r_readdir_offset = 0; } - if (req->r_aborted) - return readdir_prepopulate_inodes_only(req, session); - if (le32_to_cpu(rinfo->head->op) == CEPH_MDS_OP_LSSNAP) { snapdir = ceph_get_snapdir(d_inode(parent)); parent = d_find_alias(snapdir); @@ -1376,6 +1421,17 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req, ceph_fill_dirfrag(d_inode(parent), rinfo->dir_dir); } + if (ceph_frag_is_leftmost(frag) && req->r_readdir_offset == 2) { + /* note dir version at start of readdir so we can tell + * if any dentries get dropped */ + struct ceph_inode_info *ci = ceph_inode(d_inode(parent)); + req->r_dir_release_cnt = atomic64_read(&ci->i_release_count); + req->r_dir_ordered_cnt = atomic64_read(&ci->i_ordered_count); + req->r_readdir_cache_idx = 0; + } + + cache_ctl.index = req->r_readdir_cache_idx; + /* FIXME: release caps/leases if error occurs */ for (i = 0; i < rinfo->dir_nr; i++) { struct ceph_vino vino; @@ -1415,13 +1471,6 @@ retry_lookup: d_delete(dn); dput(dn); goto retry_lookup; - } else { - /* reorder parent's d_subdirs */ - spin_lock(&parent->d_lock); - spin_lock_nested(&dn->d_lock, DENTRY_D_LOCK_NESTED); - list_move(&dn->d_child, &parent->d_subdirs); - spin_unlock(&dn->d_lock); - spin_unlock(&parent->d_lock); } /* inode */ @@ -1438,13 +1487,15 @@ retry_lookup: } } - if (fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, - req->r_request_started, -1, - &req->r_caps_reservation) < 0) { + ret = fill_inode(in, NULL, &rinfo->dir_in[i], NULL, session, + req->r_request_started, -1, + &req->r_caps_reservation); + if (ret < 0) { pr_err("fill_inode badness on %p\n", in); if (d_really_is_negative(dn)) iput(in); d_drop(dn); + err = ret; goto next_item; } @@ -1460,19 +1511,28 @@ retry_lookup: } di = dn->d_fsdata; - di->offset = ceph_make_fpos(frag, i + r_readdir_offset); + di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset); update_dentry_lease(dn, rinfo->dir_dlease[i], req->r_session, req->r_request_started); + + if (err == 0 && cache_ctl.index >= 0) { + ret = fill_readdir_cache(d_inode(parent), dn, + &cache_ctl, req); + if (ret < 0) + err = ret; + } next_item: if (dn) dput(dn); } - if (err == 0) - req->r_did_prepopulate = true; - out: + if (err == 0) { + req->r_did_prepopulate = true; + req->r_readdir_cache_idx = cache_ctl.index; + } + ceph_readdir_cache_release(&cache_ctl); if (snapdir) { iput(snapdir); dput(parent); diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h index 470be4eb25f3..762757e6cebf 100644 --- a/fs/ceph/mds_client.h +++ b/fs/ceph/mds_client.h @@ -253,6 +253,9 @@ struct ceph_mds_request { bool r_got_unsafe, r_got_safe, r_got_result; bool r_did_prepopulate; + long long r_dir_release_cnt; + long long r_dir_ordered_cnt; + int r_readdir_cache_idx; u32 r_readdir_offset; struct ceph_cap_reservation r_caps_reservation; diff --git a/fs/ceph/super.h b/fs/ceph/super.h index 4415e977d72b..860cc016e70d 100644 --- a/fs/ceph/super.h +++ b/fs/ceph/super.h @@ -282,9 +282,9 @@ struct ceph_inode_info { u32 i_time_warp_seq; unsigned i_ceph_flags; - int i_ordered_count; - atomic_t i_release_count; - atomic_t i_complete_count; + atomic64_t i_release_count; + atomic64_t i_ordered_count; + atomic64_t i_complete_seq[2]; struct ceph_dir_layout i_dir_layout; struct ceph_file_layout i_layout; @@ -471,30 +471,36 @@ static inline struct inode *ceph_find_inode(struct super_block *sb, static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci, - int release_count, int ordered_count) + long long release_count, + long long ordered_count) { - atomic_set(&ci->i_complete_count, release_count); - if (ci->i_ordered_count == ordered_count) - ci->i_ceph_flags |= CEPH_I_DIR_ORDERED; - else - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; + smp_mb__before_atomic(); + atomic64_set(&ci->i_complete_seq[0], release_count); + atomic64_set(&ci->i_complete_seq[1], ordered_count); } static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci) { - atomic_inc(&ci->i_release_count); + atomic64_inc(&ci->i_release_count); +} + +static inline void __ceph_dir_clear_ordered(struct ceph_inode_info *ci) +{ + atomic64_inc(&ci->i_ordered_count); } static inline bool __ceph_dir_is_complete(struct ceph_inode_info *ci) { - return atomic_read(&ci->i_complete_count) == - atomic_read(&ci->i_release_count); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count); } static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci) { - return __ceph_dir_is_complete(ci) && - (ci->i_ceph_flags & CEPH_I_DIR_ORDERED); + return atomic64_read(&ci->i_complete_seq[0]) == + atomic64_read(&ci->i_release_count) && + atomic64_read(&ci->i_complete_seq[1]) == + atomic64_read(&ci->i_ordered_count); } static inline void ceph_dir_clear_complete(struct inode *inode) @@ -504,20 +510,13 @@ static inline void ceph_dir_clear_complete(struct inode *inode) static inline void ceph_dir_clear_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - spin_lock(&ci->i_ceph_lock); - ci->i_ordered_count++; - ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED; - spin_unlock(&ci->i_ceph_lock); + __ceph_dir_clear_ordered(ceph_inode(inode)); } static inline bool ceph_dir_is_complete_ordered(struct inode *inode) { - struct ceph_inode_info *ci = ceph_inode(inode); - bool ret; - spin_lock(&ci->i_ceph_lock); - ret = __ceph_dir_is_complete_ordered(ci); - spin_unlock(&ci->i_ceph_lock); + bool ret = __ceph_dir_is_complete_ordered(ceph_inode(inode)); + smp_rmb(); return ret; } @@ -636,16 +635,20 @@ struct ceph_file_info { unsigned offset; /* offset of last chunk, adjusted for . and .. */ unsigned next_offset; /* offset of next chunk (last_name's + 1) */ char *last_name; /* last entry in previous chunk */ - struct dentry *dentry; /* next dentry (for dcache readdir) */ - int dir_release_count; - int dir_ordered_count; + long long dir_release_count; + long long dir_ordered_count; + int readdir_cache_idx; /* used for -o dirstat read() on directory thing */ char *dir_info; int dir_info_len; }; - +struct ceph_readdir_cache_control { + struct page *page; + struct dentry **dentries; + int index; +}; /* * A "snap realm" describes a subset of the file hierarchy sharing @@ -944,6 +947,7 @@ extern void ceph_dentry_lru_del(struct dentry *dn); extern void ceph_invalidate_dentry_lease(struct dentry *dentry); extern unsigned ceph_dentry_hash(struct inode *dir, struct dentry *dn); extern struct inode *ceph_get_dentry_parent_inode(struct dentry *dentry); +extern void ceph_readdir_cache_release(struct ceph_readdir_cache_control *ctl); /* * our d_ops vary depending on whether the inode is live, -- cgit v1.2.3-59-g8ed1b From e1966b49446a43994c3f25a07d0eb4d05660b429 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Thu, 18 Jun 2015 03:10:58 +0800 Subject: ceph: fix ceph_writepages_start() Before a page get locked, someone else can write data to the page and increase the i_size. So we should re-check the i_size after pages are locked. Signed-off-by: Yan, Zheng --- fs/ceph/addr.c | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'fs') diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c index 97102038fe03..890c50971a69 100644 --- a/fs/ceph/addr.c +++ b/fs/ceph/addr.c @@ -440,7 +440,7 @@ out: * only snap context we are allowed to write back. */ static struct ceph_snap_context *get_oldest_context(struct inode *inode, - u64 *snap_size) + loff_t *snap_size) { struct ceph_inode_info *ci = ceph_inode(inode); struct ceph_snap_context *snapc = NULL; @@ -480,8 +480,9 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) struct ceph_osd_client *osdc; struct ceph_snap_context *snapc, *oldest; loff_t page_off = page_offset(page); + loff_t snap_size = -1; long writeback_stat; - u64 truncate_size, snap_size = 0; + u64 truncate_size; u32 truncate_seq; int err = 0, len = PAGE_CACHE_SIZE; @@ -516,7 +517,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc) spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) + if (snap_size == -1) snap_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); @@ -699,7 +700,8 @@ static int ceph_writepages_start(struct address_space *mapping, unsigned wsize = 1 << inode->i_blkbits; struct ceph_osd_request *req = NULL; int do_sync = 0; - u64 truncate_size, snap_size; + loff_t snap_size, i_size; + u64 truncate_size; u32 truncate_seq; /* @@ -745,7 +747,7 @@ static int ceph_writepages_start(struct address_space *mapping, retry: /* find oldest snap context with dirty data */ ceph_put_snap_context(snapc); - snap_size = 0; + snap_size = -1; snapc = get_oldest_context(inode, &snap_size); if (!snapc) { /* hmm, why does writepages get called when there @@ -753,16 +755,13 @@ retry: dout(" no snap context with dirty data?\n"); goto out; } - if (snap_size == 0) - snap_size = i_size_read(inode); dout(" oldest snapc is %p seq %lld (%d snaps)\n", snapc, snapc->seq, snapc->num_snaps); spin_lock(&ci->i_ceph_lock); truncate_seq = ci->i_truncate_seq; truncate_size = ci->i_truncate_size; - if (!snap_size) - snap_size = i_size_read(inode); + i_size = i_size_read(inode); spin_unlock(&ci->i_ceph_lock); if (last_snapc && snapc != last_snapc) { @@ -832,8 +831,10 @@ get_more_pages: dout("waiting on writeback %p\n", page); wait_on_page_writeback(page); } - if (page_offset(page) >= snap_size) { - dout("%p page eof %llu\n", page, snap_size); + if (page_offset(page) >= + (snap_size == -1 ? i_size : snap_size)) { + dout("%p page eof %llu\n", page, + (snap_size == -1 ? i_size : snap_size)); done = 1; unlock_page(page); break; @@ -949,10 +950,18 @@ get_more_pages: } /* Format the osd request message and submit the write */ - offset = page_offset(pages[0]); - len = min(snap_size - offset, - (u64)locked_pages << PAGE_CACHE_SHIFT); + len = (u64)locked_pages << PAGE_CACHE_SHIFT; + if (snap_size == -1) { + len = min(len, (u64)i_size_read(inode) - offset); + /* writepages_finish() clears writeback pages + * according to the data length, so make sure + * data length covers all locked pages */ + len = max(len, 1 + + ((u64)(locked_pages - 1) << PAGE_CACHE_SHIFT)); + } else { + len = min(len, snap_size - offset); + } dout("writepages got %d pages at %llu~%llu\n", locked_pages, offset, len); -- cgit v1.2.3-59-g8ed1b