libobs: Implement deferred destruction of sources

(This also modifies the UI)

The purpose of deferring destruction of sources is to ensure that:
1.) Hard locks from enumeration cannot occur with source destruction.
  For example, if the browser source is destroyed while in the graphics
  thread, the browser thread would wait for the graphics thread, but the
  graphics thread would still be waiting for the browser thread, causing
  a hard lock.
2.) When destroys occur during source enumeration, that the integrity of
  the context's next pointer in the linked list can no longer be
  compromised
3.) Source releases are fully asynchronous rather than having the risk
  of stalling the calling thread
4.) We can wait for source destruction when switching scene collections
  or when shutting down rather than hoping for threads to be finished
  with sources.

This introduces a new requirement when cleaning up scene/source data:
the obs_wait_for_destroy_queue() function. It is highly recommended that
this function be called after cleaning up sources. It will return true
if at least one or more sources were destroyed. Otherwise it will return
false. Forks are highly advised to call this function manually on source
cleanup -- preferably in a loop, in conjunction with processing
outstanding OBS signals and UI events.
master
jp9000 2021-12-19 09:55:51 -08:00
parent ab84214356
commit 8b3416c1e7
6 changed files with 99 additions and 33 deletions

View File

@ -4567,6 +4567,10 @@ void OBSBasic::ClearSceneData()
* that deleteLater events are processed at this point */
QApplication::sendPostedEvents(nullptr, QEvent::DeferredDelete);
do {
QApplication::sendPostedEvents(nullptr);
} while (obs_wait_for_destroy_queue());
disableSaving--;
blog(LOG_INFO, "All scene data cleared");

View File

