Rewrite RingBuffer with standard atomics...

... Contrary to the old comments, this class was no longer thread safe with
multi-core, because of the possibility of out-of-order reads and writes.

Use the minimal necessary memory ordering, not the default and maybe expensive
std::memory_order_seq_cst

At least one clicky recording has been seen where many small groups of
samples, a common power in two in size, seem to get displaced rightward.

I suspect out of order reads and writes might have caused that and this commit
might prevent it.
This commit is contained in:
Paul Licameli 2018-06-19 13:23:54 -04:00
parent 58fa401faf
commit 22f48d31e1
2 changed files with 74 additions and 24 deletions

View File

@ -5,16 +5,19 @@
RingBuffer.cpp
Dominic Mazzoni
Paul Licameli
*******************************************************************//*!
\class RingBuffer
\brief Holds streamed audio samples.
This class is thread-safe, assuming that there is only one
thread writing, and one thread reading. If two threads both
need to read, or both need to write, they need to lock this
class from outside using their own mutex.
Assuming that there is only one thread writing, and one thread reading,
this class implements a lock-free thread-safe bounded queue of samples
with atomic variables that contain the first filled and free positions.
If two threads both need to read, or both need to write, they need to lock
this class from outside using their own mutex.
AvailForPut and AvailForGet may underestimate but will never
overestimate.
@ -35,27 +38,46 @@ RingBuffer::~RingBuffer()
{
}
size_t RingBuffer::Len()
// Calculations of free and filled space, given snapshots taken of the start
// and end values
size_t RingBuffer::Filled( size_t start, size_t end )
{
return (mEnd + mBufferSize - mStart) % mBufferSize;
return (end + mBufferSize - start) % mBufferSize;
}
size_t RingBuffer::Free( size_t start, size_t end )
{
return std::max<size_t>(mBufferSize - Filled( start, end ), 4) - 4;
}
//
// For the writer only:
// Only writer writes the end, so it can read it again relaxed
// And it reads the start written by reader, but reader sends no other
// information needing to synchronize with the start, so relaxed memory order
// is good there too
//
size_t RingBuffer::AvailForPut()
{
return std::max<size_t>(mBufferSize - Len(), 4) - 4;
auto start = mStart.load( std::memory_order_relaxed );
auto end = mEnd.load( std::memory_order_relaxed );
return Free( start, end );
// Reader might increase the available free space after return, but will
// never decrease it, so writer can safely assume this much at least
}
size_t RingBuffer::Put(samplePtr buffer, sampleFormat format,
size_t samplesToCopy)
{
samplesToCopy = std::min( samplesToCopy, AvailForPut() );
auto start = mStart.load( std::memory_order_relaxed );
auto end = mEnd.load( std::memory_order_relaxed );
samplesToCopy = std::min( samplesToCopy, Free( start, end ) );
auto src = buffer;
size_t copied = 0;
auto pos = mEnd;
auto pos = end;
while(samplesToCopy) {
auto block = std::min( samplesToCopy, mBufferSize - pos );
@ -70,16 +92,20 @@ size_t RingBuffer::Put(samplePtr buffer, sampleFormat format,
copied += block;
}
mEnd = pos;
// Atomically update the end pointer with release, so the nonatomic writes
// just done to the buffer don't get reordered after
mEnd.store(pos, std::memory_order_release);
return copied;
}
size_t RingBuffer::Clear(sampleFormat format, size_t samplesToClear)
{
samplesToClear = std::min( samplesToClear, AvailForPut() );
auto start = mStart.load( std::memory_order_relaxed );
auto end = mEnd.load( std::memory_order_relaxed );
samplesToClear = std::min( samplesToClear, Free( start, end ) );
size_t cleared = 0;
auto pos = mEnd;
auto pos = end;
while(samplesToClear) {
auto block = std::min( samplesToClear, mBufferSize - pos );
@ -91,48 +117,70 @@ size_t RingBuffer::Clear(sampleFormat format, size_t samplesToClear)
cleared += block;
}
mEnd = pos;
// Atomically update the end pointer with release, so the nonatomic writes
// just done to the buffer don't get reordered after
mEnd.store(pos, std::memory_order_release);
return cleared;
}
//
// For the reader only:
// Only reader writes the start, so it can read it again relaxed
// But it reads the end written by the writer, who also sends sample data
// with the changes of end; therefore that must be read with acquire order
// if we do more than merely query the size or throw samples away
//
size_t RingBuffer::AvailForGet()
{
return Len();
auto end = mEnd.load( std::memory_order_relaxed ); // get away with it here
auto start = mStart.load( std::memory_order_relaxed );
return Filled( start, end );
// Writer might increase the available samples after return, but will
// never decrease them, so reader can safely assume this much at least
}
size_t RingBuffer::Get(samplePtr buffer, sampleFormat format,
size_t samplesToCopy)
{
samplesToCopy = std::min( samplesToCopy, Len() );
// Must match the writer's release with acquire for well defined reads of
// the buffer
auto end = mEnd.load( std::memory_order_acquire );
auto start = mStart.load( std::memory_order_relaxed );
samplesToCopy = std::min( samplesToCopy, Filled( start, end ) );
auto dest = buffer;
size_t copied = 0;
while(samplesToCopy) {
auto block = std::min( samplesToCopy, mBufferSize - mStart );
auto block = std::min( samplesToCopy, mBufferSize - start );
CopySamples(mBuffer.ptr() + mStart * SAMPLE_SIZE(mFormat), mFormat,
CopySamples(mBuffer.ptr() + start * SAMPLE_SIZE(mFormat), mFormat,
dest, format,
block);
dest += block * SAMPLE_SIZE(format);
mStart = (mStart + block) % mBufferSize;
start = (start + block) % mBufferSize;
samplesToCopy -= block;
copied += block;
}
// Communicate to writer that we have consumed some data, and that's all
mStart.store( start, std::memory_order_relaxed );
return copied;
}
size_t RingBuffer::Discard(size_t samplesToDiscard)
{
samplesToDiscard = std::min( samplesToDiscard, Len() );
auto end = mEnd.load( std::memory_order_relaxed ); // get away with it here
auto start = mStart.load( std::memory_order_relaxed );
samplesToDiscard = std::min( samplesToDiscard, Filled( start, end ) );
mStart = (mStart + samplesToDiscard) % mBufferSize;
// Communicate to writer that we have consumed some data, and that's all
mStart.store((start + samplesToDiscard) % mBufferSize,
std::memory_order_relaxed);
return samplesToDiscard;
}

View File

@ -12,6 +12,7 @@
#define __AUDACITY_RING_BUFFER__
#include "SampleFormat.h"
#include <atomic>
class RingBuffer {
public:
@ -35,12 +36,13 @@ class RingBuffer {
size_t Discard(size_t samples);
private:
size_t Len();
size_t Filled( size_t start, size_t end );
size_t Free( size_t start, size_t end );
sampleFormat mFormat;
size_t mStart { 0 };
size_t mEnd { 0 };
size_t mBufferSize;
std::atomic<size_t> mStart { 0 };
std::atomic<size_t> mEnd { 0 };
const size_t mBufferSize;
SampleBuffer mBuffer;
};