aboutsummaryrefslogtreecommitdiffstats
path: root/kernel/bpf/ringbuf.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--kernel/bpf/ringbuf.c253
1 files changed, 237 insertions, 16 deletions
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index ded4faeca192..9e832acf4692 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -38,10 +38,43 @@ struct bpf_ringbuf {
struct page **pages;
int nr_pages;
spinlock_t spinlock ____cacheline_aligned_in_smp;
- /* Consumer and producer counters are put into separate pages to allow
- * mapping consumer page as r/w, but restrict producer page to r/o.
- * This protects producer position from being modified by user-space
- * application and ruining in-kernel position tracking.
+ /* For user-space producer ring buffers, an atomic_t busy bit is used
+ * to synchronize access to the ring buffers in the kernel, rather than
+ * the spinlock that is used for kernel-producer ring buffers. This is
+ * done because the ring buffer must hold a lock across a BPF program's
+ * callback:
+ *
+ * __bpf_user_ringbuf_peek() // lock acquired
+ * -> program callback_fn()
+ * -> __bpf_user_ringbuf_sample_release() // lock released
+ *
+ * It is unsafe and incorrect to hold an IRQ spinlock across what could
+ * be a long execution window, so we instead simply disallow concurrent
+ * access to the ring buffer by kernel consumers, and return -EBUSY from
+ * __bpf_user_ringbuf_peek() if the busy bit is held by another task.
+ */
+ atomic_t busy ____cacheline_aligned_in_smp;
+ /* Consumer and producer counters are put into separate pages to
+ * allow each position to be mapped with different permissions.
+ * This prevents a user-space application from modifying the
+ * position and ruining in-kernel tracking. The permissions of the
+ * pages depend on who is producing samples: user-space or the
+ * kernel.
+ *
+ * Kernel-producer
+ * ---------------
+ * The producer position and data pages are mapped as r/o in
+ * userspace. For this approach, bits in the header of samples are
+ * used to signal to user-space, and to other producers, whether a
+ * sample is currently being written.
+ *
+ * User-space producer
+ * -------------------
+ * Only the page containing the consumer position is mapped r/o in
+ * user-space. User-space producers also use bits of the header to
+ * communicate to the kernel, but the kernel must carefully check and
+ * validate each sample to ensure that they're correctly formatted, and
+ * fully contained within the ring buffer.
*/
unsigned long consumer_pos __aligned(PAGE_SIZE);
unsigned long producer_pos __aligned(PAGE_SIZE);
@@ -116,7 +149,7 @@ static struct bpf_ringbuf *bpf_ringbuf_area_alloc(size_t data_sz, int numa_node)
err_free_pages:
for (i = 0; i < nr_pages; i++)
__free_page(pages[i]);
- kvfree(pages);
+ bpf_map_area_free(pages);
return NULL;
}
@@ -136,6 +169,7 @@ static struct bpf_ringbuf *bpf_ringbuf_alloc(size_t data_sz, int numa_node)
return NULL;
spin_lock_init(&rb->spinlock);
+ atomic_set(&rb->busy, 0);
init_waitqueue_head(&rb->waitq);
init_irq_work(&rb->work, bpf_ringbuf_notify);
@@ -164,7 +198,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
return ERR_PTR(-E2BIG);
#endif
- rb_map = kzalloc(sizeof(*rb_map), GFP_USER | __GFP_ACCOUNT);
+ rb_map = bpf_map_area_alloc(sizeof(*rb_map), NUMA_NO_NODE);
if (!rb_map)
return ERR_PTR(-ENOMEM);
@@ -172,7 +206,7 @@ static struct bpf_map *ringbuf_map_alloc(union bpf_attr *attr)
rb_map->rb = bpf_ringbuf_alloc(attr->max_entries, rb_map->map.numa_node);
if (!rb_map->rb) {
- kfree(rb_map);
+ bpf_map_area_free(rb_map);
return ERR_PTR(-ENOMEM);
}
@@ -190,7 +224,7 @@ static void bpf_ringbuf_free(struct bpf_ringbuf *rb)
vunmap(rb);
for (i = 0; i < nr_pages; i++)
__free_page(pages[i]);
- kvfree(pages);
+ bpf_map_area_free(pages);
}
static void ringbuf_map_free(struct bpf_map *map)
@@ -199,7 +233,7 @@ static void ringbuf_map_free(struct bpf_map *map)
rb_map = container_of(map, struct bpf_ringbuf_map, map);
bpf_ringbuf_free(rb_map->rb);
- kfree(rb_map);
+ bpf_map_area_free(rb_map);
}
static void *ringbuf_map_lookup_elem(struct bpf_map *map, void *key)
@@ -224,7 +258,7 @@ static int ringbuf_map_get_next_key(struct bpf_map *map, void *key,
return -ENOTSUPP;
}
-static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
+static int ringbuf_map_mmap_kern(struct bpf_map *map, struct vm_area_struct *vma)
{
struct bpf_ringbuf_map *rb_map;
@@ -242,6 +276,26 @@ static int ringbuf_map_mmap(struct bpf_map *map, struct vm_area_struct *vma)
vma->vm_pgoff + RINGBUF_PGOFF);
}
+static int ringbuf_map_mmap_user(struct bpf_map *map, struct vm_area_struct *vma)
+{
+ struct bpf_ringbuf_map *rb_map;
+
+ rb_map = container_of(map, struct bpf_ringbuf_map, map);
+
+ if (vma->vm_flags & VM_WRITE) {
+ if (vma->vm_pgoff == 0)
+ /* Disallow writable mappings to the consumer pointer,
+ * and allow writable mappings to both the producer
+ * position, and the ring buffer data itself.
+ */
+ return -EPERM;
+ } else {
+ vma->vm_flags &= ~VM_MAYWRITE;
+ }
+ /* remap_vmalloc_range() checks size and offset constraints */
+ return remap_vmalloc_range(vma, rb_map->rb, vma->vm_pgoff + RINGBUF_PGOFF);
+}
+
static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
{
unsigned long cons_pos, prod_pos;
@@ -251,8 +305,13 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
return prod_pos - cons_pos;
}
-static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
- struct poll_table_struct *pts)
+static u32 ringbuf_total_data_sz(const struct bpf_ringbuf *rb)
+{
+ return rb->mask + 1;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+ struct poll_table_struct *pts)
{
struct bpf_ringbuf_map *rb_map;
@@ -264,13 +323,26 @@ static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
return 0;
}
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+ struct poll_table_struct *pts)
+{
+ struct bpf_ringbuf_map *rb_map;
+
+ rb_map = container_of(map, struct bpf_ringbuf_map, map);
+ poll_wait(filp, &rb_map->rb->waitq, pts);
+
+ if (ringbuf_avail_data_sz(rb_map->rb) < ringbuf_total_data_sz(rb_map->rb))
+ return EPOLLOUT | EPOLLWRNORM;
+ return 0;
+}
+
BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
const struct bpf_map_ops ringbuf_map_ops = {
.map_meta_equal = bpf_map_meta_equal,
.map_alloc = ringbuf_map_alloc,
.map_free = ringbuf_map_free,
- .map_mmap = ringbuf_map_mmap,
- .map_poll = ringbuf_map_poll,
+ .map_mmap = ringbuf_map_mmap_kern,
+ .map_poll = ringbuf_map_poll_kern,
.map_lookup_elem = ringbuf_map_lookup_elem,
.map_update_elem = ringbuf_map_update_elem,
.map_delete_elem = ringbuf_map_delete_elem,
@@ -278,6 +350,20 @@ const struct bpf_map_ops ringbuf_map_ops = {
.map_btf_id = &ringbuf_map_btf_ids[0],
};
+BTF_ID_LIST_SINGLE(user_ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
+const struct bpf_map_ops user_ringbuf_map_ops = {
+ .map_meta_equal = bpf_map_meta_equal,
+ .map_alloc = ringbuf_map_alloc,
+ .map_free = ringbuf_map_free,
+ .map_mmap = ringbuf_map_mmap_user,
+ .map_poll = ringbuf_map_poll_user,
+ .map_lookup_elem = ringbuf_map_lookup_elem,
+ .map_update_elem = ringbuf_map_update_elem,
+ .map_delete_elem = ringbuf_map_delete_elem,
+ .map_get_next_key = ringbuf_map_get_next_key,
+ .map_btf_id = &user_ringbuf_map_btf_ids[0],
+};
+
/* Given pointer to ring buffer record metadata and struct bpf_ringbuf itself,
* calculate offset from record metadata to ring buffer in pages, rounded
* down. This page offset is stored as part of record metadata and allows to
@@ -312,7 +398,7 @@ static void *__bpf_ringbuf_reserve(struct bpf_ringbuf *rb, u64 size)
return NULL;
len = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
- if (len > rb->mask + 1)
+ if (len > ringbuf_total_data_sz(rb))
return NULL;
cons_pos = smp_load_acquire(&rb->consumer_pos);
@@ -459,7 +545,7 @@ BPF_CALL_2(bpf_ringbuf_query, struct bpf_map *, map, u64, flags)
case BPF_RB_AVAIL_DATA:
return ringbuf_avail_data_sz(rb);
case BPF_RB_RING_SIZE:
- return rb->mask + 1;
+ return ringbuf_total_data_sz(rb);
case BPF_RB_CONS_POS:
return smp_load_acquire(&rb->consumer_pos);
case BPF_RB_PROD_POS:
@@ -553,3 +639,138 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
.arg1_type = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
.arg2_type = ARG_ANYTHING,
};
+
+static int __bpf_user_ringbuf_peek(struct bpf_ringbuf *rb, void **sample, u32 *size)
+{
+ int err;
+ u32 hdr_len, sample_len, total_len, flags, *hdr;
+ u64 cons_pos, prod_pos;
+
+ /* Synchronizes with smp_store_release() in user-space producer. */
+ prod_pos = smp_load_acquire(&rb->producer_pos);
+ if (prod_pos % 8)
+ return -EINVAL;
+
+ /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_sample_release() */
+ cons_pos = smp_load_acquire(&rb->consumer_pos);
+ if (cons_pos >= prod_pos)
+ return -ENODATA;
+
+ hdr = (u32 *)((uintptr_t)rb->data + (uintptr_t)(cons_pos & rb->mask));
+ /* Synchronizes with smp_store_release() in user-space producer. */
+ hdr_len = smp_load_acquire(hdr);
+ flags = hdr_len & (BPF_RINGBUF_BUSY_BIT | BPF_RINGBUF_DISCARD_BIT);
+ sample_len = hdr_len & ~flags;
+ total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);
+
+ /* The sample must fit within the region advertised by the producer position. */
+ if (total_len > prod_pos - cons_pos)
+ return -EINVAL;
+
+ /* The sample must fit within the data region of the ring buffer. */
+ if (total_len > ringbuf_total_data_sz(rb))
+ return -E2BIG;
+
+ /* The sample must fit into a struct bpf_dynptr. */
+ err = bpf_dynptr_check_size(sample_len);
+ if (err)
+ return -E2BIG;
+
+ if (flags & BPF_RINGBUF_DISCARD_BIT) {
+ /* If the discard bit is set, the sample should be skipped.
+ *
+ * Update the consumer pos, and return -EAGAIN so the caller
+ * knows to skip this sample and try to read the next one.
+ */
+ smp_store_release(&rb->consumer_pos, cons_pos + total_len);
+ return -EAGAIN;
+ }
+
+ if (flags & BPF_RINGBUF_BUSY_BIT)
+ return -ENODATA;
+
+ *sample = (void *)((uintptr_t)rb->data +
+ (uintptr_t)((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+ *size = sample_len;
+ return 0;
+}
+
+static void __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size, u64 flags)
+{
+ u64 consumer_pos;
+ u32 rounded_size = round_up(size + BPF_RINGBUF_HDR_SZ, 8);
+
+ /* Using smp_load_acquire() is unnecessary here, as the busy-bit
+ * prevents another task from writing to consumer_pos after it was read
+ * by this task with smp_load_acquire() in __bpf_user_ringbuf_peek().
+ */
+ consumer_pos = rb->consumer_pos;
+ /* Synchronizes with smp_load_acquire() in user-space producer. */
+ smp_store_release(&rb->consumer_pos, consumer_pos + rounded_size);
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+ void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+ struct bpf_ringbuf *rb;
+ long samples, discarded_samples = 0, ret = 0;
+ bpf_callback_t callback = (bpf_callback_t)callback_fn;
+ u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
+ int busy = 0;
+
+ if (unlikely(flags & ~wakeup_flags))
+ return -EINVAL;
+
+ rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+
+ /* If another consumer is already consuming a sample, wait for them to finish. */
+ if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+ return -EBUSY;
+
+ for (samples = 0; samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0; samples++) {
+ int err;
+ u32 size;
+ void *sample;
+ struct bpf_dynptr_kern dynptr;
+
+ err = __bpf_user_ringbuf_peek(rb, &sample, &size);
+ if (err) {
+ if (err == -ENODATA) {
+ break;
+ } else if (err == -EAGAIN) {
+ discarded_samples++;
+ continue;
+ } else {
+ ret = err;
+ goto schedule_work_return;
+ }
+ }
+
+ bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
+ ret = callback((uintptr_t)&dynptr, (uintptr_t)callback_ctx, 0, 0, 0);
+ __bpf_user_ringbuf_sample_release(rb, size, flags);
+ }
+ ret = samples - discarded_samples;
+
+schedule_work_return:
+ /* Prevent the clearing of the busy-bit from being reordered before the
+ * storing of any rb consumer or producer positions.
+ */
+ smp_mb__before_atomic();
+ atomic_set(&rb->busy, 0);
+
+ if (flags & BPF_RB_FORCE_WAKEUP)
+ irq_work_queue(&rb->work);
+ else if (!(flags & BPF_RB_NO_WAKEUP) && samples > 0)
+ irq_work_queue(&rb->work);
+ return ret;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+ .func = bpf_user_ringbuf_drain,
+ .ret_type = RET_INTEGER,
+ .arg1_type = ARG_CONST_MAP_PTR,
+ .arg2_type = ARG_PTR_TO_FUNC,
+ .arg3_type = ARG_PTR_TO_STACK_OR_NULL,
+ .arg4_type = ARG_ANYTHING,
+};