LCOV - code coverage report
Current view: top level - media/libcubeb/src - cubeb_ringbuffer.h (source / functions) Hit Total Coverage
Test: output.info Lines: 0 65 0.0 %
Date: 2017-07-14 16:53:18 Functions: 0 12 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * Copyright © 2016 Mozilla Foundation
       3             :  *
       4             :  * This program is made available under an ISC-style license.  See the
       5             :  * accompanying file LICENSE for details.
       6             :  */
       7             : 
       8             : #ifndef CUBEB_RING_BUFFER_H
       9             : #define CUBEB_RING_BUFFER_H
      10             : 
      11             : #include "cubeb_utils.h"
      12             : #include <algorithm>
      13             : #include <atomic>
      14             : #include <cstdint>
      15             : #include <memory>
      16             : #include <thread>
      17             : 
      18             : /**
      19             :  * Single producer single consumer lock-free and wait-free ring buffer.
      20             :  *
      21             :  * This data structure allows producing data from one thread, and consuming it on
      22             :  * another thread, safely and without explicit synchronization. If used on two
      23             :  * threads, this data structure uses atomics for thread safety. It is possible
      24             :  * to disable the use of atomics at compile time and only use this data
      25             :  * structure on one thread.
      26             :  *
      27             :  * The role for the producer and the consumer must be constant, i.e., the
      28             :  * producer should always be on one thread and the consumer should always be on
      29             :  * another thread.
      30             :  *
      31             :  * Some words about the inner workings of this class:
      32             :  * - Capacity is fixed. Only one allocation is performed, in the constructor.
      33             :  *   When reading and writing, the return value of the method allows checking if
      34             :  *   the ring buffer is empty or full.
      35             :  * - We always keep at least one storage slot unused, so we can distinguish
      36             :  *   between an empty and a full ring buffer: an empty ring buffer is when the
      37             :  *   write index is at the same position as the read index. A full buffer is
      38             :  *   when the write index is exactly one position before the read index
      39             :  *   (modulo the storage size).
      40             :  * - We synchronize updates to the read index after having read the data, and
      41             :  *   the write index after having written the data. This means that each
      42             :  *   thread can only touch a portion of the buffer that is not touched by the
      43             :  *   other thread.
      44             :  * - Callers are expected to provide buffers. When writing to the queue,
      45             :  *   elements are copied into the internal storage from the buffer passed in.
      46             :  *   When reading from the queue, the user is expected to provide a buffer.
      47             :  *   Because this is a ring buffer, data might not be contiguous in memory,
      48             :  *   providing an external buffer to copy into is an easy way to have linear
      49             :  *   data for further processing.
      50             :  */
      51             : template <typename T>
      52           0 : class ring_buffer_base
      53             : {
      54             : public:
      55             :   /**
      56             :    * Constructor for a ring buffer.
      57             :    *
      58             :    * This performs an allocation, but is the only allocation that will happen
      59             :    * for the life time of a `ring_buffer_base`.
      60             :    *
      61             :    * @param capacity The maximum number of elements this ring buffer will hold.
      62             :    */
      63           0 :   ring_buffer_base(int capacity)
      64             :     /* One more element to distinguish from empty and full buffer. */
      65           0 :     : capacity_(capacity + 1)
      66             :   {
      67           0 :     assert(storage_capacity() <
      68             :            std::numeric_limits<int>::max() / 2 &&
      69             :            "buffer too large for the type of index used.");
      70           0 :     assert(capacity_ > 0);
      71             : 
      72           0 :     data_.reset(new T[storage_capacity()]);
      73             :     /* If this queue is using atomics, initializing those members as the last
      74             :      * action in the constructor acts as a full barrier, and allows capacity() to
      75             :      * be thread-safe. */
      76           0 :     write_index_ = 0;
      77           0 :     read_index_ = 0;
      78           0 :   }
      79             :   /**
      80             :    * Push `count` zero or default constructed elements in the array.
      81             :    *
      82             :    * Only safely called on the producer thread.
      83             :    *
      84             :    * @param count The number of elements to enqueue.
      85             :    * @return The number of elements enqueued.
      86             :    */
      87             :   int enqueue_default(int count)
      88             :   {
      89             :     return enqueue(nullptr, count);
      90             :   }
      91             :   /**
      92             :    * @brief Put an element in the queue
      93             :    *
      94             :    * Only safely called on the producer thread.
      95             :    *
      96             :    * @param element The element to put in the queue.
      97             :    *
      98             :    * @return 1 if the element was inserted, 0 otherwise.
      99             :    */
     100           0 :   int enqueue(T& element)
     101             :   {
     102           0 :     return enqueue(&element, 1);
     103             :   }
     104             :   /**
     105             :    * Push `count` elements in the ring buffer.
     106             :    *
     107             :    * Only safely called on the producer thread.
     108             :    *
     109             :    * @param elements a pointer to a buffer containing at least `count` elements.
     110             :    * If `elements` is nullptr, zero or default constructed elements are enqueued.
     111             :    * @param count The number of elements to read from `elements`
     112             :    * @return The number of elements successfully copied from `elements` and inserted
     113             :    * into the ring buffer.
     114             :    */
     115           0 :   int enqueue(T * elements, int count)
     116             :   {
     117             : #ifndef NDEBUG
     118           0 :     assert_correct_thread(producer_id);
     119             : #endif
     120             : 
     121           0 :     int rd_idx = read_index_.load(std::memory_order::memory_order_relaxed);
     122           0 :     int wr_idx = write_index_.load(std::memory_order::memory_order_relaxed);
     123             : 
     124           0 :     if (full_internal(rd_idx, wr_idx)) {
     125           0 :       return 0;
     126             :     }
     127             : 
     128             :     int to_write =
     129           0 :       std::min(available_write_internal(rd_idx, wr_idx), count);
     130             : 
     131             :     /* First part, from the write index to the end of the array. */
     132           0 :     int first_part = std::min(storage_capacity() - wr_idx,
     133           0 :                                           to_write);
     134             :     /* Second part, from the beginning of the array */
     135           0 :     int second_part = to_write - first_part;
     136             : 
     137           0 :     if (elements) {
     138           0 :       Copy(data_.get() + wr_idx, elements, first_part);
     139           0 :       Copy(data_.get(), elements + first_part, second_part);
     140             :     } else {
     141           0 :       ConstructDefault(data_.get() + wr_idx, first_part);
     142           0 :       ConstructDefault(data_.get(), second_part);
     143             :     }
     144             : 
     145           0 :     write_index_.store(increment_index(wr_idx, to_write), std::memory_order::memory_order_release);
     146             : 
     147           0 :     return to_write;
     148             :   }
     149             :   /**
     150             :    * Retrieve at most `count` elements from the ring buffer, and copy them to
     151             :    * `elements`, if non-null.
     152             :    *
     153             :    * Only safely called on the consumer side.
     154             :    *
     155             :    * @param elements A pointer to a buffer with space for at least `count`
     156             :    * elements. If `elements` is `nullptr`, at most `count` elements are discarded.
     157             :    * @param count The maximum number of elements to dequeue.
     158             :    * @return The number of elements removed from the ring buffer.
     159             :    */
     160           0 :   int dequeue(T * elements, int count)
     161             :   {
     162             : #ifndef NDEBUG
     163           0 :     assert_correct_thread(consumer_id);
     164             : #endif
     165             : 
     166           0 :     int wr_idx = write_index_.load(std::memory_order::memory_order_acquire);
     167           0 :     int rd_idx = read_index_.load(std::memory_order::memory_order_relaxed);
     168             : 
     169           0 :     if (empty_internal(rd_idx, wr_idx)) {
     170           0 :       return 0;
     171             :     }
     172             : 
     173             :     int to_read =
     174           0 :       std::min(available_read_internal(rd_idx, wr_idx), count);
     175             : 
     176           0 :     int first_part = std::min(storage_capacity() - rd_idx, to_read);
     177           0 :     int second_part = to_read - first_part;
     178             : 
     179           0 :     if (elements) {
     180           0 :       Copy(elements, data_.get() + rd_idx, first_part);
     181           0 :       Copy(elements + first_part, data_.get(), second_part);
     182             :     }
     183             : 
     184           0 :     read_index_.store(increment_index(rd_idx, to_read), std::memory_order::memory_order_relaxed);
     185             : 
     186           0 :     return to_read;
     187             :   }
     188             :   /**
     189             :    * Get the number of available elements for consuming.
     190             :    *
     191             :    * Only safely called on the consumer thread.
     192             :    *
     193             :    * @return The number of available elements for reading.
     194             :    */
     195             :   int available_read() const
     196             :   {
     197             : #ifndef NDEBUG
     198             :     assert_correct_thread(consumer_id);
     199             : #endif
     200             :     return available_read_internal(read_index_.load(std::memory_order::memory_order_relaxed),
     201             :                                    write_index_.load(std::memory_order::memory_order_relaxed));
     202             :   }
     203             :   /**
     204             :    * Get the number of empty slots available for producing.
     205             :    *
     206             :    * Only safely called on the producer thread.
     207             :    *
     208             :    * @return The number of empty slots in the buffer, available for writing.
     209             :    */
     210             :   int available_write() const
     211             :   {
     212             : #ifndef NDEBUG
     213             :     assert_correct_thread(producer_id);
     214             : #endif
     215             :     return available_write_internal(read_index_.load(std::memory_order::memory_order_relaxed),
     216             :                                     write_index_.load(std::memory_order::memory_order_relaxed));
     217             :   }
     218             :   /**
     219             :    * Get the total capacity, for this ring buffer.
     220             :    *
     221             :    * Can be called safely on any thread.
     222             :    *
     223             :    * @return The maximum capacity of this ring buffer.
     224             :    */
     225             :   int capacity() const
     226             :   {
     227             :     return storage_capacity() - 1;
     228             :   }
     229             : private:
     230             :   /** Return true if the ring buffer is empty.
     231             :    *
     232             :    * @param read_index the read index to consider
     233             :    * @param write_index the write index to consider
     234             :    * @return true if the ring buffer is empty, false otherwise.
     235             :    **/
     236           0 :   bool empty_internal(int read_index,
     237             :                       int write_index) const
     238             :   {
     239           0 :     return write_index == read_index;
     240             :   }
     241             :   /** Return true if the ring buffer is full.
     242             :    *
     243             :    * This happens if the write index is exactly one element behind the read
     244             :    * index.
     245             :    *
     246             :    * @param read_index the read index to consider
     247             :    * @param write_index the write index to consider
     248             :    * @return true if the ring buffer is full, false otherwise.
     249             :    **/
     250           0 :   bool full_internal(int read_index,
     251             :                      int write_index) const
     252             :   {
     253           0 :     return (write_index + 1) % storage_capacity() == read_index;
     254             :   }
     255             :   /**
     256             :    * Return the size of the storage. It is one more than the number of elements
     257             :    * that can be stored in the buffer.
     258             :    *
     259             :    * @return the size of the internal storage, in elements.
     260             :    */
     261           0 :   int storage_capacity() const
     262             :   {
     263           0 :     return capacity_;
     264             :   }
     265             :   /**
     266             :    * Returns the number of elements available for reading.
     267             :    *
     268             :    * @return the number of available elements for reading.
     269             :    */
     270             :   int
     271           0 :   available_read_internal(int read_index,
     272             :                           int write_index) const
     273             :   {
     274           0 :     if (write_index >= read_index) {
     275           0 :       return write_index - read_index;
     276             :     } else {
     277           0 :       return write_index + storage_capacity() - read_index;
     278             :     }
     279             :   }
     280             :   /**
     281             :    * Returns the number of empty elements, available for writing.
     282             :    *
     283             :    * @return the number of elements that can be written into the array.
     284             :    */
     285             :   int
     286           0 :   available_write_internal(int read_index,
     287             :                            int write_index) const
     288             :   {
     289             :     /* We subtract one element here to always keep at least one sample
     290             :      * free in the buffer, to distinguish between full and empty array. */
     291           0 :     int rv = read_index - write_index - 1;
     292           0 :     if (write_index >= read_index) {
     293           0 :       rv += storage_capacity();
     294             :     }
     295           0 :     return rv;
     296             :   }
     297             :   /**
     298             :    * Increments an index, wrapping it around the storage.
     299             :    *
     300             :    * @param index the index to increment.
     301             :    * @param increment the number by which `index` is incremented.
     302             :    * @return the new index.
     303             :    */
     304             :   int
     305           0 :   increment_index(int index, int increment) const
     306             :   {
     307           0 :     assert(increment >= 0);
     308           0 :     return (index + increment) % storage_capacity();
     309             :   }
     310             :   /**
     311             :    * @brief This allows checking that enqueue (resp. dequeue) are always called
     312             :    * by the right thread.
     313             :    *
     314             :    * @param id the id of the thread that has called the calling method first.
     315             :    */
     316             : #ifndef NDEBUG
     317           0 :   static void assert_correct_thread(std::thread::id& id)
     318             :   {
     319           0 :     if (id == std::thread::id()) {
     320           0 :       id = std::this_thread::get_id();
     321           0 :       return;
     322             :     }
     323           0 :     assert(id == std::this_thread::get_id());
     324             :   }
     325             : #endif
     326             :   /** Index at which the oldest element is at, in samples. */
     327             :   std::atomic<int> read_index_;
     328             :   /** Index at which to write new elements. `write_index_` is equal to
     329             :    * `read_index_` when the ring buffer is empty. */
     330             :   std::atomic<int> write_index_;
     331             :   /** Size of the storage: one more than the maximum number of storable elements. */
     332             :   const int capacity_;
     333             :   /** Data storage */
     334             :   std::unique_ptr<T[]> data_;
     335             : #ifndef NDEBUG
     336             :   /** The id of the only thread that is allowed to read from the queue. */
     337             :   mutable std::thread::id consumer_id;
     338             :   /** The id of the only thread that is allowed to write to the queue. */
     339             :   mutable std::thread::id producer_id;
     340             : #endif
     341             : };
     342             : 
     343             : /**
     344             :  * Adapter for `ring_buffer_base` that exposes an interface in frames.
     345             :  */
     346             : template <typename T>
     347             : class audio_ring_buffer_base
     348             : {
     349             : public:
     350             :   /**
     351             :    * @brief Constructor.
     352             :    *
     353             :    * @param channel_count       Number of channels.
     354             :    * @param capacity_in_frames  The capacity in frames.
     355             :    */
     356             :   audio_ring_buffer_base(int channel_count, int capacity_in_frames)
     357             :     : channel_count(channel_count)
     358             :     , ring_buffer(frames_to_samples(capacity_in_frames))
     359             :   {
     360             :     assert(channel_count > 0);
     361             :   }
     362             :   /**
     363             :    * @brief Enqueue silence.
     364             :    *
     365             :    * Only safely called on the producer thread.
     366             :    *
     367             :    * @param frame_count The number of frames of silence to enqueue.
     368             :    * @return  The number of frames of silence actually written to the queue.
     369             :    */
     370             :   int enqueue_default(int frame_count)
     371             :   {
     372             :     return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
     373             :   }
     374             :   /**
     375             :    * @brief Enqueue `frame_count` frames of audio.
     376             :    *
     377             :    * Only safely called from the producer thread.
     378             :    *
     379             :    * @param [in] frames If non-null, the frames to enqueue.
     380             :    *                    Otherwise, silent frames are enqueued.
     381             :    * @param frame_count The number of frames to enqueue.
     382             :    *
     383             :    * @return The number of frames enqueued
     384             :    */
     385             : 
     386             :   int enqueue(T * frames, int frame_count)
     387             :   {
     388             :     return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
     389             :   }
     390             : 
     391             :   /**
     392             :    * @brief Removes `frame_count` frames from the buffer, and
     393             :    *        write them to `frames` if it is non-null.
     394             :    *
     395             :    * Only safely called on the consumer thread.
     396             :    *
     397             :    * @param frames      If non-null, the frames are copied to `frames`.
     398             :    *                    Otherwise, they are dropped.
     399             :    * @param frame_count The number of frames to remove.
     400             :    *
     401             :    * @return  The number of frames actually dequeued.
     402             :    */
     403             :   int dequeue(T * frames, int frame_count)
     404             :   {
     405             :     return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
     406             :   }
     407             :   /**
     408             :    * Get the number of available frames of audio for consuming.
     409             :    *
     410             :    * Only safely called on the consumer thread.
     411             :    *
     412             :    * @return The number of available frames of audio for reading.
     413             :    */
     414             :   int available_read() const
     415             :   {
     416             :     return samples_to_frames(ring_buffer.available_read());
     417             :   }
     418             :   /**
     419             :    * Get the number of empty frames of audio available for producing.
     420             :    *
     421             :    * Only safely called on the producer thread.
     422             :    *
     423             :    * @return The number of empty slots in the buffer, available for writing.
     424             :    */
     425             :   int available_write() const
     426             :   {
     427             :     return samples_to_frames(ring_buffer.available_write());
     428             :   }
     429             :   /**
     430             :    * Get the total capacity, for this ring buffer.
     431             :    *
     432             :    * Can be called safely on any thread.
     433             :    *
     434             :    * @return The maximum capacity of this ring buffer.
     435             :    */
     436             :   int capacity() const
     437             :   {
     438             :     return samples_to_frames(ring_buffer.capacity());
     439             :   }
     440             : private:
     441             :   /**
     442             :    * @brief Frames to samples conversion.
     443             :    *
     444             :    * @param frames The number of frames.
     445             :    *
     446             :    * @return  A number of samples.
     447             :    */
     448             :   int frames_to_samples(int frames) const
     449             :   {
     450             :     return frames * channel_count;
     451             :   }
     452             :   /**
     453             :    * @brief Samples to frames conversion.
     454             :    *
     455             :    * @param samples The number of samples.
     456             :    *
     457             :    * @return  A number of frames.
     458             :    */
     459             :   int samples_to_frames(int samples) const
     460             :   {
     461             :     return samples / channel_count;
     462             :   }
     463             :   /** Number of channels of audio that will stream through this ring buffer. */
     464             :   int channel_count;
     465             :   /** The underlying ring buffer that is used to store the data. */
     466             :   ring_buffer_base<T> ring_buffer;
     467             : };
     468             : 
     469             : /**
     470             :  * Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
     471             :  * from two threads, one producer, one consumer (that never change role),
     472             :  * without explicit synchronization.
     473             :  */
     474             : template<typename T>
     475             : using lock_free_queue = ring_buffer_base<T>;
     476             : /**
     477             :  * Lock-free instantiation of the `audio_ring_buffer_base` type. This is safe to use
     478             :  * from two threads, one producer, one consumer (that never change role),
     479             :  * without explicit synchronization.
     480             :  */
     481             : template<typename T>
     482             : using lock_free_audio_ring_buffer = audio_ring_buffer_base<T>;
     483             : 
     484             : #endif // CUBEB_RING_BUFFER_H

Generated by: LCOV version 1.13