diff options
author | Alex Chronopoulos <[email protected]> | 2016-01-21 15:20:03 +1300 |
---|---|---|
committer | Matthew Gregan <[email protected]> | 2016-01-21 15:20:03 +1300 |
commit | 79ff75467fd439b435843d98da0d124babc6233b (patch) | |
tree | 514e8f6c73e411658ff3049dad51f08caf427c26 | |
parent | 6ee511d77b9cc34b0b5d28c77d5f0726143adc02 (diff) | |
download | cubeb-79ff75467fd439b435843d98da0d124babc6233b.tar.gz cubeb-79ff75467fd439b435843d98da0d124babc6233b.zip |
pulse: Add input and full-duplex stream modes.
Squashed commit of the following:
commit e7fbb781f8442be81871a9e2b4aeb620b599d56c
Merge: 6ee511d adf5f85
Author: Matthew Gregan <[email protected]>
Date: Thu Jan 21 15:16:51 2016 +1300
pulse: Address review comments.
commit adf5f856867a57280e5fee0db54376875f758e9b
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 18:50:25 2016 +0200
Change input stream name and remove monotonic
commit 04623fdd54459812b8b132d8ac8540e2a9b420c2
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 18:19:00 2016 +0200
Return error invalid format
commit 6c220747aeb94dd540ba9025b6574ee966c3f4b9
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 17:15:44 2016 +0200
Apply review comments
commit 217b1c79f2f7282ff52faf81ad74760db570a3e0
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 15:40:21 2016 +0200
reset test files to master
commit 008869c8176dfc77260913b1d03e1630db914995
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 15:33:07 2016 +0200
Reset resampler to master
commit 5414e31bb0e4e5f3bb0901750aceeea9d441b982
Author: Alex Chronopoulos <[email protected]>
Date: Wed Jan 20 11:13:02 2016 +0200
Reset alsa to master
commit 3231a4394a6289e3299268e0d428c939199d3d52
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 15 18:47:50 2016 +0200
Update tests with devid
commit 25b8fd6fb25291c7b9680c8adf01bd395c1a401e
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 15 18:46:07 2016 +0200
Do not shutdown without drain first
commit 1b2766fcf0bd6e2a2a56715f8e0d0db638a00edb
Merge: 27e495b ce689bc
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 15 14:29:25 2016 +0200
Rebase after review comments
commit 27e495b6f3f42c0bb7bdfc4faed35f231a89d4be
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 8 16:49:46 2016 +0200
Update after review comments
commit 2142c6215a27d8c71437c6487f592199908afb84
Author: Alex Chronopoulos <[email protected]>
Date: Mon Dec 28 18:33:51 2015 +0200
Avoid truncating the input
commit 55f98e271378ab482a8cded6db6cc51afb8129a3
Author: Alex Chronopoulos <[email protected]>
Date: Wed Dec 16 15:29:40 2015 +0200
Create remaining buffer
commit 8a16fda1a6d71965797a03a41f887d26288ba10b
Author: Alex Chronopoulos <[email protected]>
Date: Thu Dec 10 14:26:07 2015 -0500
Dummy commit
commit d710886595cc4e8dc5567ac51bf9993f25b499bd
Author: Alex Chronopoulos <[email protected]>
Date: Wed Dec 9 09:54:10 2015 -0600
Add device id
commit dd805eed9641da44684d9d74fbd42fc7152402a0
Author: achronop <[email protected]>
Date: Thu Dec 3 18:57:26 2015 +0200
Set minimu buffer and some compile errors.
commit 1c48e603dd885ab29af888da93cf29367d9a6a47
Author: achronop <[email protected]>
Date: Tue Dec 1 15:22:09 2015 +0200
Separate input and output operation.
commit 1267ff37791a16bb5eadc029a57005f0e515edfa
Author: achronop <[email protected]>
Date: Mon Nov 30 11:13:19 2015 +0200
Set up record stream and feed it to data callback
commit 72cf1d740eb4b8b77ec18b3a9b2c05650490f484
Author: achronop <[email protected]>
Date: Wed Nov 25 11:46:23 2015 +0200
Replace stream to output stream and add input stream.
commit a15e1d74df39c3be15743b0bba2e34b4d9d7b87f
Author: achronop <[email protected]>
Date: Fri Nov 20 18:54:08 2015 +0200
alsa: just make it compile
commit d79e5b0f5fb2523418a2758f3a432aea292ef193
Author: achronop <[email protected]>
Date: Fri Nov 20 18:31:41 2015 +0200
Just make pulse compile
commit 916d0c4535b8a0ed027a6dae2730cd1b8f7e0db7
Author: achronop <[email protected]>
Date: Fri Nov 20 17:47:01 2015 +0200
Apply padenot's patch to HEAD without win
commit ce689bc203fc2d2a564e7e8aaa88fcacf94b5962
Merge: 23a17cb 47d5237
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 8 20:21:41 2016 +0200
Full-duplex implementation for PulseAudio including common files
commit 47d5237e9510f934382b2c8179bf1f873922b94b
Author: Alex Chronopoulos <[email protected]>
Date: Fri Jan 8 16:49:46 2016 +0200
Update after review comments
commit d0c1ec7cf1d7d635a5f287a0cde17ade1d5df332
Author: Alex Chronopoulos <[email protected]>
Date: Mon Dec 28 18:33:51 2015 +0200
Avoid truncating the input
commit 083fd4bce68b102337451d6f75b832415364fabc
Author: Alex Chronopoulos <[email protected]>
Date: Wed Dec 16 15:29:40 2015 +0200
Create remaining buffer
commit 71d4c64ae5fc7c7f1268d5d2593bfac65d8689cf
Author: Alex Chronopoulos <[email protected]>
Date: Thu Dec 10 14:26:07 2015 -0500
Dummy commit
commit 60a01d0aa8cbbf9d73820a5504ecb1a04ffb33e1
Author: Alex Chronopoulos <[email protected]>
Date: Wed Dec 9 10:36:06 2015 -0600
Pual's change about dev id
commit 5a3fedd7d77b20b5d2c03a2aa375044218cd71db
Author: Alex Chronopoulos <[email protected]>
Date: Wed Dec 9 09:54:10 2015 -0600
Add device id
commit a433cec38f9c6d1854720717b1a56b513f6741ed
Author: achronop <[email protected]>
Date: Thu Dec 3 18:57:26 2015 +0200
Set minimu buffer and some compile errors.
commit c3d3c439319cd3f6aeaa2a653a83c2e59a8b756a
Author: achronop <[email protected]>
Date: Tue Dec 1 15:22:09 2015 +0200
Separate input and output operation.
commit cc12d7e3c4edc0ff50c58803b8a7c4d36e55a045
Author: achronop <[email protected]>
Date: Mon Nov 30 11:13:19 2015 +0200
Set up record stream and feed it to data callback
commit 0cb6b12eea12fc3ca07b016623451336927d37f7
Author: achronop <[email protected]>
Date: Wed Nov 25 11:46:23 2015 +0200
Replace stream to output stream and add input stream.
commit e059a8c995e653c3a98f9b1281d33cd79426a570
Author: achronop <[email protected]>
Date: Fri Nov 20 18:54:08 2015 +0200
alsa: just make it compile
commit f6b27e5383ba89c89836f6eb2296781f5fa81183
Author: achronop <[email protected]>
Date: Fri Nov 20 18:31:41 2015 +0200
Just make pulse compile
commit d3f10c5e277e7562ae92f9fd094c03b2bddc4c85
Author: achronop <[email protected]>
Date: Fri Nov 20 17:47:01 2015 +0200
Apply padenot's patch to HEAD without win
-rw-r--r-- | include/cubeb/cubeb.h | 8 | ||||
-rw-r--r-- | src/cubeb-internal.h | 4 | ||||
-rw-r--r-- | src/cubeb_pulse.c | 425 |
3 files changed, 336 insertions, 101 deletions
diff --git a/include/cubeb/cubeb.h b/include/cubeb/cubeb.h index 6a56b8b..f46842f 100644 --- a/include/cubeb/cubeb.h +++ b/include/cubeb/cubeb.h @@ -118,6 +118,10 @@ typedef enum { } cubeb_stream_type; #endif +/** An opaque handle used to refer a particular input or output device + * across calls. */ +typedef void * cubeb_devid; + /** Stream format initialization parameters. */ typedef struct { cubeb_sample_format format; /**< Requested sample format. One of @@ -171,10 +175,6 @@ typedef enum { CUBEB_DEVICE_STATE_ENABLED /**< The device is enabled. */ } cubeb_device_state; -/** An opaque handle used to refer a particular input or output device - * accross calls. */ -typedef void * cubeb_devid; - /** * Architecture specific sample type. */ diff --git a/src/cubeb-internal.h b/src/cubeb-internal.h index 11e8c99..3a0c1d8 100644 --- a/src/cubeb-internal.h +++ b/src/cubeb-internal.h @@ -22,7 +22,9 @@ struct cubeb_ops { int (* enumerate_devices)(cubeb * context, cubeb_device_type type, cubeb_device_collection ** collection); void (* destroy)(cubeb * context); - int (* stream_init)(cubeb * context, cubeb_stream ** stream, char const * stream_name, + int (* stream_init)(cubeb * context, + cubeb_stream ** stream, + char const * stream_name, cubeb_devid input_device, cubeb_stream_params * input_stream_params, cubeb_devid output_device, diff --git a/src/cubeb_pulse.c b/src/cubeb_pulse.c index 6f1fb70..5744567 100644 --- a/src/cubeb_pulse.c +++ b/src/cubeb_pulse.c @@ -70,12 +70,27 @@ X(pa_threaded_mainloop_unlock) \ X(pa_threaded_mainloop_wait) \ X(pa_usec_to_bytes) \ + X(pa_stream_set_read_callback) \ + X(pa_stream_connect_record) \ + X(pa_stream_readable_size) \ + X(pa_stream_peek) \ + X(pa_stream_drop) \ + X(pa_stream_get_buffer_attr) \ #define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x; LIBPULSE_API_VISIT(MAKE_TYPEDEF); #undef MAKE_TYPEDEF #endif +//#define LOGGING_ENABLED +#ifdef LOGGING_ENABLED +#define LOG(...) do { \ + fprintf(stderr, __VA_ARGS__); \ + } while(0) +#else +#define LOG(...) +#endif + static struct cubeb_ops const pulse_ops; struct cubeb { @@ -90,12 +105,14 @@ struct cubeb { struct cubeb_stream { cubeb * context; - pa_stream * stream; + pa_stream * output_stream; + pa_stream * input_stream; cubeb_data_callback data_callback; cubeb_state_callback state_callback; void * user_ptr; pa_time_event * drain_timer; - pa_sample_spec sample_spec; + pa_sample_spec output_sample_spec; + pa_sample_spec input_sample_spec; int shutdown; float volume; }; @@ -113,6 +130,7 @@ sink_info_callback(pa_context * context, const pa_sink_info * info, int eol, voi { cubeb * ctx = u; if (!eol) { + free(ctx->default_sink_info); ctx->default_sink_info = malloc(sizeof(pa_sink_info)); memcpy(ctx->default_sink_info, info, sizeof(pa_sink_info)); } @@ -170,46 +188,52 @@ stream_state_callback(pa_stream * s, void * u) } static void -stream_request_callback(pa_stream * s, size_t nbytes, void * u) +trigger_user_callback(pa_stream * s, void const * input_data, size_t nbytes, cubeb_stream * stm) { - cubeb_stream * stm; void * buffer; size_t size; int r; long got; - size_t towrite; + size_t towrite, read_offset; size_t frame_size; - stm = u; - - if (stm->shutdown) - return; - - frame_size = WRAP(pa_frame_size)(&stm->sample_spec); - + frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec); assert(nbytes % frame_size == 0); towrite = nbytes; - + read_offset = 0; while (towrite) { size = towrite; r = WRAP(pa_stream_begin_write)(s, &buffer, &size); + if (r < 0) { + // Never get here in normal scenario + LOG("Unexpected error. Debugging with rr causes this\n"); + WRAP(pa_stream_cancel_write)(s); + stm->shutdown = 1; + return; + } assert(r == 0); assert(size > 0); assert(size % frame_size == 0); - got = stm->data_callback(stm, stm->user_ptr, NULL, buffer, size / frame_size); + LOG("Trigger user callback with output buffer size=%zd, read_offset=%zd\n", size, read_offset); + got = stm->data_callback(stm, stm->user_ptr, (uint8_t const *)input_data + read_offset, buffer, size / frame_size); if (got < 0) { WRAP(pa_stream_cancel_write)(s); stm->shutdown = 1; return; } + // If more iterations move offset of read buffer + if (input_data) { + size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec); + read_offset += (size / frame_size) * in_frame_size; + } if (stm->volume != PULSE_NO_GAIN) { - uint32_t samples = size * stm->sample_spec.channels / frame_size ; + uint32_t samples = size * stm->output_sample_spec.channels / frame_size ; - if (stm->sample_spec.format == PA_SAMPLE_S16BE || - stm->sample_spec.format == PA_SAMPLE_S16LE) { + if (stm->output_sample_spec.format == PA_SAMPLE_S16BE || + stm->output_sample_spec.format == PA_SAMPLE_S16LE) { short * b = buffer; for (uint32_t i = 0; i < samples; i++) { b[i] *= stm->volume; @@ -247,6 +271,78 @@ stream_request_callback(pa_stream * s, size_t nbytes, void * u) } static int +read_from_input(pa_stream * s, void const ** buffer, size_t * size) +{ + size_t readable_size = WRAP(pa_stream_readable_size)(s); + if (readable_size > 0) { + if (WRAP(pa_stream_peek)(s, buffer, size) < 0) { + return -1; + } + } + return readable_size; +} + +static void +stream_write_callback(pa_stream * s, size_t nbytes, void * u) +{ + LOG("Output callback to be written buffer size %zd\n", nbytes); + cubeb_stream * stm = u; + if (stm->shutdown) { + return; + } + + if (!stm->input_stream){ + // Output/playback only operation. + // Write directly to output + assert(!stm->input_stream && stm->output_stream); + trigger_user_callback(s, NULL, nbytes, stm); + } +} + +static void +stream_read_callback(pa_stream * s, size_t nbytes, void * u) +{ + LOG("Input callback buffer size %zd\n", nbytes); + cubeb_stream * stm = u; + if (stm->shutdown) { + return; + } + + void const * read_data = NULL; + size_t read_size; + while (read_from_input(s, &read_data, &read_size) > 0) { + /* read_data can be NULL in case of a hole. */ + if (read_data) { + size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec); + size_t read_frames = read_size / in_frame_size; + + if (stm->output_stream) { + // input/capture + output/playback operation + size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec); + size_t write_size = read_frames * out_frame_size; + // Offer full duplex data for writing + trigger_user_callback(stm->output_stream, read_data, write_size, stm); + } else { + // input/capture only operation. Call callback directly + long got = stm->data_callback(stm, stm->user_ptr, read_data, NULL, read_frames); + if (got < 0 || (size_t) got != read_frames) { + WRAP(pa_stream_cancel_write)(s); + stm->shutdown = 1; + break; + } + } + } + if (read_size > 0) { + WRAP(pa_stream_drop)(s); + } + + if (stm->shutdown) { + return; + } + } +} + +static int wait_until_context_ready(cubeb * ctx) { for (;;) { @@ -261,15 +357,32 @@ wait_until_context_ready(cubeb * ctx) } static int -wait_until_stream_ready(cubeb_stream * stm) +wait_until_io_stream_ready(pa_stream * stream, pa_threaded_mainloop * mainloop) { + if (!stream || !mainloop){ + return -1; + } for (;;) { - pa_stream_state_t state = WRAP(pa_stream_get_state)(stm->stream); + pa_stream_state_t state = WRAP(pa_stream_get_state)(stream); if (!PA_STREAM_IS_GOOD(state)) return -1; if (state == PA_STREAM_READY) break; - WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop); + WRAP(pa_threaded_mainloop_wait)(mainloop); + } + return 0; +} + +static int +wait_until_stream_ready(cubeb_stream * stm) +{ + if (stm->output_stream && + wait_until_io_stream_ready(stm->output_stream, stm->context->mainloop) == -1) { + return -1; + } + if(stm->input_stream && + wait_until_io_stream_ready(stm->input_stream, stm->context->mainloop) == -1) { + return -1; } return 0; } @@ -290,16 +403,25 @@ operation_wait(cubeb * ctx, pa_stream * stream, pa_operation * o) } static void -stream_cork(cubeb_stream * stm, enum cork_state state) +cork_io_stream(cubeb_stream * stm, pa_stream * io_stream, enum cork_state state) { pa_operation * o; - - WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); - o = WRAP(pa_stream_cork)(stm->stream, state & CORK, stream_success_callback, stm); + if (!io_stream) { + return; + } + o = WRAP(pa_stream_cork)(io_stream, state & CORK, stream_success_callback, stm); if (o) { - operation_wait(stm->context, stm->stream, o); + operation_wait(stm->context, io_stream, o); WRAP(pa_operation_unref)(o); } +} + +static void +stream_cork(cubeb_stream * stm, enum cork_state state) +{ + WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); + cork_io_stream(stm, stm->output_stream, state); + cork_io_stream(stm, stm->input_stream, state); WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); if (state & NOTIFY) { @@ -308,6 +430,33 @@ stream_cork(cubeb_stream * stm, enum cork_state state) } } +static int +stream_update_timing_info(cubeb_stream * stm) +{ + int r = -1; + pa_operation * o = NULL; + if (stm->output_stream) { + o = WRAP(pa_stream_update_timing_info)(stm->output_stream, stream_success_callback, stm); + if (o) { + r = operation_wait(stm->context, stm->output_stream, o); + WRAP(pa_operation_unref)(o); + } + if (r != 0) { + return r; + } + } + + if (stm->input_stream) { + o = WRAP(pa_stream_update_timing_info)(stm->input_stream, stream_success_callback, stm); + if (o) { + r = operation_wait(stm->context, stm->input_stream, o); + WRAP(pa_operation_unref)(o); + } + } + + return r; +} + static void pulse_context_destroy(cubeb * ctx); static void pulse_destroy(cubeb * ctx); @@ -485,97 +634,145 @@ pulse_destroy(cubeb * ctx) static void pulse_stream_destroy(cubeb_stream * stm); +pa_sample_format_t +cubeb_to_pulse_format(cubeb_sample_format format) +{ + switch (format) { + case CUBEB_SAMPLE_S16LE: + return PA_SAMPLE_S16LE; + case CUBEB_SAMPLE_S16BE: + return PA_SAMPLE_S16BE; + case CUBEB_SAMPLE_FLOAT32LE: + return PA_SAMPLE_FLOAT32LE; + case CUBEB_SAMPLE_FLOAT32BE: + return PA_SAMPLE_FLOAT32BE; + default: + return PA_SAMPLE_INVALID; + } +} + +static int +create_pa_stream(cubeb_stream * stm, + pa_stream ** pa_stm, + cubeb_stream_params * stream_params, + char const * stream_name) +{ + assert(stm && stream_params); + *pa_stm = NULL; + pa_sample_spec ss; + ss.format = cubeb_to_pulse_format(stream_params->format); + if (ss.format == PA_SAMPLE_INVALID) + return CUBEB_ERROR_INVALID_FORMAT; + ss.rate = stream_params->rate; + ss.channels = stream_params->channels; + + *pa_stm = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL); + return (*pa_stm == NULL) ? CUBEB_ERROR : CUBEB_OK; +} + +static pa_buffer_attr +set_buffering_attribute(unsigned int latency, pa_sample_spec * sample_spec) +{ + pa_buffer_attr battr; + battr.maxlength = -1; + battr.prebuf = -1; + battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, sample_spec); + battr.minreq = battr.tlength / 4; + battr.fragsize = battr.minreq; + + LOG("Requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n", + battr.maxlength, battr.tlength, battr.prebuf, battr.minreq, battr.fragsize); + + return battr; +} + static int -pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_name, +pulse_stream_init(cubeb * context, + cubeb_stream ** stream, + char const * stream_name, cubeb_devid input_device, cubeb_stream_params * input_stream_params, cubeb_devid output_device, cubeb_stream_params * output_stream_params, unsigned int latency, - cubeb_data_callback data_callback, cubeb_state_callback state_callback, + cubeb_data_callback data_callback, + cubeb_state_callback state_callback, void * user_ptr) { - pa_sample_spec ss; cubeb_stream * stm; - pa_operation * o; pa_buffer_attr battr; int r; assert(context); - assert(!input_stream_params && "not supported."); - if (input_device || output_device) { - /* Device selection not yet implemented. */ - return CUBEB_ERROR_DEVICE_UNAVAILABLE; - } - - *stream = NULL; - - switch (output_stream_params->format) { - case CUBEB_SAMPLE_S16LE: - ss.format = PA_SAMPLE_S16LE; - break; - case CUBEB_SAMPLE_S16BE: - ss.format = PA_SAMPLE_S16BE; - break; - case CUBEB_SAMPLE_FLOAT32LE: - ss.format = PA_SAMPLE_FLOAT32LE; - break; - case CUBEB_SAMPLE_FLOAT32BE: - ss.format = PA_SAMPLE_FLOAT32BE; - break; - default: - return CUBEB_ERROR_INVALID_FORMAT; - } // If the connection failed for some reason, try to reconnect if (context->error == 1 && pulse_context_init(context) != 0) { return CUBEB_ERROR; } - ss.rate = output_stream_params->rate; - ss.channels = output_stream_params->channels; + *stream = NULL; stm = calloc(1, sizeof(*stm)); assert(stm); stm->context = context; - stm->data_callback = data_callback; stm->state_callback = state_callback; stm->user_ptr = user_ptr; - - stm->sample_spec = ss; stm->volume = PULSE_NO_GAIN; - battr.maxlength = -1; - battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, &stm->sample_spec); - battr.prebuf = -1; - battr.minreq = battr.tlength / 4; - battr.fragsize = -1; - WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); - stm->stream = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL); - if (!stm->stream) { - pulse_stream_destroy(stm); - return CUBEB_ERROR; + if (output_stream_params) { + r = create_pa_stream(stm, &stm->output_stream, output_stream_params, stream_name); + if (r != CUBEB_OK) { + WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); + pulse_stream_destroy(stm); + return r; + } + + stm->output_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->output_stream)); + + WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm); + WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm); + + battr = set_buffering_attribute(latency, &stm->output_sample_spec); + WRAP(pa_stream_connect_playback)(stm->output_stream, + output_device, + &battr, + PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | + PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY, + NULL, NULL); } - WRAP(pa_stream_set_state_callback)(stm->stream, stream_state_callback, stm); - WRAP(pa_stream_set_write_callback)(stm->stream, stream_request_callback, stm); - WRAP(pa_stream_connect_playback)(stm->stream, NULL, &battr, + + // Set up input stream + if (input_stream_params) { + r = create_pa_stream(stm, &stm->input_stream, input_stream_params, stream_name); + if (r != CUBEB_OK) { + WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); + pulse_stream_destroy(stm); + return r; + } + + stm->input_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->input_stream)); + + WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm); + WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm); + + battr = set_buffering_attribute(latency, &stm->input_sample_spec); + WRAP(pa_stream_connect_record)(stm->input_stream, + input_device, + &battr, PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | - PA_STREAM_START_CORKED, - NULL, NULL); + PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY); + } r = wait_until_stream_ready(stm); if (r == 0) { /* force a timing update now, otherwise timing info does not become valid until some point after initialization has completed. */ - o = WRAP(pa_stream_update_timing_info)(stm->stream, stream_success_callback, stm); - if (o) { - r = operation_wait(stm->context, stm->stream, o); - WRAP(pa_operation_unref)(o); - } + r = stream_update_timing_info(stm); } + WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); if (r != 0) { @@ -583,6 +780,22 @@ pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n return CUBEB_ERROR; } +#ifdef LOGGING_ENABLED + if (output_stream_params){ + const pa_buffer_attr * output_att; + output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream); + LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",output_att->maxlength, output_att->tlength, + output_att->prebuf, output_att->minreq, output_att->fragsize); + } + + if (input_stream_params){ + const pa_buffer_attr * input_att; + input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream); + LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",input_att->maxlength, input_att->tlength, + input_att->prebuf, input_att->minreq, input_att->fragsize); + } +#endif + *stream = stm; return CUBEB_OK; @@ -591,21 +804,27 @@ pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n static void pulse_stream_destroy(cubeb_stream * stm) { - if (stm->stream) { - stream_cork(stm, CORK); + stream_cork(stm, CORK); - WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); + WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); + if (stm->output_stream) { if (stm->drain_timer) { /* there's no pa_rttime_free, so use this instead. */ WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop)->time_free(stm->drain_timer); } - WRAP(pa_stream_set_state_callback)(stm->stream, NULL, NULL); - WRAP(pa_stream_disconnect)(stm->stream); - WRAP(pa_stream_unref)(stm->stream); - WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); + WRAP(pa_stream_set_state_callback)(stm->output_stream, NULL, NULL); + WRAP(pa_stream_disconnect)(stm->output_stream); + WRAP(pa_stream_unref)(stm->output_stream); + } + + if (stm->input_stream) { + WRAP(pa_stream_set_state_callback)(stm->input_stream, NULL, NULL); + WRAP(pa_stream_disconnect)(stm->input_stream); + WRAP(pa_stream_unref)(stm->input_stream); } + WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); free(stm); } @@ -631,12 +850,16 @@ pulse_stream_get_position(cubeb_stream * stm, uint64_t * position) pa_usec_t r_usec; uint64_t bytes; + if (!stm || !stm->output_stream) { + return CUBEB_ERROR; + } + in_thread = WRAP(pa_threaded_mainloop_in_thread)(stm->context->mainloop); if (!in_thread) { WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); } - r = WRAP(pa_stream_get_time)(stm->stream, &r_usec); + r = WRAP(pa_stream_get_time)(stm->output_stream, &r_usec); if (!in_thread) { WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); } @@ -645,8 +868,8 @@ pulse_stream_get_position(cubeb_stream * stm, uint64_t * position) return CUBEB_ERROR; } - bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->sample_spec); - *position = bytes / WRAP(pa_frame_size)(&stm->sample_spec); + bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->output_sample_spec); + *position = bytes / WRAP(pa_frame_size)(&stm->output_sample_spec); return CUBEB_OK; } @@ -657,17 +880,17 @@ pulse_stream_get_latency(cubeb_stream * stm, uint32_t * latency) pa_usec_t r_usec; int negative, r; - if (!stm) { + if (!stm || !stm->output_stream) { return CUBEB_ERROR; } - r = WRAP(pa_stream_get_latency)(stm->stream, &r_usec, &negative); + r = WRAP(pa_stream_get_latency)(stm->output_stream, &r_usec, &negative); assert(!negative); if (r) { return CUBEB_ERROR; } - *latency = r_usec * stm->sample_spec.rate / PA_USEC_PER_SEC; + *latency = r_usec * stm->output_sample_spec.rate / PA_USEC_PER_SEC; return CUBEB_OK; } @@ -687,6 +910,10 @@ pulse_stream_set_volume(cubeb_stream * stm, float volume) pa_cvolume cvol; const pa_sample_spec * ss; + if (!stm->output_stream) { + return CUBEB_ERROR; + } + WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); while (!stm->context->default_sink_info) { @@ -698,18 +925,18 @@ pulse_stream_set_volume(cubeb_stream * stm, float volume) if (stm->context->default_sink_info->flags & PA_SINK_FLAT_VOLUME) { stm->volume = volume; } else { - ss = WRAP(pa_stream_get_sample_spec)(stm->stream); + ss = WRAP(pa_stream_get_sample_spec)(stm->output_stream); vol = WRAP(pa_sw_volume_from_linear)(volume); WRAP(pa_cvolume_set)(&cvol, ss->channels, vol); - index = WRAP(pa_stream_get_index)(stm->stream); + index = WRAP(pa_stream_get_index)(stm->output_stream); op = WRAP(pa_context_set_sink_input_volume)(stm->context->context, index, &cvol, volume_success, stm); if (op) { - operation_wait(stm->context, stm->stream, op); + operation_wait(stm->context, stm->output_stream, op); WRAP(pa_operation_unref)(op); } } @@ -725,7 +952,11 @@ pulse_stream_set_panning(cubeb_stream * stream, float panning) const pa_channel_map * map; pa_cvolume vol; - map = WRAP(pa_stream_get_channel_map)(stream->stream); + if (!stream->output_stream) { + return CUBEB_ERROR; + } + + map = WRAP(pa_stream_get_channel_map)(stream->output_stream); if (!WRAP(pa_channel_map_can_balance)(map)) { return CUBEB_ERROR; @@ -935,11 +1166,13 @@ pulse_enumerate_devices(cubeb * context, cubeb_device_type type, } *collection = malloc(sizeof(cubeb_device_collection) + - sizeof(cubeb_device_info*) * (user_data.count > 0 ? user_data.count - 1 : 0)); + sizeof(cubeb_device_info *) * (user_data.count > 0 ? user_data.count - 1 : 0)); (*collection)->count = user_data.count; for (i = 0; i < user_data.count; i++) (*collection)->device[i] = user_data.devinfo[i]; + free(user_data.default_sink_name); + free(user_data.default_source_name); free(user_data.devinfo); return CUBEB_OK; } |