aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/cubeb_aaudio.c275
1 files changed, 224 insertions, 51 deletions
diff --git a/src/cubeb_aaudio.c b/src/cubeb_aaudio.c
index a301aa9..f5a74bd 100644
--- a/src/cubeb_aaudio.c
+++ b/src/cubeb_aaudio.c
@@ -103,6 +103,7 @@ struct cubeb_stream {
/**/
_Atomic bool in_use;
+ _Atomic enum stream_state state;
AAudioStream * ostream;
AAudioStream * istream;
@@ -110,10 +111,12 @@ struct cubeb_stream {
cubeb_state_callback state_callback;
cubeb_resampler * resampler;
- _Atomic enum stream_state state;
- void * in_buf;
+ // mutex synchronizes access to the stream from the state thread
+ // and user-called functions. Everything that is accessed in the
+ // aaudio data (or error) callback is synchronized only via atomics.
pthread_mutex_t mutex;
+ void * in_buf;
unsigned in_frame_size;
cubeb_sample_format out_format;
@@ -126,14 +129,22 @@ struct cubeb {
struct cubeb_ops const * ops;
void * libaaudio;
- _Atomic bool state_join;
- pthread_t state_thread;
-
- pthread_mutex_t mutex;
+ struct {
+ // The state thread: it waits for state changes and stops
+ // drained streams.
+ pthread_t thread;
+ pthread_t notifier;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ _Atomic bool join;
+ _Atomic bool waiting;
+ } state;
+
+ // streams[i].in_use signals whether a stream is used
struct cubeb_stream streams[MAX_STREAMS];
};
-// Only allowed from state thread
+// Only allowed from state thread, while mutex on stm is locked
static void shutdown(cubeb_stream* stm)
{
if (stm->istream) {
@@ -147,15 +158,34 @@ static void shutdown(cubeb_stream* stm)
atomic_store(&stm->state, STREAM_STATE_SHUTDOWN);
}
+// Returns whether the given state is one in which we wait for
+// an asynchronous change
+static bool waiting_state(enum stream_state state)
+{
+ switch(state) {
+ case STREAM_STATE_DRAINING:
+ case STREAM_STATE_STARTING:
+ case STREAM_STATE_STOPPING:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Returns whether this stream is still waiting for a state change
static void update_state(cubeb_stream * stm)
{
- if (!atomic_load(&stm->in_use) || atomic_load(&stm->state) == STREAM_STATE_INIT) {
+ // fast path for streams that don't wait for state change or are invalid
+ enum stream_state old_state = atomic_load(&stm->state);
+ if (old_state == STREAM_STATE_INIT ||
+ old_state == STREAM_STATE_STARTED ||
+ old_state == STREAM_STATE_STOPPED ||
+ old_state == STREAM_STATE_SHUTDOWN) {
return;
}
- // Don't wait to stream operations in the main thread.
- // If an operation on a stream takes a while, the state thread
- // can still continue update the other streams.
+ // If the main thread currently operates on this thread, we don't
+ // have to wait for it
int err = pthread_mutex_trylock(&stm->mutex);
if (err != 0) {
if (err != EBUSY) {
@@ -164,14 +194,22 @@ static void update_state(cubeb_stream * stm)
return;
}
- aaudio_result_t res;
- enum stream_state old_state = atomic_load(&stm->state);
- enum stream_state new_state;
+ // check again: if this is true now, the stream was destroyed or
+ // changed between our fast path check and locking the mutex
+ old_state = atomic_load(&stm->state);
+ if (old_state == STREAM_STATE_INIT ||
+ old_state == STREAM_STATE_STARTED ||
+ old_state == STREAM_STATE_STOPPED ||
+ old_state == STREAM_STATE_SHUTDOWN) {
+ pthread_mutex_unlock(&stm->mutex);
+ return;
+ }
// We compute the new state the stream has and then compare_exchange it
// if it has changed. This way we will never just overwrite state
// changes that were set from the audio thread in the meantime,
// such as a draining or error state.
+ enum stream_state new_state;
do {
if (old_state == STREAM_STATE_SHUTDOWN) {
pthread_mutex_unlock(&stm->mutex);
@@ -189,6 +227,7 @@ static void update_state(cubeb_stream * stm)
aaudio_stream_state_t istate = 0;
aaudio_stream_state_t ostate = 0;
+ aaudio_result_t res;
if (stm->istream) {
res = WRAP(AAudioStream_waitForStateChange)(stm->istream,
AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0);
@@ -239,9 +278,6 @@ static void update_state(cubeb_stream * stm)
}
switch (old_state) {
- case STREAM_STATE_STOPPED:
- case STREAM_STATE_STARTED:
- break;
case STREAM_STATE_STARTING:
if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) {
@@ -311,23 +347,71 @@ static void update_state(cubeb_stream * stm)
pthread_mutex_unlock(&stm->mutex);
}
-static void * state_thread(void * user_ptr)
+static void * notifier_thread(void * user_ptr)
{
cubeb * ctx = (cubeb*) user_ptr;
- while (!atomic_load(&ctx->state_join)) {
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- cubeb_stream* stm = &ctx->streams[i];
- update_state(stm);
+ pthread_mutex_lock(&ctx->state.mutex);
+ while (!atomic_load(&ctx->state.join)) {
+ pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
+ if (atomic_load(&ctx->state.waiting)) {
+ pthread_cond_signal(&ctx->state.cond);
}
+ }
- struct timespec ts = {
- .tv_nsec = 1000 * 1000 * 5,
- .tv_sec = 0,
- };
+ // make sure other thread joins as well
+ pthread_cond_signal(&ctx->state.cond);
+ pthread_mutex_unlock(&ctx->state.mutex);
+ LOG("Exiting notifier thread");
+ return NULL;
+}
- nanosleep(&ts, NULL); // wait 5ms
+static void * state_thread(void * user_ptr)
+{
+ cubeb * ctx = (cubeb*) user_ptr;
+ pthread_mutex_lock(&ctx->state.mutex);
+
+ bool waiting = false;
+ while (!atomic_load(&ctx->state.join)) {
+ waiting |= atomic_load(&ctx->state.waiting);
+ if (waiting) {
+ atomic_store(&ctx->state.waiting, false);
+ waiting = false;
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ cubeb_stream* stm = &ctx->streams[i];
+ update_state(stm);
+ waiting |= waiting_state(atomic_load(&stm->state));
+ }
+
+ // state changed from another thread, update again immediately
+ if(atomic_load(&ctx->state.waiting)) {
+ waiting = true;
+ continue;
+ }
+
+ // Not waiting for any change anymore: we can wait on the
+ // condition variable without timeout
+ if (!waiting) {
+ continue;
+ }
+
+ // while any stream is waiting for state change we sleep with regular
+ // timeouts. But we wake up immediately if signaled.
+ // This might seem like a poor man's implementation of state change
+ // waiting but (as of december 2019), the implementation of
+ // AAudioStream_waitForStateChange is pretty much the same:
+ // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms
+ pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout);
+ } else {
+ pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
+ }
}
+ // make sure other thread joins as well
+ pthread_cond_signal(&ctx->state.cond);
+ pthread_mutex_unlock(&ctx->state.mutex);
LOG("Exiting state thread");
return NULL;
}
@@ -351,13 +435,34 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
static void
aaudio_destroy(cubeb * ctx)
{
- atomic_store(&ctx->state_join, true);
- int err = pthread_join(ctx->state_thread, NULL);
+#ifndef NDEBUG
+ // make sure all streams were destroyed
+ for(unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ assert(!atomic_load(&ctx->streams[i].in_use));
+ }
+#endif
+
+ // broadcast joining to both threads
+ // they will additionally signal each other before joining
+ atomic_store(&ctx->state.join, true);
+ pthread_cond_broadcast(&ctx->state.cond);
+
+ int err;
+ err = pthread_join(ctx->state.thread, NULL);
if (err != 0) {
LOG("pthread_join: %s", strerror(err));
}
- pthread_mutex_destroy(&ctx->mutex);
+ err = pthread_join(ctx->state.notifier, NULL);
+ if (err != 0) {
+ LOG("pthread_join: %s", strerror(err));
+ }
+
+ pthread_cond_destroy(&ctx->state.cond);
+ pthread_mutex_destroy(&ctx->state.mutex);
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ pthread_mutex_destroy(&ctx->streams[i].mutex);
+ }
if (ctx->libaaudio) {
dlclose(ctx->libaaudio);
@@ -368,10 +473,12 @@ aaudio_destroy(cubeb * ctx)
/*static*/ int
aaudio_init(cubeb ** context, char const * context_name) {
(void) context_name;
+ int err;
// load api
+ void * libaaudio = NULL;
#ifndef DISABLE_LIBAAUDIO_DLOPEN
- void * libaaudio = dlopen("libaaudio.so", RTLD_NOW);
+ libaaudio = dlopen("libaaudio.so", RTLD_NOW);
if (!libaaudio) {
return CUBEB_ERROR;
}
@@ -392,10 +499,46 @@ aaudio_init(cubeb ** context, char const * context_name) {
cubeb* ctx = (cubeb*) calloc(1, sizeof(*ctx));
ctx->ops = &aaudio_ops;
ctx->libaaudio = libaaudio;
- atomic_store(&ctx->state_join, false);
+ atomic_init(&ctx->state.join, false);
+ atomic_init(&ctx->state.waiting, false);
+ pthread_mutex_init(&ctx->state.mutex, NULL);
+
+ pthread_condattr_t cond_attr;
+ pthread_condattr_init(&cond_attr);
+ pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
+ err = pthread_cond_init(&ctx->state.cond, &cond_attr);
+ if (err) {
+ LOG("pthread_cond_init: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
- pthread_mutex_init(&ctx->mutex, NULL);
- pthread_create(&ctx->state_thread, NULL, state_thread, ctx);
+ // The stream mutexes are not bound to the lifetimes of the
+ // streams since we need them to synchronize the streams with
+ // the state thread.
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ atomic_init(&ctx->streams[i].in_use, false);
+ atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT);
+ pthread_mutex_init(&ctx->streams[i].mutex, NULL);
+ }
+
+ err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx);
+ if (err != 0) {
+ LOG("pthread_create: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
+
+ // TODO: we could set the priority of the notifier thread lower than
+ // the priority of the state thread. This way, it's more likely
+ // that the state thread will be woken up by the condition variable signal
+ // when both are currently waiting
+ err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx);
+ if (err != 0) {
+ LOG("pthread_create: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
*context = ctx;
return CUBEB_OK;
@@ -487,6 +630,8 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@@ -529,6 +674,8 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@@ -574,6 +721,8 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
// stop it from the state thread. That is signaled via the
// DRAINING state.
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
}
return AAUDIO_CALLBACK_RESULT_CONTINUE;
@@ -686,10 +835,9 @@ aaudio_stream_destroy(cubeb_stream * stm)
free(stm->in_buf);
}
- atomic_store(&stm->in_use, false);
atomic_store(&stm->state, STREAM_STATE_INIT);
pthread_mutex_unlock(&stm->mutex);
- pthread_mutex_destroy(&stm->mutex);
+ atomic_store(&stm->in_use, false);
}
static int
@@ -718,20 +866,28 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
- // find a free stream
+ // atomically find a free stream.
cubeb_stream * stm = NULL;
- {
- pthread_mutex_lock(&ctx->mutex);
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- if (!atomic_load(&ctx->streams[i].in_use)) {
- stm = &ctx->streams[i];
- break;
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ // This check is only an optimization, we don't strictly need it
+ // since we check again after locking the mutex.
+ if (atomic_load(&ctx->streams[i].in_use)) {
+ continue;
+ }
+
+ // if this fails with EBUSY, another thread initialized this stream
+ // between our check of in_use and this.
+ int err = pthread_mutex_trylock(&ctx->streams[i].mutex);
+ if (err != 0 || atomic_load(&ctx->streams[i].in_use)) {
+ if (err && err != EBUSY) {
+ LOG("pthread_mutex_trylock: %s", strerror(err));
}
+
+ continue;
}
- atomic_store(&stm->state, STREAM_STATE_INIT);
- atomic_store(&stm->in_use, true);
- pthread_mutex_unlock(&ctx->mutex);
+ stm = &ctx->streams[i];
+ break;
}
if (!stm) {
@@ -739,10 +895,10 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
- unsigned res_err = CUBEB_ERROR;
- pthread_mutex_init(&stm->mutex, NULL);
- pthread_mutex_lock(&stm->mutex);
+ assert(atomic_load(&stm->state) == STREAM_STATE_INIT);
+ atomic_store(&stm->in_use, true);
+ unsigned res_err = CUBEB_ERROR;
stm->user_ptr = user_ptr;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
@@ -860,6 +1016,9 @@ aaudio_stream_init(cubeb * ctx,
goto error;
}
+ // the stream isn't started initially. We don't need to differntiate
+ // between a stream that was just initialized and one that played
+ // already but was stopped
atomic_store(&stm->state, STREAM_STATE_STOPPED);
LOG("Cubeb stream (%p) init success", (void*) stm);
pthread_mutex_unlock(&stm->mutex);
@@ -948,7 +1107,8 @@ aaudio_stream_start(cubeb_stream * stm)
}
int ret = CUBEB_OK;
- while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING)) {
+ bool success;
+ while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@@ -981,6 +1141,11 @@ aaudio_stream_start(cubeb_stream * stm)
break;
}
+ if(success) {
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
+ }
+
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@@ -1061,7 +1226,8 @@ aaudio_stream_stop(cubeb_stream * stm)
}
int ret = CUBEB_OK;
- while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING)) {
+ bool success;
+ while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@@ -1095,6 +1261,11 @@ aaudio_stream_stop(cubeb_stream * stm)
break;
}
+ if(success) {
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
+ }
+
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@@ -1178,6 +1349,8 @@ static const struct cubeb_ops aaudio_ops = {
.stream_stop = aaudio_stream_stop,
.stream_reset_default_device = NULL,
.stream_get_position = aaudio_stream_get_position,
+ // NOTE: this could be implemented via means comparable to the
+ // OpenSLES backend
.stream_get_latency = NULL,
.stream_set_volume = aaudio_stream_set_volume,
.stream_get_current_device = NULL,