aboutsummaryrefslogtreecommitdiffstatshomepage
path: root/src/mpmc_ptr_ring.h
blob: 09d56df8b543398c42f46101b87f3be247be5379 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
/* SPDX-License-Identifier: GPL-2.0
 *
 * Copyright (C) 2018 Jonathan Neuschäfer
 * Copyright (C) 2018 Thomas Gschwantner <tharre3@gmail.com>. All Rights Reserved.
 */

#ifndef MPMC_RING_PTR_H
#define MPMC_RING_PTR_H

/*
 * This is an implementation of a Multi-Producer/Multi-Consumer (MPMC) queue,
 * strongly inspired by ConcurrencyKit[1], and Linux's own ptr_ring.h.
 *
 *
 *              +-----------------------------------------------+
 *        index | 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|10|11|12|13|14|15|
 *        state |--|--|--|**|**|**|**|**|**|**|++|++|++|--|--|--|
 *              +-----------------------------------------------+
 *                        ^                    ^        ^
 *                        consumer head        |        producer head
 *                                             producer tail
 * Possible states:
 *
 *  -- : unoccupied
 *  ++ : being written
 *  ** : occupied
 *
 * Differences between ptr_ring.h and this implementation:
 * - An additional producer tail pointer, which allows multiple enqueue
 *   operations to be in progress at the same time.
 * - No consumer tail pointer, for simplicity (although I expect it can be
 *   added later)
 * - Most importantly: No spinlocks.
 * - The head/tail pointers (or rather: indices) are stored untrimmed, i.e.
 *   without the bit mask (size - 1) applied, because that's how ConcurrencyKit
 *   does it.
 *
 * [1]: https://github.com/concurrencykit/ck/blob/master/include/ck_ring.h
 */

/*
 * One MPMC ring buffer instance.
 *
 * Indices are stored untrimmed (see the header comment above); apply
 * "& mask" to turn an index into a queue[] slot.
 */
struct mpmc_ptr_ring {
	/* Read-mostly data */
	void **queue;		/* array of 'size' element pointers */
	unsigned int size;	/* capacity; always a power of two */
	unsigned int mask;	/* size - 1, wraps untrimmed indices */

	/* consumer_head: updated in _consume; read in _produce */
	atomic_t consumer_head ____cacheline_aligned_in_smp;

	/* producer_{head,tail}: updated in _produce */
	atomic_t producer_head ____cacheline_aligned_in_smp;
	atomic_t producer_tail;
};

/**
 * mpmc_ptr_ring_empty - Check whether the ring currently holds no elements
 * @r: The ring buffer
 *
 * The ring is empty exactly when the consumer head has caught up with the
 * producer tail.  The result is, of course, only a snapshot under
 * concurrent use.
 */
static inline bool mpmc_ptr_ring_empty(struct mpmc_ptr_ring *r)
{
	unsigned int tail, head;

	/* Order the reads below against earlier memory accesses. */
	smp_rmb();

	tail = atomic_read(&r->producer_tail);
	head = atomic_read(&r->consumer_head);

	return head == tail;
}

/**
 * mpmc_ptr_ring_produce - Enqueue one pointer into the ring
 * @r: The ring buffer
 * @ptr: The pointer to enqueue
 *
 * Safe to call from several producers concurrently.
 *
 * Returns 0 on success, or -ENOSPC if the ring is full.
 */
static inline int mpmc_ptr_ring_produce(struct mpmc_ptr_ring *r, void *ptr)
{
	unsigned int p, c, old_p;
	unsigned int mask = r->mask;

	p = atomic_read(&r->producer_head);

	for (;;) {
		/*
		 * NOTE(review): the original marked this barrier TODO;
		 * presumably it orders the consumer_head read below against
		 * earlier accesses -- confirm the intended pairing.
		 */
		smp_rmb();
		c = atomic_read(&r->consumer_head);

		if ((p - c) < mask) { /* fast path */
			/*
			 * Try to claim slot p.  On failure another producer
			 * advanced producer_head; atomic_cmpxchg returns the
			 * current value, so retry from there.  (The previous
			 * code discarded the return value and kept retrying
			 * the stale index, which could only recover via the
			 * slow path once consumers drained past it.)
			 */
			old_p = atomic_cmpxchg(&r->producer_head, p, p + 1);
			if (old_p == p)
				break;
			p = old_p;
		} else {
			unsigned int new_p;

			/* Looks full; reread the head in case it moved. */
			smp_rmb();
			new_p = atomic_read(&r->producer_head);

			if (new_p == p)
				return -ENOSPC;

			p = new_p;
		}
	}

	/* Fill the slot we claimed above. */
	WRITE_ONCE(r->queue[p & mask], ptr);

	/* Wait until it's our turn to update the producer tail pointer */
	while (atomic_read(&r->producer_tail) != p)
		cpu_relax();

	/*
	 * Make sure the WRITE_ONCE above becomes visible before producer_tail
	 * is updated.
	 */
	smp_wmb();
	atomic_set(&r->producer_tail, p + 1);

	return 0;
}

