Line data Source code
1 : /*
2 : * Copyright © 2016 Mozilla Foundation
3 : *
4 : * This program is made available under an ISC-style license. See the
5 : * accompanying file LICENSE for details.
6 : */
7 :
8 : #ifndef CUBEB_RING_BUFFER_H
9 : #define CUBEB_RING_BUFFER_H
10 :
11 : #include "cubeb_utils.h"
12 : #include <algorithm>
13 : #include <atomic>
14 : #include <cstdint>
15 : #include <memory>
16 : #include <thread>
17 :
/**
 * Single producer single consumer lock-free and wait-free ring buffer.
 *
 * This data structure allows producing data from one thread, and consuming it
 * on another thread, safely and without explicit synchronization. The read and
 * write indices are atomics; they are the only mutable state shared between
 * the two threads.
 *
 * The role for the producer and the consumer must be constant, i.e., the
 * producer should always be on one thread and the consumer should always be on
 * another thread. In debug builds (NDEBUG not defined) this is asserted.
 *
 * Some words about the inner workings of this class:
 * - Capacity is fixed. Only one allocation is performed, in the constructor.
 *   When reading and writing, the return value of the method allows checking
 *   if the ring buffer is empty or full.
 * - One slot of the storage is always left unused, so that an empty and a full
 *   ring buffer can be told apart: the buffer is empty when the write index is
 *   at the same position as the read index, and full when the write index is
 *   exactly one position before the read index.
 * - Each side updates its own index only after it is done touching the data
 *   (release store), and reads the other side's index with an acquire load.
 *   This means that each thread only touches a portion of the buffer that is
 *   not touched by the other thread.
 * - Callers are expected to provide buffers. When writing to the queue,
 *   elements are copied into the internal storage from the buffer passed in.
 *   When reading from the queue, the user is expected to provide a buffer.
 *   Because this is a ring buffer, data might not be contiguous in memory;
 *   providing an external buffer to copy into is an easy way to have linear
 *   data for further processing.
 */
template <typename T>
class ring_buffer_base
{
public:
  /**
   * Constructor for a ring buffer.
   *
   * This performs an allocation, but is the only allocation that will happen
   * for the life time of a `ring_buffer_base`.
   *
   * @param capacity The maximum number of element this ring buffer will hold.
   */
  ring_buffer_base(int capacity)
    /* One more element to distinguish from empty and full buffer. */
    : capacity_(capacity + 1)
  {
    assert(storage_capacity() <
           std::numeric_limits<int>::max() / 2 &&
           "buffer too large for the type of index used.");
    assert(capacity_ > 0);

    data_.reset(new T[storage_capacity()]);
    /* The indices are initialized last, using the default (sequentially
     * consistent) atomic assignment, after `capacity_` and `data_` have been
     * set up. */
    write_index_ = 0;
    read_index_ = 0;
  }
  /**
   * Push `count` zero or default constructed elements in the array.
   *
   * Only safely called on the producer thread.
   *
   * @param count The number of elements to enqueue.
   * @return The number of element enqueued.
   */
  int enqueue_default(int count)
  {
    return enqueue(nullptr, count);
  }
  /**
   * @brief Put an element in the queue
   *
   * Only safely called on the producer thread.
   *
   * @param element The element to put in the queue.
   *
   * @return 1 if the element was inserted, 0 otherwise.
   */
  int enqueue(T& element)
  {
    return enqueue(&element, 1);
  }
  /**
   * Push `count` elements in the ring buffer.
   *
   * Only safely called on the producer thread.
   *
   * @param elements a pointer to a buffer containing at least `count` elements.
   * If `elements` is nullptr, zero or default constructed elements are enqueued.
   * @param count The number of elements to read from `elements`. Must not be
   * negative.
   * @return The number of elements successfully copied from `elements` and
   * inserted into the ring buffer.
   */
  int enqueue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    /* Catch a negative count before it is used as a copy length. */
    assert(count >= 0);

    /* The write index is only ever modified by this (producer) thread, a
     * relaxed load is enough. */
    int wr_idx = write_index_.load(std::memory_order_relaxed);
    /* Acquire pairs with the release store in `dequeue`: the consumer is
     * guaranteed to be done reading the slots it has released before they are
     * overwritten below. */
    int rd_idx = read_index_.load(std::memory_order_acquire);

    if (full_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_write =
      std::min(available_write_internal(rd_idx, wr_idx), count);

    /* First part, from the write index to the end of the array. */
    int first_part = std::min(storage_capacity() - wr_idx,
                              to_write);
    /* Second part, from the beginning of the array */
    int second_part = to_write - first_part;

    if (elements) {
      Copy(data_.get() + wr_idx, elements, first_part);
      Copy(data_.get(), elements + first_part, second_part);
    } else {
      ConstructDefault(data_.get() + wr_idx, first_part);
      ConstructDefault(data_.get(), second_part);
    }

    /* Release: publish the elements written above to the consumer. */
    write_index_.store(increment_index(wr_idx, to_write),
                       std::memory_order_release);

    return to_write;
  }
  /**
   * Retrieve at most `count` elements from the ring buffer, and copy them to
   * `elements`, if non-null.
   *
   * Only safely called on the consumer side.
   *
   * @param elements A pointer to a buffer with space for at least `count`
   * elements. If `elements` is `nullptr`, `count` element will be discarded.
   * @param count The maximum number of elements to dequeue. Must not be
   * negative.
   * @return The number of elements removed from the ring buffer (and written
   * to `elements` if it is non-null).
   */
  int dequeue(T * elements, int count)
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    /* Catch a negative count before it is used as a copy length. */
    assert(count >= 0);

