Diffstat (limited to 'fs/xfs/xfs_log_cil.c')
-rw-r--r-- | fs/xfs/xfs_log_cil.c | 859
1 file changed, 607 insertions, 252 deletions
diff --git a/fs/xfs/xfs_log_cil.c b/fs/xfs/xfs_log_cil.c
index 83a039762b81..eccbfb99e894 100644
--- a/fs/xfs/xfs_log_cil.c
+++ b/fs/xfs/xfs_log_cil.c
@@ -37,16 +37,59 @@ xlog_cil_ticket_alloc(
 {
 	struct xlog_ticket *tic;
 
-	tic = xlog_ticket_alloc(log, 0, 1, XFS_TRANSACTION, 0);
+	tic = xlog_ticket_alloc(log, 0, 1, 0);
 
 	/*
 	 * set the current reservation to zero so we know to steal the basic
 	 * transaction overhead reservation from the first transaction commit.
 	 */
 	tic->t_curr_res = 0;
+	tic->t_iclog_hdrs = 0;
 	return tic;
 }
 
+static inline void
+xlog_cil_set_iclog_hdr_count(struct xfs_cil *cil)
+{
+	struct xlog	*log = cil->xc_log;
+
+	atomic_set(&cil->xc_iclog_hdrs,
+		   (XLOG_CIL_BLOCKING_SPACE_LIMIT(log) /
+			(log->l_iclog_size - log->l_iclog_hsize)));
+}
+
+/*
+ * Check if the current log item was first committed in this sequence.
+ * We can't rely on just the log item being in the CIL, we have to check
+ * the recorded commit sequence number.
+ *
+ * Note: for this to be used in a non-racy manner, it has to be called with
+ * CIL flushing locked out. As a result, it should only be used during the
+ * transaction commit process when deciding what to format into the item.
+ */
+static bool
+xlog_item_in_current_chkpt(
+	struct xfs_cil		*cil,
+	struct xfs_log_item	*lip)
+{
+	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
+		return false;
+
+	/*
+	 * li_seq is written on the first commit of a log item to record the
+	 * first checkpoint it is written to. Hence if it is different to the
+	 * current sequence, we're in a new checkpoint.
+	 */
+	return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
+}
+
+bool
+xfs_log_item_in_current_chkpt(
+	struct xfs_log_item *lip)
+{
+	return xlog_item_in_current_chkpt(lip->li_log->l_cilp, lip);
+}
+
 /*
  * Unavoidable forward declaration - xlog_cil_push_work() calls
  * xlog_cil_ctx_alloc() itself.
@@ -61,15 +104,88 @@ xlog_cil_ctx_alloc(void)
 	ctx = kmem_zalloc(sizeof(*ctx), KM_NOFS);
 	INIT_LIST_HEAD(&ctx->committing);
 	INIT_LIST_HEAD(&ctx->busy_extents);
+	INIT_LIST_HEAD(&ctx->log_items);
+	INIT_LIST_HEAD(&ctx->lv_chain);
 	INIT_WORK(&ctx->push_work, xlog_cil_push_work);
 	return ctx;
 }
 
+/*
+ * Aggregate the CIL per cpu structures into global counts, lists, etc and
+ * clear the percpu state ready for the next context to use. This is called
+ * from the push code with the context lock held exclusively, hence nothing else
+ * will be accessing or modifying the per-cpu counters.
+ */
+static void
+xlog_cil_push_pcp_aggregate(
+	struct xfs_cil		*cil,
+	struct xfs_cil_ctx	*ctx)
+{
+	struct xlog_cil_pcp	*cilpcp;
+	int			cpu;
+
+	for_each_online_cpu(cpu) {
+		cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+
+		ctx->ticket->t_curr_res += cilpcp->space_reserved;
+		cilpcp->space_reserved = 0;
+
+		if (!list_empty(&cilpcp->busy_extents)) {
+			list_splice_init(&cilpcp->busy_extents,
+					&ctx->busy_extents);
+		}
+		if (!list_empty(&cilpcp->log_items))
+			list_splice_init(&cilpcp->log_items, &ctx->log_items);
+
+		/*
+		 * We're in the middle of switching cil contexts. Reset the
+		 * counter we use to detect when the current context is nearing
+		 * full.
+		 */
+		cilpcp->space_used = 0;
+	}
+}
+
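The per-cpu fold in xlog_cil_push_pcp_aggregate() is only safe because the push holds xc_ctx_lock exclusively, so no committer can be mid-update on any CPU's counters. A minimal userspace model of that pattern, with C11 atomics standing in for the kernel primitives (all names here are hypothetical):

    #include <stdatomic.h>

    #define NR_CPUS 4

    struct pcp_slot {
        int space_used;            /* written only by its owner CPU */
    };

    struct cil_ctx {
        atomic_int space_used;     /* models ctx->space_used */
        struct pcp_slot pcp[NR_CPUS];
    };

    /* Caller holds the context lock exclusively (cf. xc_ctx_lock), so the
     * per-CPU slots are quiescent and a plain read-and-zero is safe. */
    static void push_aggregate(struct cil_ctx *ctx)
    {
        for (int cpu = 0; cpu < NR_CPUS; cpu++) {
            atomic_fetch_add(&ctx->space_used, ctx->pcp[cpu].space_used);
            ctx->pcp[cpu].space_used = 0;   /* ready for the next context */
        }
    }
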
+/*
+ * Aggregate the CIL per-cpu space used counters into the global atomic value.
+ * This is called when the per-cpu counter aggregation will first pass the soft
+ * limit threshold so we can switch to atomic counter aggregation for accurate
+ * detection of hard limit traversal.
+ */
+static void
+xlog_cil_insert_pcp_aggregate(
+	struct xfs_cil		*cil,
+	struct xfs_cil_ctx	*ctx)
+{
+	struct xlog_cil_pcp	*cilpcp;
+	int			cpu;
+	int			count = 0;
+
+	/* Trigger atomic updates then aggregate only for the first caller */
+	if (!test_and_clear_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags))
+		return;
+
+	for_each_online_cpu(cpu) {
+		int	old, prev;
+
+		cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+		do {
+			old = cilpcp->space_used;
+			prev = cmpxchg(&cilpcp->space_used, old, 0);
+		} while (old != prev);
+		count += old;
+	}
+	atomic_add(count, &ctx->space_used);
+}
+
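By contrast, xlog_cil_insert_pcp_aggregate() runs while other CPUs may still be adding to their counters, which is why each slot is drained with a cmpxchg loop instead of a read-and-zero. The same drain in portable C11 atomics (illustrative sketch only):

    #include <stdatomic.h>

    /* Zero a counter that a remote CPU may concurrently add to. The
     * compare-exchange only succeeds if the slot still holds the value we
     * sampled, so a racing add is either captured in the returned total or
     * left behind for a later aggregation - never lost. */
    static int drain_counter(atomic_int *slot)
    {
        int old = atomic_load(slot);

        while (!atomic_compare_exchange_weak(slot, &old, 0))
            ;   /* 'old' has been reloaded with the current value */
        return old;
    }
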
 static void
 xlog_cil_ctx_switch(
 	struct xfs_cil		*cil,
 	struct xfs_cil_ctx	*ctx)
 {
+	xlog_cil_set_iclog_hdr_count(cil);
+	set_bit(XLOG_CIL_EMPTY, &cil->xc_flags);
+	set_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags);
 	ctx->sequence = ++cil->xc_current_sequence;
 	ctx->cil = cil;
 	cil->xc_ctx = ctx;
@@ -91,6 +207,7 @@ xlog_cil_init_post_recovery(
 {
 	log->l_cilp->xc_ctx->ticket = xlog_cil_ticket_alloc(log);
 	log->l_cilp->xc_ctx->sequence = 1;
+	xlog_cil_set_iclog_hdr_count(log->l_cilp);
 }
 
 static inline int
@@ -103,39 +220,6 @@ xlog_cil_iovec_space(
 }
 
 /*
- * shadow buffers can be large, so we need to use kvmalloc() here to ensure
- * success. Unfortunately, kvmalloc() only allows GFP_KERNEL contexts to fall
- * back to vmalloc, so we can't actually do anything useful with gfp flags to
- * control the kmalloc() behaviour within kvmalloc(). Hence kmalloc() will do
- * direct reclaim and compaction in the slow path, both of which are
- * horrendously expensive. We just want kmalloc to fail fast and fall back to
- * vmalloc if it can't get somethign straight away from the free lists or buddy
- * allocator. Hence we have to open code kvmalloc outselves here.
- *
- * Also, we are in memalloc_nofs_save task context here, so despite the use of
- * GFP_KERNEL here, we are actually going to be doing GFP_NOFS allocations. This
- * is actually the only way to make vmalloc() do GFP_NOFS allocations, so lets
- * just all pretend this is a GFP_KERNEL context operation....
- */
-static inline void *
-xlog_cil_kvmalloc(
-	size_t		buf_size)
-{
-	gfp_t		flags = GFP_KERNEL;
-	void		*p;
-
-	flags &= ~__GFP_DIRECT_RECLAIM;
-	flags |= __GFP_NOWARN | __GFP_NORETRY;
-	do {
-		p = kmalloc(buf_size, flags);
-		if (!p)
-			p = vmalloc(buf_size);
-	} while (!p);
-
-	return p;
-}
-
-/*
  * Allocate or pin log vector buffers for CIL insertion.
  *
  * The CIL currently uses disposable buffers for copying a snapshot of the
@@ -214,13 +298,20 @@ xlog_cil_alloc_shadow_bufs(
 		}
 
 		/*
-		 * We 64-bit align the length of each iovec so that the start
-		 * of the next one is naturally aligned. We'll need to
-		 * account for that slack space here. Then round nbytes up
-		 * to 64-bit alignment so that the initial buffer alignment is
-		 * easy to calculate and verify.
+		 * We 64-bit align the length of each iovec so that the start of
+		 * the next one is naturally aligned. We'll need to account for
+		 * that slack space here.
+		 *
+		 * We also add the xlog_op_header to each region when
+		 * formatting, but that's not accounted to the size of the item
+		 * at this point. Hence we'll need an additional number of bytes
+		 * for each vector to hold an opheader.
+		 *
+		 * Then round nbytes up to 64-bit alignment so that the initial
+		 * buffer alignment is easy to calculate and verify.
 		 */
-		nbytes += niovecs * sizeof(uint64_t);
+		nbytes += niovecs *
+			(sizeof(uint64_t) + sizeof(struct xlog_op_header));
 		nbytes = round_up(nbytes, sizeof(uint64_t));
 
 		/*
@@ -244,10 +335,11 @@ xlog_cil_alloc_shadow_bufs(
 			 * storage.
 			 */
 			kmem_free(lip->li_lv_shadow);
-			lv = xlog_cil_kvmalloc(buf_size);
+			lv = xlog_kvmalloc(buf_size);
 
 			memset(lv, 0, xlog_cil_iovec_space(niovecs));
 
+			INIT_LIST_HEAD(&lv->lv_list);
 			lv->lv_item = lip;
 			lv->lv_size = buf_size;
 			if (ordered)
@@ -263,7 +355,6 @@ xlog_cil_alloc_shadow_bufs(
 			else
 				lv->lv_buf_len = 0;
 			lv->lv_bytes = 0;
-			lv->lv_next = NULL;
 		}
 
 		/* Ensure the lv is set up according to ->iop_size */
@@ -277,22 +368,18 @@ xlog_cil_alloc_shadow_bufs(
 
 /*
  * Prepare the log item for insertion into the CIL. Calculate the difference in
- * log space and vectors it will consume, and if it is a new item pin it as
- * well.
+ * log space it will consume, and if it is a new item pin it as well.
  */
 STATIC void
 xfs_cil_prepare_item(
 	struct xlog		*log,
 	struct xfs_log_vec	*lv,
 	struct xfs_log_vec	*old_lv,
-	int			*diff_len,
-	int			*diff_iovecs)
+	int			*diff_len)
 {
 	/* Account for the new LV being passed in */
-	if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED) {
+	if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
 		*diff_len += lv->lv_bytes;
-		*diff_iovecs += lv->lv_niovecs;
-	}
 
 	/*
 	 * If there is no old LV, this is the first time we've seen the item in
@@ -309,7 +396,6 @@ xfs_cil_prepare_item(
 		ASSERT(lv->lv_buf_len != XFS_LOG_VEC_ORDERED);
 
 		*diff_len -= old_lv->lv_bytes;
-		*diff_iovecs -= old_lv->lv_niovecs;
 		lv->lv_item->li_lv_shadow = old_lv;
 	}
 
@@ -358,12 +444,10 @@ static void
 xlog_cil_insert_format_items(
 	struct xlog		*log,
 	struct xfs_trans	*tp,
-	int			*diff_len,
-	int			*diff_iovecs)
+	int			*diff_len)
 {
 	struct xfs_log_item	*lip;
 
-
 	/* Bail out if we didn't find a log item. */
 	if (list_empty(&tp->t_items)) {
 		ASSERT(0);
@@ -397,7 +481,6 @@ xlog_cil_insert_format_items(
 		if (lip->li_lv && shadow->lv_size <= lip->li_lv->lv_size) {
 			/* same or smaller, optimise common overwrite case */
 			lv = lip->li_lv;
-			lv->lv_next = NULL;
 
 			if (ordered)
 				goto insert;
@@ -406,7 +489,6 @@ xlog_cil_insert_format_items(
 			 * set the item up as though it is a new insertion so
 			 * that the space reservation accounting is correct.
 			 */
-			*diff_iovecs -= lv->lv_niovecs;
 			*diff_len -= lv->lv_bytes;
 
 			/* Ensure the lv is set up according to ->iop_size */
@@ -431,11 +513,28 @@ xlog_cil_insert_format_items(
 		ASSERT(IS_ALIGNED((unsigned long)lv->lv_buf, sizeof(uint64_t)));
 		lip->li_ops->iop_format(lip, lv);
 insert:
-		xfs_cil_prepare_item(log, lv, old_lv, diff_len, diff_iovecs);
+		xfs_cil_prepare_item(log, lv, old_lv, diff_len);
 	}
 }
 
 /*
+ * The use of lockless waitqueue_active() requires that the caller has
+ * serialised itself against the wakeup call in xlog_cil_push_work(). That
+ * can be done by either holding the push lock or the context lock.
+ */
+static inline bool
+xlog_cil_over_hard_limit(
+	struct xlog	*log,
+	int32_t		space_used)
+{
+	if (waitqueue_active(&log->l_cilp->xc_push_wait))
+		return true;
+	if (space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log))
+		return true;
+	return false;
+}
+
+/*
  * Insert the log items into the CIL and calculate the difference in space
  * consumed by the item. Add the space to the checkpoint ticket and calculate
  * if the change requires additional log metadata. If it does, take that space
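The new sizing rule above is worth making concrete. Assuming a 12-byte xlog_op_header and 8-byte alignment, an item with two iovecs totalling 137 bytes now reserves 137 + 2 * (8 + 12) = 177 bytes, rounded up to 184. A sketch of the arithmetic (the constants are assumptions, not taken from the headers):

    #include <stddef.h>
    #include <stdint.h>

    #define OPHDR_SIZE 12   /* assumed sizeof(struct xlog_op_header) */

    /* Models the nbytes estimate in xlog_cil_alloc_shadow_bufs(): alignment
     * slack plus one opheader per iovec, then round the total to 64 bits. */
    static size_t shadow_bytes(size_t nbytes, int niovecs)
    {
        nbytes += niovecs * (sizeof(uint64_t) + OPHDR_SIZE);
        return (nbytes + sizeof(uint64_t) - 1) & ~(sizeof(uint64_t) - 1);
    }
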
@@ -445,15 +544,17 @@ insert:
 static void
 xlog_cil_insert_items(
 	struct xlog		*log,
-	struct xfs_trans	*tp)
+	struct xfs_trans	*tp,
+	uint32_t		released_space)
 {
 	struct xfs_cil		*cil = log->l_cilp;
 	struct xfs_cil_ctx	*ctx = cil->xc_ctx;
 	struct xfs_log_item	*lip;
 	int			len = 0;
-	int			diff_iovecs = 0;
-	int			iclog_space;
 	int			iovhdr_res = 0, split_res = 0, ctx_res = 0;
+	int			space_used;
+	int			order;
+	struct xlog_cil_pcp	*cilpcp;
 
 	ASSERT(tp);
 
@@ -461,51 +562,114 @@ xlog_cil_insert_items(
 	 * We can do this safely because the context can't checkpoint until we
 	 * are done so it doesn't matter exactly how we update the CIL.
 	 */
-	xlog_cil_insert_format_items(log, tp, &len, &diff_iovecs);
+	xlog_cil_insert_format_items(log, tp, &len);
+
+	/*
+	 * Subtract the space released by intent cancelation from the space we
+	 * consumed so that we remove it from the CIL space and add it back to
+	 * the current transaction reservation context.
+	 */
+	len -= released_space;
+
+	/*
+	 * Grab the per-cpu pointer for the CIL before we start any accounting.
+	 * That ensures that we are running with pre-emption disabled and so we
+	 * can't be scheduled away between split sample/update operations that
+	 * are done without outside locking to serialise them.
+	 */
+	cilpcp = get_cpu_ptr(cil->xc_pcp);
+
+	/*
+	 * We need to take the CIL checkpoint unit reservation on the first
+	 * commit into the CIL. Test the XLOG_CIL_EMPTY bit first so we don't
+	 * unnecessarily do an atomic op in the fast path here. We can clear the
+	 * XLOG_CIL_EMPTY bit as we are under the xc_ctx_lock here and that
+	 * needs to be held exclusively to reset the XLOG_CIL_EMPTY bit.
+	 */
+	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) &&
+	    test_and_clear_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
+		ctx_res = ctx->ticket->t_unit_res;
 
-	spin_lock(&cil->xc_cil_lock);
+	/*
+	 * Check if we need to steal iclog headers. atomic_read() is not a
+	 * locked atomic operation, so we can check the value before we do any
+	 * real atomic ops in the fast path. If we've already taken the CIL unit
+	 * reservation from this commit, we've already got one iclog header
+	 * space reserved so we have to account for that otherwise we risk
+	 * overrunning the reservation on this ticket.
+	 *
+	 * If the CIL is already at the hard limit, we might need more header
+	 * space than originally reserved. So steal more header space from every
+	 * commit that occurs once we are over the hard limit to ensure the CIL
+	 * push won't run out of reservation space.
+	 *
+	 * This can steal more than we need, but that's OK.
+	 *
+	 * The cil->xc_ctx_lock provides the serialisation necessary for safely
+	 * calling xlog_cil_over_hard_limit() in this context.
+	 */
+	space_used = atomic_read(&ctx->space_used) + cilpcp->space_used + len;
+	if (atomic_read(&cil->xc_iclog_hdrs) > 0 ||
+	    xlog_cil_over_hard_limit(log, space_used)) {
+		split_res = log->l_iclog_hsize +
+					sizeof(struct xlog_op_header);
+		if (ctx_res)
+			ctx_res += split_res * (tp->t_ticket->t_iclog_hdrs - 1);
+		else
+			ctx_res = split_res * tp->t_ticket->t_iclog_hdrs;
+		atomic_sub(tp->t_ticket->t_iclog_hdrs, &cil->xc_iclog_hdrs);
+	}
+	cilpcp->space_reserved += ctx_res;
 
-	/* account for space used by new iovec headers */
-	iovhdr_res = diff_iovecs * sizeof(xlog_op_header_t);
-	len += iovhdr_res;
-	ctx->nvecs += diff_iovecs;
+	/*
+	 * Accurately account when over the soft limit, otherwise fold the
+	 * percpu count into the global count if over the per-cpu threshold.
+	 */
+	if (!test_bit(XLOG_CIL_PCP_SPACE, &cil->xc_flags)) {
+		atomic_add(len, &ctx->space_used);
+	} else if (cilpcp->space_used + len >
+			(XLOG_CIL_SPACE_LIMIT(log) / num_online_cpus())) {
+		space_used = atomic_add_return(cilpcp->space_used + len,
+						&ctx->space_used);
+		cilpcp->space_used = 0;
+
+		/*
+		 * If we just transitioned over the soft limit, we need to
+		 * transition to the global atomic counter.
+		 */
+		if (space_used >= XLOG_CIL_SPACE_LIMIT(log))
+			xlog_cil_insert_pcp_aggregate(cil, ctx);
+	} else {
+		cilpcp->space_used += len;
+	}
 
 	/* attach the transaction to the CIL if it has any busy extents */
 	if (!list_empty(&tp->t_busy))
-		list_splice_init(&tp->t_busy, &ctx->busy_extents);
+		list_splice_init(&tp->t_busy, &cilpcp->busy_extents);
 
 	/*
-	 * Now transfer enough transaction reservation to the context ticket
-	 * for the checkpoint. The context ticket is special - the unit
-	 * reservation has to grow as well as the current reservation as we
-	 * steal from tickets so we can correctly determine the space used
-	 * during the transaction commit.
+	 * Now update the order of everything modified in the transaction
+	 * and insert items into the CIL if they aren't already there.
+	 * We do this here so we only need to take the CIL lock once during
+	 * the transaction commit.
 	 */
-	if (ctx->ticket->t_curr_res == 0) {
-		ctx_res = ctx->ticket->t_unit_res;
-		ctx->ticket->t_curr_res = ctx_res;
-		tp->t_ticket->t_curr_res -= ctx_res;
-	}
+	order = atomic_inc_return(&ctx->order_id);
+	list_for_each_entry(lip, &tp->t_items, li_trans) {
+		/* Skip items which aren't dirty in this transaction. */
+		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
+			continue;
 
-	/* do we need space for more log record headers? */
-	iclog_space = log->l_iclog_size - log->l_iclog_hsize;
-	if (len > 0 && (ctx->space_used / iclog_space !=
-				(ctx->space_used + len) / iclog_space)) {
-		split_res = (len + iclog_space - 1) / iclog_space;
-		/* need to take into account split region headers, too */
-		split_res *= log->l_iclog_hsize + sizeof(struct xlog_op_header);
-		ctx->ticket->t_unit_res += split_res;
-		ctx->ticket->t_curr_res += split_res;
-		tp->t_ticket->t_curr_res -= split_res;
-		ASSERT(tp->t_ticket->t_curr_res >= len);
+		lip->li_order_id = order;
+		if (!list_empty(&lip->li_cil))
+			continue;
+		list_add_tail(&lip->li_cil, &cilpcp->log_items);
 	}
-	tp->t_ticket->t_curr_res -= len;
-	ctx->space_used += len;
+	put_cpu_ptr(cilpcp);
 
 	/*
 	 * If we've overrun the reservation, dump the tx details before we move
 	 * the log items. Shutdown is imminent...
 	 */
+	tp->t_ticket->t_curr_res -= ctx_res + len;
 	if (WARN_ON(tp->t_ticket->t_curr_res < 0)) {
 		xfs_warn(log->l_mp, "Transaction log reservation overrun:");
 		xfs_warn(log->l_mp,
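The accounting this hunk sets up has three modes, chosen per commit: plain per-cpu adds in the common case, a fold into the global atomic when this CPU's share crosses its slice of the soft limit, and fully atomic accounting once the context as a whole is past the soft limit. Reduced to a standalone sketch (threshold value and mode flag are stand-ins):

    #include <stdatomic.h>
    #include <stdbool.h>

    #define SOFT_LIMIT (4 << 20)   /* stand-in for XLOG_CIL_SPACE_LIMIT() */

    struct space_acct {
        atomic_int global;         /* models ctx->space_used */
        atomic_bool pcp_mode;      /* models the XLOG_CIL_PCP_SPACE flag */
    };

    static void account(struct space_acct *s, int *pcp_used, int len, int ncpus)
    {
        if (!atomic_load(&s->pcp_mode)) {
            atomic_fetch_add(&s->global, len);     /* accurate, over soft limit */
        } else if (*pcp_used + len > SOFT_LIMIT / ncpus) {
            int total = atomic_fetch_add(&s->global, *pcp_used + len)
                    + *pcp_used + len;
            *pcp_used = 0;
            if (total >= SOFT_LIMIT)               /* crossed the soft limit: */
                atomic_store(&s->pcp_mode, false); /* switch to atomic mode */
        } else {
            *pcp_used += len;                      /* fast path, no sharing */
        }
    }

In the kernel, the mode switch also triggers xlog_cil_insert_pcp_aggregate() to sweep the remaining per-cpu counts; the sketch omits that step.
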
@@ -515,44 +679,20 @@ xlog_cil_insert_items(
 			 split_res);
 		xfs_warn(log->l_mp, "  ctx ticket: %d bytes", ctx_res);
 		xlog_print_trans(tp);
+		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
 	}
-
-	/*
-	 * Now (re-)position everything modified at the tail of the CIL.
-	 * We do this here so we only need to take the CIL lock once during
-	 * the transaction commit.
-	 */
-	list_for_each_entry(lip, &tp->t_items, li_trans) {
-
-		/* Skip items which aren't dirty in this transaction. */
-		if (!test_bit(XFS_LI_DIRTY, &lip->li_flags))
-			continue;
-
-		/*
-		 * Only move the item if it isn't already at the tail. This is
-		 * to prevent a transient list_empty() state when reinserting
-		 * an item that is already the only item in the CIL.
-		 */
-		if (!list_is_last(&lip->li_cil, &cil->xc_cil))
-			list_move_tail(&lip->li_cil, &cil->xc_cil);
-	}
-
-	spin_unlock(&cil->xc_cil_lock);
-
-	if (tp->t_ticket->t_curr_res < 0)
-		xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
 }
 
 static void
 xlog_cil_free_logvec(
-	struct xfs_log_vec	*log_vector)
+	struct list_head	*lv_chain)
 {
 	struct xfs_log_vec	*lv;
 
-	for (lv = log_vector; lv; ) {
-		struct xfs_log_vec *next = lv->lv_next;
+	while (!list_empty(lv_chain)) {
+		lv = list_first_entry(lv_chain, struct xfs_log_vec, lv_list);
+		list_del_init(&lv->lv_list);
 		kmem_free(lv);
-		lv = next;
 	}
 }
@@ -605,7 +745,7 @@ xlog_discard_busy_extents(
 		error = __blkdev_issue_discard(mp->m_ddev_targp->bt_bdev,
 				XFS_AGB_TO_DADDR(mp, busyp->agno, busyp->bno),
 				XFS_FSB_TO_BB(mp, busyp->length),
-				GFP_NOFS, 0, &bio);
+				GFP_NOFS, &bio);
 		if (error && error != -EOPNOTSUPP) {
 			xfs_info(mp,
 	 "discard failed for extent [0x%llx,%u], error %d",
@@ -652,7 +792,7 @@ xlog_cil_committed(
 		spin_unlock(&ctx->cil->xc_push_lock);
 	}
 
-	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, ctx->lv_chain,
+	xfs_trans_committed_bulk(ctx->cil->xc_log->l_ailp, &ctx->lv_chain,
 					ctx->start_lsn, abort);
 
 	xfs_extent_busy_sort(&ctx->busy_extents);
@@ -663,7 +803,7 @@ xlog_cil_committed(
 	list_del(&ctx->committing);
 	spin_unlock(&ctx->cil->xc_push_lock);
 
-	xlog_cil_free_logvec(ctx->lv_chain);
+	xlog_cil_free_logvec(&ctx->lv_chain);
 
 	if (!list_empty(&ctx->busy_extents))
 		xlog_discard_busy_extents(mp, ctx);
@@ -705,11 +845,21 @@ xlog_cil_set_ctx_write_state(
 		 * The LSN we need to pass to the log items on transaction
 		 * commit is the LSN reported by the first log vector write, not
 		 * the commit lsn. If we use the commit record lsn then we can
-		 * move the tail beyond the grant write head.
+		 * move the grant write head beyond the tail LSN and overwrite
+		 * it.
 		 */
 		ctx->start_lsn = lsn;
 		wake_up_all(&cil->xc_start_wait);
 		spin_unlock(&cil->xc_push_lock);
+
+		/*
+		 * Make sure the metadata we are about to overwrite in the log
+		 * has been flushed to stable storage before this iclog is
+		 * issued.
+		 */
+		spin_lock(&cil->xc_log->l_icloglock);
+		iclog->ic_flags |= XLOG_ICL_NEED_FLUSH;
+		spin_unlock(&cil->xc_log->l_icloglock);
 		return;
 	}
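A recurring shape in this patch is the conversion from the hand-rolled lv_next singly linked chain to a standard list_head, consumed with pop-from-head loops like xlog_cil_free_logvec() above. The drain idiom, modelled with a minimal doubly linked list in plain C (toy types, not the kernel's list.h):

    #include <stdlib.h>

    struct node {
        struct node *prev, *next;   /* minimal stand-in for list_head */
    };

    static void node_del(struct node *n)
    {
        n->prev->next = n->next;
        n->next->prev = n->prev;
        n->next = n->prev = n;
    }

    /* Pop the first entry until the head points back at itself, freeing as
     * we go - the same shape as the new xlog_cil_free_logvec(). */
    static void drain(struct node *head)
    {
        while (head->next != head) {
            struct node *n = head->next;

            node_del(n);
            free(n);
        }
    }
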
@@ -812,7 +962,7 @@ restart:
 static int
 xlog_cil_write_chain(
 	struct xfs_cil_ctx	*ctx,
-	struct xfs_log_vec	*chain)
+	uint32_t		chain_len)
 {
 	struct xlog	*log = ctx->cil->xc_log;
 	int		error;
@@ -820,7 +970,7 @@ xlog_cil_write_chain(
 	error = xlog_cil_order_write(ctx->cil, ctx->sequence, _START_RECORD);
 	if (error)
 		return error;
-	return xlog_write(log, ctx, chain, ctx->ticket, XLOG_START_TRANS);
+	return xlog_write(log, ctx, &ctx->lv_chain, ctx->ticket, chain_len);
 }
 
 /*
@@ -834,9 +984,14 @@ xlog_cil_write_commit_record(
 	struct xfs_cil_ctx	*ctx)
 {
 	struct xlog		*log = ctx->cil->xc_log;
+	struct xlog_op_header	ophdr = {
+		.oh_clientid	= XFS_TRANSACTION,
+		.oh_tid		= cpu_to_be32(ctx->ticket->t_tid),
+		.oh_flags	= XLOG_COMMIT_TRANS,
+	};
 	struct xfs_log_iovec	reg = {
-		.i_addr = NULL,
-		.i_len = 0,
+		.i_addr = &ophdr,
+		.i_len = sizeof(struct xlog_op_header),
 		.i_type = XLOG_REG_TYPE_COMMIT,
 	};
 	struct xfs_log_vec	vec = {
@@ -844,6 +999,8 @@ xlog_cil_write_commit_record(
 		.lv_iovecp = &reg,
 	};
 	int			error;
+	LIST_HEAD(lv_chain);
+	list_add(&vec.lv_list, &lv_chain);
 
 	if (xlog_is_shutdown(log))
 		return -EIO;
@@ -852,12 +1009,155 @@ xlog_cil_write_commit_record(
 	if (error)
 		return error;
 
-	error = xlog_write(log, ctx, &vec, ctx->ticket, XLOG_COMMIT_TRANS);
+	/* account for space used by record data */
+	ctx->ticket->t_curr_res -= reg.i_len;
+	error = xlog_write(log, ctx, &lv_chain, ctx->ticket, reg.i_len);
 	if (error)
-		xfs_force_shutdown(log->l_mp, SHUTDOWN_LOG_IO_ERROR);
+		xlog_force_shutdown(log, SHUTDOWN_LOG_IO_ERROR);
 	return error;
 }
 
+struct xlog_cil_trans_hdr {
+	struct xlog_op_header	oph[2];
+	struct xfs_trans_header	thdr;
+	struct xfs_log_iovec	lhdr[2];
+};
+
+/*
+ * Build a checkpoint transaction header to begin the journal transaction. We
+ * need to account for the space used by the transaction header here as it is
+ * not accounted for in xlog_write().
+ *
+ * This is the only place we write a transaction header, so we also build the
+ * log opheaders that indicate the start of a log transaction and wrap the
+ * transaction header. We keep the start record in its own log vector rather
+ * than compacting them into a single region as this ends up making the logic
+ * in xlog_write() for handling empty opheaders for start, commit and unmount
+ * records much simpler.
+ */
+static void
+xlog_cil_build_trans_hdr(
+	struct xfs_cil_ctx	*ctx,
+	struct xlog_cil_trans_hdr *hdr,
+	struct xfs_log_vec	*lvhdr,
+	int			num_iovecs)
+{
+	struct xlog_ticket	*tic = ctx->ticket;
+	__be32			tid = cpu_to_be32(tic->t_tid);
+
+	memset(hdr, 0, sizeof(*hdr));
+
+	/* Log start record */
+	hdr->oph[0].oh_tid = tid;
+	hdr->oph[0].oh_clientid = XFS_TRANSACTION;
+	hdr->oph[0].oh_flags = XLOG_START_TRANS;
+
+	/* log iovec region pointer */
+	hdr->lhdr[0].i_addr = &hdr->oph[0];
+	hdr->lhdr[0].i_len = sizeof(struct xlog_op_header);
+	hdr->lhdr[0].i_type = XLOG_REG_TYPE_LRHEADER;
+
+	/* log opheader */
+	hdr->oph[1].oh_tid = tid;
+	hdr->oph[1].oh_clientid = XFS_TRANSACTION;
+	hdr->oph[1].oh_len = cpu_to_be32(sizeof(struct xfs_trans_header));
+
+	/* transaction header in host byte order format */
+	hdr->thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
+	hdr->thdr.th_type = XFS_TRANS_CHECKPOINT;
+	hdr->thdr.th_tid = tic->t_tid;
+	hdr->thdr.th_num_items = num_iovecs;
+
+	/* log iovec region pointer */
+	hdr->lhdr[1].i_addr = &hdr->oph[1];
+	hdr->lhdr[1].i_len = sizeof(struct xlog_op_header) +
+			sizeof(struct xfs_trans_header);
+	hdr->lhdr[1].i_type = XLOG_REG_TYPE_TRANSHDR;
+
+	lvhdr->lv_niovecs = 2;
+	lvhdr->lv_iovecp = &hdr->lhdr[0];
+	lvhdr->lv_bytes = hdr->lhdr[0].i_len + hdr->lhdr[1].i_len;
+
+	tic->t_curr_res -= lvhdr->lv_bytes;
+}
+
+/*
+ * CIL item reordering compare function. We want to order in ascending ID order,
+ * but we want to leave items with the same ID in the order they were added to
+ * the list. This is important for operations like reflink where we log 4 order
+ * dependent intents in a single transaction when we overwrite an existing
+ * shared extent with a new shared extent. i.e. BUI(unmap), CUI(drop),
+ * CUI (inc), BUI(remap)...
+ */
+static int
+xlog_cil_order_cmp(
+	void			*priv,
+	const struct list_head	*a,
+	const struct list_head	*b)
+{
+	struct xfs_log_vec	*l1 = container_of(a, struct xfs_log_vec, lv_list);
+	struct xfs_log_vec	*l2 = container_of(b, struct xfs_log_vec, lv_list);
+
+	return l1->lv_order_id > l2->lv_order_id;
+}
+
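The comparison above only ever reports "greater", so correctness hinges on list_sort() being a stable merge sort: log vectors that share an order_id (all the items logged by one transaction) keep their insertion order, which is what preserves the intent ordering the comment describes. The property is easy to see with any stable sort; insertion sort, for instance:

    /* Stable sort over order IDs: the strict '>' never swaps equal keys, so
     * items from the same transaction keep their relative order. */
    static void stable_sort(int *order_id, int n)
    {
        for (int i = 1; i < n; i++) {
            int key = order_id[i];
            int j = i - 1;

            while (j >= 0 && order_id[j] > key) {
                order_id[j + 1] = order_id[j];
                j--;
            }
            order_id[j + 1] = key;
        }
    }
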
+/*
+ * Pull all the log vectors off the items in the CIL, and remove the items from
+ * the CIL. We don't need the CIL lock here because it's only needed on the
+ * transaction commit side which is currently locked out by the flush lock.
+ *
+ * If a log item is marked with a whiteout, we do not need to write it to the
+ * journal and so we just move it to the whiteout list for the caller to
+ * dispose of appropriately.
+ */
+static void
+xlog_cil_build_lv_chain(
+	struct xfs_cil_ctx	*ctx,
+	struct list_head	*whiteouts,
+	uint32_t		*num_iovecs,
+	uint32_t		*num_bytes)
+{
+	while (!list_empty(&ctx->log_items)) {
+		struct xfs_log_item	*item;
+		struct xfs_log_vec	*lv;
+
+		item = list_first_entry(&ctx->log_items,
+					struct xfs_log_item, li_cil);
+
+		if (test_bit(XFS_LI_WHITEOUT, &item->li_flags)) {
+			list_move(&item->li_cil, whiteouts);
+			trace_xfs_cil_whiteout_skip(item);
+			continue;
+		}
+
+		lv = item->li_lv;
+		lv->lv_order_id = item->li_order_id;
+
+		/* we don't write ordered log vectors */
+		if (lv->lv_buf_len != XFS_LOG_VEC_ORDERED)
+			*num_bytes += lv->lv_bytes;
+		*num_iovecs += lv->lv_niovecs;
+		list_add_tail(&lv->lv_list, &ctx->lv_chain);
+
+		list_del_init(&item->li_cil);
+		item->li_order_id = 0;
+		item->li_lv = NULL;
+	}
+}
+
+static void
+xlog_cil_cleanup_whiteouts(
+	struct list_head	*whiteouts)
+{
+	while (!list_empty(whiteouts)) {
+		struct xfs_log_item *item = list_first_entry(whiteouts,
+						struct xfs_log_item, li_cil);
+		list_del_init(&item->li_cil);
+		trace_xfs_cil_whiteout_unpin(item);
+		item->li_ops->iop_unpin(item, 1);
+	}
+}
+
 /*
  * Push the Committed Item List to the log.
  *
@@ -880,19 +1180,16 @@ xlog_cil_push_work(
 		container_of(work, struct xfs_cil_ctx, push_work);
 	struct xfs_cil		*cil = ctx->cil;
 	struct xlog		*log = cil->xc_log;
-	struct xfs_log_vec	*lv;
 	struct xfs_cil_ctx	*new_ctx;
-	struct xlog_ticket	*tic;
-	int			num_iovecs;
+	int			num_iovecs = 0;
+	int			num_bytes = 0;
 	int			error = 0;
-	struct xfs_trans_header thdr;
-	struct xfs_log_iovec	lhdr;
-	struct xfs_log_vec	lvhdr = { NULL };
-	xfs_lsn_t		preflush_tail_lsn;
+	struct xlog_cil_trans_hdr thdr;
+	struct xfs_log_vec	lvhdr = {};
 	xfs_csn_t		push_seq;
-	struct bio		bio;
-	DECLARE_COMPLETION_ONSTACK(bdev_flush);
 	bool			push_commit_stable;
+	LIST_HEAD		(whiteouts);
+	struct xlog_ticket	*ticket;
 
 	new_ctx = xlog_cil_ctx_alloc();
 	new_ctx->ticket = xlog_cil_ticket_alloc(log);
@@ -916,12 +1213,14 @@ xlog_cil_push_work(
 	if (waitqueue_active(&cil->xc_push_wait))
 		wake_up_all(&cil->xc_push_wait);
 
+	xlog_cil_push_pcp_aggregate(cil, ctx);
+
 	/*
 	 * Check if we've anything to push. If there is nothing, then we don't
 	 * move on to a new sequence number and so we have to be able to push
 	 * this sequence again later.
 	 */
-	if (list_empty(&cil->xc_cil)) {
+	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
 		cil->xc_push_seq = 0;
 		spin_unlock(&cil->xc_push_lock);
 		goto out_skip;
@@ -961,45 +1260,7 @@ xlog_cil_push_work(
 	list_add(&ctx->committing, &cil->xc_committing);
 	spin_unlock(&cil->xc_push_lock);
 
-	/*
-	 * The CIL is stable at this point - nothing new will be added to it
-	 * because we hold the flush lock exclusively. Hence we can now issue
-	 * a cache flush to ensure all the completed metadata in the journal we
-	 * are about to overwrite is on stable storage.
-	 *
-	 * Because we are issuing this cache flush before we've written the
-	 * tail lsn to the iclog, we can have metadata IO completions move the
-	 * tail forwards between the completion of this flush and the iclog
-	 * being written. In this case, we need to re-issue the cache flush
-	 * before the iclog write. To detect whether the log tail moves, sample
-	 * the tail LSN *before* we issue the flush.
-	 */
-	preflush_tail_lsn = atomic64_read(&log->l_tail_lsn);
-	xfs_flush_bdev_async(&bio, log->l_mp->m_ddev_targp->bt_bdev,
-				&bdev_flush);
-
-	/*
-	 * Pull all the log vectors off the items in the CIL, and remove the
-	 * items from the CIL. We don't need the CIL lock here because it's only
-	 * needed on the transaction commit side which is currently locked out
-	 * by the flush lock.
-	 */
-	lv = NULL;
-	num_iovecs = 0;
-	while (!list_empty(&cil->xc_cil)) {
-		struct xfs_log_item	*item;
-
-		item = list_first_entry(&cil->xc_cil,
-					struct xfs_log_item, li_cil);
-		list_del_init(&item->li_cil);
-		if (!ctx->lv_chain)
-			ctx->lv_chain = item->li_lv;
-		else
-			lv->lv_next = item->li_lv;
-		lv = item->li_lv;
-		item->li_lv = NULL;
-		num_iovecs += lv->lv_niovecs;
-	}
+	xlog_cil_build_lv_chain(ctx, &whiteouts, &num_iovecs, &num_bytes);
 
 	/*
 	 * Switch the contexts so we can drop the context lock and move out
@@ -1032,35 +1293,30 @@ xlog_cil_push_work(
 	up_write(&cil->xc_ctx_lock);
 
 	/*
+	 * Sort the log vector chain before we add the transaction headers.
+	 * This ensures we always have the transaction headers at the start
+	 * of the chain.
+	 */
+	list_sort(NULL, &ctx->lv_chain, xlog_cil_order_cmp);
+
+	/*
 	 * Build a checkpoint transaction header and write it to the log to
 	 * begin the transaction. We need to account for the space used by the
 	 * transaction header here as it is not accounted for in xlog_write().
-	 *
-	 * The LSN we need to pass to the log items on transaction commit is
-	 * the LSN reported by the first log vector write. If we use the commit
-	 * record lsn then we can move the tail beyond the grant write head.
+	 * Add the lvhdr to the head of the lv chain we pass to xlog_write() so
+	 * it gets written into the iclog first.
 	 */
-	tic = ctx->ticket;
-	thdr.th_magic = XFS_TRANS_HEADER_MAGIC;
-	thdr.th_type = XFS_TRANS_CHECKPOINT;
-	thdr.th_tid = tic->t_tid;
-	thdr.th_num_items = num_iovecs;
-	lhdr.i_addr = &thdr;
-	lhdr.i_len = sizeof(xfs_trans_header_t);
-	lhdr.i_type = XLOG_REG_TYPE_TRANSHDR;
-	tic->t_curr_res -= lhdr.i_len + sizeof(xlog_op_header_t);
-
-	lvhdr.lv_niovecs = 1;
-	lvhdr.lv_iovecp = &lhdr;
-	lvhdr.lv_next = ctx->lv_chain;
+	xlog_cil_build_trans_hdr(ctx, &thdr, &lvhdr, num_iovecs);
+	num_bytes += lvhdr.lv_bytes;
+	list_add(&lvhdr.lv_list, &ctx->lv_chain);
 
 	/*
-	 * Before we format and submit the first iclog, we have to ensure that
-	 * the metadata writeback ordering cache flush is complete.
+	 * Take the lvhdr back off the lv_chain immediately after calling
+	 * xlog_cil_write_chain() as it should not be passed to log IO
+	 * completion.
 	 */
-	wait_for_completion(&bdev_flush);
-
-	error = xlog_cil_write_chain(ctx, &lvhdr);
+	error = xlog_cil_write_chain(ctx, num_bytes);
+	list_del(&lvhdr.lv_list);
 	if (error)
 		goto out_abort_free_ticket;
 
@@ -1068,7 +1324,14 @@ xlog_cil_push_work(
 	if (error)
 		goto out_abort_free_ticket;
 
-	xfs_log_ticket_ungrant(log, tic);
+	/*
+	 * Grab the ticket from the ctx so we can ungrant it after releasing the
+	 * commit_iclog. The ctx may be freed by the time we return from
+	 * releasing the commit_iclog (i.e. checkpoint has been completed and
+	 * callback run) so we can't reference the ctx after the call to
+	 * xlog_state_release_iclog().
+	 */
+	ticket = ctx->ticket;
 
 	/*
 	 * If the checkpoint spans multiple iclogs, wait for all previous iclogs
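The ticket shuffling above is all about object lifetime: releasing the commit iclog can run the checkpoint completion and free the context, so the ticket pointer has to be copied out while the context is still known to be alive. The hazard in miniature (hypothetical types; only the ordering matters):

    struct ticket;
    struct context { struct ticket *ticket; };

    void ungrant(struct ticket *t);
    void release_iclog(struct context *ctx);  /* completion may free ctx */

    static void push_finish(struct context *ctx)
    {
        /* copy out everything needed later while ctx is valid... */
        struct ticket *ticket = ctx->ticket;

        release_iclog(ctx);
        /* ...ctx may be gone now; only the local copy is safe */
        ungrant(ticket);
    }
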
@@ -1118,11 +1381,14 @@ xlog_cil_push_work(
 	if (push_commit_stable &&
 	    ctx->commit_iclog->ic_state == XLOG_STATE_ACTIVE)
 		xlog_state_switch_iclogs(log, ctx->commit_iclog, 0);
-	xlog_state_release_iclog(log, ctx->commit_iclog, preflush_tail_lsn);
+	ticket = ctx->ticket;
+	xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
 
 	/* Not safe to reference ctx now! */
 
 	spin_unlock(&log->l_icloglock);
+	xlog_cil_cleanup_whiteouts(&whiteouts);
+	xfs_log_ticket_ungrant(log, ticket);
 	return;
 
 out_skip:
@@ -1132,16 +1398,19 @@ out_skip:
 	return;
 
 out_abort_free_ticket:
-	xfs_log_ticket_ungrant(log, tic);
 	ASSERT(xlog_is_shutdown(log));
+	xlog_cil_cleanup_whiteouts(&whiteouts);
 	if (!ctx->commit_iclog) {
+		xfs_log_ticket_ungrant(log, ctx->ticket);
 		xlog_cil_committed(ctx);
 		return;
 	}
 	spin_lock(&log->l_icloglock);
-	xlog_state_release_iclog(log, ctx->commit_iclog, 0);
+	ticket = ctx->ticket;
+	xlog_state_release_iclog(log, ctx->commit_iclog, ticket);
 	/* Not safe to reference ctx now! */
 	spin_unlock(&log->l_icloglock);
+	xfs_log_ticket_ungrant(log, ticket);
 }
 
 /*
@@ -1156,18 +1425,27 @@ xlog_cil_push_background(
 	struct xlog	*log) __releases(cil->xc_ctx_lock)
 {
 	struct xfs_cil	*cil = log->l_cilp;
+	int		space_used = atomic_read(&cil->xc_ctx->space_used);
 
 	/*
 	 * The cil won't be empty because we are called while holding the
-	 * context lock so whatever we added to the CIL will still be there
+	 * context lock so whatever we added to the CIL will still be there.
 	 */
-	ASSERT(!list_empty(&cil->xc_cil));
+	ASSERT(!test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
 
 	/*
-	 * Don't do a background push if we haven't used up all the
-	 * space available yet.
+	 * We are done if:
+	 * - we haven't used up all the space available yet; or
+	 * - we've already queued up a push; and
+	 * - we're not over the hard limit; and
+	 * - nothing has been over the hard limit.
+	 *
+	 * If so, we don't need to take the push lock as there's nothing to do.
 	 */
-	if (cil->xc_ctx->space_used < XLOG_CIL_SPACE_LIMIT(log)) {
+	if (space_used < XLOG_CIL_SPACE_LIMIT(log) ||
+	    (cil->xc_push_seq == cil->xc_current_sequence &&
+	     space_used < XLOG_CIL_BLOCKING_SPACE_LIMIT(log) &&
+	     !waitqueue_active(&cil->xc_push_wait))) {
 		up_read(&cil->xc_ctx_lock);
 		return;
 	}
@@ -1194,12 +1472,11 @@ xlog_cil_push_background(
 	 * dipping back down under the hard limit.
 	 *
 	 * The ctx->xc_push_lock provides the serialisation necessary for safely
-	 * using the lockless waitqueue_active() check in this context.
+	 * calling xlog_cil_over_hard_limit() in this context.
	 */
-	if (cil->xc_ctx->space_used >= XLOG_CIL_BLOCKING_SPACE_LIMIT(log) ||
-	    waitqueue_active(&cil->xc_push_wait)) {
+	if (xlog_cil_over_hard_limit(log, space_used)) {
 		trace_xfs_log_cil_wait(log, cil->xc_ctx->ticket);
-		ASSERT(cil->xc_ctx->space_used < log->l_logsize);
+		ASSERT(space_used < log->l_logsize);
 		xlog_wait(&cil->xc_push_wait, &cil->xc_push_lock);
 		return;
 	}
@@ -1243,18 +1520,28 @@ xlog_cil_push_now(
 	if (!async)
 		flush_workqueue(cil->xc_push_wq);
 
+	spin_lock(&cil->xc_push_lock);
+
+	/*
+	 * If this is an async flush request, we always need to set the
+	 * xc_push_commit_stable flag even if something else has already queued
+	 * a push. The flush caller is asking for the CIL to be on stable
+	 * storage when the next push completes, so regardless of who has queued
+	 * the push, the flush requires stable semantics from it.
+	 */
+	cil->xc_push_commit_stable = async;
+
 	/*
 	 * If the CIL is empty or we've already pushed the sequence then
-	 * there's no work we need to do.
+	 * there's no more work that we need to do.
 	 */
-	spin_lock(&cil->xc_push_lock);
-	if (list_empty(&cil->xc_cil) || push_seq <= cil->xc_push_seq) {
+	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags) ||
+	    push_seq <= cil->xc_push_seq) {
 		spin_unlock(&cil->xc_push_lock);
 		return;
 	}
 
 	cil->xc_push_seq = push_seq;
-	cil->xc_push_commit_stable = async;
 	queue_work(cil->xc_push_wq, &cil->xc_ctx->push_work);
 	spin_unlock(&cil->xc_push_lock);
 }
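The rewritten xlog_cil_push_background() gate reads as a single predicate: nothing to do while under the soft limit, or while a push is already queued, the hard limit has not been hit and nobody is throttled waiting on it. As a standalone function (the limit values are illustrative stand-ins):

    #include <stdbool.h>

    #define SOFT_LIMIT (4u << 20)    /* stand-in thresholds only */
    #define HARD_LIMIT (16u << 20)

    static bool background_push_done(unsigned int space_used,
                     bool push_queued, bool waiters_throttled)
    {
        if (space_used < SOFT_LIMIT)
            return true;             /* not worth pushing yet */
        return push_queued &&
               space_used < HARD_LIMIT &&
               !waiters_throttled;   /* push queued, nobody blocked */
    }
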
@@ -1267,13 +1554,50 @@ xlog_cil_empty(
 	bool		empty = false;
 
 	spin_lock(&cil->xc_push_lock);
-	if (list_empty(&cil->xc_cil))
+	if (test_bit(XLOG_CIL_EMPTY, &cil->xc_flags))
 		empty = true;
 	spin_unlock(&cil->xc_push_lock);
 	return empty;
 }
 
 /*
+ * If there are intent done items in this transaction and the related intent was
+ * committed in the current (same) CIL checkpoint, we don't need to write either
+ * the intent or intent done item to the journal as the change will be
+ * journalled atomically within this checkpoint. As we cannot remove items from
+ * the CIL here, mark the related intent with a whiteout so that the CIL push
+ * can remove it rather than writing it to the journal. Then remove the intent
+ * done item from the current transaction and release it so it doesn't get put
+ * into the CIL at all.
+ */
+static uint32_t
+xlog_cil_process_intents(
+	struct xfs_cil		*cil,
+	struct xfs_trans	*tp)
+{
+	struct xfs_log_item	*lip, *ilip, *next;
+	uint32_t		len = 0;
+
+	list_for_each_entry_safe(lip, next, &tp->t_items, li_trans) {
+		if (!(lip->li_ops->flags & XFS_ITEM_INTENT_DONE))
+			continue;
+
+		ilip = lip->li_ops->iop_intent(lip);
+		if (!ilip || !xlog_item_in_current_chkpt(cil, ilip))
+			continue;
+		set_bit(XFS_LI_WHITEOUT, &ilip->li_flags);
+		trace_xfs_cil_whiteout_mark(ilip);
+		len += ilip->li_lv->lv_bytes;
+		kmem_free(ilip->li_lv);
+		ilip->li_lv = NULL;
+
+		xfs_trans_del_item(lip);
+		lip->li_ops->iop_release(lip);
+	}
+	return len;
+}
+
+/*
  * Commit a transaction with the given vector to the Committed Item List.
  *
  * To do this, we need to format the item, pin it in memory if required and
@@ -1295,6 +1619,7 @@ xlog_cil_commit(
 {
 	struct xfs_cil		*cil = log->l_cilp;
 	struct xfs_log_item	*lip, *next;
+	uint32_t		released_space = 0;
 
 	/*
 	 * Do all necessary memory allocation before we lock the CIL.
@@ -1306,7 +1631,10 @@ xlog_cil_commit(
 	/* lock out background commit */
 	down_read(&cil->xc_ctx_lock);
 
-	xlog_cil_insert_items(log, tp);
+	if (tp->t_flags & XFS_TRANS_HAS_INTENT_DONE)
+		released_space = xlog_cil_process_intents(cil, tp);
+
+	xlog_cil_insert_items(log, tp, released_space);
 
 	if (regrant && !xlog_is_shutdown(log))
 		xfs_log_ticket_regrant(log, tp->t_ticket);
@@ -1352,6 +1680,13 @@ xlog_cil_flush(
 
 	trace_xfs_log_force(log->l_mp, seq, _RET_IP_);
 	xlog_cil_push_now(log, seq, true);
+
+	/*
+	 * If the CIL is empty, make sure that any previous checkpoint that may
+	 * still be in an active iclog is pushed to stable storage.
+	 */
+	if (test_bit(XLOG_CIL_EMPTY, &log->l_cilp->xc_flags))
+		xfs_log_force(log->l_mp, 0);
 }
 
 /*
@@ -1435,7 +1770,7 @@ restart:
 	 * we would have found the context on the committing list.
 	 */
 	if (sequence == cil->xc_current_sequence &&
-	    !list_empty(&cil->xc_cil)) {
+	    !test_bit(XLOG_CIL_EMPTY, &cil->xc_flags)) {
 		spin_unlock(&cil->xc_push_lock);
 		goto restart;
 	}
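xlog_cil_process_intents() above is the commit-side half of intent whiteouts: when an intent and its intent-done meet in the same checkpoint, the done item is dropped from the transaction and the intent is marked so the push discards it, returning its formatted size to the reservation. The bookkeeping for one cancelled pair, schematically (hypothetical item type):

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdlib.h>

    struct item {
        bool whiteout;      /* models XFS_LI_WHITEOUT */
        uint32_t lv_bytes;  /* formatted size of the intent */
        void *lv;           /* formatted log vector buffer */
    };

    /* Mark the intent so the CIL push skips it, free its formatted buffer,
     * and report how much CIL space the cancellation released. */
    static uint32_t cancel_intent(struct item *intent)
    {
        uint32_t released = intent->lv_bytes;

        intent->whiteout = true;
        free(intent->lv);
        intent->lv = NULL;
        return released;
    }
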
@@ -1456,29 +1791,35 @@ out_shutdown:
 }
 
 /*
- * Check if the current log item was first committed in this sequence.
- * We can't rely on just the log item being in the CIL, we have to check
- * the recorded commit sequence number.
+ * Move dead percpu state to the relevant CIL context structures.
  *
- * Note: for this to be used in a non-racy manner, it has to be called with
- * CIL flushing locked out. As a result, it should only be used during the
- * transaction commit process when deciding what to format into the item.
+ * We have to lock the CIL context here to ensure that nothing is modifying
+ * the percpu state, either addition or removal. Both of these are done under
+ * the CIL context lock, so grabbing that exclusively here will ensure we can
+ * safely drain the cilpcp for the CPU that is dying.
  */
-bool
-xfs_log_item_in_current_chkpt(
-	struct xfs_log_item *lip)
+void
+xlog_cil_pcp_dead(
+	struct xlog		*log,
+	unsigned int		cpu)
 {
-	struct xfs_cil		*cil = lip->li_mountp->m_log->l_cilp;
-
-	if (list_empty(&lip->li_cil))
-		return false;
+	struct xfs_cil		*cil = log->l_cilp;
+	struct xlog_cil_pcp	*cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+	struct xfs_cil_ctx	*ctx;
 
-	/*
-	 * li_seq is written on the first commit of a log item to record the
-	 * first checkpoint it is written to. Hence if it is different to the
-	 * current sequence, we're in a new checkpoint.
-	 */
-	return lip->li_seq == READ_ONCE(cil->xc_current_sequence);
+	down_write(&cil->xc_ctx_lock);
+	ctx = cil->xc_ctx;
+	if (ctx->ticket)
+		ctx->ticket->t_curr_res += cilpcp->space_reserved;
+	cilpcp->space_reserved = 0;
+
+	if (!list_empty(&cilpcp->log_items))
+		list_splice_init(&cilpcp->log_items, &ctx->log_items);
+	if (!list_empty(&cilpcp->busy_extents))
+		list_splice_init(&cilpcp->busy_extents, &ctx->busy_extents);
+	atomic_add(cilpcp->space_used, &ctx->space_used);
+	cilpcp->space_used = 0;
+	up_write(&cil->xc_ctx_lock);
 }
 
 /*
@@ -1486,10 +1827,12 @@ xfs_log_item_in_current_chkpt(
  */
 int
 xlog_cil_init(
-	struct xlog	*log)
+	struct xlog	*log)
 {
-	struct xfs_cil	*cil;
-	struct xfs_cil_ctx *ctx;
+	struct xfs_cil		*cil;
+	struct xfs_cil_ctx	*ctx;
+	struct xlog_cil_pcp	*cilpcp;
+	int			cpu;
 
 	cil = kmem_zalloc(sizeof(*cil), KM_MAYFAIL);
 	if (!cil)
@@ -1504,22 +1847,31 @@ xlog_cil_init(
 	if (!cil->xc_push_wq)
 		goto out_destroy_cil;
 
-	INIT_LIST_HEAD(&cil->xc_cil);
+	cil->xc_log = log;
+	cil->xc_pcp = alloc_percpu(struct xlog_cil_pcp);
+	if (!cil->xc_pcp)
+		goto out_destroy_wq;
+
+	for_each_possible_cpu(cpu) {
+		cilpcp = per_cpu_ptr(cil->xc_pcp, cpu);
+		INIT_LIST_HEAD(&cilpcp->busy_extents);
+		INIT_LIST_HEAD(&cilpcp->log_items);
+	}
+
 	INIT_LIST_HEAD(&cil->xc_committing);
-	spin_lock_init(&cil->xc_cil_lock);
 	spin_lock_init(&cil->xc_push_lock);
 	init_waitqueue_head(&cil->xc_push_wait);
 	init_rwsem(&cil->xc_ctx_lock);
 	init_waitqueue_head(&cil->xc_start_wait);
 	init_waitqueue_head(&cil->xc_commit_wait);
-	cil->xc_log = log;
 	log->l_cilp = cil;
 
 	ctx = xlog_cil_ctx_alloc();
 	xlog_cil_ctx_switch(cil, ctx);
-
 	return 0;
 
+out_destroy_wq:
+	destroy_workqueue(cil->xc_push_wq);
 out_destroy_cil:
 	kmem_free(cil);
 	return -ENOMEM;
@@ -1529,14 +1881,17 @@ void
 xlog_cil_destroy(
 	struct xlog	*log)
 {
-	if (log->l_cilp->xc_ctx) {
-		if (log->l_cilp->xc_ctx->ticket)
-			xfs_log_ticket_put(log->l_cilp->xc_ctx->ticket);
-		kmem_free(log->l_cilp->xc_ctx);
+	struct xfs_cil	*cil = log->l_cilp;
+
+	if (cil->xc_ctx) {
+		if (cil->xc_ctx->ticket)
+			xfs_log_ticket_put(cil->xc_ctx->ticket);
+		kmem_free(cil->xc_ctx);
 	}
 
-	ASSERT(list_empty(&log->l_cilp->xc_cil));
-	destroy_workqueue(log->l_cilp->xc_push_wq);
-	kmem_free(log->l_cilp);
+	ASSERT(test_bit(XLOG_CIL_EMPTY, &cil->xc_flags));
+	free_percpu(cil->xc_pcp);
+	destroy_workqueue(cil->xc_push_wq);
+	kmem_free(cil);
 }
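xlog_cil_pcp_dead() closes the loop on the per-cpu scheme: a dying CPU's remainders are folded back into the current context under an exclusive grab of xc_ctx_lock, making the drain look like a single-CPU push aggregation. A compact model of the same move:

    #include <pthread.h>
    #include <stdatomic.h>

    struct pcp_slot { int space_used; int space_reserved; };
    struct cil_ctx  { atomic_int space_used; int ticket_res; };

    /* With the context lock held exclusively no commit can race with us,
     * so plain read-and-zero of the dead CPU's slot is sufficient. */
    static void pcp_dead(pthread_rwlock_t *ctx_lock, struct cil_ctx *ctx,
                 struct pcp_slot *dead)
    {
        pthread_rwlock_wrlock(ctx_lock);
        ctx->ticket_res += dead->space_reserved;
        dead->space_reserved = 0;
        atomic_fetch_add(&ctx->space_used, dead->space_used);
        dead->space_used = 0;
        pthread_rwlock_unlock(ctx_lock);
    }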