aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authornyorain <[email protected]>2020-10-10 22:51:46 +0200
committerPaul Adenot <[email protected]>2020-10-28 16:55:14 +0000
commit9183f2e654e6e58e534b87490eb646236ecb8df7 (patch)
tree942270aa83e6fcc0378a2c0b4a1e8f0e1edf3adc /src
parent9905226379329bf95ce6b761eceae46dfe429d9a (diff)
downloadcubeb-9183f2e654e6e58e534b87490eb646236ecb8df7.tar.gz
cubeb-9183f2e654e6e58e534b87490eb646236ecb8df7.zip
Move aaudio backend to C++ & enable low latency
Diffstat (limited to 'src')
-rw-r--r--src/cubeb_aaudio.cpp (renamed from src/cubeb_aaudio.c)847
1 files changed, 408 insertions, 439 deletions
diff --git a/src/cubeb_aaudio.c b/src/cubeb_aaudio.cpp
index 513af5e..3246d55 100644
--- a/src/cubeb_aaudio.c
+++ b/src/cubeb_aaudio.cpp
@@ -4,19 +4,23 @@
* This program is made available under an ISC-style license. See the
* accompanying file LICENSE for details.
*/
-#include <assert.h>
+#include <cassert>
+#include <mutex>
+#include <condition_variable>
+#include <thread>
+#include <cstring>
+#include <atomic>
+#include <cstring>
+#include <chrono>
+#include <memory>
#include <dlfcn.h>
-#include <pthread.h>
-#include <errno.h>
#include <time.h>
-#include <stdatomic.h>
#include <aaudio/AAudio.h>
#include "cubeb/cubeb.h"
#include "cubeb-internal.h"
#include "cubeb_resampler.h"
#include "cubeb_log.h"
-
#ifdef DISABLE_LIBAAUDIO_DLOPEN
#define WRAP(x) x
#else
@@ -77,67 +81,69 @@
// X(AAudioStream_getSessionId) \
//
-#define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x;
+#define MAKE_TYPEDEF(x) static decltype(x) * cubeb_##x;
LIBAAUDIO_API_VISIT(MAKE_TYPEDEF)
#undef MAKE_TYPEDEF
#endif
#define MAX_STREAMS 16
-static const struct cubeb_ops aaudio_ops;
-
-enum stream_state {
- STREAM_STATE_INIT = 0,
- STREAM_STATE_STOPPED,
- STREAM_STATE_STOPPING,
- STREAM_STATE_STARTED,
- STREAM_STATE_STARTING,
- STREAM_STATE_DRAINING,
- STREAM_STATE_ERROR,
- STREAM_STATE_SHUTDOWN,
+
+using unique_lock = std::unique_lock<std::mutex>;
+using lock_guard = std::lock_guard<std::mutex>;
+
+enum class stream_state {
+ init = 0,
+ stopped,
+ stopping,
+ started,
+ starting,
+ draining,
+ error,
+ shutdown,
};
struct cubeb_stream {
/* Note: Must match cubeb_stream layout in cubeb.c. */
- cubeb * context;
- void * user_ptr;
+ cubeb * context {};
+ void * user_ptr {};
/**/
- _Atomic bool in_use;
- _Atomic enum stream_state state;
+ std::atomic<bool> in_use {false};
+ std::atomic<stream_state> state {stream_state::init};
- AAudioStream * ostream;
- AAudioStream * istream;
- cubeb_data_callback data_callback;
- cubeb_state_callback state_callback;
- cubeb_resampler * resampler;
+ AAudioStream * ostream {};
+ AAudioStream * istream {};
+ cubeb_data_callback data_callback {};
+ cubeb_state_callback state_callback {};
+ cubeb_resampler * resampler {};
// mutex synchronizes access to the stream from the state thread
// and user-called functions. Everything that is accessed in the
// aaudio data (or error) callback is synchronized only via atomics.
- pthread_mutex_t mutex;
+ std::mutex mutex;
- void * in_buf;
- unsigned in_frame_size;
+ std::unique_ptr<char[]> in_buf;
+ unsigned in_frame_size {}; // size of one input frame
- cubeb_sample_format out_format;
- _Atomic float volume;
- unsigned out_channels;
- unsigned out_frame_size;
+ cubeb_sample_format out_format {};
+ std::atomic<float> volume {1.f};
+ unsigned out_channels {};
+ unsigned out_frame_size {};
};
struct cubeb {
- struct cubeb_ops const * ops;
- void * libaaudio;
+ struct cubeb_ops const * ops {};
+ void * libaaudio {};
struct {
// The state thread: it waits for state changes and stops
// drained streams.
- pthread_t thread;
- pthread_t notifier;
- pthread_mutex_t mutex;
- pthread_cond_t cond;
- _Atomic bool join;
- _Atomic bool waiting;
+ std::thread thread;
+ std::thread notifier;
+ std::mutex mutex;
+ std::condition_variable cond;
+ std::atomic<bool> join {false};
+ std::atomic<bool> waiting {false};
} state;
// streams[i].in_use signals whether a stream is used
@@ -155,53 +161,48 @@ static void shutdown(cubeb_stream * stm)
}
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
- atomic_store(&stm->state, STREAM_STATE_SHUTDOWN);
+ stm->state.store(stream_state::shutdown);
}
// Returns whether the given state is one in which we wait for
// an asynchronous change
-static bool waiting_state(enum stream_state state)
+static bool waiting_state(stream_state state)
{
switch (state) {
- case STREAM_STATE_DRAINING:
- case STREAM_STATE_STARTING:
- case STREAM_STATE_STOPPING:
+ case stream_state::draining:
+ case stream_state::starting:
+ case stream_state::stopping:
return true;
default:
return false;
}
}
-// Returns whether this stream is still waiting for a state change
static void update_state(cubeb_stream * stm)
{
- // fast path for streams that don't wait for state change or are invalid
- enum stream_state old_state = atomic_load(&stm->state);
- if (old_state == STREAM_STATE_INIT ||
- old_state == STREAM_STATE_STARTED ||
- old_state == STREAM_STATE_STOPPED ||
- old_state == STREAM_STATE_SHUTDOWN) {
+ // Fast path for streams that don't wait for state change or are invalid
+ enum stream_state old_state = stm->state.load();
+ if (old_state == stream_state::init ||
+ old_state == stream_state::started ||
+ old_state == stream_state::stopped ||
+ old_state == stream_state::shutdown) {
return;
}
// If the main thread currently operates on this thread, we don't
// have to wait for it
- int err = pthread_mutex_trylock(&stm->mutex);
- if (err != 0) {
- if (err != EBUSY) {
- LOG("pthread_mutex_trylock: %s", strerror(err));
- }
+ unique_lock lock(stm->mutex, std::try_to_lock);
+ if (!lock.owns_lock()) {
return;
}
// check again: if this is true now, the stream was destroyed or
// changed between our fast path check and locking the mutex
- old_state = atomic_load(&stm->state);
- if (old_state == STREAM_STATE_INIT ||
- old_state == STREAM_STATE_STARTED ||
- old_state == STREAM_STATE_STOPPED ||
- old_state == STREAM_STATE_SHUTDOWN) {
- pthread_mutex_unlock(&stm->mutex);
+ old_state = stm->state.load();
+ if (old_state == stream_state::init ||
+ old_state == stream_state::started ||
+ old_state == stream_state::stopped ||
+ old_state == stream_state::shutdown) {
return;
}
@@ -211,14 +212,12 @@ static void update_state(cubeb_stream * stm)
// such as a draining or error state.
enum stream_state new_state;
do {
- if (old_state == STREAM_STATE_SHUTDOWN) {
- pthread_mutex_unlock(&stm->mutex);
+ if (old_state == stream_state::shutdown) {
return;
}
- if (old_state == STREAM_STATE_ERROR) {
+ if (old_state == stream_state::error) {
shutdown(stm);
- pthread_mutex_unlock(&stm->mutex);
return;
}
@@ -227,12 +226,15 @@ static void update_state(cubeb_stream * stm)
aaudio_stream_state_t istate = 0;
aaudio_stream_state_t ostate = 0;
+ // We use waitForStateChange (with zero timeout) instead of just
+ // getState since only the former internally updates the state.
+ // See the docs of aaudio getState/waitForStateChange for details,
+ // why we are passing STATE_UNKNOWN.
aaudio_result_t res;
if (stm->istream) {
res = WRAP(AAudioStream_waitForStateChange)(stm->istream,
AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0);
if (res != AAUDIO_OK) {
- pthread_mutex_unlock(&stm->mutex);
LOG("AAudioStream_waitForStateChanged: %s", WRAP(AAudio_convertResultToText)(res));
return;
}
@@ -243,7 +245,6 @@ static void update_state(cubeb_stream * stm)
res = WRAP(AAudioStream_waitForStateChange)(stm->ostream,
AAUDIO_STREAM_STATE_UNKNOWN, &ostate, 0);
if (res != AAUDIO_OK) {
- pthread_mutex_unlock(&stm->mutex);
LOG("AAudioStream_waitForStateChanged: %s", WRAP(AAudio_convertResultToText)(res));
return;
}
@@ -258,9 +259,8 @@ static void update_state(cubeb_stream * stm)
istate == AAUDIO_STREAM_STATE_UNKNOWN ||
istate == AAUDIO_STREAM_STATE_DISCONNECTED) {
const char * name = WRAP(AAudio_convertStreamStateToText)(istate);
- LOG("Invalid android input stream state %s", name);
+ LOG("Unexpected android input stream state %s", name);
shutdown(stm);
- pthread_mutex_unlock(&stm->mutex);
return;
}
@@ -271,21 +271,20 @@ static void update_state(cubeb_stream * stm)
ostate == AAUDIO_STREAM_STATE_UNKNOWN ||
ostate == AAUDIO_STREAM_STATE_DISCONNECTED) {
const char * name = WRAP(AAudio_convertStreamStateToText)(istate);
- LOG("Invalid android output stream state %s", name);
+ LOG("Unexpected android output stream state %s", name);
shutdown(stm);
- pthread_mutex_unlock(&stm->mutex);
return;
}
switch (old_state) {
- case STREAM_STATE_STARTING:
+ case stream_state::starting:
if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) {
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED);
- new_state = STREAM_STATE_STARTED;
+ new_state = stream_state::started;
}
break;
- case STREAM_STATE_DRAINING:
+ case stream_state::draining:
// The DRAINING state means that we want to stop the streams but
// may not have done so yet.
// The aaudio docs state that returning STOP from the callback isn't
@@ -315,17 +314,18 @@ static void update_state(cubeb_stream * stm)
}
// we always wait until both streams are stopped until we
- // send STATE_DRAINED. Then we can directly transition
- // our logical state to STREAM_STATE_STOPPED, not triggering
- // an additional STATE_STOPPED callback
+ // send CUBEB_STATE_DRAINED. Then we can directly transition
+ // our logical state to stream_state::stopped, not triggering
+ // an additional CUBEB_STATE_STOPPED callback (which might
+ // be unexpected for the user).
if ((!ostate || ostate == AAUDIO_STREAM_STATE_STOPPED) &&
(!istate || istate == AAUDIO_STREAM_STATE_STOPPED)) {
- new_state = STREAM_STATE_STOPPED;
+ new_state = stream_state::stopped;
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED);
}
break;
- case STREAM_STATE_STOPPING:
+ case stream_state::stopping:
assert(!istate ||
istate == AAUDIO_STREAM_STATE_STOPPING ||
istate == AAUDIO_STREAM_STATE_STOPPED);
@@ -335,46 +335,52 @@ static void update_state(cubeb_stream * stm)
if ((!istate || istate == AAUDIO_STREAM_STATE_STOPPED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STOPPED)) {
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED);
- new_state = STREAM_STATE_STOPPED;
+ new_state = stream_state::stopped;
}
break;
default:
assert(false && "Unreachable: invalid state");
}
- } while (old_state != new_state &&
- !atomic_compare_exchange_strong(&stm->state, &old_state, new_state));
-
- pthread_mutex_unlock(&stm->mutex);
+ } while (old_state != new_state && !stm->state.compare_exchange_strong(old_state, new_state));
}
-static void * notifier_thread(void * user_ptr)
+// See https://nyorain.github.io/lock-free-wakeup.html for a note
+// why this is needed. The audio thread notifies the state thread about
+// state changes and must not block. The state thread on the other hand should
+// sleep until there is work to be done. So we need a lockfree producer
+// and blocking producer. This can only be achieved safely with a new thread
+// that only serves as notifier backup (in case the notification happens
+// right between the state thread checking and going to sleep in which case
+// this thread will kick in and signal it right again).
+static void notifier_thread(cubeb* ctx)
{
- cubeb * ctx = (cubeb*) user_ptr;
- pthread_mutex_lock(&ctx->state.mutex);
- while (!atomic_load(&ctx->state.join)) {
- pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
- if (atomic_load(&ctx->state.waiting)) {
- pthread_cond_signal(&ctx->state.cond);
+ unique_lock lock(ctx->state.mutex);
+
+ while (!ctx->state.join.load()) {
+ ctx->state.cond.wait(lock);
+ if (ctx->state.waiting.load()) {
+ // This must signal our state thread since there is no other
+ // thread currently waiting on the condition variable.
+ // The state change thread is guaranteed to be waiting since
+ // we hold the mutex it locks when awake.
+ ctx->state.cond.notify_one();
}
}
// make sure other thread joins as well
- pthread_cond_signal(&ctx->state.cond);
- pthread_mutex_unlock(&ctx->state.mutex);
+ ctx->state.cond.notify_one();
LOG("Exiting notifier thread");
- return NULL;
}
-static void * state_thread(void * user_ptr)
+static void state_thread(cubeb* ctx)
{
- cubeb * ctx = (cubeb*) user_ptr;
- pthread_mutex_lock(&ctx->state.mutex);
+ unique_lock lock(ctx->state.mutex);
bool waiting = false;
- while (!atomic_load(&ctx->state.join)) {
- waiting |= atomic_load(&ctx->state.waiting);
+ while (!ctx->state.join.load()) {
+ waiting |= ctx->state.waiting.load();
if (waiting) {
- atomic_store(&ctx->state.waiting, false);
+ ctx->state.waiting.store(false);
waiting = false;
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
cubeb_stream * stm = &ctx->streams[i];
@@ -383,7 +389,7 @@ static void * state_thread(void * user_ptr)
}
// state changed from another thread, update again immediately
- if (atomic_load(&ctx->state.waiting)) {
+ if (ctx->state.waiting.load()) {
waiting = true;
continue;
}
@@ -397,31 +403,20 @@ static void * state_thread(void * user_ptr)
// while any stream is waiting for state change we sleep with regular
// timeouts. But we wake up immediately if signaled.
// This might seem like a poor man's implementation of state change
- // waiting but (as of december 2019), the implementation of
- // AAudioStream_waitForStateChange is pretty much the same:
- // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277
- struct timespec timeout;
-
- // pthread_condattr_setclock isn't available before android api v21.
- // pthread condition variables use the realtime clock by default, which
- // means that system time changes can interfer with wakeup timeouts.
-#if __ANDROID_API__ >= 21
- clock_gettime(CLOCK_MONOTONIC, &timeout);
-#else
- clock_gettime(CLOCK_REALTIME, &timeout);
-#endif
- timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms
- pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout);
+ // waiting but (as of october 2020), the implementation of
+ // AAudioStream_waitForStateChange is just sleeping with regular
+ // timeouts as well:
+ // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp
+ auto dur = std::chrono::milliseconds(5);
+ ctx->state.cond.wait_for(lock, dur);
} else {
- pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
+ ctx->state.cond.wait(lock);
}
}
// make sure other thread joins as well
- pthread_cond_signal(&ctx->state.cond);
- pthread_mutex_unlock(&ctx->state.mutex);
+ ctx->state.cond.notify_one();
LOG("Exiting state thread");
- return NULL;
}
static char const *
@@ -443,127 +438,41 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
static void
aaudio_destroy(cubeb * ctx)
{
+ assert(ctx);
+
#ifndef NDEBUG
// make sure all streams were destroyed
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- assert(!atomic_load(&ctx->streams[i].in_use));
+ assert(!ctx->streams[i].in_use.load());
}
#endif
// broadcast joining to both threads
// they will additionally signal each other before joining
- atomic_store(&ctx->state.join, true);
- pthread_cond_broadcast(&ctx->state.cond);
-
- int err;
- err = pthread_join(ctx->state.thread, NULL);
- if (err != 0) {
- LOG("pthread_join: %s", strerror(err));
- }
+ ctx->state.join.store(true);
+ ctx->state.cond.notify_all();
- err = pthread_join(ctx->state.notifier, NULL);
- if (err != 0) {
- LOG("pthread_join: %s", strerror(err));
- }
-
- pthread_cond_destroy(&ctx->state.cond);
- pthread_mutex_destroy(&ctx->state.mutex);
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- pthread_mutex_destroy(&ctx->streams[i].mutex);
+ try {
+ if(ctx->state.thread.joinable()) {
+ ctx->state.thread.join();
+ }
+ if(ctx->state.notifier.joinable()) {
+ ctx->state.notifier.join();
+ }
+ } catch(const std::system_error& err) {
+ LOG("Joining thread failed: %s", err.what());
}
if (ctx->libaaudio) {
dlclose(ctx->libaaudio);
}
- free(ctx);
-}
-
-/*static*/ int
-aaudio_init(cubeb ** context, char const * context_name) {
- (void) context_name;
- int err;
-
- // load api
- void * libaaudio = NULL;
-#ifndef DISABLE_LIBAAUDIO_DLOPEN
- libaaudio = dlopen("libaaudio.so", RTLD_NOW);
- if (!libaaudio) {
- return CUBEB_ERROR;
- }
-
-#define LOAD(x) { \
- WRAP(x) = (typeof(WRAP(x))) (dlsym(libaaudio, #x)); \
- if (!WRAP(x)) { \
- LOG("AAudio: Failed to load %s", #x); \
- dlclose(libaaudio); \
- return CUBEB_ERROR; \
- } \
- }
-
- LIBAAUDIO_API_VISIT(LOAD);
-#undef LOAD
-#endif
-
- cubeb * ctx = (cubeb*) calloc(1, sizeof(*ctx));
- ctx->ops = &aaudio_ops;
- ctx->libaaudio = libaaudio;
- atomic_init(&ctx->state.join, false);
- atomic_init(&ctx->state.waiting, false);
- pthread_mutex_init(&ctx->state.mutex, NULL);
-
- pthread_condattr_t cond_attr;
- pthread_condattr_init(&cond_attr);
-
- // pthread_condttr_setclock isn't available before android api v21.
- // pthread condition variables use the realtime clock by default, which
- // means that system time changes can interfer with wakeup timeouts.
-#if __ANDROID_API__ >= 21
- pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
-#endif
-
- err = pthread_cond_init(&ctx->state.cond, &cond_attr);
- pthread_condattr_destroy(&cond_attr);
- if (err) {
- LOG("pthread_cond_init: %s", strerror(err));
- aaudio_destroy(ctx);
- return CUBEB_ERROR;
- }
-
- // The stream mutexes are not bound to the lifetimes of the
- // streams since we need them to synchronize the streams with
- // the state thread.
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- atomic_init(&ctx->streams[i].in_use, false);
- atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT);
- pthread_mutex_init(&ctx->streams[i].mutex, NULL);
- }
-
- err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx);
- if (err != 0) {
- LOG("pthread_create: %s", strerror(err));
- aaudio_destroy(ctx);
- return CUBEB_ERROR;
- }
-
- // TODO: we could set the priority of the notifier thread lower than
- // the priority of the state thread. This way, it's more likely
- // that the state thread will be woken up by the condition variable signal
- // when both are currently waiting
- err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx);
- if (err != 0) {
- LOG("pthread_create: %s", strerror(err));
- aaudio_destroy(ctx);
- return CUBEB_ERROR;
- }
-
- *context = ctx;
- return CUBEB_OK;
+ delete ctx;
}
static void
apply_volume(cubeb_stream * stm, void * audio_data, uint32_t num_frames) {
+ float volume = stm->volume.load();
// optimization: we don't have to change anything in this case
- float volume = atomic_load(&stm->volume);
if (volume == 1.f) {
return;
}
@@ -571,12 +480,12 @@ apply_volume(cubeb_stream * stm, void * audio_data, uint32_t num_frames) {
switch (stm->out_format) {
case CUBEB_SAMPLE_S16NE:
for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) {
- ((int16_t*)audio_data)[i] *= volume;
+ (static_cast<int16_t*>(audio_data))[i] *= volume;
}
break;
case CUBEB_SAMPLE_FLOAT32NE:
for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) {
- ((float*)audio_data)[i] *= volume;
+ (static_cast<float*>(audio_data))[i] *= volume;
}
break;
default:
@@ -597,7 +506,7 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
assert(stm->istream);
assert(num_frames >= 0);
- enum stream_state state = atomic_load(&stm->state);
+ stream_state state = atomic_load(&stm->state);
// int istate = WRAP(AAudioStream_getState)(stm->istream);
// int ostate = WRAP(AAudioStream_getState)(stm->ostream);
// ALOGV("aaudio duplex data cb on stream %p: state %ld (in: %d, out: %d), num_frames: %ld",
@@ -605,19 +514,19 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
// all other states may happen since the callback might be called
// from within requestStart
- assert(state != STREAM_STATE_SHUTDOWN);
+ assert(state != stream_state::shutdown);
// This might happen when we started draining but not yet actually
// stopped the stream from the state thread.
- if (state == STREAM_STATE_DRAINING) {
- memset(audio_data, 0x0, num_frames * stm->out_frame_size);
+ if (state == stream_state::draining) {
+ std::memset(audio_data, 0x0, num_frames * stm->out_frame_size);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
long in_num_frames = WRAP(AAudioStream_read)(stm->istream,
- stm->in_buf, num_frames, 0);
+ stm->in_buf.get(), num_frames, 0);
if (in_num_frames < 0) { // error
- atomic_store(&stm->state, STREAM_STATE_ERROR);
+ stm->state.store(stream_state::error);
LOG("AAudioStream_read: %s", WRAP(AAudio_convertResultToText)(in_num_frames));
return AAUDIO_CALLBACK_RESULT_STOP;
}
@@ -632,25 +541,25 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
// ALOGV("AAudioStream_read returned not enough frames: %ld instead of %d",
// in_num_frames, num_frames);
unsigned left = num_frames - in_num_frames;
- char * buf = ((char*) stm->in_buf) + in_num_frames * stm->in_frame_size;
- memset(buf, 0x0, left * stm->in_frame_size);
+ char * buf = stm->in_buf.get() + in_num_frames * stm->in_frame_size;
+ std::memset(buf, 0x0, left * stm->in_frame_size);
in_num_frames = num_frames;
}
- long done_frames = cubeb_resampler_fill(stm->resampler, stm->in_buf,
+ long done_frames = cubeb_resampler_fill(stm->resampler, stm->in_buf.get(),
&in_num_frames, audio_data, num_frames);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
- atomic_store(&stm->state, STREAM_STATE_ERROR);
+ stm->state.store(stream_state::error);
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
- atomic_store(&stm->state, STREAM_STATE_DRAINING);
- atomic_store(&stm->context->state.waiting, true);
- pthread_cond_signal(&stm->context->state.cond);
+ stm->state.store(stream_state::draining);
+ stm->context->state.waiting.store(true);
+ stm->context->state.cond.notify_one();
- char * begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
- memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
+ char * begin = static_cast<char*>(audio_data) + done_frames * stm->out_frame_size;
+ std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
}
apply_volume(stm, audio_data, done_frames);
@@ -666,19 +575,19 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
assert(!stm->istream);
assert(num_frames >= 0);
- enum stream_state state = atomic_load(&stm->state);
+ stream_state state = stm->state.load();
// int ostate = WRAP(AAudioStream_getState)(stm->ostream);
// ALOGV("aaudio output data cb on stream %p: state %ld (%d), num_frames: %ld",
// (void*) stm, state, ostate, num_frames);
// all other states may happen since the callback might be called
// from within requestStart
- assert(state != STREAM_STATE_SHUTDOWN);
+ assert(state != stream_state::shutdown);
// This might happen when we started draining but not yet actually
// stopped the stream from the state thread.
- if (state == STREAM_STATE_DRAINING) {
- memset(audio_data, 0x0, num_frames * stm->out_frame_size);
+ if (state == stream_state::draining) {
+ std::memset(audio_data, 0x0, num_frames * stm->out_frame_size);
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
@@ -686,15 +595,15 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
audio_data, num_frames);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
- atomic_store(&stm->state, STREAM_STATE_ERROR);
+ stm->state.store(stream_state::error);
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
- atomic_store(&stm->state, STREAM_STATE_DRAINING);
- atomic_store(&stm->context->state.waiting, true);
- pthread_cond_signal(&stm->context->state.cond);
+ stm->state.store(stream_state::draining);
+ stm->context->state.waiting.store(true);
+ stm->context->state.cond.notify_one();
- char * begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
- memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
+ char * begin = static_cast<char*>(audio_data) + done_frames * stm->out_frame_size;
+ std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
}
apply_volume(stm, audio_data, done_frames);
@@ -710,18 +619,18 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
assert(!stm->ostream);
assert(num_frames >= 0);
- enum stream_state state = atomic_load(&stm->state);
+ stream_state state = stm->state.load();
// int istate = WRAP(AAudioStream_getState)(stm->istream);
// ALOGV("aaudio input data cb on stream %p: state %ld (%d), num_frames: %ld",
// (void*) stm, state, istate, num_frames);
// all other states may happen since the callback might be called
// from within requestStart
- assert(state != STREAM_STATE_SHUTDOWN);
+ assert(state != stream_state::shutdown);
// This might happen when we started draining but not yet actually
// stopped the stream from the state thread.
- if (state == STREAM_STATE_DRAINING) {
+ if (state == stream_state::draining) {
return AAUDIO_CALLBACK_RESULT_CONTINUE;
}
@@ -730,15 +639,15 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
audio_data, &input_frame_count, NULL, 0);
if (done_frames < 0 || done_frames > num_frames) {
LOG("Error in data callback or resampler: %ld", done_frames);
- atomic_store(&stm->state, STREAM_STATE_ERROR);
+ stm->state.store(stream_state::error);
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < input_frame_count) {
// we don't really drain an input stream, just have to
// stop it from the state thread. That is signaled via the
// DRAINING state.
- atomic_store(&stm->state, STREAM_STATE_DRAINING);
- atomic_store(&stm->context->state.waiting, true);
- pthread_cond_signal(&stm->context->state.cond);
+ stm->state.store(stream_state::draining);
+ stm->context->state.waiting.store(true);
+ stm->context->state.cond.notify_one();
}
return AAUDIO_CALLBACK_RESULT_CONTINUE;
@@ -747,10 +656,10 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
static void
aaudio_error_cb(AAudioStream * astream, void * user_data, aaudio_result_t error)
{
- cubeb_stream * stm = (cubeb_stream*) user_data;
+ cubeb_stream * stm = static_cast<cubeb_stream*>(user_data);
assert(stm->ostream == astream || stm->istream == astream);
LOG("AAudio error callback: %s", WRAP(AAudio_convertResultToText)(error));
- atomic_store(&stm->state, STREAM_STATE_ERROR);
+ stm->state.store(stream_state::error);
}
static int
@@ -803,13 +712,13 @@ realize_stream(AAudioStreamBuilder * sb, const cubeb_stream_params * params,
static void
aaudio_stream_destroy(cubeb_stream * stm)
{
- pthread_mutex_lock(&stm->mutex);
- assert(stm->state == STREAM_STATE_STOPPED ||
- stm->state == STREAM_STATE_STOPPING ||
- stm->state == STREAM_STATE_INIT ||
- stm->state == STREAM_STATE_DRAINING ||
- stm->state == STREAM_STATE_ERROR ||
- stm->state == STREAM_STATE_SHUTDOWN);
+ lock_guard lock(stm->mutex);
+ assert(stm->state == stream_state::stopped ||
+ stm->state == stream_state::stopping ||
+ stm->state == stream_state::init ||
+ stm->state == stream_state::draining ||
+ stm->state == stream_state::error ||
+ stm->state == stream_state::shutdown);
aaudio_result_t res;
@@ -817,9 +726,9 @@ aaudio_stream_destroy(cubeb_stream * stm)
// That is important as we otherwise might read from a closed istream
// for a duplex stream.
if (stm->ostream) {
- if (stm->state != STREAM_STATE_STOPPED &&
- stm->state != STREAM_STATE_STOPPING &&
- stm->state != STREAM_STATE_SHUTDOWN) {
+ if (stm->state != stream_state::stopped &&
+ stm->state != stream_state::stopping &&
+ stm->state != stream_state::shutdown) {
res = WRAP(AAudioStream_requestStop)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStreamBuilder_requestStop: %s", WRAP(AAudio_convertResultToText)(res));
@@ -831,9 +740,9 @@ aaudio_stream_destroy(cubeb_stream * stm)
}
if (stm->istream) {
- if (stm->state != STREAM_STATE_STOPPED &&
- stm->state != STREAM_STATE_STOPPING &&
- stm->state != STREAM_STATE_SHUTDOWN) {
+ if (stm->state != stream_state::stopped &&
+ stm->state != stream_state::stopping &&
+ stm->state != stream_state::shutdown) {
res = WRAP(AAudioStream_requestStop)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStreamBuilder_requestStop: %s", WRAP(AAudio_convertResultToText)(res));
@@ -847,78 +756,40 @@ aaudio_stream_destroy(cubeb_stream * stm)
if (stm->resampler) {
cubeb_resampler_destroy(stm->resampler);
}
- if (stm->in_buf) {
- free(stm->in_buf);
- }
- atomic_store(&stm->state, STREAM_STATE_INIT);
- pthread_mutex_unlock(&stm->mutex);
- atomic_store(&stm->in_use, false);
+ stm->state.store(stream_state::init);
+ stm->in_use.store(false);
}
static int
-aaudio_stream_init(cubeb * ctx,
- cubeb_stream ** stream,
+aaudio_stream_init_impl(
+ cubeb_stream * stm,
char const * stream_name,
cubeb_devid input_device,
cubeb_stream_params * input_stream_params,
cubeb_devid output_device,
cubeb_stream_params * output_stream_params,
- unsigned int latency_frames,
- cubeb_data_callback data_callback,
- cubeb_state_callback state_callback,
- void * user_ptr)
+ unsigned int latency_frames)
{
- assert(!input_device);
- assert(!output_device);
-
- (void) stream_name;
+ assert(stm->state.load() == stream_state::init);
+ stm->in_use.store(true);
aaudio_result_t res;
- AAudioStreamBuilder * sb;
+ AAudioStreamBuilder* sb;
res = WRAP(AAudio_createStreamBuilder)(&sb);
if (res != AAUDIO_OK) {
LOG("AAudio_createStreamBuilder: %s", WRAP(AAudio_convertResultToText)(res));
return CUBEB_ERROR;
}
- // atomically find a free stream.
- cubeb_stream * stm = NULL;
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- // This check is only an optimization, we don't strictly need it
- // since we check again after locking the mutex.
- if (atomic_load(&ctx->streams[i].in_use)) {
- continue;
- }
-
- // if this fails with EBUSY, another thread initialized this stream
- // between our check of in_use and this.
- int err = pthread_mutex_trylock(&ctx->streams[i].mutex);
- if (err != 0 || atomic_load(&ctx->streams[i].in_use)) {
- if (err && err != EBUSY) {
- LOG("pthread_mutex_trylock: %s", strerror(err));
- }
-
- continue;
+ // make sure the builder is always destroyed
+ struct StreamBuilderDestructor {
+ void operator()(AAudioStreamBuilder* sb) {
+ WRAP(AAudioStreamBuilder_delete)(sb);
}
+ };
- stm = &ctx->streams[i];
- break;
- }
-
- if (!stm) {
- LOG("Error: maximum number of streams reached");
- return CUBEB_ERROR;
- }
-
- assert(atomic_load(&stm->state) == STREAM_STATE_INIT);
- atomic_store(&stm->in_use, true);
-
- unsigned res_err = CUBEB_ERROR;
- stm->user_ptr = user_ptr;
- stm->data_callback = data_callback;
- stm->state_callback = state_callback;
- stm->context = ctx;
+ std::unique_ptr<AAudioStreamBuilder, StreamBuilderDestructor> sbPtr(sb);
WRAP(AAudioStreamBuilder_setErrorCallback)(sb, aaudio_error_cb, stm);
WRAP(AAudioStreamBuilder_setBufferCapacityInFrames)(sb, latency_frames);
@@ -934,16 +805,16 @@ aaudio_stream_init(cubeb * ctx,
out_data_callback = aaudio_output_data_cb;
} else {
LOG("Tried to open stream without input or output parameters");
- goto error;
+ return CUBEB_ERROR;
}
-#ifdef AAUDIO_EXCLUSIVE
+#ifdef CUBEB_AAUDIO_EXCLUSIVE_STREAM
WRAP(AAudioStreamBuilder_setSharingMode)(sb, AAUDIO_SHARING_MODE_EXCLUSIVE);
#endif
-#ifdef AAUDIO_LOW_LATENCY
+#ifdef CUBEB_AAUDIO_LOW_LATENCY
WRAP(AAudioStreamBuilder_setPerformanceMode)(sb, AAUDIO_PERFORMANCE_MODE_LOW_LATENCY);
-#elif defined(AAUDIO_POWER_SAVING)
+#elif defined(CUBEB_AAUDIO_POWER_SAVING)
WRAP(AAudioStreamBuilder_setPerformanceMode)(sb, AAUDIO_PERFORMANCE_MODE_POWER_SAVING);
#endif
@@ -956,9 +827,9 @@ aaudio_stream_init(cubeb * ctx,
if (output_stream_params) {
WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_OUTPUT);
WRAP(AAudioStreamBuilder_setDataCallback)(sb, out_data_callback, stm);
- res_err = realize_stream(sb, output_stream_params, &stm->ostream, &frame_size);
+ int res_err = realize_stream(sb, output_stream_params, &stm->ostream, &frame_size);
if (res_err) {
- goto error;
+ return res_err;
}
// output debug information
@@ -980,7 +851,7 @@ aaudio_stream_init(cubeb * ctx,
stm->out_channels = output_stream_params->channels;
stm->out_format = output_stream_params->format;
stm->out_frame_size = frame_size;
- atomic_store(&stm->volume, 1.f);
+ stm->volume.store(1.f);
}
// input
@@ -988,9 +859,9 @@ aaudio_stream_init(cubeb * ctx,
if (input_stream_params) {
WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_INPUT);
WRAP(AAudioStreamBuilder_setDataCallback)(sb, in_data_callback, stm);
- res_err = realize_stream(sb, input_stream_params, &stm->istream, &frame_size);
+ int res_err = realize_stream(sb, input_stream_params, &stm->istream, &frame_size);
if (res_err) {
- goto error;
+ return res_err;
}
// output debug information
@@ -1005,7 +876,7 @@ aaudio_stream_init(cubeb * ctx,
LOG("AAudio input stream buffer size: %d", bsize);
LOG("AAudio input stream buffer rate: %d", rate);
- stm->in_buf = malloc(bcap * frame_size);
+ stm->in_buf.reset(new char[bcap * frame_size]());
assert(!target_sample_rate || target_sample_rate == input_stream_params->rate);
target_sample_rate = input_stream_params->rate;
@@ -1019,58 +890,117 @@ aaudio_stream_init(cubeb * ctx,
input_stream_params ? &in_params : NULL,
output_stream_params ? &out_params : NULL,
target_sample_rate,
- data_callback,
- user_ptr,
+ stm->data_callback,
+ stm->user_ptr,
CUBEB_RESAMPLER_QUALITY_DEFAULT);
if (!stm->resampler) {
LOG("Failed to create resampler");
- goto error;
+ return CUBEB_ERROR;
}
// the stream isn't started initially. We don't need to differntiate
// between a stream that was just initialized and one that played
// already but was stopped
- atomic_store(&stm->state, STREAM_STATE_STOPPED);
+ stm->state.store(stream_state::stopped);
LOG("Cubeb stream (%p) init success", (void*) stm);
- pthread_mutex_unlock(&stm->mutex);
+ return CUBEB_OK;
+}
+
+static int
+aaudio_stream_init(cubeb * ctx,
+ cubeb_stream ** stream,
+ char const * stream_name,
+ cubeb_devid input_device,
+ cubeb_stream_params * input_stream_params,
+ cubeb_devid output_device,
+ cubeb_stream_params * output_stream_params,
+ unsigned int latency_frames,
+ cubeb_data_callback data_callback,
+ cubeb_state_callback state_callback,
+ void * user_ptr)
+{
+ assert(!input_device);
+ assert(!output_device);
+
+ (void) stream_name;
+
+ // atomically find a free stream.
+ cubeb_stream * stm = NULL;
+ unique_lock lock;
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ // This check is only an optimization, we don't strictly need it
+ // since we check again after locking the mutex.
+ if (ctx->streams[i].in_use.load()) {
+ continue;
+ }
+
+ // if this fails, another thread initialized this stream
+ // between our check of in_use and this.
+ lock = unique_lock(ctx->streams[i].mutex, std::try_to_lock);
+ if(!lock.owns_lock()) {
+ continue;
+ }
+
+ if(ctx->streams[i].in_use.load()) {
+ lock = {};
+ continue;
+ }
+
+ stm = &ctx->streams[i];
+ break;
+ }
+
+ if (!stm) {
+ LOG("Error: maximum number of streams reached");
+ return CUBEB_ERROR;
+ }
+
+ stm->context = ctx;
+ stm->user_ptr = user_ptr;
+ stm->data_callback = data_callback;
+ stm->state_callback = state_callback;
+
+ int err = aaudio_stream_init_impl(stm, stream_name, input_device,
+ input_stream_params, output_device, output_stream_params, latency_frames);
+ if(err != CUBEB_OK) {
+ // This is needed since aaudio_stream_destroy will lock the mutex again.
+ // It's no problem that there is a gap in between as the stream isn't
+ // actually in u se.
+ lock.unlock();
+ aaudio_stream_destroy(stm);
+ return err;
+ }
- WRAP(AAudioStreamBuilder_delete)(sb);
*stream = stm;
return CUBEB_OK;
-
-error:
- WRAP(AAudioStreamBuilder_delete)(sb);
- pthread_mutex_unlock(&stm->mutex);
- aaudio_stream_destroy(stm);
- return res_err;
}
static int
aaudio_stream_start(cubeb_stream * stm)
{
- pthread_mutex_lock(&stm->mutex);
- enum stream_state state = atomic_load(&stm->state);
+ assert(stm && stm->in_use.load());
+ lock_guard lock(stm->mutex);
+
+ stream_state state = stm->state.load();
int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0;
int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0;
LOGV("starting stream %p: %d (%d %d)", (void*) stm, state, istate, ostate);
switch (state) {
- case STREAM_STATE_STARTED:
- case STREAM_STATE_STARTING:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::started:
+ case stream_state::starting:
LOG("cubeb stream %p already starting/started", (void*) stm);
return CUBEB_OK;
- case STREAM_STATE_ERROR:
- case STREAM_STATE_SHUTDOWN:
+ case stream_state::error:
+ case stream_state::shutdown:
return CUBEB_ERROR;
- case STREAM_STATE_INIT:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::init:
assert(false && "Invalid stream");
return CUBEB_ERROR;
- case STREAM_STATE_STOPPED:
- case STREAM_STATE_STOPPING:
- case STREAM_STATE_DRAINING:
+ case stream_state::stopped:
+ case stream_state::stopping:
+ case stream_state::draining:
break;
}
@@ -1078,7 +1008,7 @@ aaudio_stream_start(cubeb_stream * stm)
// NOTE: aaudio docs don't state explicitly if we have to do this or
// if we are allowed to call requestStart while the stream is
- // in the transient STOPPING state. Works without in testing though
+ // in the transient STOPPING state. Works without this in testing though
// if (ostate == AAUDIO_STREAM_STATE_STOPPING) {
// res = WRAP(AAudioStream_waitForStateChange)(stm->ostream,
// AAUDIO_STREAM_STATE_STOPPING, &ostate, INT64_MAX);
@@ -1102,8 +1032,7 @@ aaudio_stream_start(cubeb_stream * stm)
res = WRAP(AAudioStream_requestStart)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStart (istream): %s", WRAP(AAudio_convertResultToText)(res));
- atomic_store(&stm->state, STREAM_STATE_ERROR);
- pthread_mutex_unlock(&stm->mutex);
+ stm->state.store(stream_state::error);
return CUBEB_ERROR;
}
}
@@ -1112,32 +1041,32 @@ aaudio_stream_start(cubeb_stream * stm)
res = WRAP(AAudioStream_requestStart)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStart (ostream): %s", WRAP(AAudio_convertResultToText)(res));
- atomic_store(&stm->state, STREAM_STATE_ERROR);
- pthread_mutex_unlock(&stm->mutex);
+ stm->state.store(stream_state::error);
return CUBEB_ERROR;
}
}
int ret = CUBEB_OK;
bool success;
- while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) {
+
+ while (!(success = stm->state.compare_exchange_strong(state, stream_state::starting))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
// The stream will be stopped when shut down.
- case STREAM_STATE_ERROR:
+ case stream_state::error:
ret = CUBEB_ERROR;
break;
// The only situation in which the state could have switched to draining
// is if the callback was already fired and requested draining. Don't
// overwrite that. It's not an error either though.
- case STREAM_STATE_DRAINING:
+ case stream_state::draining:
break;
// If the state switched [draining -> stopping] or [draining/stopping -> stopped]
// in the meantime, we can simply overwrite that since we restarted the stream.
- case STREAM_STATE_STOPPING:
- case STREAM_STATE_STOPPED:
+ case stream_state::stopping:
+ case stream_state::stopped:
continue;
// There is no situation in which the state could have been valid before
@@ -1154,40 +1083,38 @@ aaudio_stream_start(cubeb_stream * stm)
}
if (success) {
- atomic_store(&stm->context->state.waiting, true);
- pthread_cond_signal(&stm->context->state.cond);
+ stm->context->state.waiting.store(true);
+ stm->context->state.cond.notify_one();
}
- pthread_mutex_unlock(&stm->mutex);
return ret;
}
static int
aaudio_stream_stop(cubeb_stream * stm)
{
- pthread_mutex_lock(&stm->mutex);
- enum stream_state state = atomic_load(&stm->state);
+ assert(stm && stm->in_use.load());
+ lock_guard lock(stm->mutex);
+
+ stream_state state = stm->state.load();
int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0;
int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0;
LOGV("stopping stream %p: %d (%d %d)", (void*) stm, state, istate, ostate);
switch (state) {
- case STREAM_STATE_STOPPED:
- case STREAM_STATE_STOPPING:
- case STREAM_STATE_DRAINING:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::stopped:
+ case stream_state::stopping:
+ case stream_state::draining:
LOG("cubeb stream %p already stopping/stopped", (void*) stm);
return CUBEB_OK;
- case STREAM_STATE_ERROR:
- case STREAM_STATE_SHUTDOWN:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::error:
+ case stream_state::shutdown:
return CUBEB_ERROR;
- case STREAM_STATE_INIT:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::init:
assert(false && "Invalid stream");
return CUBEB_ERROR;
- case STREAM_STATE_STARTED:
- case STREAM_STATE_STARTING:
+ case stream_state::started:
+ case stream_state::starting:
break;
}
@@ -1221,8 +1148,7 @@ aaudio_stream_stop(cubeb_stream * stm)
res = WRAP(AAudioStream_requestStop)(stm->ostream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStop (ostream): %s", WRAP(AAudio_convertResultToText)(res));
- atomic_store(&stm->state, STREAM_STATE_ERROR);
- pthread_mutex_unlock(&stm->mutex);
+ stm->state.store(stream_state::error);
return CUBEB_ERROR;
}
}
@@ -1231,34 +1157,33 @@ aaudio_stream_stop(cubeb_stream * stm)
res = WRAP(AAudioStream_requestStop)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestStop (istream): %s", WRAP(AAudio_convertResultToText)(res));
- atomic_store(&stm->state, STREAM_STATE_ERROR);
- pthread_mutex_unlock(&stm->mutex);
+ stm->state.store(stream_state::error);
return CUBEB_ERROR;
}
}
int ret = CUBEB_OK;
bool success;
- while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) {
+ while (!(success = atomic_compare_exchange_strong(&stm->state, &state, stream_state::stopping))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
// The stream will be stopped when shut down.
- case STREAM_STATE_ERROR:
+ case stream_state::error:
ret = CUBEB_ERROR;
break;
// If it was switched to draining in the meantime, it was or
// will be stopped soon anyways. We don't interfere with
// the draining process, no matter in which state.
// Not an error
- case STREAM_STATE_DRAINING:
- case STREAM_STATE_STOPPING:
- case STREAM_STATE_STOPPED:
+ case stream_state::draining:
+ case stream_state::stopping:
+ case stream_state::stopped:
break;
// If the state switched from starting to started in the meantime
// we can simply overwrite that since we just stopped it.
- case STREAM_STATE_STARTED:
+ case stream_state::started:
continue;
// There is no situation in which the state could have been valid before
@@ -1274,39 +1199,37 @@ aaudio_stream_stop(cubeb_stream * stm)
}
if (success) {
- atomic_store(&stm->context->state.waiting, true);
- pthread_cond_signal(&stm->context->state.cond);
+ stm->context->state.waiting.store(true);
+ stm->context->state.cond.notify_one();
}
- pthread_mutex_unlock(&stm->mutex);
return ret;
}
static int
aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position)
{
- pthread_mutex_lock(&stm->mutex);
- enum stream_state state = atomic_load(&stm->state);
+ assert(stm && stm->in_use.load());
+ lock_guard lock(stm->mutex);
+
+ stream_state state = stm->state.load();
AAudioStream * stream = stm->ostream ? stm->ostream : stm->istream;
switch (state) {
- case STREAM_STATE_ERROR:
- case STREAM_STATE_SHUTDOWN:
- pthread_mutex_unlock(&stm->mutex);
+ case stream_state::error:
+ case stream_state::shutdown:
return CUBEB_ERROR;
- case STREAM_STATE_STOPPED:
- case STREAM_STATE_DRAINING:
- case STREAM_STATE_STOPPING:
+ case stream_state::draining:
+ case stream_state::stopped:
+ case stream_state::stopping:
// getTimestamp is only valid when the stream is playing.
// Simply return the number of frames passed to aaudio
*position = WRAP(AAudioStream_getFramesRead)(stream);
- pthread_mutex_unlock(&stm->mutex);
return CUBEB_OK;
- case STREAM_STATE_INIT:
+ case stream_state::init:
assert(false && "Invalid stream");
- pthread_mutex_unlock(&stm->mutex);
return CUBEB_ERROR;
- case STREAM_STATE_STARTED:
- case STREAM_STATE_STARTING:
+ case stream_state::started:
+ case stream_state::starting:
break;
}
@@ -1319,18 +1242,15 @@ aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position)
// has internally started and gives us a valid timestamp.
// If that is not the case (invalid_state is returned) we simply
// fall back to the method we use for non-playing streams.
- if (res == AAUDIO_ERROR_INVALID_STATE && state == STREAM_STATE_STARTING) {
+ if (res == AAUDIO_ERROR_INVALID_STATE && state == stream_state::starting) {
*position = WRAP(AAudioStream_getFramesRead)(stream);
- pthread_mutex_unlock(&stm->mutex);
return CUBEB_OK;
}
LOG("AAudioStream_getTimestamp: %s", WRAP(AAudio_convertResultToText)(res));
- pthread_mutex_unlock(&stm->mutex);
return CUBEB_ERROR;
}
- pthread_mutex_unlock(&stm->mutex);
*position = pos;
return CUBEB_OK;
}
@@ -1338,36 +1258,85 @@ aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position)
static int
aaudio_stream_set_volume(cubeb_stream * stm, float volume)
{
- assert(stm && stm->ostream);
- atomic_store(&stm->volume, volume);
+ assert(stm && stm->in_use.load() && stm->ostream);
+ stm->volume.store(volume);
return CUBEB_OK;
}
-static const struct cubeb_ops aaudio_ops = {
- .init = aaudio_init,
- .get_backend_id = aaudio_get_backend_id,
- .get_max_channel_count = aaudio_get_max_channel_count,
+extern "C" int aaudio_init(cubeb ** context, char const * context_name);
+
+const static struct cubeb_ops aaudio_ops = {
+ /*.init =*/ aaudio_init,
+ /*.get_backend_id =*/ aaudio_get_backend_id,
+ /*.get_max_channel_count =*/ aaudio_get_max_channel_count,
// NOTE: i guess we could support min_latency and preferred sample
// rate via guessing, i.e. creating a dummy stream and check
// its settings.
- .get_min_latency = NULL,
- .get_preferred_sample_rate = NULL,
- .enumerate_devices = NULL,
- .device_collection_destroy = NULL,
- .destroy = aaudio_destroy,
- .stream_init = aaudio_stream_init,
- .stream_destroy = aaudio_stream_destroy,
- .stream_start = aaudio_stream_start,
- .stream_stop = aaudio_stream_stop,
- .stream_reset_default_device = NULL,
- .stream_get_position = aaudio_stream_get_position,
+ /*.get_min_latency =*/ NULL,
+ /*.get_preferred_sample_rate =*/ NULL,
+ /*.enumerate_devices =*/ NULL,
+ /*.device_collection_destroy =*/ NULL,
+ /*.destroy =*/ aaudio_destroy,
+ /*.stream_init =*/ aaudio_stream_init,
+ /*.stream_destroy =*/ aaudio_stream_destroy,
+ /*.stream_start =*/ aaudio_stream_start,
+ /*.stream_stop =*/ aaudio_stream_stop,
+ /*.stream_reset_default_device =*/ NULL,
+ /*.stream_get_position =*/ aaudio_stream_get_position,
// NOTE: this could be implemented via means comparable to the
// OpenSLES backend
- .stream_get_latency = NULL,
- .stream_set_volume = aaudio_stream_set_volume,
- .stream_get_current_device = NULL,
- .stream_device_destroy = NULL,
- .stream_register_device_changed_callback = NULL,
- .register_device_collection_changed = NULL
+ /*.stream_get_latency =*/ NULL,
+ /*.stream_set_volume =*/ aaudio_stream_set_volume,
+ /*.stream_get_current_device =*/ NULL,
+ /*.stream_device_destroy =*/ NULL,
+ /*.stream_register_device_changed_callback =*/ NULL,
+ /*.register_device_collection_changed =*/ NULL
};
+
+extern "C" /*static*/ int
+aaudio_init(cubeb ** context, char const * context_name) {
+ (void) context_name;
+
+ // load api
+ void * libaaudio = NULL;
+#ifndef DISABLE_LIBAAUDIO_DLOPEN
+ libaaudio = dlopen("libaaudio.so", RTLD_NOW);
+ if (!libaaudio) {
+ return CUBEB_ERROR;
+ }
+
+#define LOAD(x) { \
+ WRAP(x) = (decltype(WRAP(x))) (dlsym(libaaudio, #x)); \
+ if (!WRAP(x)) { \
+ LOG("AAudio: Failed to load %s", #x); \
+ dlclose(libaaudio); \
+ return CUBEB_ERROR; \
+ } \
+ }
+
+ LIBAAUDIO_API_VISIT(LOAD);
+#undef LOAD
+#endif
+
+ cubeb * ctx = new cubeb;
+ ctx->ops = &aaudio_ops;
+ ctx->libaaudio = libaaudio;
+
+ try {
+ ctx->state.thread = std::thread(state_thread, ctx);
+
+ // NOTE: using platform-specific APIs we could set the priority of the
+ // notifier thread lower than the priority of the state thread.
+ // This way, it's more likely that the state thread will be woken up
+ // by the condition variable signal when both are currently waiting
+ ctx->state.notifier = std::thread(notifier_thread, ctx);
+ } catch(const std::exception& err) {
+ LOG("Failed to create thread: %s", err.what());
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
+
+ *context = ctx;
+ return CUBEB_OK;
+}