aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAlex Chronopoulos <[email protected]>2016-01-21 15:20:03 +1300
committerMatthew Gregan <[email protected]>2016-01-21 15:20:03 +1300
commit79ff75467fd439b435843d98da0d124babc6233b (patch)
tree514e8f6c73e411658ff3049dad51f08caf427c26
parent6ee511d77b9cc34b0b5d28c77d5f0726143adc02 (diff)
downloadcubeb-79ff75467fd439b435843d98da0d124babc6233b.tar.gz
cubeb-79ff75467fd439b435843d98da0d124babc6233b.zip
pulse: Add input and full-duplex stream modes.
Squashed commit of the following: commit e7fbb781f8442be81871a9e2b4aeb620b599d56c Merge: 6ee511d adf5f85 Author: Matthew Gregan <[email protected]> Date: Thu Jan 21 15:16:51 2016 +1300 pulse: Address review comments. commit adf5f856867a57280e5fee0db54376875f758e9b Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 18:50:25 2016 +0200 Change input stream name and remove monotonic commit 04623fdd54459812b8b132d8ac8540e2a9b420c2 Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 18:19:00 2016 +0200 Return error invalid format commit 6c220747aeb94dd540ba9025b6574ee966c3f4b9 Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 17:15:44 2016 +0200 Apply review comments commit 217b1c79f2f7282ff52faf81ad74760db570a3e0 Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 15:40:21 2016 +0200 reset test files to master commit 008869c8176dfc77260913b1d03e1630db914995 Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 15:33:07 2016 +0200 Reset resampler to master commit 5414e31bb0e4e5f3bb0901750aceeea9d441b982 Author: Alex Chronopoulos <[email protected]> Date: Wed Jan 20 11:13:02 2016 +0200 Reset alsa to master commit 3231a4394a6289e3299268e0d428c939199d3d52 Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 15 18:47:50 2016 +0200 Update tests with devid commit 25b8fd6fb25291c7b9680c8adf01bd395c1a401e Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 15 18:46:07 2016 +0200 Do not shutdown without drain first commit 1b2766fcf0bd6e2a2a56715f8e0d0db638a00edb Merge: 27e495b ce689bc Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 15 14:29:25 2016 +0200 Rebase after review comments commit 27e495b6f3f42c0bb7bdfc4faed35f231a89d4be Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 8 16:49:46 2016 +0200 Update after review comments commit 2142c6215a27d8c71437c6487f592199908afb84 Author: Alex Chronopoulos <[email protected]> Date: Mon Dec 28 18:33:51 2015 +0200 Avoid truncating the input commit 55f98e271378ab482a8cded6db6cc51afb8129a3 Author: Alex Chronopoulos <[email protected]> Date: Wed Dec 16 15:29:40 2015 +0200 Create remaining buffer commit 8a16fda1a6d71965797a03a41f887d26288ba10b Author: Alex Chronopoulos <[email protected]> Date: Thu Dec 10 14:26:07 2015 -0500 Dummy commit commit d710886595cc4e8dc5567ac51bf9993f25b499bd Author: Alex Chronopoulos <[email protected]> Date: Wed Dec 9 09:54:10 2015 -0600 Add device id commit dd805eed9641da44684d9d74fbd42fc7152402a0 Author: achronop <[email protected]> Date: Thu Dec 3 18:57:26 2015 +0200 Set minimu buffer and some compile errors. commit 1c48e603dd885ab29af888da93cf29367d9a6a47 Author: achronop <[email protected]> Date: Tue Dec 1 15:22:09 2015 +0200 Separate input and output operation. commit 1267ff37791a16bb5eadc029a57005f0e515edfa Author: achronop <[email protected]> Date: Mon Nov 30 11:13:19 2015 +0200 Set up record stream and feed it to data callback commit 72cf1d740eb4b8b77ec18b3a9b2c05650490f484 Author: achronop <[email protected]> Date: Wed Nov 25 11:46:23 2015 +0200 Replace stream to output stream and add input stream. commit a15e1d74df39c3be15743b0bba2e34b4d9d7b87f Author: achronop <[email protected]> Date: Fri Nov 20 18:54:08 2015 +0200 alsa: just make it compile commit d79e5b0f5fb2523418a2758f3a432aea292ef193 Author: achronop <[email protected]> Date: Fri Nov 20 18:31:41 2015 +0200 Just make pulse compile commit 916d0c4535b8a0ed027a6dae2730cd1b8f7e0db7 Author: achronop <[email protected]> Date: Fri Nov 20 17:47:01 2015 +0200 Apply padenot's patch to HEAD without win commit ce689bc203fc2d2a564e7e8aaa88fcacf94b5962 Merge: 23a17cb 47d5237 Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 8 20:21:41 2016 +0200 Full-duplex implementation for PulseAudio including common files commit 47d5237e9510f934382b2c8179bf1f873922b94b Author: Alex Chronopoulos <[email protected]> Date: Fri Jan 8 16:49:46 2016 +0200 Update after review comments commit d0c1ec7cf1d7d635a5f287a0cde17ade1d5df332 Author: Alex Chronopoulos <[email protected]> Date: Mon Dec 28 18:33:51 2015 +0200 Avoid truncating the input commit 083fd4bce68b102337451d6f75b832415364fabc Author: Alex Chronopoulos <[email protected]> Date: Wed Dec 16 15:29:40 2015 +0200 Create remaining buffer commit 71d4c64ae5fc7c7f1268d5d2593bfac65d8689cf Author: Alex Chronopoulos <[email protected]> Date: Thu Dec 10 14:26:07 2015 -0500 Dummy commit commit 60a01d0aa8cbbf9d73820a5504ecb1a04ffb33e1 Author: Alex Chronopoulos <[email protected]> Date: Wed Dec 9 10:36:06 2015 -0600 Pual's change about dev id commit 5a3fedd7d77b20b5d2c03a2aa375044218cd71db Author: Alex Chronopoulos <[email protected]> Date: Wed Dec 9 09:54:10 2015 -0600 Add device id commit a433cec38f9c6d1854720717b1a56b513f6741ed Author: achronop <[email protected]> Date: Thu Dec 3 18:57:26 2015 +0200 Set minimu buffer and some compile errors. commit c3d3c439319cd3f6aeaa2a653a83c2e59a8b756a Author: achronop <[email protected]> Date: Tue Dec 1 15:22:09 2015 +0200 Separate input and output operation. commit cc12d7e3c4edc0ff50c58803b8a7c4d36e55a045 Author: achronop <[email protected]> Date: Mon Nov 30 11:13:19 2015 +0200 Set up record stream and feed it to data callback commit 0cb6b12eea12fc3ca07b016623451336927d37f7 Author: achronop <[email protected]> Date: Wed Nov 25 11:46:23 2015 +0200 Replace stream to output stream and add input stream. commit e059a8c995e653c3a98f9b1281d33cd79426a570 Author: achronop <[email protected]> Date: Fri Nov 20 18:54:08 2015 +0200 alsa: just make it compile commit f6b27e5383ba89c89836f6eb2296781f5fa81183 Author: achronop <[email protected]> Date: Fri Nov 20 18:31:41 2015 +0200 Just make pulse compile commit d3f10c5e277e7562ae92f9fd094c03b2bddc4c85 Author: achronop <[email protected]> Date: Fri Nov 20 17:47:01 2015 +0200 Apply padenot's patch to HEAD without win
-rw-r--r--include/cubeb/cubeb.h8
-rw-r--r--src/cubeb-internal.h4
-rw-r--r--src/cubeb_pulse.c425
3 files changed, 336 insertions, 101 deletions
diff --git a/include/cubeb/cubeb.h b/include/cubeb/cubeb.h
index 6a56b8b..f46842f 100644
--- a/include/cubeb/cubeb.h
+++ b/include/cubeb/cubeb.h
@@ -118,6 +118,10 @@ typedef enum {
} cubeb_stream_type;
#endif
+/** An opaque handle used to refer a particular input or output device
+ * across calls. */
+typedef void * cubeb_devid;
+
/** Stream format initialization parameters. */
typedef struct {
cubeb_sample_format format; /**< Requested sample format. One of
@@ -171,10 +175,6 @@ typedef enum {
CUBEB_DEVICE_STATE_ENABLED /**< The device is enabled. */
} cubeb_device_state;
-/** An opaque handle used to refer a particular input or output device
- * accross calls. */
-typedef void * cubeb_devid;
-
/**
* Architecture specific sample type.
*/
diff --git a/src/cubeb-internal.h b/src/cubeb-internal.h
index 11e8c99..3a0c1d8 100644
--- a/src/cubeb-internal.h
+++ b/src/cubeb-internal.h
@@ -22,7 +22,9 @@ struct cubeb_ops {
int (* enumerate_devices)(cubeb * context, cubeb_device_type type,
cubeb_device_collection ** collection);
void (* destroy)(cubeb * context);
- int (* stream_init)(cubeb * context, cubeb_stream ** stream, char const * stream_name,
+ int (* stream_init)(cubeb * context,
+ cubeb_stream ** stream,
+ char const * stream_name,
cubeb_devid input_device,
cubeb_stream_params * input_stream_params,
cubeb_devid output_device,
diff --git a/src/cubeb_pulse.c b/src/cubeb_pulse.c
index 6f1fb70..5744567 100644
--- a/src/cubeb_pulse.c
+++ b/src/cubeb_pulse.c
@@ -70,12 +70,27 @@
X(pa_threaded_mainloop_unlock) \
X(pa_threaded_mainloop_wait) \
X(pa_usec_to_bytes) \
+ X(pa_stream_set_read_callback) \
+ X(pa_stream_connect_record) \
+ X(pa_stream_readable_size) \
+ X(pa_stream_peek) \
+ X(pa_stream_drop) \
+ X(pa_stream_get_buffer_attr) \
#define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x;
LIBPULSE_API_VISIT(MAKE_TYPEDEF);
#undef MAKE_TYPEDEF
#endif
+//#define LOGGING_ENABLED
+#ifdef LOGGING_ENABLED
+#define LOG(...) do { \
+ fprintf(stderr, __VA_ARGS__); \
+ } while(0)
+#else
+#define LOG(...)
+#endif
+
static struct cubeb_ops const pulse_ops;
struct cubeb {
@@ -90,12 +105,14 @@ struct cubeb {
struct cubeb_stream {
cubeb * context;
- pa_stream * stream;
+ pa_stream * output_stream;
+ pa_stream * input_stream;
cubeb_data_callback data_callback;
cubeb_state_callback state_callback;
void * user_ptr;
pa_time_event * drain_timer;
- pa_sample_spec sample_spec;
+ pa_sample_spec output_sample_spec;
+ pa_sample_spec input_sample_spec;
int shutdown;
float volume;
};
@@ -113,6 +130,7 @@ sink_info_callback(pa_context * context, const pa_sink_info * info, int eol, voi
{
cubeb * ctx = u;
if (!eol) {
+ free(ctx->default_sink_info);
ctx->default_sink_info = malloc(sizeof(pa_sink_info));
memcpy(ctx->default_sink_info, info, sizeof(pa_sink_info));
}
@@ -170,46 +188,52 @@ stream_state_callback(pa_stream * s, void * u)
}
static void
-stream_request_callback(pa_stream * s, size_t nbytes, void * u)
+trigger_user_callback(pa_stream * s, void const * input_data, size_t nbytes, cubeb_stream * stm)
{
- cubeb_stream * stm;
void * buffer;
size_t size;
int r;
long got;
- size_t towrite;
+ size_t towrite, read_offset;
size_t frame_size;
- stm = u;
-
- if (stm->shutdown)
- return;
-
- frame_size = WRAP(pa_frame_size)(&stm->sample_spec);
-
+ frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
assert(nbytes % frame_size == 0);
towrite = nbytes;
-
+ read_offset = 0;
while (towrite) {
size = towrite;
r = WRAP(pa_stream_begin_write)(s, &buffer, &size);
+ if (r < 0) {
+ // Never get here in normal scenario
+ LOG("Unexpected error. Debugging with rr causes this\n");
+ WRAP(pa_stream_cancel_write)(s);
+ stm->shutdown = 1;
+ return;
+ }
assert(r == 0);
assert(size > 0);
assert(size % frame_size == 0);
- got = stm->data_callback(stm, stm->user_ptr, NULL, buffer, size / frame_size);
+ LOG("Trigger user callback with output buffer size=%zd, read_offset=%zd\n", size, read_offset);
+ got = stm->data_callback(stm, stm->user_ptr, (uint8_t const *)input_data + read_offset, buffer, size / frame_size);
if (got < 0) {
WRAP(pa_stream_cancel_write)(s);
stm->shutdown = 1;
return;
}
+ // If more iterations move offset of read buffer
+ if (input_data) {
+ size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
+ read_offset += (size / frame_size) * in_frame_size;
+ }
if (stm->volume != PULSE_NO_GAIN) {
- uint32_t samples = size * stm->sample_spec.channels / frame_size ;
+ uint32_t samples = size * stm->output_sample_spec.channels / frame_size ;
- if (stm->sample_spec.format == PA_SAMPLE_S16BE ||
- stm->sample_spec.format == PA_SAMPLE_S16LE) {
+ if (stm->output_sample_spec.format == PA_SAMPLE_S16BE ||
+ stm->output_sample_spec.format == PA_SAMPLE_S16LE) {
short * b = buffer;
for (uint32_t i = 0; i < samples; i++) {
b[i] *= stm->volume;
@@ -247,6 +271,78 @@ stream_request_callback(pa_stream * s, size_t nbytes, void * u)
}
static int
+read_from_input(pa_stream * s, void const ** buffer, size_t * size)
+{
+ size_t readable_size = WRAP(pa_stream_readable_size)(s);
+ if (readable_size > 0) {
+ if (WRAP(pa_stream_peek)(s, buffer, size) < 0) {
+ return -1;
+ }
+ }
+ return readable_size;
+}
+
+static void
+stream_write_callback(pa_stream * s, size_t nbytes, void * u)
+{
+ LOG("Output callback to be written buffer size %zd\n", nbytes);
+ cubeb_stream * stm = u;
+ if (stm->shutdown) {
+ return;
+ }
+
+ if (!stm->input_stream){
+ // Output/playback only operation.
+ // Write directly to output
+ assert(!stm->input_stream && stm->output_stream);
+ trigger_user_callback(s, NULL, nbytes, stm);
+ }
+}
+
+static void
+stream_read_callback(pa_stream * s, size_t nbytes, void * u)
+{
+ LOG("Input callback buffer size %zd\n", nbytes);
+ cubeb_stream * stm = u;
+ if (stm->shutdown) {
+ return;
+ }
+
+ void const * read_data = NULL;
+ size_t read_size;
+ while (read_from_input(s, &read_data, &read_size) > 0) {
+ /* read_data can be NULL in case of a hole. */
+ if (read_data) {
+ size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
+ size_t read_frames = read_size / in_frame_size;
+
+ if (stm->output_stream) {
+ // input/capture + output/playback operation
+ size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
+ size_t write_size = read_frames * out_frame_size;
+ // Offer full duplex data for writing
+ trigger_user_callback(stm->output_stream, read_data, write_size, stm);
+ } else {
+ // input/capture only operation. Call callback directly
+ long got = stm->data_callback(stm, stm->user_ptr, read_data, NULL, read_frames);
+ if (got < 0 || (size_t) got != read_frames) {
+ WRAP(pa_stream_cancel_write)(s);
+ stm->shutdown = 1;
+ break;
+ }
+ }
+ }
+ if (read_size > 0) {
+ WRAP(pa_stream_drop)(s);
+ }
+
+ if (stm->shutdown) {
+ return;
+ }
+ }
+}
+
+static int
wait_until_context_ready(cubeb * ctx)
{
for (;;) {
@@ -261,15 +357,32 @@ wait_until_context_ready(cubeb * ctx)
}
static int
-wait_until_stream_ready(cubeb_stream * stm)
+wait_until_io_stream_ready(pa_stream * stream, pa_threaded_mainloop * mainloop)
{
+ if (!stream || !mainloop){
+ return -1;
+ }
for (;;) {
- pa_stream_state_t state = WRAP(pa_stream_get_state)(stm->stream);
+ pa_stream_state_t state = WRAP(pa_stream_get_state)(stream);
if (!PA_STREAM_IS_GOOD(state))
return -1;
if (state == PA_STREAM_READY)
break;
- WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop);
+ WRAP(pa_threaded_mainloop_wait)(mainloop);
+ }
+ return 0;
+}
+
+static int
+wait_until_stream_ready(cubeb_stream * stm)
+{
+ if (stm->output_stream &&
+ wait_until_io_stream_ready(stm->output_stream, stm->context->mainloop) == -1) {
+ return -1;
+ }
+ if(stm->input_stream &&
+ wait_until_io_stream_ready(stm->input_stream, stm->context->mainloop) == -1) {
+ return -1;
}
return 0;
}
@@ -290,16 +403,25 @@ operation_wait(cubeb * ctx, pa_stream * stream, pa_operation * o)
}
static void
-stream_cork(cubeb_stream * stm, enum cork_state state)
+cork_io_stream(cubeb_stream * stm, pa_stream * io_stream, enum cork_state state)
{
pa_operation * o;
-
- WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
- o = WRAP(pa_stream_cork)(stm->stream, state & CORK, stream_success_callback, stm);
+ if (!io_stream) {
+ return;
+ }
+ o = WRAP(pa_stream_cork)(io_stream, state & CORK, stream_success_callback, stm);
if (o) {
- operation_wait(stm->context, stm->stream, o);
+ operation_wait(stm->context, io_stream, o);
WRAP(pa_operation_unref)(o);
}
+}
+
+static void
+stream_cork(cubeb_stream * stm, enum cork_state state)
+{
+ WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
+ cork_io_stream(stm, stm->output_stream, state);
+ cork_io_stream(stm, stm->input_stream, state);
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
if (state & NOTIFY) {
@@ -308,6 +430,33 @@ stream_cork(cubeb_stream * stm, enum cork_state state)
}
}
+static int
+stream_update_timing_info(cubeb_stream * stm)
+{
+ int r = -1;
+ pa_operation * o = NULL;
+ if (stm->output_stream) {
+ o = WRAP(pa_stream_update_timing_info)(stm->output_stream, stream_success_callback, stm);
+ if (o) {
+ r = operation_wait(stm->context, stm->output_stream, o);
+ WRAP(pa_operation_unref)(o);
+ }
+ if (r != 0) {
+ return r;
+ }
+ }
+
+ if (stm->input_stream) {
+ o = WRAP(pa_stream_update_timing_info)(stm->input_stream, stream_success_callback, stm);
+ if (o) {
+ r = operation_wait(stm->context, stm->input_stream, o);
+ WRAP(pa_operation_unref)(o);
+ }
+ }
+
+ return r;
+}
+
static void pulse_context_destroy(cubeb * ctx);
static void pulse_destroy(cubeb * ctx);
@@ -485,97 +634,145 @@ pulse_destroy(cubeb * ctx)
static void pulse_stream_destroy(cubeb_stream * stm);
+pa_sample_format_t
+cubeb_to_pulse_format(cubeb_sample_format format)
+{
+ switch (format) {
+ case CUBEB_SAMPLE_S16LE:
+ return PA_SAMPLE_S16LE;
+ case CUBEB_SAMPLE_S16BE:
+ return PA_SAMPLE_S16BE;
+ case CUBEB_SAMPLE_FLOAT32LE:
+ return PA_SAMPLE_FLOAT32LE;
+ case CUBEB_SAMPLE_FLOAT32BE:
+ return PA_SAMPLE_FLOAT32BE;
+ default:
+ return PA_SAMPLE_INVALID;
+ }
+}
+
+static int
+create_pa_stream(cubeb_stream * stm,
+ pa_stream ** pa_stm,
+ cubeb_stream_params * stream_params,
+ char const * stream_name)
+{
+ assert(stm && stream_params);
+ *pa_stm = NULL;
+ pa_sample_spec ss;
+ ss.format = cubeb_to_pulse_format(stream_params->format);
+ if (ss.format == PA_SAMPLE_INVALID)
+ return CUBEB_ERROR_INVALID_FORMAT;
+ ss.rate = stream_params->rate;
+ ss.channels = stream_params->channels;
+
+ *pa_stm = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL);
+ return (*pa_stm == NULL) ? CUBEB_ERROR : CUBEB_OK;
+}
+
+static pa_buffer_attr
+set_buffering_attribute(unsigned int latency, pa_sample_spec * sample_spec)
+{
+ pa_buffer_attr battr;
+ battr.maxlength = -1;
+ battr.prebuf = -1;
+ battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, sample_spec);
+ battr.minreq = battr.tlength / 4;
+ battr.fragsize = battr.minreq;
+
+ LOG("Requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",
+ battr.maxlength, battr.tlength, battr.prebuf, battr.minreq, battr.fragsize);
+
+ return battr;
+}
+
static int
-pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_name,
+pulse_stream_init(cubeb * context,
+ cubeb_stream ** stream,
+ char const * stream_name,
cubeb_devid input_device,
cubeb_stream_params * input_stream_params,
cubeb_devid output_device,
cubeb_stream_params * output_stream_params,
unsigned int latency,
- cubeb_data_callback data_callback, cubeb_state_callback state_callback,
+ cubeb_data_callback data_callback,
+ cubeb_state_callback state_callback,
void * user_ptr)
{
- pa_sample_spec ss;
cubeb_stream * stm;
- pa_operation * o;
pa_buffer_attr battr;
int r;
assert(context);
- assert(!input_stream_params && "not supported.");
- if (input_device || output_device) {
- /* Device selection not yet implemented. */
- return CUBEB_ERROR_DEVICE_UNAVAILABLE;
- }
-
- *stream = NULL;
-
- switch (output_stream_params->format) {
- case CUBEB_SAMPLE_S16LE:
- ss.format = PA_SAMPLE_S16LE;
- break;
- case CUBEB_SAMPLE_S16BE:
- ss.format = PA_SAMPLE_S16BE;
- break;
- case CUBEB_SAMPLE_FLOAT32LE:
- ss.format = PA_SAMPLE_FLOAT32LE;
- break;
- case CUBEB_SAMPLE_FLOAT32BE:
- ss.format = PA_SAMPLE_FLOAT32BE;
- break;
- default:
- return CUBEB_ERROR_INVALID_FORMAT;
- }
// If the connection failed for some reason, try to reconnect
if (context->error == 1 && pulse_context_init(context) != 0) {
return CUBEB_ERROR;
}
- ss.rate = output_stream_params->rate;
- ss.channels = output_stream_params->channels;
+ *stream = NULL;
stm = calloc(1, sizeof(*stm));
assert(stm);
stm->context = context;
-
stm->data_callback = data_callback;
stm->state_callback = state_callback;
stm->user_ptr = user_ptr;
-
- stm->sample_spec = ss;
stm->volume = PULSE_NO_GAIN;
- battr.maxlength = -1;
- battr.tlength = WRAP(pa_usec_to_bytes)(latency * PA_USEC_PER_MSEC, &stm->sample_spec);
- battr.prebuf = -1;
- battr.minreq = battr.tlength / 4;
- battr.fragsize = -1;
-
WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
- stm->stream = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL);
- if (!stm->stream) {
- pulse_stream_destroy(stm);
- return CUBEB_ERROR;
+ if (output_stream_params) {
+ r = create_pa_stream(stm, &stm->output_stream, output_stream_params, stream_name);
+ if (r != CUBEB_OK) {
+ WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
+ pulse_stream_destroy(stm);
+ return r;
+ }
+
+ stm->output_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->output_stream));
+
+ WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm);
+ WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm);
+
+ battr = set_buffering_attribute(latency, &stm->output_sample_spec);
+ WRAP(pa_stream_connect_playback)(stm->output_stream,
+ output_device,
+ &battr,
+ PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
+ PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY,
+ NULL, NULL);
}
- WRAP(pa_stream_set_state_callback)(stm->stream, stream_state_callback, stm);
- WRAP(pa_stream_set_write_callback)(stm->stream, stream_request_callback, stm);
- WRAP(pa_stream_connect_playback)(stm->stream, NULL, &battr,
+
+ // Set up input stream
+ if (input_stream_params) {
+ r = create_pa_stream(stm, &stm->input_stream, input_stream_params, stream_name);
+ if (r != CUBEB_OK) {
+ WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
+ pulse_stream_destroy(stm);
+ return r;
+ }
+
+ stm->input_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->input_stream));
+
+ WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm);
+ WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm);
+
+ battr = set_buffering_attribute(latency, &stm->input_sample_spec);
+ WRAP(pa_stream_connect_record)(stm->input_stream,
+ input_device,
+ &battr,
PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
- PA_STREAM_START_CORKED,
- NULL, NULL);
+ PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY);
+ }
r = wait_until_stream_ready(stm);
if (r == 0) {
/* force a timing update now, otherwise timing info does not become valid
until some point after initialization has completed. */
- o = WRAP(pa_stream_update_timing_info)(stm->stream, stream_success_callback, stm);
- if (o) {
- r = operation_wait(stm->context, stm->stream, o);
- WRAP(pa_operation_unref)(o);
- }
+ r = stream_update_timing_info(stm);
}
+
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
if (r != 0) {
@@ -583,6 +780,22 @@ pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
return CUBEB_ERROR;
}
+#ifdef LOGGING_ENABLED
+ if (output_stream_params){
+ const pa_buffer_attr * output_att;
+ output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream);
+ LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",output_att->maxlength, output_att->tlength,
+ output_att->prebuf, output_att->minreq, output_att->fragsize);
+ }
+
+ if (input_stream_params){
+ const pa_buffer_attr * input_att;
+ input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream);
+ LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u\n",input_att->maxlength, input_att->tlength,
+ input_att->prebuf, input_att->minreq, input_att->fragsize);
+ }
+#endif
+
*stream = stm;
return CUBEB_OK;
@@ -591,21 +804,27 @@ pulse_stream_init(cubeb * context, cubeb_stream ** stream, char const * stream_n
static void
pulse_stream_destroy(cubeb_stream * stm)
{
- if (stm->stream) {
- stream_cork(stm, CORK);
+ stream_cork(stm, CORK);
- WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
+ WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
+ if (stm->output_stream) {
if (stm->drain_timer) {
/* there's no pa_rttime_free, so use this instead. */
WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop)->time_free(stm->drain_timer);
}
- WRAP(pa_stream_set_state_callback)(stm->stream, NULL, NULL);
- WRAP(pa_stream_disconnect)(stm->stream);
- WRAP(pa_stream_unref)(stm->stream);
- WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
+ WRAP(pa_stream_set_state_callback)(stm->output_stream, NULL, NULL);
+ WRAP(pa_stream_disconnect)(stm->output_stream);
+ WRAP(pa_stream_unref)(stm->output_stream);
+ }
+
+ if (stm->input_stream) {
+ WRAP(pa_stream_set_state_callback)(stm->input_stream, NULL, NULL);
+ WRAP(pa_stream_disconnect)(stm->input_stream);
+ WRAP(pa_stream_unref)(stm->input_stream);
}
+ WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
free(stm);
}
@@ -631,12 +850,16 @@ pulse_stream_get_position(cubeb_stream * stm, uint64_t * position)
pa_usec_t r_usec;
uint64_t bytes;
+ if (!stm || !stm->output_stream) {
+ return CUBEB_ERROR;
+ }
+
in_thread = WRAP(pa_threaded_mainloop_in_thread)(stm->context->mainloop);
if (!in_thread) {
WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
}
- r = WRAP(pa_stream_get_time)(stm->stream, &r_usec);
+ r = WRAP(pa_stream_get_time)(stm->output_stream, &r_usec);
if (!in_thread) {
WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
}
@@ -645,8 +868,8 @@ pulse_stream_get_position(cubeb_stream * stm, uint64_t * position)
return CUBEB_ERROR;
}
- bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->sample_spec);
- *position = bytes / WRAP(pa_frame_size)(&stm->sample_spec);
+ bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->output_sample_spec);
+ *position = bytes / WRAP(pa_frame_size)(&stm->output_sample_spec);
return CUBEB_OK;
}
@@ -657,17 +880,17 @@ pulse_stream_get_latency(cubeb_stream * stm, uint32_t * latency)
pa_usec_t r_usec;
int negative, r;
- if (!stm) {
+ if (!stm || !stm->output_stream) {
return CUBEB_ERROR;
}
- r = WRAP(pa_stream_get_latency)(stm->stream, &r_usec, &negative);
+ r = WRAP(pa_stream_get_latency)(stm->output_stream, &r_usec, &negative);
assert(!negative);
if (r) {
return CUBEB_ERROR;
}
- *latency = r_usec * stm->sample_spec.rate / PA_USEC_PER_SEC;
+ *latency = r_usec * stm->output_sample_spec.rate / PA_USEC_PER_SEC;
return CUBEB_OK;
}
@@ -687,6 +910,10 @@ pulse_stream_set_volume(cubeb_stream * stm, float volume)
pa_cvolume cvol;
const pa_sample_spec * ss;
+ if (!stm->output_stream) {
+ return CUBEB_ERROR;
+ }
+
WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
while (!stm->context->default_sink_info) {
@@ -698,18 +925,18 @@ pulse_stream_set_volume(cubeb_stream * stm, float volume)
if (stm->context->default_sink_info->flags & PA_SINK_FLAT_VOLUME) {
stm->volume = volume;
} else {
- ss = WRAP(pa_stream_get_sample_spec)(stm->stream);
+ ss = WRAP(pa_stream_get_sample_spec)(stm->output_stream);
vol = WRAP(pa_sw_volume_from_linear)(volume);
WRAP(pa_cvolume_set)(&cvol, ss->channels, vol);
- index = WRAP(pa_stream_get_index)(stm->stream);
+ index = WRAP(pa_stream_get_index)(stm->output_stream);
op = WRAP(pa_context_set_sink_input_volume)(stm->context->context,
index, &cvol, volume_success,
stm);
if (op) {
- operation_wait(stm->context, stm->stream, op);
+ operation_wait(stm->context, stm->output_stream, op);
WRAP(pa_operation_unref)(op);
}
}
@@ -725,7 +952,11 @@ pulse_stream_set_panning(cubeb_stream * stream, float panning)
const pa_channel_map * map;
pa_cvolume vol;
- map = WRAP(pa_stream_get_channel_map)(stream->stream);
+ if (!stream->output_stream) {
+ return CUBEB_ERROR;
+ }
+
+ map = WRAP(pa_stream_get_channel_map)(stream->output_stream);
if (!WRAP(pa_channel_map_can_balance)(map)) {
return CUBEB_ERROR;
@@ -935,11 +1166,13 @@ pulse_enumerate_devices(cubeb * context, cubeb_device_type type,
}
*collection = malloc(sizeof(cubeb_device_collection) +
- sizeof(cubeb_device_info*) * (user_data.count > 0 ? user_data.count - 1 : 0));
+ sizeof(cubeb_device_info *) * (user_data.count > 0 ? user_data.count - 1 : 0));
(*collection)->count = user_data.count;
for (i = 0; i < user_data.count; i++)
(*collection)->device[i] = user_data.devinfo[i];
+ free(user_data.default_sink_name);
+ free(user_data.default_source_name);
free(user_data.devinfo);
return CUBEB_OK;
}