diff options
Diffstat (limited to 'fs/ceph/dir.c')
-rw-r--r-- | fs/ceph/dir.c | 357 |
1 files changed, 281 insertions, 76 deletions
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c index d0cd0aba5843..e7e2ebac330d 100644 --- a/fs/ceph/dir.c +++ b/fs/ceph/dir.c @@ -38,6 +38,7 @@ static int __dir_lease_try_check(const struct dentry *dentry); static int ceph_d_init(struct dentry *dentry) { struct ceph_dentry_info *di; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dentry->d_sb); di = kmem_cache_zalloc(ceph_dentry_cachep, GFP_KERNEL); if (!di) @@ -48,6 +49,9 @@ static int ceph_d_init(struct dentry *dentry) di->time = jiffies; dentry->d_fsdata = di; INIT_LIST_HEAD(&di->lease_list); + + atomic64_inc(&mdsc->metric.total_dentries); + return 0; } @@ -141,7 +145,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, return ERR_PTR(-EAGAIN); } /* reading/filling the cache are serialized by - i_mutex, no need to use page lock */ + i_rwsem, no need to use page lock */ unlock_page(cache_ctl->page); cache_ctl->dentries = kmap(cache_ctl->page); } @@ -151,7 +155,7 @@ __dcache_find_get_entry(struct dentry *parent, u64 idx, rcu_read_lock(); spin_lock(&parent->d_lock); /* check i_size again here, because empty directory can be - * marked as complete while not holding the i_mutex. */ + * marked as complete while not holding the i_rwsem. */ if (ceph_dir_is_complete_ordered(dir) && ptr_pos < i_size_read(dir)) dentry = cache_ctl->dentries[cache_ctl->index]; else @@ -254,9 +258,7 @@ static int __dcache_readdir(struct file *file, struct dir_context *ctx, dentry, dentry, d_inode(dentry)); ctx->pos = di->offset; if (!dir_emit(ctx, dentry->d_name.name, - dentry->d_name.len, - ceph_translate_ino(dentry->d_sb, - d_inode(dentry)->i_ino), + dentry->d_name.len, ceph_present_inode(d_inode(dentry)), d_inode(dentry)->i_mode >> 12)) { dput(dentry); err = 0; @@ -319,30 +321,37 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx) /* always start with . and .. */ if (ctx->pos == 0) { dout("readdir off 0 -> '.'\n"); - if (!dir_emit(ctx, ".", 1, - ceph_translate_ino(inode->i_sb, inode->i_ino), + if (!dir_emit(ctx, ".", 1, ceph_present_inode(inode), inode->i_mode >> 12)) return 0; ctx->pos = 1; } if (ctx->pos == 1) { - ino_t ino = parent_ino(file->f_path.dentry); + u64 ino; + struct dentry *dentry = file->f_path.dentry; + + spin_lock(&dentry->d_lock); + ino = ceph_present_inode(dentry->d_parent->d_inode); + spin_unlock(&dentry->d_lock); + dout("readdir off 1 -> '..'\n"); - if (!dir_emit(ctx, "..", 2, - ceph_translate_ino(inode->i_sb, ino), - inode->i_mode >> 12)) + if (!dir_emit(ctx, "..", 2, ino, inode->i_mode >> 12)) return 0; ctx->pos = 2; } - /* can we use the dcache? */ spin_lock(&ci->i_ceph_lock); + /* request Fx cap. if have Fx, we don't need to release Fs cap + * for later create/unlink. */ + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_WR); + /* can we use the dcache? */ if (ceph_test_mount_opt(fsc, DCACHE) && !ceph_test_mount_opt(fsc, NOASYNCREADDIR) && ceph_snap(inode) != CEPH_SNAPDIR && __ceph_dir_is_complete_ordered(ci) && - __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) { + __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) { int shared_gen = atomic_read(&ci->i_shared_gen); + spin_unlock(&ci->i_ceph_lock); err = __dcache_readdir(file, ctx, shared_gen); if (err != -EAGAIN) @@ -469,8 +478,11 @@ more: 2 : (fpos_off(rde->offset) + 1); err = note_last_dentry(dfi, rde->name, rde->name_len, next_offset); - if (err) + if (err) { + ceph_mdsc_put_request(dfi->last_readdir); + dfi->last_readdir = NULL; return err; + } } else if (req->r_reply_info.dir_end) { dfi->next_offset = 2; /* keep last name */ @@ -498,9 +510,6 @@ more: } for (; i < rinfo->dir_nr; i++) { struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i; - struct ceph_vino vino; - ino_t ino; - u32 ftype; BUG_ON(rde->offset < ctx->pos); @@ -510,13 +519,16 @@ more: rde->name_len, rde->name, &rde->inode.in); BUG_ON(!rde->inode.in); - ftype = le32_to_cpu(rde->inode.in->mode) >> 12; - vino.ino = le64_to_cpu(rde->inode.in->ino); - vino.snap = le64_to_cpu(rde->inode.in->snapid); - ino = ceph_vino_to_ino(vino); if (!dir_emit(ctx, rde->name, rde->name_len, - ceph_translate_ino(inode->i_sb, ino), ftype)) { + ceph_present_ino(inode->i_sb, le64_to_cpu(rde->inode.in->ino)), + le32_to_cpu(rde->inode.in->mode) >> 12)) { + /* + * NOTE: Here no need to put the 'dfi->last_readdir', + * because when dir_emit stops us it's most likely + * doesn't have enough memory, etc. So for next readdir + * it will continue. + */ dout("filldir stopping us...\n"); return 0; } @@ -628,10 +640,12 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence) switch (whence) { case SEEK_CUR: offset += file->f_pos; + break; case SEEK_SET: break; case SEEK_END: retval = -EOPNOTSUPP; + goto out; default: goto out; } @@ -662,25 +676,25 @@ out: /* * Handle lookups for the hidden .snap directory. */ -int ceph_handle_snapdir(struct ceph_mds_request *req, - struct dentry *dentry, int err) +struct dentry *ceph_handle_snapdir(struct ceph_mds_request *req, + struct dentry *dentry) { struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); - struct inode *parent = d_inode(dentry->d_parent); /* we hold i_mutex */ + struct inode *parent = d_inode(dentry->d_parent); /* we hold i_rwsem */ /* .snap dir? */ - if (err == -ENOENT && - ceph_snap(parent) == CEPH_NOSNAP && - strcmp(dentry->d_name.name, - fsc->mount_options->snapdir_name) == 0) { + if (ceph_snap(parent) == CEPH_NOSNAP && + strcmp(dentry->d_name.name, fsc->mount_options->snapdir_name) == 0) { + struct dentry *res; struct inode *inode = ceph_get_snapdir(parent); - dout("ENOENT on snapdir %p '%pd', linking to snapdir %p\n", - dentry, dentry, inode); - BUG_ON(!d_unhashed(dentry)); - d_add(dentry, inode); - err = 0; + + res = d_splice_alias(inode, dentry); + dout("ENOENT on snapdir %p '%pd', linking to snapdir %p. Spliced dentry %p\n", + dentry, dentry, inode, res); + if (res) + dentry = res; } - return err; + return dentry; } /* @@ -734,7 +748,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, unsigned int flags) { struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; int op; int mask; @@ -752,14 +766,15 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, struct ceph_dentry_info *di = ceph_dentry(dentry); spin_lock(&ci->i_ceph_lock); - dout(" dir %p flags are %d\n", dir, ci->i_ceph_flags); + dout(" dir %p flags are 0x%lx\n", dir, ci->i_ceph_flags); if (strncmp(dentry->d_name.name, fsc->mount_options->snapdir_name, dentry->d_name.len) && !is_root_ceph_dentry(dir, dentry) && ceph_test_mount_opt(fsc, DCACHE) && __ceph_dir_is_complete(ci) && - (__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1))) { + __ceph_caps_issued_mask_metric(ci, CEPH_CAP_FILE_SHARED, 1)) { + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD); spin_unlock(&ci->i_ceph_lock); dout(" dir %p complete, -ENOENT\n", dir); d_add(dentry, NULL); @@ -782,10 +797,21 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry, mask |= CEPH_CAP_XATTR_SHARED; req->r_args.getattr.mask = cpu_to_le32(mask); + ihold(dir); req->r_parent = dir; set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); err = ceph_mdsc_do_request(mdsc, NULL, req); - err = ceph_handle_snapdir(req, dentry, err); + if (err == -ENOENT) { + struct dentry *res; + + res = ceph_handle_snapdir(req, dentry); + if (IS_ERR(res)) { + err = PTR_ERR(res); + } else { + dentry = res; + err = 0; + } + } dentry = ceph_finish_lookup(req, dentry, err); ceph_mdsc_put_request(req); /* will dput(dentry) */ dout("lookup result=%p\n", dentry); @@ -819,11 +845,10 @@ int ceph_handle_notrace_create(struct inode *dir, struct dentry *dentry) return PTR_ERR(result); } -static int ceph_mknod(struct inode *dir, struct dentry *dentry, - umode_t mode, dev_t rdev) +static int ceph_mknod(struct user_namespace *mnt_userns, struct inode *dir, + struct dentry *dentry, umode_t mode, dev_t rdev) { - struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; struct ceph_acl_sec_ctx as_ctx = {}; int err; @@ -831,6 +856,10 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_quota_is_max_files_exceeded(dir)) { err = -EDQUOT; goto out; @@ -853,6 +882,7 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry, req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mknod.mode = cpu_to_le32(mode); req->r_args.mknod.rdev = cpu_to_le32(rdev); @@ -875,17 +905,16 @@ out: return err; } -static int ceph_create(struct inode *dir, struct dentry *dentry, umode_t mode, - bool excl) +static int ceph_create(struct user_namespace *mnt_userns, struct inode *dir, + struct dentry *dentry, umode_t mode, bool excl) { - return ceph_mknod(dir, dentry, mode, 0); + return ceph_mknod(mnt_userns, dir, dentry, mode, 0); } -static int ceph_symlink(struct inode *dir, struct dentry *dentry, - const char *dest) +static int ceph_symlink(struct user_namespace *mnt_userns, struct inode *dir, + struct dentry *dentry, const char *dest) { - struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; struct ceph_acl_sec_ctx as_ctx = {}; int err; @@ -893,6 +922,10 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_quota_is_max_files_exceeded(dir)) { err = -EDQUOT; goto out; @@ -915,11 +948,17 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry, goto out; } req->r_parent = dir; + ihold(dir); + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; + if (as_ctx.pagelist) { + req->r_pagelist = as_ctx.pagelist; + as_ctx.pagelist = NULL; + } err = ceph_mdsc_do_request(mdsc, dir, req); if (!err && !req->r_reply_info.head->is_dentry) err = ceph_handle_notrace_create(dir, dentry); @@ -931,15 +970,19 @@ out: return err; } -static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) +static int ceph_mkdir(struct user_namespace *mnt_userns, struct inode *dir, + struct dentry *dentry, umode_t mode) { - struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; struct ceph_acl_sec_ctx as_ctx = {}; - int err = -EROFS; + int err; int op; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_snap(dir) == CEPH_SNAPDIR) { /* mkdir .snap/foo is a MKSNAP */ op = CEPH_MDS_OP_MKSNAP; @@ -949,6 +992,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) dout("mkdir dir %p dn %p mode 0%ho\n", dir, dentry, mode); op = CEPH_MDS_OP_MKDIR; } else { + err = -EROFS; goto out; } @@ -975,6 +1019,7 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_args.mkdir.mode = cpu_to_le32(mode); req->r_dentry_drop = CEPH_CAP_FILE_SHARED | CEPH_CAP_AUTH_EXCL; @@ -1001,11 +1046,14 @@ out: static int ceph_link(struct dentry *old_dentry, struct inode *dir, struct dentry *dentry) { - struct ceph_fs_client *fsc = ceph_sb_to_client(dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(dir->i_sb); struct ceph_mds_request *req; int err; + err = ceph_wait_on_conflict_unlink(dentry); + if (err) + return err; + if (ceph_snap(dir) != CEPH_NOSNAP) return -EROFS; @@ -1020,6 +1068,7 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, req->r_num_caps = 2; req->r_old_dentry = dget(old_dentry); req->r_parent = dir; + ihold(dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -1036,6 +1085,96 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir, return err; } +static void ceph_async_unlink_cb(struct ceph_mds_client *mdsc, + struct ceph_mds_request *req) +{ + struct dentry *dentry = req->r_dentry; + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); + struct ceph_dentry_info *di = ceph_dentry(dentry); + int result = req->r_err ? req->r_err : + le32_to_cpu(req->r_reply_info.head->result); + + if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags)) + pr_warn("%s dentry %p:%pd async unlink bit is not set\n", + __func__, dentry, dentry); + + spin_lock(&fsc->async_unlink_conflict_lock); + hash_del_rcu(&di->hnode); + spin_unlock(&fsc->async_unlink_conflict_lock); + + spin_lock(&dentry->d_lock); + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; + wake_up_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT); + spin_unlock(&dentry->d_lock); + + synchronize_rcu(); + + if (result == -EJUKEBOX) + goto out; + + /* If op failed, mark everyone involved for errors */ + if (result) { + int pathlen = 0; + u64 base = 0; + char *path = ceph_mdsc_build_path(dentry, &pathlen, + &base, 0); + + /* mark error on parent + clear complete */ + mapping_set_error(req->r_parent->i_mapping, result); + ceph_dir_clear_complete(req->r_parent); + + /* drop the dentry -- we don't know its status */ + if (!d_unhashed(dentry)) + d_drop(dentry); + + /* mark inode itself for an error (since metadata is bogus) */ + mapping_set_error(req->r_old_inode->i_mapping, result); + + pr_warn("async unlink failure path=(%llx)%s result=%d!\n", + base, IS_ERR(path) ? "<<bad>>" : path, result); + ceph_mdsc_free_path(path, pathlen); + } +out: + iput(req->r_old_inode); + ceph_mdsc_release_dir_caps(req); +} + +static int get_caps_for_async_unlink(struct inode *dir, struct dentry *dentry) +{ + struct ceph_inode_info *ci = ceph_inode(dir); + struct ceph_dentry_info *di; + int got = 0, want = CEPH_CAP_FILE_EXCL | CEPH_CAP_DIR_UNLINK; + + spin_lock(&ci->i_ceph_lock); + if ((__ceph_caps_issued(ci, NULL) & want) == want) { + ceph_take_cap_refs(ci, want, false); + got = want; + } + spin_unlock(&ci->i_ceph_lock); + + /* If we didn't get anything, return 0 */ + if (!got) + return 0; + + spin_lock(&dentry->d_lock); + di = ceph_dentry(dentry); + /* + * - We are holding Fx, which implies Fs caps. + * - Only support async unlink for primary linkage + */ + if (atomic_read(&ci->i_shared_gen) != di->lease_shared_gen || + !(di->flags & CEPH_DENTRY_PRIMARY_LINK)) + want = 0; + spin_unlock(&dentry->d_lock); + + /* Do we still want what we've got? */ + if (want == got) + return got; + + ceph_put_cap_refs(ci, got); + return 0; +} + /* * rmdir and unlink are differ only by the metadata op code */ @@ -1045,6 +1184,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) struct ceph_mds_client *mdsc = fsc->mdsc; struct inode *inode = d_inode(dentry); struct ceph_mds_request *req; + bool try_async = ceph_test_mount_opt(fsc, ASYNC_DIROPS); int err = -EROFS; int op; @@ -1059,6 +1199,7 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) CEPH_MDS_OP_RMDIR : CEPH_MDS_OP_UNLINK; } else goto out; +retry: req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); if (IS_ERR(req)) { err = PTR_ERR(req); @@ -1067,24 +1208,72 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; - set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); + ihold(dir); req->r_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_dentry_unless = CEPH_CAP_FILE_EXCL; req->r_inode_drop = ceph_drop_caps_for_unlink(inode); - err = ceph_mdsc_do_request(mdsc, dir, req); - if (!err && !req->r_reply_info.head->is_dentry) - d_delete(dentry); + + if (try_async && op == CEPH_MDS_OP_UNLINK && + (req->r_dir_caps = get_caps_for_async_unlink(dir, dentry))) { + struct ceph_dentry_info *di = ceph_dentry(dentry); + + dout("async unlink on %llu/%.*s caps=%s", ceph_ino(dir), + dentry->d_name.len, dentry->d_name.name, + ceph_cap_string(req->r_dir_caps)); + set_bit(CEPH_MDS_R_ASYNC, &req->r_req_flags); + req->r_callback = ceph_async_unlink_cb; + req->r_old_inode = d_inode(dentry); + ihold(req->r_old_inode); + + spin_lock(&dentry->d_lock); + di->flags |= CEPH_DENTRY_ASYNC_UNLINK; + spin_unlock(&dentry->d_lock); + + spin_lock(&fsc->async_unlink_conflict_lock); + hash_add_rcu(fsc->async_unlink_conflict, &di->hnode, + dentry->d_name.hash); + spin_unlock(&fsc->async_unlink_conflict_lock); + + err = ceph_mdsc_submit_request(mdsc, dir, req); + if (!err) { + /* + * We have enough caps, so we assume that the unlink + * will succeed. Fix up the target inode and dcache. + */ + drop_nlink(inode); + d_delete(dentry); + } else { + spin_lock(&fsc->async_unlink_conflict_lock); + hash_del_rcu(&di->hnode); + spin_unlock(&fsc->async_unlink_conflict_lock); + + spin_lock(&dentry->d_lock); + di->flags &= ~CEPH_DENTRY_ASYNC_UNLINK; + spin_unlock(&dentry->d_lock); + + if (err == -EJUKEBOX) { + try_async = false; + ceph_mdsc_put_request(req); + goto retry; + } + } + } else { + set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); + err = ceph_mdsc_do_request(mdsc, dir, req); + if (!err && !req->r_reply_info.head->is_dentry) + d_delete(dentry); + } + ceph_mdsc_put_request(req); out: return err; } -static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, - struct inode *new_dir, struct dentry *new_dentry, - unsigned int flags) +static int ceph_rename(struct user_namespace *mnt_userns, struct inode *old_dir, + struct dentry *old_dentry, struct inode *new_dir, + struct dentry *new_dentry, unsigned int flags) { - struct ceph_fs_client *fsc = ceph_sb_to_client(old_dir->i_sb); - struct ceph_mds_client *mdsc = fsc->mdsc; + struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(old_dir->i_sb); struct ceph_mds_request *req; int op = CEPH_MDS_OP_RENAME; int err; @@ -1105,6 +1294,10 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, (!ceph_quota_is_same_realm(old_dir, new_dir))) return -EXDEV; + err = ceph_wait_on_conflict_unlink(new_dentry); + if (err) + return err; + dout("rename dir %p dentry %p to dir %p dentry %p\n", old_dir, old_dentry, new_dir, new_dentry); req = ceph_mdsc_create_request(mdsc, op, USE_AUTH_MDS); @@ -1116,6 +1309,7 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry, req->r_old_dentry = dget(old_dentry); req->r_old_dentry_dir = old_dir; req->r_parent = new_dir; + ihold(new_dir); set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags); req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED; req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL; @@ -1411,6 +1605,7 @@ void ceph_invalidate_dentry_lease(struct dentry *dentry) spin_lock(&dentry->d_lock); di->time = jiffies; di->lease_shared_gen = 0; + di->flags &= ~CEPH_DENTRY_PRIMARY_LINK; __dentry_lease_unlist(di); spin_unlock(&dentry->d_lock); } @@ -1431,10 +1626,8 @@ static bool __dentry_lease_is_valid(struct ceph_dentry_info *di) u32 gen; unsigned long ttl; - spin_lock(&session->s_gen_ttl_lock); - gen = session->s_cap_gen; + gen = atomic_read(&session->s_cap_gen); ttl = session->s_cap_ttl; - spin_unlock(&session->s_gen_ttl_lock); if (di->lease_gen == gen && time_before(jiffies, ttl) && @@ -1520,7 +1713,8 @@ static int __dir_lease_try_check(const struct dentry *dentry) /* * Check if directory-wide content lease/cap is valid. */ -static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) +static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry, + struct ceph_mds_client *mdsc) { struct ceph_inode_info *ci = ceph_inode(dir); int valid; @@ -1528,7 +1722,10 @@ static int dir_lease_is_valid(struct inode *dir, struct dentry *dentry) spin_lock(&ci->i_ceph_lock); valid = __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1); - shared_gen = atomic_read(&ci->i_shared_gen); + if (valid) { + __ceph_touch_fmode(ci, mdsc, CEPH_FILE_MODE_RD); + shared_gen = atomic_read(&ci->i_shared_gen); + } spin_unlock(&ci->i_ceph_lock); if (valid) { struct ceph_dentry_info *di; @@ -1554,6 +1751,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) int valid = 0; struct dentry *parent; struct inode *dir, *inode; + struct ceph_mds_client *mdsc; if (flags & LOOKUP_RCU) { parent = READ_ONCE(dentry->d_parent); @@ -1570,6 +1768,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) dout("d_revalidate %p '%pd' inode %p offset 0x%llx\n", dentry, dentry, inode, ceph_dentry(dentry)->offset); + mdsc = ceph_sb_to_client(dir->i_sb)->mdsc; + /* always trust cached snapped dentries, snapdir dentry */ if (ceph_snap(dir) != CEPH_NOSNAP) { dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry, @@ -1581,7 +1781,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) valid = dentry_lease_is_valid(dentry, flags); if (valid == -ECHILD) return valid; - if (valid || dir_lease_is_valid(dir, dentry)) { + if (valid || dir_lease_is_valid(dir, dentry, mdsc)) { if (inode) valid = ceph_is_any_caps(inode); else @@ -1590,8 +1790,6 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) } if (!valid) { - struct ceph_mds_client *mdsc = - ceph_sb_to_client(dir->i_sb)->mdsc; struct ceph_mds_request *req; int op, err; u32 mask; @@ -1599,6 +1797,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) if (flags & LOOKUP_RCU) return -ECHILD; + percpu_counter_inc(&mdsc->metric.d_lease_mis); + op = ceph_snap(dir) == CEPH_SNAPDIR ? CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP; req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS); @@ -1606,6 +1806,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) req->r_dentry = dget(dentry); req->r_num_caps = 2; req->r_parent = dir; + ihold(dir); mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED; if (ceph_security_xattr_wanted(dir)) @@ -1622,7 +1823,7 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) case -ENOENT: if (d_really_is_negative(dentry)) valid = 1; - /* Fallthrough */ + fallthrough; default: break; } @@ -1630,6 +1831,8 @@ static int ceph_d_revalidate(struct dentry *dentry, unsigned int flags) dout("d_revalidate %p lookup result=%d\n", dentry, err); } + } else { + percpu_counter_inc(&mdsc->metric.d_lease_hit); } dout("d_revalidate %p %s\n", dentry, valid ? "valid" : "invalid"); @@ -1672,16 +1875,18 @@ static int ceph_d_delete(const struct dentry *dentry) static void ceph_d_release(struct dentry *dentry) { struct ceph_dentry_info *di = ceph_dentry(dentry); + struct ceph_fs_client *fsc = ceph_sb_to_client(dentry->d_sb); dout("d_release %p\n", dentry); + atomic64_dec(&fsc->mdsc->metric.total_dentries); + spin_lock(&dentry->d_lock); __dentry_lease_unlist(di); dentry->d_fsdata = NULL; spin_unlock(&dentry->d_lock); - if (di->lease_session) - ceph_put_mds_session(di->lease_session); + ceph_put_mds_session(di->lease_session); kmem_cache_free(ceph_dentry_cachep, di); } |