From 898256d41620878367bc40a2dd66d236e892c99f Mon Sep 17 00:00:00 2001 From: Richard Stanway Date: Thu, 30 Dec 2021 00:10:45 +0100 Subject: [PATCH] obs-ffmpeg: Add a circlebuf to buffer output in ffmpeg-mux This adds a circular buffer to ffmpeg-mux when writing to a file. Output from ffmpeg is buffered so that slow disk I/O does not block ffmpeg writes, as this causes the pipe to become full and OBS stops sending frames with a misleading "Encoding overloaded!" warning. The buffer may grow to 256 MB depending on the rate of data coming in and out, if the buffer is full OBS will start waiting in ffmpeg writes. A separate I/O thread is responsible for processing the contents of the buffer and writing them to the output file. It tries to process 1 MB at a time to minimize small I/O. Complicating things considerably, some formats in ffmpeg require seeking on the output, so we can't just treat everything as a stream of bytes. To handle this, we record offsets of each write and try to buffer as many contiguous writes as possible. This unfortunately makes the code quite complicated, but hopefully well commented. --- plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt | 3 + plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c | 297 ++++++++++++++++++- 2 files changed, 295 insertions(+), 5 deletions(-) diff --git a/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt b/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt index 25da53b93..a86c79dec 100644 --- a/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt +++ b/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt @@ -11,6 +11,9 @@ target_sources(obs-ffmpeg-mux PRIVATE ffmpeg-mux.c ffmpeg-mux.h) target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::libobs FFmpeg::avcodec FFmpeg::avutil FFmpeg::avformat) +if(OS_WINDOWS) + target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::w32-pthreads) +endif() if(ENABLE_FFMPEG_MUX_DEBUG) target_compile_definitions(obs-ffmpeg-mux PRIVATE ENABLE_FFMPEG_MUX_DEBUG) diff --git a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c index 359582d64..a484a1aaa 100644 --- a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c +++ b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c @@ -26,6 +26,9 @@ #include #include "ffmpeg-mux.h" +#include +#include +#include #include #include #include @@ -42,6 +45,8 @@ #define CODEC_FLAG_GLOBAL_H CODEC_FLAG_GLOBAL_HEADER #endif +#define AVIO_BUFFER_SIZE 65536 + /* ------------------------------------------------------------------------- */ static char *global_stream_key = ""; @@ -117,6 +122,24 @@ struct audio_info { AVCodecContext *ctx; }; +struct io_header { + uint64_t seek_offset; + uint64_t data_length; +}; + +struct io_buffer { + bool active; + bool shutdown_requested; + bool output_error; + os_event_t *buffer_space_available_event; + os_event_t *new_data_available_event; + pthread_t io_thread; + pthread_mutex_t data_mutex; + FILE *output_file; + struct circlebuf data; + uint64_t next_pos; +}; + struct ffmpeg_mux { AVFormatContext *output; AVStream *video_stream; @@ -129,7 +152,7 @@ struct ffmpeg_mux { struct header *audio_header; int num_audio_streams; bool initialized; - char error[4096]; + struct io_buffer io; }; static void header_free(struct header *header) @@ -167,6 +190,29 @@ static void ffmpeg_mux_free(struct ffmpeg_mux *ffm) av_write_trailer(ffm->output); } + // If we're writing to a file with the circlebuf, shut it + // down gracefully + if (ffm->io.active) { + os_atomic_set_bool(&ffm->io.shutdown_requested, true); + + // Wakes up the I/O thread and waits for it to finish + pthread_mutex_lock(&ffm->io.data_mutex); + os_event_signal(ffm->io.new_data_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + pthread_join(ffm->io.io_thread, NULL); + + // Cleanup everything else + av_free(ffm->output->pb->buffer); + avio_context_free(&ffm->output->pb); + + os_event_destroy(ffm->io.new_data_available_event); + os_event_destroy(ffm->io.buffer_space_available_event); + + pthread_mutex_destroy(&ffm->io.data_mutex); + + circlebuf_free(&ffm->io.data); + } + free_avformat(ffm); header_free(&ffm->video_header); @@ -612,6 +658,219 @@ static inline bool ffmpeg_mux_get_extra_data(struct ffmpeg_mux *ffm) #pragma warning(disable : 4996) #endif +#define CHUNK_SIZE 1048576 + +static void *ffmpeg_mux_io_thread(void *data) +{ + struct ffmpeg_mux *ffm = data; + + // Chunk collects the writes into a larger batch + size_t chunk_used = 0; + + unsigned char *chunk = malloc(CHUNK_SIZE); + if (!chunk) { + os_atomic_set_bool(&ffm->io.output_error, true); + fprintf(stderr, "Error allocating memory for output\n"); + goto error; + } + + bool shutting_down; + bool want_seek = false; + bool force_flush_chunk = false; + + // current_seek_position is a virtual position updated as we read from + // the buffer, if it becomes discontinuous due to a seek request from + // ffmpeg, then we flush the chunk. next_seek_position is the actual + // offset we should seek to when we write the chunk. + uint64_t current_seek_position = 0; + uint64_t next_seek_position; + + for (;;) { + // Wait for ffmpeg to write data to the buffer + os_event_wait(ffm->io.new_data_available_event); + + // Loop to write in chunk_size chunks + for (;;) { + shutting_down = os_atomic_load_bool( + &ffm->io.shutdown_requested); + + pthread_mutex_lock(&ffm->io.data_mutex); + + // Fetch as many writes as possible from the circlebuf + // and fill up our local chunk. This may involve seeking + // if ffmpeg needs to, so take care of that as well. + for (;;) { + size_t available = ffm->io.data.size; + + // Buffer is empty (now) or was already empty (we got + // woken up to exit) + if (!available) + break; + + // Get seek offset and data size + struct io_header header; + circlebuf_peek_front(&ffm->io.data, &header, + sizeof(header)); + + // Do we need to seek? + if (header.seek_offset != + current_seek_position) { + + // If there's already part of a chunk pending, + // flush it at the current offset. Similarly, + // if we already plan to seek, then seek. + if (chunk_used || want_seek) { + force_flush_chunk = true; + break; + } + + // Mark that we need to seek and where to + want_seek = true; + next_seek_position = header.seek_offset; + + // Update our virtual position + current_seek_position = + header.seek_offset; + } + + // Make sure there's enough room for the data, if + // not then force a flush + if (header.data_length + chunk_used > + CHUNK_SIZE) { + force_flush_chunk = true; + break; + } + + // Remove header that we already read + circlebuf_pop_front(&ffm->io.data, NULL, + sizeof(header)); + + // Copy from the buffer to our local chunk + circlebuf_pop_front(&ffm->io.data, + chunk + chunk_used, + header.data_length); + + // Update offsets + chunk_used += header.data_length; + current_seek_position += header.data_length; + } + + // Signal that there is more room in the buffer + os_event_signal(ffm->io.buffer_space_available_event); + + // Try to avoid lots of small writes unless this was the final + // data left in the buffer. The buffer might be entirely empty + // if we were woken up to exit. + if (!force_flush_chunk && + (!chunk_used || + (chunk_used < 65536 && !shutting_down))) { + os_event_reset( + ffm->io.new_data_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + break; + } + + pthread_mutex_unlock(&ffm->io.data_mutex); + + // Seek if we need to + if (want_seek) { + os_fseeki64(ffm->io.output_file, + next_seek_position, SEEK_SET); + current_seek_position = next_seek_position; + want_seek = false; + } + + // Write the current chunk to the output file + if (fwrite(chunk, chunk_used, 1, ffm->io.output_file) != + 1) { + os_atomic_set_bool(&ffm->io.output_error, true); + fprintf(stderr, "Error writing to '%s', %s\n", + ffm->params.printable_file.array, + strerror(errno)); + goto error; + } + + chunk_used = 0; + force_flush_chunk = false; + } + + // If this was the last chunk, time to exit + if (shutting_down) + break; + } + +error: + if (chunk) + free(chunk); + + fclose(ffm->io.output_file); + return NULL; +} + +static int64_t ffmpeg_mux_seek_av_buffer(void *opaque, int64_t offset, + int whence) +{ + struct ffmpeg_mux *ffm = opaque; + + // If the output thread failed, signal that back up the stack + if (os_atomic_load_bool(&ffm->io.output_error)) + return -1; + + // Update where the next write should go + pthread_mutex_lock(&ffm->io.data_mutex); + if (whence == SEEK_SET) + ffm->io.next_pos = offset; + else if (whence == SEEK_CUR) + ffm->io.next_pos += offset; + pthread_mutex_unlock(&ffm->io.data_mutex); + + return 0; +} + +static int ffmpeg_mux_write_av_buffer(void *opaque, uint8_t *buf, int buf_size) +{ + struct ffmpeg_mux *ffm = opaque; + + // If the output thread failed, signal that back up the stack + if (os_atomic_load_bool(&ffm->io.output_error)) + return -1; + + for (;;) { + pthread_mutex_lock(&ffm->io.data_mutex); + + // Avoid unbounded growth of the circlebuf, cap to 256 MB + if (ffm->io.data.capacity >= 256 * 1048576 && + ffm->io.data.capacity - ffm->io.data.size < + buf_size + sizeof(struct io_header)) { + // No space, wait for the I/O thread to make space + os_event_reset(ffm->io.buffer_space_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + os_event_wait(ffm->io.buffer_space_available_event); + } else { + break; + } + } + + struct io_header header; + + header.data_length = buf_size; + header.seek_offset = ffm->io.next_pos; + + // Copy the data into the buffer + circlebuf_push_back(&ffm->io.data, &header, sizeof(header)); + circlebuf_push_back(&ffm->io.data, buf, buf_size); + + // Advance the next write position + ffm->io.next_pos += buf_size; + + // Tell the I/O thread that there's new data to be written + os_event_signal(ffm->io.new_data_available_event); + + pthread_mutex_unlock(&ffm->io.data_mutex); + + return buf_size; +} + static inline int open_output_file(struct ffmpeg_mux *ffm) { #if LIBAVFORMAT_VERSION_INT < AV_VERSION_INT(59, 0, 100) @@ -622,14 +881,42 @@ static inline int open_output_file(struct ffmpeg_mux *ffm) int ret; if ((format->flags & AVFMT_NOFILE) == 0) { - ret = avio_open(&ffm->output->pb, ffm->params.file, - AVIO_FLAG_WRITE); - if (ret < 0) { + // If not outputting to a network, write to a circlebuf + // instead of relying on ffmpeg disk output. This hopefully + // works around too small buffers somewhere causing output + // stalls when recording. + + // We're in charge of managing the actual file now + ffm->io.output_file = os_fopen(ffm->params.file, "wb"); + if (!ffm->io.output_file) { fprintf(stderr, "Couldn't open '%s', %s\n", ffm->params.printable_file.array, - av_err2str(ret)); + strerror(errno)); return FFM_ERROR; } + + // Start at 1MB, this can grow up to 256 MB depending + // how fast data is going in and out (limited in + // ffmpeg_mux_write_av_buffer) + circlebuf_reserve(&ffm->io.data, 1048576); + + pthread_mutex_init(&ffm->io.data_mutex, NULL); + + os_event_init(&ffm->io.buffer_space_available_event, + OS_EVENT_TYPE_AUTO); + os_event_init(&ffm->io.new_data_available_event, + OS_EVENT_TYPE_AUTO); + + pthread_create(&ffm->io.io_thread, NULL, ffmpeg_mux_io_thread, + ffm); + + unsigned char *avio_ctx_buffer = av_malloc(AVIO_BUFFER_SIZE); + + ffm->output->pb = avio_alloc_context( + avio_ctx_buffer, AVIO_BUFFER_SIZE, 1, ffm, NULL, + ffmpeg_mux_write_av_buffer, ffmpeg_mux_seek_av_buffer); + + ffm->io.active = true; } AVDictionary *dict = NULL;