/**
 * mpmc_ptr_ring_consume - Dequeue the oldest element from the ring
 * @r: The ring buffer
 *
 * Safe to call from several consumers concurrently.
 *
 * Returns the dequeued pointer, or NULL if the ring is empty.
 */
static inline void *mpmc_ptr_ring_consume(struct mpmc_ptr_ring *r)
{
	unsigned int c, p, old_c;
	unsigned int mask = r->mask;
	void *element;

	for (;;) {
		c = atomic_read(&r->consumer_head);

		/* Fetch consumer_head first. */
		smp_rmb();

		p = atomic_read(&r->producer_tail);

		/* Is the ring empty? */
		if (p == c)
			return NULL;

		/* Read the element before (possibly) releasing its slot. */
		element = READ_ONCE(r->queue[c & mask]);

		/*
		 * NOTE(review): the original marked this barrier "TODO: Why?".
		 * Presumably it keeps the element load above from being
		 * reordered past the cmpxchg below, which frees the slot for
		 * reuse by producers -- confirm the intended pairing.
		 */
		smp_rmb();

		/*
		 * Claim the element: advance consumer_head if it is still c.
		 * Otherwise another consumer won the race; retry from the
		 * top with a fresh snapshot.
		 */
		old_c = atomic_cmpxchg(&r->consumer_head, c, c + 1);
		if (old_c == c)
			break;
	}

	return element;
}

/*
 * Warning: size must be greater than the number of concurrent consumers
 */
static inline int mpmc_ptr_ring_init(struct mpmc_ptr_ring *r, unsigned int size, gfp_t gfp)
{
	/* A power-of-two size is required so that (size - 1) works as a mask. */
	if (WARN_ONCE(!is_power_of_2(size), "size must be a power of two"))
		return -EINVAL;

	/* Allocate the (zeroed) slot array first; bail out cleanly on OOM. */
	r->queue = kcalloc(size, sizeof(r->queue[0]), gfp);
	if (!r->queue)
		return -ENOMEM;

	r->size = size;
	r->mask = size - 1;

	/* All indices start out at zero: the ring begins empty. */
	atomic_set(&r->consumer_head, 0);
	atomic_set(&r->producer_head, 0);
	atomic_set(&r->producer_tail, 0);

	return 0;
}

static inline void mpmc_ptr_ring_cleanup(struct mpmc_ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = mpmc_ptr_ring_consume(r)))
			destroy(ptr);
	kfree(r->queue);
}

/**
 * __mpmc_ptr_ring_peek - Read the first element in an MPMC ring buffer
 *
 * @r: The ring buffer
 *
 * Returns the oldest element without removing it, or NULL if the ring is
 * empty.  Note that this function should only be called in single-consumer
 * situations.
 */
static inline void *__mpmc_ptr_ring_peek(struct mpmc_ptr_ring *r)
{
	unsigned int head, tail;

	head = atomic_read(&r->consumer_head);

	/* Read consumer_head before producer_tail. */
	smp_rmb();

	tail = atomic_read(&r->producer_tail);

	/* Nothing between head and tail: the ring is empty. */
	if (head == tail)
		return NULL;

	/*
	 * NOTE(review): barrier kept from the original (marked TODO there);
	 * presumably it orders the tail read above before the element load
	 * below -- confirm the intended pairing.
	 */
	smp_rmb();

	return READ_ONCE(r->queue[head & r->mask]);
}

/**
 * __mpmc_ptr_ring_discard_one - Discard the first element in an MPMC ring buffer
 *
 * @r: The ring buffer
 *
 * Note that this function should only be called in single-consumer situations.
 * The caller is expected to have already inspected the element (e.g. via
 * __mpmc_ptr_ring_peek) and to know the ring is non-empty.
 */
static inline void __mpmc_ptr_ring_discard_one(struct mpmc_ptr_ring *r)
{
	/*
	 * Order earlier accesses (such as the peek of the element being
	 * discarded) before the increment that releases the slot back to
	 * producers.
	 */
	smp_mb__before_atomic();
	atomic_inc(&r->consumer_head);
}

#endif /* MPMC_RING_PTR_H */