aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cubeb_alsa.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/cubeb_alsa.c')
-rw-r--r--src/cubeb_alsa.c766
1 files changed, 240 insertions, 526 deletions
diff --git a/src/cubeb_alsa.c b/src/cubeb_alsa.c
index 295b5d1..fb8bb5c 100644
--- a/src/cubeb_alsa.c
+++ b/src/cubeb_alsa.c
@@ -17,6 +17,7 @@
#include "cubeb/cubeb.h"
#define CUBEB_STREAM_MAX 16
+#define CUBEB_WATCHDOG_MS 10000
#define UNUSED __attribute__ ((__unused__))
/* ALSA is not thread-safe. snd_pcm_t instances are individually protected
@@ -24,10 +25,7 @@
is not thread-safe until ALSA 1.0.24 (see alsa-lib.git commit 91c9c8f1),
so those calls must be wrapped in the following mutex. */
static pthread_mutex_t cubeb_alsa_mutex = PTHREAD_MUTEX_INITIALIZER;
-static int cubeb_alsa_set_error_handler = 0;
-
-typedef void (*poll_waitable_callback)(void * user_ptr, struct pollfd * fds, nfds_t nfds);
-typedef void (*poll_timer_callback)(void * user_ptr);
+static int cubeb_alsa_error_handler_set = 0;
struct mutex {
pthread_mutex_t mutex;
@@ -35,40 +33,14 @@ struct mutex {
int locked;
};
-struct poll_timer {
- struct poll_timer * next;
- struct poll_timer * prev;
-
- struct cubeb * context;
-
- struct timeval wakeup;
-
- poll_timer_callback callback;
- void * user_ptr;
-};
-
-struct poll_waitable {
- struct poll_waitable * next;
- struct poll_waitable * prev;
-
- struct cubeb * context;
-
- struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */
- struct pollfd * fds; /* Pointer to this waitable's pollfds within struct cubeb's fds. */
- nfds_t nfds;
-
- poll_waitable_callback callback;
- void * user_ptr;
-
- unsigned int idle_count;
- unsigned int refs;
-};
-
struct cubeb {
pthread_t thread;
- struct poll_timer * timer;
- struct poll_waitable * waitable;
+ /* Mutex for streams array, must not be held while blocked in poll(2). */
+ struct mutex mutex;
+
+ /* Sparse array of streams managed by this context. */
+ cubeb_stream * streams[CUBEB_STREAM_MAX];
/* fds and nfds are only updated by cubeb_run when rebuild is set. */
struct pollfd * fds;
@@ -77,20 +49,16 @@ struct cubeb {
int shutdown;
- /* Control pipe for forcing poll to wake and rebuild fds or recalculate timeout. */
+ /* Control pipe for forcing poll to wake and rebuild fds or recalculate the timeout. */
int control_fd_read;
int control_fd_write;
+};
- /* Mutex for timer and waitable lists, must not be held while blocked in
- poll(2) or when writing to control_fd */
- struct mutex mutex;
- pthread_cond_t cond;
-
- int phase;
-
- unsigned int active_streams;
-
- struct poll_timer * watchdog_timer;
+enum stream_state {
+ IDLE,
+ RUNNING,
+ DRAINING,
+ ERROR
};
struct cubeb_stream {
@@ -106,8 +74,25 @@ struct cubeb_stream {
snd_pcm_uframes_t period_size;
cubeb_stream_params params;
- struct poll_waitable * waitable;
- struct poll_timer * timer;
+ int slot;
+
+ /* Every member after this comment is protected by the owning context's
+ mutex rather than the stream's mutex, or is only used on the context's
+ run thread. */
+ enum stream_state state;
+
+ struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */
+ struct pollfd * fds; /* Pointer to this waitable's pollfds within struct cubeb's fds. */
+ nfds_t nfds;
+
+ struct timeval drain_timeout;
+
+ /* XXX: Horrible hack -- if an active stream has been idle for
+ CUBEB_WATCHDOG_MS it will be disabled and the error callback will be
+ called. This works around a bug seen with older versions of ALSA and
+ PulseAudio where streams would stop requesting new data despite still
+ being logically active and playing. */
+ struct timeval last_activity;
};
static void
@@ -208,449 +193,121 @@ timeval_to_relative_ms(struct timeval * tv)
struct timeval now;
struct timeval dt;
long long t;
+ int r;
gettimeofday(&now, NULL);
- if (cmp_timeval(tv, &now) <= 0) {
- return 0;
+ r = cmp_timeval(tv, &now);
+ if (r >= 0) {
+ timersub(tv, &now, &dt);
+ } else {
+ timersub(&now, tv, &dt);
}
-
- timersub(tv, &now, &dt);
t = dt.tv_sec;
t *= 1000;
t += (dt.tv_usec + 500) / 1000;
- return t <= INT_MAX ? t : INT_MAX;
-}
-
-static void
-pipe_init(int * read_fd, int * write_fd)
-{
- int r;
- int fd[2];
- r = pipe(fd);
- assert(r == 0);
+ if (t > INT_MAX) {
+ t = INT_MAX;
+ } else if (t < INT_MIN) {
+ t = INT_MIN;
+ }
- *read_fd = fd[0];
- *write_fd = fd[1];
+ return r >= 0 ? t : -t;
}
-static void
-set_close_on_exec(int fd)
+static int
+ms_until(struct timeval * tv)
{
- long flags;
- int r;
-
- assert(fd >= 0);
-
- flags = fcntl(fd, F_GETFD);
- assert(flags >= 0);
-
- r = fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
- assert(r == 0);
+ return timeval_to_relative_ms(tv);
}
-static void
-set_non_block(int fd)
+static int
+ms_since(struct timeval * tv)
{
- long flags;
- int r;
-
- assert(fd >= 0);
-
- flags = fcntl(fd, F_GETFL);
- assert(flags >= 0);
-
- r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
- assert(r == 0);
+ return -timeval_to_relative_ms(tv);
}
static void
-rebuild(struct cubeb * ctx)
+rebuild(cubeb * ctx)
{
nfds_t nfds;
int i;
- struct poll_waitable * item;
+ nfds_t j;
+ cubeb_stream * stm;
mutex_assert_held(&ctx->mutex);
assert(ctx->rebuild);
- nfds = 0;
- for (item = ctx->waitable; item; item = item->next) {
- nfds += item->nfds;
+ /* Always count context's control pipe fd. */
+ nfds = 1;
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm) {
+ stm->fds = NULL;
+ if (stm->state == RUNNING) {
+ nfds += stm->nfds;
+ }
+ }
}
- /* Special case: add control pipe fd. */
- nfds += 1;
-
free(ctx->fds);
ctx->fds = calloc(nfds, sizeof(struct pollfd));
assert(ctx->fds);
ctx->nfds = nfds;
- for (i = 0, item = ctx->waitable; item; item = item->next) {
- memcpy(&ctx->fds[i], item->saved_fds, item->nfds * sizeof(struct pollfd));
- item->fds = &ctx->fds[i];
- i += item->nfds;
- }
-
- /* Special case: add control pipe fd. */
- ctx->fds[i].fd = ctx->control_fd_read;
- ctx->fds[i].events = POLLIN | POLLERR;
-
- ctx->rebuild = 0;
-}
-
-static void
-poll_woke(struct cubeb * ctx)
-{
- ssize_t r;
- char dummy;
-
- r = read(ctx->control_fd_read, &dummy, 1);
- assert(dummy == 'x' && r == 1);
-}
-
-static void
-poll_wake(struct cubeb * ctx)
-{
- ssize_t r;
- char dummy;
-
- dummy = 'x';
- r = write(ctx->control_fd_write, &dummy, 1);
- assert(r == 1);
-}
-
-static struct poll_waitable *
-poll_waitable_init(struct cubeb * ctx, struct pollfd * fds, nfds_t nfds,
- poll_waitable_callback callback, void * user_ptr)
-{
- struct poll_waitable * waitable;
-
- waitable = calloc(1, sizeof(struct poll_waitable));
- assert(waitable);
- waitable->context = ctx;
-
- waitable->saved_fds = calloc(nfds, sizeof(struct pollfd));
- assert(waitable->saved_fds);
- waitable->nfds = nfds;
- memcpy(waitable->saved_fds, fds, nfds * sizeof(struct pollfd));
-
- waitable->callback = callback;
- waitable->user_ptr = user_ptr;
-
- waitable->idle_count = 0;
- waitable->refs = 1;
+ /* Include context's control pipe fd. */
+ ctx->fds[0].fd = ctx->control_fd_read;
+ ctx->fds[0].events = POLLIN | POLLERR;
- mutex_lock(&ctx->mutex);
-
- waitable->next = ctx->waitable;
- if (ctx->waitable) {
- ctx->waitable->prev = waitable;
- }
- ctx->waitable = waitable;
- ctx->rebuild = 1;
-
- poll_wake(ctx);
- mutex_unlock(&ctx->mutex);
-
- return waitable;
-}
-
-static void
-poll_waitable_ref(struct poll_waitable * w)
-{
- struct cubeb * ctx = w->context;
-
- mutex_assert_held(&ctx->mutex);
- w->refs += 1;
-}
-
-static void
-poll_waitable_unref(struct poll_waitable * w)
-{
- struct cubeb * ctx = w->context;
-
- mutex_lock(&ctx->mutex);
-
- w->refs -= 1;
-
- if (w->refs == 0) {
- if (w->next) {
- w->next->prev = w->prev;
- }
- if (w->prev) {
- w->prev->next = w->next;
- }
-
- if (ctx->waitable == w) {
- ctx->waitable = w->next;
- }
-
- free(w->saved_fds);
- free(w);
-
- ctx->rebuild = 1;
- poll_wake(ctx);
- }
-
- mutex_unlock(&ctx->mutex);
-}
-
-static struct poll_timer *
-poll_timer_absolute_init(struct cubeb * ctx, struct timeval * wakeup,
- poll_timer_callback callback, void * user_ptr)
-{
- struct poll_timer * timer;
- struct poll_timer * item;
-
- timer = calloc(1, sizeof(*timer));
- assert(timer);
- timer->context = ctx;
- timer->wakeup = *wakeup;
- timer->callback = callback;
- timer->user_ptr = user_ptr;
-
- mutex_lock(&ctx->mutex);
-
- for (item = ctx->timer; item; item = item->next) {
- if (cmp_timeval(&timer->wakeup, &item->wakeup) < 0) {
- timer->next = item;
- timer->prev = item->prev;
-
- if (timer->prev) {
- timer->prev->next = timer;
- }
- item->prev = timer;
-
- break;
- }
-
- if (!item->next) {
- item->next = timer;
- timer->prev = item;
- break;
+ for (i = 0, j = 1; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm && stm->state == RUNNING) {
+ memcpy(&ctx->fds[j], stm->saved_fds, stm->nfds * sizeof(struct pollfd));
+ stm->fds = &ctx->fds[j];
+ j += stm->nfds;
}
}
- if (!timer->prev) {
- ctx->timer = timer;
- }
-
- poll_wake(ctx);
- mutex_unlock(&ctx->mutex);
-
- return timer;
-}
-
-static struct poll_timer *
-poll_timer_relative_init(struct cubeb * ctx, unsigned int ms,
- poll_timer_callback callback, void * user_ptr)
-{
- struct timeval wakeup;
-
- gettimeofday(&wakeup, NULL);
- wakeup.tv_sec += ms / 1000;
- wakeup.tv_usec += (ms % 1000) * 1000;
-
- return poll_timer_absolute_init(ctx, &wakeup, callback, user_ptr);
+ ctx->rebuild = 0;
}
static void
-poll_timer_destroy(struct poll_timer * t)
+poll_wake(cubeb * ctx)
{
- struct cubeb * ctx = t->context;
-
- mutex_lock(&ctx->mutex);
-
- if (t->next) {
- t->next->prev = t->prev;
- }
- if (t->prev) {
- t->prev->next = t->next;
- }
-
- if (ctx->timer == t) {
- ctx->timer = t->next;
- }
-
- free(t);
-
- poll_wake(ctx);
- mutex_unlock(&ctx->mutex);
+ write(ctx->control_fd_write, "x", 1);
}
static void
-poll_phase_wait(cubeb * ctx)
+set_timeout(struct timeval * timeout, unsigned int ms)
{
- int phase;
-
- mutex_lock(&ctx->mutex);
- phase = ctx->phase;
- while (ctx->phase == phase) {
- pthread_cond_wait(&ctx->cond, &ctx->mutex.mutex);
- }
- mutex_unlock(&ctx->mutex);
-}
-
-static int
-cubeb_run(struct cubeb * ctx)
-{
- int r;
- int timeout;
- struct poll_waitable * waitable;
- struct poll_timer * timer;
- struct poll_waitable ** ready;
- int i;
-
- mutex_lock(&ctx->mutex);
-
- if (ctx->rebuild) {
- rebuild(ctx);
- }
-
- timeout = -1;
- timer = ctx->timer;
- if (timer) {
- timeout = timeval_to_relative_ms(&timer->wakeup);
- }
-
- /* No timers or waitables, we're done. */
- if (timeout == -1 && ctx->nfds == 0) {
- return -1;
- }
-
- mutex_unlock(&ctx->mutex);
- r = poll(ctx->fds, ctx->nfds, timeout);
- mutex_lock(&ctx->mutex);
-
- if (r > 0) {
- if (ctx->fds[ctx->nfds - 1].revents & POLLIN) {
- poll_woke(ctx);
- if (ctx->shutdown) {
- mutex_unlock(&ctx->mutex);
- return -1;
- }
- }
-
- i = 0;
- ready = calloc(r, sizeof(struct poll_waitable *));
-
- /* TODO: Break once r pfds have been processed, ideally with a waitable
- list sorted by latency. */
- for (waitable = ctx->waitable; waitable; waitable = waitable->next) {
- if (waitable->fds && any_revents(waitable->fds, waitable->nfds)) {
- poll_waitable_ref(waitable);
- ready[i++] = waitable;
- waitable->idle_count = 0;
- }
- }
-
- mutex_unlock(&ctx->mutex);
- for (i = 0; i < r; ++i) {
- if (!ready[i]) {
- break;
- }
- ready[i]->callback(ready[i]->user_ptr, ready[i]->fds, ready[i]->nfds);
- poll_waitable_unref(ready[i]);
- }
- mutex_lock(&ctx->mutex);
-
- free(ready);
- } else if (r == 0) {
- assert(timer);
- mutex_unlock(&ctx->mutex);
- timer->callback(timer->user_ptr);
- mutex_lock(&ctx->mutex);
- }
-
- ctx->phase += 1;
- pthread_cond_broadcast(&ctx->cond);
-
- mutex_unlock(&ctx->mutex);
-
- return 0;
-}
-
-static void
-cubeb_watchdog(void * context)
-{
- cubeb * ctx = context;
- struct poll_waitable * waitable;
- struct poll_waitable * tmp[16];
- int broken_streams;
- int i;
-
- if (ctx->watchdog_timer) {
- poll_timer_destroy(ctx->watchdog_timer);
- }
-
- ctx->watchdog_timer = poll_timer_relative_init(ctx, 1000, cubeb_watchdog, ctx);
-
- mutex_lock(&ctx->mutex);
- /* XXX: Horrible hack -- if an active (registered) stream has been idle
- for 10 ticks of the watchdog, kill it and mark the stream in error.
- This works around a bug seen with older versions of ALSA and PulseAudio
- where streams would stop requesting new data despite still being
- logically active and playing. */
- broken_streams = 0;
- for (waitable = ctx->waitable; waitable; waitable = waitable->next) {
- waitable->idle_count += 1;
- if (waitable->idle_count >= 10) {
- poll_waitable_ref(waitable);
- tmp[broken_streams++] = waitable;
- }
- }
-
- mutex_unlock(&ctx->mutex);
-
- for (i = 0; i < broken_streams; ++i) {
- cubeb_stream * stm = tmp[i]->user_ptr;
- assert(tmp[i]->idle_count >= 10);
- mutex_lock(&stm->mutex);
- if (stm->waitable) {
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
- }
- mutex_unlock(&stm->mutex);
- poll_waitable_unref(tmp[i]);
- stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
- }
+ gettimeofday(timeout, NULL);
+ timeout->tv_sec += ms / 1000;
+ timeout->tv_usec += (ms % 1000) * 1000;
}
static void
-cubeb_drain_stream(void * stream)
+cubeb_set_stream_state(cubeb_stream * stm, enum stream_state state)
{
- cubeb_stream * stm = stream;
- int drained = 0;
-
- mutex_lock(&stm->mutex);
- /* It's possible that the stream was stopped after the timer fired but
- before we locked the stream. */
- if (stm->timer) {
- poll_timer_destroy(stm->timer);
- stm->timer = NULL;
- drained = 1;
- }
- mutex_unlock(&stm->mutex);
- if (drained) {
- stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED);
- }
+ mutex_assert_not_held(&stm->mutex);
+ mutex_assert_held(&stm->context->mutex);
+ stm->state = state;
+ stm->context->rebuild = 1;
+ poll_wake(stm->context);
}
static void
-cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds)
+cubeb_refill_stream(cubeb_stream * stm)
{
- cubeb_stream * stm = stream;
int r;
unsigned short revents;
snd_pcm_sframes_t avail;
long got;
void * p;
+ int draining = 0;
mutex_lock(&stm->mutex);
- r = snd_pcm_poll_descriptors_revents(stm->pcm, fds, nfds, &revents);
+ r = snd_pcm_poll_descriptors_revents(stm->pcm, stm->fds, stm->nfds, &revents);
if (r < 0 || revents != POLLOUT) {
/* This should be a stream error; it makes no sense for poll(2) to wake
for this stream and then have the stream report that it's not ready.
@@ -667,9 +324,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds)
/* Failed to recover from an xrun, this stream must be broken. */
if (avail < 0) {
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
mutex_unlock(&stm->mutex);
+ cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
return;
}
@@ -686,9 +342,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds)
snd_pcm_recover(stm->pcm, -EPIPE, 1);
avail = snd_pcm_avail_update(stm->pcm);
if (avail <= 0) {
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
mutex_unlock(&stm->mutex);
+ cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
return;
}
@@ -701,9 +356,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds)
got = stream_data_callback(stm, stm->user_ptr, p, avail);
mutex_lock(&stm->mutex);
if (got < 0) {
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
mutex_unlock(&stm->mutex);
+ cubeb_set_stream_state(stm, ERROR);
stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
return;
}
@@ -715,23 +369,108 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds)
}
assert(wrote >= 0 && wrote == got);
stm->write_position += wrote;
+ gettimeofday(&stm->last_activity, NULL);
}
if (got != avail) {
long buffer_fill = stm->buffer_size - (avail - got);
double buffer_time = (double) buffer_fill / stm->params.rate;
- /* Fill the remaining buffer with silence to guarantee at least a period has been written. */
+ /* Fill the remaining buffer with silence to guarantee one full period
+ * has been written. */
snd_pcm_writei(stm->pcm, (char *) p + got, avail - got);
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
+ set_timeout(&stm->drain_timeout, buffer_time * 1000);
- stm->timer = poll_timer_relative_init(stm->context, buffer_time * 1000,
- cubeb_drain_stream, stm);
+ draining = 1;
}
free(p);
mutex_unlock(&stm->mutex);
+ if (draining) {
+ cubeb_set_stream_state(stm, DRAINING);
+ }
+}
+
+static int
+calculate_timeout(cubeb * ctx, int initial)
+{
+ int i;
+ int timeout;
+ cubeb_stream * stm;
+ int ms;
+
+ mutex_assert_held(&ctx->mutex);
+
+ timeout = initial;
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm && stm->state == DRAINING) {
+ ms = ms_until(&stm->drain_timeout);
+ if (ms >= 0 && timeout > ms) {
+ timeout = ms;
+ }
+ }
+ }
+
+ return timeout;
+}
+
+static int
+cubeb_run(cubeb * ctx)
+{
+ int r;
+ int timeout;
+ int i;
+ char dummy;
+ cubeb_stream * stm;
+
+ mutex_lock(&ctx->mutex);
+
+ if (ctx->rebuild) {
+ rebuild(ctx);
+ }
+
+ /* Wake up at least once per second for the watchdog. */
+ timeout = calculate_timeout(ctx, 1000);
+
+ mutex_unlock(&ctx->mutex);
+ r = poll(ctx->fds, ctx->nfds, timeout);
+ mutex_lock(&ctx->mutex);
+
+ if (r > 0) {
+ if (ctx->fds[0].revents & POLLIN) {
+ read(ctx->control_fd_read, &dummy, 1);
+
+ if (ctx->shutdown) {
+ mutex_unlock(&ctx->mutex);
+ return -1;
+ }
+ }
+
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm && stm->state == RUNNING && stm->fds && any_revents(stm->fds, stm->nfds)) {
+ cubeb_refill_stream(stm);
+ }
+ }
+ } else if (r == 0) {
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ stm = ctx->streams[i];
+ if (stm) {
+ if (stm->state == DRAINING && ms_since(&stm->drain_timeout) >= 0) {
+ cubeb_set_stream_state(stm, IDLE);
+ stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED);
+ } else if (stm->state == RUNNING && ms_since(&stm->last_activity) > CUBEB_WATCHDOG_MS) {
+ cubeb_set_stream_state(stm, ERROR);
+ stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
+ }
+ }
+ }
+ }
+
+ mutex_unlock(&ctx->mutex);
+
+ return 0;
}
static void *
@@ -771,34 +510,33 @@ cubeb_locked_pcm_close(snd_pcm_t * pcm)
return r;
}
-static cubeb_stream *
-cubeb_new_stream(cubeb * ctx)
+static int
+cubeb_register_stream(cubeb * ctx, cubeb_stream * stm)
{
- cubeb_stream * stm = NULL;
-
- stm = calloc(1, sizeof(*stm));
- assert(stm);
+ int i;
+ int r = -1;
mutex_lock(&ctx->mutex);
- if (ctx->active_streams < CUBEB_STREAM_MAX) {
- ctx->active_streams += 1;
- } else {
- free(stm);
- stm = NULL;
+ for (i = 0; i < CUBEB_STREAM_MAX; ++i) {
+ if (!ctx->streams[i]) {
+ ctx->streams[i] = stm;
+ r = i;
+ break;
+ }
}
mutex_unlock(&ctx->mutex);
- return stm;
+ return r;
}
static void
-cubeb_free_stream(cubeb * ctx, cubeb_stream * stm)
+cubeb_unregister_stream(cubeb * ctx, int slot)
{
+ assert(slot >= 0 && slot < CUBEB_STREAM_MAX);
+
mutex_lock(&ctx->mutex);
- assert(ctx->active_streams >= 1);
- ctx->active_streams -= 1;
+ ctx->streams[slot] = NULL;
mutex_unlock(&ctx->mutex);
- free(stm);
}
static void
@@ -812,42 +550,40 @@ cubeb_init(cubeb ** context, char const * context_name UNUSED)
{
cubeb * ctx;
int r;
+ int i;
+ int fd[2];
pthread_attr_t attr;
assert(context);
*context = NULL;
pthread_mutex_lock(&cubeb_alsa_mutex);
- if (!cubeb_alsa_set_error_handler) {
+ if (!cubeb_alsa_error_handler_set) {
snd_lib_error_set_handler(silent_error_handler);
- cubeb_alsa_set_error_handler = 1;
+ cubeb_alsa_error_handler_set = 1;
}
pthread_mutex_unlock(&cubeb_alsa_mutex);
ctx = calloc(1, sizeof(*ctx));
assert(ctx);
- pipe_init(&ctx->control_fd_read, &ctx->control_fd_write);
-
- set_close_on_exec(ctx->control_fd_read);
- set_non_block(ctx->control_fd_read);
-
- set_close_on_exec(ctx->control_fd_write);
- set_non_block(ctx->control_fd_write);
-
mutex_init(&ctx->mutex);
- r = pthread_cond_init(&ctx->cond, NULL);
+ r = pipe(fd);
assert(r == 0);
- ctx->phase = 0;
+ for (i = 0; i < 2; ++i) {
+ fcntl(fd[i], F_SETFD, fcntl(fd[i], F_GETFD) | FD_CLOEXEC);
+ fcntl(fd[i], F_SETFL, fcntl(fd[i], F_GETFL) | O_NONBLOCK);
+ }
+
+ ctx->control_fd_read = fd[0];
+ ctx->control_fd_write = fd[1];
/* Force an early rebuild when cubeb_run is first called to ensure fds and
* nfds have been initialized. */
ctx->rebuild = 1;
- cubeb_watchdog(ctx);
-
r = pthread_attr_init(&attr);
assert(r == 0);
@@ -860,8 +596,6 @@ cubeb_init(cubeb ** context, char const * context_name UNUSED)
r = pthread_attr_destroy(&attr);
assert(r == 0);
- ctx->active_streams = 0;
-
*context = ctx;
return CUBEB_OK;
@@ -873,7 +607,6 @@ cubeb_destroy(cubeb * ctx)
int r;
assert(ctx);
- assert(ctx->active_streams == 0);
mutex_lock(&ctx->mutex);
ctx->shutdown = 1;
@@ -883,13 +616,8 @@ cubeb_destroy(cubeb * ctx)
r = pthread_join(ctx->thread, NULL);
assert(r == 0);
- poll_timer_destroy(ctx->watchdog_timer);
- ctx->watchdog_timer = NULL;
-
- assert(!ctx->waitable && !ctx->timer);
close(ctx->control_fd_read);
close(ctx->control_fd_write);
- pthread_cond_destroy(&ctx->cond);
mutex_destroy(&ctx->mutex);
free(ctx->fds);
@@ -933,16 +661,16 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
return CUBEB_ERROR_INVALID_FORMAT;
}
- stm = cubeb_new_stream(context);
- if (!stm) {
- return CUBEB_ERROR;
- }
+ stm = calloc(1, sizeof(*stm));
+ assert(stm);
stm->context = context;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
stm->user_ptr = user_ptr;
stm->params = stream_params;
+ stm->state = IDLE;
+ stm->slot = -1;
mutex_init(&stm->mutex);
@@ -966,6 +694,20 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
r = snd_pcm_get_params(stm->pcm, &stm->buffer_size, &stm->period_size);
assert(r == 0);
+ stm->nfds = snd_pcm_poll_descriptors_count(stm->pcm);
+ assert(stm->nfds > 0);
+
+ stm->saved_fds = calloc(stm->nfds, sizeof(struct pollfd));
+ assert(stm->saved_fds);
+ r = snd_pcm_poll_descriptors(stm->pcm, stm->saved_fds, stm->nfds);
+ assert((nfds_t) r == stm->nfds);
+
+ stm->slot = cubeb_register_stream(context, stm);
+ if (stm->slot == -1) {
+ cubeb_stream_destroy(stm);
+ return CUBEB_ERROR;
+ }
+
*stream = stm;
return CUBEB_OK;
@@ -974,50 +716,36 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
void
cubeb_stream_destroy(cubeb_stream * stm)
{
- assert(stm && !stm->waitable && !stm->timer);
+ assert(stm && (stm->state == IDLE || stm->state == ERROR));
mutex_lock(&stm->mutex);
if (stm->pcm) {
cubeb_locked_pcm_close(stm->pcm);
stm->pcm = NULL;
}
+ free(stm->saved_fds);
mutex_unlock(&stm->mutex);
mutex_destroy(&stm->mutex);
- cubeb_free_stream(stm->context, stm);
+ if (stm->slot != -1) {
+ cubeb_unregister_stream(stm->context, stm->slot);
+ }
+ free(stm);
}
int
cubeb_stream_start(cubeb_stream * stm)
{
- int nfds;
- struct pollfd * fds;
- int r;
-
assert(stm);
mutex_lock(&stm->mutex);
-
- if (stm->waitable) {
- mutex_unlock(&stm->mutex);
- return CUBEB_OK;
- }
-
snd_pcm_pause(stm->pcm, 0);
-
- nfds = snd_pcm_poll_descriptors_count(stm->pcm);
- assert(nfds > 0);
-
- fds = calloc(nfds, sizeof(struct pollfd));
- assert(fds);
- r = snd_pcm_poll_descriptors(stm->pcm, fds, nfds);
- assert(r == nfds);
-
- stm->waitable = poll_waitable_init(stm->context, fds, nfds, cubeb_refill_stream, stm);
-
- free(fds);
mutex_unlock(&stm->mutex);
+ mutex_lock(&stm->context->mutex);
+ cubeb_set_stream_state(stm, RUNNING);
+ mutex_unlock(&stm->context->mutex);
+
return CUBEB_OK;
}
@@ -1026,26 +754,12 @@ cubeb_stream_stop(cubeb_stream * stm)
{
assert(stm);
- mutex_lock(&stm->mutex);
-
- if (stm->waitable) {
- poll_waitable_unref(stm->waitable);
- stm->waitable = NULL;
- }
-
- if (stm->timer) {
- poll_timer_destroy(stm->timer);
- stm->timer = NULL;
- }
-
- mutex_unlock(&stm->mutex);
-
- poll_phase_wait(stm->context);
+ mutex_lock(&stm->context->mutex);
+ cubeb_set_stream_state(stm, IDLE);
+ mutex_unlock(&stm->context->mutex);
mutex_lock(&stm->mutex);
-
snd_pcm_pause(stm->pcm, 1);
-
mutex_unlock(&stm->mutex);
return CUBEB_OK;