diff options
-rw-r--r-- | src/cubeb_alsa.c | 185 |
1 files changed, 118 insertions, 67 deletions
diff --git a/src/cubeb_alsa.c b/src/cubeb_alsa.c index 3ece2da..64a1a0f 100644 --- a/src/cubeb_alsa.c +++ b/src/cubeb_alsa.c @@ -52,12 +52,17 @@ struct cubeb { /* Control pipe for forcing poll to wake and rebuild fds or recalculate the timeout. */ int control_fd_read; int control_fd_write; + + /* Track number of active streams. This is limited to CUBEB_STREAM_MAX + due to resource contraints. */ + unsigned int active_streams; }; enum stream_state { - IDLE, + INACTIVE, RUNNING, DRAINING, + PROCESSING, ERROR }; @@ -74,11 +79,11 @@ struct cubeb_stream { snd_pcm_uframes_t period_size; cubeb_stream_params params; - int slot; - /* Every member after this comment is protected by the owning context's mutex rather than the stream's mutex, or is only used on the context's run thread. */ + pthread_cond_t cond; /* Signaled when the stream's state is changed. */ + enum stream_state state; struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */ @@ -147,6 +152,22 @@ mutex_assert_not_held(struct mutex * mtx) assert(!mtx->locked || !pthread_equal(mtx->owner, pthread_self())); } +static void +cond_wait(pthread_cond_t * cond, struct mutex * mtx) +{ + int r; + + mutex_assert_held(mtx); + + r = pthread_cond_wait(cond, &mtx->mutex); + assert(r == 0); + + /* Reset state; another thread may have done a lock/unlock cycle during + our wait. */ + mtx->owner = pthread_self(); + mtx->locked = 1; +} + static long stream_data_callback(cubeb_stream * stm, void * user_ptr, void * buffer, long nframes) { @@ -288,14 +309,20 @@ set_timeout(struct timeval * timeout, unsigned int ms) static void cubeb_set_stream_state(cubeb_stream * stm, enum stream_state state) { + cubeb * ctx; + int r; + + ctx = stm->context; mutex_assert_not_held(&stm->mutex); - mutex_assert_held(&stm->context->mutex); + mutex_assert_held(&ctx->mutex); stm->state = state; - stm->context->rebuild = 1; - poll_wake(stm->context); + r = pthread_cond_broadcast(&stm->cond); + assert(r == 0); + ctx->rebuild = 1; + poll_wake(ctx); } -static void +static enum stream_state cubeb_refill_stream(cubeb_stream * stm) { int r; @@ -303,7 +330,9 @@ cubeb_refill_stream(cubeb_stream * stm) snd_pcm_sframes_t avail; long got; void * p; - int draining = 0; + int draining; + + draining = 0; mutex_lock(&stm->mutex); @@ -313,7 +342,7 @@ cubeb_refill_stream(cubeb_stream * stm) for this stream and then have the stream report that it's not ready. Unfortunately, this does happen, so just bail out and try again. */ mutex_unlock(&stm->mutex); - return; + return RUNNING; } avail = snd_pcm_avail_update(stm->pcm); @@ -325,9 +354,8 @@ cubeb_refill_stream(cubeb_stream * stm) /* Failed to recover from an xrun, this stream must be broken. */ if (avail < 0) { mutex_unlock(&stm->mutex); - cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - return; + return ERROR; } /* This should never happen. */ @@ -343,9 +371,8 @@ cubeb_refill_stream(cubeb_stream * stm) avail = snd_pcm_avail_update(stm->pcm); if (avail <= 0) { mutex_unlock(&stm->mutex); - cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - return; + return ERROR; } } @@ -357,9 +384,8 @@ cubeb_refill_stream(cubeb_stream * stm) mutex_lock(&stm->mutex); if (got < 0) { mutex_unlock(&stm->mutex); - cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - return; + return ERROR; } if (got > 0) { snd_pcm_sframes_t wrote = snd_pcm_writei(stm->pcm, p, got); @@ -386,33 +412,7 @@ cubeb_refill_stream(cubeb_stream * stm) free(p); mutex_unlock(&stm->mutex); - if (draining) { - cubeb_set_stream_state(stm, DRAINING); - } -} - -static int -calculate_timeout(cubeb * ctx, int initial) -{ - int i; - int timeout; - cubeb_stream * stm; - int ms; - - mutex_assert_held(&ctx->mutex); - - timeout = initial; - for (i = 0; i < CUBEB_STREAM_MAX; ++i) { - stm = ctx->streams[i]; - if (stm && stm->state == DRAINING) { - ms = ms_until(&stm->drain_timeout); - if (ms >= 0 && timeout > ms) { - timeout = ms; - } - } - } - - return timeout; + return draining ? DRAINING : RUNNING; } static int @@ -423,6 +423,7 @@ cubeb_run(cubeb * ctx) int i; char dummy; cubeb_stream * stm; + enum stream_state state; mutex_lock(&ctx->mutex); @@ -431,7 +432,16 @@ cubeb_run(cubeb * ctx) } /* Wake up at least once per second for the watchdog. */ - timeout = calculate_timeout(ctx, 1000); + timeout = 1000; + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm && stm->state == DRAINING) { + r = ms_until(&stm->drain_timeout); + if (r >= 0 && timeout > r) { + timeout = r; + } + } + } mutex_unlock(&ctx->mutex); r = poll(ctx->fds, ctx->nfds, timeout); @@ -450,7 +460,11 @@ cubeb_run(cubeb * ctx) for (i = 0; i < CUBEB_STREAM_MAX; ++i) { stm = ctx->streams[i]; if (stm && stm->state == RUNNING && stm->fds && any_revents(stm->fds, stm->nfds)) { - cubeb_refill_stream(stm); + cubeb_set_stream_state(stm, PROCESSING); + mutex_unlock(&ctx->mutex); + state = cubeb_refill_stream(stm); + mutex_lock(&ctx->mutex); + cubeb_set_stream_state(stm, state); } } } else if (r == 0) { @@ -458,7 +472,7 @@ cubeb_run(cubeb * ctx) stm = ctx->streams[i]; if (stm) { if (stm->state == DRAINING && ms_since(&stm->drain_timeout) >= 0) { - cubeb_set_stream_state(stm, IDLE); + cubeb_set_stream_state(stm, INACTIVE); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED); } else if (stm->state == RUNNING && ms_since(&stm->last_activity) > CUBEB_WATCHDOG_MS) { cubeb_set_stream_state(stm, ERROR); @@ -514,28 +528,34 @@ static int cubeb_register_stream(cubeb * ctx, cubeb_stream * stm) { int i; - int r = -1; mutex_lock(&ctx->mutex); for (i = 0; i < CUBEB_STREAM_MAX; ++i) { if (!ctx->streams[i]) { ctx->streams[i] = stm; - r = i; break; } } mutex_unlock(&ctx->mutex); - return r; + return i == CUBEB_STREAM_MAX; } static void -cubeb_unregister_stream(cubeb * ctx, int slot) +cubeb_unregister_stream(cubeb_stream * stm) { - assert(slot >= 0 && slot < CUBEB_STREAM_MAX); + cubeb * ctx; + int i; + + ctx = stm->context; mutex_lock(&ctx->mutex); - ctx->streams[slot] = NULL; + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + if (ctx->streams[i] == stm) { + ctx->streams[i] = NULL; + break; + } + } mutex_unlock(&ctx->mutex); } @@ -625,7 +645,7 @@ cubeb_destroy(cubeb * ctx) } int -cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_name UNUSED, +cubeb_stream_init(cubeb * ctx, cubeb_stream ** stream, char const * stream_name UNUSED, cubeb_stream_params stream_params, unsigned int latency, cubeb_data_callback data_callback, cubeb_state_callback state_callback, void * user_ptr) @@ -634,7 +654,7 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n int r; snd_pcm_format_t format; - assert(context && stream); + assert(ctx && stream); *stream = NULL; @@ -661,16 +681,23 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n return CUBEB_ERROR_INVALID_FORMAT; } + mutex_lock(&ctx->mutex); + if (ctx->active_streams >= CUBEB_STREAM_MAX) { + mutex_unlock(&ctx->mutex); + return CUBEB_ERROR; + } + ctx->active_streams += 1; + mutex_unlock(&ctx->mutex); + stm = calloc(1, sizeof(*stm)); assert(stm); - stm->context = context; + stm->context = ctx; stm->data_callback = data_callback; stm->state_callback = state_callback; stm->user_ptr = user_ptr; stm->params = stream_params; - stm->state = IDLE; - stm->slot = -1; + stm->state = INACTIVE; mutex_init(&stm->mutex); @@ -702,8 +729,10 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n r = snd_pcm_poll_descriptors(stm->pcm, stm->saved_fds, stm->nfds); assert((nfds_t) r == stm->nfds); - stm->slot = cubeb_register_stream(context, stm); - if (stm->slot == -1) { + r = pthread_cond_init(&stm->cond, NULL); + assert(r == 0); + + if (cubeb_register_stream(ctx, stm) != 0) { cubeb_stream_destroy(stm); return CUBEB_ERROR; } @@ -716,7 +745,12 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n void cubeb_stream_destroy(cubeb_stream * stm) { - assert(stm && (stm->state == IDLE || stm->state == ERROR)); + int r; + cubeb * ctx; + + assert(stm && (stm->state == INACTIVE || stm->state == ERROR)); + + ctx = stm->context; mutex_lock(&stm->mutex); if (stm->pcm) { @@ -727,24 +761,34 @@ cubeb_stream_destroy(cubeb_stream * stm) mutex_unlock(&stm->mutex); mutex_destroy(&stm->mutex); - if (stm->slot != -1) { - cubeb_unregister_stream(stm->context, stm->slot); - } + r = pthread_cond_destroy(&stm->cond); + assert(r == 0); + + cubeb_unregister_stream(stm); + + mutex_lock(&ctx->mutex); + assert(ctx->active_streams >= 1); + ctx->active_streams -= 1; + mutex_unlock(&ctx->mutex); + free(stm); } int cubeb_stream_start(cubeb_stream * stm) { + cubeb * ctx; + assert(stm); + ctx = stm->context; mutex_lock(&stm->mutex); snd_pcm_pause(stm->pcm, 0); mutex_unlock(&stm->mutex); - mutex_lock(&stm->context->mutex); + mutex_lock(&ctx->mutex); cubeb_set_stream_state(stm, RUNNING); - mutex_unlock(&stm->context->mutex); + mutex_unlock(&ctx->mutex); return CUBEB_OK; } @@ -752,11 +796,18 @@ cubeb_stream_start(cubeb_stream * stm) int cubeb_stream_stop(cubeb_stream * stm) { + cubeb * ctx; + assert(stm); + ctx = stm->context; - mutex_lock(&stm->context->mutex); - cubeb_set_stream_state(stm, IDLE); - mutex_unlock(&stm->context->mutex); + mutex_lock(&ctx->mutex); + while (stm->state == PROCESSING) { + cond_wait(&stm->cond, &ctx->mutex); + } + + cubeb_set_stream_state(stm, INACTIVE); + mutex_unlock(&ctx->mutex); mutex_lock(&stm->mutex); snd_pcm_pause(stm->pcm, 1); |