diff --git a/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt b/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt index 25da53b93..a86c79dec 100644 --- a/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt +++ b/plugins/obs-ffmpeg/ffmpeg-mux/CMakeLists.txt @@ -11,6 +11,9 @@ target_sources(obs-ffmpeg-mux PRIVATE ffmpeg-mux.c ffmpeg-mux.h) target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::libobs FFmpeg::avcodec FFmpeg::avutil FFmpeg::avformat) +if(OS_WINDOWS) + target_link_libraries(obs-ffmpeg-mux PRIVATE OBS::w32-pthreads) +endif() if(ENABLE_FFMPEG_MUX_DEBUG) target_compile_definitions(obs-ffmpeg-mux PRIVATE ENABLE_FFMPEG_MUX_DEBUG) diff --git a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c index 359582d64..a484a1aaa 100644 --- a/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c +++ b/plugins/obs-ffmpeg/ffmpeg-mux/ffmpeg-mux.c @@ -26,6 +26,9 @@ #include #include "ffmpeg-mux.h" +#include +#include +#include #include #include #include @@ -42,6 +45,8 @@ #define CODEC_FLAG_GLOBAL_H CODEC_FLAG_GLOBAL_HEADER #endif +#define AVIO_BUFFER_SIZE 65536 + /* ------------------------------------------------------------------------- */ static char *global_stream_key = ""; @@ -117,6 +122,24 @@ struct audio_info { AVCodecContext *ctx; }; +struct io_header { + uint64_t seek_offset; + uint64_t data_length; +}; + +struct io_buffer { + bool active; + bool shutdown_requested; + bool output_error; + os_event_t *buffer_space_available_event; + os_event_t *new_data_available_event; + pthread_t io_thread; + pthread_mutex_t data_mutex; + FILE *output_file; + struct circlebuf data; + uint64_t next_pos; +}; + struct ffmpeg_mux { AVFormatContext *output; AVStream *video_stream; @@ -129,7 +152,7 @@ struct ffmpeg_mux { struct header *audio_header; int num_audio_streams; bool initialized; - char error[4096]; + struct io_buffer io; }; static void header_free(struct header *header) @@ -167,6 +190,29 @@ static void ffmpeg_mux_free(struct ffmpeg_mux *ffm) av_write_trailer(ffm->output); } + // If we're writing to a file with the circlebuf, shut it + // down gracefully + if (ffm->io.active) { + os_atomic_set_bool(&ffm->io.shutdown_requested, true); + + // Wakes up the I/O thread and waits for it to finish + pthread_mutex_lock(&ffm->io.data_mutex); + os_event_signal(ffm->io.new_data_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + pthread_join(ffm->io.io_thread, NULL); + + // Cleanup everything else + av_free(ffm->output->pb->buffer); + avio_context_free(&ffm->output->pb); + + os_event_destroy(ffm->io.new_data_available_event); + os_event_destroy(ffm->io.buffer_space_available_event); + + pthread_mutex_destroy(&ffm->io.data_mutex); + + circlebuf_free(&ffm->io.data); + } + free_avformat(ffm); header_free(&ffm->video_header); @@ -612,6 +658,219 @@ static inline bool ffmpeg_mux_get_extra_data(struct ffmpeg_mux *ffm) #pragma warning(disable : 4996) #endif +#define CHUNK_SIZE 1048576 + +static void *ffmpeg_mux_io_thread(void *data) +{ + struct ffmpeg_mux *ffm = data; + + // Chunk collects the writes into a larger batch + size_t chunk_used = 0; + + unsigned char *chunk = malloc(CHUNK_SIZE); + if (!chunk) { + os_atomic_set_bool(&ffm->io.output_error, true); + fprintf(stderr, "Error allocating memory for output\n"); + goto error; + } + + bool shutting_down; + bool want_seek = false; + bool force_flush_chunk = false; + + // current_seek_position is a virtual position updated as we read from + // the buffer, if it becomes discontinuous due to a seek request from + // ffmpeg, then we flush the chunk. next_seek_position is the actual + // offset we should seek to when we write the chunk. + uint64_t current_seek_position = 0; + uint64_t next_seek_position; + + for (;;) { + // Wait for ffmpeg to write data to the buffer + os_event_wait(ffm->io.new_data_available_event); + + // Loop to write in chunk_size chunks + for (;;) { + shutting_down = os_atomic_load_bool( + &ffm->io.shutdown_requested); + + pthread_mutex_lock(&ffm->io.data_mutex); + + // Fetch as many writes as possible from the circlebuf + // and fill up our local chunk. This may involve seeking + // if ffmpeg needs to, so take care of that as well. + for (;;) { + size_t available = ffm->io.data.size; + + // Buffer is empty (now) or was already empty (we got + // woken up to exit) + if (!available) + break; + + // Get seek offset and data size + struct io_header header; + circlebuf_peek_front(&ffm->io.data, &header, + sizeof(header)); + + // Do we need to seek? + if (header.seek_offset != + current_seek_position) { + + // If there's already part of a chunk pending, + // flush it at the current offset. Similarly, + // if we already plan to seek, then seek. + if (chunk_used || want_seek) { + force_flush_chunk = true; + break; + } + + // Mark that we need to seek and where to + want_seek = true; + next_seek_position = header.seek_offset; + + // Update our virtual position + current_seek_position = + header.seek_offset; + } + + // Make sure there's enough room for the data, if + // not then force a flush + if (header.data_length + chunk_used > + CHUNK_SIZE) { + force_flush_chunk = true; + break; + } + + // Remove header that we already read + circlebuf_pop_front(&ffm->io.data, NULL, + sizeof(header)); + + // Copy from the buffer to our local chunk + circlebuf_pop_front(&ffm->io.data, + chunk + chunk_used, + header.data_length); + + // Update offsets + chunk_used += header.data_length; + current_seek_position += header.data_length; + } + + // Signal that there is more room in the buffer + os_event_signal(ffm->io.buffer_space_available_event); + + // Try to avoid lots of small writes unless this was the final + // data left in the buffer. The buffer might be entirely empty + // if we were woken up to exit. + if (!force_flush_chunk && + (!chunk_used || + (chunk_used < 65536 && !shutting_down))) { + os_event_reset( + ffm->io.new_data_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + break; + } + + pthread_mutex_unlock(&ffm->io.data_mutex); + + // Seek if we need to + if (want_seek) { + os_fseeki64(ffm->io.output_file, + next_seek_position, SEEK_SET); + current_seek_position = next_seek_position; + want_seek = false; + } + + // Write the current chunk to the output file + if (fwrite(chunk, chunk_used, 1, ffm->io.output_file) != + 1) { + os_atomic_set_bool(&ffm->io.output_error, true); + fprintf(stderr, "Error writing to '%s', %s\n", + ffm->params.printable_file.array, + strerror(errno)); + goto error; + } + + chunk_used = 0; + force_flush_chunk = false; + } + + // If this was the last chunk, time to exit + if (shutting_down) + break; + } + +error: + if (chunk) + free(chunk); + + fclose(ffm->io.output_file); + return NULL; +} + +static int64_t ffmpeg_mux_seek_av_buffer(void *opaque, int64_t offset, + int whence) +{ + struct ffmpeg_mux *ffm = opaque; + + // If the output thread failed, signal that back up the stack + if (os_atomic_load_bool(&ffm->io.output_error)) + return -1; + + // Update where the next write should go + pthread_mutex_lock(&ffm->io.data_mutex); + if (whence == SEEK_SET) + ffm->io.next_pos = offset; + else if (whence == SEEK_CUR) + ffm->io.next_pos += offset; + pthread_mutex_unlock(&ffm->io.data_mutex); + + return 0; +} + +static int ffmpeg_mux_write_av_buffer(void *opaque, uint8_t *buf, int buf_size) +{ + struct ffmpeg_mux *ffm = opaque; + + // If the output thread failed, signal that back up the stack + if (os_atomic_load_bool(&ffm->io.output_error)) + return -1; + + for (;;) { + pthread_mutex_lock(&ffm->io.data_mutex); + + // Avoid unbounded growth of the circlebuf, cap to 256 MB + if (ffm->io.data.capacity >= 256 * 1048576 && + ffm->io.data.capacity - ffm->io.data.size < + buf_size + sizeof(struct io_header)) { + // No space, wait for the I/O thread to make space + os_event_reset(ffm->io.buffer_space_available_event); + pthread_mutex_unlock(&ffm->io.data_mutex); + os_event_wait(ffm->io.buffer_space_available_event); + } else { + break; + } + } + + struct io_header header; + + header.data_length = buf_size; + header.seek_offset = ffm->io.next_pos; + + // Copy the data into the buffer + circlebuf_push_back(&ffm->io.data, &header, sizeof(header)); + circlebuf_push_back(&ffm->io.data, buf, buf_size); + + // Advance the next write position + ffm->io.next_pos += buf_size; + + // Tell the I/O thread that there's new data to be written + os_event_signal(ffm->io.new_data_available_event); + + pthread_mutex_unlock(&ffm->io.data_mutex); + + return buf_size; +} + static inline int open_output_file(struct ffmpeg_mux *ffm) { #if LIBAVFORMAT_VERSION_INT < AV_VERSION_INT(59, 0, 100) @@ -622,14 +881,42 @@ static inline int open_output_file(struct ffmpeg_mux *ffm) int ret; if ((format->flags & AVFMT_NOFILE) == 0) { - ret = avio_open(&ffm->output->pb, ffm->params.file, - AVIO_FLAG_WRITE); - if (ret < 0) { + // If not outputting to a network, write to a circlebuf + // instead of relying on ffmpeg disk output. This hopefully + // works around too small buffers somewhere causing output + // stalls when recording. + + // We're in charge of managing the actual file now + ffm->io.output_file = os_fopen(ffm->params.file, "wb"); + if (!ffm->io.output_file) { fprintf(stderr, "Couldn't open '%s', %s\n", ffm->params.printable_file.array, - av_err2str(ret)); + strerror(errno)); return FFM_ERROR; } + + // Start at 1MB, this can grow up to 256 MB depending + // how fast data is going in and out (limited in + // ffmpeg_mux_write_av_buffer) + circlebuf_reserve(&ffm->io.data, 1048576); + + pthread_mutex_init(&ffm->io.data_mutex, NULL); + + os_event_init(&ffm->io.buffer_space_available_event, + OS_EVENT_TYPE_AUTO); + os_event_init(&ffm->io.new_data_available_event, + OS_EVENT_TYPE_AUTO); + + pthread_create(&ffm->io.io_thread, NULL, ffmpeg_mux_io_thread, + ffm); + + unsigned char *avio_ctx_buffer = av_malloc(AVIO_BUFFER_SIZE); + + ffm->output->pb = avio_alloc_context( + avio_ctx_buffer, AVIO_BUFFER_SIZE, 1, ffm, NULL, + ffmpeg_mux_write_av_buffer, ffmpeg_mux_seek_av_buffer); + + ffm->io.active = true; } AVDictionary *dict = NULL;