diff options
author | Matthew Gregan <[email protected]> | 2011-12-01 16:07:34 +1300 |
---|---|---|
committer | Matthew Gregan <[email protected]> | 2011-12-01 16:07:34 +1300 |
commit | c9f2831c17107217e29f4528ac55370e88d8439f (patch) | |
tree | c2ac673dbb4f3e2b748bedbfda0eb43045d6319a /src | |
parent | dccd4a4918fc97fc0054eb7ed0f113b21aafb71c (diff) | |
download | cubeb-c9f2831c17107217e29f4528ac55370e88d8439f.tar.gz cubeb-c9f2831c17107217e29f4528ac55370e88d8439f.zip |
alsa: post rework code cleanup.
Diffstat (limited to 'src')
-rw-r--r-- | src/cubeb_alsa.c | 438 |
1 files changed, 232 insertions, 206 deletions
diff --git a/src/cubeb_alsa.c b/src/cubeb_alsa.c index 47976a5..7373a87 100644 --- a/src/cubeb_alsa.c +++ b/src/cubeb_alsa.c @@ -22,8 +22,11 @@ so those calls must be wrapped in the following global mutex. */ static pthread_mutex_t cubeb_alsa_mutex = PTHREAD_MUTEX_INITIALIZER; -#define XPOLL_CALLBACK_CONTINUE 0 -#define XPOLL_CALLBACK_REMOVE 1 +#define XPOLL_WAITABLE_CONTINUE 0 +#define XPOLL_WAITABLE_REMOVE 1 + +typedef int (*xpoll_waitable_callback)(void * user_ptr, struct pollfd * fds, nfds_t nfds); +typedef void (*xpoll_timer_callback)(void * user_ptr); struct xpoll_timer { struct xpoll_timer * next; @@ -33,7 +36,7 @@ struct xpoll_timer { struct timeval wakeup; - void (* callback)(void * user_ptr); + xpoll_timer_callback callback; void * user_ptr; }; @@ -43,34 +46,31 @@ struct xpoll_waitable { struct xpoll * xpoll; - struct pollfd * saved_fds; - struct pollfd * fds; + struct pollfd * saved_fds; /* A copy of the pollfds passed in at init time. */ + struct pollfd * fds; /* Pointer to this waitable's pollfds within struct xpoll's fds. */ nfds_t nfds; - int (* callback)(void * user_ptr); + xpoll_waitable_callback callback; void * user_ptr; }; struct xpoll { - pthread_mutex_t timer_mutex; struct xpoll_timer * timer; pthread_mutex_t waitable_mutex; struct xpoll_waitable * waitable; + + /* fds and nfds are only updated by xpoll_run when rebuild is set. */ struct pollfd * fds; nfds_t nfds; int rebuild; + /* Waitable for forcing poll to wake and rebuild fds or recalculate timeout. */ int control_fd; struct xpoll_waitable * control; }; -struct xpoll_waitable * xpoll_waitable_init(struct xpoll * p, struct pollfd * fds, nfds_t nfds, - int (* callback)(void * user_ptr), void * user_ptr); -void xpoll_timer_destroy(struct xpoll_timer * w); -void xpoll_waitable_destroy(struct xpoll_waitable * w); - struct cubeb { pthread_t thread; struct xpoll * xpoll; @@ -92,9 +92,8 @@ struct cubeb_stream { struct xpoll_timer * timer; }; - static int -any_events(struct pollfd * fds, nfds_t nfds) +any_revents(struct pollfd * fds, nfds_t nfds) { nfds_t i; @@ -107,8 +106,39 @@ any_events(struct pollfd * fds, nfds_t nfds) return 0; } +static int +cmp_timeval(struct timeval * a, struct timeval * b) +{ + if (a->tv_sec == b->tv_sec) { + if (a->tv_usec == b->tv_usec) { + return 0; + } + return a->tv_usec > b->tv_usec ? 1 : -1; + } + return a->tv_sec > b->tv_sec ? 1 : -1; +} + +static int +timeval_to_relative_ms(struct timeval * tv) +{ + struct timeval now; + struct timeval dt; + long long t; + + gettimeofday(&now, NULL); + if (cmp_timeval(tv, &now) <= 0) { + return 0; + } + + timersub(tv, &now, &dt); + t = dt.tv_sec; + t *= 1000; + t += (dt.tv_usec + 500) / 1000; + return t <= INT_MAX ? t : INT_MAX; +} + static void -xpoll_rebuild(struct xpoll * p) +rebuild(struct xpoll * p) { nfds_t nfds; int i; @@ -123,6 +153,7 @@ xpoll_rebuild(struct xpoll * p) free(p->fds); p->fds = calloc(nfds, sizeof(struct pollfd)); + assert(p->fds); p->nfds = nfds; for (i = 0, item = p->waitable; item; item = item->next) { @@ -134,71 +165,18 @@ xpoll_rebuild(struct xpoll * p) p->rebuild = 0; } -static int -xpoll_wakeup(void * user_ptr) -{ - struct xpoll * p = user_ptr; - char dummy; - - read(p->control->fds[0].fd, &dummy, 1); - return XPOLL_CALLBACK_CONTINUE; -} - -struct xpoll * -xpoll_init(void) -{ - struct xpoll * p; - int r; - int pipe_fd[2]; - struct pollfd fd; - - p = calloc(1, sizeof(struct xpoll)); - - pthread_mutex_init(&p->waitable_mutex, NULL); - pthread_mutex_init(&p->timer_mutex, NULL); - - r = pipe(pipe_fd); - assert(r == 0); - - fd.fd = pipe_fd[0]; - fd.events = POLLIN; - - p->control_fd = pipe_fd[1]; - - p->control = xpoll_waitable_init(p, &fd, 1, xpoll_wakeup, p); - - p->rebuild = 1; - - return p; -} - -void -xpoll_quit(struct xpoll * p) -{ - xpoll_waitable_destroy(p->control); -} - -void -xpoll_destroy(struct xpoll * p) -{ - assert(!p->waitable); - assert(!p->timer); - free(p->fds); - pthread_mutex_destroy(&p->waitable_mutex); - pthread_mutex_destroy(&p->timer_mutex); - free(p); -} - -struct xpoll_waitable * +static struct xpoll_waitable * xpoll_waitable_init(struct xpoll * p, struct pollfd * fds, nfds_t nfds, - int (* callback)(void * user_ptr), void * user_ptr) + xpoll_waitable_callback callback, void * user_ptr) { struct xpoll_waitable * w; w = calloc(1, sizeof(struct xpoll_waitable)); + assert(w); w->xpoll = p; w->saved_fds = calloc(nfds, sizeof(struct pollfd)); + assert(w->saved_fds); w->nfds = nfds; memcpy(w->saved_fds, fds, nfds * sizeof(struct pollfd)); @@ -222,11 +200,12 @@ xpoll_waitable_init(struct xpoll * p, struct pollfd * fds, nfds_t nfds, return w; } -void -xpoll_waitable_destroy(struct xpoll_waitable * w) +static struct xpoll_waitable * +waitable_destroy_unlocked(struct xpoll_waitable * w) { - pthread_mutex_lock(&w->xpoll->waitable_mutex); + struct xpoll_waitable * next; + next = w->next; if (w->next) { w->next->prev = w->prev; } @@ -239,138 +218,41 @@ xpoll_waitable_destroy(struct xpoll_waitable * w) w->xpoll->rebuild = 1; - pthread_mutex_unlock(&w->xpoll->waitable_mutex); - - write(w->xpoll->control_fd, "x", 1); - free(w->saved_fds); - free(w); + + return next; } -int -xpoll_run(struct xpoll * p) +static void +xpoll_waitable_destroy(struct xpoll_waitable * w) { - int r; - struct xpoll_timer * timer; - int timeout; - struct timeval now; - struct timeval dt; - struct xpoll_waitable * waitable; - int no_work; + struct xpoll * p = w->xpoll; pthread_mutex_lock(&p->waitable_mutex); - - if (p->rebuild) { - xpoll_rebuild(p); - } - - no_work = !p->waitable; - + waitable_destroy_unlocked(w); pthread_mutex_unlock(&p->waitable_mutex); - pthread_mutex_lock(&p->timer_mutex); - - no_work &= !p->timer; - if (no_work) { - pthread_mutex_unlock(&p->timer_mutex); - return -1; - } - - timeout = -1; - timer = p->timer; - if (timer) { - gettimeofday(&now, NULL); - if (now.tv_sec < timer->wakeup.tv_sec || - (now.tv_sec == timer->wakeup.tv_sec && - now.tv_usec < timer->wakeup.tv_usec)) { - long long t; - - timersub(&timer->wakeup, &now, &dt); - - t = dt.tv_sec; - t *= 1000; - t += (dt.tv_usec + 500) / 1000; - timeout = t <= INT_MAX ? t : INT_MAX; - } else { - timeout = 0; - } - } - - pthread_mutex_unlock(&p->timer_mutex); - - // XXX what can become invalid while the lock is available? - // - fds mutated - // - new timer with earlier timeout - r = poll(p->fds, p->nfds, timeout); - - - if (r > 0) { - pthread_mutex_lock(&p->waitable_mutex); - for (waitable = p->waitable; waitable; ) { - if (waitable->fds && any_events(waitable->fds, waitable->nfds)) { - r = waitable->callback(waitable->user_ptr); - if (r == XPOLL_CALLBACK_REMOVE) { - struct xpoll_waitable * dead = waitable; - - if (dead->next) { - dead->next->prev = dead->prev; - } - if (dead->prev) { - dead->prev->next = dead->next; - } - if (p->waitable == dead) { - p->waitable = dead->next; - } - - p->rebuild = 1; - free(dead->saved_fds); - waitable = dead->next; - free(dead); - continue; - } - } - waitable = waitable->next; - } - pthread_mutex_unlock(&p->waitable_mutex); - } else if (r == 0) { - pthread_mutex_lock(&p->timer_mutex); - assert(timer && timer == p->timer); - timer->callback(timer->user_ptr); - if (timer->next) { - timer->next->prev = timer->prev; - } - if (timer->prev) { - timer->prev->next = timer->next; - } - if (p->timer == timer) { - p->timer = timer->next; - } - free(timer); - pthread_mutex_unlock(&p->timer_mutex); - } - - return 0; + write(p->control_fd, "x", 1); } -struct xpoll_timer * -xpoll_timer_absolute_oneshot(struct xpoll * p, struct timeval wakeup, - void (* callback)(void * user_ptr), void * user_ptr) +static struct xpoll_timer * +xpoll_timer_absolute_oneshot(struct xpoll * p, struct timeval * wakeup, + xpoll_timer_callback callback, void * user_ptr) { struct xpoll_timer * timer; struct xpoll_timer * item; timer = calloc(1, sizeof(*timer)); + assert(timer); timer->xpoll = p; - timer->wakeup = wakeup; + timer->wakeup = *wakeup; timer->callback = callback; timer->user_ptr = user_ptr; pthread_mutex_lock(&p->timer_mutex); for (item = p->timer; item; item = item->next) { - if (wakeup.tv_sec < item->wakeup.tv_sec || - (wakeup.tv_sec == item->wakeup.tv_sec && - wakeup.tv_usec < item->wakeup.tv_usec)) { + if (cmp_timeval(wakeup, &item->wakeup) < 0) { timer->next = item; timer->prev = item->prev; @@ -400,9 +282,9 @@ xpoll_timer_absolute_oneshot(struct xpoll * p, struct timeval wakeup, return timer; } -struct xpoll_timer * +static struct xpoll_timer * xpoll_timer_relative_oneshot(struct xpoll * p, unsigned int ms, - void (* callback)(void * user_ptr), void * user_ptr) + xpoll_timer_callback callback, void * user_ptr) { struct timeval wakeup; @@ -410,14 +292,12 @@ xpoll_timer_relative_oneshot(struct xpoll * p, unsigned int ms, wakeup.tv_sec += ms / 1000; wakeup.tv_usec += (ms % 1000) * 1000; - return xpoll_timer_absolute_oneshot(p, wakeup, callback, user_ptr); + return xpoll_timer_absolute_oneshot(p, &wakeup, callback, user_ptr); } -void -xpoll_timer_destroy(struct xpoll_timer * t) +static void +timer_destroy_unlocked(struct xpoll_timer * t) { - pthread_mutex_lock(&t->xpoll->timer_mutex); - if (t->next) { t->next->prev = t->prev; } @@ -428,11 +308,133 @@ xpoll_timer_destroy(struct xpoll_timer * t) t->xpoll->timer = t->next; } - pthread_mutex_unlock(&t->xpoll->timer_mutex); + free(t); +} + +static void +xpoll_timer_destroy(struct xpoll_timer * t) +{ + struct xpoll * p = t->xpoll; + + pthread_mutex_lock(&p->timer_mutex); + timer_destroy_unlocked(t); + pthread_mutex_unlock(&p->timer_mutex); - write(t->xpoll->control_fd, "x", 1); + write(p->control_fd, "x", 1); +} - free(t); +static int +wakeup(void * user_ptr, struct pollfd * fds, nfds_t nfds) +{ + char dummy; + + assert(nfds == 1); + assert(fds[0].revents == POLLIN); + + read(fds[0].fd, &dummy, 1); + return XPOLL_WAITABLE_CONTINUE; +} + +static struct xpoll * +xpoll_init(void) +{ + struct xpoll * p; + int r; + int pipe_fd[2]; + struct pollfd fd; + + p = calloc(1, sizeof(struct xpoll)); + assert(p); + + pthread_mutex_init(&p->waitable_mutex, NULL); + pthread_mutex_init(&p->timer_mutex, NULL); + + r = pipe(pipe_fd); + assert(r == 0); + + fd.fd = pipe_fd[0]; + fd.events = POLLIN; + + p->control_fd = pipe_fd[1]; + + p->control = xpoll_waitable_init(p, &fd, 1, wakeup, p); + + /* Force an early rebuild when xpoll_run is called to ensure fds and nfds + * have been initialized. */ + p->rebuild = 1; + + return p; +} + +static void +xpoll_quit(struct xpoll * p) +{ + xpoll_waitable_destroy(p->control); +} + +static void +xpoll_destroy(struct xpoll * p) +{ + assert(!p->waitable); + assert(!p->timer); + free(p->fds); + pthread_mutex_destroy(&p->waitable_mutex); + pthread_mutex_destroy(&p->timer_mutex); + free(p); +} + +static int +xpoll_run(struct xpoll * p) +{ + int r; + int timeout; + struct xpoll_waitable * waitable; + struct xpoll_timer * timer; + + pthread_mutex_lock(&p->waitable_mutex); + if (p->rebuild) { + rebuild(p); + } + pthread_mutex_unlock(&p->waitable_mutex); + + pthread_mutex_lock(&p->timer_mutex); + timeout = -1; + timer = p->timer; + if (timer) { + timeout = timeval_to_relative_ms(&timer->wakeup); + } + pthread_mutex_unlock(&p->timer_mutex); + + /* No timers or waitables, we're done. */ + if (timeout == -1 && p->nfds == 0) { + return -1; + } + + r = poll(p->fds, p->nfds, timeout); + + if (r > 0) { + pthread_mutex_lock(&p->waitable_mutex); + for (waitable = p->waitable; waitable; ) { + if (waitable->fds && any_revents(waitable->fds, waitable->nfds)) { + r = waitable->callback(waitable->user_ptr, waitable->fds, waitable->nfds); + if (r == XPOLL_WAITABLE_REMOVE) { + waitable = waitable_destroy_unlocked(waitable); + continue; + } + } + waitable = waitable->next; + } + pthread_mutex_unlock(&p->waitable_mutex); + } else if (r == 0) { + pthread_mutex_lock(&p->timer_mutex); + if (timer == p->timer) { + timer->callback(timer->user_ptr); + timer_destroy_unlocked(timer); + } + pthread_mutex_unlock(&p->timer_mutex); + } + + return 0; } static void @@ -445,20 +447,38 @@ cubeb_drain_stream(void * stream) } static int -cubeb_refill_stream(void * stream) +cubeb_refill_stream(void * stream, struct pollfd * fds, nfds_t nfds) { cubeb_stream * stm = stream; - long got; + int r; + unsigned short revents; snd_pcm_sframes_t avail; + long got; void * p; + r = snd_pcm_poll_descriptors_revents(stm->pcm, fds, nfds, &revents); + assert(r >= 0); + + if (revents == POLLERR) { + return XPOLL_WAITABLE_REMOVE; + } + + if (revents != POLLOUT) { + return XPOLL_WAITABLE_CONTINUE; + } + avail = snd_pcm_avail_update(stm->pcm); if (avail == -EPIPE) { snd_pcm_recover(stm->pcm, avail, 1); avail = snd_pcm_avail_update(stm->pcm); } + if (avail < 0) { + return XPOLL_WAITABLE_REMOVE; + } + p = calloc(1, snd_pcm_frames_to_bytes(stm->pcm, avail)); assert(p); + got = stm->data_callback(stm, stm->user_ptr, p, avail); if (got < 0) { assert(0); /* XXX handle this case */ @@ -471,15 +491,18 @@ cubeb_refill_stream(void * stream) long buffer_fill = stm->buffer_size - (avail - got); double buffer_time = (double) buffer_fill / stm->params.rate; - /* XXX write out a period of data to ensure real data is flushed to speakers */ + /* Fill the remaining buffer with silence to guarantee at least a period has been written. */ snd_pcm_writei(stm->pcm, (char *) p + got, avail - got); stm->waitable = NULL; - stm->timer = xpoll_timer_relative_oneshot(stm->context->xpoll, buffer_time * 1000, cubeb_drain_stream, stm); + stm->timer = xpoll_timer_relative_oneshot(stm->context->xpoll, buffer_time * 1000, + cubeb_drain_stream, stm); } + free(p); - return stm->waitable ? XPOLL_CALLBACK_CONTINUE : XPOLL_CALLBACK_REMOVE; + + return stm->waitable ? XPOLL_WAITABLE_CONTINUE : XPOLL_WAITABLE_REMOVE; } static void * @@ -532,6 +555,7 @@ cubeb_init(cubeb ** context, char const * context_name) assert(ctx); ctx->xpoll = xpoll_init(); + assert(ctx->xpoll); r = pthread_attr_init(&attr); assert(r == 0); @@ -674,6 +698,7 @@ cubeb_stream_start(cubeb_stream * stm) assert(nfds > 0); fds = calloc(nfds, sizeof(struct pollfd)); + assert(fds); r = snd_pcm_poll_descriptors(stm->pcm, fds, nfds); assert(r == nfds); @@ -692,13 +717,14 @@ cubeb_stream_stop(cubeb_stream * stm) assert(stm); pthread_mutex_lock(&stm->mutex); - if (!stm->waitable) { - pthread_mutex_unlock(&stm->mutex); - return CUBEB_OK; + if (stm->waitable) { + xpoll_waitable_destroy(stm->waitable); + stm->waitable = NULL; + } + if (stm->timer) { + xpoll_timer_destroy(stm->timer); + stm->timer = NULL; } - - xpoll_waitable_destroy(stm->waitable); - stm->waitable = NULL; snd_pcm_pause(stm->pcm, 1); pthread_mutex_unlock(&stm->mutex); |