@ -24,6 +24,7 @@
#include "util/threading.h"
#include "util/platform.h"
#include "util/profiler.h"
#include "util/task.h"
#include "callback/signal.h"
#include "callback/proc.h"
@ -363,6 +364,8 @@ struct obs_core_data {
DARRAY(struct draw_callback) draw_callbacks;
DARRAY(struct tick_callback) tick_callbacks;
os_task_queue_t *destruction_task_thread;
struct obs_view main_view;
long long unnamed_index;
@ -512,6 +515,7 @@ extern void obs_context_data_free(struct obs_context_data *context);
extern void obs_context_data_insert(struct obs_context_data *context,
pthread_mutex_t *mutex, void *first);
extern void obs_context_data_remove(struct obs_context_data *context);
extern void obs_context_wait(struct obs_context_data *context);
extern void obs_context_data_setname(struct obs_context_data *context,
const char *name);

View File

@ -607,11 +607,10 @@ static inline void obs_source_frame_decref(struct obs_source_frame *frame)
static bool obs_source_filter_remove_refless(obs_source_t *source,
obs_source_t *filter);
static void obs_source_destroy_defer(struct obs_source *source);
void obs_source_destroy(struct obs_source *source)
{
size_t i;
if (!obs_source_valid(source, "obs_source_destroy"))
return;
@ -635,16 +634,29 @@ void obs_source_destroy(struct obs_source *source)
obs_context_data_remove(&source->context);
blog(LOG_DEBUG, "%ssource '%s' destroyed",
source->context.private ? "private " : "", source->context.name);
/* defer source destroy */
os_task_queue_queue_task(obs->data.destruction_task_thread,
(os_task_t)obs_source_destroy_defer, source);
}
static void obs_source_destroy_defer(struct obs_source *source)
{
size_t i;
obs_source_dosignal(source, "source_destroy", "destroy");
/* prevents the destruction of sources if destroy triggered inside of
* a video tick call */
obs_context_wait(&source->context);
if (source->context.data) {
source->info.destroy(source->context.data);
source->context.data = NULL;
}
blog(LOG_DEBUG, "%ssource '%s' destroyed",
source->context.private ? "private " : "", source->context.name);
audio_monitor_destroy(source->monitor);
obs_hotkey_unregister(source->push_to_talk_key);

View File

@ -63,15 +63,14 @@ static uint64_t tick_sources(uint64_t cur_time, uint64_t last_time)
source = data->first_source;
while (source) {
struct obs_source *cur_source = obs_source_get_ref(source);
obs_source_t *s = obs_source_get_ref(source);
if (cur_source)
obs_source_video_tick(cur_source, seconds);
if (s) {
obs_source_video_tick(s, seconds);
obs_source_release(s);
}
source = (struct obs_source *)source->context.next;
if (cur_source)
obs_source_release(cur_source);
}
pthread_mutex_unlock(&data->sources_mutex);

View File

@ -648,6 +648,11 @@ static bool obs_init_data(void)
goto fail;
if (pthread_mutex_init_recursive(&obs->data.draw_callbacks_mutex) != 0)
goto fail;
data->destruction_task_thread = os_task_queue_create();
if (!data->destruction_task_thread)
goto fail;
if (!obs_view_init(&data->main_view))
goto fail;
@ -705,6 +710,7 @@ static void obs_free_data(void)
pthread_mutex_destroy(&data->encoders_mutex);
pthread_mutex_destroy(&data->services_mutex);
pthread_mutex_destroy(&data->draw_callbacks_mutex);
os_task_queue_destroy(data->destruction_task_thread);
da_free(data->draw_callbacks);
da_free(data->tick_callbacks);
obs_data_release(data->private_data);
@ -993,6 +999,8 @@ void obs_shutdown(void)
{
struct obs_module *module;
obs_wait_for_destroy_queue();
for (size_t i = 0; i < obs->source_types.num; i++) {
struct obs_source_info *item = &obs->source_types.array[i];
if (item->type_data && item->free_type_data)
@ -1463,19 +1471,22 @@ void obs_enum_sources(bool (*enum_proc)(void *, obs_source_t *), void *param)
source = obs->data.first_source;
while (source) {
obs_source_t *next_source =
(obs_source_t *)source->context.next;
if (strcmp(source->info.id, group_info.id) == 0 &&
!enum_proc(param, source)) {
break;
} else if (source->info.type == OBS_SOURCE_TYPE_INPUT &&
!source->context.private &&
!enum_proc(param, source)) {
break;
obs_source_t *s = obs_source_get_ref(source);
if (s) {
if (strcmp(s->info.id, group_info.id) == 0 &&
!enum_proc(param, s)) {
obs_source_release(s);
break;
} else if (s->info.type == OBS_SOURCE_TYPE_INPUT &&
!s->context.private &&
!enum_proc(param, s)) {
obs_source_release(s);
break;
}
obs_source_release(s);
}
source = next_source;
source = (obs_source_t *)source->context.next;
}
pthread_mutex_unlock(&obs->data.sources_mutex);
@ -1489,15 +1500,17 @@ void obs_enum_scenes(bool (*enum_proc)(void *, obs_source_t *), void *param)
source = obs->data.first_source;
while (source) {
obs_source_t *next_source =
(obs_source_t *)source->context.next;
if (source->info.type == OBS_SOURCE_TYPE_SCENE &&
!source->context.private && !enum_proc(param, source)) {
break;
obs_source_t *s = obs_source_get_ref(source);
if (s) {
if (source->info.type == OBS_SOURCE_TYPE_SCENE &&
!source->context.private && !enum_proc(param, s)) {
obs_source_release(s);
break;
}
obs_source_release(s);
}
source = next_source;
source = (obs_source_t *)source->context.next;
}
pthread_mutex_unlock(&obs->data.sources_mutex);
@ -2173,18 +2186,22 @@ void obs_context_data_insert(struct obs_context_data *context,
void obs_context_data_remove(struct obs_context_data *context)
{
if (context && context->mutex) {
if (context && context->prev_next) {
pthread_mutex_lock(context->mutex);
if (context->prev_next)
*context->prev_next = context->next;
*context->prev_next = context->next;
if (context->next)
context->next->prev_next = context->prev_next;
context->prev_next = NULL;
pthread_mutex_unlock(context->mutex);
context->mutex = NULL;
}
}
void obs_context_wait(struct obs_context_data *context)
{
pthread_mutex_lock(context->mutex);
pthread_mutex_unlock(context->mutex);
}
void obs_context_data_setname(struct obs_context_data *context,
const char *name)
{
@ -2545,6 +2562,8 @@ bool obs_in_task_thread(enum obs_task_type type)
return is_audio_thread;
else if (type == OBS_TASK_UI)
return is_ui_thread;
else if (type == OBS_TASK_DESTROY)
return os_task_queue_inside(obs->data.destruction_task_thread);
assert(false);
return false;
@ -2590,10 +2609,35 @@ void obs_queue_task(enum obs_task_type type, obs_task_t task, void *param,
pthread_mutex_lock(&audio->task_mutex);
circlebuf_push_back(&audio->tasks, &info, sizeof(info));
pthread_mutex_unlock(&audio->task_mutex);
} else if (type == OBS_TASK_DESTROY) {
os_task_t os_task = (os_task_t)task;
os_task_queue_queue_task(
obs->data.destruction_task_thread, os_task,
param);
}
}
}
bool obs_wait_for_destroy_queue(void)
{
struct task_wait_info info = {0};
if (!obs->video.thread_initialized || !obs->audio.audio)
return false;
/* allow video and audio threads time to release objects */
os_event_init(&info.event, OS_EVENT_TYPE_AUTO);
obs_queue_task(OBS_TASK_GRAPHICS, task_wait_callback, &info, false);
os_event_wait(info.event);
obs_queue_task(OBS_TASK_AUDIO, task_wait_callback, &info, false);
os_event_wait(info.event);
os_event_destroy(info.event);
/* wait for destroy task queue */
return os_task_queue_wait(obs->data.destruction_task_thread);
}
static void set_ui_thread(void *unused)
{
is_ui_thread = true;

View File

@ -797,12 +797,15 @@ enum obs_task_type {
OBS_TASK_UI,
OBS_TASK_GRAPHICS,
OBS_TASK_AUDIO,
OBS_TASK_DESTROY,
};
EXPORT void obs_queue_task(enum obs_task_type type, obs_task_t task,
void *param, bool wait);
EXPORT bool obs_in_task_thread(enum obs_task_type type);
EXPORT bool obs_wait_for_destroy_queue(void);
typedef void (*obs_task_handler_t)(obs_task_t task, void *param, bool wait);
EXPORT void obs_set_ui_task_handler(obs_task_handler_t handler);