obs-studio/plugins/obs-ffmpeg/obs-ffmpeg-hls-mux.c
jpark37 3d18643088 obs-ffmpeg: Use new priority parsing for HLS
Parsing priority while serializing/discarding packet data is too gross.
2022-08-01 08:15:39 -07:00

349 lines
8.7 KiB
C

#include "obs-ffmpeg-mux.h"
#include <obs-avc.h>
#ifdef ENABLE_HEVC
#include <obs-hevc.h>
#endif
#define do_log(level, format, ...) \
blog(level, "[ffmpeg hls muxer: '%s'] " format, \
obs_output_get_name(stream->output), ##__VA_ARGS__)
#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
const char *ffmpeg_hls_mux_getname(void *type)
{
UNUSED_PARAMETER(type);
return obs_module_text("FFmpegHlsMuxer");
}
int hls_stream_dropped_frames(void *data)
{
struct ffmpeg_muxer *stream = data;
return stream->dropped_frames;
}
void ffmpeg_hls_mux_destroy(void *data)
{
struct ffmpeg_muxer *stream = data;
if (stream) {
deactivate(stream, 0);
pthread_mutex_destroy(&stream->write_mutex);
os_sem_destroy(stream->write_sem);
os_event_destroy(stream->stop_event);
da_free(stream->mux_packets);
circlebuf_free(&stream->packets);
os_process_pipe_destroy(stream->pipe);
dstr_free(&stream->path);
dstr_free(&stream->printable_path);
dstr_free(&stream->stream_key);
dstr_free(&stream->muxer_settings);
bfree(data);
}
}
void *ffmpeg_hls_mux_create(obs_data_t *settings, obs_output_t *output)
{
struct ffmpeg_muxer *stream = bzalloc(sizeof(*stream));
pthread_mutex_init_value(&stream->write_mutex);
stream->output = output;
/* init mutex, semaphore and event */
if (pthread_mutex_init(&stream->write_mutex, NULL) != 0)
goto fail;
if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_AUTO) != 0)
goto fail;
if (os_sem_init(&stream->write_sem, 0) != 0)
goto fail;
UNUSED_PARAMETER(settings);
return stream;
fail:
ffmpeg_hls_mux_destroy(stream);
return NULL;
}
static bool process_packet(struct ffmpeg_muxer *stream)
{
struct encoder_packet packet;
bool has_packet = false;
bool ret = true;
pthread_mutex_lock(&stream->write_mutex);
if (stream->packets.size) {
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
has_packet = true;
}
pthread_mutex_unlock(&stream->write_mutex);
if (has_packet) {
ret = write_packet(stream, &packet);
obs_encoder_packet_release(&packet);
}
return ret;
}
static void *write_thread(void *data)
{
struct ffmpeg_muxer *stream = data;
while (os_sem_wait(stream->write_sem) == 0) {
if (os_event_try(stream->stop_event) == 0)
return NULL;
if (!process_packet(stream))
break;
}
obs_output_signal_stop(stream->output, OBS_OUTPUT_ERROR);
deactivate(stream, 0);
return NULL;
}
bool ffmpeg_hls_mux_start(void *data)
{
struct ffmpeg_muxer *stream = data;
obs_service_t *service;
const char *path_str;
const char *stream_key;
struct dstr path = {0};
obs_encoder_t *vencoder;
obs_data_t *settings;
int keyint_sec;
if (!obs_output_can_begin_data_capture(stream->output, 0))
return false;
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
service = obs_output_get_service(stream->output);
if (!service)
return false;
path_str = obs_service_get_url(service);
stream_key = obs_service_get_key(service);
dstr_copy(&stream->stream_key, stream_key);
dstr_copy(&path, path_str);
dstr_replace(&path, "{stream_key}", stream_key);
dstr_copy(&stream->muxer_settings,
"method=PUT http_persistent=1 ignore_io_errors=1 ");
dstr_catf(&stream->muxer_settings, "http_user_agent=libobs/%s",
OBS_VERSION);
vencoder = obs_output_get_video_encoder(stream->output);
settings = obs_encoder_get_settings(vencoder);
keyint_sec = (int)obs_data_get_int(settings, "keyint_sec");
if (keyint_sec) {
dstr_catf(&stream->muxer_settings, " hls_time=%d", keyint_sec);
stream->keyint_sec = keyint_sec;
}
obs_data_release(settings);
start_pipe(stream, path.array);
dstr_free(&path);
if (!stream->pipe) {
obs_output_set_last_error(
stream->output, obs_module_text("HelperProcessFailed"));
warn("Failed to create process pipe");
return false;
}
stream->mux_thread_joinable = pthread_create(&stream->mux_thread, NULL,
write_thread, stream) == 0;
if (!stream->mux_thread_joinable)
return false;
/* write headers and start capture */
os_atomic_set_bool(&stream->active, true);
os_atomic_set_bool(&stream->capturing, true);
stream->is_hls = true;
stream->total_bytes = 0;
stream->dropped_frames = 0;
stream->min_priority = 0;
obs_output_begin_data_capture(stream->output, 0);
dstr_copy(&stream->printable_path, path_str);
info("Writing to path '%s'...", stream->printable_path.array);
return true;
}
static bool write_packet_to_buf(struct ffmpeg_muxer *stream,
struct encoder_packet *packet)
{
circlebuf_push_back(&stream->packets, packet,
sizeof(struct encoder_packet));
return true;
}
static void drop_frames(struct ffmpeg_muxer *stream, int highest_priority)
{
struct circlebuf new_buf = {0};
int num_frames_dropped = 0;
circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
while (stream->packets.size) {
struct encoder_packet packet;
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
/* do not drop audio data or video keyframes */
if (packet.type == OBS_ENCODER_AUDIO ||
packet.drop_priority >= highest_priority) {
circlebuf_push_back(&new_buf, &packet, sizeof(packet));
} else {
num_frames_dropped++;
obs_encoder_packet_release(&packet);
}
}
circlebuf_free(&stream->packets);
stream->packets = new_buf;
if (stream->min_priority < highest_priority)
stream->min_priority = highest_priority;
stream->dropped_frames += num_frames_dropped;
}
static bool find_first_video_packet(struct ffmpeg_muxer *stream,
struct encoder_packet *first)
{
size_t count = stream->packets.size / sizeof(*first);
for (size_t i = 0; i < count; i++) {
struct encoder_packet *cur =
circlebuf_data(&stream->packets, i * sizeof(*first));
if (cur->type == OBS_ENCODER_VIDEO && !cur->keyframe) {
*first = *cur;
return true;
}
}
return false;
}
void check_to_drop_frames(struct ffmpeg_muxer *stream, bool pframes)
{
struct encoder_packet first;
int64_t buffer_duration_usec;
int priority = pframes ? OBS_NAL_PRIORITY_HIGHEST
: OBS_NAL_PRIORITY_HIGH;
int keyint_sec = stream->keyint_sec;
int64_t drop_threshold_sec = keyint_sec ? 2 * keyint_sec : 10;
if (!find_first_video_packet(stream, &first))
return;
buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
if (buffer_duration_usec > drop_threshold_sec * 1000000)
drop_frames(stream, priority);
}
static bool add_video_packet(struct ffmpeg_muxer *stream,
struct encoder_packet *packet)
{
check_to_drop_frames(stream, false);
check_to_drop_frames(stream, true);
/* if currently dropping frames, drop packets until it reaches the
* desired priority */
if (packet->drop_priority < stream->min_priority) {
stream->dropped_frames++;
return false;
} else {
stream->min_priority = 0;
}
stream->last_dts_usec = packet->dts_usec;
return write_packet_to_buf(stream, packet);
}
void ffmpeg_hls_mux_data(void *data, struct encoder_packet *packet)
{
struct ffmpeg_muxer *stream = data;
struct encoder_packet new_packet;
bool added_packet = false;
if (!active(stream))
return;
/* encoder failure */
if (!packet) {
deactivate(stream, OBS_OUTPUT_ENCODE_ERROR);
return;
}
if (!stream->sent_headers) {
if (!send_headers(stream))
return;
stream->sent_headers = true;
}
if (stopping(stream)) {
if (packet->sys_dts_usec >= stream->stop_ts) {
deactivate(stream, 0);
return;
}
}
if (packet->type == OBS_ENCODER_VIDEO) {
const char *const codec =
obs_encoder_get_codec(packet->encoder);
if (strcmp(codec, "h264") == 0) {
packet->drop_priority =
obs_parse_avc_packet_priority(packet);
}
#ifdef ENABLE_HEVC
else if (strcmp(codec, "hevc") == 0) {
packet->drop_priority =
obs_parse_hevc_packet_priority(packet);
}
#endif
}
obs_encoder_packet_ref(&new_packet, packet);
pthread_mutex_lock(&stream->write_mutex);
if (active(stream)) {
added_packet =
(packet->type == OBS_ENCODER_VIDEO)
? add_video_packet(stream, &new_packet)
: write_packet_to_buf(stream, &new_packet);
}
pthread_mutex_unlock(&stream->write_mutex);
if (added_packet)
os_sem_post(stream->write_sem);
else
obs_encoder_packet_release(&new_packet);
}
struct obs_output_info ffmpeg_hls_muxer = {
.id = "ffmpeg_hls_muxer",
.flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
OBS_OUTPUT_SERVICE,
#ifdef ENABLE_HEVC
.encoded_video_codecs = "h264;hevc",
#else
.encoded_video_codecs = "h264",
#endif
.encoded_audio_codecs = "aac",
.get_name = ffmpeg_hls_mux_getname,
.create = ffmpeg_hls_mux_create,
.destroy = ffmpeg_hls_mux_destroy,
.start = ffmpeg_hls_mux_start,
.stop = ffmpeg_mux_stop,
.encoded_packet = ffmpeg_hls_mux_data,
.get_total_bytes = ffmpeg_mux_total_bytes,
.get_dropped_frames = hls_stream_dropped_frames,
};