Use one PulseAudio mainloop per device

To help avoid devices blocking on each other when handling asynchronous
messages.
This commit is contained in:
Chris Robinson 2019-10-09 23:04:05 -07:00
parent 8bf3da0cd2
commit d2053e6784

View File

@ -32,6 +32,7 @@
#include <atomic>
#include <thread>
#include <algorithm>
#include <functional>
#include <condition_variable>
#include "alcmain.h"
@ -53,6 +54,7 @@ namespace {
MAGIC(pa_mainloop_free); \
MAGIC(pa_mainloop_set_poll_func); \
MAGIC(pa_mainloop_run); \
MAGIC(pa_mainloop_quit); \
MAGIC(pa_mainloop_get_api); \
MAGIC(pa_context_new); \
MAGIC(pa_context_unref); \
@ -119,6 +121,7 @@ PULSE_FUNCS(MAKE_FUNC)
#define pa_mainloop_free ppa_mainloop_free
#define pa_mainloop_set_poll_func ppa_mainloop_set_poll_func
#define pa_mainloop_run ppa_mainloop_run
#define pa_mainloop_quit ppa_mainloop_quit
#define pa_mainloop_get_api ppa_mainloop_get_api
#define pa_context_new ppa_context_new
#define pa_context_unref ppa_context_unref
@ -285,34 +288,29 @@ void SetChannelOrderFromMap(ALCdevice *device, const pa_channel_map &chanmap)
/* *grumble* Don't use enums for bitflags. */
inline pa_stream_flags_t operator|(pa_stream_flags_t lhs, pa_stream_flags_t rhs)
constexpr inline pa_stream_flags_t operator|(pa_stream_flags_t lhs, pa_stream_flags_t rhs)
{ return pa_stream_flags_t(int(lhs) | int(rhs)); }
inline pa_stream_flags_t& operator|=(pa_stream_flags_t &lhs, pa_stream_flags_t rhs)
{
lhs = pa_stream_flags_t(int(lhs) | int(rhs));
lhs = lhs | rhs;
return lhs;
}
inline pa_context_flags_t& operator|=(pa_context_flags_t &lhs, pa_context_flags_t rhs)
{
lhs = pa_context_flags_t(int(lhs) | int(rhs));
return lhs;
}
inline pa_stream_flags_t& operator&=(pa_stream_flags_t &lhs, int rhs)
{
lhs = pa_stream_flags_t(int(lhs) & rhs);
return lhs;
}
inline pa_context_flags_t& operator|=(pa_context_flags_t &lhs, pa_context_flags_t rhs)
{
lhs = pa_context_flags_t(int(lhs) | int(rhs));
return lhs;
}
/* Global flags and properties */
pa_context_flags_t pulse_ctx_flags;
pa_mainloop *pulse_mainloop{nullptr};
std::mutex pulse_lock;
std::condition_variable pulse_condvar;
int pulse_poll_func(struct pollfd *ufds, unsigned long nfds, int timeout, void *userdata) noexcept
{
auto plock = static_cast<std::unique_lock<std::mutex>*>(userdata);
@ -322,58 +320,90 @@ int pulse_poll_func(struct pollfd *ufds, unsigned long nfds, int timeout, void *
return r;
}
int pulse_mainloop_thread()
{
SetRTPriority();
class PulseMainloop {
std::thread mThread;
std::mutex mMutex;
std::condition_variable mCondVar;
pa_mainloop *mMainloop{nullptr};
std::unique_lock<std::mutex> plock{pulse_lock};
pulse_mainloop = pa_mainloop_new();
pa_mainloop_set_poll_func(pulse_mainloop, pulse_poll_func, &plock);
pulse_condvar.notify_all();
int ret{};
pa_mainloop_run(pulse_mainloop, &ret);
pa_mainloop_free(pulse_mainloop);
pulse_mainloop = nullptr;
return ret;
}
/* PulseAudio Event Callbacks */
void context_state_callback(pa_context *context, void* /*pdata*/) noexcept
{
pa_context_state_t state{pa_context_get_state(context)};
if(state == PA_CONTEXT_READY || !PA_CONTEXT_IS_GOOD(state))
pulse_condvar.notify_all();
}
void stream_state_callback(pa_stream *stream, void* /*pdata*/) noexcept
{
pa_stream_state_t state{pa_stream_get_state(stream)};
if(state == PA_STREAM_READY || !PA_STREAM_IS_GOOD(state))
pulse_condvar.notify_all();
}
void stream_success_callback(pa_stream* /*stream*/, int /*success*/, void* /*pdata*/) noexcept
{
pulse_condvar.notify_all();
}
void wait_for_operation(pa_operation *op, std::unique_lock<std::mutex> &plock)
{
if(op)
public:
~PulseMainloop()
{
while(pa_operation_get_state(op) == PA_OPERATION_RUNNING)
pulse_condvar.wait(plock);
pa_operation_unref(op);
if(mThread.joinable())
{
pa_mainloop_quit(mMainloop, 0);
mThread.join();
}
}
}
int mainloop_thread()
{
SetRTPriority();
std::unique_lock<std::mutex> plock{mMutex};
mMainloop = pa_mainloop_new();
pa_mainloop_set_poll_func(mMainloop, pulse_poll_func, &plock);
mCondVar.notify_all();
int ret{};
pa_mainloop_run(mMainloop, &ret);
pa_mainloop_free(mMainloop);
mMainloop = nullptr;
return ret;
}
void doLock() { mMutex.lock(); }
void doUnlock() { mMutex.unlock(); }
std::unique_lock<std::mutex> getLock() { return std::unique_lock<std::mutex>{mMutex}; }
std::condition_variable &getCondVar() noexcept { return mCondVar; }
void contextStateCallback(pa_context *context) noexcept
{
pa_context_state_t state{pa_context_get_state(context)};
if(state == PA_CONTEXT_READY || !PA_CONTEXT_IS_GOOD(state))
mCondVar.notify_all();
}
static void contextStateCallbackC(pa_context *context, void *pdata) noexcept
{ static_cast<PulseMainloop*>(pdata)->contextStateCallback(context); }
void streamStateCallback(pa_stream *stream) noexcept
{
pa_stream_state_t state{pa_stream_get_state(stream)};
if(state == PA_STREAM_READY || !PA_STREAM_IS_GOOD(state))
mCondVar.notify_all();
}
static void streamStateCallbackC(pa_stream *stream, void *pdata) noexcept
{ static_cast<PulseMainloop*>(pdata)->streamStateCallback(stream); }
void streamSuccessCallback(pa_stream*, int) noexcept
{ mCondVar.notify_all(); }
static void streamSuccessCallbackC(pa_stream *stream, int success, void *pdata) noexcept
{ static_cast<PulseMainloop*>(pdata)->streamSuccessCallback(stream, success); }
void waitForOperation(pa_operation *op, std::unique_lock<std::mutex> &plock)
{
if(op)
{
while(pa_operation_get_state(op) == PA_OPERATION_RUNNING)
mCondVar.wait(plock);
pa_operation_unref(op);
}
}
pa_context *connectContext(std::unique_lock<std::mutex> &plock);
pa_stream *connectStream(const char *device_name, std::unique_lock<std::mutex> &plock,
pa_context *context, pa_stream_flags_t flags, pa_buffer_attr *attr, pa_sample_spec *spec,
pa_channel_map *chanmap, BackendType type);
void close(pa_context *context, pa_stream *stream);
};
pa_context *connect_context(std::unique_lock<std::mutex> &plock)
pa_context *PulseMainloop::connectContext(std::unique_lock<std::mutex> &plock)
{
const char *name{"OpenAL Soft"};
@ -381,17 +411,16 @@ pa_context *connect_context(std::unique_lock<std::mutex> &plock)
if(!binname.fname.empty())
name = binname.fname.c_str();
if UNLIKELY(!pulse_mainloop)
if(!mMainloop)
{
std::thread{pulse_mainloop_thread}.detach();
while(!pulse_mainloop)
pulse_condvar.wait(plock);
mThread = std::thread{std::mem_fn(&PulseMainloop::mainloop_thread), this};
while(!mMainloop) mCondVar.wait(plock);
}
pa_context *context{pa_context_new(pa_mainloop_get_api(pulse_mainloop), name)};
pa_context *context{pa_context_new(pa_mainloop_get_api(mMainloop), name)};
if(!context) throw al::backend_exception{ALC_OUT_OF_MEMORY, "pa_context_new() failed"};
pa_context_set_state_callback(context, context_state_callback, nullptr);
pa_context_set_state_callback(context, &contextStateCallbackC, this);
int err;
if((err=pa_context_connect(context, nullptr, pulse_ctx_flags, nullptr)) >= 0)
@ -406,7 +435,7 @@ pa_context *connect_context(std::unique_lock<std::mutex> &plock)
break;
}
pulse_condvar.wait(plock);
mCondVar.wait(plock);
}
}
pa_context_set_state_callback(context, nullptr, nullptr);
@ -421,43 +450,9 @@ pa_context *connect_context(std::unique_lock<std::mutex> &plock)
return context;
}
void pulse_close(pa_context *context, pa_stream *stream)
{
std::lock_guard<std::mutex> _{pulse_lock};
if(stream)
{
pa_stream_set_state_callback(stream, nullptr, nullptr);
pa_stream_set_moved_callback(stream, nullptr, nullptr);
pa_stream_set_write_callback(stream, nullptr, nullptr);
pa_stream_set_buffer_attr_callback(stream, nullptr, nullptr);
pa_stream_disconnect(stream);
pa_stream_unref(stream);
}
pa_context_disconnect(context);
pa_context_unref(context);
}
struct DevMap {
std::string name;
std::string device_name;
};
bool checkName(const al::vector<DevMap> &list, const std::string &name)
{
auto match_name = [&name](const DevMap &entry) -> bool { return entry.name == name; };
return std::find_if(list.cbegin(), list.cend(), match_name) != list.cend();
}
al::vector<DevMap> PlaybackDevices;
al::vector<DevMap> CaptureDevices;
pa_stream *pulse_connect_stream(const char *device_name, std::unique_lock<std::mutex> &plock,
pa_context *context, pa_stream_flags_t flags, pa_buffer_attr *attr, pa_sample_spec *spec,
pa_channel_map *chanmap, BackendType type)
pa_stream *PulseMainloop::connectStream(const char *device_name,
std::unique_lock<std::mutex> &plock, pa_context *context, pa_stream_flags_t flags,
pa_buffer_attr *attr, pa_sample_spec *spec, pa_channel_map *chanmap, BackendType type)
{
const char *stream_id{(type==BackendType::Playback) ? "Playback Stream" : "Capture Stream"};
pa_stream *stream{pa_stream_new(context, stream_id, spec, chanmap)};
@ -465,7 +460,7 @@ pa_stream *pulse_connect_stream(const char *device_name, std::unique_lock<std::m
throw al::backend_exception{ALC_OUT_OF_MEMORY, "pa_stream_new() failed (%s)",
pa_strerror(pa_context_errno(context))};
pa_stream_set_state_callback(stream, stream_state_callback, nullptr);
pa_stream_set_state_callback(stream, &streamStateCallbackC, this);
int err{(type==BackendType::Playback) ?
pa_stream_connect_playback(stream, device_name, attr, flags, nullptr, nullptr) :
@ -488,19 +483,55 @@ pa_stream *pulse_connect_stream(const char *device_name, std::unique_lock<std::m
pa_strerror(err)};
}
pulse_condvar.wait(plock);
mCondVar.wait(plock);
}
pa_stream_set_state_callback(stream, nullptr, nullptr);
return stream;
}
void PulseMainloop::close(pa_context *context, pa_stream *stream)
{
std::lock_guard<std::mutex> _{mMutex};
if(stream)
{
pa_stream_set_state_callback(stream, nullptr, nullptr);
pa_stream_set_moved_callback(stream, nullptr, nullptr);
pa_stream_set_write_callback(stream, nullptr, nullptr);
pa_stream_set_buffer_attr_callback(stream, nullptr, nullptr);
pa_stream_disconnect(stream);
pa_stream_unref(stream);
}
void device_sink_callback(pa_context*, const pa_sink_info *info, int eol, void*) noexcept
pa_context_disconnect(context);
pa_context_unref(context);
}
/* Used for initial connection test and enumeration. */
PulseMainloop gGlobalMainloop;
struct DevMap {
std::string name;
std::string device_name;
};
bool checkName(const al::vector<DevMap> &list, const std::string &name)
{
auto match_name = [&name](const DevMap &entry) -> bool { return entry.name == name; };
return std::find_if(list.cbegin(), list.cend(), match_name) != list.cend();
}
al::vector<DevMap> PlaybackDevices;
al::vector<DevMap> CaptureDevices;
void device_sink_callback(pa_context*, const pa_sink_info *info, int eol, void *pdata) noexcept
{
if(eol)
{
pulse_condvar.notify_all();
static_cast<PulseMainloop*>(pdata)->getCondVar().notify_all();
return;
}
@ -528,50 +559,54 @@ void device_sink_callback(pa_context*, const pa_sink_info *info, int eol, void*)
TRACE("Got device \"%s\", \"%s\"\n", newentry.name.c_str(), newentry.device_name.c_str());
}
void probePlaybackDevices()
void probePlaybackDevices(PulseMainloop &mainloop)
{
pa_context *context{};
pa_stream *stream{};
PlaybackDevices.clear();
try {
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mainloop.getLock();
pa_context *context{connect_context(plock)};
context = mainloop.connectContext(plock);
const pa_stream_flags_t flags{PA_STREAM_FIX_FORMAT | PA_STREAM_FIX_RATE |
PA_STREAM_FIX_CHANNELS | PA_STREAM_DONT_MOVE};
constexpr pa_stream_flags_t flags{PA_STREAM_FIX_FORMAT | PA_STREAM_FIX_RATE |
PA_STREAM_FIX_CHANNELS | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED};
pa_sample_spec spec{};
spec.format = PA_SAMPLE_S16NE;
spec.rate = 44100;
spec.channels = 2;
pa_stream *stream{pulse_connect_stream(nullptr, plock, context, flags, nullptr, &spec,
nullptr, BackendType::Playback)};
stream = mainloop.connectStream(nullptr, plock, context, flags, nullptr, &spec, nullptr,
BackendType::Playback);
pa_operation *op{pa_context_get_sink_info_by_name(context,
pa_stream_get_device_name(stream), device_sink_callback, nullptr)};
wait_for_operation(op, plock);
pa_stream_get_device_name(stream), device_sink_callback, &mainloop)};
mainloop.waitForOperation(op, plock);
pa_stream_disconnect(stream);
pa_stream_unref(stream);
stream = nullptr;
op = pa_context_get_sink_info_list(context, device_sink_callback, nullptr);
wait_for_operation(op, plock);
op = pa_context_get_sink_info_list(context, device_sink_callback, &mainloop);
mainloop.waitForOperation(op, plock);
pa_context_disconnect(context);
pa_context_unref(context);
context = nullptr;
}
catch(std::exception &e) {
ERR("Error enumerating devices: %s\n", e.what());
if(context) mainloop.close(context, stream);
}
}
void device_source_callback(pa_context*, const pa_source_info *info, int eol, void*) noexcept
void device_source_callback(pa_context*, const pa_source_info *info, int eol, void *pdata) noexcept
{
if(eol)
{
pulse_condvar.notify_all();
static_cast<PulseMainloop*>(pdata)->getCondVar().notify_all();
return;
}
@ -599,41 +634,45 @@ void device_source_callback(pa_context*, const pa_source_info *info, int eol, vo
TRACE("Got device \"%s\", \"%s\"\n", newentry.name.c_str(), newentry.device_name.c_str());
}
void probeCaptureDevices()
void probeCaptureDevices(PulseMainloop &mainloop)
{
pa_context *context{};
pa_stream *stream{};
CaptureDevices.clear();
try {
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mainloop.getLock();
pa_context *context{connect_context(plock)};
context = mainloop.connectContext(plock);
const pa_stream_flags_t flags{PA_STREAM_FIX_FORMAT | PA_STREAM_FIX_RATE |
PA_STREAM_FIX_CHANNELS | PA_STREAM_DONT_MOVE};
constexpr pa_stream_flags_t flags{PA_STREAM_FIX_FORMAT | PA_STREAM_FIX_RATE |
PA_STREAM_FIX_CHANNELS | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED};
pa_sample_spec spec{};
spec.format = PA_SAMPLE_S16NE;
spec.rate = 44100;
spec.channels = 1;
pa_stream *stream{pulse_connect_stream(nullptr, plock, context, flags, nullptr, &spec, nullptr,
BackendType::Capture)};
stream = mainloop.connectStream(nullptr, plock, context, flags, nullptr, &spec, nullptr,
BackendType::Capture);
pa_operation *op{pa_context_get_source_info_by_name(context,
pa_stream_get_device_name(stream), device_source_callback, nullptr)};
wait_for_operation(op, plock);
pa_stream_get_device_name(stream), device_source_callback, &mainloop)};
mainloop.waitForOperation(op, plock);
pa_stream_disconnect(stream);
pa_stream_unref(stream);
stream = nullptr;
op = pa_context_get_source_info_list(context, device_source_callback, nullptr);
wait_for_operation(op, plock);
op = pa_context_get_source_info_list(context, device_source_callback, &mainloop);
mainloop.waitForOperation(op, plock);
pa_context_disconnect(context);
pa_context_unref(context);
context = nullptr;
}
catch(std::exception &e) {
ERR("Error enumerating devices: %s\n", e.what());
if(context) mainloop.close(context, stream);
}
}
@ -671,8 +710,10 @@ struct PulsePlayback final : public BackendBase {
bool start() override;
void stop() override;
ClockLatency getClockLatency() override;
void lock() override { pulse_lock.lock(); }
void unlock() override { pulse_lock.unlock(); }
void lock() override { mMainloop.doLock(); }
void unlock() override { mMainloop.doUnlock(); }
PulseMainloop mMainloop;
std::string mDeviceName;
@ -692,7 +733,7 @@ PulsePlayback::~PulsePlayback()
if(!mContext)
return;
pulse_close(mContext, mStream);
mMainloop.close(mContext, mStream);
mContext = nullptr;
mStream = nullptr;
}
@ -716,7 +757,7 @@ void PulsePlayback::streamStateCallback(pa_stream *stream) noexcept
ERR("Received stream failure!\n");
aluHandleDisconnect(mDevice, "Playback stream failure");
}
pulse_condvar.notify_all();
mMainloop.getCondVar().notify_all();
}
void PulsePlayback::streamWriteCallback(pa_stream *stream, size_t nbytes) noexcept
@ -747,7 +788,7 @@ void PulsePlayback::sinkInfoCallback(pa_context*, const pa_sink_info *info, int
if(eol)
{
pulse_condvar.notify_all();
mMainloop.getCondVar().notify_all();
return;
}
@ -777,7 +818,7 @@ void PulsePlayback::sinkNameCallback(pa_context*, const pa_sink_info *info, int
{
if(eol)
{
pulse_condvar.notify_all();
mMainloop.getCondVar().notify_all();
return;
}
mDevice->DeviceName = info->description;
@ -798,7 +839,7 @@ void PulsePlayback::open(const ALCchar *name)
if(name)
{
if(PlaybackDevices.empty())
probePlaybackDevices();
probePlaybackDevices(mMainloop);
auto iter = std::find_if(PlaybackDevices.cbegin(), PlaybackDevices.cend(),
[name](const DevMap &entry) -> bool
@ -810,9 +851,9 @@ void PulsePlayback::open(const ALCchar *name)
dev_name = iter->name.c_str();
}
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
mContext = connect_context(plock);
mContext = mMainloop.connectContext(plock);
pa_stream_flags_t flags{PA_STREAM_START_CORKED | PA_STREAM_FIX_FORMAT | PA_STREAM_FIX_RATE |
PA_STREAM_FIX_CHANNELS};
@ -830,7 +871,7 @@ void PulsePlayback::open(const ALCchar *name)
if(defname) pulse_name = defname->c_str();
}
TRACE("Connecting to \"%s\"\n", pulse_name ? pulse_name : "(default)");
mStream = pulse_connect_stream(pulse_name, plock, mContext, flags, nullptr, &spec, nullptr,
mStream = mMainloop.connectStream(pulse_name, plock, mContext, flags, nullptr, &spec, nullptr,
BackendType::Playback);
pa_stream_set_moved_callback(mStream, &PulsePlayback::streamMovedCallbackC, this);
@ -841,7 +882,7 @@ void PulsePlayback::open(const ALCchar *name)
{
pa_operation *op{pa_context_get_sink_info_by_name(mContext, mDeviceName.c_str(),
&PulsePlayback::sinkNameCallbackC, this)};
wait_for_operation(op, plock);
mMainloop.waitForOperation(op, plock);
}
else
mDevice->DeviceName = dev_name;
@ -849,7 +890,7 @@ void PulsePlayback::open(const ALCchar *name)
bool PulsePlayback::reset()
{
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
if(mStream)
{
@ -864,7 +905,7 @@ bool PulsePlayback::reset()
pa_operation *op{pa_context_get_sink_info_by_name(mContext, mDeviceName.c_str(),
&PulsePlayback::sinkInfoCallbackC, this)};
wait_for_operation(op, plock);
mMainloop.waitForOperation(op, plock);
pa_stream_flags_t flags{PA_STREAM_START_CORKED | PA_STREAM_INTERPOLATE_TIMING |
PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_EARLY_REQUESTS};
@ -886,56 +927,56 @@ bool PulsePlayback::reset()
pa_channel_map chanmap{};
switch(mDevice->FmtChans)
{
case DevFmtMono:
chanmap = MonoChanMap;
break;
case DevFmtAmbi3D:
mDevice->FmtChans = DevFmtStereo;
/*fall-through*/
case DevFmtStereo:
chanmap = StereoChanMap;
break;
case DevFmtQuad:
chanmap = QuadChanMap;
break;
case DevFmtX51:
chanmap = X51ChanMap;
break;
case DevFmtX51Rear:
chanmap = X51RearChanMap;
break;
case DevFmtX61:
chanmap = X61ChanMap;
break;
case DevFmtX71:
chanmap = X71ChanMap;
break;
case DevFmtMono:
chanmap = MonoChanMap;
break;
case DevFmtAmbi3D:
mDevice->FmtChans = DevFmtStereo;
/*fall-through*/
case DevFmtStereo:
chanmap = StereoChanMap;
break;
case DevFmtQuad:
chanmap = QuadChanMap;
break;
case DevFmtX51:
chanmap = X51ChanMap;
break;
case DevFmtX51Rear:
chanmap = X51RearChanMap;
break;
case DevFmtX61:
chanmap = X61ChanMap;
break;
case DevFmtX71:
chanmap = X71ChanMap;
break;
}
SetChannelOrderFromMap(mDevice, chanmap);
switch(mDevice->FmtType)
{
case DevFmtByte:
mDevice->FmtType = DevFmtUByte;
/* fall-through */
case DevFmtUByte:
mSpec.format = PA_SAMPLE_U8;
break;
case DevFmtUShort:
mDevice->FmtType = DevFmtShort;
/* fall-through */
case DevFmtShort:
mSpec.format = PA_SAMPLE_S16NE;
break;
case DevFmtUInt:
mDevice->FmtType = DevFmtInt;
/* fall-through */
case DevFmtInt:
mSpec.format = PA_SAMPLE_S32NE;
break;
case DevFmtFloat:
mSpec.format = PA_SAMPLE_FLOAT32NE;
break;
case DevFmtByte:
mDevice->FmtType = DevFmtUByte;
/* fall-through */
case DevFmtUByte:
mSpec.format = PA_SAMPLE_U8;
break;
case DevFmtUShort:
mDevice->FmtType = DevFmtShort;
/* fall-through */
case DevFmtShort:
mSpec.format = PA_SAMPLE_S16NE;
break;
case DevFmtUInt:
mDevice->FmtType = DevFmtInt;
/* fall-through */
case DevFmtInt:
mSpec.format = PA_SAMPLE_S32NE;
break;
case DevFmtFloat:
mSpec.format = PA_SAMPLE_FLOAT32NE;
break;
}
mSpec.rate = mDevice->Frequency;
mSpec.channels = static_cast<uint8_t>(mDevice->channelsFromFmt());
@ -949,7 +990,7 @@ bool PulsePlayback::reset()
mAttr.minreq = mDevice->UpdateSize * frame_size;
mAttr.fragsize = ~0u;
mStream = pulse_connect_stream(mDeviceName.c_str(), plock, mContext, flags, &mAttr, &mSpec,
mStream = mMainloop.connectStream(mDeviceName.c_str(), plock, mContext, flags, &mAttr, &mSpec,
&chanmap, BackendType::Playback);
pa_stream_set_state_callback(mStream, &PulsePlayback::streamStateCallbackC, this);
@ -974,8 +1015,9 @@ bool PulsePlayback::reset()
mAttr.prebuf = 0u;
mAttr.minreq = perlen * mFrameSize;
op = pa_stream_set_buffer_attr(mStream, &mAttr, stream_success_callback, nullptr);
wait_for_operation(op, plock);
op = pa_stream_set_buffer_attr(mStream, &mAttr, &PulseMainloop::streamSuccessCallbackC,
&mMainloop);
mMainloop.waitForOperation(op, plock);
mDevice->Frequency = mSpec.rate;
}
@ -991,22 +1033,24 @@ bool PulsePlayback::reset()
bool PulsePlayback::start()
{
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
pa_stream_set_write_callback(mStream, &PulsePlayback::streamWriteCallbackC, this);
pa_operation *op{pa_stream_cork(mStream, 0, stream_success_callback, nullptr)};
wait_for_operation(op, plock);
pa_operation *op{pa_stream_cork(mStream, 0, &PulseMainloop::streamSuccessCallbackC,
&mMainloop)};
mMainloop.waitForOperation(op, plock);
return true;
}
void PulsePlayback::stop()
{
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
pa_operation *op{pa_stream_cork(mStream, 1, &PulseMainloop::streamSuccessCallbackC,
&mMainloop)};
mMainloop.waitForOperation(op, plock);
pa_stream_set_write_callback(mStream, nullptr, nullptr);
pa_operation *op{pa_stream_cork(mStream, 1, stream_success_callback, nullptr)};
wait_for_operation(op, plock);
}
@ -1017,7 +1061,7 @@ ClockLatency PulsePlayback::getClockLatency()
int neg, err;
{
std::lock_guard<std::mutex> _{pulse_lock};
auto _ = mMainloop.getLock();
ret.ClockTime = GetDeviceClockTime(mDevice);
err = pa_stream_get_latency(mStream, &latency, &neg);
}
@ -1063,8 +1107,10 @@ struct PulseCapture final : public BackendBase {
ALCenum captureSamples(al::byte *buffer, ALCuint samples) override;
ALCuint availableSamples() override;
ClockLatency getClockLatency() override;
void lock() override { pulse_lock.lock(); }
void unlock() override { pulse_lock.unlock(); }
void lock() override { mMainloop.doLock(); }
void unlock() override { mMainloop.doUnlock(); }
PulseMainloop mMainloop;
std::string mDeviceName;
@ -1088,7 +1134,7 @@ PulseCapture::~PulseCapture()
if(!mContext)
return;
pulse_close(mContext, mStream);
mMainloop.close(mContext, mStream);
mContext = nullptr;
mStream = nullptr;
}
@ -1101,14 +1147,14 @@ void PulseCapture::streamStateCallback(pa_stream *stream) noexcept
ERR("Received stream failure!\n");
aluHandleDisconnect(mDevice, "Capture stream failure");
}
pulse_condvar.notify_all();
mMainloop.getCondVar().notify_all();
}
void PulseCapture::sourceNameCallback(pa_context*, const pa_source_info *info, int eol) noexcept
{
if(eol)
{
pulse_condvar.notify_all();
mMainloop.getCondVar().notify_all();
return;
}
mDevice->DeviceName = info->description;
@ -1127,7 +1173,7 @@ void PulseCapture::open(const ALCchar *name)
if(name)
{
if(CaptureDevices.empty())
probeCaptureDevices();
probeCaptureDevices(mMainloop);
auto iter = std::find_if(CaptureDevices.cbegin(), CaptureDevices.cend(),
[name](const DevMap &entry) -> bool
@ -1139,9 +1185,9 @@ void PulseCapture::open(const ALCchar *name)
mDevice->DeviceName = iter->name;
}
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
mContext = connect_context(plock);
mContext = mMainloop.connectContext(plock);
pa_channel_map chanmap{};
switch(mDevice->FmtChans)
@ -1212,7 +1258,7 @@ void PulseCapture::open(const ALCchar *name)
flags |= PA_STREAM_DONT_MOVE;
TRACE("Connecting to \"%s\"\n", pulse_name ? pulse_name : "(default)");
mStream = pulse_connect_stream(pulse_name, plock, mContext, flags, &mAttr, &mSpec, &chanmap,
mStream = mMainloop.connectStream(pulse_name, plock, mContext, flags, &mAttr, &mSpec, &chanmap,
BackendType::Capture);
pa_stream_set_moved_callback(mStream, &PulseCapture::streamMovedCallbackC, this);
@ -1223,23 +1269,25 @@ void PulseCapture::open(const ALCchar *name)
{
pa_operation *op{pa_context_get_source_info_by_name(mContext, mDeviceName.c_str(),
&PulseCapture::sourceNameCallbackC, this)};
wait_for_operation(op, plock);
mMainloop.waitForOperation(op, plock);
}
}
bool PulseCapture::start()
{
std::unique_lock<std::mutex> plock{pulse_lock};
pa_operation *op{pa_stream_cork(mStream, 0, stream_success_callback, nullptr)};
wait_for_operation(op, plock);
auto plock = mMainloop.getLock();
pa_operation *op{pa_stream_cork(mStream, 0, &PulseMainloop::streamSuccessCallbackC,
&mMainloop)};
mMainloop.waitForOperation(op, plock);
return true;
}
void PulseCapture::stop()
{
std::unique_lock<std::mutex> plock{pulse_lock};
pa_operation *op{pa_stream_cork(mStream, 1, stream_success_callback, nullptr)};
wait_for_operation(op, plock);
auto plock = mMainloop.getLock();
pa_operation *op{pa_stream_cork(mStream, 1, &PulseMainloop::streamSuccessCallbackC,
&mMainloop)};
mMainloop.waitForOperation(op, plock);
}
ALCenum PulseCapture::captureSamples(al::byte *buffer, ALCuint samples)
@ -1267,7 +1315,7 @@ ALCenum PulseCapture::captureSamples(al::byte *buffer, ALCuint samples)
if UNLIKELY(!mDevice->Connected.load(std::memory_order_acquire))
break;
std::unique_lock<std::mutex> plock{pulse_lock};
auto plock = mMainloop.getLock();
if(mCapLen != 0)
{
pa_stream_drop(mStream);
@ -1309,9 +1357,9 @@ ALCuint PulseCapture::availableSamples()
if(mDevice->Connected.load(std::memory_order_acquire))
{
std::lock_guard<std::mutex> _{pulse_lock};
auto _ = mMainloop.getLock();
size_t got{pa_stream_readable_size(mStream)};
if(static_cast<ssize_t>(got) < 0)
if UNLIKELY(static_cast<ssize_t>(got) < 0)
{
const char *err{pa_strerror(static_cast<int>(got))};
ERR("pa_stream_readable_size() failed: %s\n", err);
@ -1337,7 +1385,7 @@ ClockLatency PulseCapture::getClockLatency()
int neg, err;
{
std::lock_guard<std::mutex> _{pulse_lock};
auto _ = mMainloop.getLock();
ret.ClockTime = GetDeviceClockTime(mDevice);
err = pa_stream_get_latency(mStream, &latency, &neg);
}
@ -1405,8 +1453,8 @@ bool PulseBackendFactory::init()
pulse_ctx_flags |= PA_CONTEXT_NOAUTOSPAWN;
try {
std::unique_lock<std::mutex> plock{pulse_lock};
pa_context *context{connect_context(plock)};
auto plock = gGlobalMainloop.getLock();
pa_context *context{gGlobalMainloop.connectContext(plock)};
pa_context_disconnect(context);
pa_context_unref(context);
return true;
@ -1428,17 +1476,18 @@ void PulseBackendFactory::probe(DevProbe type, std::string *outnames)
*/
outnames->append(entry.name.c_str(), entry.name.length()+1);
};
switch(type)
{
case DevProbe::Playback:
probePlaybackDevices();
std::for_each(PlaybackDevices.cbegin(), PlaybackDevices.cend(), add_device);
break;
case DevProbe::Playback:
probePlaybackDevices(gGlobalMainloop);
std::for_each(PlaybackDevices.cbegin(), PlaybackDevices.cend(), add_device);
break;
case DevProbe::Capture:
probeCaptureDevices();
std::for_each(CaptureDevices.cbegin(), CaptureDevices.cend(), add_device);
break;
case DevProbe::Capture:
probeCaptureDevices(gGlobalMainloop);
std::for_each(CaptureDevices.cbegin(), CaptureDevices.cend(), add_device);
break;
}
}