diff --git a/UI/obs-app.cpp b/UI/obs-app.cpp index 496583fd1..f9d74edd8 100644 --- a/UI/obs-app.cpp +++ b/UI/obs-app.cpp @@ -2689,6 +2689,14 @@ int main(int argc, char *argv[]) SetErrorMode(SEM_FAILCRITICALERRORS); load_debug_privilege(); base_set_crash_handler(main_crash_handler, nullptr); + + const HMODULE hRtwq = LoadLibrary(L"RTWorkQ.dll"); + if (hRtwq) { + typedef HRESULT(STDAPICALLTYPE * PFN_RtwqStartup)(); + PFN_RtwqStartup func = + (PFN_RtwqStartup)GetProcAddress(hRtwq, "RtwqStartup"); + func(); + } #endif base_get_log_handler(&def_log_handler, nullptr); @@ -2828,6 +2836,16 @@ int main(int argc, char *argv[]) curl_global_init(CURL_GLOBAL_ALL); int ret = run_program(logFile, argc, argv); +#ifdef _WIN32 + if (hRtwq) { + typedef HRESULT(STDAPICALLTYPE * PFN_RtwqShutdown)(); + PFN_RtwqShutdown func = + (PFN_RtwqShutdown)GetProcAddress(hRtwq, "RtwqShutdown"); + func(); + FreeLibrary(hRtwq); + } +#endif + blog(LOG_INFO, "Number of memory leaks: %ld", bnum_allocs()); base_set_log_handler(nullptr, nullptr); return ret; diff --git a/plugins/win-wasapi/CMakeLists.txt b/plugins/win-wasapi/CMakeLists.txt index 387efb608..4a5542960 100644 --- a/plugins/win-wasapi/CMakeLists.txt +++ b/plugins/win-wasapi/CMakeLists.txt @@ -15,6 +15,7 @@ add_library(win-wasapi MODULE ${win-wasapi_SOURCES} ${win-wasapi_HEADERS}) target_link_libraries(win-wasapi + Avrt libobs) set_target_properties(win-wasapi PROPERTIES FOLDER "plugins") diff --git a/plugins/win-wasapi/enum-wasapi.cpp b/plugins/win-wasapi/enum-wasapi.cpp index 8b3a062cb..d65f3a2aa 100644 --- a/plugins/win-wasapi/enum-wasapi.cpp +++ b/plugins/win-wasapi/enum-wasapi.cpp @@ -36,7 +36,7 @@ string GetDeviceName(IMMDevice *device) return device_name; } -void GetWASAPIAudioDevices_(vector &devices, bool input) +static void GetWASAPIAudioDevices_(vector &devices, bool input) { ComPtr enumerator; ComPtr collection; diff --git a/plugins/win-wasapi/win-wasapi.cpp b/plugins/win-wasapi/win-wasapi.cpp index 0dfd9cdd5..6dfa9996f 100644 --- a/plugins/win-wasapi/win-wasapi.cpp +++ b/plugins/win-wasapi/win-wasapi.cpp @@ -10,7 +10,11 @@ #include #include -#include +#include +#include + +#include +#include using namespace std; @@ -22,33 +26,147 @@ static void GetWASAPIDefaults(obs_data_t *settings); #define OBS_KSAUDIO_SPEAKER_4POINT1 \ (KSAUDIO_SPEAKER_SURROUND | SPEAKER_LOW_FREQUENCY) +typedef HRESULT(STDAPICALLTYPE *PFN_RtwqUnlockWorkQueue)(DWORD); +typedef HRESULT(STDAPICALLTYPE *PFN_RtwqLockSharedWorkQueue)(PCWSTR usageClass, + LONG basePriority, + DWORD *taskId, + DWORD *id); +typedef HRESULT(STDAPICALLTYPE *PFN_RtwqCreateAsyncResult)(IUnknown *, + IRtwqAsyncCallback *, + IUnknown *, + IRtwqAsyncResult **); +typedef HRESULT(STDAPICALLTYPE *PFN_RtwqPutWorkItem)(DWORD, LONG, + IRtwqAsyncResult *); +typedef HRESULT(STDAPICALLTYPE *PFN_RtwqPutWaitingWorkItem)(HANDLE, LONG, + IRtwqAsyncResult *, + RTWQWORKITEM_KEY *); + +class ARtwqAsyncCallback : public IRtwqAsyncCallback { +protected: + ARtwqAsyncCallback(void *source) : source(source) {} + +public: + STDMETHOD_(ULONG, AddRef)() { return ++refCount; } + + STDMETHOD_(ULONG, Release)() { return --refCount; } + + STDMETHOD(QueryInterface)(REFIID riid, void **ppvObject) + { + HRESULT hr = E_NOINTERFACE; + + if (riid == __uuidof(IRtwqAsyncCallback) || + riid == __uuidof(IUnknown)) { + *ppvObject = this; + AddRef(); + hr = S_OK; + } else { + *ppvObject = NULL; + } + + return hr; + } + + STDMETHOD(GetParameters) + (DWORD *pdwFlags, DWORD *pdwQueue) + { + *pdwFlags = 0; + *pdwQueue = queue_id; + return S_OK; + } + + STDMETHOD(Invoke) + (IRtwqAsyncResult *) override = 0; + + DWORD GetQueueId() const { return queue_id; } + void SetQueueId(DWORD id) { queue_id = id; } + +protected: + std::atomic refCount = 1; + void *source; + DWORD queue_id = 0; +}; + class WASAPISource { - ComPtr device; + ComPtr notify; + ComPtr enumerator; ComPtr client; ComPtr capture; - ComPtr render; - ComPtr enumerator; - ComPtr notify; obs_source_t *source; wstring default_id; string device_id; string device_name; - string device_sample = "-"; - uint64_t lastNotifyTime = 0; - bool isInputDevice; - bool useDeviceTiming = false; - bool isDefaultDevice = false; + PFN_RtwqUnlockWorkQueue rtwq_unlock_work_queue = NULL; + PFN_RtwqLockSharedWorkQueue rtwq_lock_shared_work_queue = NULL; + PFN_RtwqCreateAsyncResult rtwq_create_async_result = NULL; + PFN_RtwqPutWorkItem rtwq_put_work_item = NULL; + PFN_RtwqPutWaitingWorkItem rtwq_put_waiting_work_item = NULL; + bool rtwq_supported = false; + const bool isInputDevice; + std::atomic useDeviceTiming = false; + std::atomic isDefaultDevice = false; - bool reconnecting = false; bool previouslyFailed = false; WinHandle reconnectThread; - bool active = false; - WinHandle captureThread; + class CallbackStartCapture : public ARtwqAsyncCallback { + public: + CallbackStartCapture(WASAPISource *source) + : ARtwqAsyncCallback(source) + { + } + STDMETHOD(Invoke) + (IRtwqAsyncResult *) override + { + ((WASAPISource *)source)->OnStartCapture(); + return S_OK; + } + + } startCapture; + ComPtr startCaptureAsyncResult; + + class CallbackSampleReady : public ARtwqAsyncCallback { + public: + CallbackSampleReady(WASAPISource *source) + : ARtwqAsyncCallback(source) + { + } + + STDMETHOD(Invoke) + (IRtwqAsyncResult *) override + { + ((WASAPISource *)source)->OnSampleReady(); + return S_OK; + } + } sampleReady; + ComPtr sampleReadyAsyncResult; + + class CallbackRestart : public ARtwqAsyncCallback { + public: + CallbackRestart(WASAPISource *source) + : ARtwqAsyncCallback(source) + { + } + + STDMETHOD(Invoke) + (IRtwqAsyncResult *) override + { + ((WASAPISource *)source)->OnRestart(); + return S_OK; + } + } restart; + ComPtr restartAsyncResult; + + WinHandle captureThread; + WinHandle idleSignal; WinHandle stopSignal; WinHandle receiveSignal; + WinHandle restartSignal; + WinHandle exitSignal; + WinHandle initSignal; + DWORD reconnectDuration = 0; + WinHandle reconnectSignal; speaker_layout speakers; audio_format format; @@ -59,16 +177,24 @@ class WASAPISource { bool ProcessCaptureData(); - inline void Start(); - inline void Stop(); - void Reconnect(); + void Start(); + void Stop(); - bool InitDevice(); - void InitName(); - void InitClient(); - void InitRender(); - void InitFormat(WAVEFORMATEX *wfex); - void InitCapture(); + static ComPtr InitDevice(IMMDeviceEnumerator *enumerator, + bool isDefaultDevice, + bool isInputDevice, + const string device_id); + static ComPtr InitClient(IMMDevice *device, + bool isInputDevice, + enum speaker_layout &speakers, + enum audio_format &format, + uint32_t &sampleRate); + static void InitFormat(const WAVEFORMATEX *wfex, + enum speaker_layout &speakers, + enum audio_format &format, uint32_t &sampleRate); + static void ClearBuffer(IMMDevice *device); + static ComPtr InitCapture(IAudioClient *client, + HANDLE receiveSignal); void Initialize(); bool TryInitialize(); @@ -77,11 +203,15 @@ class WASAPISource { public: WASAPISource(obs_data_t *settings, obs_source_t *source_, bool input); - inline ~WASAPISource(); + ~WASAPISource(); void Update(obs_data_t *settings); void SetDefaultDevice(EDataFlow flow, ERole role, LPCWSTR id); + + void OnStartCapture(); + void OnSampleReady(); + void OnRestart(); }; class WASAPINotify : public IMMNotificationClient { @@ -137,10 +267,18 @@ public: WASAPISource::WASAPISource(obs_data_t *settings, obs_source_t *source_, bool input) - : source(source_), isInputDevice(input) + : source(source_), + isInputDevice(input), + startCapture(this), + sampleReady(this), + restart(this) { UpdateSettings(settings); + idleSignal = CreateEvent(nullptr, true, false, nullptr); + if (!idleSignal.Valid()) + throw "Could not create idle signal"; + stopSignal = CreateEvent(nullptr, true, false, nullptr); if (!stopSignal.Valid()) throw "Could not create stop signal"; @@ -149,37 +287,145 @@ WASAPISource::WASAPISource(obs_data_t *settings, obs_source_t *source_, if (!receiveSignal.Valid()) throw "Could not create receive signal"; + restartSignal = CreateEvent(nullptr, true, false, nullptr); + if (!restartSignal.Valid()) + throw "Could not create restart signal"; + + exitSignal = CreateEvent(nullptr, true, false, nullptr); + if (!exitSignal.Valid()) + throw "Could not create exit signal"; + + initSignal = CreateEvent(nullptr, false, false, nullptr); + if (!initSignal.Valid()) + throw "Could not create init signal"; + + reconnectSignal = CreateEvent(nullptr, false, false, nullptr); + if (!reconnectSignal.Valid()) + throw "Could not create reconnect signal"; + + reconnectThread = CreateThread( + nullptr, 0, WASAPISource::ReconnectThread, this, 0, nullptr); + if (!reconnectThread.Valid()) + throw "Failed to create reconnect thread"; + + notify = new WASAPINotify(this); + if (!notify) + throw "Could not create WASAPINotify"; + + HRESULT hr = CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr, + CLSCTX_ALL, + IID_PPV_ARGS(enumerator.Assign())); + if (FAILED(hr)) + throw HRError("Failed to create enumerator", hr); + + hr = enumerator->RegisterEndpointNotificationCallback(notify); + if (FAILED(hr)) + throw HRError("Failed to register endpoint callback", hr); + + /* OBS will already load DLL on startup if it exists */ + const HMODULE rtwq_module = GetModuleHandle(L"RTWorkQ.dll"); + rtwq_supported = rtwq_module != NULL; + if (rtwq_supported) { + rtwq_unlock_work_queue = + (PFN_RtwqUnlockWorkQueue)GetProcAddress( + rtwq_module, "RtwqUnlockWorkQueue"); + rtwq_lock_shared_work_queue = + (PFN_RtwqLockSharedWorkQueue)GetProcAddress( + rtwq_module, "RtwqLockSharedWorkQueue"); + rtwq_create_async_result = + (PFN_RtwqCreateAsyncResult)GetProcAddress( + rtwq_module, "RtwqCreateAsyncResult"); + rtwq_put_work_item = (PFN_RtwqPutWorkItem)GetProcAddress( + rtwq_module, "RtwqPutWorkItem"); + rtwq_put_waiting_work_item = + (PFN_RtwqPutWaitingWorkItem)GetProcAddress( + rtwq_module, "RtwqPutWaitingWorkItem"); + + hr = rtwq_create_async_result(nullptr, &startCapture, nullptr, + &startCaptureAsyncResult); + if (FAILED(hr)) { + enumerator->UnregisterEndpointNotificationCallback( + notify); + throw HRError( + "Could not create startCaptureAsyncResult", hr); + } + + hr = rtwq_create_async_result(nullptr, &sampleReady, nullptr, + &sampleReadyAsyncResult); + if (FAILED(hr)) { + enumerator->UnregisterEndpointNotificationCallback( + notify); + throw HRError("Could not create sampleReadyAsyncResult", + hr); + } + + hr = rtwq_create_async_result(nullptr, &restart, nullptr, + &restartAsyncResult); + if (FAILED(hr)) { + enumerator->UnregisterEndpointNotificationCallback( + notify); + throw HRError("Could not create restartAsyncResult", + hr); + } + + DWORD taskId = 0; + DWORD id = 0; + hr = rtwq_lock_shared_work_queue(L"Capture", 0, &taskId, &id); + if (FAILED(hr)) { + enumerator->UnregisterEndpointNotificationCallback( + notify); + throw HRError("RtwqLockSharedWorkQueue failed", hr); + } + + startCapture.SetQueueId(id); + sampleReady.SetQueueId(id); + restart.SetQueueId(id); + } else { + captureThread = CreateThread(nullptr, 0, + WASAPISource::CaptureThread, this, + 0, nullptr); + if (!captureThread.Valid()) { + enumerator->UnregisterEndpointNotificationCallback( + notify); + throw "Failed to create capture thread"; + } + } + Start(); } -inline void WASAPISource::Start() +void WASAPISource::Start() { - if (!TryInitialize()) { - blog(LOG_INFO, - "[WASAPISource::WASAPISource] " - "Device '%s' not found. Waiting for device", - device_id.c_str()); - Reconnect(); + if (rtwq_supported) { + rtwq_put_work_item(startCapture.GetQueueId(), 0, + startCaptureAsyncResult); + } else { + SetEvent(initSignal); } } -inline void WASAPISource::Stop() +void WASAPISource::Stop() { SetEvent(stopSignal); - if (active) { - blog(LOG_INFO, "WASAPI: Device '%s' Terminated", - device_name.c_str()); + blog(LOG_INFO, "WASAPI: Device '%s' Terminated", device_name.c_str()); + + if (rtwq_supported) + SetEvent(receiveSignal); + + WaitForSingleObject(idleSignal, INFINITE); + + SetEvent(exitSignal); + + WaitForSingleObject(reconnectThread, INFINITE); + + if (rtwq_supported) + rtwq_unlock_work_queue(sampleReady.GetQueueId()); + else WaitForSingleObject(captureThread, INFINITE); - } - - if (reconnecting) - WaitForSingleObject(reconnectThread, INFINITE); - - ResetEvent(stopSignal); } -inline WASAPISource::~WASAPISource() +WASAPISource::~WASAPISource() { enumerator->UnregisterEndpointNotificationCallback(notify); Stop(); @@ -190,79 +436,92 @@ void WASAPISource::UpdateSettings(obs_data_t *settings) device_id = obs_data_get_string(settings, OPT_DEVICE_ID); useDeviceTiming = obs_data_get_bool(settings, OPT_USE_DEVICE_TIMING); isDefaultDevice = _strcmpi(device_id.c_str(), "default") == 0; + + blog(LOG_INFO, + "[win-wasapi: '%s'] update settings:\n" + "\tdevice id: %s\n" + "\tuse device timing: %d", + obs_source_get_name(source), device_id.c_str(), + (int)useDeviceTiming); } void WASAPISource::Update(obs_data_t *settings) { - string newDevice = obs_data_get_string(settings, OPT_DEVICE_ID); - bool restart = newDevice.compare(device_id) != 0; - - if (restart) - Stop(); + const string newDevice = obs_data_get_string(settings, OPT_DEVICE_ID); + const bool restart = newDevice.compare(device_id) != 0; UpdateSettings(settings); if (restart) - Start(); + SetEvent(restartSignal); } -bool WASAPISource::InitDevice() +ComPtr WASAPISource::InitDevice(IMMDeviceEnumerator *enumerator, + bool isDefaultDevice, + bool isInputDevice, + const string device_id) { - HRESULT res; + ComPtr device; if (isDefaultDevice) { - res = enumerator->GetDefaultAudioEndpoint( + HRESULT res = enumerator->GetDefaultAudioEndpoint( isInputDevice ? eCapture : eRender, isInputDevice ? eCommunications : eConsole, device.Assign()); if (FAILED(res)) - return false; - - CoTaskMemPtr id; - res = device->GetId(&id); - default_id = id; - + throw HRError("Failed GetDefaultAudioEndpoint", res); } else { wchar_t *w_id; os_utf8_to_wcs_ptr(device_id.c_str(), device_id.size(), &w_id); + if (!w_id) + throw "Failed to widen device id string"; - res = enumerator->GetDevice(w_id, device.Assign()); + const HRESULT res = + enumerator->GetDevice(w_id, device.Assign()); bfree(w_id); + + if (FAILED(res)) + throw HRError("Failed to enumerate device", res); } - return SUCCEEDED(res); + return device; } #define BUFFER_TIME_100NS (5 * 10000000) -void WASAPISource::InitClient() +ComPtr WASAPISource::InitClient(IMMDevice *device, + bool isInputDevice, + enum speaker_layout &speakers, + enum audio_format &format, + uint32_t &sampleRate) { - CoTaskMemPtr wfex; - HRESULT res; - DWORD flags = AUDCLNT_STREAMFLAGS_EVENTCALLBACK; - - res = device->Activate(__uuidof(IAudioClient), CLSCTX_ALL, nullptr, - (void **)client.Assign()); + ComPtr client; + HRESULT res = device->Activate(__uuidof(IAudioClient), CLSCTX_ALL, + nullptr, (void **)client.Assign()); if (FAILED(res)) throw HRError("Failed to activate client context", res); + CoTaskMemPtr wfex; res = client->GetMixFormat(&wfex); if (FAILED(res)) throw HRError("Failed to get mix format", res); - InitFormat(wfex); + InitFormat(wfex, speakers, format, sampleRate); + DWORD flags = AUDCLNT_STREAMFLAGS_EVENTCALLBACK; if (!isInputDevice) flags |= AUDCLNT_STREAMFLAGS_LOOPBACK; res = client->Initialize(AUDCLNT_SHAREMODE_SHARED, flags, BUFFER_TIME_100NS, 0, wfex, nullptr); if (FAILED(res)) - throw HRError("Failed to get initialize audio client", res); + throw HRError("Failed to initialize audio client", res); + + return client; } -void WASAPISource::InitRender() +void WASAPISource::ClearBuffer(IMMDevice *device) { CoTaskMemPtr wfex; HRESULT res; @@ -282,7 +541,7 @@ void WASAPISource::InitRender() res = client->Initialize(AUDCLNT_SHAREMODE_SHARED, 0, BUFFER_TIME_100NS, 0, wfex, nullptr); if (FAILED(res)) - throw HRError("Failed to get initialize audio client", res); + throw HRError("Failed to initialize audio client", res); /* Silent loopback fix. Prevents audio stream from stopping and */ /* messing up timestamps and other weird glitches during silence */ @@ -292,8 +551,8 @@ void WASAPISource::InitRender() if (FAILED(res)) throw HRError("Failed to get buffer size", res); - res = client->GetService(__uuidof(IAudioRenderClient), - (void **)render.Assign()); + ComPtr render; + res = client->GetService(IID_PPV_ARGS(render.Assign())); if (FAILED(res)) throw HRError("Failed to get render client", res); @@ -301,7 +560,7 @@ void WASAPISource::InitRender() if (FAILED(res)) throw HRError("Failed to get buffer", res); - memset(buffer, 0, frames * wfex->nBlockAlign); + memset(buffer, 0, (size_t)frames * (size_t)wfex->nBlockAlign); render->ReleaseBuffer(frames, 0); } @@ -324,7 +583,9 @@ static speaker_layout ConvertSpeakerLayout(DWORD layout, WORD channels) return (speaker_layout)channels; } -void WASAPISource::InitFormat(WAVEFORMATEX *wfex) +void WASAPISource::InitFormat(const WAVEFORMATEX *wfex, + enum speaker_layout &speakers, + enum audio_format &format, uint32_t &sampleRate) { DWORD layout = 0; @@ -334,15 +595,16 @@ void WASAPISource::InitFormat(WAVEFORMATEX *wfex) } /* WASAPI is always float */ - sampleRate = wfex->nSamplesPerSec; - format = AUDIO_FORMAT_FLOAT; speakers = ConvertSpeakerLayout(layout, wfex->nChannels); + format = AUDIO_FORMAT_FLOAT; + sampleRate = wfex->nSamplesPerSec; } -void WASAPISource::InitCapture() +ComPtr WASAPISource::InitCapture(IAudioClient *client, + HANDLE receiveSignal) { - HRESULT res = client->GetService(__uuidof(IAudioCaptureClient), - (void **)capture.Assign()); + ComPtr capture; + HRESULT res = client->GetService(IID_PPV_ARGS(capture.Assign())); if (FAILED(res)) throw HRError("Failed to create capture context", res); @@ -350,137 +612,112 @@ void WASAPISource::InitCapture() if (FAILED(res)) throw HRError("Failed to set event handle", res); - captureThread = CreateThread(nullptr, 0, WASAPISource::CaptureThread, - this, 0, nullptr); - if (!captureThread.Valid()) - throw "Failed to create capture thread"; + res = client->Start(); + if (FAILED(res)) + throw HRError("Failed to start capture client", res); - client->Start(); - active = true; - - blog(LOG_INFO, "WASAPI: Device '%s' [%s Hz] initialized", - device_name.c_str(), device_sample.c_str()); + return capture; } void WASAPISource::Initialize() { - HRESULT res; - - res = CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr, - CLSCTX_ALL, __uuidof(IMMDeviceEnumerator), - (void **)enumerator.Assign()); - if (FAILED(res)) - throw HRError("Failed to create enumerator", res); - - if (!InitDevice()) - return; + ComPtr device = InitDevice(enumerator, isDefaultDevice, + isInputDevice, device_id); device_name = GetDeviceName(device); - if (!notify) { - notify = new WASAPINotify(this); - enumerator->RegisterEndpointNotificationCallback(notify); - } + ResetEvent(receiveSignal); - HRESULT resSample; - IPropertyStore *store = nullptr; - PWAVEFORMATEX deviceFormatProperties; - PROPVARIANT prop; - resSample = device->OpenPropertyStore(STGM_READ, &store); - if (!FAILED(resSample)) { - resSample = - store->GetValue(PKEY_AudioEngine_DeviceFormat, &prop); - if (!FAILED(resSample)) { - if (prop.vt != VT_EMPTY && prop.blob.pBlobData) { - deviceFormatProperties = - (PWAVEFORMATEX)prop.blob.pBlobData; - device_sample = std::to_string( - deviceFormatProperties->nSamplesPerSec); - } + ComPtr temp_client = + InitClient(device, isInputDevice, speakers, format, sampleRate); + if (!isInputDevice) + ClearBuffer(device); + ComPtr temp_capture = + InitCapture(temp_client, receiveSignal); + + client = std::move(temp_client); + capture = std::move(temp_capture); + + if (rtwq_supported) { + HRESULT hr = rtwq_put_waiting_work_item( + receiveSignal, 0, sampleReadyAsyncResult, nullptr); + if (FAILED(hr)) { + capture.Clear(); + client.Clear(); + throw HRError("RtwqPutWaitingWorkItem failed", hr); } - store->Release(); + hr = rtwq_put_waiting_work_item(restartSignal, 0, + restartAsyncResult, nullptr); + if (FAILED(hr)) { + capture.Clear(); + client.Clear(); + throw HRError("RtwqPutWaitingWorkItem failed", hr); + } } - InitClient(); - if (!isInputDevice) - InitRender(); - InitCapture(); + blog(LOG_INFO, "WASAPI: Device '%s' [%" PRIu32 " Hz] initialized", + device_name.c_str(), sampleRate); } bool WASAPISource::TryInitialize() { + bool success = false; try { Initialize(); + success = true; } catch (HRError &error) { - if (previouslyFailed) - return active; - - blog(LOG_WARNING, "[WASAPISource::TryInitialize]:[%s] %s: %lX", - device_name.empty() ? device_id.c_str() - : device_name.c_str(), - error.str, error.hr); - + if (!previouslyFailed) { + blog(LOG_WARNING, + "[WASAPISource::TryInitialize]:[%s] %s: %lX", + device_name.empty() ? device_id.c_str() + : device_name.c_str(), + error.str, error.hr); + } } catch (const char *error) { - if (previouslyFailed) - return active; - - blog(LOG_WARNING, "[WASAPISource::TryInitialize]:[%s] %s", - device_name.empty() ? device_id.c_str() - : device_name.c_str(), - error); + if (!previouslyFailed) { + blog(LOG_WARNING, + "[WASAPISource::TryInitialize]:[%s] %s", + device_name.empty() ? device_id.c_str() + : device_name.c_str(), + error); + } } - previouslyFailed = !active; - return active; + previouslyFailed = !success; + return success; } -void WASAPISource::Reconnect() -{ - reconnecting = true; - reconnectThread = CreateThread( - nullptr, 0, WASAPISource::ReconnectThread, this, 0, nullptr); - - if (!reconnectThread.Valid()) - blog(LOG_WARNING, - "[WASAPISource::Reconnect] " - "Failed to initialize reconnect thread: %lu", - GetLastError()); -} - -static inline bool WaitForSignal(HANDLE handle, DWORD time) -{ - return WaitForSingleObject(handle, time) != WAIT_TIMEOUT; -} - -#define RECONNECT_INTERVAL 3000 - DWORD WINAPI WASAPISource::ReconnectThread(LPVOID param) { - WASAPISource *source = (WASAPISource *)param; - os_set_thread_name("win-wasapi: reconnect thread"); - const HRESULT hr = CoInitializeEx(0, COINIT_MULTITHREADED); - const bool com_initialized = SUCCEEDED(hr); - if (!com_initialized) { - blog(LOG_ERROR, - "[WASAPISource::ReconnectThread]" - " CoInitializeEx failed: 0x%08X", - hr); - } + WASAPISource *source = (WASAPISource *)param; - while (!WaitForSignal(source->stopSignal, RECONNECT_INTERVAL)) { - if (source->TryInitialize()) + const HANDLE sigs[] = { + source->exitSignal, + source->reconnectSignal, + }; + + bool exit = false; + while (!exit) { + const DWORD ret = WaitForMultipleObjects(_countof(sigs), sigs, + false, INFINITE); + switch (ret) { + case WAIT_OBJECT_0: + exit = true; break; + default: + assert(ret == (WAIT_OBJECT_0 + 1)); + if (source->reconnectDuration > 0) { + WaitForSingleObject(source->stopSignal, + source->reconnectDuration); + } + source->Start(); + } } - if (com_initialized) - CoUninitialize(); - - source->reconnectThread = nullptr; - source->reconnecting = false; return 0; } @@ -495,11 +732,10 @@ bool WASAPISource::ProcessCaptureData() while (true) { res = capture->GetNextPacketSize(&captureSize); - if (FAILED(res)) { if (res != AUDCLNT_E_DEVICE_INVALIDATED) blog(LOG_WARNING, - "[WASAPISource::GetCaptureData]" + "[WASAPISource::ProcessCaptureData]" " capture->GetNextPacketSize" " failed: %lX", res); @@ -513,7 +749,7 @@ bool WASAPISource::ProcessCaptureData() if (FAILED(res)) { if (res != AUDCLNT_E_DEVICE_INVALIDATED) blog(LOG_WARNING, - "[WASAPISource::GetCaptureData]" + "[WASAPISource::ProcessCaptureData]" " capture->GetBuffer" " failed: %lX", res); @@ -540,44 +776,136 @@ bool WASAPISource::ProcessCaptureData() return true; } -static inline bool WaitForCaptureSignal(DWORD numSignals, const HANDLE *signals, - DWORD duration) -{ - DWORD ret; - ret = WaitForMultipleObjects(numSignals, signals, false, duration); - - return ret == WAIT_OBJECT_0 || ret == WAIT_TIMEOUT; -} +#define RECONNECT_INTERVAL 3000 DWORD WINAPI WASAPISource::CaptureThread(LPVOID param) { - WASAPISource *source = (WASAPISource *)param; - bool reconnect = false; - - /* Output devices don't signal, so just make it check every 10 ms */ - DWORD dur = source->isInputDevice ? RECONNECT_INTERVAL : 10; - - HANDLE sigs[2] = {source->receiveSignal, source->stopSignal}; - os_set_thread_name("win-wasapi: capture thread"); - while (WaitForCaptureSignal(2, sigs, dur)) { - if (!source->ProcessCaptureData()) { - reconnect = true; - break; + const HRESULT hr = CoInitializeEx(0, COINIT_MULTITHREADED); + const bool com_initialized = SUCCEEDED(hr); + if (!com_initialized) { + blog(LOG_ERROR, + "[WASAPISource::CaptureThread]" + " CoInitializeEx failed: 0x%08X", + hr); + } + + DWORD unused = 0; + const HANDLE handle = AvSetMmThreadCharacteristics(L"Audio", &unused); + + WASAPISource *source = (WASAPISource *)param; + + const HANDLE inactive_sigs[] = { + source->exitSignal, + source->stopSignal, + source->initSignal, + }; + + const HANDLE active_sigs[] = { + source->exitSignal, + source->stopSignal, + source->receiveSignal, + source->restartSignal, + }; + + DWORD sig_count = _countof(inactive_sigs); + const HANDLE *sigs = inactive_sigs; + + bool exit = false; + while (!exit) { + bool idle = false; + bool stop = false; + bool reconnect = false; + do { + /* Windows 7 does not seem to wake up for LOOPBACK */ + const DWORD dwMilliseconds = ((sigs == active_sigs) && + !source->isInputDevice) + ? 10 + : INFINITE; + + const DWORD ret = WaitForMultipleObjects( + sig_count, sigs, false, dwMilliseconds); + switch (ret) { + case WAIT_OBJECT_0: { + exit = true; + stop = true; + idle = true; + break; + } + + case WAIT_OBJECT_0 + 1: + stop = true; + idle = true; + break; + + case WAIT_OBJECT_0 + 2: + case WAIT_TIMEOUT: + if (sigs == inactive_sigs) { + assert(ret != WAIT_TIMEOUT); + + if (source->TryInitialize()) { + sig_count = + _countof(active_sigs); + sigs = active_sigs; + } else { + blog(LOG_INFO, + "WASAPI: Device '%s' failed to start", + source->device_id.c_str()); + stop = true; + reconnect = true; + source->reconnectDuration = + RECONNECT_INTERVAL; + } + } else { + stop = !source->ProcessCaptureData(); + if (stop) { + blog(LOG_INFO, + "Device '%s' invalidated. Retrying", + source->device_name + .c_str()); + stop = true; + reconnect = true; + source->reconnectDuration = + RECONNECT_INTERVAL; + } + } + break; + + default: + assert(sigs == active_sigs); + assert(ret == WAIT_OBJECT_0 + 3); + stop = true; + reconnect = true; + source->reconnectDuration = 0; + ResetEvent(source->restartSignal); + } + } while (!stop); + + sig_count = _countof(inactive_sigs); + sigs = inactive_sigs; + + if (source->client) { + source->client->Stop(); + + source->capture.Clear(); + source->client.Clear(); + } + + if (idle) { + SetEvent(source->idleSignal); + } else if (reconnect) { + blog(LOG_INFO, "Device '%s' invalidated. Retrying", + source->device_name.c_str()); + SetEvent(source->reconnectSignal); } } - source->client->Stop(); + if (handle) + AvRevertMmThreadCharacteristics(handle); - source->captureThread = nullptr; - source->active = false; - - if (reconnect) { - blog(LOG_INFO, "Device '%s' invalidated. Retrying", - source->device_name.c_str()); - source->Reconnect(); - } + if (com_initialized) + CoUninitialize(); return 0; } @@ -587,28 +915,104 @@ void WASAPISource::SetDefaultDevice(EDataFlow flow, ERole role, LPCWSTR id) if (!isDefaultDevice) return; - EDataFlow expectedFlow = isInputDevice ? eCapture : eRender; - ERole expectedRole = isInputDevice ? eCommunications : eConsole; - + const EDataFlow expectedFlow = isInputDevice ? eCapture : eRender; + const ERole expectedRole = isInputDevice ? eCommunications : eConsole; if (flow != expectedFlow || role != expectedRole) return; - if (id && default_id.compare(id) == 0) - return; + + if (id) { + if (default_id.compare(id) == 0) + return; + default_id = id; + } else { + if (default_id.empty()) + return; + default_id.clear(); + } blog(LOG_INFO, "WASAPI: Default %s device changed", isInputDevice ? "input" : "output"); - /* reset device only once every 300ms */ - uint64_t t = os_gettime_ns(); - if (t - lastNotifyTime < 300000000) - return; + SetEvent(restartSignal); +} - std::thread([this]() { - Stop(); - Start(); - }).detach(); +void WASAPISource::OnStartCapture() +{ + const DWORD ret = WaitForSingleObject(stopSignal, 0); + switch (ret) { + case WAIT_OBJECT_0: + SetEvent(idleSignal); + break; - lastNotifyTime = t; + default: + assert(ret == WAIT_TIMEOUT); + + if (!TryInitialize()) { + blog(LOG_INFO, "WASAPI: Device '%s' failed to start", + device_id.c_str()); + reconnectDuration = RECONNECT_INTERVAL; + SetEvent(reconnectSignal); + } + } +} + +void WASAPISource::OnSampleReady() +{ + bool stop = false; + bool reconnect = false; + + if (!ProcessCaptureData()) { + stop = true; + reconnect = true; + reconnectDuration = RECONNECT_INTERVAL; + } + + if (WaitForSingleObject(restartSignal, 0) == WAIT_OBJECT_0) { + stop = true; + reconnect = true; + reconnectDuration = 0; + + ResetEvent(restartSignal); + rtwq_put_waiting_work_item(restartSignal, 0, restartAsyncResult, + nullptr); + } + + if (WaitForSingleObject(stopSignal, 0) == WAIT_OBJECT_0) { + stop = true; + reconnect = false; + } + + if (!stop) { + if (FAILED(rtwq_put_waiting_work_item(receiveSignal, 0, + sampleReadyAsyncResult, + nullptr))) { + blog(LOG_ERROR, + "Could not requeue sample receive work"); + stop = true; + reconnect = true; + reconnectDuration = RECONNECT_INTERVAL; + } + } + + if (stop) { + client->Stop(); + + capture.Clear(); + client.Clear(); + + if (reconnect) { + blog(LOG_INFO, "Device '%s' invalidated. Retrying", + device_name.c_str()); + SetEvent(reconnectSignal); + } else { + SetEvent(idleSignal); + } + } +} + +void WASAPISource::OnRestart() +{ + SetEvent(receiveSignal); } /* ------------------------------------------------------------------------- */