obs-ffmpeg: Add HLS output

Add and register an obs_output_info struct called
ffmpeg_hls_muxer. It uses ffmpeg's HLS muxer to stream output.

Add threading and buffer to reduce skipped frames.
Also add frame drop logic when there are too many packets in the
buffer for congestion control.
This commit is contained in:
Maya Venkatraman 2020-10-13 13:50:47 -07:00 committed by jp9000
parent c4e71794d9
commit 49d351b537
6 changed files with 385 additions and 7 deletions

View File

@ -30,6 +30,7 @@ set(obs-ffmpeg_SOURCES
obs-ffmpeg-nvenc.c obs-ffmpeg-nvenc.c
obs-ffmpeg-output.c obs-ffmpeg-output.c
obs-ffmpeg-mux.c obs-ffmpeg-mux.c
obs-ffmpeg-hls-mux.c
obs-ffmpeg-source.c) obs-ffmpeg-source.c)
if(UNIX AND NOT APPLE) if(UNIX AND NOT APPLE)

View File

@ -623,29 +623,38 @@ static inline int open_output_file(struct ffmpeg_mux *ffm)
#define SRT_PROTO "srt" #define SRT_PROTO "srt"
#define UDP_PROTO "udp" #define UDP_PROTO "udp"
#define TCP_PROTO "tcp" #define TCP_PROTO "tcp"
#define HTTP_PROTO "http"
static int ffmpeg_mux_init_context(struct ffmpeg_mux *ffm) static int ffmpeg_mux_init_context(struct ffmpeg_mux *ffm)
{ {
AVOutputFormat *output_format; AVOutputFormat *output_format;
int ret; int ret;
bool is_network = false; bool is_network = false;
bool is_http = false;
is_http = (strncmp(ffm->params.file, HTTP_PROTO,
sizeof(HTTP_PROTO) - 1) == 0);
if (strncmp(ffm->params.file, SRT_PROTO, sizeof(SRT_PROTO) - 1) == 0 || if (strncmp(ffm->params.file, SRT_PROTO, sizeof(SRT_PROTO) - 1) == 0 ||
strncmp(ffm->params.file, UDP_PROTO, sizeof(UDP_PROTO) - 1) == 0 || strncmp(ffm->params.file, UDP_PROTO, sizeof(UDP_PROTO) - 1) == 0 ||
strncmp(ffm->params.file, TCP_PROTO, sizeof(TCP_PROTO) - 1) == 0) strncmp(ffm->params.file, TCP_PROTO, sizeof(TCP_PROTO) - 1) == 0 ||
is_http) {
is_network = true; is_network = true;
if (is_network) {
avformat_network_init(); avformat_network_init();
output_format = av_guess_format("mpegts", NULL, "video/M2PT");
} else {
output_format = av_guess_format(NULL, ffm->params.file, NULL);
} }
if (is_network && !is_http)
output_format = av_guess_format("mpegts", NULL, "video/M2PT");
else
output_format = av_guess_format(NULL, ffm->params.file, NULL);
if (output_format == NULL) { if (output_format == NULL) {
fprintf(stderr, "Couldn't find an appropriate muxer for '%s'\n", fprintf(stderr, "Couldn't find an appropriate muxer for '%s'\n",
ffm->params.printable_file.array); ffm->params.printable_file.array);
return FFM_ERROR; return FFM_ERROR;
} }
printf("info: Output format name and long_name: %s, %s\n",
output_format->name ? output_format->name : "unknown",
output_format->long_name ? output_format->long_name : "unknown");
ret = avformat_alloc_output_context2(&ffm->output, output_format, NULL, ret = avformat_alloc_output_context2(&ffm->output, output_format, NULL,
ffm->params.file); ffm->params.file);

View File

@ -0,0 +1,332 @@
#include "obs-ffmpeg-mux.h"
#define do_log(level, format, ...) \
blog(level, "[ffmpeg hls muxer: '%s'] " format, \
obs_output_get_name(stream->output), ##__VA_ARGS__)
#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
const char *ffmpeg_hls_mux_getname(void *type)
{
UNUSED_PARAMETER(type);
return obs_module_text("FFmpegHlsMuxer");
}
int hls_stream_dropped_frames(void *data)
{
struct ffmpeg_muxer *stream = data;
return stream->dropped_frames;
}
void ffmpeg_hls_mux_destroy(void *data)
{
struct ffmpeg_muxer *stream = data;
if (stream) {
deactivate(stream, 0);
pthread_mutex_destroy(&stream->write_mutex);
os_sem_destroy(stream->write_sem);
os_event_destroy(stream->stop_event);
da_free(stream->mux_packets);
circlebuf_free(&stream->packets);
os_process_pipe_destroy(stream->pipe);
dstr_free(&stream->path);
dstr_free(&stream->printable_path);
dstr_free(&stream->stream_key);
dstr_free(&stream->muxer_settings);
bfree(data);
}
}
void *ffmpeg_hls_mux_create(obs_data_t *settings, obs_output_t *output)
{
struct ffmpeg_muxer *stream = bzalloc(sizeof(*stream));
pthread_mutex_init_value(&stream->write_mutex);
stream->output = output;
/* init mutex, semaphore and event */
if (pthread_mutex_init(&stream->write_mutex, NULL) != 0)
goto fail;
if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_AUTO) != 0)
goto fail;
if (os_sem_init(&stream->write_sem, 0) != 0)
goto fail;
UNUSED_PARAMETER(settings);
return stream;
fail:
ffmpeg_hls_mux_destroy(stream);
return NULL;
}
static bool process_packet(struct ffmpeg_muxer *stream)
{
struct encoder_packet packet;
bool has_packet = false;
bool ret = true;
pthread_mutex_lock(&stream->write_mutex);
if (stream->packets.size) {
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
has_packet = true;
}
pthread_mutex_unlock(&stream->write_mutex);
if (has_packet) {
ret = write_packet(stream, &packet);
obs_encoder_packet_release(&packet);
}
return ret;
}
static void *write_thread(void *data)
{
struct ffmpeg_muxer *stream = data;
while (os_sem_wait(stream->write_sem) == 0) {
if (os_event_try(stream->stop_event) == 0)
return NULL;
if (!process_packet(stream))
break;
}
obs_output_signal_stop(stream->output, OBS_OUTPUT_ERROR);
deactivate(stream, 0);
return NULL;
}
bool ffmpeg_hls_mux_start(void *data)
{
struct ffmpeg_muxer *stream = data;
obs_service_t *service;
const char *path_str;
const char *stream_key;
struct dstr path = {0};
obs_encoder_t *vencoder;
obs_data_t *settings;
int keyint_sec;
if (!obs_output_can_begin_data_capture(stream->output, 0))
return false;
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
service = obs_output_get_service(stream->output);
if (!service)
return false;
path_str = obs_service_get_url(service);
stream_key = obs_service_get_key(service);
dstr_copy(&stream->stream_key, stream_key);
dstr_copy(&path, path_str);
dstr_replace(&path, "{stream_key}", stream_key);
dstr_copy(&stream->muxer_settings,
"method=PUT http_persistent=1 ignore_io_errors=1 ");
dstr_catf(&stream->muxer_settings, "http_user_agent=libobs/%s",
OBS_VERSION);
vencoder = obs_output_get_video_encoder(stream->output);
settings = obs_encoder_get_settings(vencoder);
keyint_sec = obs_data_get_int(settings, "keyint_sec");
if (keyint_sec) {
dstr_catf(&stream->muxer_settings, " hls_time=%d", keyint_sec);
stream->keyint_sec = keyint_sec;
}
obs_data_release(settings);
start_pipe(stream, path.array);
dstr_free(&path);
if (!stream->pipe) {
obs_output_set_last_error(
stream->output, obs_module_text("HelperProcessFailed"));
warn("Failed to create process pipe");
return false;
}
stream->mux_thread_joinable = pthread_create(&stream->mux_thread, NULL,
write_thread, stream) == 0;
if (!stream->mux_thread_joinable)
return false;
/* write headers and start capture */
os_atomic_set_bool(&stream->active, true);
os_atomic_set_bool(&stream->capturing, true);
stream->is_hls = true;
stream->total_bytes = 0;
stream->dropped_frames = 0;
stream->min_priority = 0;
obs_output_begin_data_capture(stream->output, 0);
dstr_copy(&stream->printable_path, path_str);
info("Writing to path '%s'...", stream->printable_path.array);
return true;
}
static bool write_packet_to_buf(struct ffmpeg_muxer *stream,
struct encoder_packet *packet)
{
circlebuf_push_back(&stream->packets, packet,
sizeof(struct encoder_packet));
return true;
}
static void drop_frames(struct ffmpeg_muxer *stream, int highest_priority)
{
struct circlebuf new_buf = {0};
int num_frames_dropped = 0;
circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
while (stream->packets.size) {
struct encoder_packet packet;
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
/* do not drop audio data or video keyframes */
if (packet.type == OBS_ENCODER_AUDIO ||
packet.drop_priority >= highest_priority) {
circlebuf_push_back(&new_buf, &packet, sizeof(packet));
} else {
num_frames_dropped++;
obs_encoder_packet_release(&packet);
}
}
circlebuf_free(&stream->packets);
stream->packets = new_buf;
if (stream->min_priority < highest_priority)
stream->min_priority = highest_priority;
stream->dropped_frames += num_frames_dropped;
}
static bool find_first_video_packet(struct ffmpeg_muxer *stream,
struct encoder_packet *first)
{
size_t count = stream->packets.size / sizeof(*first);
for (size_t i = 0; i < count; i++) {
struct encoder_packet *cur =
circlebuf_data(&stream->packets, i * sizeof(*first));
if (cur->type == OBS_ENCODER_VIDEO && !cur->keyframe) {
*first = *cur;
return true;
}
}
return false;
}
void check_to_drop_frames(struct ffmpeg_muxer *stream, bool pframes)
{
struct encoder_packet first;
int64_t buffer_duration_usec;
int priority = pframes ? OBS_NAL_PRIORITY_HIGHEST
: OBS_NAL_PRIORITY_HIGH;
int keyint_sec = stream->keyint_sec;
int64_t drop_threshold_sec = keyint_sec ? 2 * keyint_sec : 10;
if (!find_first_video_packet(stream, &first))
return;
buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
if (buffer_duration_usec > drop_threshold_sec * 1000000)
drop_frames(stream, priority);
}
static bool add_video_packet(struct ffmpeg_muxer *stream,
struct encoder_packet *packet)
{
check_to_drop_frames(stream, false);
check_to_drop_frames(stream, true);
/* if currently dropping frames, drop packets until it reaches the
* desired priority */
if (packet->drop_priority < stream->min_priority) {
stream->dropped_frames++;
return false;
} else {
stream->min_priority = 0;
}
stream->last_dts_usec = packet->dts_usec;
return write_packet_to_buf(stream, packet);
}
void ffmpeg_hls_mux_data(void *data, struct encoder_packet *packet)
{
struct ffmpeg_muxer *stream = data;
struct encoder_packet new_packet;
struct encoder_packet tmp_packet;
bool added_packet = false;
if (!active(stream))
return;
/* encoder failure */
if (!packet) {
deactivate(stream, OBS_OUTPUT_ENCODE_ERROR);
return;
}
if (!stream->sent_headers) {
if (!send_headers(stream))
return;
stream->sent_headers = true;
}
if (stopping(stream)) {
if (packet->sys_dts_usec >= stream->stop_ts) {
deactivate(stream, 0);
return;
}
}
if (packet->type == OBS_ENCODER_VIDEO) {
obs_parse_avc_packet(&tmp_packet, packet);
packet->drop_priority = tmp_packet.priority;
obs_encoder_packet_release(&tmp_packet);
}
obs_encoder_packet_ref(&new_packet, packet);
pthread_mutex_lock(&stream->write_mutex);
if (active(stream)) {
added_packet =
(packet->type == OBS_ENCODER_VIDEO)
? add_video_packet(stream, &new_packet)
: write_packet_to_buf(stream, &new_packet);
}
pthread_mutex_unlock(&stream->write_mutex);
if (added_packet)
os_sem_post(stream->write_sem);
else
obs_encoder_packet_release(&new_packet);
}
struct obs_output_info ffmpeg_hls_muxer = {
.id = "ffmpeg_hls_muxer",
.flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
OBS_OUTPUT_SERVICE,
.encoded_video_codecs = "h264",
.encoded_audio_codecs = "aac",
.get_name = ffmpeg_hls_mux_getname,
.create = ffmpeg_hls_mux_create,
.destroy = ffmpeg_hls_mux_destroy,
.start = ffmpeg_hls_mux_start,
.stop = ffmpeg_mux_stop,
.encoded_packet = ffmpeg_hls_mux_data,
.get_total_bytes = ffmpeg_mux_total_bytes,
.get_dropped_frames = hls_stream_dropped_frames,
};

View File

@ -67,6 +67,7 @@ static void ffmpeg_mux_destroy(void *data)
if (stream->mux_thread_joinable) if (stream->mux_thread_joinable)
pthread_join(stream->mux_thread, NULL); pthread_join(stream->mux_thread, NULL);
da_free(stream->mux_packets); da_free(stream->mux_packets);
circlebuf_free(&stream->packets);
os_process_pipe_destroy(stream->pipe); os_process_pipe_destroy(stream->pipe);
dstr_free(&stream->path); dstr_free(&stream->path);
@ -364,6 +365,15 @@ int deactivate(struct ffmpeg_muxer *stream, int code)
{ {
int ret = -1; int ret = -1;
if (stream->is_hls) {
if (stream->mux_thread_joinable) {
os_event_signal(stream->stop_event);
os_sem_post(stream->write_sem);
pthread_join(stream->mux_thread, NULL);
stream->mux_thread_joinable = false;
}
}
if (active(stream)) { if (active(stream)) {
ret = os_process_pipe_destroy(stream->pipe); ret = os_process_pipe_destroy(stream->pipe);
stream->pipe = NULL; stream->pipe = NULL;
@ -383,6 +393,19 @@ int deactivate(struct ffmpeg_muxer *stream, int code)
obs_output_end_data_capture(stream->output); obs_output_end_data_capture(stream->output);
} }
if (stream->is_hls) {
pthread_mutex_lock(&stream->write_mutex);
while (stream->packets.size) {
struct encoder_packet packet;
circlebuf_pop_front(&stream->packets, &packet,
sizeof(packet));
obs_encoder_packet_release(&packet);
}
pthread_mutex_unlock(&stream->write_mutex);
}
os_atomic_set_bool(&stream->stopping, false); os_atomic_set_bool(&stream->stopping, false);
return ret; return ret;
} }

View File

@ -33,12 +33,23 @@ struct ffmpeg_muxer {
int keyframes; int keyframes;
obs_hotkey_id hotkey; obs_hotkey_id hotkey;
volatile bool muxing; volatile bool muxing;
DARRAY(struct encoder_packet) mux_packets; DARRAY(struct encoder_packet) mux_packets;
/* these are accessed both by replay buffer and by HLS */
pthread_t mux_thread; pthread_t mux_thread;
bool mux_thread_joinable; bool mux_thread_joinable;
struct circlebuf packets; struct circlebuf packets;
/* HLS only */
int keyint_sec;
pthread_mutex_t write_mutex;
os_sem_t *write_sem;
os_event_t *stop_event;
bool is_hls;
int dropped_frames;
int min_priority;
int64_t last_dts_usec;
bool is_network; bool is_network;
}; };

