diff --git a/libobs/util/circlebuf.h b/libobs/util/circlebuf.h index 5f95049ae..2bc349ea3 100644 --- a/libobs/util/circlebuf.h +++ b/libobs/util/circlebuf.h @@ -168,7 +168,7 @@ static inline void circlebuf_push_back(struct circlebuf *cb, const void *data, cb->end_pos = new_end_pos; } -static inline void circlebuf_pop_front(struct circlebuf *cb, void *data, +static inline void circlebuf_peek_front(struct circlebuf *cb, void *data, size_t size) { size_t start_size; @@ -186,6 +186,12 @@ static inline void circlebuf_pop_front(struct circlebuf *cb, void *data, memcpy(data, (uint8_t*)cb->data + cb->start_pos, size); } } +} + +static inline void circlebuf_pop_front(struct circlebuf *cb, void *data, + size_t size) +{ + circlebuf_peek_front(cb, data, size); cb->size -= size; cb->start_pos += size; diff --git a/plugins/obs-outputs/rtmp-stream.c b/plugins/obs-outputs/rtmp-stream.c index 63bf56f7e..29740f3bd 100644 --- a/plugins/obs-outputs/rtmp-stream.c +++ b/plugins/obs-outputs/rtmp-stream.c @@ -26,6 +26,7 @@ #include "flv-mux.h" //#define FILE_TEST +//#define TEST_FRAMEDROPS struct rtmp_stream { obs_output_t output; @@ -45,6 +46,13 @@ struct rtmp_stream { struct dstr path, key; struct dstr username, password; + /* frame drop variables */ + int64_t drop_threshold_usec; + int64_t min_drop_dts_usec; + int min_priority; + + int64_t last_dts_usec; + #ifdef FILE_TEST FILE *test; #endif @@ -61,9 +69,10 @@ static const char *rtmp_stream_getname(const char *locale) static void log_rtmp(int level, const char *format, va_list args) { - blogva(LOG_DEBUG, format, args); + if (level > RTMP_LOGERROR) + return; - UNUSED_PARAMETER(level); + blogva(LOG_INFO, format, args); } static inline void free_packets(struct rtmp_stream *stream) @@ -232,8 +241,6 @@ static void *send_thread(void *data) return NULL; } -#define MIN_SENDBUF_SIZE 65535 - static void send_meta_data(struct rtmp_stream *stream) { uint8_t *meta_data; @@ -302,20 +309,34 @@ static inline bool reset_semaphore(struct rtmp_stream *stream) #define socklen_t int #endif +#define MIN_SENDBUF_SIZE 65535 + +static void adjust_sndbuf_size(struct rtmp_stream *stream, int new_size) +{ + int cur_sendbuf_size = new_size; + socklen_t int_size = sizeof(int); + +#ifndef TEST_FRAMEDROPS + getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, + (char*)&cur_sendbuf_size, &int_size); + + if (cur_sendbuf_size < new_size) { + cur_sendbuf_size = new_size; +#else + {cur_sendbuf_size = 1024*8; +#endif + setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, + (const char*)&cur_sendbuf_size, int_size); + } +} + static int init_send(struct rtmp_stream *stream) { - int cur_sendbuf_size = MIN_SENDBUF_SIZE; - socklen_t size = sizeof(int); int ret; - getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, - (char*)&cur_sendbuf_size, &size); - - if (cur_sendbuf_size < MIN_SENDBUF_SIZE) { - cur_sendbuf_size = 65535; - setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, - (const char*)&cur_sendbuf_size, size); - } +#if defined(_WIN32) && !defined(FILE_TEST) + adjust_sndbuf_size(stream, MIN_SENDBUF_SIZE); +#endif reset_semaphore(stream); @@ -334,6 +355,7 @@ static int init_send(struct rtmp_stream *stream) static int try_connect(struct rtmp_stream *stream) { +#ifndef FILE_TEST if (!RTMP_SetupURL2(&stream->rtmp, stream->path.array, stream->key.array)) return OBS_OUTPUT_BAD_PATH; @@ -350,7 +372,6 @@ static int try_connect(struct rtmp_stream *stream) stream->rtmp.m_bSendChunkSizeInfo = true; stream->rtmp.m_bUseNagle = true; -#ifndef FILE_TEST if (!RTMP_Connect(&stream->rtmp, NULL)) return OBS_OUTPUT_CONNECT_FAILED; if (!RTMP_ConnectStream(&stream->rtmp, 0)) @@ -390,26 +411,132 @@ static bool rtmp_stream_start(void *data) dstr_copy(&stream->key, obs_data_getstring(settings, "key")); dstr_copy(&stream->username, obs_data_getstring(settings, "username")); dstr_copy(&stream->password, obs_data_getstring(settings, "password")); + stream->drop_threshold_usec = + (int64_t)obs_data_getint(settings, "drop_threshold"); obs_data_release(settings); return pthread_create(&stream->connect_thread, NULL, connect_thread, stream) == 0; } +static inline bool add_packet(struct rtmp_stream *stream, + struct encoder_packet *packet) +{ + circlebuf_push_back(&stream->packets, packet, + sizeof(struct encoder_packet)); + stream->last_dts_usec = packet->dts_usec; + return true; +} + +static inline size_t num_buffered_packets(struct rtmp_stream *stream) +{ + return stream->packets.size / sizeof(struct encoder_packet); +} + +static void drop_frames(struct rtmp_stream *stream) +{ + struct circlebuf new_buf = {0}; + int drop_priority = 0; + uint64_t last_drop_dts_usec = 0; + + blog(LOG_DEBUG, "Previous packet count: %d", + (int)num_buffered_packets(stream)); + + circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8); + + while (stream->packets.size) { + struct encoder_packet packet; + circlebuf_pop_front(&stream->packets, &packet, sizeof(packet)); + + last_drop_dts_usec = packet.dts_usec; + + if (packet.type == OBS_ENCODER_AUDIO) { + circlebuf_push_back(&new_buf, &packet, sizeof(packet)); + + } else { + if (drop_priority < packet.drop_priority) + drop_priority = packet.drop_priority; + + obs_free_encoder_packet(&packet); + } + } + + circlebuf_free(&stream->packets); + stream->packets = new_buf; + stream->min_priority = drop_priority; + stream->min_drop_dts_usec = last_drop_dts_usec; + + blog(LOG_DEBUG, "New packet count: %d", + (int)num_buffered_packets(stream)); +} + +static void check_to_drop_frames(struct rtmp_stream *stream) +{ + struct encoder_packet first; + int64_t buffer_duration_usec; + + if (num_buffered_packets(stream) < 5) + return; + + circlebuf_peek_front(&stream->packets, &first, sizeof(first)); + + /* do not drop frames if frames were just dropped within this time */ + if (first.dts_usec < stream->min_drop_dts_usec) + return; + + /* if the amount of time stored in the buffered packets waiting to be + * sent is higher than threshold, drop frames */ + buffer_duration_usec = stream->last_dts_usec - first.dts_usec; + if (buffer_duration_usec > stream->drop_threshold_usec) { + drop_frames(stream); + blog(LOG_INFO, "dropping %lld worth of frames", + buffer_duration_usec); + } +} + +static bool add_video_packet(struct rtmp_stream *stream, + struct encoder_packet *packet) +{ + check_to_drop_frames(stream); + + /* if currently dropping frames, drop packets until it reaches the + * desired priority */ + if (packet->priority < stream->min_priority) + return false; + else + stream->min_priority = 0; + + return add_packet(stream, packet); +} + static void rtmp_stream_data(void *data, struct encoder_packet *packet) { - struct rtmp_stream *stream = data; + struct rtmp_stream *stream = data; struct encoder_packet new_packet; + bool added_packet; - if (packet->type == OBS_ENCODER_AUDIO) - obs_duplicate_encoder_packet(&new_packet, packet); - else if (packet->type == OBS_ENCODER_VIDEO) + if (packet->type == OBS_ENCODER_VIDEO) obs_parse_avc_packet(&new_packet, packet); + else + obs_duplicate_encoder_packet(&new_packet, packet); pthread_mutex_lock(&stream->packets_mutex); - circlebuf_push_back(&stream->packets, &new_packet, sizeof(new_packet)); + + added_packet = (packet->type == OBS_ENCODER_VIDEO) ? + add_video_packet(stream, &new_packet) : + add_packet(stream, &new_packet); + pthread_mutex_unlock(&stream->packets_mutex); - os_sem_post(stream->send_sem); + + if (added_packet) + os_sem_post(stream->send_sem); + else + obs_free_encoder_packet(&new_packet); +} + +static void rtmp_stream_defaults(obs_data_t defaults) +{ + obs_data_set_default_int(defaults, "drop_threshold", 600000); } static obs_properties_t rtmp_stream_properties(const char *locale) @@ -428,13 +555,13 @@ static obs_properties_t rtmp_stream_properties(const char *locale) struct obs_output_info rtmp_output_info = { .id = "rtmp_output", - .flags = OBS_OUTPUT_AV | - OBS_OUTPUT_ENCODED, + .flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED, .getname = rtmp_stream_getname, .create = rtmp_stream_create, .destroy = rtmp_stream_destroy, .start = rtmp_stream_start, .stop = rtmp_stream_stop, .encoded_packet = rtmp_stream_data, + .defaults = rtmp_stream_defaults, .properties = rtmp_stream_properties };