diff options
-rw-r--r-- | src/cubeb_aaudio.c | 275 |
1 files changed, 224 insertions, 51 deletions
diff --git a/src/cubeb_aaudio.c b/src/cubeb_aaudio.c index a301aa9..f5a74bd 100644 --- a/src/cubeb_aaudio.c +++ b/src/cubeb_aaudio.c @@ -103,6 +103,7 @@ struct cubeb_stream { /**/ _Atomic bool in_use; + _Atomic enum stream_state state; AAudioStream * ostream; AAudioStream * istream; @@ -110,10 +111,12 @@ struct cubeb_stream { cubeb_state_callback state_callback; cubeb_resampler * resampler; - _Atomic enum stream_state state; - void * in_buf; + // mutex synchronizes access to the stream from the state thread + // and user-called functions. Everything that is accessed in the + // aaudio data (or error) callback is synchronized only via atomics. pthread_mutex_t mutex; + void * in_buf; unsigned in_frame_size; cubeb_sample_format out_format; @@ -126,14 +129,22 @@ struct cubeb { struct cubeb_ops const * ops; void * libaaudio; - _Atomic bool state_join; - pthread_t state_thread; - - pthread_mutex_t mutex; + struct { + // The state thread: it waits for state changes and stops + // drained streams. + pthread_t thread; + pthread_t notifier; + pthread_mutex_t mutex; + pthread_cond_t cond; + _Atomic bool join; + _Atomic bool waiting; + } state; + + // streams[i].in_use signals whether a stream is used struct cubeb_stream streams[MAX_STREAMS]; }; -// Only allowed from state thread +// Only allowed from state thread, while mutex on stm is locked static void shutdown(cubeb_stream* stm) { if (stm->istream) { @@ -147,15 +158,34 @@ static void shutdown(cubeb_stream* stm) atomic_store(&stm->state, STREAM_STATE_SHUTDOWN); } +// Returns whether the given state is one in which we wait for +// an asynchronous change +static bool waiting_state(enum stream_state state) +{ + switch(state) { + case STREAM_STATE_DRAINING: + case STREAM_STATE_STARTING: + case STREAM_STATE_STOPPING: + return true; + default: + return false; + } +} + +// Returns whether this stream is still waiting for a state change static void update_state(cubeb_stream * stm) { - if (!atomic_load(&stm->in_use) || atomic_load(&stm->state) == STREAM_STATE_INIT) { + // fast path for streams that don't wait for state change or are invalid + enum stream_state old_state = atomic_load(&stm->state); + if (old_state == STREAM_STATE_INIT || + old_state == STREAM_STATE_STARTED || + old_state == STREAM_STATE_STOPPED || + old_state == STREAM_STATE_SHUTDOWN) { return; } - // Don't wait to stream operations in the main thread. - // If an operation on a stream takes a while, the state thread - // can still continue update the other streams. + // If the main thread currently operates on this thread, we don't + // have to wait for it int err = pthread_mutex_trylock(&stm->mutex); if (err != 0) { if (err != EBUSY) { @@ -164,14 +194,22 @@ static void update_state(cubeb_stream * stm) return; } - aaudio_result_t res; - enum stream_state old_state = atomic_load(&stm->state); - enum stream_state new_state; + // check again: if this is true now, the stream was destroyed or + // changed between our fast path check and locking the mutex + old_state = atomic_load(&stm->state); + if (old_state == STREAM_STATE_INIT || + old_state == STREAM_STATE_STARTED || + old_state == STREAM_STATE_STOPPED || + old_state == STREAM_STATE_SHUTDOWN) { + pthread_mutex_unlock(&stm->mutex); + return; + } // We compute the new state the stream has and then compare_exchange it // if it has changed. This way we will never just overwrite state // changes that were set from the audio thread in the meantime, // such as a draining or error state. + enum stream_state new_state; do { if (old_state == STREAM_STATE_SHUTDOWN) { pthread_mutex_unlock(&stm->mutex); @@ -189,6 +227,7 @@ static void update_state(cubeb_stream * stm) aaudio_stream_state_t istate = 0; aaudio_stream_state_t ostate = 0; + aaudio_result_t res; if (stm->istream) { res = WRAP(AAudioStream_waitForStateChange)(stm->istream, AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0); @@ -239,9 +278,6 @@ static void update_state(cubeb_stream * stm) } switch (old_state) { - case STREAM_STATE_STOPPED: - case STREAM_STATE_STARTED: - break; case STREAM_STATE_STARTING: if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) && (!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) { @@ -311,23 +347,71 @@ static void update_state(cubeb_stream * stm) pthread_mutex_unlock(&stm->mutex); } -static void * state_thread(void * user_ptr) +static void * notifier_thread(void * user_ptr) { cubeb * ctx = (cubeb*) user_ptr; - while (!atomic_load(&ctx->state_join)) { - for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - cubeb_stream* stm = &ctx->streams[i]; - update_state(stm); + pthread_mutex_lock(&ctx->state.mutex); + while (!atomic_load(&ctx->state.join)) { + pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex); + if (atomic_load(&ctx->state.waiting)) { + pthread_cond_signal(&ctx->state.cond); } + } - struct timespec ts = { - .tv_nsec = 1000 * 1000 * 5, - .tv_sec = 0, - }; + // make sure other thread joins as well + pthread_cond_signal(&ctx->state.cond); + pthread_mutex_unlock(&ctx->state.mutex); + LOG("Exiting notifier thread"); + return NULL; +} - nanosleep(&ts, NULL); // wait 5ms +static void * state_thread(void * user_ptr) +{ + cubeb * ctx = (cubeb*) user_ptr; + pthread_mutex_lock(&ctx->state.mutex); + + bool waiting = false; + while (!atomic_load(&ctx->state.join)) { + waiting |= atomic_load(&ctx->state.waiting); + if (waiting) { + atomic_store(&ctx->state.waiting, false); + waiting = false; + for (unsigned i = 0u; i < MAX_STREAMS; ++i) { + cubeb_stream* stm = &ctx->streams[i]; + update_state(stm); + waiting |= waiting_state(atomic_load(&stm->state)); + } + + // state changed from another thread, update again immediately + if(atomic_load(&ctx->state.waiting)) { + waiting = true; + continue; + } + + // Not waiting for any change anymore: we can wait on the + // condition variable without timeout + if (!waiting) { + continue; + } + + // while any stream is waiting for state change we sleep with regular + // timeouts. But we wake up immediately if signaled. + // This might seem like a poor man's implementation of state change + // waiting but (as of december 2019), the implementation of + // AAudioStream_waitForStateChange is pretty much the same: + // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277 + struct timespec timeout; + clock_gettime(CLOCK_MONOTONIC, &timeout); + timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms + pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout); + } else { + pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex); + } } + // make sure other thread joins as well + pthread_cond_signal(&ctx->state.cond); + pthread_mutex_unlock(&ctx->state.mutex); LOG("Exiting state thread"); return NULL; } @@ -351,13 +435,34 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels) static void aaudio_destroy(cubeb * ctx) { - atomic_store(&ctx->state_join, true); - int err = pthread_join(ctx->state_thread, NULL); +#ifndef NDEBUG + // make sure all streams were destroyed + for(unsigned i = 0u; i < MAX_STREAMS; ++i) { + assert(!atomic_load(&ctx->streams[i].in_use)); + } +#endif + + // broadcast joining to both threads + // they will additionally signal each other before joining + atomic_store(&ctx->state.join, true); + pthread_cond_broadcast(&ctx->state.cond); + + int err; + err = pthread_join(ctx->state.thread, NULL); if (err != 0) { LOG("pthread_join: %s", strerror(err)); } - pthread_mutex_destroy(&ctx->mutex); + err = pthread_join(ctx->state.notifier, NULL); + if (err != 0) { + LOG("pthread_join: %s", strerror(err)); + } + + pthread_cond_destroy(&ctx->state.cond); + pthread_mutex_destroy(&ctx->state.mutex); + for (unsigned i = 0u; i < MAX_STREAMS; ++i) { + pthread_mutex_destroy(&ctx->streams[i].mutex); + } if (ctx->libaaudio) { dlclose(ctx->libaaudio); @@ -368,10 +473,12 @@ aaudio_destroy(cubeb * ctx) /*static*/ int aaudio_init(cubeb ** context, char const * context_name) { (void) context_name; + int err; // load api + void * libaaudio = NULL; #ifndef DISABLE_LIBAAUDIO_DLOPEN - void * libaaudio = dlopen("libaaudio.so", RTLD_NOW); + libaaudio = dlopen("libaaudio.so", RTLD_NOW); if (!libaaudio) { return CUBEB_ERROR; } @@ -392,10 +499,46 @@ aaudio_init(cubeb ** context, char const * context_name) { cubeb* ctx = (cubeb*) calloc(1, sizeof(*ctx)); ctx->ops = &aaudio_ops; ctx->libaaudio = libaaudio; - atomic_store(&ctx->state_join, false); + atomic_init(&ctx->state.join, false); + atomic_init(&ctx->state.waiting, false); + pthread_mutex_init(&ctx->state.mutex, NULL); + + pthread_condattr_t cond_attr; + pthread_condattr_init(&cond_attr); + pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC); + err = pthread_cond_init(&ctx->state.cond, &cond_attr); + if (err) { + LOG("pthread_cond_init: %s", strerror(err)); + aaudio_destroy(ctx); + return CUBEB_ERROR; + } - pthread_mutex_init(&ctx->mutex, NULL); - pthread_create(&ctx->state_thread, NULL, state_thread, ctx); + // The stream mutexes are not bound to the lifetimes of the + // streams since we need them to synchronize the streams with + // the state thread. + for (unsigned i = 0u; i < MAX_STREAMS; ++i) { + atomic_init(&ctx->streams[i].in_use, false); + atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT); + pthread_mutex_init(&ctx->streams[i].mutex, NULL); + } + + err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx); + if (err != 0) { + LOG("pthread_create: %s", strerror(err)); + aaudio_destroy(ctx); + return CUBEB_ERROR; + } + + // TODO: we could set the priority of the notifier thread lower than + // the priority of the state thread. This way, it's more likely + // that the state thread will be woken up by the condition variable signal + // when both are currently waiting + err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx); + if (err != 0) { + LOG("pthread_create: %s", strerror(err)); + aaudio_destroy(ctx); + return CUBEB_ERROR; + } *context = ctx; return CUBEB_OK; @@ -487,6 +630,8 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data, return AAUDIO_CALLBACK_RESULT_STOP; } else if (done_frames < num_frames) { atomic_store(&stm->state, STREAM_STATE_DRAINING); + atomic_store(&stm->context->state.waiting, true); + pthread_cond_signal(&stm->context->state.cond); char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size; memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); @@ -529,6 +674,8 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data, return AAUDIO_CALLBACK_RESULT_STOP; } else if (done_frames < num_frames) { atomic_store(&stm->state, STREAM_STATE_DRAINING); + atomic_store(&stm->context->state.waiting, true); + pthread_cond_signal(&stm->context->state.cond); char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size; memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size); @@ -574,6 +721,8 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data, // stop it from the state thread. That is signaled via the // DRAINING state. atomic_store(&stm->state, STREAM_STATE_DRAINING); + atomic_store(&stm->context->state.waiting, true); + pthread_cond_signal(&stm->context->state.cond); } return AAUDIO_CALLBACK_RESULT_CONTINUE; @@ -686,10 +835,9 @@ aaudio_stream_destroy(cubeb_stream * stm) free(stm->in_buf); } - atomic_store(&stm->in_use, false); atomic_store(&stm->state, STREAM_STATE_INIT); pthread_mutex_unlock(&stm->mutex); - pthread_mutex_destroy(&stm->mutex); + atomic_store(&stm->in_use, false); } static int @@ -718,20 +866,28 @@ aaudio_stream_init(cubeb * ctx, return CUBEB_ERROR; } - // find a free stream + // atomically find a free stream. cubeb_stream * stm = NULL; - { - pthread_mutex_lock(&ctx->mutex); - for (unsigned i = 0u; i < MAX_STREAMS; ++i) { - if (!atomic_load(&ctx->streams[i].in_use)) { - stm = &ctx->streams[i]; - break; + for (unsigned i = 0u; i < MAX_STREAMS; ++i) { + // This check is only an optimization, we don't strictly need it + // since we check again after locking the mutex. + if (atomic_load(&ctx->streams[i].in_use)) { + continue; + } + + // if this fails with EBUSY, another thread initialized this stream + // between our check of in_use and this. + int err = pthread_mutex_trylock(&ctx->streams[i].mutex); + if (err != 0 || atomic_load(&ctx->streams[i].in_use)) { + if (err && err != EBUSY) { + LOG("pthread_mutex_trylock: %s", strerror(err)); } + + continue; } - atomic_store(&stm->state, STREAM_STATE_INIT); - atomic_store(&stm->in_use, true); - pthread_mutex_unlock(&ctx->mutex); + stm = &ctx->streams[i]; + break; } if (!stm) { @@ -739,10 +895,10 @@ aaudio_stream_init(cubeb * ctx, return CUBEB_ERROR; } - unsigned res_err = CUBEB_ERROR; - pthread_mutex_init(&stm->mutex, NULL); - pthread_mutex_lock(&stm->mutex); + assert(atomic_load(&stm->state) == STREAM_STATE_INIT); + atomic_store(&stm->in_use, true); + unsigned res_err = CUBEB_ERROR; stm->user_ptr = user_ptr; stm->data_callback = data_callback; stm->state_callback = state_callback; @@ -860,6 +1016,9 @@ aaudio_stream_init(cubeb * ctx, goto error; } + // the stream isn't started initially. We don't need to differntiate + // between a stream that was just initialized and one that played + // already but was stopped atomic_store(&stm->state, STREAM_STATE_STOPPED); LOG("Cubeb stream (%p) init success", (void*) stm); pthread_mutex_unlock(&stm->mutex); @@ -948,7 +1107,8 @@ aaudio_stream_start(cubeb_stream * stm) } int ret = CUBEB_OK; - while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING)) { + bool success; + while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) { // we land here only if the state has changed in the meantime switch (state) { // If an error ocurred in the meantime, we can't change that. @@ -981,6 +1141,11 @@ aaudio_stream_start(cubeb_stream * stm) break; } + if(success) { + atomic_store(&stm->context->state.waiting, true); + pthread_cond_signal(&stm->context->state.cond); + } + pthread_mutex_unlock(&stm->mutex); return ret; } @@ -1061,7 +1226,8 @@ aaudio_stream_stop(cubeb_stream * stm) } int ret = CUBEB_OK; - while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING)) { + bool success; + while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) { // we land here only if the state has changed in the meantime switch (state) { // If an error ocurred in the meantime, we can't change that. @@ -1095,6 +1261,11 @@ aaudio_stream_stop(cubeb_stream * stm) break; } + if(success) { + atomic_store(&stm->context->state.waiting, true); + pthread_cond_signal(&stm->context->state.cond); + } + pthread_mutex_unlock(&stm->mutex); return ret; } @@ -1178,6 +1349,8 @@ static const struct cubeb_ops aaudio_ops = { .stream_stop = aaudio_stream_stop, .stream_reset_default_device = NULL, .stream_get_position = aaudio_stream_get_position, + // NOTE: this could be implemented via means comparable to the + // OpenSLES backend .stream_get_latency = NULL, .stream_set_volume = aaudio_stream_set_volume, .stream_get_current_device = NULL, |