win-wasapi: Remove bools and persist threads

This simplifies synchronization, and fixes several races.
This commit is contained in:
jpark37 2021-09-24 21:17:12 -07:00
parent a504691af8
commit a995c305b7

View File

@ -10,8 +10,8 @@
#include <util/threading.h>
#include <util/util_uint64.h>
#include <atomic>
#include <cinttypes>
#include <thread>
using namespace std;
@ -35,18 +35,21 @@ class WASAPISource {
string device_name;
uint64_t lastNotifyTime = 0;
bool isInputDevice;
bool useDeviceTiming = false;
std::atomic<bool> useDeviceTiming = false;
bool isDefaultDevice = false;
bool reconnecting = false;
bool previouslyFailed = false;
WinHandle reconnectThread;
bool active = false;
WinHandle captureThread;
WinHandle idleSignal;
WinHandle stopSignal;
WinHandle receiveSignal;
WinHandle restartSignal;
WinHandle exitSignal;
WinHandle initSignal;
DWORD reconnectDuration = 0;
WinHandle reconnectSignal;
speaker_layout speakers;
audio_format format;
@ -59,7 +62,6 @@ class WASAPISource {
void Start();
void Stop();
void Reconnect();
static ComPtr<IMMDevice> InitDevice(IMMDeviceEnumerator *enumerator,
bool isDefaultDevice,
@ -149,6 +151,10 @@ WASAPISource::WASAPISource(obs_data_t *settings, obs_source_t *source_,
{
UpdateSettings(settings);
idleSignal = CreateEvent(nullptr, true, false, nullptr);
if (!idleSignal.Valid())
throw "Could not create idle signal";
stopSignal = CreateEvent(nullptr, true, false, nullptr);
if (!stopSignal.Valid())
throw "Could not create stop signal";
@ -157,49 +163,69 @@ WASAPISource::WASAPISource(obs_data_t *settings, obs_source_t *source_,
if (!receiveSignal.Valid())
throw "Could not create receive signal";
restartSignal = CreateEvent(nullptr, true, false, nullptr);
if (!restartSignal.Valid())
throw "Could not create restart signal";
exitSignal = CreateEvent(nullptr, true, false, nullptr);
if (!exitSignal.Valid())
throw "Could not create exit signal";
initSignal = CreateEvent(nullptr, false, false, nullptr);
if (!initSignal.Valid())
throw "Could not create init signal";
reconnectSignal = CreateEvent(nullptr, false, false, nullptr);
if (!reconnectSignal.Valid())
throw "Could not create reconnect signal";
reconnectThread = CreateThread(
nullptr, 0, WASAPISource::ReconnectThread, this, 0, nullptr);
if (!reconnectThread.Valid())
throw "Failed to create reconnect thread";
notify = new WASAPINotify(this);
if (!notify)
throw "Could not create WASAPINotify";
HRESULT res = CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr,
CLSCTX_ALL,
IID_PPV_ARGS(enumerator.Assign()));
if (FAILED(res))
throw HRError("Failed to create enumerator", res);
HRESULT hr = CoCreateInstance(__uuidof(MMDeviceEnumerator), nullptr,
CLSCTX_ALL,
IID_PPV_ARGS(enumerator.Assign()));
if (FAILED(hr))
throw HRError("Failed to create enumerator", hr);
res = enumerator->RegisterEndpointNotificationCallback(notify);
if (FAILED(res))
throw HRError("Failed to register endpoint callback", res);
hr = enumerator->RegisterEndpointNotificationCallback(notify);
if (FAILED(hr))
throw HRError("Failed to register endpoint callback", hr);
captureThread = CreateThread(nullptr, 0, WASAPISource::CaptureThread,
this, 0, nullptr);
if (!captureThread.Valid()) {
enumerator->UnregisterEndpointNotificationCallback(notify);
throw "Failed to create capture thread";
}
Start();
}
void WASAPISource::Start()
{
if (!TryInitialize()) {
blog(LOG_INFO, "WASAPI: Device '%s' failed to start",
device_id.c_str());
Reconnect();
}
SetEvent(initSignal);
}
void WASAPISource::Stop()
{
SetEvent(stopSignal);
if (active) {
blog(LOG_INFO, "WASAPI: Device '%s' Terminated",
device_name.c_str());
WaitForSingleObject(captureThread, INFINITE);
}
blog(LOG_INFO, "WASAPI: Device '%s' Terminated", device_name.c_str());
if (reconnecting)
WaitForSingleObject(reconnectThread, INFINITE);
WaitForSingleObject(idleSignal, INFINITE);
ResetEvent(stopSignal);
SetEvent(exitSignal);
capture.Clear();
client.Clear();
WaitForSingleObject(reconnectThread, INFINITE);
WaitForSingleObject(captureThread, INFINITE);
}
WASAPISource::~WASAPISource()
@ -217,16 +243,13 @@ void WASAPISource::UpdateSettings(obs_data_t *settings)
void WASAPISource::Update(obs_data_t *settings)
{
string newDevice = obs_data_get_string(settings, OPT_DEVICE_ID);
bool restart = newDevice.compare(device_id) != 0;
if (restart)
Stop();
const string newDevice = obs_data_get_string(settings, OPT_DEVICE_ID);
const bool restart = newDevice.compare(device_id) != 0;
UpdateSettings(settings);
if (restart)
Start();
SetEvent(restartSignal);
}
ComPtr<IMMDevice> WASAPISource::InitDevice(IMMDeviceEnumerator *enumerator,
@ -407,6 +430,8 @@ void WASAPISource::Initialize()
device_name = GetDeviceName(device);
ResetEvent(receiveSignal);
ComPtr<IAudioClient> temp_client =
InitClient(device, isInputDevice, speakers, format, sampleRate);
if (!isInputDevice)
@ -417,93 +442,68 @@ void WASAPISource::Initialize()
client = std::move(temp_client);
capture = std::move(temp_capture);
captureThread = CreateThread(nullptr, 0, WASAPISource::CaptureThread,
this, 0, nullptr);
if (!captureThread.Valid()) {
capture.Clear();
client.Clear();
throw "Failed to create capture thread";
}
active = true;
blog(LOG_INFO, "WASAPI: Device '%s' [%" PRIu32 " Hz] initialized",
device_name.c_str(), sampleRate);
}
bool WASAPISource::TryInitialize()
{
bool success = false;
try {
Initialize();
success = true;
} catch (HRError &error) {
if (previouslyFailed)
return active;
blog(LOG_WARNING, "[WASAPISource::TryInitialize]:[%s] %s: %lX",
device_name.empty() ? device_id.c_str()
: device_name.c_str(),
error.str, error.hr);
if (!previouslyFailed) {
blog(LOG_WARNING,
"[WASAPISource::TryInitialize]:[%s] %s: %lX",
device_name.empty() ? device_id.c_str()
: device_name.c_str(),
error.str, error.hr);
}
} catch (const char *error) {
if (previouslyFailed)
return active;
blog(LOG_WARNING, "[WASAPISource::TryInitialize]:[%s] %s",
device_name.empty() ? device_id.c_str()
: device_name.c_str(),
error);
if (!previouslyFailed) {
blog(LOG_WARNING,
"[WASAPISource::TryInitialize]:[%s] %s",
device_name.empty() ? device_id.c_str()
: device_name.c_str(),
error);
}
}
previouslyFailed = !active;
return active;
previouslyFailed = !success;
return success;
}
void WASAPISource::Reconnect()
{
reconnecting = true;
reconnectThread = CreateThread(
nullptr, 0, WASAPISource::ReconnectThread, this, 0, nullptr);
if (!reconnectThread.Valid())
blog(LOG_WARNING,
"[WASAPISource::Reconnect] "
"Failed to initialize reconnect thread: %lu",
GetLastError());
}
static inline bool WaitForSignal(HANDLE handle, DWORD time)
{
return WaitForSingleObject(handle, time) != WAIT_TIMEOUT;
}
#define RECONNECT_INTERVAL 3000
DWORD WINAPI WASAPISource::ReconnectThread(LPVOID param)
{
WASAPISource *source = (WASAPISource *)param;
os_set_thread_name("win-wasapi: reconnect thread");
const HRESULT hr = CoInitializeEx(0, COINIT_MULTITHREADED);
const bool com_initialized = SUCCEEDED(hr);
if (!com_initialized) {
blog(LOG_ERROR,
"[WASAPISource::ReconnectThread]"
" CoInitializeEx failed: 0x%08X",
hr);
}
WASAPISource *source = (WASAPISource *)param;
while (!WaitForSignal(source->stopSignal, RECONNECT_INTERVAL)) {
if (source->TryInitialize())
const HANDLE sigs[] = {
source->exitSignal,
source->reconnectSignal,
};
bool exit = false;
while (!exit) {
const DWORD ret = WaitForMultipleObjects(_countof(sigs), sigs,
false, INFINITE);
switch (ret) {
case WAIT_OBJECT_0:
exit = true;
break;
default:
assert(ret == (WAIT_OBJECT_0 + 1));
if (source->reconnectDuration > 0) {
WaitForSingleObject(source->stopSignal,
source->reconnectDuration);
}
source->Start();
}
}
if (com_initialized)
CoUninitialize();
source->reconnectThread = nullptr;
source->reconnecting = false;
return 0;
}
@ -518,7 +518,6 @@ bool WASAPISource::ProcessCaptureData()
while (true) {
res = capture->GetNextPacketSize(&captureSize);
if (FAILED(res)) {
if (res != AUDCLNT_E_DEVICE_INVALIDATED)
blog(LOG_WARNING,
@ -563,44 +562,130 @@ bool WASAPISource::ProcessCaptureData()
return true;
}
static inline bool WaitForCaptureSignal(DWORD numSignals, const HANDLE *signals,
DWORD duration)
{
DWORD ret;
ret = WaitForMultipleObjects(numSignals, signals, false, duration);
return ret == WAIT_OBJECT_0 || ret == WAIT_TIMEOUT;
}
#define RECONNECT_INTERVAL 3000
DWORD WINAPI WASAPISource::CaptureThread(LPVOID param)
{
WASAPISource *source = (WASAPISource *)param;
bool reconnect = false;
/* Output devices don't signal, so just make it check every 10 ms */
DWORD dur = source->isInputDevice ? RECONNECT_INTERVAL : 10;
HANDLE sigs[2] = {source->receiveSignal, source->stopSignal};
os_set_thread_name("win-wasapi: capture thread");
while (WaitForCaptureSignal(2, sigs, dur)) {
if (!source->ProcessCaptureData()) {
reconnect = true;
break;
const HRESULT hr = CoInitializeEx(0, COINIT_MULTITHREADED);
const bool com_initialized = SUCCEEDED(hr);
if (!com_initialized) {
blog(LOG_ERROR,
"[WASAPISource::CaptureThread]"
" CoInitializeEx failed: 0x%08X",
hr);
}
WASAPISource *source = (WASAPISource *)param;
const HANDLE inactive_sigs[] = {
source->exitSignal,
source->stopSignal,
source->initSignal,
};
const HANDLE active_sigs[] = {
source->exitSignal,
source->stopSignal,
source->receiveSignal,
source->restartSignal,
};
DWORD sig_count = _countof(inactive_sigs);
const HANDLE *sigs = inactive_sigs;
bool exit = false;
while (!exit) {
bool idle = false;
bool stop = false;
bool reconnect = false;
do {
/* Windows 7 does not seem to wake up for LOOPBACK */
const DWORD dwMilliseconds = ((sigs == active_sigs) &&
!source->isInputDevice)
? 10
: INFINITE;
const DWORD ret = WaitForMultipleObjects(
sig_count, sigs, false, dwMilliseconds);
switch (ret) {
case WAIT_OBJECT_0: {
exit = true;
stop = true;
idle = true;
break;
}
case WAIT_OBJECT_0 + 1:
stop = true;
idle = true;
break;
case WAIT_OBJECT_0 + 2:
case WAIT_TIMEOUT:
if (sigs == inactive_sigs) {
assert(ret != WAIT_TIMEOUT);
if (source->TryInitialize()) {
sig_count =
_countof(active_sigs);
sigs = active_sigs;
} else {
blog(LOG_INFO,
"WASAPI: Device '%s' failed to start",
source->device_id.c_str());
stop = true;
reconnect = true;
source->reconnectDuration =
RECONNECT_INTERVAL;
}
} else {
stop = !source->ProcessCaptureData();
if (stop) {
blog(LOG_INFO,
"Device '%s' invalidated. Retrying",
source->device_name
.c_str());
stop = true;
reconnect = true;
source->reconnectDuration =
RECONNECT_INTERVAL;
}
}
break;
default:
assert(sigs == active_sigs);
assert(ret == WAIT_OBJECT_0 + 3);
stop = true;
reconnect = true;
source->reconnectDuration = 0;
ResetEvent(source->restartSignal);
}
} while (!stop);
sig_count = _countof(inactive_sigs);
sigs = inactive_sigs;
if (source->client) {
source->client->Stop();
source->capture.Clear();
source->client.Clear();
}
if (idle) {
SetEvent(source->idleSignal);
} else if (reconnect) {
blog(LOG_INFO, "Device '%s' invalidated. Retrying",
source->device_name.c_str());
SetEvent(source->reconnectSignal);
}
}
source->client->Stop();
source->captureThread = nullptr;
source->active = false;
if (reconnect) {
blog(LOG_INFO, "Device '%s' invalidated. Retrying",
source->device_name.c_str());
source->Reconnect();
}
if (com_initialized)
CoUninitialize();
return 0;
}
@ -626,12 +711,9 @@ void WASAPISource::SetDefaultDevice(EDataFlow flow, ERole role, LPCWSTR id)
if (t - lastNotifyTime < 300000000)
return;
std::thread([this]() {
Stop();
Start();
}).detach();
lastNotifyTime = t;
SetEvent(restartSignal);
}
/* ------------------------------------------------------------------------- */