aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authornyorain <[email protected]>2019-12-08 00:19:10 +0100
committerPaul Adenot <[email protected]>2020-10-28 16:55:14 +0000
commit687fa393110a492f419b7450aaa38756e024c0e4 (patch)
tree323882379f7ffaa07bfa08f4d42c557fac8ff7e9 /src
parentefae368bcf1933316586a4669d1f7b70a45aa64a (diff)
downloadcubeb-687fa393110a492f419b7450aaa38756e024c0e4.tar.gz
cubeb-687fa393110a492f419b7450aaa38756e024c0e4.zip
AAudio: Use condition variable instead of sleeping
This removes the constant checking for state changes with 5ms of sleep in between for the state thread. We need a new thread to wakeup the state thread reliably without blocking in the audio thread. For a more detailed and theoretical explanation of the problem and solution (specifically written for this commit), see: https://nyorain.github.io/lock-free-wakeup.html Now, will only do this time-based sleeping when actively waiting for a state change. We can't implement that with a blocking call to AAudioStream_waitForStateChange since that can't be woken up and we furthermore might have to wait for multiple streams at once. This also fixes some issues and race conditions with stream destruction and adds some more documentation.
Diffstat (limited to 'src')
-rw-r--r--src/cubeb_aaudio.c275
1 files changed, 224 insertions, 51 deletions
diff --git a/src/cubeb_aaudio.c b/src/cubeb_aaudio.c
index a301aa9..f5a74bd 100644
--- a/src/cubeb_aaudio.c
+++ b/src/cubeb_aaudio.c
@@ -103,6 +103,7 @@ struct cubeb_stream {
/**/
_Atomic bool in_use;
+ _Atomic enum stream_state state;
AAudioStream * ostream;
AAudioStream * istream;
@@ -110,10 +111,12 @@ struct cubeb_stream {
cubeb_state_callback state_callback;
cubeb_resampler * resampler;
- _Atomic enum stream_state state;
- void * in_buf;
+ // mutex synchronizes access to the stream from the state thread
+ // and user-called functions. Everything that is accessed in the
+ // aaudio data (or error) callback is synchronized only via atomics.
pthread_mutex_t mutex;
+ void * in_buf;
unsigned in_frame_size;
cubeb_sample_format out_format;
@@ -126,14 +129,22 @@ struct cubeb {
struct cubeb_ops const * ops;
void * libaaudio;
- _Atomic bool state_join;
- pthread_t state_thread;
-
- pthread_mutex_t mutex;
+ struct {
+ // The state thread: it waits for state changes and stops
+ // drained streams.
+ pthread_t thread;
+ pthread_t notifier;
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ _Atomic bool join;
+ _Atomic bool waiting;
+ } state;
+
+ // streams[i].in_use signals whether a stream is used
struct cubeb_stream streams[MAX_STREAMS];
};
-// Only allowed from state thread
+// Only allowed from state thread, while mutex on stm is locked
static void shutdown(cubeb_stream* stm)
{
if (stm->istream) {
@@ -147,15 +158,34 @@ static void shutdown(cubeb_stream* stm)
atomic_store(&stm->state, STREAM_STATE_SHUTDOWN);
}
+// Returns whether the given state is one in which we wait for
+// an asynchronous change
+static bool waiting_state(enum stream_state state)
+{
+ switch(state) {
+ case STREAM_STATE_DRAINING:
+ case STREAM_STATE_STARTING:
+ case STREAM_STATE_STOPPING:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// Returns whether this stream is still waiting for a state change
static void update_state(cubeb_stream * stm)
{
- if (!atomic_load(&stm->in_use) || atomic_load(&stm->state) == STREAM_STATE_INIT) {
+ // fast path for streams that don't wait for state change or are invalid
+ enum stream_state old_state = atomic_load(&stm->state);
+ if (old_state == STREAM_STATE_INIT ||
+ old_state == STREAM_STATE_STARTED ||
+ old_state == STREAM_STATE_STOPPED ||
+ old_state == STREAM_STATE_SHUTDOWN) {
return;
}
- // Don't wait to stream operations in the main thread.
- // If an operation on a stream takes a while, the state thread
- // can still continue update the other streams.
+ // If the main thread currently operates on this thread, we don't
+ // have to wait for it
int err = pthread_mutex_trylock(&stm->mutex);
if (err != 0) {
if (err != EBUSY) {
@@ -164,14 +194,22 @@ static void update_state(cubeb_stream * stm)
return;
}
- aaudio_result_t res;
- enum stream_state old_state = atomic_load(&stm->state);
- enum stream_state new_state;
+ // check again: if this is true now, the stream was destroyed or
+ // changed between our fast path check and locking the mutex
+ old_state = atomic_load(&stm->state);
+ if (old_state == STREAM_STATE_INIT ||
+ old_state == STREAM_STATE_STARTED ||
+ old_state == STREAM_STATE_STOPPED ||
+ old_state == STREAM_STATE_SHUTDOWN) {
+ pthread_mutex_unlock(&stm->mutex);
+ return;
+ }
// We compute the new state the stream has and then compare_exchange it
// if it has changed. This way we will never just overwrite state
// changes that were set from the audio thread in the meantime,
// such as a draining or error state.
+ enum stream_state new_state;
do {
if (old_state == STREAM_STATE_SHUTDOWN) {
pthread_mutex_unlock(&stm->mutex);
@@ -189,6 +227,7 @@ static void update_state(cubeb_stream * stm)
aaudio_stream_state_t istate = 0;
aaudio_stream_state_t ostate = 0;
+ aaudio_result_t res;
if (stm->istream) {
res = WRAP(AAudioStream_waitForStateChange)(stm->istream,
AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0);
@@ -239,9 +278,6 @@ static void update_state(cubeb_stream * stm)
}
switch (old_state) {
- case STREAM_STATE_STOPPED:
- case STREAM_STATE_STARTED:
- break;
case STREAM_STATE_STARTING:
if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) {
@@ -311,23 +347,71 @@ static void update_state(cubeb_stream * stm)
pthread_mutex_unlock(&stm->mutex);
}
-static void * state_thread(void * user_ptr)
+static void * notifier_thread(void * user_ptr)
{
cubeb * ctx = (cubeb*) user_ptr;
- while (!atomic_load(&ctx->state_join)) {
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- cubeb_stream* stm = &ctx->streams[i];
- update_state(stm);
+ pthread_mutex_lock(&ctx->state.mutex);
+ while (!atomic_load(&ctx->state.join)) {
+ pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
+ if (atomic_load(&ctx->state.waiting)) {
+ pthread_cond_signal(&ctx->state.cond);
}
+ }
- struct timespec ts = {
- .tv_nsec = 1000 * 1000 * 5,
- .tv_sec = 0,
- };
+ // make sure other thread joins as well
+ pthread_cond_signal(&ctx->state.cond);
+ pthread_mutex_unlock(&ctx->state.mutex);
+ LOG("Exiting notifier thread");
+ return NULL;
+}
- nanosleep(&ts, NULL); // wait 5ms
+static void * state_thread(void * user_ptr)
+{
+ cubeb * ctx = (cubeb*) user_ptr;
+ pthread_mutex_lock(&ctx->state.mutex);
+
+ bool waiting = false;
+ while (!atomic_load(&ctx->state.join)) {
+ waiting |= atomic_load(&ctx->state.waiting);
+ if (waiting) {
+ atomic_store(&ctx->state.waiting, false);
+ waiting = false;
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ cubeb_stream* stm = &ctx->streams[i];
+ update_state(stm);
+ waiting |= waiting_state(atomic_load(&stm->state));
+ }
+
+ // state changed from another thread, update again immediately
+ if(atomic_load(&ctx->state.waiting)) {
+ waiting = true;
+ continue;
+ }
+
+ // Not waiting for any change anymore: we can wait on the
+ // condition variable without timeout
+ if (!waiting) {
+ continue;
+ }
+
+ // while any stream is waiting for state change we sleep with regular
+ // timeouts. But we wake up immediately if signaled.
+ // This might seem like a poor man's implementation of state change
+ // waiting but (as of december 2019), the implementation of
+ // AAudioStream_waitForStateChange is pretty much the same:
+ // https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277
+ struct timespec timeout;
+ clock_gettime(CLOCK_MONOTONIC, &timeout);
+ timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms
+ pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout);
+ } else {
+ pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
+ }
}
+ // make sure other thread joins as well
+ pthread_cond_signal(&ctx->state.cond);
+ pthread_mutex_unlock(&ctx->state.mutex);
LOG("Exiting state thread");
return NULL;
}
@@ -351,13 +435,34 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
static void
aaudio_destroy(cubeb * ctx)
{
- atomic_store(&ctx->state_join, true);
- int err = pthread_join(ctx->state_thread, NULL);
+#ifndef NDEBUG
+ // make sure all streams were destroyed
+ for(unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ assert(!atomic_load(&ctx->streams[i].in_use));
+ }
+#endif
+
+ // broadcast joining to both threads
+ // they will additionally signal each other before joining
+ atomic_store(&ctx->state.join, true);
+ pthread_cond_broadcast(&ctx->state.cond);
+
+ int err;
+ err = pthread_join(ctx->state.thread, NULL);
if (err != 0) {
LOG("pthread_join: %s", strerror(err));
}
- pthread_mutex_destroy(&ctx->mutex);
+ err = pthread_join(ctx->state.notifier, NULL);
+ if (err != 0) {
+ LOG("pthread_join: %s", strerror(err));
+ }
+
+ pthread_cond_destroy(&ctx->state.cond);
+ pthread_mutex_destroy(&ctx->state.mutex);
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ pthread_mutex_destroy(&ctx->streams[i].mutex);
+ }
if (ctx->libaaudio) {
dlclose(ctx->libaaudio);
@@ -368,10 +473,12 @@ aaudio_destroy(cubeb * ctx)
/*static*/ int
aaudio_init(cubeb ** context, char const * context_name) {
(void) context_name;
+ int err;
// load api
+ void * libaaudio = NULL;
#ifndef DISABLE_LIBAAUDIO_DLOPEN
- void * libaaudio = dlopen("libaaudio.so", RTLD_NOW);
+ libaaudio = dlopen("libaaudio.so", RTLD_NOW);
if (!libaaudio) {
return CUBEB_ERROR;
}
@@ -392,10 +499,46 @@ aaudio_init(cubeb ** context, char const * context_name) {
cubeb* ctx = (cubeb*) calloc(1, sizeof(*ctx));
ctx->ops = &aaudio_ops;
ctx->libaaudio = libaaudio;
- atomic_store(&ctx->state_join, false);
+ atomic_init(&ctx->state.join, false);
+ atomic_init(&ctx->state.waiting, false);
+ pthread_mutex_init(&ctx->state.mutex, NULL);
+
+ pthread_condattr_t cond_attr;
+ pthread_condattr_init(&cond_attr);
+ pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
+ err = pthread_cond_init(&ctx->state.cond, &cond_attr);
+ if (err) {
+ LOG("pthread_cond_init: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
- pthread_mutex_init(&ctx->mutex, NULL);
- pthread_create(&ctx->state_thread, NULL, state_thread, ctx);
+ // The stream mutexes are not bound to the lifetimes of the
+ // streams since we need them to synchronize the streams with
+ // the state thread.
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ atomic_init(&ctx->streams[i].in_use, false);
+ atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT);
+ pthread_mutex_init(&ctx->streams[i].mutex, NULL);
+ }
+
+ err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx);
+ if (err != 0) {
+ LOG("pthread_create: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
+
+ // TODO: we could set the priority of the notifier thread lower than
+ // the priority of the state thread. This way, it's more likely
+ // that the state thread will be woken up by the condition variable signal
+ // when both are currently waiting
+ err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx);
+ if (err != 0) {
+ LOG("pthread_create: %s", strerror(err));
+ aaudio_destroy(ctx);
+ return CUBEB_ERROR;
+ }
*context = ctx;
return CUBEB_OK;
@@ -487,6 +630,8 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@@ -529,6 +674,8 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@@ -574,6 +721,8 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
// stop it from the state thread. That is signaled via the
// DRAINING state.
atomic_store(&stm->state, STREAM_STATE_DRAINING);
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
}
return AAUDIO_CALLBACK_RESULT_CONTINUE;
@@ -686,10 +835,9 @@ aaudio_stream_destroy(cubeb_stream * stm)
free(stm->in_buf);
}
- atomic_store(&stm->in_use, false);
atomic_store(&stm->state, STREAM_STATE_INIT);
pthread_mutex_unlock(&stm->mutex);
- pthread_mutex_destroy(&stm->mutex);
+ atomic_store(&stm->in_use, false);
}
static int
@@ -718,20 +866,28 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
- // find a free stream
+ // atomically find a free stream.
cubeb_stream * stm = NULL;
- {
- pthread_mutex_lock(&ctx->mutex);
- for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
- if (!atomic_load(&ctx->streams[i].in_use)) {
- stm = &ctx->streams[i];
- break;
+ for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
+ // This check is only an optimization, we don't strictly need it
+ // since we check again after locking the mutex.
+ if (atomic_load(&ctx->streams[i].in_use)) {
+ continue;
+ }
+
+ // if this fails with EBUSY, another thread initialized this stream
+ // between our check of in_use and this.
+ int err = pthread_mutex_trylock(&ctx->streams[i].mutex);
+ if (err != 0 || atomic_load(&ctx->streams[i].in_use)) {
+ if (err && err != EBUSY) {
+ LOG("pthread_mutex_trylock: %s", strerror(err));
}
+
+ continue;
}
- atomic_store(&stm->state, STREAM_STATE_INIT);
- atomic_store(&stm->in_use, true);
- pthread_mutex_unlock(&ctx->mutex);
+ stm = &ctx->streams[i];
+ break;
}
if (!stm) {
@@ -739,10 +895,10 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
- unsigned res_err = CUBEB_ERROR;
- pthread_mutex_init(&stm->mutex, NULL);
- pthread_mutex_lock(&stm->mutex);
+ assert(atomic_load(&stm->state) == STREAM_STATE_INIT);
+ atomic_store(&stm->in_use, true);
+ unsigned res_err = CUBEB_ERROR;
stm->user_ptr = user_ptr;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
@@ -860,6 +1016,9 @@ aaudio_stream_init(cubeb * ctx,
goto error;
}
+ // the stream isn't started initially. We don't need to differntiate
+ // between a stream that was just initialized and one that played
+ // already but was stopped
atomic_store(&stm->state, STREAM_STATE_STOPPED);
LOG("Cubeb stream (%p) init success", (void*) stm);
pthread_mutex_unlock(&stm->mutex);
@@ -948,7 +1107,8 @@ aaudio_stream_start(cubeb_stream * stm)
}
int ret = CUBEB_OK;
- while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING)) {
+ bool success;
+ while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@@ -981,6 +1141,11 @@ aaudio_stream_start(cubeb_stream * stm)
break;
}
+ if(success) {
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
+ }
+
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@@ -1061,7 +1226,8 @@ aaudio_stream_stop(cubeb_stream * stm)
}
int ret = CUBEB_OK;
- while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING)) {
+ bool success;
+ while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@@ -1095,6 +1261,11 @@ aaudio_stream_stop(cubeb_stream * stm)
break;
}
+ if(success) {
+ atomic_store(&stm->context->state.waiting, true);
+ pthread_cond_signal(&stm->context->state.cond);
+ }
+
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@@ -1178,6 +1349,8 @@ static const struct cubeb_ops aaudio_ops = {
.stream_stop = aaudio_stream_stop,
.stream_reset_default_device = NULL,
.stream_get_position = aaudio_stream_get_position,
+ // NOTE: this could be implemented via means comparable to the
+ // OpenSLES backend
.stream_get_latency = NULL,
.stream_set_volume = aaudio_stream_set_volume,
.stream_get_current_device = NULL,