C++-ize the PipeWire backend some more

Use unique_ptrs for a few more types to avoid explicit free calls.

Move ThreadMainloop::wait to the unique_lock wrapper that's holding the lock.
Since the mainloop acts as both a lock and condition_variable, passing the lock
to the wait method makes no sense. Also have it optionally take a predicate
functor to dictate when to stop waiting.
This commit is contained in:
Chris Robinson 2021-12-28 16:29:07 -08:00
parent f3aa08aad9
commit f484782238

View File

@ -32,6 +32,7 @@
#include <memory>
#include <mutex>
#include <stdint.h>
#include <type_traits>
#include <utility>
#include "albyte.h"
@ -223,23 +224,56 @@ public:
ThreadMainloop& operator=(const ThreadMainloop&) = delete;
ThreadMainloop& operator=(ThreadMainloop&& rhs) noexcept
{ std::swap(mLoop, rhs.mLoop); return *this; }
ThreadMainloop& operator=(std::nullptr_t) noexcept
{
if(mLoop)
pw_thread_loop_destroy(mLoop);
mLoop = nullptr;
return *this;
}
operator bool() const noexcept { return mLoop != nullptr; }
auto start() const { return pw_thread_loop_start(mLoop); }
auto stop() const { return pw_thread_loop_stop(mLoop); }
auto signal(bool wait) const { return pw_thread_loop_signal(mLoop, wait); }
auto wait() const { return pw_thread_loop_wait(mLoop); }
auto getLoop() const { return pw_thread_loop_get_loop(mLoop); }
auto lock() const { return pw_thread_loop_lock(mLoop); }
auto unlock() const { return pw_thread_loop_unlock(mLoop); }
auto signal(bool wait) const { return pw_thread_loop_signal(mLoop, wait); }
friend struct MainloopUniqueLock;
};
struct MainloopUniqueLock : public std::unique_lock<ThreadMainloop> {
using std::unique_lock<ThreadMainloop>::unique_lock;
using std::unique_lock<ThreadMainloop>::operator=;
auto wait() const -> void
{ pw_thread_loop_wait(mutex()->mLoop); }
template<typename Predicate>
auto wait(Predicate done_waiting) const -> void
{ while(!done_waiting()) wait(); }
};
using MainloopUniqueLock = std::unique_lock<ThreadMainloop>;
using MainloopLockGuard = std::lock_guard<ThreadMainloop>;
struct PwContextDeleter {
void operator()(pw_context *context) const { pw_context_destroy(context); }
};
using PwContextPtr = std::unique_ptr<pw_context,PwContextDeleter>;
struct PwCoreDeleter {
void operator()(pw_core *core) const { pw_core_disconnect(core); }
};
using PwCorePtr = std::unique_ptr<pw_core,PwCoreDeleter>;
struct PwRegistryDeleter {
void operator()(pw_registry *reg) const { pw_proxy_destroy(reinterpret_cast<pw_proxy*>(reg)); }
};
using PwRegistryPtr = std::unique_ptr<pw_registry,PwRegistryDeleter>;
struct PwStreamDeleter {
void operator()(pw_stream *stream) const { pw_stream_destroy(stream); }
};
@ -248,7 +282,7 @@ using PwStreamPtr = std::unique_ptr<pw_stream,PwStreamDeleter>;
/* Enums for bitflags... again... *sigh* */
constexpr pw_stream_flags operator|(pw_stream_flags lhs, pw_stream_flags rhs) noexcept
{ return static_cast<pw_stream_flags>(lhs | uint{rhs}); }
{ return static_cast<pw_stream_flags>(lhs | std::underlying_type_t<pw_stream_flags>{rhs}); }
const spa_audio_channel MonoMap[]{
@ -301,9 +335,9 @@ struct MetadataProxy;
*/
struct EventManager {
ThreadMainloop mLoop{};
pw_core *mCore{};
pw_context *mContext{};
pw_registry *mRegistry{};
PwContextPtr mContext{};
PwCorePtr mCore{};
PwRegistryPtr mRegistry{};
spa_hook mRegistryListener{};
spa_hook mCoreListener{};
@ -332,13 +366,16 @@ struct EventManager {
auto unlock() const { return mLoop.unlock(); }
/**
* Waits for initialization to finish. The event manager must be locked
* when calling this.
* Waits for initialization to finish. The event manager must *NOT* be
* locked when calling this.
*/
void waitForInit()
{
while(unlikely(!mInitDone.load(std::memory_order_acquire)))
mLoop.wait();
if(unlikely(!mInitDone.load(std::memory_order_acquire)))
{
MainloopUniqueLock plock{mLoop};
plock.wait([this](){ return mInitDone.load(std::memory_order_acquire); });
}
}
/**
@ -348,13 +385,13 @@ struct EventManager {
*/
bool waitForAudio()
{
MainloopLockGuard _{mLoop};
bool has_audio{mHasAudio.load(std::memory_order_acquire)};
while(unlikely(!has_audio && !mInitDone.load(std::memory_order_acquire)))
MainloopUniqueLock plock{mLoop};
bool has_audio{};
plock.wait([this,&has_audio]()
{
mLoop.wait();
has_audio = mHasAudio.load(std::memory_order_acquire);
}
return has_audio || mInitDone.load(std::memory_order_acquire);
});
return has_audio;
}
@ -364,7 +401,7 @@ struct EventManager {
* complete until after currently scheduled events.
*/
if(!mInitDone.load(std::memory_order_relaxed))
mInitSeq = ppw_core_sync(mCore, PW_ID_CORE, mInitSeq);
mInitSeq = ppw_core_sync(mCore.get(), PW_ID_CORE, mInitSeq);
}
void addCallback(uint32_t id, uint32_t permissions, const char *type, uint32_t version,
@ -858,34 +895,34 @@ bool EventManager::init()
return false;
}
mContext = pw_context_new(mLoop.getLoop(), nullptr, 0);
mContext = PwContextPtr{pw_context_new(mLoop.getLoop(), nullptr, 0)};
if(!mContext)
{
ERR("Failed to create PipeWire event context (errno: %d)\n", errno);
return false;
}
mCore = pw_context_connect(mContext, nullptr, 0);
mCore = PwCorePtr{pw_context_connect(mContext.get(), nullptr, 0)};
if(!mCore)
{
ERR("Failed to connect PipeWire event context (errno: %d)\n", errno);
return false;
}
mRegistry = pw_core_get_registry(mCore, PW_VERSION_REGISTRY, 0);
mRegistry = PwRegistryPtr{pw_core_get_registry(mCore.get(), PW_VERSION_REGISTRY, 0)};
if(!mRegistry)
{
ERR("Failed to get PipeWire event registry (errno: %d)\n", errno);
return false;
}
ppw_registry_add_listener(mRegistry, &mRegistryListener, &sRegistryEvents, this);
ppw_core_add_listener(mCore, &mCoreListener, &sCoreEvents, this);
ppw_registry_add_listener(mRegistry.get(), &mRegistryListener, &sRegistryEvents, this);
ppw_core_add_listener(mCore.get(), &mCoreListener, &sCoreEvents, this);
/* Set an initial sequence ID for initialization, to trigger after the
* registry is first populated.
*/
mInitSeq = ppw_core_sync(mCore, PW_ID_CORE, 0);
mInitSeq = ppw_core_sync(mCore.get(), PW_ID_CORE, 0);
if(int res{mLoop.start()})
{
@ -904,10 +941,6 @@ EventManager::~EventManager()
al::destroy_at(node);
if(mDefaultMetadata)
al::destroy_at(mDefaultMetadata);
if(mRegistry) pw_proxy_destroy(reinterpret_cast<pw_proxy*>(mRegistry));
if(mCore) pw_core_disconnect(mCore);
if(mContext) pw_context_destroy(mContext);
}
void EventManager::kill()
@ -921,17 +954,10 @@ void EventManager::kill()
al::destroy_at(mDefaultMetadata);
mDefaultMetadata = nullptr;
if(mRegistry)
pw_proxy_destroy(reinterpret_cast<pw_proxy*>(mRegistry));
mRegistry = nullptr;
if(mCore)
pw_core_disconnect(mCore);
mCore = nullptr;
if(mContext)
pw_context_destroy(mContext);
mContext = nullptr;
mLoop = ThreadMainloop{};
mLoop = nullptr;
}
void EventManager::addCallback(uint32_t id, uint32_t, const char *type, uint32_t version,
@ -950,7 +976,7 @@ void EventManager::addCallback(uint32_t id, uint32_t, const char *type, uint32_t
return;
/* Create the proxy object. */
auto *proxy = static_cast<pw_proxy*>(pw_registry_bind(mRegistry, id, type, version,
auto *proxy = static_cast<pw_proxy*>(pw_registry_bind(mRegistry.get(), id, type, version,
sizeof(NodeProxy)));
if(!proxy)
{
@ -988,7 +1014,7 @@ void EventManager::addCallback(uint32_t id, uint32_t, const char *type, uint32_t
return;
}
auto *proxy = static_cast<pw_proxy*>(pw_registry_bind(mRegistry, id, type, version,
auto *proxy = static_cast<pw_proxy*>(pw_registry_bind(mRegistry.get(), id, type, version,
sizeof(MetadataProxy)));
if(!proxy)
{
@ -1203,10 +1229,10 @@ void PipeWirePlayback::open(const char *name)
uint32_t targetid{PwIdAny};
std::string devname{};
gEventHandler.waitForInit();
if(!name)
{
EventWatcherLockGuard _{gEventHandler};
gEventHandler.waitForInit();
auto match = DeviceList.cend();
if(!DefaultSinkDev.empty())
@ -1231,7 +1257,6 @@ void PipeWirePlayback::open(const char *name)
else
{
EventWatcherLockGuard _{gEventHandler};
gEventHandler.waitForInit();
auto match_name = [name](const DeviceNode &n) -> bool
{ return !n.mCapture && n.mName == name; };
@ -1359,13 +1384,15 @@ bool PipeWirePlayback::reset()
/* Wait for the stream to become paused (ready to start streaming). */
pw_stream_state state{};
const char *error{};
while((state=pw_stream_get_state(mStream.get(), &error)) != PW_STREAM_STATE_PAUSED)
plock.wait([stream=mStream.get(),&state,&error]()
{
state = pw_stream_get_state(stream, &error);
if(state == PW_STREAM_STATE_ERROR)
throw al::backend_exception{al::backend_error::DeviceError,
"Error connecting PipeWire stream: \"%s\"", error};
mLoop.wait();
}
return state == PW_STREAM_STATE_PAUSED;
});
/* TODO: Update mDevice->BufferSize with the total known buffering delay
* from the head of this playback stream to the tail of the device output.
*/
@ -1382,7 +1409,7 @@ bool PipeWirePlayback::reset()
void PipeWirePlayback::start()
{
MainloopLockGuard _{mLoop};
MainloopUniqueLock plock{mLoop};
if(int res{pw_stream_set_active(mStream.get(), true)})
throw al::backend_exception{al::backend_error::DeviceError,
"Failed to start PipeWire stream (res: %d)", res};
@ -1392,8 +1419,11 @@ void PipeWirePlayback::start()
*/
pw_stream_state state{};
const char *error{};
while((state=pw_stream_get_state(mStream.get(), &error)) == PW_STREAM_STATE_PAUSED)
mLoop.wait();
plock.wait([stream=mStream.get(),&state,&error]()
{
state = pw_stream_get_state(stream, &error);
return state != PW_STREAM_STATE_PAUSED;
});
if(state == PW_STREAM_STATE_ERROR)
throw al::backend_exception{al::backend_error::DeviceError,
@ -1407,14 +1437,14 @@ void PipeWirePlayback::start()
void PipeWirePlayback::stop()
{
MainloopLockGuard _{mLoop};
MainloopUniqueLock plock{mLoop};
if(int res{pw_stream_set_active(mStream.get(), false)})
throw al::backend_exception{al::backend_error::DeviceError,
"Failed to stop PipeWire stream (res: %d)", res};
/* Wait for the stream to stop playing. */
while(pw_stream_get_state(mStream.get(), nullptr) == PW_STREAM_STATE_STREAMING)
mLoop.wait();
plock.wait([stream=mStream.get()]()
{ return pw_stream_get_state(stream, nullptr) != PW_STREAM_STATE_STREAMING; });
}
ClockLatency PipeWirePlayback::getClockLatency()
@ -1576,10 +1606,10 @@ void PipeWireCapture::open(const char *name)
uint32_t targetid{PwIdAny};
std::string devname{};
gEventHandler.waitForInit();
if(!name)
{
EventWatcherLockGuard _{gEventHandler};
gEventHandler.waitForInit();
auto match = DeviceList.cend();
if(!DefaultSourceDev.empty())
@ -1611,7 +1641,6 @@ void PipeWireCapture::open(const char *name)
else
{
EventWatcherLockGuard _{gEventHandler};
gEventHandler.waitForInit();
auto match_name = [name](const DeviceNode &n) -> bool
{ return n.mCapture && n.mName == name; };
@ -1701,13 +1730,14 @@ void PipeWireCapture::open(const char *name)
/* Wait for the stream to become paused (ready to start streaming). */
pw_stream_state state{};
const char *error{};
while((state=pw_stream_get_state(mStream.get(), &error)) != PW_STREAM_STATE_PAUSED)
plock.wait([stream=mStream.get(),&state,&error]()
{
state = pw_stream_get_state(stream, &error);
if(state == PW_STREAM_STATE_ERROR)
throw al::backend_exception{al::backend_error::DeviceError,
"Error connecting PipeWire stream: \"%s\"", error};
mLoop.wait();
}
return state == PW_STREAM_STATE_PAUSED;
});
plock.unlock();
setDefaultWFXChannelOrder();
@ -1720,15 +1750,18 @@ void PipeWireCapture::open(const char *name)
void PipeWireCapture::start()
{
MainloopLockGuard _{mLoop};
MainloopUniqueLock plock{mLoop};
if(int res{pw_stream_set_active(mStream.get(), true)})
throw al::backend_exception{al::backend_error::DeviceError,
"Failed to start PipeWire stream (res: %d)", res};
pw_stream_state state{};
const char *error{};
while((state=pw_stream_get_state(mStream.get(), &error)) == PW_STREAM_STATE_PAUSED)
mLoop.wait();
plock.wait([stream=mStream.get(),&state,&error]()
{
state = pw_stream_get_state(stream, &error);
return state != PW_STREAM_STATE_PAUSED;
});
if(state == PW_STREAM_STATE_ERROR)
throw al::backend_exception{al::backend_error::DeviceError,
@ -1737,13 +1770,13 @@ void PipeWireCapture::start()
void PipeWireCapture::stop()
{
MainloopLockGuard _{mLoop};
MainloopUniqueLock plock{mLoop};
if(int res{pw_stream_set_active(mStream.get(), false)})
throw al::backend_exception{al::backend_error::DeviceError,
"Failed to stop PipeWire stream (res: %d)", res};
while(pw_stream_get_state(mStream.get(), nullptr) == PW_STREAM_STATE_STREAMING)
mLoop.wait();
plock.wait([stream=mStream.get()]()
{ return pw_stream_get_state(stream, nullptr) != PW_STREAM_STATE_STREAMING; });
}
uint PipeWireCapture::availableSamples()
@ -1784,8 +1817,8 @@ std::string PipeWireBackendFactory::probe(BackendType type)
{
std::string outnames;
EventWatcherLockGuard _{gEventHandler};
gEventHandler.waitForInit();
EventWatcherLockGuard _{gEventHandler};
auto match_defsink = [](const DeviceNode &n) -> bool
{ return n.mDevName == DefaultSinkDev; };