diff options
Diffstat (limited to 'src/cubeb_alsa.c')
-rw-r--r-- | src/cubeb_alsa.c | 766 |
1 files changed, 240 insertions, 526 deletions
diff --git a/src/cubeb_alsa.c b/src/cubeb_alsa.c index 295b5d1..fb8bb5c 100644 --- a/src/cubeb_alsa.c +++ b/src/cubeb_alsa.c @@ -17,6 +17,7 @@ #include "cubeb/cubeb.h" #define CUBEB_STREAM_MAX 16 +#define CUBEB_WATCHDOG_MS 10000 #define UNUSED __attribute__ ((__unused__)) /* ALSA is not thread-safe. snd_pcm_t instances are individually protected @@ -24,10 +25,7 @@ is not thread-safe until ALSA 1.0.24 (see alsa-lib.git commit 91c9c8f1), so those calls must be wrapped in the following mutex. */ static pthread_mutex_t cubeb_alsa_mutex = PTHREAD_MUTEX_INITIALIZER; -static int cubeb_alsa_set_error_handler = 0; - -typedef void (*poll_waitable_callback)(void * user_ptr, struct pollfd * fds, nfds_t nfds); -typedef void (*poll_timer_callback)(void * user_ptr); +static int cubeb_alsa_error_handler_set = 0; struct mutex { pthread_mutex_t mutex; @@ -35,40 +33,14 @@ struct mutex { int locked; }; -struct poll_timer { - struct poll_timer * next; - struct poll_timer * prev; - - struct cubeb * context; - - struct timeval wakeup; - - poll_timer_callback callback; - void * user_ptr; -}; - -struct poll_waitable { - struct poll_waitable * next; - struct poll_waitable * prev; - - struct cubeb * context; - - struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */ - struct pollfd * fds; /* Pointer to this waitable's pollfds within struct cubeb's fds. */ - nfds_t nfds; - - poll_waitable_callback callback; - void * user_ptr; - - unsigned int idle_count; - unsigned int refs; -}; - struct cubeb { pthread_t thread; - struct poll_timer * timer; - struct poll_waitable * waitable; + /* Mutex for streams array, must not be held while blocked in poll(2). */ + struct mutex mutex; + + /* Sparse array of streams managed by this context. */ + cubeb_stream * streams[CUBEB_STREAM_MAX]; /* fds and nfds are only updated by cubeb_run when rebuild is set. */ struct pollfd * fds; @@ -77,20 +49,16 @@ struct cubeb { int shutdown; - /* Control pipe for forcing poll to wake and rebuild fds or recalculate timeout. */ + /* Control pipe for forcing poll to wake and rebuild fds or recalculate the timeout. */ int control_fd_read; int control_fd_write; +}; - /* Mutex for timer and waitable lists, must not be held while blocked in - poll(2) or when writing to control_fd */ - struct mutex mutex; - pthread_cond_t cond; - - int phase; - - unsigned int active_streams; - - struct poll_timer * watchdog_timer; +enum stream_state { + IDLE, + RUNNING, + DRAINING, + ERROR }; struct cubeb_stream { @@ -106,8 +74,25 @@ struct cubeb_stream { snd_pcm_uframes_t period_size; cubeb_stream_params params; - struct poll_waitable * waitable; - struct poll_timer * timer; + int slot; + + /* Every member after this comment is protected by the owning context's + mutex rather than the stream's mutex, or is only used on the context's + run thread. */ + enum stream_state state; + + struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */ + struct pollfd * fds; /* Pointer to this waitable's pollfds within struct cubeb's fds. */ + nfds_t nfds; + + struct timeval drain_timeout; + + /* XXX: Horrible hack -- if an active stream has been idle for + CUBEB_WATCHDOG_MS it will be disabled and the error callback will be + called. This works around a bug seen with older versions of ALSA and + PulseAudio where streams would stop requesting new data despite still + being logically active and playing. */ + struct timeval last_activity; }; static void @@ -208,449 +193,121 @@ timeval_to_relative_ms(struct timeval * tv) struct timeval now; struct timeval dt; long long t; + int r; gettimeofday(&now, NULL); - if (cmp_timeval(tv, &now) <= 0) { - return 0; + r = cmp_timeval(tv, &now); + if (r >= 0) { + timersub(tv, &now, &dt); + } else { + timersub(&now, tv, &dt); } - - timersub(tv, &now, &dt); t = dt.tv_sec; t *= 1000; t += (dt.tv_usec + 500) / 1000; - return t <= INT_MAX ? t : INT_MAX; -} - -static void -pipe_init(int * read_fd, int * write_fd) -{ - int r; - int fd[2]; - r = pipe(fd); - assert(r == 0); + if (t > INT_MAX) { + t = INT_MAX; + } else if (t < INT_MIN) { + t = INT_MIN; + } - *read_fd = fd[0]; - *write_fd = fd[1]; + return r >= 0 ? t : -t; } -static void -set_close_on_exec(int fd) +static int +ms_until(struct timeval * tv) { - long flags; - int r; - - assert(fd >= 0); - - flags = fcntl(fd, F_GETFD); - assert(flags >= 0); - - r = fcntl(fd, F_SETFD, flags | FD_CLOEXEC); - assert(r == 0); + return timeval_to_relative_ms(tv); } -static void -set_non_block(int fd) +static int +ms_since(struct timeval * tv) { - long flags; - int r; - - assert(fd >= 0); - - flags = fcntl(fd, F_GETFL); - assert(flags >= 0); - - r = fcntl(fd, F_SETFL, flags | O_NONBLOCK); - assert(r == 0); + return -timeval_to_relative_ms(tv); } static void -rebuild(struct cubeb * ctx) +rebuild(cubeb * ctx) { nfds_t nfds; int i; - struct poll_waitable * item; + nfds_t j; + cubeb_stream * stm; mutex_assert_held(&ctx->mutex); assert(ctx->rebuild); - nfds = 0; - for (item = ctx->waitable; item; item = item->next) { - nfds += item->nfds; + /* Always count context's control pipe fd. */ + nfds = 1; + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm) { + stm->fds = NULL; + if (stm->state == RUNNING) { + nfds += stm->nfds; + } + } } - /* Special case: add control pipe fd. */ - nfds += 1; - free(ctx->fds); ctx->fds = calloc(nfds, sizeof(struct pollfd)); assert(ctx->fds); ctx->nfds = nfds; - for (i = 0, item = ctx->waitable; item; item = item->next) { - memcpy(&ctx->fds[i], item->saved_fds, item->nfds * sizeof(struct pollfd)); - item->fds = &ctx->fds[i]; - i += item->nfds; - } - - /* Special case: add control pipe fd. */ - ctx->fds[i].fd = ctx->control_fd_read; - ctx->fds[i].events = POLLIN | POLLERR; - - ctx->rebuild = 0; -} - -static void -poll_woke(struct cubeb * ctx) -{ - ssize_t r; - char dummy; - - r = read(ctx->control_fd_read, &dummy, 1); - assert(dummy == 'x' && r == 1); -} - -static void -poll_wake(struct cubeb * ctx) -{ - ssize_t r; - char dummy; - - dummy = 'x'; - r = write(ctx->control_fd_write, &dummy, 1); - assert(r == 1); -} - -static struct poll_waitable * -poll_waitable_init(struct cubeb * ctx, struct pollfd * fds, nfds_t nfds, - poll_waitable_callback callback, void * user_ptr) -{ - struct poll_waitable * waitable; - - waitable = calloc(1, sizeof(struct poll_waitable)); - assert(waitable); - waitable->context = ctx; - - waitable->saved_fds = calloc(nfds, sizeof(struct pollfd)); - assert(waitable->saved_fds); - waitable->nfds = nfds; - memcpy(waitable->saved_fds, fds, nfds * sizeof(struct pollfd)); - - waitable->callback = callback; - waitable->user_ptr = user_ptr; - - waitable->idle_count = 0; - waitable->refs = 1; + /* Include context's control pipe fd. */ + ctx->fds[0].fd = ctx->control_fd_read; + ctx->fds[0].events = POLLIN | POLLERR; - mutex_lock(&ctx->mutex); - - waitable->next = ctx->waitable; - if (ctx->waitable) { - ctx->waitable->prev = waitable; - } - ctx->waitable = waitable; - ctx->rebuild = 1; - - poll_wake(ctx); - mutex_unlock(&ctx->mutex); - - return waitable; -} - -static void -poll_waitable_ref(struct poll_waitable * w) -{ - struct cubeb * ctx = w->context; - - mutex_assert_held(&ctx->mutex); - w->refs += 1; -} - -static void -poll_waitable_unref(struct poll_waitable * w) -{ - struct cubeb * ctx = w->context; - - mutex_lock(&ctx->mutex); - - w->refs -= 1; - - if (w->refs == 0) { - if (w->next) { - w->next->prev = w->prev; - } - if (w->prev) { - w->prev->next = w->next; - } - - if (ctx->waitable == w) { - ctx->waitable = w->next; - } - - free(w->saved_fds); - free(w); - - ctx->rebuild = 1; - poll_wake(ctx); - } - - mutex_unlock(&ctx->mutex); -} - -static struct poll_timer * -poll_timer_absolute_init(struct cubeb * ctx, struct timeval * wakeup, - poll_timer_callback callback, void * user_ptr) -{ - struct poll_timer * timer; - struct poll_timer * item; - - timer = calloc(1, sizeof(*timer)); - assert(timer); - timer->context = ctx; - timer->wakeup = *wakeup; - timer->callback = callback; - timer->user_ptr = user_ptr; - - mutex_lock(&ctx->mutex); - - for (item = ctx->timer; item; item = item->next) { - if (cmp_timeval(&timer->wakeup, &item->wakeup) < 0) { - timer->next = item; - timer->prev = item->prev; - - if (timer->prev) { - timer->prev->next = timer; - } - item->prev = timer; - - break; - } - - if (!item->next) { - item->next = timer; - timer->prev = item; - break; + for (i = 0, j = 1; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm && stm->state == RUNNING) { + memcpy(&ctx->fds[j], stm->saved_fds, stm->nfds * sizeof(struct pollfd)); + stm->fds = &ctx->fds[j]; + j += stm->nfds; } } - if (!timer->prev) { - ctx->timer = timer; - } - - poll_wake(ctx); - mutex_unlock(&ctx->mutex); - - return timer; -} - -static struct poll_timer * -poll_timer_relative_init(struct cubeb * ctx, unsigned int ms, - poll_timer_callback callback, void * user_ptr) -{ - struct timeval wakeup; - - gettimeofday(&wakeup, NULL); - wakeup.tv_sec += ms / 1000; - wakeup.tv_usec += (ms % 1000) * 1000; - - return poll_timer_absolute_init(ctx, &wakeup, callback, user_ptr); + ctx->rebuild = 0; } static void -poll_timer_destroy(struct poll_timer * t) +poll_wake(cubeb * ctx) { - struct cubeb * ctx = t->context; - - mutex_lock(&ctx->mutex); - - if (t->next) { - t->next->prev = t->prev; - } - if (t->prev) { - t->prev->next = t->next; - } - - if (ctx->timer == t) { - ctx->timer = t->next; - } - - free(t); - - poll_wake(ctx); - mutex_unlock(&ctx->mutex); + write(ctx->control_fd_write, "x", 1); } static void -poll_phase_wait(cubeb * ctx) +set_timeout(struct timeval * timeout, unsigned int ms) { - int phase; - - mutex_lock(&ctx->mutex); - phase = ctx->phase; - while (ctx->phase == phase) { - pthread_cond_wait(&ctx->cond, &ctx->mutex.mutex); - } - mutex_unlock(&ctx->mutex); -} - -static int -cubeb_run(struct cubeb * ctx) -{ - int r; - int timeout; - struct poll_waitable * waitable; - struct poll_timer * timer; - struct poll_waitable ** ready; - int i; - - mutex_lock(&ctx->mutex); - - if (ctx->rebuild) { - rebuild(ctx); - } - - timeout = -1; - timer = ctx->timer; - if (timer) { - timeout = timeval_to_relative_ms(&timer->wakeup); - } - - /* No timers or waitables, we're done. */ - if (timeout == -1 && ctx->nfds == 0) { - return -1; - } - - mutex_unlock(&ctx->mutex); - r = poll(ctx->fds, ctx->nfds, timeout); - mutex_lock(&ctx->mutex); - - if (r > 0) { - if (ctx->fds[ctx->nfds - 1].revents & POLLIN) { - poll_woke(ctx); - if (ctx->shutdown) { - mutex_unlock(&ctx->mutex); - return -1; - } - } - - i = 0; - ready = calloc(r, sizeof(struct poll_waitable *)); - - /* TODO: Break once r pfds have been processed, ideally with a waitable - list sorted by latency. */ - for (waitable = ctx->waitable; waitable; waitable = waitable->next) { - if (waitable->fds && any_revents(waitable->fds, waitable->nfds)) { - poll_waitable_ref(waitable); - ready[i++] = waitable; - waitable->idle_count = 0; - } - } - - mutex_unlock(&ctx->mutex); - for (i = 0; i < r; ++i) { - if (!ready[i]) { - break; - } - ready[i]->callback(ready[i]->user_ptr, ready[i]->fds, ready[i]->nfds); - poll_waitable_unref(ready[i]); - } - mutex_lock(&ctx->mutex); - - free(ready); - } else if (r == 0) { - assert(timer); - mutex_unlock(&ctx->mutex); - timer->callback(timer->user_ptr); - mutex_lock(&ctx->mutex); - } - - ctx->phase += 1; - pthread_cond_broadcast(&ctx->cond); - - mutex_unlock(&ctx->mutex); - - return 0; -} - -static void -cubeb_watchdog(void * context) -{ - cubeb * ctx = context; - struct poll_waitable * waitable; - struct poll_waitable * tmp[16]; - int broken_streams; - int i; - - if (ctx->watchdog_timer) { - poll_timer_destroy(ctx->watchdog_timer); - } - - ctx->watchdog_timer = poll_timer_relative_init(ctx, 1000, cubeb_watchdog, ctx); - - mutex_lock(&ctx->mutex); - /* XXX: Horrible hack -- if an active (registered) stream has been idle - for 10 ticks of the watchdog, kill it and mark the stream in error. - This works around a bug seen with older versions of ALSA and PulseAudio - where streams would stop requesting new data despite still being - logically active and playing. */ - broken_streams = 0; - for (waitable = ctx->waitable; waitable; waitable = waitable->next) { - waitable->idle_count += 1; - if (waitable->idle_count >= 10) { - poll_waitable_ref(waitable); - tmp[broken_streams++] = waitable; - } - } - - mutex_unlock(&ctx->mutex); - - for (i = 0; i < broken_streams; ++i) { - cubeb_stream * stm = tmp[i]->user_ptr; - assert(tmp[i]->idle_count >= 10); - mutex_lock(&stm->mutex); - if (stm->waitable) { - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; - } - mutex_unlock(&stm->mutex); - poll_waitable_unref(tmp[i]); - stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - } + gettimeofday(timeout, NULL); + timeout->tv_sec += ms / 1000; + timeout->tv_usec += (ms % 1000) * 1000; } static void -cubeb_drain_stream(void * stream) +cubeb_set_stream_state(cubeb_stream * stm, enum stream_state state) { - cubeb_stream * stm = stream; - int drained = 0; - - mutex_lock(&stm->mutex); - /* It's possible that the stream was stopped after the timer fired but - before we locked the stream. */ - if (stm->timer) { - poll_timer_destroy(stm->timer); - stm->timer = NULL; - drained = 1; - } - mutex_unlock(&stm->mutex); - if (drained) { - stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED); - } + mutex_assert_not_held(&stm->mutex); + mutex_assert_held(&stm->context->mutex); + stm->state = state; + stm->context->rebuild = 1; + poll_wake(stm->context); } static void -cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) +cubeb_refill_stream(cubeb_stream * stm) { - cubeb_stream * stm = stream; int r; unsigned short revents; snd_pcm_sframes_t avail; long got; void * p; + int draining = 0; mutex_lock(&stm->mutex); - r = snd_pcm_poll_descriptors_revents(stm->pcm, fds, nfds, &revents); + r = snd_pcm_poll_descriptors_revents(stm->pcm, stm->fds, stm->nfds, &revents); if (r < 0 || revents != POLLOUT) { /* This should be a stream error; it makes no sense for poll(2) to wake for this stream and then have the stream report that it's not ready. @@ -667,9 +324,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) /* Failed to recover from an xrun, this stream must be broken. */ if (avail < 0) { - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; mutex_unlock(&stm->mutex); + cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); return; } @@ -686,9 +342,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) snd_pcm_recover(stm->pcm, -EPIPE, 1); avail = snd_pcm_avail_update(stm->pcm); if (avail <= 0) { - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; mutex_unlock(&stm->mutex); + cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); return; } @@ -701,9 +356,8 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) got = stream_data_callback(stm, stm->user_ptr, p, avail); mutex_lock(&stm->mutex); if (got < 0) { - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; mutex_unlock(&stm->mutex); + cubeb_set_stream_state(stm, ERROR); stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); return; } @@ -715,23 +369,108 @@ cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) } assert(wrote >= 0 && wrote == got); stm->write_position += wrote; + gettimeofday(&stm->last_activity, NULL); } if (got != avail) { long buffer_fill = stm->buffer_size - (avail - got); double buffer_time = (double) buffer_fill / stm->params.rate; - /* Fill the remaining buffer with silence to guarantee at least a period has been written. */ + /* Fill the remaining buffer with silence to guarantee one full period + * has been written. */ snd_pcm_writei(stm->pcm, (char *) p + got, avail - got); - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; + set_timeout(&stm->drain_timeout, buffer_time * 1000); - stm->timer = poll_timer_relative_init(stm->context, buffer_time * 1000, - cubeb_drain_stream, stm); + draining = 1; } free(p); mutex_unlock(&stm->mutex); + if (draining) { + cubeb_set_stream_state(stm, DRAINING); + } +} + +static int +calculate_timeout(cubeb * ctx, int initial) +{ + int i; + int timeout; + cubeb_stream * stm; + int ms; + + mutex_assert_held(&ctx->mutex); + + timeout = initial; + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm && stm->state == DRAINING) { + ms = ms_until(&stm->drain_timeout); + if (ms >= 0 && timeout > ms) { + timeout = ms; + } + } + } + + return timeout; +} + +static int +cubeb_run(cubeb * ctx) +{ + int r; + int timeout; + int i; + char dummy; + cubeb_stream * stm; + + mutex_lock(&ctx->mutex); + + if (ctx->rebuild) { + rebuild(ctx); + } + + /* Wake up at least once per second for the watchdog. */ + timeout = calculate_timeout(ctx, 1000); + + mutex_unlock(&ctx->mutex); + r = poll(ctx->fds, ctx->nfds, timeout); + mutex_lock(&ctx->mutex); + + if (r > 0) { + if (ctx->fds[0].revents & POLLIN) { + read(ctx->control_fd_read, &dummy, 1); + + if (ctx->shutdown) { + mutex_unlock(&ctx->mutex); + return -1; + } + } + + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm && stm->state == RUNNING && stm->fds && any_revents(stm->fds, stm->nfds)) { + cubeb_refill_stream(stm); + } + } + } else if (r == 0) { + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + stm = ctx->streams[i]; + if (stm) { + if (stm->state == DRAINING && ms_since(&stm->drain_timeout) >= 0) { + cubeb_set_stream_state(stm, IDLE); + stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_DRAINED); + } else if (stm->state == RUNNING && ms_since(&stm->last_activity) > CUBEB_WATCHDOG_MS) { + cubeb_set_stream_state(stm, ERROR); + stream_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); + } + } + } + } + + mutex_unlock(&ctx->mutex); + + return 0; } static void * @@ -771,34 +510,33 @@ cubeb_locked_pcm_close(snd_pcm_t * pcm) return r; } -static cubeb_stream * -cubeb_new_stream(cubeb * ctx) +static int +cubeb_register_stream(cubeb * ctx, cubeb_stream * stm) { - cubeb_stream * stm = NULL; - - stm = calloc(1, sizeof(*stm)); - assert(stm); + int i; + int r = -1; mutex_lock(&ctx->mutex); - if (ctx->active_streams < CUBEB_STREAM_MAX) { - ctx->active_streams += 1; - } else { - free(stm); - stm = NULL; + for (i = 0; i < CUBEB_STREAM_MAX; ++i) { + if (!ctx->streams[i]) { + ctx->streams[i] = stm; + r = i; + break; + } } mutex_unlock(&ctx->mutex); - return stm; + return r; } static void -cubeb_free_stream(cubeb * ctx, cubeb_stream * stm) +cubeb_unregister_stream(cubeb * ctx, int slot) { + assert(slot >= 0 && slot < CUBEB_STREAM_MAX); + mutex_lock(&ctx->mutex); - assert(ctx->active_streams >= 1); - ctx->active_streams -= 1; + ctx->streams[slot] = NULL; mutex_unlock(&ctx->mutex); - free(stm); } static void @@ -812,42 +550,40 @@ cubeb_init(cubeb ** context, char const * context_name UNUSED) { cubeb * ctx; int r; + int i; + int fd[2]; pthread_attr_t attr; assert(context); *context = NULL; pthread_mutex_lock(&cubeb_alsa_mutex); - if (!cubeb_alsa_set_error_handler) { + if (!cubeb_alsa_error_handler_set) { snd_lib_error_set_handler(silent_error_handler); - cubeb_alsa_set_error_handler = 1; + cubeb_alsa_error_handler_set = 1; } pthread_mutex_unlock(&cubeb_alsa_mutex); ctx = calloc(1, sizeof(*ctx)); assert(ctx); - pipe_init(&ctx->control_fd_read, &ctx->control_fd_write); - - set_close_on_exec(ctx->control_fd_read); - set_non_block(ctx->control_fd_read); - - set_close_on_exec(ctx->control_fd_write); - set_non_block(ctx->control_fd_write); - mutex_init(&ctx->mutex); - r = pthread_cond_init(&ctx->cond, NULL); + r = pipe(fd); assert(r == 0); - ctx->phase = 0; + for (i = 0; i < 2; ++i) { + fcntl(fd[i], F_SETFD, fcntl(fd[i], F_GETFD) | FD_CLOEXEC); + fcntl(fd[i], F_SETFL, fcntl(fd[i], F_GETFL) | O_NONBLOCK); + } + + ctx->control_fd_read = fd[0]; + ctx->control_fd_write = fd[1]; /* Force an early rebuild when cubeb_run is first called to ensure fds and * nfds have been initialized. */ ctx->rebuild = 1; - cubeb_watchdog(ctx); - r = pthread_attr_init(&attr); assert(r == 0); @@ -860,8 +596,6 @@ cubeb_init(cubeb ** context, char const * context_name UNUSED) r = pthread_attr_destroy(&attr); assert(r == 0); - ctx->active_streams = 0; - *context = ctx; return CUBEB_OK; @@ -873,7 +607,6 @@ cubeb_destroy(cubeb * ctx) int r; assert(ctx); - assert(ctx->active_streams == 0); mutex_lock(&ctx->mutex); ctx->shutdown = 1; @@ -883,13 +616,8 @@ cubeb_destroy(cubeb * ctx) r = pthread_join(ctx->thread, NULL); assert(r == 0); - poll_timer_destroy(ctx->watchdog_timer); - ctx->watchdog_timer = NULL; - - assert(!ctx->waitable && !ctx->timer); close(ctx->control_fd_read); close(ctx->control_fd_write); - pthread_cond_destroy(&ctx->cond); mutex_destroy(&ctx->mutex); free(ctx->fds); @@ -933,16 +661,16 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n return CUBEB_ERROR_INVALID_FORMAT; } - stm = cubeb_new_stream(context); - if (!stm) { - return CUBEB_ERROR; - } + stm = calloc(1, sizeof(*stm)); + assert(stm); stm->context = context; stm->data_callback = data_callback; stm->state_callback = state_callback; stm->user_ptr = user_ptr; stm->params = stream_params; + stm->state = IDLE; + stm->slot = -1; mutex_init(&stm->mutex); @@ -966,6 +694,20 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n r = snd_pcm_get_params(stm->pcm, &stm->buffer_size, &stm->period_size); assert(r == 0); + stm->nfds = snd_pcm_poll_descriptors_count(stm->pcm); + assert(stm->nfds > 0); + + stm->saved_fds = calloc(stm->nfds, sizeof(struct pollfd)); + assert(stm->saved_fds); + r = snd_pcm_poll_descriptors(stm->pcm, stm->saved_fds, stm->nfds); + assert((nfds_t) r == stm->nfds); + + stm->slot = cubeb_register_stream(context, stm); + if (stm->slot == -1) { + cubeb_stream_destroy(stm); + return CUBEB_ERROR; + } + *stream = stm; return CUBEB_OK; @@ -974,50 +716,36 @@ cubeb_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n void cubeb_stream_destroy(cubeb_stream * stm) { - assert(stm && !stm->waitable && !stm->timer); + assert(stm && (stm->state == IDLE || stm->state == ERROR)); mutex_lock(&stm->mutex); if (stm->pcm) { cubeb_locked_pcm_close(stm->pcm); stm->pcm = NULL; } + free(stm->saved_fds); mutex_unlock(&stm->mutex); mutex_destroy(&stm->mutex); - cubeb_free_stream(stm->context, stm); + if (stm->slot != -1) { + cubeb_unregister_stream(stm->context, stm->slot); + } + free(stm); } int cubeb_stream_start(cubeb_stream * stm) { - int nfds; - struct pollfd * fds; - int r; - assert(stm); mutex_lock(&stm->mutex); - - if (stm->waitable) { - mutex_unlock(&stm->mutex); - return CUBEB_OK; - } - snd_pcm_pause(stm->pcm, 0); - - nfds = snd_pcm_poll_descriptors_count(stm->pcm); - assert(nfds > 0); - - fds = calloc(nfds, sizeof(struct pollfd)); - assert(fds); - r = snd_pcm_poll_descriptors(stm->pcm, fds, nfds); - assert(r == nfds); - - stm->waitable = poll_waitable_init(stm->context, fds, nfds, cubeb_refill_stream, stm); - - free(fds); mutex_unlock(&stm->mutex); + mutex_lock(&stm->context->mutex); + cubeb_set_stream_state(stm, RUNNING); + mutex_unlock(&stm->context->mutex); + return CUBEB_OK; } @@ -1026,26 +754,12 @@ cubeb_stream_stop(cubeb_stream * stm) { assert(stm); - mutex_lock(&stm->mutex); - - if (stm->waitable) { - poll_waitable_unref(stm->waitable); - stm->waitable = NULL; - } - - if (stm->timer) { - poll_timer_destroy(stm->timer); - stm->timer = NULL; - } - - mutex_unlock(&stm->mutex); - - poll_phase_wait(stm->context); + mutex_lock(&stm->context->mutex); + cubeb_set_stream_state(stm, IDLE); + mutex_unlock(&stm->context->mutex); mutex_lock(&stm->mutex); - snd_pcm_pause(stm->pcm, 1); - mutex_unlock(&stm->mutex); return CUBEB_OK; |