diff options
Diffstat (limited to 'drivers/staging/lustre/lustre/osc/osc_lock.c')
-rw-r--r-- | drivers/staging/lustre/lustre/osc/osc_lock.c | 140 |
1 file changed, 76 insertions, 64 deletions
diff --git a/drivers/staging/lustre/lustre/osc/osc_lock.c b/drivers/staging/lustre/lustre/osc/osc_lock.c index 71f2810d18b9..013df9787f3e 100644 --- a/drivers/staging/lustre/lustre/osc/osc_lock.c +++ b/drivers/staging/lustre/lustre/osc/osc_lock.c @@ -79,7 +79,7 @@ static struct ldlm_lock *osc_handle_ptr(struct lustre_handle *handle) struct ldlm_lock *lock; lock = ldlm_handle2lock(handle); - if (lock != NULL) + if (lock) LDLM_LOCK_PUT(lock); return lock; } @@ -94,42 +94,40 @@ static int osc_lock_invariant(struct osc_lock *ols) int handle_used = lustre_handle_is_used(&ols->ols_handle); if (ergo(osc_lock_is_lockless(ols), - ols->ols_locklessable && ols->ols_lock == NULL)) + ols->ols_locklessable && !ols->ols_lock)) return 1; /* * If all the following "ergo"s are true, return 1, otherwise 0 */ - if (!ergo(olock != NULL, handle_used)) + if (!ergo(olock, handle_used)) return 0; - if (!ergo(olock != NULL, - olock->l_handle.h_cookie == ols->ols_handle.cookie)) + if (!ergo(olock, olock->l_handle.h_cookie == ols->ols_handle.cookie)) return 0; if (!ergo(handle_used, - ergo(lock != NULL && olock != NULL, lock == olock) && - ergo(lock == NULL, olock == NULL))) + ergo(lock && olock, lock == olock) && + ergo(!lock, !olock))) return 0; /* * Check that ->ols_handle and ->ols_lock are consistent, but * take into account that they are set at the different time. */ if (!ergo(ols->ols_state == OLS_CANCELLED, - olock == NULL && !handle_used)) + !olock && !handle_used)) return 0; /* * DLM lock is destroyed only after we have seen cancellation * ast. 
*/ - if (!ergo(olock != NULL && ols->ols_state < OLS_CANCELLED, - ((olock->l_flags & LDLM_FL_DESTROYED) == 0))) + if (!ergo(olock && ols->ols_state < OLS_CANCELLED, + ((olock->l_flags & LDLM_FL_DESTROYED) == 0))) return 0; if (!ergo(ols->ols_state == OLS_GRANTED, - olock != NULL && - olock->l_req_mode == olock->l_granted_mode && - ols->ols_hold)) + olock && olock->l_req_mode == olock->l_granted_mode && + ols->ols_hold)) return 0; return 1; } @@ -149,14 +147,15 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck) spin_lock(&osc_ast_guard); dlmlock = olck->ols_lock; - if (dlmlock == NULL) { + if (!dlmlock) { spin_unlock(&osc_ast_guard); return; } olck->ols_lock = NULL; /* wb(); --- for all who checks (ols->ols_lock != NULL) before - * call to osc_lock_detach() */ + * call to osc_lock_detach() + */ dlmlock->l_ast_data = NULL; olck->ols_handle.cookie = 0ULL; spin_unlock(&osc_ast_guard); @@ -171,7 +170,8 @@ static void osc_lock_detach(const struct lu_env *env, struct osc_lock *olck) /* Must get the value under the lock to avoid possible races. */ old_kms = cl2osc(obj)->oo_oinfo->loi_kms; /* Update the kms. Need to loop all granted locks. - * Not a problem for the client */ + * Not a problem for the client + */ attr->cat_kms = ldlm_extent_shift_kms(dlmlock, old_kms); cl_object_attr_set(env, obj, attr, CAT_KMS); @@ -223,8 +223,7 @@ static int osc_lock_unuse(const struct lu_env *env, /* * Move lock into OLS_RELEASED state before calling * osc_cancel_base() so that possible synchronous cancellation - * (that always happens e.g., for liblustre) sees that lock is - * released. + * sees that lock is released. */ ols->ols_state = OLS_RELEASED; return osc_lock_unhold(ols); @@ -247,7 +246,7 @@ static void osc_lock_fini(const struct lu_env *env, * lock is destroyed immediately after upcall. 
*/ osc_lock_unhold(ols); - LASSERT(ols->ols_lock == NULL); + LASSERT(!ols->ols_lock); LASSERT(atomic_read(&ols->ols_pageref) == 0 || atomic_read(&ols->ols_pageref) == _PAGEREF_MAGIC); @@ -292,7 +291,7 @@ static struct osc_lock *osc_ast_data_get(struct ldlm_lock *dlm_lock) lock_res_and_lock(dlm_lock); spin_lock(&osc_ast_guard); olck = dlm_lock->l_ast_data; - if (olck != NULL) { + if (olck) { struct cl_lock *lock = olck->ols_cl.cls_lock; /* * If osc_lock holds a reference on ldlm lock, return it even @@ -359,13 +358,13 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, __u64 size; dlmlock = olck->ols_lock; - LASSERT(dlmlock != NULL); /* re-grab LVB from a dlm lock under DLM spin-locks. */ *lvb = *(struct ost_lvb *)dlmlock->l_lvb_data; size = lvb->lvb_size; /* Extend KMS up to the end of this lock and no further - * A lock on [x,y] means a KMS of up to y + 1 bytes! */ + * A lock on [x,y] means a KMS of up to y + 1 bytes! + */ if (size > dlmlock->l_policy_data.l_extent.end) size = dlmlock->l_policy_data.l_extent.end + 1; if (size >= oinfo->loi_kms) { @@ -429,7 +428,8 @@ static void osc_lock_granted(const struct lu_env *env, struct osc_lock *olck, * to take a semaphore on a parent lock. This is safe, because * spin-locks are needed to protect consistency of * dlmlock->l_*_mode and LVB, and we have finished processing - * them. */ + * them. 
+ */ unlock_res_and_lock(dlmlock); cl_lock_modify(env, lock, descr); cl_lock_signal(env, lock); @@ -444,12 +444,12 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck) struct ldlm_lock *dlmlock; dlmlock = ldlm_handle2lock_long(&olck->ols_handle, 0); - LASSERT(dlmlock != NULL); + LASSERT(dlmlock); lock_res_and_lock(dlmlock); spin_lock(&osc_ast_guard); LASSERT(dlmlock->l_ast_data == olck); - LASSERT(olck->ols_lock == NULL); + LASSERT(!olck->ols_lock); olck->ols_lock = dlmlock; spin_unlock(&osc_ast_guard); @@ -470,7 +470,8 @@ static void osc_lock_upcall0(const struct lu_env *env, struct osc_lock *olck) olck->ols_hold = 1; /* lock reference taken by ldlm_handle2lock_long() is owned by - * osc_lock and released in osc_lock_detach() */ + * osc_lock and released in osc_lock_detach() + */ lu_ref_add(&dlmlock->l_reference, "osc_lock", olck); olck->ols_has_ref = 1; } @@ -508,10 +509,10 @@ static int osc_lock_upcall(void *cookie, int errcode) struct ldlm_lock *dlmlock; dlmlock = ldlm_handle2lock(&olck->ols_handle); - if (dlmlock != NULL) { + if (dlmlock) { lock_res_and_lock(dlmlock); spin_lock(&osc_ast_guard); - LASSERT(olck->ols_lock == NULL); + LASSERT(!olck->ols_lock); dlmlock->l_ast_data = NULL; olck->ols_handle.cookie = 0ULL; spin_unlock(&osc_ast_guard); @@ -548,7 +549,8 @@ static int osc_lock_upcall(void *cookie, int errcode) /* For AGL case, the RPC sponsor may exits the cl_lock * processing without wait() called before related OSC * lock upcall(). So update the lock status according - * to the enqueue result inside AGL upcall(). */ + * to the enqueue result inside AGL upcall(). + */ if (olck->ols_agl) { lock->cll_flags |= CLF_FROM_UPCALL; cl_wait_try(env, lock); @@ -571,7 +573,8 @@ static int osc_lock_upcall(void *cookie, int errcode) lu_ref_del(&lock->cll_reference, "upcall", lock); /* This maybe the last reference, so must be called after - * cl_lock_mutex_put(). */ + * cl_lock_mutex_put(). 
+ */ cl_lock_put(env, lock); cl_env_nested_put(&nest, env); @@ -634,7 +637,7 @@ static int osc_dlm_blocking_ast0(const struct lu_env *env, cancel = 0; olck = osc_ast_data_get(dlmlock); - if (olck != NULL) { + if (olck) { lock = olck->ols_cl.cls_lock; cl_lock_mutex_get(env, lock); LINVRNT(osc_lock_invariant(olck)); @@ -786,17 +789,17 @@ static int osc_ldlm_completion_ast(struct ldlm_lock *dlmlock, env = cl_env_nested_get(&nest); if (!IS_ERR(env)) { olck = osc_ast_data_get(dlmlock); - if (olck != NULL) { + if (olck) { lock = olck->ols_cl.cls_lock; cl_lock_mutex_get(env, lock); /* * ldlm_handle_cp_callback() copied LVB from request * to lock->l_lvb_data, store it in osc_lock. */ - LASSERT(dlmlock->l_lvb_data != NULL); + LASSERT(dlmlock->l_lvb_data); lock_res_and_lock(dlmlock); olck->ols_lvb = *(struct ost_lvb *)dlmlock->l_lvb_data; - if (olck->ols_lock == NULL) { + if (!olck->ols_lock) { /* * upcall (osc_lock_upcall()) hasn't yet been * called. Do nothing now, upcall will bind @@ -850,14 +853,15 @@ static int osc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data) * environment. */ olck = osc_ast_data_get(dlmlock); - if (olck != NULL) { + if (olck) { lock = olck->ols_cl.cls_lock; /* Do not grab the mutex of cl_lock for glimpse. * See LU-1274 for details. * BTW, it's okay for cl_lock to be cancelled during * this period because server can handle this race. * See ldlm_server_glimpse_ast() for details. - * cl_lock_mutex_get(env, lock); */ + * cl_lock_mutex_get(env, lock); + */ cap = &req->rq_pill; req_capsule_extend(cap, &RQF_LDLM_GL_CALLBACK); req_capsule_set_size(cap, &RMF_DLM_LVB, RCL_SERVER, @@ -1017,7 +1021,8 @@ static int osc_lock_enqueue_wait(const struct lu_env *env, LASSERT(cl_lock_is_mutexed(lock)); /* make it enqueue anyway for glimpse lock, because we actually - * don't need to cancel any conflicting locks. */ + * don't need to cancel any conflicting locks. 
+ */ if (olck->ols_glimpse) return 0; @@ -1051,7 +1056,8 @@ static int osc_lock_enqueue_wait(const struct lu_env *env, * imagine that client has PR lock on [0, 1000], and thread T0 * is doing lockless IO in [500, 1500] region. Concurrent * thread T1 can see lockless data in [500, 1000], which is - * wrong, because these data are possibly stale. */ + * wrong, because these data are possibly stale. + */ if (!lockless && osc_lock_compatible(olck, scan_ols)) continue; @@ -1074,7 +1080,7 @@ static int osc_lock_enqueue_wait(const struct lu_env *env, } else { CDEBUG(D_DLMTRACE, "lock %p is conflicted with %p, will wait\n", lock, conflict); - LASSERT(lock->cll_conflict == NULL); + LASSERT(!lock->cll_conflict); lu_ref_add(&conflict->cll_reference, "cancel-wait", lock); lock->cll_conflict = conflict; @@ -1111,7 +1117,7 @@ static int osc_lock_enqueue(const struct lu_env *env, "Impossible state: %d\n", ols->ols_state); LASSERTF(ergo(ols->ols_glimpse, lock->cll_descr.cld_mode <= CLM_READ), - "lock = %p, ols = %p\n", lock, ols); + "lock = %p, ols = %p\n", lock, ols); result = osc_lock_enqueue_wait(env, ols); if (result == 0) { @@ -1123,7 +1129,8 @@ static int osc_lock_enqueue(const struct lu_env *env, struct ldlm_enqueue_info *einfo = &ols->ols_einfo; /* lock will be passed as upcall cookie, - * hold ref to prevent to be released. */ + * hold ref to prevent to be released. 
+ */ cl_lock_hold_add(env, lock, "upcall", lock); /* a user for lock also */ cl_lock_user_add(env, lock); @@ -1137,12 +1144,12 @@ static int osc_lock_enqueue(const struct lu_env *env, ostid_build_res_name(&obj->oo_oinfo->loi_oi, resname); osc_lock_build_policy(env, lock, policy); result = osc_enqueue_base(osc_export(obj), resname, - &ols->ols_flags, policy, - &ols->ols_lvb, - obj->oo_oinfo->loi_kms_valid, - osc_lock_upcall, - ols, einfo, &ols->ols_handle, - PTLRPCD_SET, 1, ols->ols_agl); + &ols->ols_flags, policy, + &ols->ols_lvb, + obj->oo_oinfo->loi_kms_valid, + osc_lock_upcall, + ols, einfo, &ols->ols_handle, + PTLRPCD_SET, 1, ols->ols_agl); if (result != 0) { cl_lock_user_del(env, lock); cl_lock_unhold(env, lock, "upcall", lock); @@ -1174,7 +1181,8 @@ static int osc_lock_wait(const struct lu_env *env, } else if (olck->ols_agl) { if (lock->cll_flags & CLF_FROM_UPCALL) /* It is from enqueue RPC reply upcall for - * updating state. Do not re-enqueue. */ + * updating state. Do not re-enqueue. + */ return -ENAVAIL; olck->ols_state = OLS_NEW; } else { @@ -1197,7 +1205,7 @@ static int osc_lock_wait(const struct lu_env *env, } LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED && - lock->cll_error == 0, olck->ols_lock != NULL)); + lock->cll_error == 0, olck->ols_lock)); return lock->cll_error ?: olck->ols_state >= OLS_GRANTED ? 0 : CLO_WAIT; } @@ -1235,7 +1243,8 @@ static int osc_lock_use(const struct lu_env *env, LASSERT(lock->cll_state == CLS_INTRANSIT); LASSERT(lock->cll_users > 0); /* set a flag for osc_dlm_blocking_ast0() to signal the - * lock.*/ + * lock. + */ olck->ols_ast_wait = 1; rc = CLO_WAIT; } @@ -1257,11 +1266,12 @@ static int osc_lock_flush(struct osc_lock *ols, int discard) if (descr->cld_mode >= CLM_WRITE) { result = osc_cache_writeback_range(env, obj, - descr->cld_start, descr->cld_end, - 1, discard); + descr->cld_start, + descr->cld_end, + 1, discard); LDLM_DEBUG(ols->ols_lock, - "lock %p: %d pages were %s.\n", lock, result, - discard ? 
"discarded" : "written"); + "lock %p: %d pages were %s.\n", lock, result, + discard ? "discarded" : "written"); if (result > 0) result = 0; } @@ -1306,7 +1316,7 @@ static void osc_lock_cancel(const struct lu_env *env, LASSERT(cl_lock_is_mutexed(lock)); LINVRNT(osc_lock_invariant(olck)); - if (dlmlock != NULL) { + if (dlmlock) { int do_cancel; discard = !!(dlmlock->l_flags & LDLM_FL_DISCARD_DATA); @@ -1318,7 +1328,8 @@ static void osc_lock_cancel(const struct lu_env *env, /* Now that we're the only user of dlm read/write reference, * mostly the ->l_readers + ->l_writers should be zero. * However, there is a corner case. - * See bug 18829 for details.*/ + * See bug 18829 for details. + */ do_cancel = (dlmlock->l_readers == 0 && dlmlock->l_writers == 0); dlmlock->l_flags |= LDLM_FL_CBPENDING; @@ -1382,7 +1393,7 @@ static void osc_lock_state(const struct lu_env *env, if (state == CLS_HELD && slice->cls_lock->cll_state != CLS_HELD) { struct osc_io *oio = osc_env_io(env); - LASSERT(lock->ols_owner == NULL); + LASSERT(!lock->ols_owner); lock->ols_owner = oio; } else if (state != CLS_HELD) lock->ols_owner = NULL; @@ -1517,7 +1528,8 @@ static void osc_lock_lockless_state(const struct lu_env *env, lock->ols_owner = oio; /* set the io to be lockless if this lock is for io's - * host object */ + * host object + */ if (cl_object_same(oio->oi_cl.cis_obj, slice->cls_obj)) oio->oi_lockless = 1; } @@ -1555,8 +1567,8 @@ int osc_lock_init(const struct lu_env *env, struct osc_lock *clk; int result; - clk = kmem_cache_alloc(osc_lock_kmem, GFP_NOFS | __GFP_ZERO); - if (clk != NULL) { + clk = kmem_cache_zalloc(osc_lock_kmem, GFP_NOFS); + if (clk) { __u32 enqflags = lock->cll_descr.cld_enq_flags; osc_lock_build_einfo(env, lock, clk, &clk->ols_einfo); @@ -1578,8 +1590,8 @@ int osc_lock_init(const struct lu_env *env, if (clk->ols_locklessable && !(enqflags & CEF_DISCARD_DATA)) clk->ols_flags |= LDLM_FL_DENY_ON_CONTENTION; - LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx\n", - lock, 
clk, clk->ols_flags); + LDLM_DEBUG_NOLOCK("lock %p, osc lock %p, flags %llx", + lock, clk, clk->ols_flags); result = 0; } else @@ -1599,9 +1611,9 @@ int osc_dlm_lock_pageref(struct ldlm_lock *dlm) * doesn't matter because in the worst case we don't cancel a lock * which we actually can, that's no harm. */ - if (olock != NULL && + if (olock && atomic_add_return(_PAGEREF_MAGIC, - &olock->ols_pageref) != _PAGEREF_MAGIC) { + &olock->ols_pageref) != _PAGEREF_MAGIC) { atomic_sub(_PAGEREF_MAGIC, &olock->ols_pageref); rc = 1; } |