diff --git a/plugins/obs-outputs/CMakeLists.txt b/plugins/obs-outputs/CMakeLists.txt index 105197474..0a0b114d4 100644 --- a/plugins/obs-outputs/CMakeLists.txt +++ b/plugins/obs-outputs/CMakeLists.txt @@ -62,6 +62,7 @@ endif() set(obs-outputs_HEADERS obs-output-ver.h rtmp-helpers.h + rtmp-stream.h net-if.h flv-mux.h flv-output.h @@ -69,6 +70,7 @@ set(obs-outputs_HEADERS set(obs-outputs_SOURCES obs-outputs.c rtmp-stream.c + rtmp-windows.c flv-output.c flv-mux.c net-if.c) diff --git a/plugins/obs-outputs/rtmp-stream.c b/plugins/obs-outputs/rtmp-stream.c index 865e2db0d..46ddc81ce 100644 --- a/plugins/obs-outputs/rtmp-stream.c +++ b/plugins/obs-outputs/rtmp-stream.c @@ -15,96 +15,7 @@ along with this program. If not, see . ******************************************************************************/ -#include -#include -#include -#include -#include -#include -#include -#include "librtmp/rtmp.h" -#include "librtmp/log.h" -#include "flv-mux.h" -#include "net-if.h" - -#ifdef _WIN32 -#include -#else -#include -#endif - -#define do_log(level, format, ...) \ - blog(level, "[rtmp stream: '%s'] " format, \ - obs_output_get_name(stream->output), ##__VA_ARGS__) - -#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__) -#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__) -#define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__) - -#define OPT_DROP_THRESHOLD "drop_threshold_ms" -#define OPT_PFRAME_DROP_THRESHOLD "pframe_drop_threshold_ms" -#define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec" -#define OPT_BIND_IP "bind_ip" - -//#define TEST_FRAMEDROPS - -#ifdef TEST_FRAMEDROPS - -#define DROPTEST_MAX_KBPS 3000 -#define DROPTEST_MAX_BYTES (DROPTEST_MAX_KBPS * 1000 / 8) - -struct droptest_info { - uint64_t ts; - size_t size; -}; -#endif - -struct rtmp_stream { - obs_output_t *output; - - pthread_mutex_t packets_mutex; - struct circlebuf packets; - bool sent_headers; - - volatile bool connecting; - pthread_t connect_thread; - - volatile bool active; - volatile bool disconnected; - pthread_t send_thread; - - int max_shutdown_time_sec; - - os_sem_t *send_sem; - os_event_t *stop_event; - uint64_t stop_ts; - uint64_t shutdown_timeout_ts; - - struct dstr path, key; - struct dstr username, password; - struct dstr encoder_name; - struct dstr bind_ip; - - /* frame drop variables */ - int64_t drop_threshold_usec; - int64_t min_drop_dts_usec; - int64_t pframe_drop_threshold_usec; - int64_t pframe_min_drop_dts_usec; - int min_priority; - float congestion; - - int64_t last_dts_usec; - - uint64_t total_bytes_sent; - int dropped_frames; - -#ifdef TEST_FRAMEDROPS - struct circlebuf droptest_info; - size_t droptest_size; -#endif - - RTMP rtmp; -}; +#include "rtmp-stream.h" static const char *rtmp_stream_getname(void *unused) { @@ -181,6 +92,9 @@ static void rtmp_stream_destroy(void *data) } } + if (stream->socket_thread_active) + pthread_join(stream->socket_thread, NULL); + if (stream) { free_packets(stream); dstr_free(&stream->path); @@ -196,6 +110,8 @@ static void rtmp_stream_destroy(void *data) #ifdef TEST_FRAMEDROPS circlebuf_free(&stream->droptest_info); #endif + if (stream->write_buf) + bfree(stream->write_buf); bfree(stream); } } @@ -347,6 +263,38 @@ static void droptest_cap_data_rate(struct rtmp_stream *stream, size_t size) } #endif +static int socket_queue_data(RTMPSockBuf *sb, const char *data, int len, void *arg) +{ + struct rtmp_stream *stream = arg; + +retry_send: + + if (!RTMP_IsConnected(&stream->rtmp)) + return 0; + + pthread_mutex_lock(&stream->write_buf_mutex); + + if (stream->write_buf_len + len > stream->write_buf_size) { + + pthread_mutex_unlock(&stream->write_buf_mutex); + + if (os_event_wait(stream->buffer_space_available_event)) { + return 0; + } + + goto retry_send; + } + + memcpy(stream->write_buf + stream->write_buf_len, data, len); + stream->write_buf_len += len; + + pthread_mutex_unlock(&stream->write_buf_mutex); + + os_event_signal (stream->buffer_has_data_event); + + return len; +} + static int send_packet(struct rtmp_stream *stream, struct encoder_packet *packet, bool is_header, size_t idx) { @@ -355,16 +303,18 @@ static int send_packet(struct rtmp_stream *stream, int recv_size = 0; int ret = 0; + if (!stream->new_socket_loop) { #ifdef _WIN32 - ret = ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONREAD, - (u_long*)&recv_size); + ret = ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONREAD, + (u_long*)&recv_size); #else - ret = ioctl(stream->rtmp.m_sb.sb_socket, FIONREAD, &recv_size); + ret = ioctl(stream->rtmp.m_sb.sb_socket, FIONREAD, &recv_size); #endif - if (ret >= 0 && recv_size > 0) { - if (!discard_recv_data(stream, (size_t)recv_size)) - return -1; + if (ret >= 0 && recv_size > 0) { + if (!discard_recv_data(stream, (size_t)recv_size)) + return -1; + } } flv_packet_mux(packet, &data, &size, is_header); @@ -581,6 +531,68 @@ static int init_send(struct rtmp_stream *stream) return OBS_OUTPUT_ERROR; } + if (stream->new_socket_loop) { + int one = 1; +#ifdef _WIN32 + if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { +#else + if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) { +#endif + warn("Failed to set non-blocking socket"); + return OBS_OUTPUT_ERROR; + } + + if (pthread_mutex_init(&stream->write_buf_mutex, NULL) != 0) { + warn("Failed to initialize write buffer mutex"); + return OBS_OUTPUT_ERROR; + } + + if (os_event_init(&stream->buffer_space_available_event, + OS_EVENT_TYPE_MANUAL) != 0) { + warn("Failed to initialize write buffer event"); + return OBS_OUTPUT_ERROR; + } + if (os_event_init(&stream->buffer_has_data_event, + OS_EVENT_TYPE_MANUAL) != 0) { + warn("Failed to initialize data buffer event"); + return OBS_OUTPUT_ERROR; + } + if (os_event_init(&stream->socket_available_event, + OS_EVENT_TYPE_MANUAL) != 0) { + warn("Failed to initialize socket buffer event"); + return OBS_OUTPUT_ERROR; + } + + info("New socket loop enabled by user"); + if (stream->low_latency_mode) + info("Low latency mode enabled by user"); + + if (stream->write_buf) + bfree(stream->write_buf); + + stream->write_buf_size = STREAM_WRITE_BUFFER_SIZE; + stream->write_buf = bmalloc(STREAM_WRITE_BUFFER_SIZE); + +#ifdef _WIN32 + ret = pthread_create(&stream->socket_thread, NULL, + socket_thread_windows, stream); +#else + warn("New socket loop not supported on this platform"); + return OBS_OUTPUT_ERROR; +#endif + + if (ret != 0) { + RTMP_Close(&stream->rtmp); + warn("Failed to create socket thread"); + return OBS_OUTPUT_ERROR; + } + + stream->socket_thread_active = true; + stream->rtmp.m_bCustomSend = true; + stream->rtmp.m_customSendFunc = socket_queue_data; + stream->rtmp.m_customSendParam = stream; + } + os_atomic_set_bool(&stream->active, true); while (next) { if (!send_meta_data(stream, idx++, &next)) { @@ -681,8 +693,9 @@ static int try_connect(struct rtmp_stream *stream) &stream->rtmp.m_bindIP.addrLen, stream->bind_ip.array); if (success) { - info("Binding to IPv%d", (stream->rtmp.m_bindIP.addrLen == - sizeof(struct sockaddr_in6) ? 6 : 4)); + int len = stream->rtmp.m_bindIP.addrLen; + bool ipv6 = len == sizeof(struct sockaddr_in6); + info("Binding to IPv%d", ipv6 ? 6 : 4); } } @@ -726,8 +739,14 @@ static bool init_connect(struct rtmp_stream *stream) int64_t drop_p; int64_t drop_b; - if (stopping(stream)) + if (stopping(stream)) { pthread_join(stream->send_thread, NULL); + } + + if (stream->socket_thread_active) { + pthread_join(stream->socket_thread, NULL); + stream->socket_thread_active = false; + } free_packets(stream); @@ -762,6 +781,11 @@ static bool init_connect(struct rtmp_stream *stream) bind_ip = obs_data_get_string(settings, OPT_BIND_IP); dstr_copy(&stream->bind_ip, bind_ip); + stream->new_socket_loop = obs_data_get_bool(settings, + OPT_NEWSOCKETLOOP_ENABLED); + stream->low_latency_mode = obs_data_get_bool(settings, + OPT_LOWLATENCY_ENABLED); + obs_data_release(settings); return true; } @@ -801,6 +825,8 @@ static bool rtmp_stream_start(void *data) if (!obs_output_initialize_encoders(stream->output, 0)) return false; + RTMP_Init(&stream->rtmp); + os_atomic_set_bool(&stream->connecting, true); return pthread_create(&stream->connect_thread, NULL, connect_thread, stream) == 0; @@ -965,6 +991,8 @@ static void rtmp_stream_defaults(obs_data_t *defaults) obs_data_set_default_int(defaults, OPT_PFRAME_DROP_THRESHOLD, 900); obs_data_set_default_int(defaults, OPT_MAX_SHUTDOWN_TIME_SEC, 30); obs_data_set_default_string(defaults, OPT_BIND_IP, "default"); + obs_data_set_default_bool(defaults, OPT_NEWSOCKETLOOP_ENABLED, false); + obs_data_set_default_bool(defaults, OPT_LOWLATENCY_ENABLED, false); } static obs_properties_t *rtmp_stream_properties(void *unused) @@ -992,6 +1020,11 @@ static obs_properties_t *rtmp_stream_properties(void *unused) } netif_saddr_data_free(&addrs); + obs_properties_add_bool(props, OPT_NEWSOCKETLOOP_ENABLED, + obs_module_text("RTMPStream.NewSocketLoop")); + obs_properties_add_bool(props, OPT_LOWLATENCY_ENABLED, + obs_module_text("RTMPStream.LowLatencyMode")); + return props; } @@ -1010,7 +1043,12 @@ static int rtmp_stream_dropped_frames(void *data) static float rtmp_stream_congestion(void *data) { struct rtmp_stream *stream = data; - return stream->min_priority > 0 ? 1.0f : stream->congestion; + + if (stream->new_socket_loop) + return (float)stream->write_buf_len / + (float)stream->write_buf_size; + else + return stream->min_priority > 0 ? 1.0f : stream->congestion; } struct obs_output_info rtmp_output_info = { diff --git a/plugins/obs-outputs/rtmp-stream.h b/plugins/obs-outputs/rtmp-stream.h new file mode 100644 index 000000000..53b1b530f --- /dev/null +++ b/plugins/obs-outputs/rtmp-stream.h @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include +#include +#include +#include "librtmp/rtmp.h" +#include "librtmp/log.h" +#include "flv-mux.h" +#include "net-if.h" + +#ifdef _WIN32 +#include +#else +#include +#endif + +#define do_log(level, format, ...) \ + blog(level, "[rtmp stream: '%s'] " format, \ + obs_output_get_name(stream->output), ##__VA_ARGS__) + +#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__) +#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__) +#define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__) + +#define OPT_DROP_THRESHOLD "drop_threshold_ms" +#define OPT_PFRAME_DROP_THRESHOLD "pframe_drop_threshold_ms" +#define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec" +#define OPT_BIND_IP "bind_ip" +#define OPT_NEWSOCKETLOOP_ENABLED "new_socket_loop_enabled" +#define OPT_LOWLATENCY_ENABLED "low_latency_mode_enabled" + +#define STREAM_WRITE_BUFFER_SIZE 524288 + +//#define TEST_FRAMEDROPS + +#ifdef TEST_FRAMEDROPS + +#define DROPTEST_MAX_KBPS 3000 +#define DROPTEST_MAX_BYTES (DROPTEST_MAX_KBPS * 1000 / 8) + +struct droptest_info { + uint64_t ts; + size_t size; +}; +#endif + +struct rtmp_stream { + obs_output_t *output; + + pthread_mutex_t packets_mutex; + struct circlebuf packets; + bool sent_headers; + + volatile bool connecting; + pthread_t connect_thread; + + volatile bool active; + volatile bool disconnected; + pthread_t send_thread; + + int max_shutdown_time_sec; + + os_sem_t *send_sem; + os_event_t *stop_event; + uint64_t stop_ts; + uint64_t shutdown_timeout_ts; + + struct dstr path, key; + struct dstr username, password; + struct dstr encoder_name; + struct dstr bind_ip; + + /* frame drop variables */ + int64_t drop_threshold_usec; + int64_t min_drop_dts_usec; + int64_t pframe_drop_threshold_usec; + int64_t pframe_min_drop_dts_usec; + int min_priority; + float congestion; + + int64_t last_dts_usec; + + uint64_t total_bytes_sent; + int dropped_frames; + +#ifdef TEST_FRAMEDROPS + struct circlebuf droptest_info; + size_t droptest_size; +#endif + + RTMP rtmp; + + bool new_socket_loop; + bool low_latency_mode; + bool disable_send_window_optimization; + bool socket_thread_active; + pthread_t socket_thread; + uint8_t *write_buf; + size_t write_buf_len; + size_t write_buf_size; + pthread_mutex_t write_buf_mutex; + os_event_t *buffer_space_available_event; + os_event_t *buffer_has_data_event; + os_event_t *socket_available_event; +}; + +#ifdef _WIN32 +void *socket_thread_windows(void *data); +#endif diff --git a/plugins/obs-outputs/rtmp-windows.c b/plugins/obs-outputs/rtmp-windows.c new file mode 100644 index 000000000..603d8b43d --- /dev/null +++ b/plugins/obs-outputs/rtmp-windows.c @@ -0,0 +1,342 @@ +#ifdef _WIN32 +#include "rtmp-stream.h" +#include + +static void fatal_sock_shutdown(struct rtmp_stream *stream) +{ + closesocket(stream->rtmp.m_sb.sb_socket); + stream->rtmp.m_sb.sb_socket = -1; + stream->write_buf_len = 0; +} + +static bool socket_event(struct rtmp_stream *stream, bool *can_write, + uint64_t last_send_time) +{ + WSANETWORKEVENTS net_events; + bool success; + + success = !!WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL, + &net_events); + if (success) { + blog(LOG_ERROR, "socket_thread_windows: Aborting due to " + "WSAEnumNetworkEvents failure, %d", + WSAGetLastError()); + fatal_sock_shutdown(stream); + return false; + } + + if (net_events.lNetworkEvents & FD_WRITE) + *can_write = true; + + if (net_events.lNetworkEvents & FD_CLOSE) { + if (last_send_time) { + uint32_t diff = + (os_gettime_ns() / 1000000) - last_send_time; + + blog(LOG_ERROR, "socket_thread_windows: Received " + "FD_CLOSE, %u ms since last send " + "(buffer: %d / %d)", + diff, + stream->write_buf_len, + stream->write_buf_size); + } + + if (os_event_try(stream->stop_event) != EAGAIN) + blog(LOG_ERROR, "socket_thread_windows: Aborting due " + "to FD_CLOSE during shutdown, " + "%d bytes lost, error %d", + stream->write_buf_len, + net_events.iErrorCode[FD_CLOSE_BIT]); + else + blog(LOG_ERROR, "socket_thread_windows: Aborting due " + "to FD_CLOSE, error %d", + net_events.iErrorCode[FD_CLOSE_BIT]); + + fatal_sock_shutdown(stream); + return false; + } + + if (net_events.lNetworkEvents & FD_READ) { + char discard[16384]; + int err_code; + bool fatal = false; + + for (;;) { + int ret = recv(stream->rtmp.m_sb.sb_socket, + discard, sizeof(discard), 0); + if (ret == -1) { + err_code = WSAGetLastError(); + if (err_code == WSAEWOULDBLOCK) + break; + + fatal = true; + } else if (ret == 0) { + err_code = 0; + fatal = true; + } + + if (fatal) { + blog(LOG_ERROR, "socket_thread_windows: " + "Socket error, recv() returned " + "%d, GetLastError() %d", + ret, err_code); + fatal_sock_shutdown(stream); + return false; + } + } + } + + return true; +} + +static void ideal_send_backlog_event(struct rtmp_stream *stream, + bool *can_write) +{ + ULONG ideal_send_backlog; + int ret; + + ret = idealsendbacklogquery( + stream->rtmp.m_sb.sb_socket, + &ideal_send_backlog); + if (ret == 0) { + int cur_tcp_bufsize; + int size = sizeof(cur_tcp_bufsize); + + ret = getsockopt(stream->rtmp.m_sb.sb_socket, + SOL_SOCKET, + SO_SNDBUF, + (char *)&cur_tcp_bufsize, + &size); + if (ret == 0) { + if (cur_tcp_bufsize < (int)ideal_send_backlog) { + int bufsize = (int)ideal_send_backlog; + setsockopt(stream->rtmp.m_sb.sb_socket, + SOL_SOCKET, + SO_SNDBUF, + (const char *)&bufsize, + sizeof(bufsize)); + + blog(LOG_INFO, "socket_thread_windows: " + "Increasing send buffer to " + "ISB %d (buffer: %d / %d)", + ideal_send_backlog, + stream->write_buf_len, + stream->write_buf_size); + } + } else { + blog(LOG_ERROR, "socket_thread_windows: Got " + "send_backlog_event but " + "getsockopt() returned %d", + WSAGetLastError()); + } + } else { + blog(LOG_ERROR, "socket_thread_windows: Got " + "send_backlog_event but WSAIoctl() " + "returned %d", + WSAGetLastError()); + } +} + +enum data_ret { + RET_BREAK, + RET_FATAL, + RET_CONTINUE +}; + +static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write, + uint64_t *last_send_time, size_t latency_packet_size, + int delay_time) +{ + bool exit_loop = false; + + pthread_mutex_lock(&stream->write_buf_mutex); + + if (!stream->write_buf_len) { + /* this is now an expected occasional condition due to use of + * auto-reset events, we could end up emptying the buffer as + * it's filled in a previous loop cycle, especially if using + * low latency mode. */ + pthread_mutex_unlock(&stream->write_buf_mutex); + /* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, " + "but no data available"); */ + return RET_BREAK; + } + + int ret; + if (stream->low_latency_mode) { + size_t send_len = + min(latency_packet_size, stream->write_buf_len); + + ret = send(stream->rtmp.m_sb.sb_socket, + (const char *)stream->write_buf, + (int)send_len, 0); + } else { + ret = send(stream->rtmp.m_sb.sb_socket, + (const char *)stream->write_buf, + (int)stream->write_buf_len, 0); + } + + if (ret > 0) { + if (stream->write_buf_len - ret) + memmove(stream->write_buf, + stream->write_buf + ret, + stream->write_buf_len - ret); + stream->write_buf_len -= ret; + + *last_send_time = os_gettime_ns() / 1000000; + + os_event_signal(stream->buffer_space_available_event); + } else { + int err_code; + bool fatal_err = false; + + if (ret == -1) { + err_code = WSAGetLastError(); + + if (err_code == WSAEWOULDBLOCK) { + *can_write = false; + pthread_mutex_unlock(&stream->write_buf_mutex); + return RET_BREAK; + } + + fatal_err = true; + } else if (ret == 0) { + err_code = 0; + fatal_err = true; + } + + if (fatal_err) { + /* connection closed, or connection was aborted / + * socket closed / etc, that's a fatal error. */ + blog(LOG_ERROR, "socket_thread_windows: " + "Socket error, send() returned %d, " + "GetLastError() %d", + ret, err_code); + + pthread_mutex_unlock(&stream->write_buf_mutex); + fatal_sock_shutdown(stream); + return RET_FATAL; + } + } + + /* finish writing for now */ + if (stream->write_buf_len <= 1000) + exit_loop = true; + + pthread_mutex_unlock(&stream->write_buf_mutex); + + if (delay_time) + os_sleep_ms(delay_time); + + return exit_loop ? RET_BREAK : RET_CONTINUE; +} + +static inline void socket_thread_windows_internal(struct rtmp_stream *stream) +{ + bool can_write = false; + + int delay_time; + size_t latency_packet_size; + uint64_t last_send_time = 0; + + HANDLE send_backlog_event; + OVERLAPPED send_backlog_overlapped; + + SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); + + WSAEventSelect(stream->rtmp.m_sb.sb_socket, + stream->socket_available_event, + FD_READ|FD_WRITE|FD_CLOSE); + + send_backlog_event = CreateEvent(NULL, true, false, NULL); + + if (stream->low_latency_mode) { + delay_time = 1400.0f / (stream->write_buf_size / 1000.0f); + latency_packet_size = 1460; + } else { + latency_packet_size = stream->write_buf_size; + delay_time = 0; + } + + if (!stream->disable_send_window_optimization) { + memset(&send_backlog_overlapped, 0, + sizeof(send_backlog_overlapped)); + send_backlog_overlapped.hEvent = send_backlog_event; + idealsendbacklognotify(stream->rtmp.m_sb.sb_socket, + &send_backlog_overlapped, NULL); + } else { + blog(LOG_INFO, "socket_thread_windows: Send window " + "optimization disabled by user."); + } + + HANDLE objs[3]; + + objs[0] = stream->socket_available_event; + objs[1] = stream->buffer_has_data_event; + objs[2] = send_backlog_event; + + for (;;) { + if (os_event_try(stream->stop_event) != EAGAIN) { + pthread_mutex_lock(&stream->write_buf_mutex); + if (stream->write_buf_len == 0) { + //blog(LOG_DEBUG, "Exiting on empty buffer"); + pthread_mutex_unlock(&stream->write_buf_mutex); + break; + } + + pthread_mutex_unlock(&stream->write_buf_mutex); + } + + int status = WaitForMultipleObjects(3, objs, false, INFINITE); + if (status == WAIT_ABANDONED || status == WAIT_FAILED) { + blog(LOG_ERROR, "socket_thread_windows: Aborting due " + "to WaitForMultipleObjects failure"); + fatal_sock_shutdown(stream); + return; + } + + if (status == WAIT_OBJECT_0) { + /* Socket event */ + if (!socket_event(stream, &can_write, last_send_time)) + return; + + } else if (status == WAIT_OBJECT_0 + 2) { + /* Ideal send backlog event */ + ideal_send_backlog_event(stream, &can_write); + + ResetEvent(send_backlog_event); + idealsendbacklognotify(stream->rtmp.m_sb.sb_socket, + &send_backlog_overlapped, NULL); + continue; + } + + if (can_write) { + for (;;) { + enum data_ret ret = write_data( + stream, + &can_write, + &last_send_time, + latency_packet_size, + delay_time); + + switch (ret) { + case RET_BREAK: + break; + case RET_FATAL: + return; + case RET_CONTINUE:; + } + } + } + } + + blog(LOG_INFO, "socket_thread_windows: Normal exit"); +} + +void *socket_thread_windows(void *data) +{ + struct rtmp_stream *stream = data; + socket_thread_windows_internal(stream); + return NULL; +} +#endif