diff options
author | Michael M <[email protected]> | 2023-05-02 12:06:28 +0200 |
---|---|---|
committer | Paul Adenot <[email protected]> | 2023-05-17 10:57:17 +0200 |
commit | febf49d0898d1c1691235639b0796b91df522394 (patch) | |
tree | f9040952df0563219c2e8dc70ab0699076ca1cf6 | |
parent | 1ba923736487443f5c7be09bdb6bb894f9ee9d06 (diff) | |
download | cubeb-febf49d0898d1c1691235639b0796b91df522394.tar.gz cubeb-febf49d0898d1c1691235639b0796b91df522394.zip |
wasapi: make render thread exist for the lifetime of the stream object
- Converts cubeb_stream into a reference-counted object, and gets rid of
emergency_bailout logic
- When reconfiguring, only restart a stream if it was already started
- Removes timeout inside the render thread, as we can't guarantee events
while a stream is stopped
Subsequently rebased by Paul Adenot <[email protected]>
-rw-r--r-- | src/cubeb_wasapi.cpp | 268 |
1 files changed, 137 insertions, 131 deletions
diff --git a/src/cubeb_wasapi.cpp b/src/cubeb_wasapi.cpp index 897c3f3..c28be38 100644 --- a/src/cubeb_wasapi.cpp +++ b/src/cubeb_wasapi.cpp @@ -110,7 +110,9 @@ struct com_heap_ptr_deleter { template <typename T> using com_heap_ptr = std::unique_ptr<T, com_heap_ptr_deleter>; -template <typename T, size_t N> constexpr size_t ARRAY_LENGTH(T (&)[N]) +template <typename T, size_t N> +constexpr size_t +ARRAY_LENGTH(T (&)[N]) { return N; } @@ -188,6 +190,20 @@ private: T * ptr = nullptr; }; +LONG +wasapi_stream_add_ref(cubeb_stream * stm); +LONG +wasapi_stream_release(cubeb_stream * stm); + +struct auto_stream_ref { + auto_stream_ref(cubeb_stream * stm_) : stm(stm_) + { + wasapi_stream_add_ref(stm); + } + ~auto_stream_ref() { wasapi_stream_release(stm); } + cubeb_stream * stm; +}; + extern cubeb_ops const wasapi_ops; static com_heap_ptr<wchar_t> @@ -377,8 +393,8 @@ struct cubeb_stream { com_ptr<IAudioClient> input_client; /* Interface to use the event driven capture interface */ com_ptr<IAudioCaptureClient> capture_client; - /* This event is set by the stream_stop and stream_destroy - function, so the render loop can exit properly. */ + /* This event is set by the stream_destroy function, so the render loop can + exit properly. */ HANDLE shutdown_event = 0; /* Set by OnDefaultDeviceChanged when a stream reconfiguration is required. The reconfiguration is handled by the render loop thread. */ @@ -422,17 +438,20 @@ struct cubeb_stream { float volume = 1.0; /* True if the stream is draining. */ bool draining = false; - /* If the render thread fails to stop, this is set to true and ownership of - * the stm is "leaked" to the render thread for later cleanup. */ - std::atomic<bool> emergency_bailout{false}; /* This needs an active audio input stream to be known, and is updated in the * first audio input callback. */ std::atomic<int64_t> input_latency_hns{LATENCY_NOT_AVAILABLE_YET}; - /* Those attributes count the number of frames requested (resp. received) by the OS, to be able to detect drifts. This is only used for logging for now. */ size_t total_input_frames = 0; size_t total_output_frames = 0; + /* This is set by the render loop thread once it has obtained a reference to + * COM and this stream object. */ + HANDLE thread_ready_event = 0; + /* Keep a ref count on this stream object. After both stream_destroy has been + * called and the render loop thread has exited, destroy this stream object. + */ + LONG ref_count = 0; }; class monitor_device_notifications { @@ -787,9 +806,6 @@ wasapi_data_callback(cubeb_stream * stm, void * user_ptr, void const * input_buffer, void * output_buffer, long nframes) { - if (stm->emergency_bailout) { - return CUBEB_ERROR; - } return stm->data_callback(stm, user_ptr, input_buffer, output_buffer, nframes); } @@ -797,9 +813,6 @@ wasapi_data_callback(cubeb_stream * stm, void * user_ptr, void wasapi_state_callback(cubeb_stream * stm, void * user_ptr, cubeb_state state) { - if (stm->emergency_bailout) { - return; - } return stm->state_callback(stm, user_ptr, state); } @@ -1353,31 +1366,12 @@ refill_callback_output(cubeb_stream * stm) void wasapi_stream_destroy(cubeb_stream * stm); -static void -handle_emergency_bailout(cubeb_stream * stm) -{ - if (stm->emergency_bailout) { - CloseHandle(stm->thread); - stm->thread = NULL; - CloseHandle(stm->shutdown_event); - stm->shutdown_event = 0; - wasapi_stream_destroy(stm); - _endthreadex(0); - } -} - static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) { AutoRegisterThread raii("cubeb rendering thread"); - cubeb_stream * stm = static_cast<cubeb_stream *>(stream); - bool is_playing = true; - HANDLE wait_array[4] = {stm->shutdown_event, stm->reconfigure_event, - stm->refill_event, stm->input_available_event}; - HANDLE mmcss_handle = NULL; - HRESULT hr = 0; - DWORD mmcss_task_index = 0; + auto_stream_ref stream_ref(stm); struct auto_com { auto_com() { @@ -1387,6 +1381,21 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) ~auto_com() { CoUninitialize(); } } com; + bool is_playing = true; + HANDLE wait_array[4] = {stm->shutdown_event, stm->reconfigure_event, + stm->refill_event, stm->input_available_event}; + HANDLE mmcss_handle = NULL; + HRESULT hr = 0; + DWORD mmcss_task_index = 0; + + // Signal wasapi_stream_start that we've initialized COM and incremented + // the stream's ref_count. + BOOL ok = SetEvent(stm->thread_ready_event); + if (!ok) { + LOG("thread_ready SetEvent failed: %lx", GetLastError()); + return 0; + } + /* We could consider using "Pro Audio" here for WebAudio and maybe WebRTC. */ mmcss_handle = AvSetMmThreadCharacteristicsA("Audio", &mmcss_task_index); @@ -1396,20 +1405,9 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) GetLastError()); } - /* WaitForMultipleObjects timeout can trigger in cases where we don't want to - treat it as a timeout, such as across a system sleep/wake cycle. Trigger - the timeout error handling only when the timeout_limit is reached, which is - reset on each successful loop. */ - unsigned timeout_count = 0; - const unsigned timeout_limit = 3; while (is_playing) { - handle_emergency_bailout(stm); DWORD waitResult = WaitForMultipleObjects(ARRAY_LENGTH(wait_array), - wait_array, FALSE, 1000); - handle_emergency_bailout(stm); - if (waitResult != WAIT_TIMEOUT) { - timeout_count = 0; - } + wait_array, FALSE, INFINITE); switch (waitResult) { case WAIT_OBJECT_0: { /* shutdown */ is_playing = false; @@ -1424,12 +1422,13 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) XASSERT(stm->output_client || stm->input_client); LOG("Reconfiguring the stream"); /* Close the stream */ + bool was_running = false; if (stm->output_client) { - stm->output_client->Stop(); + was_running = stm->output_client->Stop() == S_OK; LOG("Output stopped."); } if (stm->input_client) { - stm->input_client->Stop(); + was_running = stm->input_client->Stop() == S_OK; LOG("Input stopped."); } { @@ -1450,7 +1449,7 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) LOG("Stream setup successfuly."); } XASSERT(stm->output_client || stm->input_client); - if (stm->output_client) { + if (was_running && stm->output_client) { hr = stm->output_client->Start(); if (FAILED(hr)) { LOG("Error starting output after reconfigure, error: %lx", hr); @@ -1459,7 +1458,7 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) } LOG("Output started after reconfigure."); } - if (stm->input_client) { + if (was_running && stm->input_client) { hr = stm->input_client->Start(); if (FAILED(hr)) { LOG("Error starting input after reconfiguring, error: %lx", hr); @@ -1488,14 +1487,6 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) break; } - case WAIT_TIMEOUT: - XASSERT(stm->shutdown_event == wait_array[0]); - if (++timeout_count >= timeout_limit) { - LOG("Render loop reached the timeout limit."); - is_playing = false; - hr = E_FAIL; - } - break; default: LOG("case %lu not handled in render loop.", waitResult); XASSERT(false); @@ -1515,8 +1506,6 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream) AvRevertMmThreadCharacteristics(mmcss_handle); } - handle_emergency_bailout(stm); - if (FAILED(hr)) { wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); } @@ -1772,24 +1761,16 @@ namespace { enum ShutdownPhase { OnStop, OnDestroy }; bool -stop_and_join_render_thread(cubeb_stream * stm, ShutdownPhase phase) +stop_and_join_render_thread(cubeb_stream * stm) { - // Only safe to transfer `stm` ownership to the render thread when - // the stream is being destroyed by the caller. - bool bailout = phase == OnDestroy; - - LOG("%p: Stop and join render thread: %p (%d), phase=%d", stm, stm->thread, - stm->emergency_bailout.load(), static_cast<int>(phase)); + LOG("%p: Stop and join render thread: %p", stm, stm->thread); if (!stm->thread) { return true; } - XASSERT(!stm->emergency_bailout); - BOOL ok = SetEvent(stm->shutdown_event); if (!ok) { LOG("stop_and_join_render_thread: SetEvent failed: %lx", GetLastError()); - stm->emergency_bailout = bailout; return false; } @@ -1807,19 +1788,9 @@ stop_and_join_render_thread(cubeb_stream * stm, ShutdownPhase phase) LOG("stop_and_join_render_thread: WaitForSingleObject on thread failed: " "%lx, %lx", r, GetLastError()); - stm->emergency_bailout = bailout; return false; } - // Only attempt to close and null out the thread and event if the - // WaitForSingleObject above succeeded. - LOG("stop_and_join_render_thread: Closing thread."); - CloseHandle(stm->thread); - stm->thread = NULL; - - CloseHandle(stm->shutdown_event); - stm->shutdown_event = 0; - return true; } @@ -2725,8 +2696,8 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream, return CUBEB_ERROR_INVALID_FORMAT; } - std::unique_ptr<cubeb_stream, decltype(&wasapi_stream_destroy)> stm( - new cubeb_stream(), wasapi_stream_destroy); + cubeb_stream * stm = new cubeb_stream(); + auto_stream_ref stream_ref(stm); stm->context = context; stm->data_callback = data_callback; @@ -2798,12 +2769,24 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream, return CUBEB_ERROR; } + stm->shutdown_event = CreateEvent(NULL, 0, 0, NULL); + if (!stm->shutdown_event) { + LOG("Can't create the shutdown event, error: %lx", GetLastError()); + return CUBEB_ERROR; + } + + stm->thread_ready_event = CreateEvent(NULL, 0, 0, NULL); + if (!stm->thread_ready_event) { + LOG("Can't create the thread ready event, error: %lx", GetLastError()); + return CUBEB_ERROR; + } + { /* Locking here is not strictly necessary, because we don't have a notification client that can reset the stream yet, but it lets us assert that the lock is held in the function. */ auto_lock lock(stm->stream_reset_lock); - rv = setup_wasapi_stream(stm.get()); + rv = setup_wasapi_stream(stm); } if (rv != CUBEB_OK) { return rv; @@ -2818,7 +2801,7 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream, !(output_stream_params->prefs & CUBEB_STREAM_PREF_DISABLE_DEVICE_SWITCHING))) { LOG("Follow the system default input or/and output devices"); - HRESULT hr = register_notification_client(stm.get()); + HRESULT hr = register_notification_client(stm); if (FAILED(hr)) { /* this is not fatal, we can still play audio, but we won't be able to keep using the default audio endpoint if it changes. */ @@ -2826,7 +2809,24 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream, } } - *stream = stm.release(); + cubeb_async_log_reset_threads(); + stm->thread = + (HANDLE)_beginthreadex(NULL, 512 * 1024, wasapi_stream_render_loop, stm, + STACK_SIZE_PARAM_IS_A_RESERVATION, NULL); + if (stm->thread == NULL) { + LOG("could not create WASAPI render thread."); + return CUBEB_ERROR; + } + + // Wait for the wasapi_stream_render_loop thread to signal that COM has been + // initialized and the stream's ref_count has been incremented. + hr = WaitForSingleObject(stm->thread_ready_event, INFINITE); + XASSERT(hr == WAIT_OBJECT_0); + CloseHandle(stm->thread_ready_event); + stm->thread_ready_event = 0; + + wasapi_stream_add_ref(stm); + *stream = stm; LOG("Stream init successful (%p)", *stream); return CUBEB_OK; @@ -2866,32 +2866,59 @@ close_wasapi_stream(cubeb_stream * stm) } } -void -wasapi_stream_destroy(cubeb_stream * stm) +LONG +wasapi_stream_add_ref(cubeb_stream * stm) { XASSERT(stm); - LOG("Stream destroy (%p)", stm); + LONG result = InterlockedIncrement(&stm->ref_count); + LOGV("Stream ref count incremented = %i (%p)", result, stm); + return result; +} - if (!stop_and_join_render_thread(stm, OnDestroy)) { - // Emergency bailout: render thread becomes responsible for calling - // wasapi_stream_destroy. - return; - } +LONG +wasapi_stream_release(cubeb_stream * stm) +{ + XASSERT(stm); - if (stm->notification_client) { - unregister_notification_client(stm); - } + LONG result = InterlockedDecrement(&stm->ref_count); + LOGV("Stream ref count decremented = %i (%p)", result, stm); + if (result == 0) { + LOG("Stream ref count hit zero, destroying (%p)", stm); - { - auto_lock lock(stm->stream_reset_lock); - close_wasapi_stream(stm); + if (stm->notification_client) { + unregister_notification_client(stm); + } + + CloseHandle(stm->shutdown_event); + CloseHandle(stm->reconfigure_event); + CloseHandle(stm->refill_event); + CloseHandle(stm->input_available_event); + + CloseHandle(stm->thread); + + // The variables intialized in wasapi_stream_init, + // must be destroyed in wasapi_stream_release. + stm->linear_input_buffer.reset(); + + { + auto_lock lock(stm->stream_reset_lock); + close_wasapi_stream(stm); + } + + delete stm; } - CloseHandle(stm->reconfigure_event); - CloseHandle(stm->refill_event); - CloseHandle(stm->input_available_event); + return result; +} + +void +wasapi_stream_destroy(cubeb_stream * stm) +{ + XASSERT(stm); + LOG("Stream destroy called, decrementing ref count (%p)", stm); - delete stm; + stop_and_join_render_thread(stm); + wasapi_stream_release(stm); } enum StreamDirection { OUTPUT, INPUT }; @@ -2899,6 +2926,7 @@ enum StreamDirection { OUTPUT, INPUT }; int stream_start_one_side(cubeb_stream * stm, StreamDirection dir) { + XASSERT(stm); XASSERT((dir == OUTPUT && stm->output_client) || (dir == INPUT && stm->input_client)); @@ -2942,7 +2970,7 @@ wasapi_stream_start(cubeb_stream * stm) { auto_lock lock(stm->stream_reset_lock); - XASSERT(stm && !stm->thread && !stm->shutdown_event); + XASSERT(stm); XASSERT(stm->output_client || stm->input_client); if (stm->output_client) { @@ -2959,24 +2987,7 @@ wasapi_stream_start(cubeb_stream * stm) } } - stm->shutdown_event = CreateEvent(NULL, 0, 0, NULL); - if (!stm->shutdown_event) { - LOG("Can't create the shutdown event, error: %lx", GetLastError()); - return CUBEB_ERROR; - } - - cubeb_async_log_reset_threads(); - stm->thread = - (HANDLE)_beginthreadex(NULL, 512 * 1024, wasapi_stream_render_loop, stm, - STACK_SIZE_PARAM_IS_A_RESERVATION, NULL); - if (stm->thread == NULL) { - LOG("could not create WASAPI render thread."); - CloseHandle(stm->shutdown_event); - stm->shutdown_event = 0; - return CUBEB_ERROR; - } - - wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED); + stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED); return CUBEB_OK; } @@ -3009,12 +3020,6 @@ wasapi_stream_stop(cubeb_stream * stm) wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED); } - if (!stop_and_join_render_thread(stm, OnStop)) { - // If we could not join the thread, put the stream in error. - wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR); - return CUBEB_ERROR; - } - return CUBEB_OK; } @@ -3147,8 +3152,9 @@ wstr_to_utf8(LPCWSTR str) return ret; } -static std::unique_ptr<wchar_t const []> -utf8_to_wstr(char const * str) { +static std::unique_ptr<wchar_t const[]> +utf8_to_wstr(char const * str) +{ int size = ::MultiByteToWideChar(CP_UTF8, 0, str, -1, nullptr, 0); if (size <= 0) { return nullptr; @@ -3159,8 +3165,8 @@ utf8_to_wstr(char const * str) { return ret; } -static com_ptr<IMMDevice> wasapi_get_device_node( - IMMDeviceEnumerator * enumerator, IMMDevice * dev) +static com_ptr<IMMDevice> +wasapi_get_device_node(IMMDeviceEnumerator * enumerator, IMMDevice * dev) { com_ptr<IMMDevice> ret; com_ptr<IDeviceTopology> devtopo; |