    /* The read index is only ever modified by this (consumer) thread, a
     * relaxed load is enough. */
    int rd_idx = read_index_.load(std::memory_order_relaxed);
    /* Acquire pairs with the release store in `enqueue`: the elements written
     * by the producer are visible before they are read below. */
    int wr_idx = write_index_.load(std::memory_order_acquire);

    if (empty_internal(rd_idx, wr_idx)) {
      return 0;
    }

    int to_read =
      std::min(available_read_internal(rd_idx, wr_idx), count);

    /* First part, from the read index to the end of the array. */
    int first_part = std::min(storage_capacity() - rd_idx, to_read);
    /* Second part, from the beginning of the array. */
    int second_part = to_read - first_part;

    if (elements) {
      Copy(elements, data_.get() + rd_idx, first_part);
      Copy(elements + first_part, data_.get(), second_part);
    }

    /* Release: the reads above must be complete before the producer is
     * allowed to reuse these slots. A relaxed store here would let the
     * producer overwrite data that is still being copied out. */
    read_index_.store(increment_index(rd_idx, to_read),
                      std::memory_order_release);

    return to_read;
  }
  /**
   * Get the number of available element for consuming.
   *
   * Only safely called on the consumer thread.
   *
   * @return The number of available elements for reading.
   */
  int available_read() const
  {
#ifndef NDEBUG
    assert_correct_thread(consumer_id);
#endif
    return available_read_internal(
      read_index_.load(std::memory_order_relaxed),
      write_index_.load(std::memory_order_acquire));
  }
  /**
   * Get the number of empty slots available for producing.
   *
   * Only safely called on the producer thread.
   *
   * @return The number of empty slots in the buffer, available for writing.
   */
  int available_write() const
  {
#ifndef NDEBUG
    assert_correct_thread(producer_id);
#endif
    return available_write_internal(
      read_index_.load(std::memory_order_acquire),
      write_index_.load(std::memory_order_relaxed));
  }
  /**
   * Get the total capacity, for this ring buffer.
   *
   * Only reads the constant `capacity_` member.
   *
   * @return The maximum capacity of this ring buffer.
   */
  int capacity() const
  {
    return storage_capacity() - 1;
  }
private:
  /** Return true if the ring buffer is empty.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is empty, false otherwise.
   **/
  bool empty_internal(int read_index,
                      int write_index) const
  {
    return write_index == read_index;
  }
  /** Return true if the ring buffer is full.
   *
   * This happens if the write index is exactly one element behind the read
   * index.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return true if the ring buffer is full, false otherwise.
   **/
  bool full_internal(int read_index,
                     int write_index) const
  {
    return (write_index + 1) % storage_capacity() == read_index;
  }
  /**
   * Return the size of the storage. It is one more than the number of elements
   * that can be stored in the buffer.
   *
   * @return the number of slots in the underlying storage.
   */
  int storage_capacity() const
  {
    return capacity_;
  }
  /**
   * Returns the number of elements available for reading.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of available elements for reading.
   */
  int
  available_read_internal(int read_index,
                          int write_index) const
  {
    if (write_index >= read_index) {
      return write_index - read_index;
    } else {
      /* The written region wraps around the end of the storage. */
      return write_index + storage_capacity() - read_index;
    }
  }
  /**
   * Returns the number of empty elements, available for writing.
   *
   * @param read_index the read index to consider
   * @param write_index the write index to consider
   * @return the number of elements that can be written into the array.
   */
  int
  available_write_internal(int read_index,
                           int write_index) const
  {
    /* We subtract one element here to always keep at least one sample
     * free in the buffer, to distinguish between full and empty array. */
    int rv = read_index - write_index - 1;
    if (write_index >= read_index) {
      rv += storage_capacity();
    }
    return rv;
  }
  /**
   * Increments an index, wrapping it around the storage.
   *
   * @param index the index to increment.
   * @param increment the number by which `index` is incremented. Must not be
   * negative.
   * @return the new index.
   */
  int
  increment_index(int index, int increment) const
  {
    assert(increment >= 0);
    return (index + increment) % storage_capacity();
  }
  /**
   * @brief This allows checking that enqueue (resp. dequeue) are always called
   * by the right thread.
   *
   * The first call records the calling thread in `id`; every later call
   * asserts that it is made from that same thread.
   *
   * @param id the id of the thread that has called the calling method first.
   */
#ifndef NDEBUG
  static void assert_correct_thread(std::thread::id& id)
  {
    if (id == std::thread::id()) {
      id = std::this_thread::get_id();
      return;
    }
    assert(id == std::this_thread::get_id());
  }
