diff options
Diffstat (limited to 'kernel/trace/ring_buffer.c')
-rw-r--r-- | kernel/trace/ring_buffer.c | 1596 |
1 files changed, 1271 insertions, 325 deletions
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index 61f0e92ace99..9712083832f4 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -4,6 +4,7 @@ * * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com> */ +#include <linux/trace_recursion.h> #include <linux/trace_events.h> #include <linux/ring_buffer.h> #include <linux/trace_clock.h> @@ -28,6 +29,14 @@ #include <asm/local.h> +/* + * The "absolute" timestamp in the buffer is only 59 bits. + * If a clock has the 5 MSBs set, it needs to be saved and + * reinserted. + */ +#define TS_MSB (0xf8ULL << 56) +#define ABS_TS_MASK (~TS_MSB) + static void update_pages_handler(struct work_struct *work); /* @@ -129,7 +138,16 @@ int ring_buffer_print_entry_header(struct trace_seq *s) #define RB_ALIGNMENT 4U #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX) #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */ -#define RB_ALIGN_DATA __aligned(RB_ALIGNMENT) + +#ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS +# define RB_FORCE_8BYTE_ALIGNMENT 0 +# define RB_ARCH_ALIGNMENT RB_ALIGNMENT +#else +# define RB_FORCE_8BYTE_ALIGNMENT 1 +# define RB_ARCH_ALIGNMENT 8U +#endif + +#define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT) /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */ #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX @@ -193,7 +211,7 @@ rb_event_length(struct ring_buffer_event *event) case RINGBUF_TYPE_DATA: return rb_event_data_length(event); default: - BUG(); + WARN_ON_ONCE(1); } /* not hit */ return 0; @@ -249,7 +267,7 @@ rb_event_data(struct ring_buffer_event *event) { if (extended_time(event)) event = skip_time_extend(event); - BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); + WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX); /* If length is in len field, then array[0] has the data */ if (event->type_len) return (void *)&event->array[0]; @@ -270,21 +288,14 @@ EXPORT_SYMBOL_GPL(ring_buffer_event_data); #define for_each_buffer_cpu(buffer, cpu) \ for_each_cpu(cpu, buffer->cpumask) +#define for_each_online_buffer_cpu(buffer, cpu) \ + for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask) + #define TS_SHIFT 27 #define TS_MASK ((1ULL << TS_SHIFT) - 1) #define TS_DELTA_TEST (~TS_MASK) -/** - * ring_buffer_event_time_stamp - return the event's extended timestamp - * @event: the event to get the timestamp of - * - * Returns the extended timestamp associated with a data event. - * An extended time_stamp is a 64-bit timestamp represented - * internally in a special way that makes the best use of space - * contained within a ring buffer event. This function decodes - * it and maps it to a straight u64 value. - */ -u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event) +static u64 rb_event_time_stamp(struct ring_buffer_event *event) { u64 ts; @@ -402,6 +413,7 @@ struct rb_irq_work { struct irq_work work; wait_queue_head_t waiters; wait_queue_head_t full_waiters; + long wait_index; bool waiters_pending; bool full_waiters_pending; bool wakeup_full; @@ -413,21 +425,38 @@ struct rb_irq_work { struct rb_event_info { u64 ts; u64 delta; + u64 before; + u64 after; unsigned long length; struct buffer_page *tail_page; int add_timestamp; }; /* + * Used for the add_timestamp + * NONE + * EXTEND - wants a time extend + * ABSOLUTE - the buffer requests all events to have absolute time stamps + * FORCE - force a full time stamp. + */ +enum { + RB_ADD_STAMP_NONE = 0, + RB_ADD_STAMP_EXTEND = BIT(1), + RB_ADD_STAMP_ABSOLUTE = BIT(2), + RB_ADD_STAMP_FORCE = BIT(3) +}; +/* * Used for which event context the event is in. - * NMI = 0 - * IRQ = 1 - * SOFTIRQ = 2 - * NORMAL = 3 + * TRANSITION = 0 + * NMI = 1 + * IRQ = 2 + * SOFTIRQ = 3 + * NORMAL = 4 * * See trace_recursive_lock() comment below for more details. */ enum { + RB_CTX_TRANSITION, RB_CTX_NMI, RB_CTX_IRQ, RB_CTX_SOFTIRQ, @@ -435,12 +464,38 @@ enum { RB_CTX_MAX }; +#if BITS_PER_LONG == 32 +#define RB_TIME_32 +#endif + +/* To test on 64 bit machines */ +//#define RB_TIME_32 + +#ifdef RB_TIME_32 + +struct rb_time_struct { + local_t cnt; + local_t top; + local_t bottom; + local_t msb; +}; +#else +#include <asm/local64.h> +struct rb_time_struct { + local64_t time; +}; +#endif +typedef struct rb_time_struct rb_time_t; + +#define MAX_NEST 5 + /* * head_page == tail_page && head == tail then buffer is empty. */ struct ring_buffer_per_cpu { int cpu; atomic_t record_disabled; + atomic_t resize_disabled; struct trace_buffer *buffer; raw_spinlock_t reader_lock; /* serialize readers */ arch_spinlock_t lock; @@ -469,7 +524,9 @@ struct ring_buffer_per_cpu { size_t shortest_full; unsigned long read; unsigned long read_bytes; - u64 write_stamp; + rb_time_t write_stamp; + rb_time_t before_stamp; + u64 event_stamp[MAX_NEST]; u64 read_stamp; /* ring buffer pages to update, > 0 to add, < 0 to remove */ long nr_pages_to_update; @@ -484,7 +541,6 @@ struct trace_buffer { unsigned flags; int cpus; atomic_t record_disabled; - atomic_t resize_disabled; cpumask_var_t cpumask; struct lock_class_key *reader_lock_key; @@ -503,12 +559,319 @@ struct trace_buffer { struct ring_buffer_iter { struct ring_buffer_per_cpu *cpu_buffer; unsigned long head; + unsigned long next_event; struct buffer_page *head_page; struct buffer_page *cache_reader_page; unsigned long cache_read; u64 read_stamp; + u64 page_stamp; + struct ring_buffer_event *event; + int missed_events; }; +#ifdef RB_TIME_32 + +/* + * On 32 bit machines, local64_t is very expensive. As the ring + * buffer doesn't need all the features of a true 64 bit atomic, + * on 32 bit, it uses these functions (64 still uses local64_t). + * + * For the ring buffer, 64 bit required operations for the time is + * the following: + * + * - Reads may fail if it interrupted a modification of the time stamp. + * It will succeed if it did not interrupt another write even if + * the read itself is interrupted by a write. + * It returns whether it was successful or not. + * + * - Writes always succeed and will overwrite other writes and writes + * that were done by events interrupting the current write. + * + * - A write followed by a read of the same time stamp will always succeed, + * but may not contain the same value. + * + * - A cmpxchg will fail if it interrupted another write or cmpxchg. + * Other than that, it acts like a normal cmpxchg. + * + * The 60 bit time stamp is broken up by 30 bits in a top and bottom half + * (bottom being the least significant 30 bits of the 60 bit time stamp). + * + * The two most significant bits of each half holds a 2 bit counter (0-3). + * Each update will increment this counter by one. + * When reading the top and bottom, if the two counter bits match then the + * top and bottom together make a valid 60 bit number. + */ +#define RB_TIME_SHIFT 30 +#define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1) +#define RB_TIME_MSB_SHIFT 60 + +static inline int rb_time_cnt(unsigned long val) +{ + return (val >> RB_TIME_SHIFT) & 3; +} + +static inline u64 rb_time_val(unsigned long top, unsigned long bottom) +{ + u64 val; + + val = top & RB_TIME_VAL_MASK; + val <<= RB_TIME_SHIFT; + val |= bottom & RB_TIME_VAL_MASK; + + return val; +} + +static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt) +{ + unsigned long top, bottom, msb; + unsigned long c; + + /* + * If the read is interrupted by a write, then the cnt will + * be different. Loop until both top and bottom have been read + * without interruption. + */ + do { + c = local_read(&t->cnt); + top = local_read(&t->top); + bottom = local_read(&t->bottom); + msb = local_read(&t->msb); + } while (c != local_read(&t->cnt)); + + *cnt = rb_time_cnt(top); + + /* If top and bottom counts don't match, this interrupted a write */ + if (*cnt != rb_time_cnt(bottom)) + return false; + + /* The shift to msb will lose its cnt bits */ + *ret = rb_time_val(top, bottom) | ((u64)msb << RB_TIME_MSB_SHIFT); + return true; +} + +static bool rb_time_read(rb_time_t *t, u64 *ret) +{ + unsigned long cnt; + + return __rb_time_read(t, ret, &cnt); +} + +static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt) +{ + return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT); +} + +static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom, + unsigned long *msb) +{ + *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK); + *bottom = (unsigned long)(val & RB_TIME_VAL_MASK); + *msb = (unsigned long)(val >> RB_TIME_MSB_SHIFT); +} + +static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt) +{ + val = rb_time_val_cnt(val, cnt); + local_set(t, val); +} + +static void rb_time_set(rb_time_t *t, u64 val) +{ + unsigned long cnt, top, bottom, msb; + + rb_time_split(val, &top, &bottom, &msb); + + /* Writes always succeed with a valid number even if it gets interrupted. */ + do { + cnt = local_inc_return(&t->cnt); + rb_time_val_set(&t->top, top, cnt); + rb_time_val_set(&t->bottom, bottom, cnt); + rb_time_val_set(&t->msb, val >> RB_TIME_MSB_SHIFT, cnt); + } while (cnt != local_read(&t->cnt)); +} + +static inline bool +rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set) +{ + unsigned long ret; + + ret = local_cmpxchg(l, expect, set); + return ret == expect; +} + +static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) +{ + unsigned long cnt, top, bottom, msb; + unsigned long cnt2, top2, bottom2, msb2; + u64 val; + + /* The cmpxchg always fails if it interrupted an update */ + if (!__rb_time_read(t, &val, &cnt2)) + return false; + + if (val != expect) + return false; + + cnt = local_read(&t->cnt); + if ((cnt & 3) != cnt2) + return false; + + cnt2 = cnt + 1; + + rb_time_split(val, &top, &bottom, &msb); + top = rb_time_val_cnt(top, cnt); + bottom = rb_time_val_cnt(bottom, cnt); + + rb_time_split(set, &top2, &bottom2, &msb2); + top2 = rb_time_val_cnt(top2, cnt2); + bottom2 = rb_time_val_cnt(bottom2, cnt2); + + if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2)) + return false; + if (!rb_time_read_cmpxchg(&t->msb, msb, msb2)) + return false; + if (!rb_time_read_cmpxchg(&t->top, top, top2)) + return false; + if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2)) + return false; + return true; +} + +#else /* 64 bits */ + +/* local64_t always succeeds */ + +static inline bool rb_time_read(rb_time_t *t, u64 *ret) +{ + *ret = local64_read(&t->time); + return true; +} +static void rb_time_set(rb_time_t *t, u64 val) +{ + local64_set(&t->time, val); +} + +static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set) +{ + u64 val; + val = local64_cmpxchg(&t->time, expect, set); + return val == expect; +} +#endif + +/* + * Enable this to make sure that the event passed to + * ring_buffer_event_time_stamp() is not committed and also + * is on the buffer that it passed in. + */ +//#define RB_VERIFY_EVENT +#ifdef RB_VERIFY_EVENT +static struct list_head *rb_list_head(struct list_head *list); +static void verify_event(struct ring_buffer_per_cpu *cpu_buffer, + void *event) +{ + struct buffer_page *page = cpu_buffer->commit_page; + struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page); + struct list_head *next; + long commit, write; + unsigned long addr = (unsigned long)event; + bool done = false; + int stop = 0; + + /* Make sure the event exists and is not committed yet */ + do { + if (page == tail_page || WARN_ON_ONCE(stop++ > 100)) + done = true; + commit = local_read(&page->page->commit); + write = local_read(&page->write); + if (addr >= (unsigned long)&page->page->data[commit] && + addr < (unsigned long)&page->page->data[write]) + return; + + next = rb_list_head(page->list.next); + page = list_entry(next, struct buffer_page, list); + } while (!done); + WARN_ON_ONCE(1); +} +#else +static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer, + void *event) +{ +} +#endif + +/* + * The absolute time stamp drops the 5 MSBs and some clocks may + * require them. The rb_fix_abs_ts() will take a previous full + * time stamp, and add the 5 MSB of that time stamp on to the + * saved absolute time stamp. Then they are compared in case of + * the unlikely event that the latest time stamp incremented + * the 5 MSB. + */ +static inline u64 rb_fix_abs_ts(u64 abs, u64 save_ts) +{ + if (save_ts & TS_MSB) { + abs |= save_ts & TS_MSB; + /* Check for overflow */ + if (unlikely(abs < save_ts)) + abs += 1ULL << 59; + } + return abs; +} + +static inline u64 rb_time_stamp(struct trace_buffer *buffer); + +/** + * ring_buffer_event_time_stamp - return the event's current time stamp + * @buffer: The buffer that the event is on + * @event: the event to get the time stamp of + * + * Note, this must be called after @event is reserved, and before it is + * committed to the ring buffer. And must be called from the same + * context where the event was reserved (normal, softirq, irq, etc). + * + * Returns the time stamp associated with the current event. + * If the event has an extended time stamp, then that is used as + * the time stamp to return. + * In the highly unlikely case that the event was nested more than + * the max nesting, then the write_stamp of the buffer is returned, + * otherwise current time is returned, but that really neither of + * the last two cases should ever happen. + */ +u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer, + struct ring_buffer_event *event) +{ + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()]; + unsigned int nest; + u64 ts; + + /* If the event includes an absolute time, then just use that */ + if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { + ts = rb_event_time_stamp(event); + return rb_fix_abs_ts(ts, cpu_buffer->tail_page->page->time_stamp); + } + + nest = local_read(&cpu_buffer->committing); + verify_event(cpu_buffer, event); + if (WARN_ON_ONCE(!nest)) + goto fail; + + /* Read the current saved nesting level time stamp */ + if (likely(--nest < MAX_NEST)) + return cpu_buffer->event_stamp[nest]; + + /* Shouldn't happen, warn if it does */ + WARN_ONCE(1, "nest (%d) greater than max", nest); + + fail: + /* Can only fail on 32 bit */ + if (!rb_time_read(&cpu_buffer->write_stamp, &ts)) + /* Screw it, just read the current time */ + ts = rb_time_stamp(cpu_buffer->buffer); + + return ts; +} + /** * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer * @buffer: The ring_buffer to get the number of pages from @@ -522,7 +885,7 @@ size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu) } /** - * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer + * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer * @buffer: The ring_buffer to get the number of pages from * @cpu: The cpu of the ring_buffer to get the number of pages from * @@ -555,17 +918,60 @@ static void rb_wake_up_waiters(struct irq_work *work) struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work); wake_up_all(&rbwork->waiters); - if (rbwork->wakeup_full) { + if (rbwork->full_waiters_pending || rbwork->wakeup_full) { rbwork->wakeup_full = false; + rbwork->full_waiters_pending = false; wake_up_all(&rbwork->full_waiters); } } /** + * ring_buffer_wake_waiters - wake up any waiters on this ring buffer + * @buffer: The ring buffer to wake waiters on + * + * In the case of a file that represents a ring buffer is closing, + * it is prudent to wake up any waiters that are on this. + */ +void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + struct rb_irq_work *rbwork; + + if (!buffer) + return; + + if (cpu == RING_BUFFER_ALL_CPUS) { + + /* Wake up individual ones too. One level recursion */ + for_each_buffer_cpu(buffer, cpu) + ring_buffer_wake_waiters(buffer, cpu); + + rbwork = &buffer->irq_work; + } else { + if (WARN_ON_ONCE(!buffer->buffers)) + return; + if (WARN_ON_ONCE(cpu >= nr_cpu_ids)) + return; + + cpu_buffer = buffer->buffers[cpu]; + /* The CPU buffer may not have been initialized yet */ + if (!cpu_buffer) + return; + rbwork = &cpu_buffer->irq_work; + } + + rbwork->wait_index++; + /* make sure the waiters see the new index */ + smp_wmb(); + + rb_wake_up_waiters(&rbwork->work); +} + +/** * ring_buffer_wait - wait for input to the ring buffer * @buffer: buffer to wait on * @cpu: the cpu buffer to wait on - * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS + * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS * * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon * as data is added to any of the @buffer's cpu buffers. Otherwise @@ -573,9 +979,10 @@ static void rb_wake_up_waiters(struct irq_work *work) */ int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) { - struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer); + struct ring_buffer_per_cpu *cpu_buffer; DEFINE_WAIT(wait); struct rb_irq_work *work; + long wait_index; int ret = 0; /* @@ -594,6 +1001,7 @@ int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) work = &cpu_buffer->irq_work; } + wait_index = READ_ONCE(work->wait_index); while (true) { if (full) @@ -649,7 +1057,7 @@ int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) nr_pages = cpu_buffer->nr_pages; dirty = ring_buffer_nr_dirty_pages(buffer, cpu); if (!cpu_buffer->shortest_full || - cpu_buffer->shortest_full < full) + cpu_buffer->shortest_full > full) cpu_buffer->shortest_full = full; raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); if (!pagebusy && @@ -658,6 +1066,11 @@ int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full) } schedule(); + + /* Make sure to see the new wait index */ + smp_rmb(); + if (wait_index != work->wait_index) + break; } if (full) @@ -742,11 +1155,19 @@ __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu, static inline u64 rb_time_stamp(struct trace_buffer *buffer) { + u64 ts; + + /* Skip retpolines :-( */ + if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local)) + ts = trace_clock_local(); + else + ts = buffer->clock(); + /* shift to debug/test normalization and TIME_EXTENTS */ - return buffer->clock() << DEBUG_SHIFT; + return ts << DEBUG_SHIFT; } -u64 ring_buffer_time_stamp(struct trace_buffer *buffer, int cpu) +u64 ring_buffer_time_stamp(struct trace_buffer *buffer) { u64 time; @@ -864,8 +1285,7 @@ static struct list_head *rb_list_head(struct list_head *list) * its flags will be non zero. */ static inline int -rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer, - struct buffer_page *page, struct list_head *list) +rb_is_head_page(struct buffer_page *page, struct list_head *list) { unsigned long val; @@ -894,8 +1314,7 @@ static bool rb_is_reader_page(struct buffer_page *page) /* * rb_set_list_to_head - set a list_head to be pointing to head. */ -static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer, - struct list_head *list) +static void rb_set_list_to_head(struct list_head *list) { unsigned long *ptr; @@ -918,7 +1337,7 @@ static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer) /* * Set the previous list pointer to have the HEAD flag. */ - rb_set_list_to_head(cpu_buffer, head->list.prev); + rb_set_list_to_head(head->list.prev); } static void rb_list_head_clear(struct list_head *list) @@ -993,8 +1412,7 @@ static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer, old_flag, RB_PAGE_NORMAL); } -static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer, - struct buffer_page **bpage) +static inline void rb_inc_page(struct buffer_page **bpage) { struct list_head *p = rb_list_head((*bpage)->list.next); @@ -1026,11 +1444,11 @@ rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer) */ for (i = 0; i < 3; i++) { do { - if (rb_is_head_page(cpu_buffer, page, page->list.prev)) { + if (rb_is_head_page(page, page->list.prev)) { cpu_buffer->head_page = page; return page; } - rb_inc_page(cpu_buffer, &page); + rb_inc_page(&page); } while (page != head); } @@ -1184,7 +1602,8 @@ static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer) return 0; } -static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu) +static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, + long nr_pages, struct list_head *pages) { struct buffer_page *bpage, *tmp; bool user_thread = current->mm != NULL; @@ -1224,13 +1643,15 @@ static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu) struct page *page; bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), - mflags, cpu_to_node(cpu)); + mflags, cpu_to_node(cpu_buffer->cpu)); if (!bpage) goto free_pages; + rb_check_bpage(cpu_buffer, bpage); + list_add(&bpage->list, pages); - page = alloc_pages_node(cpu_to_node(cpu), mflags, 0); + page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0); if (!page) goto free_pages; bpage->page = page_address(page); @@ -1262,7 +1683,7 @@ static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer, WARN_ON(!nr_pages); - if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu)) + if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages)) return -ENOMEM; /* @@ -1573,7 +1994,7 @@ rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages) cond_resched(); to_remove_page = tmp_iter_page; - rb_inc_page(cpu_buffer, &tmp_iter_page); + rb_inc_page(&tmp_iter_page); /* update the counters */ page_entries = rb_page_entries(to_remove_page); @@ -1716,18 +2137,18 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, { struct ring_buffer_per_cpu *cpu_buffer; unsigned long nr_pages; - int cpu, err = 0; + int cpu, err; /* * Always succeed at resizing a non-existent buffer: */ if (!buffer) - return size; + return 0; /* Make sure the requested buffer exists */ if (cpu_id != RING_BUFFER_ALL_CPUS && !cpumask_test_cpu(cpu_id, buffer->cpumask)) - return size; + return 0; nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE); @@ -1735,20 +2156,24 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, if (nr_pages < 2) nr_pages = 2; - size = nr_pages * BUF_PAGE_SIZE; - - /* - * Don't succeed if resizing is disabled, as a reader might be - * manipulating the ring buffer and is expecting a sane state while - * this is true. - */ - if (atomic_read(&buffer->resize_disabled)) - return -EBUSY; - /* prevent another thread from changing buffer sizes */ mutex_lock(&buffer->mutex); + if (cpu_id == RING_BUFFER_ALL_CPUS) { + /* + * Don't succeed if resizing is disabled, as a reader might be + * manipulating the ring buffer and is expecting a sane state while + * this is true. + */ + for_each_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + if (atomic_read(&cpu_buffer->resize_disabled)) { + err = -EBUSY; + goto out_err_unlock; + } + } + /* calculate the pages to update */ for_each_buffer_cpu(buffer, cpu) { cpu_buffer = buffer->buffers[cpu]; @@ -1765,15 +2190,15 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, * allocated without receiving ENOMEM */ INIT_LIST_HEAD(&cpu_buffer->new_pages); - if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update, - &cpu_buffer->new_pages, cpu)) { + if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, + &cpu_buffer->new_pages)) { /* not enough memory for new pages */ err = -ENOMEM; goto out_err; } } - get_online_cpus(); + cpus_read_lock(); /* * Fire off all the required work handlers * We can't schedule on offline CPUs, but it's not necessary @@ -1805,29 +2230,35 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, cpu_buffer->nr_pages_to_update = 0; } - put_online_cpus(); + cpus_read_unlock(); } else { - /* Make sure this CPU has been initialized */ - if (!cpumask_test_cpu(cpu_id, buffer->cpumask)) - goto out; - cpu_buffer = buffer->buffers[cpu_id]; if (nr_pages == cpu_buffer->nr_pages) goto out; + /* + * Don't succeed if resizing is disabled, as a reader might be + * manipulating the ring buffer and is expecting a sane state while + * this is true. + */ + if (atomic_read(&cpu_buffer->resize_disabled)) { + err = -EBUSY; + goto out_err_unlock; + } + cpu_buffer->nr_pages_to_update = nr_pages - cpu_buffer->nr_pages; INIT_LIST_HEAD(&cpu_buffer->new_pages); if (cpu_buffer->nr_pages_to_update > 0 && - __rb_allocate_pages(cpu_buffer->nr_pages_to_update, - &cpu_buffer->new_pages, cpu_id)) { + __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update, + &cpu_buffer->new_pages)) { err = -ENOMEM; goto out_err; } - get_online_cpus(); + cpus_read_lock(); /* Can't run something on an offline CPU. */ if (!cpu_online(cpu_id)) @@ -1839,7 +2270,7 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, } cpu_buffer->nr_pages_to_update = 0; - put_online_cpus(); + cpus_read_unlock(); } out: @@ -1867,7 +2298,7 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, } mutex_unlock(&buffer->mutex); - return size; + return 0; out_err: for_each_buffer_cpu(buffer, cpu) { @@ -1885,6 +2316,7 @@ int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size, free_buffer_page(bpage); } } + out_err_unlock: mutex_unlock(&buffer->mutex); return err; } @@ -1913,15 +2345,63 @@ rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->reader_page->read); } -static __always_inline struct ring_buffer_event * -rb_iter_head_event(struct ring_buffer_iter *iter) +static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) { - return __rb_page_index(iter->head_page, iter->head); + return local_read(&bpage->page->commit); } -static __always_inline unsigned rb_page_commit(struct buffer_page *bpage) +static struct ring_buffer_event * +rb_iter_head_event(struct ring_buffer_iter *iter) { - return local_read(&bpage->page->commit); + struct ring_buffer_event *event; + struct buffer_page *iter_head_page = iter->head_page; + unsigned long commit; + unsigned length; + + if (iter->head != iter->next_event) + return iter->event; + + /* + * When the writer goes across pages, it issues a cmpxchg which + * is a mb(), which will synchronize with the rmb here. + * (see rb_tail_page_update() and __rb_reserve_next()) + */ + commit = rb_page_commit(iter_head_page); + smp_rmb(); + event = __rb_page_index(iter_head_page, iter->head); + length = rb_event_length(event); + + /* + * READ_ONCE() doesn't work on functions and we don't want the + * compiler doing any crazy optimizations with length. + */ + barrier(); + + if ((iter->head + length) > commit || length > BUF_MAX_DATA_SIZE) + /* Writer corrupted the read? */ + goto reset; + + memcpy(iter->event, event, length); + /* + * If the page stamp is still the same after this rmb() then the + * event was safely copied without the writer entering the page. + */ + smp_rmb(); + + /* Make sure the page didn't change since we read this */ + if (iter->page_stamp != iter_head_page->page->time_stamp || + commit > rb_page_commit(iter_head_page)) + goto reset; + + iter->next_event = iter->head + length; + return iter->event; + reset: + /* Reset to the beginning */ + iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; + iter->head = 0; + iter->next_event = 0; + iter->missed_events = 1; + return NULL; } /* Size is determined by what has been committed */ @@ -1957,10 +2437,11 @@ static void rb_inc_iter(struct ring_buffer_iter *iter) if (iter->head_page == cpu_buffer->reader_page) iter->head_page = rb_set_head_page(cpu_buffer); else - rb_inc_page(cpu_buffer, &iter->head_page); + rb_inc_page(&iter->head_page); - iter->read_stamp = iter->head_page->page->time_stamp; + iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp; iter->head = 0; + iter->next_event = 0; } /* @@ -2059,7 +2540,7 @@ rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer, * want the outer most commit to reset it. */ new_head = next_page; - rb_inc_page(cpu_buffer, &new_head); + rb_inc_page(&new_head); ret = rb_head_page_set_head(cpu_buffer, new_head, next_page, RB_PAGE_NORMAL); @@ -2178,6 +2659,9 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, /* Mark the rest of the page with padding */ rb_event_set_padding(event); + /* Make sure the padding is visible before the write update */ + smp_wmb(); + /* Set the write back to the previous setting */ local_sub(length, &tail_page->write); return; @@ -2189,6 +2673,9 @@ rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer, /* time delta must be non zero */ event->time_delta = 1; + /* Make sure the padding is visible before the tail_page->write update */ + smp_wmb(); + /* Set write to end of buffer */ length = (tail + length) - BUF_PAGE_SIZE; local_sub(length, &tail_page->write); @@ -2211,7 +2698,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, next_page = tail_page; - rb_inc_page(cpu_buffer, &next_page); + rb_inc_page(&next_page); /* * If for some reason, we had an interrupt storm that made @@ -2237,7 +2724,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, * the buffer, unless the commit page is still on the * reader page. */ - if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) { + if (rb_is_head_page(next_page, &tail_page->list)) { /* * If the commit is not on the reader page, then @@ -2268,7 +2755,7 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, * have filled up the buffer with events * from interrupts and such, and wrapped. * - * Note, if the tail page is also the on the + * Note, if the tail page is also on the * reader_page, we let it move out. */ if (unlikely((cpu_buffer->commit_page != @@ -2302,8 +2789,8 @@ rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer, return NULL; } -/* Slow path, do not inline */ -static noinline struct ring_buffer_event * +/* Slow path */ +static struct ring_buffer_event * rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) { if (abs) @@ -2324,8 +2811,72 @@ rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs) return skip_time_extend(event); } -static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event); +#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK +static inline bool sched_clock_stable(void) +{ + return true; +} +#endif + +static void +rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer, + struct rb_event_info *info) +{ + u64 write_stamp; + + WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s", + (unsigned long long)info->delta, + (unsigned long long)info->ts, + (unsigned long long)info->before, + (unsigned long long)info->after, + (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0), + sched_clock_stable() ? "" : + "If you just came from a suspend/resume,\n" + "please switch to the trace global clock:\n" + " echo global > /sys/kernel/debug/tracing/trace_clock\n" + "or add trace_clock=global to the kernel command line\n"); +} + +static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer, + struct ring_buffer_event **event, + struct rb_event_info *info, + u64 *delta, + unsigned int *length) +{ + bool abs = info->add_timestamp & + (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE); + + if (unlikely(info->delta > (1ULL << 59))) { + /* + * Some timers can use more than 59 bits, and when a timestamp + * is added to the buffer, it will lose those bits. + */ + if (abs && (info->ts & TS_MSB)) { + info->delta &= ABS_TS_MASK; + + /* did the clock go backwards */ + } else if (info->before == info->after && info->before > info->ts) { + /* not interrupted */ + static int once; + + /* + * This is possible with a recalibrating of the TSC. + * Do not produce a call stack, but just report it. + */ + if (!once) { + once++; + pr_warn("Ring buffer clock went backwards: %llu -> %llu\n", + info->before, info->ts); + } + } else + rb_check_timestamp(cpu_buffer, info); + if (!abs) + info->delta = 0; + } + *event = rb_add_time_stamp(*event, info->delta, abs); + *length -= RB_LEN_TIME_EXTEND; + *delta = 0; +} /** * rb_update_event - update event type and data @@ -2345,26 +2896,21 @@ rb_update_event(struct ring_buffer_per_cpu *cpu_buffer, { unsigned length = info->length; u64 delta = info->delta; + unsigned int nest = local_read(&cpu_buffer->committing) - 1; - /* Only a commit updates the timestamp */ - if (unlikely(!rb_event_is_commit(cpu_buffer, event))) - delta = 0; + if (!WARN_ON_ONCE(nest >= MAX_NEST)) + cpu_buffer->event_stamp[nest] = info->ts; /* * If we need to add a timestamp, then we * add it to the start of the reserved space. */ - if (unlikely(info->add_timestamp)) { - bool abs = ring_buffer_time_stamp_abs(cpu_buffer->buffer); - - event = rb_add_time_stamp(event, info->delta, abs); - length -= RB_LEN_TIME_EXTEND; - delta = 0; - } + if (unlikely(info->add_timestamp)) + rb_add_timestamp(cpu_buffer, &event, info, &delta, &length); event->time_delta = delta; length -= RB_EVNT_HDR_SIZE; - if (length > RB_MAX_SMALL_DATA) { + if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) { event->type_len = 0; event->array[0] = length; } else @@ -2379,11 +2925,11 @@ static unsigned rb_calculate_event_length(unsigned length) if (!length) length++; - if (length > RB_MAX_SMALL_DATA) + if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) length += sizeof(event.array[0]); length += RB_EVNT_HDR_SIZE; - length = ALIGN(length, RB_ALIGNMENT); + length = ALIGN(length, RB_ARCH_ALIGNMENT); /* * In case the time delta is larger than the 27 bits for it @@ -2403,12 +2949,24 @@ static unsigned rb_calculate_event_length(unsigned length) return length; } -#ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK -static inline bool sched_clock_stable(void) +static u64 rb_time_delta(struct ring_buffer_event *event) { - return true; + switch (event->type_len) { + case RINGBUF_TYPE_PADDING: + return 0; + + case RINGBUF_TYPE_TIME_EXTEND: + return rb_event_time_stamp(event); + + case RINGBUF_TYPE_TIME_STAMP: + return 0; + + case RINGBUF_TYPE_DATA: + return event->time_delta; + default: + return 0; + } } -#endif static inline int rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, @@ -2418,6 +2976,8 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, struct buffer_page *bpage; unsigned long index; unsigned long addr; + u64 write_stamp; + u64 delta; new_index = rb_event_index(event); old_index = new_index + rb_event_ts_length(event); @@ -2426,10 +2986,43 @@ rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer, bpage = READ_ONCE(cpu_buffer->tail_page); + delta = rb_time_delta(event); + + if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp)) + return 0; + + /* Make sure the write stamp is read before testing the location */ + barrier(); + if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) { unsigned long write_mask = local_read(&bpage->write) & ~RB_WRITE_MASK; unsigned long event_length = rb_event_length(event); + + /* Something came in, can't discard */ + if (!rb_time_cmpxchg(&cpu_buffer->write_stamp, + write_stamp, write_stamp - delta)) + return 0; + + /* + * It's possible that the event time delta is zero + * (has the same time stamp as the previous event) + * in which case write_stamp and before_stamp could + * be the same. In such a case, force before_stamp + * to be different than write_stamp. It doesn't + * matter what it is, as long as its different. + */ + if (!delta) + rb_time_set(&cpu_buffer->before_stamp, 0); + + /* + * If an event were to come in now, it would see that the + * write_stamp and the before_stamp are different, and assume + * that this event just added itself before updating + * the write stamp. The interrupting event will fix the + * write stamp for us, and use the before stamp as its delta. + */ + /* * This is on the tail page. It is possible that * a write could come in and move the tail page @@ -2480,11 +3073,7 @@ rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer) return; local_set(&cpu_buffer->commit_page->page->commit, rb_page_write(cpu_buffer->commit_page)); - rb_inc_page(cpu_buffer, &cpu_buffer->commit_page); - /* Only update the write stamp if the page has an event */ - if (rb_page_write(cpu_buffer->commit_page)) - cpu_buffer->write_stamp = - cpu_buffer->commit_page->page->time_stamp; + rb_inc_page(&cpu_buffer->commit_page); /* add barrier to keep gcc from optimizing too much */ barrier(); } @@ -2556,54 +3145,10 @@ static inline void rb_event_discard(struct ring_buffer_event *event) event->time_delta = 1; } -static __always_inline bool -rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event) -{ - unsigned long addr = (unsigned long)event; - unsigned long index; - - index = rb_event_index(event); - addr &= PAGE_MASK; - - return cpu_buffer->commit_page->page == (void *)addr && - rb_commit_index(cpu_buffer) == index; -} - -static __always_inline void -rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer, - struct ring_buffer_event *event) -{ - u64 delta; - - /* - * The event first in the commit queue updates the - * time stamp. - */ - if (rb_event_is_commit(cpu_buffer, event)) { - /* - * A commit event that is first on a page - * updates the write timestamp with the page stamp - */ - if (!rb_event_index(event)) - cpu_buffer->write_stamp = - cpu_buffer->commit_page->page->time_stamp; - else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) { - delta = ring_buffer_event_time_stamp(event); - cpu_buffer->write_stamp += delta; - } else if (event->type_len == RINGBUF_TYPE_TIME_STAMP) { - delta = ring_buffer_event_time_stamp(event); - cpu_buffer->write_stamp = delta; - } else - cpu_buffer->write_stamp += event->time_delta; - } -} - static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer, struct ring_buffer_event *event) { local_inc(&cpu_buffer->entries); - rb_update_write_stamp(cpu_buffer, event); rb_end_commit(cpu_buffer); } @@ -2649,6 +3194,13 @@ rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) irq_work_queue(&cpu_buffer->irq_work.work); } +#ifdef CONFIG_RING_BUFFER_RECORD_RECURSION +# define do_ring_buffer_record_recursion() \ + do_ftrace_record_recursion(_THIS_IP_, _RET_IP_) +#else +# define do_ring_buffer_record_recursion() do { } while (0) +#endif + /* * The lock and unlock are done within a preempt disable section. * The current_context per_cpu variable can only be modified @@ -2659,10 +3211,10 @@ rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) * a bit of overhead in something as critical as function tracing, * we use a bitmask trick. * - * bit 0 = NMI context - * bit 1 = IRQ context - * bit 2 = SoftIRQ context - * bit 3 = normal context. + * bit 1 = NMI context + * bit 2 = IRQ context + * bit 3 = SoftIRQ context + * bit 4 = normal context. * * This works because this is the order of contexts that can * preempt other contexts. A SoftIRQ never preempts an IRQ @@ -2685,23 +3237,52 @@ rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer) * The least significant bit can be cleared this way, and it * just so happens that it is the same bit corresponding to * the current context. + * + * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit + * is set when a recursion is detected at the current context, and if + * the TRANSITION bit is already set, it will fail the recursion. + * This is needed because there's a lag between the changing of + * interrupt context and updating the preempt count. In this case, + * a false positive will be found. To handle this, one extra recursion + * is allowed, and this is done by the TRANSITION bit. If the TRANSITION + * bit is already set, then it is considered a recursion and the function + * ends. Otherwise, the TRANSITION bit is set, and that bit is returned. + * + * On the trace_recursive_unlock(), the TRANSITION bit will be the first + * to be cleared. Even if it wasn't the context that set it. That is, + * if an interrupt comes in while NORMAL bit is set and the ring buffer + * is called before preempt_count() is updated, since the check will + * be on the NORMAL bit, the TRANSITION bit will then be set. If an + * NMI then comes in, it will set the NMI bit, but when the NMI code + * does the trace_recursive_unlock() it will clear the TRANSITION bit + * and leave the NMI bit set. But this is fine, because the interrupt + * code that set the TRANSITION bit will then clear the NMI bit when it + * calls trace_recursive_unlock(). If another NMI comes in, it will + * set the TRANSITION bit and continue. + * + * Note: The TRANSITION bit only handles a single transition between context. */ static __always_inline int trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer) { unsigned int val = cpu_buffer->current_context; - unsigned long pc = preempt_count(); - int bit; + int bit = interrupt_context_level(); - if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET))) - bit = RB_CTX_NORMAL; - else - bit = pc & NMI_MASK ? RB_CTX_NMI : - pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ; + bit = RB_CTX_NORMAL - bit; - if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) - return 1; + if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) { + /* + * It is possible that this was called by transitioning + * between interrupt context, and preempt_count() has not + * been updated yet. In this case, use the TRANSITION bit. + */ + bit = RB_CTX_TRANSITION; + if (val & (1 << (bit + cpu_buffer->nest))) { + do_ring_buffer_record_recursion(); + return 1; + } + } val |= (1 << (bit + cpu_buffer->nest)); cpu_buffer->current_context = val; @@ -2716,8 +3297,8 @@ trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->current_context - (1 << cpu_buffer->nest); } -/* The recursive locking above uses 4 bits */ -#define NESTED_BITS 4 +/* The recursive locking above uses 5 bits */ +#define NESTED_BITS 5 /** * ring_buffer_nest_start - Allow to trace while nested @@ -2794,58 +3375,290 @@ int ring_buffer_unlock_commit(struct trace_buffer *buffer, } EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit); -static noinline void -rb_handle_timestamp(struct ring_buffer_per_cpu *cpu_buffer, - struct rb_event_info *info) +/* Special value to validate all deltas on a page. */ +#define CHECK_FULL_PAGE 1L + +#ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS +static void dump_buffer_page(struct buffer_data_page *bpage, + struct rb_event_info *info, + unsigned long tail) { - WARN_ONCE(info->delta > (1ULL << 59), - KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s", - (unsigned long long)info->delta, - (unsigned long long)info->ts, - (unsigned long long)cpu_buffer->write_stamp, - sched_clock_stable() ? "" : - "If you just came from a suspend/resume,\n" - "please switch to the trace global clock:\n" - " echo global > /sys/kernel/debug/tracing/trace_clock\n" - "or add trace_clock=global to the kernel command line\n"); - info->add_timestamp = 1; + struct ring_buffer_event *event; + u64 ts, delta; + int e; + + ts = bpage->time_stamp; + pr_warn(" [%lld] PAGE TIME STAMP\n", ts); + + for (e = 0; e < tail; e += rb_event_length(event)) { + + event = (struct ring_buffer_event *)(bpage->data + e); + + switch (event->type_len) { + + case RINGBUF_TYPE_TIME_EXTEND: + delta = rb_event_time_stamp(event); + ts += delta; + pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta); + break; + + case RINGBUF_TYPE_TIME_STAMP: + delta = rb_event_time_stamp(event); + ts = rb_fix_abs_ts(delta, ts); + pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta); + break; + + case RINGBUF_TYPE_PADDING: + ts += event->time_delta; + pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta); + break; + + case RINGBUF_TYPE_DATA: + ts += event->time_delta; + pr_warn(" [%lld] delta:%d\n", ts, event->time_delta); + break; + + default: + break; + } + } } +static DEFINE_PER_CPU(atomic_t, checking); +static atomic_t ts_dump; + +/* + * Check if the current event time stamp matches the deltas on + * the buffer page. + */ +static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, + struct rb_event_info *info, + unsigned long tail) +{ + struct ring_buffer_event *event; + struct buffer_data_page *bpage; + u64 ts, delta; + bool full = false; + int e; + + bpage = info->tail_page->page; + + if (tail == CHECK_FULL_PAGE) { + full = true; + tail = local_read(&bpage->commit); + } else if (info->add_timestamp & + (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) { + /* Ignore events with absolute time stamps */ + return; + } + + /* + * Do not check the first event (skip possible extends too). + * Also do not check if previous events have not been committed. + */ + if (tail <= 8 || tail > local_read(&bpage->commit)) + return; + + /* + * If this interrupted another event, + */ + if (atomic_inc_return(this_cpu_ptr(&checking)) != 1) + goto out; + + ts = bpage->time_stamp; + + for (e = 0; e < tail; e += rb_event_length(event)) { + + event = (struct ring_buffer_event *)(bpage->data + e); + + switch (event->type_len) { + + case RINGBUF_TYPE_TIME_EXTEND: + delta = rb_event_time_stamp(event); + ts += delta; + break; + + case RINGBUF_TYPE_TIME_STAMP: + delta = rb_event_time_stamp(event); + ts = rb_fix_abs_ts(delta, ts); + break; + + case RINGBUF_TYPE_PADDING: + if (event->time_delta == 1) + break; + fallthrough; + case RINGBUF_TYPE_DATA: + ts += event->time_delta; + break; + + default: + RB_WARN_ON(cpu_buffer, 1); + } + } + if ((full && ts > info->ts) || + (!full && ts + info->delta != info->ts)) { + /* If another report is happening, ignore this one */ + if (atomic_inc_return(&ts_dump) != 1) { + atomic_dec(&ts_dump); + goto out; + } + atomic_inc(&cpu_buffer->record_disabled); + /* There's some cases in boot up that this can happen */ + WARN_ON_ONCE(system_state != SYSTEM_BOOTING); + pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n", + cpu_buffer->cpu, + ts + info->delta, info->ts, info->delta, + info->before, info->after, + full ? " (full)" : ""); + dump_buffer_page(bpage, info, tail); + atomic_dec(&ts_dump); + /* Do not re-enable checking */ + return; + } +out: + atomic_dec(this_cpu_ptr(&checking)); +} +#else +static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer, + struct rb_event_info *info, + unsigned long tail) +{ +} +#endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */ + static struct ring_buffer_event * __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, struct rb_event_info *info) { struct ring_buffer_event *event; struct buffer_page *tail_page; - unsigned long tail, write; - - /* - * If the time delta since the last event is too big to - * hold in the time field of the event, then we append a - * TIME EXTEND event ahead of the data event. - */ - if (unlikely(info->add_timestamp)) - info->length += RB_LEN_TIME_EXTEND; + unsigned long tail, write, w; + bool a_ok; + bool b_ok; /* Don't let the compiler play games with cpu_buffer->tail_page */ tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page); - write = local_add_return(info->length, &tail_page->write); + + /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK; + barrier(); + b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); + a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); + barrier(); + info->ts = rb_time_stamp(cpu_buffer->buffer); + + if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) { + info->delta = info->ts; + } else { + /* + * If interrupting an event time update, we may need an + * absolute timestamp. + * Don't bother if this is the start of a new page (w == 0). + */ + if (unlikely(!a_ok || !b_ok || (info->before != info->after && w))) { + info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND; + info->length += RB_LEN_TIME_EXTEND; + } else { + info->delta = info->ts - info->after; + if (unlikely(test_time_stamp(info->delta))) { + info->add_timestamp |= RB_ADD_STAMP_EXTEND; + info->length += RB_LEN_TIME_EXTEND; + } + } + } + + /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts); + + /*C*/ write = local_add_return(info->length, &tail_page->write); /* set write to only the index of the write */ write &= RB_WRITE_MASK; + tail = write - info->length; + /* See if we shot pass the end of this buffer page */ + if (unlikely(write > BUF_PAGE_SIZE)) { + /* before and after may now different, fix it up*/ + b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before); + a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); + if (a_ok && b_ok && info->before != info->after) + (void)rb_time_cmpxchg(&cpu_buffer->before_stamp, + info->before, info->after); + if (a_ok && b_ok) + check_buffer(cpu_buffer, info, CHECK_FULL_PAGE); + return rb_move_tail(cpu_buffer, tail, info); + } + + if (likely(tail == w)) { + u64 save_before; + bool s_ok; + + /* Nothing interrupted us between A and C */ + /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts); + barrier(); + /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before); + RB_WARN_ON(cpu_buffer, !s_ok); + if (likely(!(info->add_timestamp & + (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) + /* This did not interrupt any time update */ + info->delta = info->ts - info->after; + else + /* Just use full timestamp for interrupting event */ + info->delta = info->ts; + barrier(); + check_buffer(cpu_buffer, info, tail); + if (unlikely(info->ts != save_before)) { + /* SLOW PATH - Interrupted between C and E */ + + a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); + RB_WARN_ON(cpu_buffer, !a_ok); + + /* Write stamp must only go forward */ + if (save_before > info->after) { + /* + * We do not care about the result, only that + * it gets updated atomically. + */ + (void)rb_time_cmpxchg(&cpu_buffer->write_stamp, + info->after, save_before); + } + } + } else { + u64 ts; + /* SLOW PATH - Interrupted between A and C */ + a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after); + /* Was interrupted before here, write_stamp must be valid */ + RB_WARN_ON(cpu_buffer, !a_ok); + ts = rb_time_stamp(cpu_buffer->buffer); + barrier(); + /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) && + info->after < ts && + rb_time_cmpxchg(&cpu_buffer->write_stamp, + info->after, ts)) { + /* Nothing came after this event between C and E */ + info->delta = ts - info->after; + } else { + /* + * Interrupted between C and E: + * Lost the previous events time stamp. Just set the + * delta to zero, and this will be the same time as + * the event this event interrupted. And the events that + * came after this will still be correct (as they would + * have built their delta on the previous event. + */ + info->delta = 0; + } + info->ts = ts; + info->add_timestamp &= ~RB_ADD_STAMP_FORCE; + } + /* * If this is the first commit on the page, then it has the same * timestamp as the page itself. */ - if (!tail && !ring_buffer_time_stamp_abs(cpu_buffer->buffer)) + if (unlikely(!tail && !(info->add_timestamp & + (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)))) info->delta = 0; - /* See if we shot pass the end of this buffer page */ - if (unlikely(write > BUF_PAGE_SIZE)) - return rb_move_tail(cpu_buffer, tail, info); - /* We reserved something on the buffer */ event = __rb_page_index(tail_page, tail); @@ -2857,7 +3670,7 @@ __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer, * If this is the first commit on the page, then update * its timestamp. */ - if (!tail) + if (unlikely(!tail)) tail_page->page->time_stamp = info->ts; /* account for these added bytes */ @@ -2874,9 +3687,10 @@ rb_reserve_next_event(struct trace_buffer *buffer, struct ring_buffer_event *event; struct rb_event_info info; int nr_loops = 0; - u64 diff; + int add_ts_default; rb_start_commit(cpu_buffer); + /* The commit page can not change after this */ #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP /* @@ -2894,8 +3708,16 @@ rb_reserve_next_event(struct trace_buffer *buffer, #endif info.length = rb_calculate_event_length(length); + + if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) { + add_ts_default = RB_ADD_STAMP_ABSOLUTE; + info.length += RB_LEN_TIME_EXTEND; + } else { + add_ts_default = RB_ADD_STAMP_NONE; + } + again: - info.add_timestamp = 0; + info.add_timestamp = add_ts_default; info.delta = 0; /* @@ -2910,35 +3732,16 @@ rb_reserve_next_event(struct trace_buffer *buffer, if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000)) goto out_fail; - info.ts = rb_time_stamp(cpu_buffer->buffer); - diff = info.ts - cpu_buffer->write_stamp; - - /* make sure this diff is calculated here */ - barrier(); - - if (ring_buffer_time_stamp_abs(buffer)) { - info.delta = info.ts; - rb_handle_timestamp(cpu_buffer, &info); - } else /* Did the write stamp get updated already? */ - if (likely(info.ts >= cpu_buffer->write_stamp)) { - info.delta = diff; - if (unlikely(test_time_stamp(info.delta))) - rb_handle_timestamp(cpu_buffer, &info); - } - event = __rb_reserve_next(cpu_buffer, &info); if (unlikely(PTR_ERR(event) == -EAGAIN)) { - if (info.add_timestamp) + if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND)) info.length -= RB_LEN_TIME_EXTEND; goto again; } - if (!event) - goto out_fail; - - return event; - + if (likely(event)) + return event; out_fail: rb_end_commit(cpu_buffer); return NULL; @@ -3028,14 +3831,14 @@ rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, * Because the commit page may be on the reader page we * start with the next page and check the end loop there. */ - rb_inc_page(cpu_buffer, &bpage); + rb_inc_page(&bpage); start = bpage; do { if (bpage->page == (void *)addr) { local_dec(&bpage->entries); return; } - rb_inc_page(cpu_buffer, &bpage); + rb_inc_page(&bpage); } while (bpage != start); /* commit not part of this buffer?? */ @@ -3043,7 +3846,7 @@ rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer, } /** - * ring_buffer_commit_discard - discard an event that has not been committed + * ring_buffer_discard_commit - discard an event that has not been committed * @buffer: the ring buffer * @event: non committed event to discard * @@ -3084,11 +3887,6 @@ void ring_buffer_discard_commit(struct trace_buffer *buffer, if (rb_try_to_discard(cpu_buffer, event)) goto out; - /* - * The commit is still visible by the reader, so we - * must still update the timestamp. - */ - rb_update_write_stamp(cpu_buffer, event); out: rb_end_commit(cpu_buffer); @@ -3177,10 +3975,30 @@ static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer) if (unlikely(!head)) return true; - return reader->read == rb_page_commit(reader) && - (commit == reader || - (commit == head && - head->read == rb_page_commit(commit))); + /* Reader should exhaust content in reader page */ + if (reader->read != rb_page_commit(reader)) + return false; + + /* + * If writers are committing on the reader page, knowing all + * committed content has been read, the ring buffer is empty. + */ + if (commit == reader) + return true; + + /* + * If writers are committing on a page other than reader page + * and head page, there should always be content to read. + */ + if (commit != head) + return false; + + /* + * Writers are committing on the head page, we just need + * to care about there're committed data, and the reader will + * swap reader page with head page when it is to read data. + */ + return rb_page_commit(commit) == 0; } /** @@ -3547,14 +4365,18 @@ static void rb_iter_reset(struct ring_buffer_iter *iter) /* Iterator usage is expected to have record disabled */ iter->head_page = cpu_buffer->reader_page; iter->head = cpu_buffer->reader_page->read; + iter->next_event = iter->head; iter->cache_reader_page = iter->head_page; iter->cache_read = cpu_buffer->read; - if (iter->head) + if (iter->head) { iter->read_stamp = cpu_buffer->read_stamp; - else + iter->page_stamp = cpu_buffer->reader_page->page->time_stamp; + } else { iter->read_stamp = iter->head_page->page->time_stamp; + iter->page_stamp = iter->read_stamp; + } } /** @@ -3590,17 +4412,38 @@ int ring_buffer_iter_empty(struct ring_buffer_iter *iter) struct buffer_page *reader; struct buffer_page *head_page; struct buffer_page *commit_page; + struct buffer_page *curr_commit_page; unsigned commit; + u64 curr_commit_ts; + u64 commit_ts; cpu_buffer = iter->cpu_buffer; - - /* Remember, trace recording is off when iterator is in use */ reader = cpu_buffer->reader_page; head_page = cpu_buffer->head_page; commit_page = cpu_buffer->commit_page; + commit_ts = commit_page->page->time_stamp; + + /* + * When the writer goes across pages, it issues a cmpxchg which + * is a mb(), which will synchronize with the rmb here. + * (see rb_tail_page_update()) + */ + smp_rmb(); commit = rb_page_commit(commit_page); + /* We want to make sure that the commit page doesn't change */ + smp_rmb(); + + /* Make sure commit page didn't change */ + curr_commit_page = READ_ONCE(cpu_buffer->commit_page); + curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp); - return ((iter->head_page == commit_page && iter->head == commit) || + /* If the commit page changed, then there's more data */ + if (curr_commit_page != commit_page || + curr_commit_ts != commit_ts) + return 0; + + /* Still racy, as it may return a false positive, but that's OK */ + return ((iter->head_page == commit_page && iter->head >= commit) || (iter->head_page == reader && commit_page == head_page && head_page->read == commit && iter->head == rb_page_commit(cpu_buffer->reader_page))); @@ -3618,12 +4461,13 @@ rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, return; case RINGBUF_TYPE_TIME_EXTEND: - delta = ring_buffer_event_time_stamp(event); + delta = rb_event_time_stamp(event); cpu_buffer->read_stamp += delta; return; case RINGBUF_TYPE_TIME_STAMP: - delta = ring_buffer_event_time_stamp(event); + delta = rb_event_time_stamp(event); + delta = rb_fix_abs_ts(delta, cpu_buffer->read_stamp); cpu_buffer->read_stamp = delta; return; @@ -3632,7 +4476,7 @@ rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer, return; default: - BUG(); + RB_WARN_ON(cpu_buffer, 1); } return; } @@ -3648,12 +4492,13 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, return; case RINGBUF_TYPE_TIME_EXTEND: - delta = ring_buffer_event_time_stamp(event); + delta = rb_event_time_stamp(event); iter->read_stamp += delta; return; case RINGBUF_TYPE_TIME_STAMP: - delta = ring_buffer_event_time_stamp(event); + delta = rb_event_time_stamp(event); + delta = rb_fix_abs_ts(delta, iter->read_stamp); iter->read_stamp = delta; return; @@ -3662,7 +4507,7 @@ rb_update_iter_read_stamp(struct ring_buffer_iter *iter, return; default: - BUG(); + RB_WARN_ON(iter->cpu_buffer, 1); } return; } @@ -3737,7 +4582,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->pages = reader->list.prev; /* The reader page will be pointing to the new head */ - rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list); + rb_set_list_to_head(&cpu_buffer->reader_page->list); /* * We want to make sure we read the overruns after we set up our @@ -3776,7 +4621,7 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) * Now make the new head point back to the reader page. */ rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list; - rb_inc_page(cpu_buffer, &cpu_buffer->head_page); + rb_inc_page(&cpu_buffer->head_page); local_inc(&cpu_buffer->pages_read); @@ -3799,6 +4644,33 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer) arch_spin_unlock(&cpu_buffer->lock); local_irq_restore(flags); + /* + * The writer has preempt disable, wait for it. But not forever + * Although, 1 second is pretty much "forever" + */ +#define USECS_WAIT 1000000 + for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) { + /* If the write is past the end of page, a writer is still updating it */ + if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE)) + break; + + udelay(1); + + /* Get the latest version of the reader write value */ + smp_rmb(); + } + + /* The writer is not moving forward? Something is wrong */ + if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT)) + reader = NULL; + + /* + * Make sure we see any padding after the write update + * (see rb_reset_tail()) + */ + smp_rmb(); + + return reader; } @@ -3828,15 +4700,22 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer) static void rb_advance_iter(struct ring_buffer_iter *iter) { struct ring_buffer_per_cpu *cpu_buffer; - struct ring_buffer_event *event; - unsigned length; cpu_buffer = iter->cpu_buffer; + /* If head == next_event then we need to jump to the next event */ + if (iter->head == iter->next_event) { + /* If the event gets overwritten again, there's nothing to do */ + if (rb_iter_head_event(iter) == NULL) + return; + } + + iter->head = iter->next_event; + /* * Check if we are at the end of the buffer. */ - if (iter->head >= rb_page_size(iter->head_page)) { + if (iter->next_event >= rb_page_size(iter->head_page)) { /* discarded commits can make the page empty */ if (iter->head_page == cpu_buffer->commit_page) return; @@ -3844,27 +4723,7 @@ static void rb_advance_iter(struct ring_buffer_iter *iter) return; } - event = rb_iter_head_event(iter); - - length = rb_event_length(event); - - /* - * This should not be called to advance the header if we are - * at the tail of the buffer. - */ - if (RB_WARN_ON(cpu_buffer, - (iter->head_page == cpu_buffer->commit_page) && - (iter->head + length > rb_commit_index(cpu_buffer)))) - return; - - rb_update_iter_read_stamp(iter, event); - - iter->head += length; - - /* check for end of page padding */ - if ((iter->head >= rb_page_size(iter->head_page)) && - (iter->head_page != cpu_buffer->commit_page)) - rb_inc_iter(iter); + rb_update_iter_read_stamp(iter, iter->event); } static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer) @@ -3919,7 +4778,8 @@ rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, case RINGBUF_TYPE_TIME_STAMP: if (ts) { - *ts = ring_buffer_event_time_stamp(event); + *ts = rb_event_time_stamp(event); + *ts = rb_fix_abs_ts(*ts, reader->page->time_stamp); ring_buffer_normalize_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu, ts); } @@ -3938,7 +4798,7 @@ rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts, return event; default: - BUG(); + RB_WARN_ON(cpu_buffer, 1); } return NULL; @@ -3973,14 +4833,13 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) return NULL; /* - * We repeat when a time extend is encountered or we hit - * the end of the page. Since the time extend is always attached - * to a data event, we should never loop more than three times. - * Once for going to next page, once on time extend, and - * finally once to get the event. - * (We never hit the following condition more than thrice). + * As the writer can mess with what the iterator is trying + * to read, just give up if we fail to get an event after + * three tries. The iterator is not as reliable when reading + * the ring buffer with an active write as the consumer is. + * Do not warn if the three failures is reached. */ - if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) + if (++nr_loops > 3) return NULL; if (rb_per_cpu_empty(cpu_buffer)) @@ -3992,6 +4851,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) } event = rb_iter_head_event(iter); + if (!event) + goto again; switch (event->type_len) { case RINGBUF_TYPE_PADDING: @@ -4009,7 +4870,8 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) case RINGBUF_TYPE_TIME_STAMP: if (ts) { - *ts = ring_buffer_event_time_stamp(event); + *ts = rb_event_time_stamp(event); + *ts = rb_fix_abs_ts(*ts, iter->head_page->page->time_stamp); ring_buffer_normalize_time_stamp(cpu_buffer->buffer, cpu_buffer->cpu, ts); } @@ -4026,7 +4888,7 @@ rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts) return event; default: - BUG(); + RB_WARN_ON(cpu_buffer, 1); } return NULL; @@ -4102,6 +4964,20 @@ ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts, return event; } +/** ring_buffer_iter_dropped - report if there are dropped events + * @iter: The ring buffer iterator + * + * Returns true if there was dropped events since the last peek. + */ +bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter) +{ + bool ret = iter->missed_events != 0; + + iter->missed_events = 0; + return ret; +} +EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped); + /** * ring_buffer_iter_peek - peek at the next event to be read * @iter: The ring buffer iterator @@ -4208,16 +5084,21 @@ ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags) if (!cpumask_test_cpu(cpu, buffer->cpumask)) return NULL; - iter = kmalloc(sizeof(*iter), flags); + iter = kzalloc(sizeof(*iter), flags); if (!iter) return NULL; + iter->event = kmalloc(BUF_MAX_DATA_SIZE, flags); + if (!iter->event) { + kfree(iter); + return NULL; + } + cpu_buffer = buffer->buffers[cpu]; iter->cpu_buffer = cpu_buffer; - atomic_inc(&buffer->resize_disabled); - atomic_inc(&cpu_buffer->record_disabled); + atomic_inc(&cpu_buffer->resize_disabled); return iter; } @@ -4290,42 +5171,31 @@ ring_buffer_read_finish(struct ring_buffer_iter *iter) rb_check_pages(cpu_buffer); raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); - atomic_dec(&cpu_buffer->record_disabled); - atomic_dec(&cpu_buffer->buffer->resize_disabled); + atomic_dec(&cpu_buffer->resize_disabled); + kfree(iter->event); kfree(iter); } EXPORT_SYMBOL_GPL(ring_buffer_read_finish); /** - * ring_buffer_read - read the next item in the ring buffer by the iterator + * ring_buffer_iter_advance - advance the iterator to the next location * @iter: The ring buffer iterator - * @ts: The time stamp of the event read. * - * This reads the next event in the ring buffer and increments the iterator. + * Move the location of the iterator such that the next read will + * be the next location of the iterator. */ -struct ring_buffer_event * -ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts) +void ring_buffer_iter_advance(struct ring_buffer_iter *iter) { - struct ring_buffer_event *event; struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer; unsigned long flags; raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); - again: - event = rb_iter_peek(iter, ts); - if (!event) - goto out; - - if (event->type_len == RINGBUF_TYPE_PADDING) - goto again; rb_advance_iter(iter); - out: - raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); - return event; + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); } -EXPORT_SYMBOL_GPL(ring_buffer_read); +EXPORT_SYMBOL_GPL(ring_buffer_iter_advance); /** * ring_buffer_size - return the size of the ring buffer (in bytes) @@ -4384,8 +5254,10 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->read = 0; cpu_buffer->read_bytes = 0; - cpu_buffer->write_stamp = 0; - cpu_buffer->read_stamp = 0; + rb_time_set(&cpu_buffer->write_stamp, 0); + rb_time_set(&cpu_buffer->before_stamp, 0); + + memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp)); cpu_buffer->lost_events = 0; cpu_buffer->last_overrun = 0; @@ -4393,6 +5265,26 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) rb_head_page_activate(cpu_buffer); } +/* Must have disabled the cpu buffer then done a synchronize_rcu */ +static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer) +{ + unsigned long flags; + + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + + if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) + goto out; + + arch_spin_lock(&cpu_buffer->lock); + + rb_reset_cpu(cpu_buffer); + + arch_spin_unlock(&cpu_buffer->lock); + + out: + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +} + /** * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer * @buffer: The ring buffer to reset a per cpu buffer of @@ -4401,35 +5293,62 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu) { struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; - unsigned long flags; if (!cpumask_test_cpu(cpu, buffer->cpumask)) return; - atomic_inc(&buffer->resize_disabled); + /* prevent another thread from changing buffer sizes */ + mutex_lock(&buffer->mutex); + + atomic_inc(&cpu_buffer->resize_disabled); atomic_inc(&cpu_buffer->record_disabled); /* Make sure all commits have finished */ synchronize_rcu(); - raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + reset_disabled_cpu_buffer(cpu_buffer); - if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing))) - goto out; + atomic_dec(&cpu_buffer->record_disabled); + atomic_dec(&cpu_buffer->resize_disabled); - arch_spin_lock(&cpu_buffer->lock); + mutex_unlock(&buffer->mutex); +} +EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); - rb_reset_cpu(cpu_buffer); +/** + * ring_buffer_reset_online_cpus - reset a ring buffer per CPU buffer + * @buffer: The ring buffer to reset a per cpu buffer of + * @cpu: The CPU buffer to be reset + */ +void ring_buffer_reset_online_cpus(struct trace_buffer *buffer) +{ + struct ring_buffer_per_cpu *cpu_buffer; + int cpu; - arch_spin_unlock(&cpu_buffer->lock); + /* prevent another thread from changing buffer sizes */ + mutex_lock(&buffer->mutex); - out: - raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + for_each_online_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; - atomic_dec(&cpu_buffer->record_disabled); - atomic_dec(&buffer->resize_disabled); + atomic_inc(&cpu_buffer->resize_disabled); + atomic_inc(&cpu_buffer->record_disabled); + } + + /* Make sure all commits have finished */ + synchronize_rcu(); + + for_each_online_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + + reset_disabled_cpu_buffer(cpu_buffer); + + atomic_dec(&cpu_buffer->record_disabled); + atomic_dec(&cpu_buffer->resize_disabled); + } + + mutex_unlock(&buffer->mutex); } -EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); /** * ring_buffer_reset - reset a ring buffer @@ -4437,15 +5356,37 @@ EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu); */ void ring_buffer_reset(struct trace_buffer *buffer) { + struct ring_buffer_per_cpu *cpu_buffer; int cpu; - for_each_buffer_cpu(buffer, cpu) - ring_buffer_reset_cpu(buffer, cpu); + /* prevent another thread from changing buffer sizes */ + mutex_lock(&buffer->mutex); + + for_each_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + + atomic_inc(&cpu_buffer->resize_disabled); + atomic_inc(&cpu_buffer->record_disabled); + } + + /* Make sure all commits have finished */ + synchronize_rcu(); + + for_each_buffer_cpu(buffer, cpu) { + cpu_buffer = buffer->buffers[cpu]; + + reset_disabled_cpu_buffer(cpu_buffer); + + atomic_dec(&cpu_buffer->record_disabled); + atomic_dec(&cpu_buffer->resize_disabled); + } + + mutex_unlock(&buffer->mutex); } EXPORT_SYMBOL_GPL(ring_buffer_reset); /** - * rind_buffer_empty - is the ring buffer empty? + * ring_buffer_empty - is the ring buffer empty? * @buffer: The ring buffer to test */ bool ring_buffer_empty(struct trace_buffer *buffer) @@ -4759,7 +5700,15 @@ int ring_buffer_read_page(struct trace_buffer *buffer, unsigned int pos = 0; unsigned int size; - if (full) + /* + * If a full page is expected, this can still be returned + * if there's been a previous partial read and the + * rest of the page can be read and the commit page is off + * the reader page. + */ + if (full && + (!read || (len < (commit - read)) || + cpu_buffer->reader_page == cpu_buffer->commit_page)) goto out_unlock; if (len > (commit - read)) @@ -5088,16 +6037,13 @@ static __init int test_ringbuffer(void) rb_data[cpu].buffer = buffer; rb_data[cpu].cpu = cpu; rb_data[cpu].cnt = cpu; - rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu], - "rbtester/%d", cpu); + rb_threads[cpu] = kthread_run_on_cpu(rb_test, &rb_data[cpu], + cpu, "rbtester/%u"); if (WARN_ON(IS_ERR(rb_threads[cpu]))) { pr_cont("FAILED\n"); ret = PTR_ERR(rb_threads[cpu]); goto out_free; } - - kthread_bind(rb_threads[cpu], cpu); - wake_up_process(rb_threads[cpu]); } /* Now create the rb hammer! */ @@ -5204,10 +6150,10 @@ static __init int test_ringbuffer(void) pr_info(" total events: %ld\n", total_lost + total_read); pr_info(" recorded len bytes: %ld\n", total_len); pr_info(" recorded size bytes: %ld\n", total_size); - if (total_lost) + if (total_lost) { pr_info(" With dropped events, record len and size may not match\n" " alloced and written from above\n"); - if (!total_lost) { + } else { if (RB_WARN_ON(buffer, total_len != total_alloc || total_size != total_written)) break; |