View File

@ -24,6 +24,7 @@ extern struct obs_output_info ffmpeg_output;
extern struct obs_output_info ffmpeg_muxer; extern struct obs_output_info ffmpeg_muxer;
extern struct obs_output_info ffmpeg_mpegts_muxer; extern struct obs_output_info ffmpeg_mpegts_muxer;
extern struct obs_output_info replay_buffer; extern struct obs_output_info replay_buffer;
extern struct obs_output_info ffmpeg_hls_muxer;
extern struct obs_encoder_info aac_encoder_info; extern struct obs_encoder_info aac_encoder_info;
extern struct obs_encoder_info opus_encoder_info; extern struct obs_encoder_info opus_encoder_info;
extern struct obs_encoder_info nvenc_encoder_info; extern struct obs_encoder_info nvenc_encoder_info;
@ -231,6 +232,7 @@ bool obs_module_load(void)
obs_register_output(&ffmpeg_output); obs_register_output(&ffmpeg_output);
obs_register_output(&ffmpeg_muxer); obs_register_output(&ffmpeg_muxer);
obs_register_output(&ffmpeg_mpegts_muxer); obs_register_output(&ffmpeg_mpegts_muxer);
obs_register_output(&ffmpeg_hls_muxer);
obs_register_output(&replay_buffer); obs_register_output(&replay_buffer);
obs_register_encoder(&aac_encoder_info); obs_register_encoder(&aac_encoder_info);
obs_register_encoder(&opus_encoder_info); obs_register_encoder(&opus_encoder_info);