diff options
author | Linus Torvalds <torvalds@linux-foundation.org> | 2020-04-08 21:44:05 -0700 |
---|---|---|
committer | Linus Torvalds <torvalds@linux-foundation.org> | 2020-04-08 21:44:05 -0700 |
commit | fcc95f06403c956e3f50ca4a82db12b66a3078e0 (patch) | |
tree | 74bb3e387b635601ba351f621c7eea575e185aac /fs/ceph/dir.c | |
parent | Merge tag 'ovl-update-5.7' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/vfs (diff) | |
parent | ceph: fix snapshot directory timestamps (diff) | |
download | linux-dev-fcc95f06403c956e3f50ca4a82db12b66a3078e0.tar.xz linux-dev-fcc95f06403c956e3f50ca4a82db12b66a3078e0.zip |
Merge tag 'ceph-for-5.7-rc1' of git://github.com/ceph/ceph-client
Pull ceph updates from Ilya Dryomov:
"The main items are:
- support for asynchronous create and unlink (Jeff Layton).
Creates and unlinks are satisfied locally, without waiting for a
reply from the MDS, provided the client has been granted
appropriate caps (new in v15.y.z ("Octopus") release). This can be
a big help for metadata heavy workloads such as tar and rsync.
Opt-in with the new nowsync mount option.
- multiple blk-mq queues for rbd (Hannes Reinecke and myself).
When the driver was converted to blk-mq, we settled on a single
blk-mq queue because of a global lock in libceph and some other
technical debt. These have since been addressed, so allocate a
queue per CPU to enhance parallelism.
- don't hold onto caps that aren't actually needed (Zheng Yan).
This has been our long-standing behavior, but it causes issues with
some active/standby applications (synchronous I/O, stalls if the
standby goes down, etc).
- .snap directory timestamps consistent with ceph-fuse (Luis
Henriques)"
* tag 'ceph-for-5.7-rc1' of git://github.com/ceph/ceph-client: (49 commits)
ceph: fix snapshot directory timestamps
ceph: wait for async creating inode before requesting new max size
ceph: don't skip updating wanted caps when cap is stale
ceph: request new max size only when there is auth cap
ceph: cleanup return error of try_get_cap_refs()
ceph: return ceph_mdsc_do_request() errors from __get_parent()
ceph: check all mds' caps after page writeback
ceph: update i_requested_max_size only when sending cap msg to auth mds
ceph: simplify calling of ceph_get_fmode()
ceph: remove delay check logic from ceph_check_caps()
ceph: consider inode's last read/write when calculating wanted caps
ceph: always renew caps if mds_wanted is insufficient
ceph: update dentry lease for async create
ceph: attempt to do async create when possible
ceph: cache layout in parent dir on first sync create
ceph: add new MDS req field to hold delegated inode number
ceph: decode interval_sets for delegated inos
ceph: make ceph_fill_inode non-static
ceph: perform asynchronous unlink if we have sufficient caps
ceph: don't take refs to want mask unless we have all bits
...
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r-- | fs/ceph/dir.c | 132 |
1 files changed, 121 insertions, 11 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index d0cd0aba5843..d594c2627430 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -335,8 +335,11 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) ctx->pos = 2; } - /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); + /* request Fx cap. if have Fx, we don't need to release Fs cap + * for later create/unlink. */ + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR); + /* can we use the dcache? */ if (ceph_test_mount_opt(fsc, DCACHE) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && @@ -752,7 +755,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, struct ceph_dentry_info *di = ceph_dentry(dentry); spin_lock(&ci->i_ceph_lock); - dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags); + dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags); if (strncmp(dentry->d_name.name, fsc->mount_options->snapdir_name, dentry->d_name.len) && @@ -760,6 +763,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, ceph_test_mount_opt(fsc, DCACHE) && __ceph_dir_is_complete(ci) && (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD); spin_unlock(&ci->i_ceph_lock); dout(" dir %p complete, -ENOENT\n", dir); d_add(dentry, NULL); @@ -1036,6 +1040,78 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, return err; } +static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + int result = req->r_err ? req->r_err : + le32_to_cpu(req->r_reply_info.head->result); + + if (result == -EJUKEBOX) + goto out; + + /* If op failed, mark everyone involved for errors */ + if (result) { + int pathlen; + u64 base; + char *path = ceph_mdsc_build_path(req->r_dentry, &pathlen, + &base, 0); + + /* mark error on parent + clear complete */ + mapping_set_error(req->r_parent->i_mapping, result); + ceph_dir_clear_complete(req->r_parent); + + /* drop the dentry -- we don't know its status */ + if (!d_unhashed(req->r_dentry)) + d_drop(req->r_dentry); + + /* mark inode itself for an error (since metadata is bogus) */ + mapping_set_error(req->r_old_inode->i_mapping, result); + + pr_warn("ceph: async unlink failure path=(%llx)%s result=%d!\n", + base, IS_ERR(path) ? "<<bad>>" : path, result); + ceph_mdsc_free_path(path, pathlen); + } +out: + iput(req->r_old_inode); + ceph_mdsc_release_dir_caps(req); +} + +static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_dentry_info *di; + int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK; + + spin_lock(&ci->i_ceph_lock); + if ((__ceph_caps_issued(ci, NULL) & want) == want) { + ceph_take_cap_refs(ci, want, false); + got = want; + } + spin_unlock(&ci->i_ceph_lock); + + /* If we didn't get anything, return 0 */ + if (!got) + return 0; + + spin_lock(&dentry->d_lock); + di = ceph_dentry(dentry); + /* + * - We are holding Fx, which implies Fs caps. + * - Only support async unlink for primary linkage + */ + if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen || + !(di->flags & CEPH_DENTRY_PRIMARY_LINK)) + want = 0; + spin_unlock(&dentry->d_lock); + + /* Do we still want what we've got? */ + if (want == got) + return got; + + ceph_put_cap_refs(ci, got); + return 0; +} + /* * rmdir and unlink are differ only by the metadata op code */ @@ -1045,6 +1121,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = d_inode(dentry); struct ceph_mds_request *req; + bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS); int err = -EROFS; int op; @@ -1059,6 +1136,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK; } else goto out; +retry: req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -1067,13 +1145,39 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; - set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_inode_drop = ceph_drop_caps_for_unlink(inode); - err = ceph_mdsc_do_request(mdsc, dir, req); - if (!err && !req->r_reply_info.head->is_dentry) - d_delete(dentry); + + if (try_async && op == CEPH_MDS_OP_UNLINK && + (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) { + dout("async unlink on %lu/%.*s caps=%s", dir->i_ino, + dentry->d_name.len, dentry->d_name.name, + ceph_cap_string(req->r_dir_caps)); + set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags); + req->r_callback = ceph_async_unlink_cb; + req->r_old_inode = d_inode(dentry); + ihold(req->r_old_inode); + err = ceph_mdsc_submit_request(mdsc, dir, req); + if (!err) { + /* + * We have enough caps, so we assume that the unlink + * will succeed. Fix up the target inode and dcache. + */ + drop_nlink(inode); + d_delete(dentry); + } else if (err == -EJUKEBOX) { + try_async = false; + ceph_mdsc_put_request(req); + goto retry; + } + } else { + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); + err = ceph_mdsc_do_request(mdsc, dir, req); + if (!err && !req->r_reply_info.head->is_dentry) + d_delete(dentry); + } + ceph_mdsc_put_request(req); out: return err; @@ -1411,6 +1515,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry) spin_lock(&dentry->d_lock); di->time = jiffies; di->lease_shared_gen = 0; + di->flags &= ~CEPH_DENTRY_PRIMARY_LINK; __dentry_lease_unlist(di); spin_unlock(&dentry->d_lock); } @@ -1520,7 +1625,8 @@ static int __dir_lease_try_check(const struct dentry *dentry) /* * Check if directory-wide content lease/cap is valid. */ -static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) +static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry, + struct ceph_mds_client *mdsc) { struct ceph_inode_info *ci = ceph_inode(dir); int valid; @@ -1528,7 +1634,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) spin_lock(&ci->i_ceph_lock); valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); - shared_gen = atomic_read(&ci->i_shared_gen); + if (valid) { + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD); + shared_gen = atomic_read(&ci->i_shared_gen); + } spin_unlock(&ci->i_ceph_lock); if (valid) { struct ceph_dentry_info *di; @@ -1554,6 +1663,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) int valid = 0; struct dentry *parent; struct inode *dir, *inode; + struct ceph_mds_client *mdsc; if (flags & LOOKUP_RCU) { parent = READ_ONCE(dentry->d_parent); @@ -1570,6 +1680,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry, dentry, inode, ceph_dentry(dentry)->offset); + mdsc = ceph_sb_to_client(dir->i_sb)->mdsc; + /* always trust cached snapped dentries, snapdir dentry */ if (ceph_snap(dir) != CEPH_NOSNAP) { dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry, @@ -1581,7 +1693,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) valid = dentry_lease_is_valid(dentry, flags); if (valid == -ECHILD) return valid; - if (valid || dir_lease_is_valid(dir, dentry)) { + if (valid || dir_lease_is_valid(dir, dentry, mdsc)) { if (inode) valid = ceph_is_any_caps(inode); else @@ -1590,8 +1702,6 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) } if (!valid) { - struct ceph_mds_client *mdsc = - ceph_sb_to_client(dir->i_sb)->mdsc; struct ceph_mds_request *req; int op, err; u32 mask; |