#endif
  /** Index at which the oldest element is at, in samples. Only written by the
   * consumer. */
  std::atomic<int> read_index_;
  /** Index at which to write new elements. Only written by the producer. It is
   * equal to `read_index_` when the buffer is empty, and one slot behind it
   * (modulo the storage size) when the buffer is full. */
  std::atomic<int> write_index_;
  /** Size of the storage, in elements: one more than the maximum number of
   * elements that can be stored in the ring buffer. */
  const int capacity_;
  /** Data storage */
  std::unique_ptr<T[]> data_;
#ifndef NDEBUG
  /** The id of the only thread that is allowed to read from the queue. */
  mutable std::thread::id consumer_id;
  /** The id of the only thread that is allowed to write to the queue. */
  mutable std::thread::id producer_id;
#endif
};
342 :
343 : /**
344 : * Adapter for `ring_buffer_base` that exposes an interface in frames.
345 : */
346 : template <typename T>
347 : class audio_ring_buffer_base
348 : {
349 : public:
350 : /**
351 : * @brief Constructor.
352 : *
353 : * @param channel_count Number of channels.
354 : * @param capacity_in_frames The capacity in frames.
355 : */
356 : audio_ring_buffer_base(int channel_count, int capacity_in_frames)
357 : : channel_count(channel_count)
358 : , ring_buffer(frames_to_samples(capacity_in_frames))
359 : {
360 : assert(channel_count > 0);
361 : }
362 : /**
363 : * @brief Enqueue silence.
364 : *
365 : * Only safely called on the producer thread.
366 : *
367 : * @param frame_count The number of frames of silence to enqueue.
368 : * @return The number of frames of silence actually written to the queue.
369 : */
370 : int enqueue_default(int frame_count)
371 : {
372 : return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
373 : }
374 : /**
375 : * @brief Enqueue `frames_count` frames of audio.
376 : *
377 : * Only safely called from the producer thread.
378 : *
379 : * @param [in] frames If non-null, the frames to enqueue.
380 : * Otherwise, silent frames are enqueued.
381 : * @param frame_count The number of frames to enqueue.
382 : *
383 : * @return The number of frames enqueued
384 : */
385 :
386 : int enqueue(T * frames, int frame_count)
387 : {
388 : return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
389 : }
390 :
391 : /**
392 : * @brief Removes `frame_count` frames from the buffer, and
393 : * write them to `frames` if it is non-null.
394 : *
395 : * Only safely called on the consumer thread.
396 : *
397 : * @param frames If non-null, the frames are copied to `frames`.
398 : * Otherwise, they are dropped.
399 : * @param frame_count The number of frames to remove.
400 : *
401 : * @return The number of frames actually dequeud.
402 : */
403 : int dequeue(T * frames, int frame_count)
404 : {
405 : return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
406 : }
407 : /**
408 : * Get the number of available frames of audio for consuming.
409 : *
410 : * Only safely called on the consumer thread.
411 : *
412 : * @return The number of available frames of audio for reading.
413 : */
414 : int available_read() const
415 : {
416 : return samples_to_frames(ring_buffer.available_read());
417 : }
418 : /**
419 : * Get the number of available frames of audio for consuming.
420 : *
421 : * Only safely called on the producer thread.
422 : *
423 : * @return The number of empty slots in the buffer, available for writing.
424 : */
425 : int available_write() const
426 : {
427 : return samples_to_frames(ring_buffer.available_write());
428 : }
429 : /**
430 : * Get the total capacity, for this ring buffer.
431 : *
432 : * Can be called safely on any thread.
433 : *
434 : * @return The maximum capacity of this ring buffer.
435 : */
436 : int capacity() const
437 : {
438 : return samples_to_frames(ring_buffer.capacity());
439 : }
440 : private:
441 : /**
442 : * @brief Frames to samples conversion.
443 : *
444 : * @param frames The number of frames.
445 : *
446 : * @return A number of samples.
447 : */
448 : int frames_to_samples(int frames) const
449 : {
450 : return frames * channel_count;
451 : }
452 : /**
453 : * @brief Samples to frames conversion.
454 : *
455 : * @param samples The number of samples.
456 : *
457 : * @return A number of frames.
458 : */
459 : int samples_to_frames(int samples) const
460 : {
461 : return samples / channel_count;
462 : }
463 : /** Number of channels of audio that will stream through this ring buffer. */
464 : int channel_count;
465 : /** The underlying ring buffer that is used to store the data. */
466 : ring_buffer_base<T> ring_buffer;
467 : };
468 :
/**
 * Alias template for `ring_buffer_base<T>`. The resulting type is safe to use
 * from two threads, one producer, one consumer (that never change role),
 * without explicit synchronization.
 */
template<typename T>
using lock_free_queue = ring_buffer_base<T>;
/**
 * Alias template for `audio_ring_buffer_base<T>`. The resulting type is safe
 * to use from two threads, one producer, one consumer (that never change
 * role), without explicit synchronization.
 */
template<typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
483 :
484 : #endif // CUBEB_RING_BUFFER_H
|