aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/cubeb_alsa.c185
1 files changed, 118 insertions, 67 deletions
diff --git a/src/cubeb_alsa.c b/src/cubeb_alsa.c
index 3ece2da..64a1a0f 100644
--- a/src/cubeb_alsa.c
+++ b/src/cubeb_alsa.c
@@ -52,12 +52,17 @@ struct cubeb {
/* Control pipe for forcing poll to wake and rebuild fds or recalculate the timeout. */
int control_fd_read;
int control_fd_write;
+
+ /* Track number of active streams. This is limited to CUBEB_STREAM_MAX
+ due to resource contraints. */
+ unsigned int active_streams;
};
enum stream_state {
- IDLE,
+ INACTIVE,
RUNNING,
DRAINING,
+ PROCESSING,
ERROR
};
@@ -74,11 +79,11 @@ struct cubeb_stream {
snd_pcm_uframes_t period_size;
cubeb_stream_params params;
- int slot;
-
/* Every member after this comment is protected by the owning context's
mutex rather than the stream's mutex, or is only used on the context's
run thread. */
+ pthread_cond_t cond; /* Signaled when the stream's state is changed. */
+
enum stream_state state;
struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */
@@ -147,6 +152,22 @@ mutex_assert_not_held(struct mutex * mtx)
assert(!mtx->locked || !pthread_equal(mtx->owner, pthread_self()));
}
+static void
+cond_wait(pthread_cond_t * cond, struct mutex * mtx)
+{
+ int r;
+
+ mutex_assert_held(mtx);
+
+ r = pthread_cond_wait(cond, &mtx->mutex);
+ assert(r == 0);
+
+ /* Reset state; another thread may have done a lock/unlock cycle during
+ our wait. */
+ mtx->owner = pthread_self();
+ mtx->locked = 1;
+}
+
static long
stream_data_callback(cubeb_stream * stm, void * user_ptr, void * buffer, long nframes)
{
@@ -288,14 +309,20 @@ set_timeout(struct timeval * timeout, unsigned int ms)
static void
cubeb_set_stream_state(cubeb_stream * stm, enum stream_state state)
{
+ cubeb * ctx;
+ int r;
+
+ ctx = stm->context;
mutex_assert_not_held(&stm->mutex);
- mutex_assert_held(&stm->context->mutex);
+ mutex_assert_held(&ctx->mutex);
stm->state = state;
- stm->context->rebuild = 1;
- poll_wake(stm->context);
+ r = pthread_cond_broadcast(&stm->cond);
+ assert(r == 0);
+ ctx->rebuild = 1;
+ poll_wake(ctx);
}
-static void
+static enum stream_state
cubeb_refill_stream(cubeb_stream * stm)
{
int r;
@@ -303,7 +330,9 @@ cubeb_refill_stream(cubeb_stream * stm)
snd_pcm_sframes_t avail;
long got;
void * p;
- int draining = 0;
+ int draining;
+
+ draining = 0;
mutex_lock(&stm->mutex);
@@ -313,7 +342,7 @@ cubeb_refill_stream(cubeb_stream * stm)
for this stream and then have the stream report that it's not ready.
Unfortunately, this does happen, so just bail out and try again. */
mutex_unlock(&stm->mutex);
- return;
+ return RUNNING;
}
avail = snd_pcm_avail_update(stm->pcm);
@@ -325,9 +354,8 @@ cubeb_refill_stream(cubeb_stream * stm)
/* Failed to recover from an xrun, this stream must be broken. */
if (avail < 0) {
mutex_unlock(&stm->mutex);
- cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
- return;
+ return ERROR;
}
/* This should never happen. */
@@ -343,9 +371,8 @@ cubeb_refill_stream(cubeb_stream * stm)
avail = snd_pcm_avail_update(stm->pcm);
if (avail <= 0) {
mutex_unlock(&stm->mutex);
- cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
- return;
+ return ERROR;
}
}
@@ -357,9 +384,8 @@ cubeb_refill_stream(cubeb_stream * stm)
mutex_lock(&stm->mutex);
if (got < 0) {
mutex_unlock(&stm->mutex);
- cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
- return;
+ return ERROR;
}
if (got > 0) {
snd_pcm_sframes_t wrote = snd_pcm_writei(stm->pcm, p, got);
@@ -386,33 +412,7 @@ cubeb_refill_stream(cubeb_stream * stm)
free(p);
mutex_unlock(&stm->mutex);
- if (draining) {
- cubeb_set_stream_state(stm, DRAINING);
- }
-}
-
-static int
-calculate_timeout(cubeb * ctx, int initial)
-{
- int i;
- int timeout;
- cubeb_stream * stm;
- int ms;
-
- mutex_assert_held(&ctx->mutex);
-
- timeout = initial;
- for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
- stm = ctx->streams[i];
- if (stm && stm->state == DRAINING) {
- ms = ms_until(&stm->drain_timeout);
- if (ms >= 0 && timeout > ms) {
- timeout = ms;
- }
- }
- }
-
- return timeout;
+ return draining ? DRAINING : RUNNING;
}
static int
@@ -423,6 +423,7 @@ cubeb_run(cubeb * ctx)
int i;
char dummy;
cubeb_stream * stm;
+ enum stream_state state;
mutex_lock(&ctx->mutex);
@@ -431,7 +432,16 @@ cubeb_run(cubeb * ctx)
}
/* Wake up at least once per second for the watchdog. */
- timeout = calculate_timeout(ctx, 1000);
+ timeout = 1000;
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm && stm->state == DRAINING) {
+ r = ms_until(&stm->drain_timeout);
+ if (r >= 0 && timeout > r) {
+ timeout = r;
+ }
+ }
+ }
mutex_unlock(&ctx->mutex);
r = poll(ctx->fds, ctx->nfds, timeout);
@@ -450,7 +460,11 @@ cubeb_run(cubeb * ctx)
for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
stm = ctx->streams[i];
if (stm && stm->state == RUNNING && stm->fds && any_revents(stm->fds, stm->nfds)) {
- cubeb_refill_stream(stm);
+ cubeb_set_stream_state(stm, PROCESSING);
+ mutex_unlock(&ctx->mutex);
+ state = cubeb_refill_stream(stm);
+ mutex_lock(&ctx->mutex);
+ cubeb_set_stream_state(stm, state);
}
}
} else if (r == 0) {
@@ -458,7 +472,7 @@ cubeb_run(cubeb * ctx)
stm = ctx->streams[i];
if (stm) {
if (stm->state == DRAINING && ms_since(&stm->drain_timeout) >= 0) {
- cubeb_set_stream_state(stm, IDLE);
+ cubeb_set_stream_state(stm, INACTIVE);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED);
} else if (stm->state == RUNNING && ms_since(&stm->last_activity) > CUBEB_WATCHDOG_MS) {
cubeb_set_stream_state(stm, ERROR);
@@ -514,28 +528,34 @@ static int
cubeb_register_stream(cubeb * ctx, cubeb_stream * stm)
{
int i;
- int r = -1;
mutex_lock(&ctx->mutex);
for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
if (!ctx->streams[i]) {
ctx->streams[i] = stm;
- r = i;
break;
}
}
mutex_unlock(&ctx->mutex);
- return r;
+ return i == CUBEB_STREAM_MAX;
}
static void
-cubeb_unregister_stream(cubeb * ctx, int slot)
+cubeb_unregister_stream(cubeb_stream * stm)
{
- assert(slot >= 0 && slot < CUBEB_STREAM_MAX);
+ cubeb * ctx;
+ int i;
+
+ ctx = stm->context;
mutex_lock(&ctx->mutex);
- ctx->streams[slot] = NULL;
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ if (ctx->streams[i] == stm) {
+ ctx->streams[i] = NULL;
+ break;
+ }
+ }
mutex_unlock(&ctx->mutex);
}
@@ -625,7 +645,7 @@ cubeb_destroy(cubeb * ctx)
}
int
-cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_name UNUSED,
+cubeb_stream_init(cubeb * ctx, cubeb_stream ** stream, char const * stream_name UNUSED,
cubeb_stream_params stream_params, unsigned int latency,
cubeb_data_callback data_callback, cubeb_state_callback state_callback,
void * user_ptr)
@@ -634,7 +654,7 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
int r;
snd_pcm_format_t format;
- assert(context && stream);
+ assert(ctx && stream);
*stream = NULL;
@@ -661,16 +681,23 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
return CUBEB_ERROR_INVALID_FORMAT;
}
+ mutex_lock(&ctx->mutex);
+ if (ctx->active_streams >= CUBEB_STREAM_MAX) {
+ mutex_unlock(&ctx->mutex);
+ return CUBEB_ERROR;
+ }
+ ctx->active_streams += 1;
+ mutex_unlock(&ctx->mutex);
+
stm = calloc(1, sizeof(*stm));
assert(stm);
- stm->context = context;
+ stm->context = ctx;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
stm->user_ptr = user_ptr;
stm->params = stream_params;
- stm->state = IDLE;
- stm->slot = -1;
+ stm->state = INACTIVE;
mutex_init(&stm->mutex);
@@ -702,8 +729,10 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
r = snd_pcm_poll_descriptors(stm->pcm, stm->saved_fds, stm->nfds);
assert((nfds_t) r == stm->nfds);
- stm->slot = cubeb_register_stream(context, stm);
- if (stm->slot == -1) {
+ r = pthread_cond_init(&stm->cond, NULL);
+ assert(r == 0);
+
+ if (cubeb_register_stream(ctx, stm) != 0) {
cubeb_stream_destroy(stm);
return CUBEB_ERROR;
}
@@ -716,7 +745,12 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
void
cubeb_stream_destroy(cubeb_stream * stm)
{
- assert(stm && (stm->state == IDLE || stm->state == ERROR));
+ int r;
+ cubeb * ctx;
+
+ assert(stm && (stm->state == INACTIVE || stm->state == ERROR));
+
+ ctx = stm->context;
mutex_lock(&stm->mutex);
if (stm->pcm) {
@@ -727,24 +761,34 @@ cubeb_stream_destroy(cubeb_stream * stm)
mutex_unlock(&stm->mutex);
mutex_destroy(&stm->mutex);
- if (stm->slot != -1) {
- cubeb_unregister_stream(stm->context, stm->slot);
- }
+ r = pthread_cond_destroy(&stm->cond);
+ assert(r == 0);
+
+ cubeb_unregister_stream(stm);
+
+ mutex_lock(&ctx->mutex);
+ assert(ctx->active_streams >= 1);
+ ctx->active_streams -= 1;
+ mutex_unlock(&ctx->mutex);
+
free(stm);
}
int
cubeb_stream_start(cubeb_stream * stm)
{
+ cubeb * ctx;
+
assert(stm);
+ ctx = stm->context;
mutex_lock(&stm->mutex);
snd_pcm_pause(stm->pcm, 0);
mutex_unlock(&stm->mutex);
- mutex_lock(&stm->context->mutex);
+ mutex_lock(&ctx->mutex);
cubeb_set_stream_state(stm, RUNNING);
- mutex_unlock(&stm->context->mutex);
+ mutex_unlock(&ctx->mutex);
return CUBEB_OK;
}
@@ -752,11 +796,18 @@ cubeb_stream_start(cubeb_stream * stm)
int
cubeb_stream_stop(cubeb_stream * stm)
{
+ cubeb * ctx;
+
assert(stm);
+ ctx = stm->context;
- mutex_lock(&stm->context->mutex);
- cubeb_set_stream_state(stm, IDLE);
- mutex_unlock(&stm->context->mutex);
+ mutex_lock(&ctx->mutex);
+ while (stm->state == PROCESSING) {
+ cond_wait(&stm->cond, &ctx->mutex);
+ }
+
+ cubeb_set_stream_state(stm, INACTIVE);
+ mutex_unlock(&ctx->mutex);
mutex_lock(&stm->mutex);
snd_pcm_pause(stm->pcm, 1);