diff options
author | nyorain <[email protected]> | 2020-10-10 22:51:46 +0200 |
---|---|---|
committer | Paul Adenot <[email protected]> | 2020-10-28 16:55:14 +0000 |
commit | 9183f2e654e6e58e534b87490eb646236ecb8df7 (patch) | |
tree | 942270aa83e6fcc0378a2c0b4a1e8f0e1edf3adc | |
parent | 9905226379329bf95ce6b761eceae46dfe429d9a (diff) | |
download | cubeb-9183f2e654e6e58e534b87490eb646236ecb8df7.tar.gz cubeb-9183f2e654e6e58e534b87490eb646236ecb8df7.zip |
Move aaudio backend to C++ & enable low latency
-rw-r--r-- | CMakeLists.txt | 14 | ||||
-rw-r--r-- | src/cubeb_aaudio.cpp (renamed from src/cubeb_aaudio.c) | 847 |
2 files changed, 421 insertions, 440 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index a64b9e1..f8813c2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -214,8 +214,20 @@ endif() check_include_files(aaudio/AAudio.h USE_AAUDIO) if(USE_AAUDIO) target_sources(cubeb PRIVATE - src/cubeb_aaudio.c) + src/cubeb_aaudio.cpp) target_compile_definitions(cubeb PRIVATE USE_AAUDIO) + + # set this definition to enable low latency mode. Possibly bad for battery + target_compile_definitions(cubeb PRIVATE CUBEB_AAUDIO_LOW_LATENCY) + + # set this definition to enable power saving mode. Possibly resulting + # in high latency + # target_compile_definitions(cubeb PRIVATE CUBEB_AAUDIO_LOW_POWER_SAVING) + + # set this mode to make the backend use an exclusive stream. + # will decrease latency. + # target_compile_definitions(cubeb PRIVATE CUBEB_AAUDIO_EXCLUSIVE_STREAM) + target_link_libraries(cubeb PRIVATE ${CMAKE_DL_LIBS}) endif() diff --git a/src/cubeb_aaudio.c b/src/cubeb_aaudio.cpp index 513af5e..3246d55 100644 --- a/src/cubeb_aaudio.c +++ b/src/cubeb_aaudio.cpp @@ -4,19 +4,23 @@ * This program is made available under an ISC-style license. See the * accompanying file LICENSE for details. */ -#include <assert.h> +#include <cassert> +#include <mutex> +#include <condition_variable> +#include <thread> +#include <cstring> +#include <atomic> +#include <cstring> +#include <chrono> +#include <memory> #include <dlfcn.h> -#include <pthread.h> -#include <errno.h> #include <time.h> -#include <stdatomic.h> #include <aaudio/AAudio.h> #include "cubeb/cubeb.h" #include "cubeb-internal.h" #include "cubeb_resampler.h" #include "cubeb_log.h" - #ifdef DISABLE_LIBAAUDIO_DLOPEN #define WRAP(x) x #else @@ -77,67 +81,69 @@ // X(AAudioStream_getSessionId) \ // -#define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x; +#define MAKE_TYPEDEF(x) static decltype(x) * cubeb_##x; LIBAAUDIO_API_VISIT(MAKE_TYPEDEF) #undef MAKE_TYPEDEF #endif #define MAX_STREAMS 16 -static const struct cubeb_ops aaudio_ops; - -enum stream_state { - STREAM_STATE_INIT = 0, - STREAM_STATE_STOPPED, - STREAM_STATE_STOPPING, - STREAM_STATE_STARTED, - STREAM_STATE_STARTING, - STREAM_STATE_DRAINING, - STREAM_STATE_ERROR, - STREAM_STATE_SHUTDOWN, + +using unique_lock = std::unique_lock<std::mutex>; +using lock_guard = std::lock_guard<std::mutex>; + +enum class stream_state { + init = 0, + stopped, + stopping, + started, + starting, + draining, + error, + shutdown, }; struct cubeb_stream { /* Note: Must match cubeb_stream layout in cubeb.c. */ - cubeb * context; - void * user_ptr; + cubeb * context {}; + void * user_ptr {}; /**/ - _Atomic bool in_use; - _Atomic enum stream_state state; + std::atomic<bool> in_use {false}; + std::atomic<stream_state> state {stream_state::init}; - AAudioStream * ostream; - AAudioStream * istream; - cubeb_data_callback data_callback; - cubeb_state_callback state_callback; - cubeb_resampler * resampler; + AAudioStream * ostream {}; + AAudioStream * istream {}; + cubeb_data_callback data_callback {}; + cubeb_state_callback state_callback {}; + cubeb_resampler * resampler {}; // mutex synchronizes access to the stream from the state thread // and user-called functions. Everything that is accessed in the // aaudio data (or error) callback is synchronized only via atomics. - pthread_mutex_t mutex; + std::mutex mutex; - void * in_buf; - unsigned in_frame_size; + std::unique_ptr<char[]> in_buf; + unsigned in_frame_size {}; // size of one input frame - cubeb_sample_format out_format; - _Atomic float volume; - unsigned out_channels; - unsigned out_frame_size; + cubeb_sample_format out_format {}; + std::atomic<float> volume {1.f}; + unsigned out_channels {}; + unsigned out_frame_size {}; }; struct cubeb { - struct cubeb_ops const * ops; - void * libaaudio; + struct cubeb_ops const * ops {}; + void * libaaudio {}; struct { // The state thread: it waits for state changes and stops // drained streams. - pthread_t thread; - pthread_t notifier; - pthread_mutex_t mutex; - pthread_cond_t cond; - _Atomic bool join; - _Atomic bool waiting; + std::thread thread; + std::thread notifier; + std::mutex mutex; + std::condition_variable cond; + std::atomic<bool> join {false}; + std::atomic<bool> waiting {false}; } state; // streams[i].in_use signals whether a stream is used @@ -155,53 +161,48 @@ static void shutdown(cubeb_stream * stm) } stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - atomic_store(&stm->state, STREAM_STATE_SHUTDOWN); + stm->state.store(stream_state::shutdown); } // Returns whether the given state is one in which we wait for // an asynchronous change -static bool waiting_state(enum stream_state state) +static bool waiting_state(stream_state state) { switch (state) { - case STREAM_STATE_DRAINING: - case STREAM_STATE_STARTING: - case STREAM_STATE_STOPPING: + case stream_state::draining: + case stream_state::starting: + case stream_state::stopping: return true; default: return false; } } -// Returns whether this stream is still waiting for a state change static void update_state(cubeb_stream * stm) { - // fast path for streams that don't wait for state change or are invalid - enum stream_state old_state = atomic_load(&stm->state); - if (old_state == STREAM_STATE_INIT || - old_state == STREAM_STATE_STARTED || - old_state == STREAM_STATE_STOPPED || - old_state == STREAM_STATE_SHUTDOWN) { + // Fast path for streams that don't wait for state change or are invalid + enum stream_state old_state = stm->state.load(); + if (old_state == stream_state::init || + old_state == stream_state::started || + old_state == stream_state::stopped || + old_state == stream_state::shutdown) { return; } // If the main thread currently operates on this thread, we don't // have to wait for it - int err = pthread_mutex_trylock(&stm->mutex); - if (err != 0) { - if (err != EBUSY) { - LOG("pthread_mutex_trylock: %s", strerror(err)); - } + unique_lock lock(stm->mutex, std::try_to_lock); + if (!lock.owns_lock()) { return; } // check again: if this is true now, the stream was destroyed or // changed between our fast path check and locking the mutex - old_state = atomic_load(&stm->state); - if (old_state == STREAM_STATE_INIT || - old_state == STREAM_STATE_STARTED || - old_state == STREAM_STATE_STOPPED || - old_state == STREAM_STATE_SHUTDOWN) { - pthread_mutex_unlock(&stm->mutex); + old_state = stm->state.load(); + if (old_state == stream_state::init || + old_state == stream_state::started || + old_state == stream_state::stopped || + old_state == stream_state::shutdown) { return; } @@ -211,14 +212,12 @@ static void update_state(cubeb_stream * stm) // such as a draining or error state. enum stream_state new_state; do { - if (old_state == STREAM_STATE_SHUTDOWN) { - pthread_mutex_unlock(&stm->mutex); + if (old_state == stream_state::shutdown) { return; } - if (old_state == STREAM_STATE_ERROR) { + if (old_state == stream_state::error) { shutdown(stm); - pthread_mutex_unlock(&stm->mutex); return; } @@ -227,12 +226,15 @@ static void update_state(cubeb_stream * stm) aaudio_stream_state_t istate = 0; aaudio_stream_state_t ostate = 0; + // We use waitForStateChange (with zero timeout) instead of just + // getState since only the former internally updates the state. + // See the docs of aaudio getState/waitForStateChange for details, + // why we are passing STATE_UNKNOWN. aaudio_result_t res; if (stm->istream) { res = WRAP(AAudioStream_waitForStateChange)(stm->istream, AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0); if (res != AAUDIO_OK) { - pthread_mutex_unlock(&stm->mutex); LOG("AAudioStream_waitForStateChanged: %s", WRAP(AAudio_convertResultToText)(res)); return; } @@ -243,7 +245,6 @@ static void update_state(cubeb_stream * stm) res = WRAP(AAudioStream_waitForStateChange)(stm->ostream, AAUDIO_STREAM_STATE_UNKNOWN, &ostate, 0); if (res != AAUDIO_OK) { - pthread_mutex_unlock(&stm->mutex); LOG("AAudioStream_waitForStateChanged: %s", WRAP(AAudio_convertResultToText)(res)); return; } @@ -258,9 +259,8 @@ static void update_state(cubeb_stream * stm) istate == AAUDIO_STREAM_STATE_UNKNOWN || istate == AAUDIO_STREAM_STATE_DISCONNECTED) { const char * name = WRAP(AAudio_convertStreamStateToText)(istate); - LOG("Invalid android input stream state %s", name); + LOG("Unexpected android input stream state %s", name); shutdown(stm); - pthread_mutex_unlock(&stm->mutex); return; } @@ -271,21 +271,20 @@ static void update_state(cubeb_stream * stm) ostate == AAUDIO_STREAM_STATE_UNKNOWN || ostate == AAUDIO_STREAM_STATE_DISCONNECTED) { const char * name = WRAP(AAudio_convertStreamStateToText)(istate); - LOG("Invalid android output stream state %s", name); + LOG("Unexpected android output stream state %s", name); shutdown(stm); - pthread_mutex_unlock(&stm->mutex); return; } switch (old_state) { - case STREAM_STATE_STARTING: + case stream_state::starting: if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) && (!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) { stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED); - new_state = STREAM_STATE_STARTED; + new_state = stream_state::started; } break; - case STREAM_STATE_DRAINING: + case stream_state::draining: // The DRAINING state means that we want to stop the streams but // may not have done so yet. // The aaudio docs state that returning STOP from the callback isn't @@ -315,17 +314,18 @@ static void update_state(cubeb_stream * stm) } // we always wait until both streams are stopped until we - // send STATE_DRAINED. Then we can directly transition - // our logical state to STREAM_STATE_STOPPED, not triggering - // an additional STATE_STOPPED callback + // send CUBEB_STATE_DRAINED. Then we can directly transition + // our logical state to stream_state::stopped, not triggering + // an additional CUBEB_STATE_STOPPED callback (which might + // be unexpected for the user). if ((!ostate || ostate == AAUDIO_STREAM_STATE_STOPPED) && (!istate || istate == AAUDIO_STREAM_STATE_STOPPED)) { - new_state = STREAM_STATE_STOPPED; + new_state = stream_state::stopped; stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED); } break; - case STREAM_STATE_STOPPING: + case stream_state::stopping: assert(!istate || istate == AAUDIO_STREAM_STATE_STOPPING || istate == AAUDIO_STREAM_STATE_STOPPED); @@ -335,46 +335,52 @@ static void update_state(cubeb_stream * stm) if ((!istate || istate == AAUDIO_STREAM_STATE_STOPPED) && (!ostate || ostate == AAUDIO_STREAM_STATE_STOPPED)) { stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED); - new_state = STREAM_STATE_STOPPED; + new_state = stream_state::stopped; } break; default: assert(false && "Unreachable: invalid state"); } - } while (old_state != new_state && - !atomic_compare_exchange_strong(&stm->state, &old_state, new_state)); - - pthread_mutex_unlock(&stm->mutex); + } while (old_state != new_state && !stm->state.compare_exchange_strong(old_state, new_state)); } -static void * notifier_thread(void * user_ptr) +// See https://nyorain.github.io/lock-free-wakeup.html for a note +// why this is needed. The audio thread notifies the state thread about +// state changes and must not block. The state thread on the other hand should +// sleep until there is work to be done. So we need a lockfree producer +// and blocking producer. This can only be achieved safely with a new thread +// that only serves as notifier backup (in case the notification happens +// right between the state thread checking and going to sleep in which case +// this thread will kick in and signal it right again). +static void notifier_thread(cubeb* ctx) { - cubeb * ctx = (cubeb*) user_ptr; - pthread_mutex_lock(&ctx->state.mutex); - while (!atomic_load(&ctx->state.join)) { - pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex); - if (atomic_load(&ctx->state.waiting)) { - pthread_cond_signal(&ctx->state.cond); + unique_lock lock(ctx->state.mutex); + + while (!ctx->state.join.load()) { + ctx->state.cond.wait(lock); + if (ctx->state.waiting.load()) { + // This must signal our state thread since there is no other + // thread currently waiting on the condition variable. + // The state change thread is guaranteed to be waiting since + // we hold the mutex it locks when awake. + ctx->state.cond.notify_one(); } } // make sure other thread joins as well - pthread_cond_signal(&ctx->state.cond); - pthread_mutex_unlock(&ctx->state.mutex); + ctx->state.cond.notify_one(); LOG("Exiting notifier thread"); - return NULL; } -static void * state_thread(void * user_ptr) +static void state_thread(cubeb* ctx) { - cubeb * ctx = (cubeb*) user_ptr; - pthread_mutex_lock(&ctx->state.mutex); + unique_lock lock(ctx->state.mutex); bool waiting = false; - while (!atomic_load(&ctx->state.join)) { - waiting |= atomic_load(&ctx->state.waiting); + while (!ctx->state.join.load()) { + waiting |= ctx->state.waiting.load(); if (waiting) { - atomic_store(&ctx->state.waiting, false); + ctx->state.waiting.store(false); waiting = false; for (unsigned i = 0u; i < MAX_STREAMS; ++i) { cubeb_stream * stm = &ctx->streams[i]; @@ -383,7 +389,7 @@ static void * state_thread(void * user_ptr) } // state changed from another thread, update again immediately - if (atomic_load(&ctx->state.waiting)) { + if (ctx->state.waiting.load()) { waiting = true; continue; } @@ -397,31 +403,20 @@ static void * state_thread(void * user_ptr) // while any stream is waiting for state change we sleep with regular // timeouts. But we wake up immediately if signaled. // This might seem like a poor man's implementation of state change - // waiting but (as of december 2019), the implementation of - // AAudioStream_waitForStateChange is pretty much the same: - // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277 - struct timespec timeout; - - // pthread_condattr_setclock isn't available before android api v21. - // pthread condition variables use the realtime clock by default, which - // means that system time changes can interfer with wakeup timeouts. -#if __ANDROID_API__ >= 21 - clock_gettime(CLOCK_MONOTONIC, &timeout); -#else - clock_gettime(CLOCK_REALTIME, &timeout); -#endif - timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms - pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout); + // waiting but (as of october 2020), the implementation of + // AAudioStream_waitForStateChange is just sleeping with regular + // timeouts as well: + // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp + auto dur = std::chrono::milliseconds(5); + ctx->state.cond.wait_for(lock, dur); } else { - pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex); + ctx->state.cond.wait(lock); } } // make sure other thread joins as well - pthread_cond_signal(&ctx->state.cond); - pthread_mutex_unlock(&ctx->state.mutex); + ctx->state.cond.notify_one(); LOG("Exiting state thread"); - return NULL; } static char const * @@ -443,127 +438,41 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels) static void aaudio_destroy(cubeb * ctx) { + assert(ctx); + #ifndef NDEBUG // make sure all streams were destroyed for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - assert(!atomic_load(&ctx->streams[i].in_use)); + assert(!ctx->streams[i].in_use.load()); } #endif // broadcast joining to both threads // they will additionally signal each other before joining - atomic_store(&ctx->state.join, true); - pthread_cond_broadcast(&ctx->state.cond); - - int err; - err = pthread_join(ctx->state.thread, NULL); - if (err != 0) { - LOG("pthread_join: %s", strerror(err)); - } + ctx->state.join.store(true); + ctx->state.cond.notify_all(); - err = pthread_join(ctx->state.notifier, NULL); - if (err != 0) { - LOG("pthread_join: %s", strerror(err)); - } - - pthread_cond_destroy(&ctx->state.cond); - pthread_mutex_destroy(&ctx->state.mutex); - for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - pthread_mutex_destroy(&ctx->streams[i].mutex); + try { + if(ctx->state.thread.joinable()) { + ctx->state.thread.join(); + } + if(ctx->state.notifier.joinable()) { + ctx->state.notifier.join(); + } + } catch(const std::system_error& err) { + LOG("Joining thread failed: %s", err.what()); } if (ctx->libaaudio) { dlclose(ctx->libaaudio); } - free(ctx); -} - -/*static*/ int -aaudio_init(cubeb ** context, char const * context_name) { - (void) context_name; - int err; - - // load api - void * libaaudio = NULL; -#ifndef DISABLE_LIBAAUDIO_DLOPEN - libaaudio = dlopen("libaaudio.so", RTLD_NOW); - if (!libaaudio) { - return CUBEB_ERROR; - } - -#define LOAD(x) { \ - WRAP(x) = (typeof(WRAP(x))) (dlsym(libaaudio, #x)); \ - if (!WRAP(x)) { \ - LOG("AAudio: Failed to load %s", #x); \ - dlclose(libaaudio); \ - return CUBEB_ERROR; \ - } \ - } - - LIBAAUDIO_API_VISIT(LOAD); -#undef LOAD -#endif - - cubeb * ctx = (cubeb*) calloc(1, sizeof(*ctx)); - ctx->ops = &aaudio_ops; - ctx->libaaudio = libaaudio; - atomic_init(&ctx->state.join, false); - atomic_init(&ctx->state.waiting, false); - pthread_mutex_init(&ctx->state.mutex, NULL); - - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - - // pthread_condttr_setclock isn't available before android api v21. - // pthread condition variables use the realtime clock by default, which - // means that system time changes can interfer with wakeup timeouts. -#if __ANDROID_API__ >= 21 - pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); -#endif - - err = pthread_cond_init(&ctx->state.cond, &cond_attr); - pthread_condattr_destroy(&cond_attr); - if (err) { - LOG("pthread_cond_init: %s", strerror(err)); - aaudio_destroy(ctx); - return CUBEB_ERROR; - } - - // The stream mutexes are not bound to the lifetimes of the - // streams since we need them to synchronize the streams with - // the state thread. - for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - atomic_init(&ctx->streams[i].in_use, false); - atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT); - pthread_mutex_init(&ctx->streams[i].mutex, NULL); - } - - err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx); - if (err != 0) { - LOG("pthread_create: %s", strerror(err)); - aaudio_destroy(ctx); - return CUBEB_ERROR; - } - - // TODO: we could set the priority of the notifier thread lower than - // the priority of the state thread. This way, it's more likely - // that the state thread will be woken up by the condition variable signal - // when both are currently waiting - err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx); - if (err != 0) { - LOG("pthread_create: %s", strerror(err)); - aaudio_destroy(ctx); - return CUBEB_ERROR; - } - - *context = ctx; - return CUBEB_OK; + delete ctx; } static void apply_volume(cubeb_stream * stm, void * audio_data, uint32_t num_frames) { + float volume = stm->volume.load(); // optimization: we don't have to change anything in this case - float volume = atomic_load(&stm->volume); if (volume == 1.f) { return; } @@ -571,12 +480,12 @@ apply_volume(cubeb_stream * stm, void * audio_data, uint32_t num_frames) { switch (stm->out_format) { case CUBEB_SAMPLE_S16NE: for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) { - ((int16_t*)audio_data)[i] *= volume; + (static_cast<int16_t*>(audio_data))[i] *= volume; } break; case CUBEB_SAMPLE_FLOAT32NE: for (uint32_t i = 0u; i < num_frames * stm->out_channels; ++i) { - ((float*)audio_data)[i] *= volume; + (static_cast<float*>(audio_data))[i] *= volume; } break; default: @@ -597,7 +506,7 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data, assert(stm->istream); assert(num_frames >= 0); - enum stream_state state = atomic_load(&stm->state); + stream_state state = atomic_load(&stm->state); // int istate = WRAP(AAudioStream_getState)(stm->istream); // int ostate = WRAP(AAudioStream_getState)(stm->ostream); // ALOGV("aaudio duplex data cb on stream %p: state %ld (in: %d, out: %d), num_frames: %ld", @@ -605,19 +514,19 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data, // all other states may happen since the callback might be called // from within requestStart - assert(state != STREAM_STATE_SHUTDOWN); + assert(state != stream_state::shutdown); // This might happen when we started draining but not yet actually // stopped the stream from the state thread. - if (state == STREAM_STATE_DRAINING) { - memset(audio_data, 0x0, num_frames * stm->out_frame_size); + if (state == stream_state::draining) { + std::memset(audio_data, 0x0, num_frames * stm->out_frame_size); return AAUDIO_CALLBACK_RESULT_CONTINUE; } long in_num_frames = WRAP(AAudioStream_read)(stm->istream, - stm->in_buf, num_frames, 0); + stm->in_buf.get(), num_frames, 0); if (in_num_frames < 0) { // error - atomic_store(&stm->state, STREAM_STATE_ERROR); + stm->state.store(stream_state::error); LOG("AAudioStream_read: %s", WRAP(AAudio_convertResultToText)(in_num_frames)); return AAUDIO_CALLBACK_RESULT_STOP; } @@ -632,25 +541,25 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data, // ALOGV("AAudioStream_read returned not enough frames: %ld instead of %d", // in_num_frames, num_frames); unsigned left = num_frames - in_num_frames; - char * buf = ((char*) stm->in_buf) + in_num_frames * stm->in_frame_size; - memset(buf, 0x0, left * stm->in_frame_size); + char * buf = stm->in_buf.get() + in_num_frames * stm->in_frame_size; + std::memset(buf, 0x0, left * stm->in_frame_size); in_num_frames = num_frames; } - long done_frames = cubeb_resampler_fill(stm->resampler, stm->in_buf, + long done_frames = cubeb_resampler_fill(stm->resampler, stm->in_buf.get(), &in_num_frames, audio_data, num_frames); if (done_frames < 0 || done_frames > num_frames) { LOG("Error in data callback or resampler: %ld", done_frames); - atomic_store(&stm->state, STREAM_STATE_ERROR); + stm->state.store(stream_state::error); return AAUDIO_CALLBACK_RESULT_STOP; } else if (done_frames < num_frames) { - atomic_store(&stm->state, STREAM_STATE_DRAINING); - atomic_store(&stm->context->state.waiting, true); - pthread_cond_signal(&stm->context->state.cond); + stm->state.store(stream_state::draining); + stm->context->state.waiting.store(true); + stm->context->state.cond.notify_one(); - char * begin = ((char*)audio_data) + done_frames * stm->out_frame_size; - memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); + char * begin = static_cast<char*>(audio_data) + done_frames * stm->out_frame_size; + std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); } apply_volume(stm, audio_data, done_frames); @@ -666,19 +575,19 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data, assert(!stm->istream); assert(num_frames >= 0); - enum stream_state state = atomic_load(&stm->state); + stream_state state = stm->state.load(); // int ostate = WRAP(AAudioStream_getState)(stm->ostream); // ALOGV("aaudio output data cb on stream %p: state %ld (%d), num_frames: %ld", // (void*) stm, state, ostate, num_frames); // all other states may happen since the callback might be called // from within requestStart - assert(state != STREAM_STATE_SHUTDOWN); + assert(state != stream_state::shutdown); // This might happen when we started draining but not yet actually // stopped the stream from the state thread. - if (state == STREAM_STATE_DRAINING) { - memset(audio_data, 0x0, num_frames * stm->out_frame_size); + if (state == stream_state::draining) { + std::memset(audio_data, 0x0, num_frames * stm->out_frame_size); return AAUDIO_CALLBACK_RESULT_CONTINUE; } @@ -686,15 +595,15 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data, audio_data, num_frames); if (done_frames < 0 || done_frames > num_frames) { LOG("Error in data callback or resampler: %ld", done_frames); - atomic_store(&stm->state, STREAM_STATE_ERROR); + stm->state.store(stream_state::error); return AAUDIO_CALLBACK_RESULT_STOP; } else if (done_frames < num_frames) { - atomic_store(&stm->state, STREAM_STATE_DRAINING); - atomic_store(&stm->context->state.waiting, true); - pthread_cond_signal(&stm->context->state.cond); + stm->state.store(stream_state::draining); + stm->context->state.waiting.store(true); + stm->context->state.cond.notify_one(); - char * begin = ((char*)audio_data) + done_frames * stm->out_frame_size; - memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); + char * begin = static_cast<char*>(audio_data) + done_frames * stm->out_frame_size; + std::memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); } apply_volume(stm, audio_data, done_frames); @@ -710,18 +619,18 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data, assert(!stm->ostream); assert(num_frames >= 0); - enum stream_state state = atomic_load(&stm->state); + stream_state state = stm->state.load(); // int istate = WRAP(AAudioStream_getState)(stm->istream); // ALOGV("aaudio input data cb on stream %p: state %ld (%d), num_frames: %ld", // (void*) stm, state, istate, num_frames); // all other states may happen since the callback might be called // from within requestStart - assert(state != STREAM_STATE_SHUTDOWN); + assert(state != stream_state::shutdown); // This might happen when we started draining but not yet actually // stopped the stream from the state thread. - if (state == STREAM_STATE_DRAINING) { + if (state == stream_state::draining) { return AAUDIO_CALLBACK_RESULT_CONTINUE; } @@ -730,15 +639,15 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data, audio_data, &input_frame_count, NULL, 0); if (done_frames < 0 || done_frames > num_frames) { LOG("Error in data callback or resampler: %ld", done_frames); - atomic_store(&stm->state, STREAM_STATE_ERROR); + stm->state.store(stream_state::error); return AAUDIO_CALLBACK_RESULT_STOP; } else if (done_frames < input_frame_count) { // we don't really drain an input stream, just have to // stop it from the state thread. That is signaled via the // DRAINING state. - atomic_store(&stm->state, STREAM_STATE_DRAINING); - atomic_store(&stm->context->state.waiting, true); - pthread_cond_signal(&stm->context->state.cond); + stm->state.store(stream_state::draining); + stm->context->state.waiting.store(true); + stm->context->state.cond.notify_one(); } return AAUDIO_CALLBACK_RESULT_CONTINUE; @@ -747,10 +656,10 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data, static void aaudio_error_cb(AAudioStream * astream, void * user_data, aaudio_result_t error) { - cubeb_stream * stm = (cubeb_stream*) user_data; + cubeb_stream * stm = static_cast<cubeb_stream*>(user_data); assert(stm->ostream == astream || stm->istream == astream); LOG("AAudio error callback: %s", WRAP(AAudio_convertResultToText)(error)); - atomic_store(&stm->state, STREAM_STATE_ERROR); + stm->state.store(stream_state::error); } static int @@ -803,13 +712,13 @@ realize_stream(AAudioStreamBuilder * sb, const cubeb_stream_params * params, static void aaudio_stream_destroy(cubeb_stream * stm) { - pthread_mutex_lock(&stm->mutex); - assert(stm->state == STREAM_STATE_STOPPED || - stm->state == STREAM_STATE_STOPPING || - stm->state == STREAM_STATE_INIT || - stm->state == STREAM_STATE_DRAINING || - stm->state == STREAM_STATE_ERROR || - stm->state == STREAM_STATE_SHUTDOWN); + lock_guard lock(stm->mutex); + assert(stm->state == stream_state::stopped || + stm->state == stream_state::stopping || + stm->state == stream_state::init || + stm->state == stream_state::draining || + stm->state == stream_state::error || + stm->state == stream_state::shutdown); aaudio_result_t res; @@ -817,9 +726,9 @@ aaudio_stream_destroy(cubeb_stream * stm) // That is important as we otherwise might read from a closed istream // for a duplex stream. if (stm->ostream) { - if (stm->state != STREAM_STATE_STOPPED && - stm->state != STREAM_STATE_STOPPING && - stm->state != STREAM_STATE_SHUTDOWN) { + if (stm->state != stream_state::stopped && + stm->state != stream_state::stopping && + stm->state != stream_state::shutdown) { res = WRAP(AAudioStream_requestStop)(stm->ostream); if (res != AAUDIO_OK) { LOG("AAudioStreamBuilder_requestStop: %s", WRAP(AAudio_convertResultToText)(res)); @@ -831,9 +740,9 @@ aaudio_stream_destroy(cubeb_stream * stm) } if (stm->istream) { - if (stm->state != STREAM_STATE_STOPPED && - stm->state != STREAM_STATE_STOPPING && - stm->state != STREAM_STATE_SHUTDOWN) { + if (stm->state != stream_state::stopped && + stm->state != stream_state::stopping && + stm->state != stream_state::shutdown) { res = WRAP(AAudioStream_requestStop)(stm->istream); if (res != AAUDIO_OK) { LOG("AAudioStreamBuilder_requestStop: %s", WRAP(AAudio_convertResultToText)(res)); @@ -847,78 +756,40 @@ aaudio_stream_destroy(cubeb_stream * stm) if (stm->resampler) { cubeb_resampler_destroy(stm->resampler); } - if (stm->in_buf) { - free(stm->in_buf); - } - atomic_store(&stm->state, STREAM_STATE_INIT); - pthread_mutex_unlock(&stm->mutex); - atomic_store(&stm->in_use, false); + stm->state.store(stream_state::init); + stm->in_use.store(false); } static int -aaudio_stream_init(cubeb * ctx, - cubeb_stream ** stream, +aaudio_stream_init_impl( + cubeb_stream * stm, char const * stream_name, cubeb_devid input_device, cubeb_stream_params * input_stream_params, cubeb_devid output_device, cubeb_stream_params * output_stream_params, - unsigned int latency_frames, - cubeb_data_callback data_callback, - cubeb_state_callback state_callback, - void * user_ptr) + unsigned int latency_frames) { - assert(!input_device); - assert(!output_device); - - (void) stream_name; + assert(stm->state.load() == stream_state::init); + stm->in_use.store(true); aaudio_result_t res; - AAudioStreamBuilder * sb; + AAudioStreamBuilder* sb; res = WRAP(AAudio_createStreamBuilder)(&sb); if (res != AAUDIO_OK) { LOG("AAudio_createStreamBuilder: %s", WRAP(AAudio_convertResultToText)(res)); return CUBEB_ERROR; } - // atomically find a free stream. - cubeb_stream * stm = NULL; - for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - // This check is only an optimization, we don't strictly need it - // since we check again after locking the mutex. - if (atomic_load(&ctx->streams[i].in_use)) { - continue; - } - - // if this fails with EBUSY, another thread initialized this stream - // between our check of in_use and this. - int err = pthread_mutex_trylock(&ctx->streams[i].mutex); - if (err != 0 || atomic_load(&ctx->streams[i].in_use)) { - if (err && err != EBUSY) { - LOG("pthread_mutex_trylock: %s", strerror(err)); - } - - continue; + // make sure the builder is always destroyed + struct StreamBuilderDestructor { + void operator()(AAudioStreamBuilder* sb) { + WRAP(AAudioStreamBuilder_delete)(sb); } + }; - stm = &ctx->streams[i]; - break; - } - - if (!stm) { - LOG("Error: maximum number of streams reached"); - return CUBEB_ERROR; - } - - assert(atomic_load(&stm->state) == STREAM_STATE_INIT); - atomic_store(&stm->in_use, true); - - unsigned res_err = CUBEB_ERROR; - stm->user_ptr = user_ptr; - stm->data_callback = data_callback; - stm->state_callback = state_callback; - stm->context = ctx; + std::unique_ptr<AAudioStreamBuilder, StreamBuilderDestructor> sbPtr(sb); WRAP(AAudioStreamBuilder_setErrorCallback)(sb, aaudio_error_cb, stm); WRAP(AAudioStreamBuilder_setBufferCapacityInFrames)(sb, latency_frames); @@ -934,16 +805,16 @@ aaudio_stream_init(cubeb * ctx, out_data_callback = aaudio_output_data_cb; } else { LOG("Tried to open stream without input or output parameters"); - goto error; + return CUBEB_ERROR; } -#ifdef AAUDIO_EXCLUSIVE +#ifdef CUBEB_AAUDIO_EXCLUSIVE_STREAM WRAP(AAudioStreamBuilder_setSharingMode)(sb, AAUDIO_SHARING_MODE_EXCLUSIVE); #endif -#ifdef AAUDIO_LOW_LATENCY +#ifdef CUBEB_AAUDIO_LOW_LATENCY WRAP(AAudioStreamBuilder_setPerformanceMode)(sb, AAUDIO_PERFORMANCE_MODE_LOW_LATENCY); -#elif defined(AAUDIO_POWER_SAVING) +#elif defined(CUBEB_AAUDIO_POWER_SAVING) WRAP(AAudioStreamBuilder_setPerformanceMode)(sb, AAUDIO_PERFORMANCE_MODE_POWER_SAVING); #endif @@ -956,9 +827,9 @@ aaudio_stream_init(cubeb * ctx, if (output_stream_params) { WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_OUTPUT); WRAP(AAudioStreamBuilder_setDataCallback)(sb, out_data_callback, stm); - res_err = realize_stream(sb, output_stream_params, &stm->ostream, &frame_size); + int res_err = realize_stream(sb, output_stream_params, &stm->ostream, &frame_size); if (res_err) { - goto error; + return res_err; } // output debug information @@ -980,7 +851,7 @@ aaudio_stream_init(cubeb * ctx, stm->out_channels = output_stream_params->channels; stm->out_format = output_stream_params->format; stm->out_frame_size = frame_size; - atomic_store(&stm->volume, 1.f); + stm->volume.store(1.f); } // input @@ -988,9 +859,9 @@ aaudio_stream_init(cubeb * ctx, if (input_stream_params) { WRAP(AAudioStreamBuilder_setDirection)(sb, AAUDIO_DIRECTION_INPUT); WRAP(AAudioStreamBuilder_setDataCallback)(sb, in_data_callback, stm); - res_err = realize_stream(sb, input_stream_params, &stm->istream, &frame_size); + int res_err = realize_stream(sb, input_stream_params, &stm->istream, &frame_size); if (res_err) { - goto error; + return res_err; } // output debug information @@ -1005,7 +876,7 @@ aaudio_stream_init(cubeb * ctx, LOG("AAudio input stream buffer size: %d", bsize); LOG("AAudio input stream buffer rate: %d", rate); - stm->in_buf = malloc(bcap * frame_size); + stm->in_buf.reset(new char[bcap * frame_size]()); assert(!target_sample_rate || target_sample_rate == input_stream_params->rate); target_sample_rate = input_stream_params->rate; @@ -1019,58 +890,117 @@ aaudio_stream_init(cubeb * ctx, input_stream_params ? &in_params : NULL, output_stream_params ? &out_params : NULL, target_sample_rate, - data_callback, - user_ptr, + stm->data_callback, + stm->user_ptr, CUBEB_RESAMPLER_QUALITY_DEFAULT); if (!stm->resampler) { LOG("Failed to create resampler"); - goto error; + return CUBEB_ERROR; } // the stream isn't started initially. We don't need to differntiate // between a stream that was just initialized and one that played // already but was stopped - atomic_store(&stm->state, STREAM_STATE_STOPPED); + stm->state.store(stream_state::stopped); LOG("Cubeb stream (%p) init success", (void*) stm); - pthread_mutex_unlock(&stm->mutex); + return CUBEB_OK; +} + +static int +aaudio_stream_init(cubeb * ctx, + cubeb_stream ** stream, + char const * stream_name, + cubeb_devid input_device, + cubeb_stream_params * input_stream_params, + cubeb_devid output_device, + cubeb_stream_params * output_stream_params, + unsigned int latency_frames, + cubeb_data_callback data_callback, + cubeb_state_callback state_callback, + void * user_ptr) +{ + assert(!input_device); + assert(!output_device); + + (void) stream_name; + + // atomically find a free stream. + cubeb_stream * stm = NULL; + unique_lock lock; + for (unsigned i = 0u; i < MAX_STREAMS; ++i) { + // This check is only an optimization, we don't strictly need it + // since we check again after locking the mutex. + if (ctx->streams[i].in_use.load()) { + continue; + } + + // if this fails, another thread initialized this stream + // between our check of in_use and this. + lock = unique_lock(ctx->streams[i].mutex, std::try_to_lock); + if(!lock.owns_lock()) { + continue; + } + + if(ctx->streams[i].in_use.load()) { + lock = {}; + continue; + } + + stm = &ctx->streams[i]; + break; + } + + if (!stm) { + LOG("Error: maximum number of streams reached"); + return CUBEB_ERROR; + } + + stm->context = ctx; + stm->user_ptr = user_ptr; + stm->data_callback = data_callback; + stm->state_callback = state_callback; + + int err = aaudio_stream_init_impl(stm, stream_name, input_device, + input_stream_params, output_device, output_stream_params, latency_frames); + if(err != CUBEB_OK) { + // This is needed since aaudio_stream_destroy will lock the mutex again. + // It's no problem that there is a gap in between as the stream isn't + // actually in u se. + lock.unlock(); + aaudio_stream_destroy(stm); + return err; + } - WRAP(AAudioStreamBuilder_delete)(sb); *stream = stm; return CUBEB_OK; - -error: - WRAP(AAudioStreamBuilder_delete)(sb); - pthread_mutex_unlock(&stm->mutex); - aaudio_stream_destroy(stm); - return res_err; } static int aaudio_stream_start(cubeb_stream * stm) { - pthread_mutex_lock(&stm->mutex); - enum stream_state state = atomic_load(&stm->state); + assert(stm && stm->in_use.load()); + lock_guard lock(stm->mutex); + + stream_state state = stm->state.load(); int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0; int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0; LOGV("starting stream %p: %d (%d %d)", (void*) stm, state, istate, ostate); switch (state) { - case STREAM_STATE_STARTED: - case STREAM_STATE_STARTING: - pthread_mutex_unlock(&stm->mutex); + case stream_state::started: + case stream_state::starting: LOG("cubeb stream %p already starting/started", (void*) stm); return CUBEB_OK; - case STREAM_STATE_ERROR: - case STREAM_STATE_SHUTDOWN: + case stream_state::error: + case stream_state::shutdown: return CUBEB_ERROR; - case STREAM_STATE_INIT: - pthread_mutex_unlock(&stm->mutex); + case stream_state::init: assert(false && "Invalid stream"); return CUBEB_ERROR; - case STREAM_STATE_STOPPED: - case STREAM_STATE_STOPPING: - case STREAM_STATE_DRAINING: + case stream_state::stopped: + case stream_state::stopping: + case stream_state::draining: break; } @@ -1078,7 +1008,7 @@ aaudio_stream_start(cubeb_stream * stm) // NOTE: aaudio docs don't state explicitly if we have to do this or // if we are allowed to call requestStart while the stream is - // in the transient STOPPING state. Works without in testing though + // in the transient STOPPING state. Works without this in testing though // if (ostate == AAUDIO_STREAM_STATE_STOPPING) { // res = WRAP(AAudioStream_waitForStateChange)(stm->ostream, // AAUDIO_STREAM_STATE_STOPPING, &ostate, INT64_MAX); @@ -1102,8 +1032,7 @@ aaudio_stream_start(cubeb_stream * stm) res = WRAP(AAudioStream_requestStart)(stm->istream); if (res != AAUDIO_OK) { LOG("AAudioStream_requestStart (istream): %s", WRAP(AAudio_convertResultToText)(res)); - atomic_store(&stm->state, STREAM_STATE_ERROR); - pthread_mutex_unlock(&stm->mutex); + stm->state.store(stream_state::error); return CUBEB_ERROR; } } @@ -1112,32 +1041,32 @@ aaudio_stream_start(cubeb_stream * stm) res = WRAP(AAudioStream_requestStart)(stm->ostream); if (res != AAUDIO_OK) { LOG("AAudioStream_requestStart (ostream): %s", WRAP(AAudio_convertResultToText)(res)); - atomic_store(&stm->state, STREAM_STATE_ERROR); - pthread_mutex_unlock(&stm->mutex); + stm->state.store(stream_state::error); return CUBEB_ERROR; } } int ret = CUBEB_OK; bool success; - while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) { + + while (!(success = stm->state.compare_exchange_strong(state, stream_state::starting))) { // we land here only if the state has changed in the meantime switch (state) { // If an error ocurred in the meantime, we can't change that. // The stream will be stopped when shut down. - case STREAM_STATE_ERROR: + case stream_state::error: ret = CUBEB_ERROR; break; // The only situation in which the state could have switched to draining // is if the callback was already fired and requested draining. Don't // overwrite that. It's not an error either though. - case STREAM_STATE_DRAINING: + case stream_state::draining: break; // If the state switched [draining -> stopping] or [draining/stopping -> stopped] // in the meantime, we can simply overwrite that since we restarted the stream. - case STREAM_STATE_STOPPING: - case STREAM_STATE_STOPPED: + case stream_state::stopping: + case stream_state::stopped: continue; // There is no situation in which the state could have been valid before @@ -1154,40 +1083,38 @@ aaudio_stream_start(cubeb_stream * stm) } if (success) { - atomic_store(&stm->context->state.waiting, true); - pthread_cond_signal(&stm->context->state.cond); + stm->context->state.waiting.store(true); + stm->context->state.cond.notify_one(); } - pthread_mutex_unlock(&stm->mutex); return ret; } static int aaudio_stream_stop(cubeb_stream * stm) { - pthread_mutex_lock(&stm->mutex); - enum stream_state state = atomic_load(&stm->state); + assert(stm && stm->in_use.load()); + lock_guard lock(stm->mutex); + + stream_state state = stm->state.load(); int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0; int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0; LOGV("stopping stream %p: %d (%d %d)", (void*) stm, state, istate, ostate); switch (state) { - case STREAM_STATE_STOPPED: - case STREAM_STATE_STOPPING: - case STREAM_STATE_DRAINING: - pthread_mutex_unlock(&stm->mutex); + case stream_state::stopped: + case stream_state::stopping: + case stream_state::draining: LOG("cubeb stream %p already stopping/stopped", (void*) stm); return CUBEB_OK; - case STREAM_STATE_ERROR: - case STREAM_STATE_SHUTDOWN: - pthread_mutex_unlock(&stm->mutex); + case stream_state::error: + case stream_state::shutdown: return CUBEB_ERROR; - case STREAM_STATE_INIT: - pthread_mutex_unlock(&stm->mutex); + case stream_state::init: assert(false && "Invalid stream"); return CUBEB_ERROR; - case STREAM_STATE_STARTED: - case STREAM_STATE_STARTING: + case stream_state::started: + case stream_state::starting: break; } @@ -1221,8 +1148,7 @@ aaudio_stream_stop(cubeb_stream * stm) res = WRAP(AAudioStream_requestStop)(stm->ostream); if (res != AAUDIO_OK) { LOG("AAudioStream_requestStop (ostream): %s", WRAP(AAudio_convertResultToText)(res)); - atomic_store(&stm->state, STREAM_STATE_ERROR); - pthread_mutex_unlock(&stm->mutex); + stm->state.store(stream_state::error); return CUBEB_ERROR; } } @@ -1231,34 +1157,33 @@ aaudio_stream_stop(cubeb_stream * stm) res = WRAP(AAudioStream_requestStop)(stm->istream); if (res != AAUDIO_OK) { LOG("AAudioStream_requestStop (istream): %s", WRAP(AAudio_convertResultToText)(res)); - atomic_store(&stm->state, STREAM_STATE_ERROR); - pthread_mutex_unlock(&stm->mutex); + stm->state.store(stream_state::error); return CUBEB_ERROR; } } int ret = CUBEB_OK; bool success; - while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) { + while (!(success = atomic_compare_exchange_strong(&stm->state, &state, stream_state::stopping))) { // we land here only if the state has changed in the meantime switch (state) { // If an error ocurred in the meantime, we can't change that. // The stream will be stopped when shut down. - case STREAM_STATE_ERROR: + case stream_state::error: ret = CUBEB_ERROR; break; // If it was switched to draining in the meantime, it was or // will be stopped soon anyways. We don't interfere with // the draining process, no matter in which state. // Not an error - case STREAM_STATE_DRAINING: - case STREAM_STATE_STOPPING: - case STREAM_STATE_STOPPED: + case stream_state::draining: + case stream_state::stopping: + case stream_state::stopped: break; // If the state switched from starting to started in the meantime // we can simply overwrite that since we just stopped it. - case STREAM_STATE_STARTED: + case stream_state::started: continue; // There is no situation in which the state could have been valid before @@ -1274,39 +1199,37 @@ aaudio_stream_stop(cubeb_stream * stm) } if (success) { - atomic_store(&stm->context->state.waiting, true); - pthread_cond_signal(&stm->context->state.cond); + stm->context->state.waiting.store(true); + stm->context->state.cond.notify_one(); } - pthread_mutex_unlock(&stm->mutex); return ret; } static int aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position) { - pthread_mutex_lock(&stm->mutex); - enum stream_state state = atomic_load(&stm->state); + assert(stm && stm->in_use.load()); + lock_guard lock(stm->mutex); + + stream_state state = stm->state.load(); AAudioStream * stream = stm->ostream ? stm->ostream : stm->istream; switch (state) { - case STREAM_STATE_ERROR: - case STREAM_STATE_SHUTDOWN: - pthread_mutex_unlock(&stm->mutex); + case stream_state::error: + case stream_state::shutdown: return CUBEB_ERROR; - case STREAM_STATE_STOPPED: - case STREAM_STATE_DRAINING: - case STREAM_STATE_STOPPING: + case stream_state::draining: + case stream_state::stopped: + case stream_state::stopping: // getTimestamp is only valid when the stream is playing. // Simply return the number of frames passed to aaudio *position = WRAP(AAudioStream_getFramesRead)(stream); - pthread_mutex_unlock(&stm->mutex); return CUBEB_OK; - case STREAM_STATE_INIT: + case stream_state::init: assert(false && "Invalid stream"); - pthread_mutex_unlock(&stm->mutex); return CUBEB_ERROR; - case STREAM_STATE_STARTED: - case STREAM_STATE_STARTING: + case stream_state::started: + case stream_state::starting: break; } @@ -1319,18 +1242,15 @@ aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position) // has internally started and gives us a valid timestamp. // If that is not the case (invalid_state is returned) we simply // fall back to the method we use for non-playing streams. - if (res == AAUDIO_ERROR_INVALID_STATE && state == STREAM_STATE_STARTING) { + if (res == AAUDIO_ERROR_INVALID_STATE && state == stream_state::starting) { *position = WRAP(AAudioStream_getFramesRead)(stream); - pthread_mutex_unlock(&stm->mutex); return CUBEB_OK; } LOG("AAudioStream_getTimestamp: %s", WRAP(AAudio_convertResultToText)(res)); - pthread_mutex_unlock(&stm->mutex); return CUBEB_ERROR; } - pthread_mutex_unlock(&stm->mutex); *position = pos; return CUBEB_OK; } @@ -1338,36 +1258,85 @@ aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position) static int aaudio_stream_set_volume(cubeb_stream * stm, float volume) { - assert(stm && stm->ostream); - atomic_store(&stm->volume, volume); + assert(stm && stm->in_use.load() && stm->ostream); + stm->volume.store(volume); return CUBEB_OK; } -static const struct cubeb_ops aaudio_ops = { - .init = aaudio_init, - .get_backend_id = aaudio_get_backend_id, - .get_max_channel_count = aaudio_get_max_channel_count, +extern "C" int aaudio_init(cubeb ** context, char const * context_name); + +const static struct cubeb_ops aaudio_ops = { + /*.init =*/ aaudio_init, + /*.get_backend_id =*/ aaudio_get_backend_id, + /*.get_max_channel_count =*/ aaudio_get_max_channel_count, // NOTE: i guess we could support min_latency and preferred sample // rate via guessing, i.e. creating a dummy stream and check // its settings. - .get_min_latency = NULL, - .get_preferred_sample_rate = NULL, - .enumerate_devices = NULL, - .device_collection_destroy = NULL, - .destroy = aaudio_destroy, - .stream_init = aaudio_stream_init, - .stream_destroy = aaudio_stream_destroy, - .stream_start = aaudio_stream_start, - .stream_stop = aaudio_stream_stop, - .stream_reset_default_device = NULL, - .stream_get_position = aaudio_stream_get_position, + /*.get_min_latency =*/ NULL, + /*.get_preferred_sample_rate =*/ NULL, + /*.enumerate_devices =*/ NULL, + /*.device_collection_destroy =*/ NULL, + /*.destroy =*/ aaudio_destroy, + /*.stream_init =*/ aaudio_stream_init, + /*.stream_destroy =*/ aaudio_stream_destroy, + /*.stream_start =*/ aaudio_stream_start, + /*.stream_stop =*/ aaudio_stream_stop, + /*.stream_reset_default_device =*/ NULL, + /*.stream_get_position =*/ aaudio_stream_get_position, // NOTE: this could be implemented via means comparable to the // OpenSLES backend - .stream_get_latency = NULL, - .stream_set_volume = aaudio_stream_set_volume, - .stream_get_current_device = NULL, - .stream_device_destroy = NULL, - .stream_register_device_changed_callback = NULL, - .register_device_collection_changed = NULL + /*.stream_get_latency =*/ NULL, + /*.stream_set_volume =*/ aaudio_stream_set_volume, + /*.stream_get_current_device =*/ NULL, + /*.stream_device_destroy =*/ NULL, + /*.stream_register_device_changed_callback =*/ NULL, + /*.register_device_collection_changed =*/ NULL }; + +extern "C" /*static*/ int +aaudio_init(cubeb ** context, char const * context_name) { + (void) context_name; + + // load api + void * libaaudio = NULL; +#ifndef DISABLE_LIBAAUDIO_DLOPEN + libaaudio = dlopen("libaaudio.so", RTLD_NOW); + if (!libaaudio) { + return CUBEB_ERROR; + } + +#define LOAD(x) { \ + WRAP(x) = (decltype(WRAP(x))) (dlsym(libaaudio, #x)); \ + if (!WRAP(x)) { \ + LOG("AAudio: Failed to load %s", #x); \ + dlclose(libaaudio); \ + return CUBEB_ERROR; \ + } \ + } + + LIBAAUDIO_API_VISIT(LOAD); +#undef LOAD +#endif + + cubeb * ctx = new cubeb; + ctx->ops = &aaudio_ops; + ctx->libaaudio = libaaudio; + + try { + ctx->state.thread = std::thread(state_thread, ctx); + + // NOTE: using platform-specific APIs we could set the priority of the + // notifier thread lower than the priority of the state thread. + // This way, it's more likely that the state thread will be woken up + // by the condition variable signal when both are currently waiting + ctx->state.notifier = std::thread(notifier_thread, ctx); + } catch(const std::exception& err) { + LOG("Failed to create thread: %s", err.what()); + aaudio_destroy(ctx); + return CUBEB_ERROR; + } + + *context = ctx; + return CUBEB_OK; +} |