obs-outputs: Port windows socket loop from OBS Classic

master
Richard Stanway 2017-02-21 01:57:03 +01:00 committed by jp9000
parent 941d040fd9
commit 935223be34
4 changed files with 593 additions and 100 deletions

View File

@ -62,6 +62,7 @@ endif()
set(obs-outputs_HEADERS
obs-output-ver.h
rtmp-helpers.h
rtmp-stream.h
net-if.h
flv-mux.h
flv-output.h
@ -69,6 +70,7 @@ set(obs-outputs_HEADERS
set(obs-outputs_SOURCES
obs-outputs.c
rtmp-stream.c
rtmp-windows.c
flv-output.c
flv-mux.c
net-if.c)

View File

@ -15,96 +15,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
#include <obs-module.h>
#include <obs-avc.h>
#include <util/platform.h>
#include <util/circlebuf.h>
#include <util/dstr.h>
#include <util/threading.h>
#include <inttypes.h>
#include "librtmp/rtmp.h"
#include "librtmp/log.h"
#include "flv-mux.h"
#include "net-if.h"
#ifdef _WIN32
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#endif
#define do_log(level, format, ...) \
blog(level, "[rtmp stream: '%s'] " format, \
obs_output_get_name(stream->output), ##__VA_ARGS__)
#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
#define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__)
#define OPT_DROP_THRESHOLD "drop_threshold_ms"
#define OPT_PFRAME_DROP_THRESHOLD "pframe_drop_threshold_ms"
#define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec"
#define OPT_BIND_IP "bind_ip"
//#define TEST_FRAMEDROPS
#ifdef TEST_FRAMEDROPS
#define DROPTEST_MAX_KBPS 3000
#define DROPTEST_MAX_BYTES (DROPTEST_MAX_KBPS * 1000 / 8)
struct droptest_info {
uint64_t ts;
size_t size;
};
#endif
struct rtmp_stream {
obs_output_t *output;
pthread_mutex_t packets_mutex;
struct circlebuf packets;
bool sent_headers;
volatile bool connecting;
pthread_t connect_thread;
volatile bool active;
volatile bool disconnected;
pthread_t send_thread;
int max_shutdown_time_sec;
os_sem_t *send_sem;
os_event_t *stop_event;
uint64_t stop_ts;
uint64_t shutdown_timeout_ts;
struct dstr path, key;
struct dstr username, password;
struct dstr encoder_name;
struct dstr bind_ip;
/* frame drop variables */
int64_t drop_threshold_usec;
int64_t min_drop_dts_usec;
int64_t pframe_drop_threshold_usec;
int64_t pframe_min_drop_dts_usec;
int min_priority;
float congestion;
int64_t last_dts_usec;
uint64_t total_bytes_sent;
int dropped_frames;
#ifdef TEST_FRAMEDROPS
struct circlebuf droptest_info;
size_t droptest_size;
#endif
RTMP rtmp;
};
#include "rtmp-stream.h"
static const char *rtmp_stream_getname(void *unused)
{
@ -181,6 +92,9 @@ static void rtmp_stream_destroy(void *data)
}
}
if (stream->socket_thread_active)
pthread_join(stream->socket_thread, NULL);
if (stream) {
free_packets(stream);
dstr_free(&stream->path);
@ -196,6 +110,8 @@ static void rtmp_stream_destroy(void *data)
#ifdef TEST_FRAMEDROPS
circlebuf_free(&stream->droptest_info);
#endif
if (stream->write_buf)
bfree(stream->write_buf);
bfree(stream);
}
}
@ -347,6 +263,38 @@ static void droptest_cap_data_rate(struct rtmp_stream *stream, size_t size)
}
#endif
static int socket_queue_data(RTMPSockBuf *sb, const char *data, int len, void *arg)
{
struct rtmp_stream *stream = arg;
retry_send:
if (!RTMP_IsConnected(&stream->rtmp))
return 0;
pthread_mutex_lock(&stream->write_buf_mutex);
if (stream->write_buf_len + len > stream->write_buf_size) {
pthread_mutex_unlock(&stream->write_buf_mutex);
if (os_event_wait(stream->buffer_space_available_event)) {
return 0;
}
goto retry_send;
}
memcpy(stream->write_buf + stream->write_buf_len, data, len);
stream->write_buf_len += len;
pthread_mutex_unlock(&stream->write_buf_mutex);
os_event_signal (stream->buffer_has_data_event);
return len;
}
static int send_packet(struct rtmp_stream *stream,
struct encoder_packet *packet, bool is_header, size_t idx)
{
@ -355,16 +303,18 @@ static int send_packet(struct rtmp_stream *stream,
int recv_size = 0;
int ret = 0;
if (!stream->new_socket_loop) {
#ifdef _WIN32
ret = ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONREAD,
(u_long*)&recv_size);
ret = ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONREAD,
(u_long*)&recv_size);
#else
ret = ioctl(stream->rtmp.m_sb.sb_socket, FIONREAD, &recv_size);
ret = ioctl(stream->rtmp.m_sb.sb_socket, FIONREAD, &recv_size);
#endif
if (ret >= 0 && recv_size > 0) {
if (!discard_recv_data(stream, (size_t)recv_size))
return -1;
if (ret >= 0 && recv_size > 0) {
if (!discard_recv_data(stream, (size_t)recv_size))
return -1;
}
}
flv_packet_mux(packet, &data, &size, is_header);
@ -581,6 +531,68 @@ static int init_send(struct rtmp_stream *stream)
return OBS_OUTPUT_ERROR;
}
if (stream->new_socket_loop) {
int one = 1;
#ifdef _WIN32
if (ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
#else
if (ioctl(stream->rtmp.m_sb.sb_socket, FIONBIO, &one)) {
#endif
warn("Failed to set non-blocking socket");
return OBS_OUTPUT_ERROR;
}
if (pthread_mutex_init(&stream->write_buf_mutex, NULL) != 0) {
warn("Failed to initialize write buffer mutex");
return OBS_OUTPUT_ERROR;
}
if (os_event_init(&stream->buffer_space_available_event,
OS_EVENT_TYPE_MANUAL) != 0) {
warn("Failed to initialize write buffer event");
return OBS_OUTPUT_ERROR;
}
if (os_event_init(&stream->buffer_has_data_event,
OS_EVENT_TYPE_MANUAL) != 0) {
warn("Failed to initialize data buffer event");
return OBS_OUTPUT_ERROR;
}
if (os_event_init(&stream->socket_available_event,
OS_EVENT_TYPE_MANUAL) != 0) {
warn("Failed to initialize socket buffer event");
return OBS_OUTPUT_ERROR;
}
info("New socket loop enabled by user");
if (stream->low_latency_mode)
info("Low latency mode enabled by user");
if (stream->write_buf)
bfree(stream->write_buf);
stream->write_buf_size = STREAM_WRITE_BUFFER_SIZE;
stream->write_buf = bmalloc(STREAM_WRITE_BUFFER_SIZE);
#ifdef _WIN32
ret = pthread_create(&stream->socket_thread, NULL,
socket_thread_windows, stream);
#else
warn("New socket loop not supported on this platform");
return OBS_OUTPUT_ERROR;
#endif
if (ret != 0) {
RTMP_Close(&stream->rtmp);
warn("Failed to create socket thread");
return OBS_OUTPUT_ERROR;
}
stream->socket_thread_active = true;
stream->rtmp.m_bCustomSend = true;
stream->rtmp.m_customSendFunc = socket_queue_data;
stream->rtmp.m_customSendParam = stream;
}
os_atomic_set_bool(&stream->active, true);
while (next) {
if (!send_meta_data(stream, idx++, &next)) {
@ -681,8 +693,9 @@ static int try_connect(struct rtmp_stream *stream)
&stream->rtmp.m_bindIP.addrLen,
stream->bind_ip.array);
if (success) {
info("Binding to IPv%d", (stream->rtmp.m_bindIP.addrLen ==
sizeof(struct sockaddr_in6) ? 6 : 4));
int len = stream->rtmp.m_bindIP.addrLen;
bool ipv6 = len == sizeof(struct sockaddr_in6);
info("Binding to IPv%d", ipv6 ? 6 : 4);
}
}
@ -726,8 +739,14 @@ static bool init_connect(struct rtmp_stream *stream)
int64_t drop_p;
int64_t drop_b;
if (stopping(stream))
if (stopping(stream)) {
pthread_join(stream->send_thread, NULL);
}
if (stream->socket_thread_active) {
pthread_join(stream->socket_thread, NULL);
stream->socket_thread_active = false;
}
free_packets(stream);
@ -762,6 +781,11 @@ static bool init_connect(struct rtmp_stream *stream)
bind_ip = obs_data_get_string(settings, OPT_BIND_IP);
dstr_copy(&stream->bind_ip, bind_ip);
stream->new_socket_loop = obs_data_get_bool(settings,
OPT_NEWSOCKETLOOP_ENABLED);
stream->low_latency_mode = obs_data_get_bool(settings,
OPT_LOWLATENCY_ENABLED);
obs_data_release(settings);
return true;
}
@ -801,6 +825,8 @@ static bool rtmp_stream_start(void *data)
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
RTMP_Init(&stream->rtmp);
os_atomic_set_bool(&stream->connecting, true);
return pthread_create(&stream->connect_thread, NULL, connect_thread,
stream) == 0;
@ -965,6 +991,8 @@ static void rtmp_stream_defaults(obs_data_t *defaults)
obs_data_set_default_int(defaults, OPT_PFRAME_DROP_THRESHOLD, 900);
obs_data_set_default_int(defaults, OPT_MAX_SHUTDOWN_TIME_SEC, 30);
obs_data_set_default_string(defaults, OPT_BIND_IP, "default");
obs_data_set_default_bool(defaults, OPT_NEWSOCKETLOOP_ENABLED, false);
obs_data_set_default_bool(defaults, OPT_LOWLATENCY_ENABLED, false);
}
static obs_properties_t *rtmp_stream_properties(void *unused)
@ -992,6 +1020,11 @@ static obs_properties_t *rtmp_stream_properties(void *unused)
}
netif_saddr_data_free(&addrs);
obs_properties_add_bool(props, OPT_NEWSOCKETLOOP_ENABLED,
obs_module_text("RTMPStream.NewSocketLoop"));
obs_properties_add_bool(props, OPT_LOWLATENCY_ENABLED,
obs_module_text("RTMPStream.LowLatencyMode"));
return props;
}
@ -1010,7 +1043,12 @@ static int rtmp_stream_dropped_frames(void *data)
static float rtmp_stream_congestion(void *data)
{
struct rtmp_stream *stream = data;
return stream->min_priority > 0 ? 1.0f : stream->congestion;
if (stream->new_socket_loop)
return (float)stream->write_buf_len /
(float)stream->write_buf_size;
else
return stream->min_priority > 0 ? 1.0f : stream->congestion;
}
struct obs_output_info rtmp_output_info = {

View File

@ -0,0 +1,111 @@
#include <obs-module.h>
#include <obs-avc.h>
#include <util/platform.h>
#include <util/circlebuf.h>
#include <util/dstr.h>
#include <util/threading.h>
#include <inttypes.h>
#include "librtmp/rtmp.h"
#include "librtmp/log.h"
#include "flv-mux.h"
#include "net-if.h"
#ifdef _WIN32
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#endif
#define do_log(level, format, ...) \
blog(level, "[rtmp stream: '%s'] " format, \
obs_output_get_name(stream->output), ##__VA_ARGS__)
#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
#define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__)
#define OPT_DROP_THRESHOLD "drop_threshold_ms"
#define OPT_PFRAME_DROP_THRESHOLD "pframe_drop_threshold_ms"
#define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec"
#define OPT_BIND_IP "bind_ip"
#define OPT_NEWSOCKETLOOP_ENABLED "new_socket_loop_enabled"
#define OPT_LOWLATENCY_ENABLED "low_latency_mode_enabled"
#define STREAM_WRITE_BUFFER_SIZE 524288
//#define TEST_FRAMEDROPS
#ifdef TEST_FRAMEDROPS
#define DROPTEST_MAX_KBPS 3000
#define DROPTEST_MAX_BYTES (DROPTEST_MAX_KBPS * 1000 / 8)
struct droptest_info {
uint64_t ts;
size_t size;
};
#endif
struct rtmp_stream {
obs_output_t *output;
pthread_mutex_t packets_mutex;
struct circlebuf packets;
bool sent_headers;
volatile bool connecting;
pthread_t connect_thread;
volatile bool active;
volatile bool disconnected;
pthread_t send_thread;
int max_shutdown_time_sec;
os_sem_t *send_sem;
os_event_t *stop_event;
uint64_t stop_ts;
uint64_t shutdown_timeout_ts;
struct dstr path, key;
struct dstr username, password;
struct dstr encoder_name;
struct dstr bind_ip;
/* frame drop variables */
int64_t drop_threshold_usec;
int64_t min_drop_dts_usec;
int64_t pframe_drop_threshold_usec;
int64_t pframe_min_drop_dts_usec;
int min_priority;
float congestion;
int64_t last_dts_usec;
uint64_t total_bytes_sent;
int dropped_frames;
#ifdef TEST_FRAMEDROPS
struct circlebuf droptest_info;
size_t droptest_size;
#endif
RTMP rtmp;
bool new_socket_loop;
bool low_latency_mode;
bool disable_send_window_optimization;
bool socket_thread_active;
pthread_t socket_thread;
uint8_t *write_buf;
size_t write_buf_len;
size_t write_buf_size;
pthread_mutex_t write_buf_mutex;
os_event_t *buffer_space_available_event;
os_event_t *buffer_has_data_event;
os_event_t *socket_available_event;
};
#ifdef _WIN32
void *socket_thread_windows(void *data);
#endif

View File

@ -0,0 +1,342 @@
#ifdef _WIN32
#include "rtmp-stream.h"
#include <winsock2.h>
static void fatal_sock_shutdown(struct rtmp_stream *stream)
{
closesocket(stream->rtmp.m_sb.sb_socket);
stream->rtmp.m_sb.sb_socket = -1;
stream->write_buf_len = 0;
}
static bool socket_event(struct rtmp_stream *stream, bool *can_write,
uint64_t last_send_time)
{
WSANETWORKEVENTS net_events;
bool success;
success = !!WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
&net_events);
if (success) {
blog(LOG_ERROR, "socket_thread_windows: Aborting due to "
"WSAEnumNetworkEvents failure, %d",
WSAGetLastError());
fatal_sock_shutdown(stream);
return false;
}
if (net_events.lNetworkEvents & FD_WRITE)
*can_write = true;
if (net_events.lNetworkEvents & FD_CLOSE) {
if (last_send_time) {
uint32_t diff =
(os_gettime_ns() / 1000000) - last_send_time;
blog(LOG_ERROR, "socket_thread_windows: Received "
"FD_CLOSE, %u ms since last send "
"(buffer: %d / %d)",
diff,
stream->write_buf_len,
stream->write_buf_size);
}
if (os_event_try(stream->stop_event) != EAGAIN)
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
"to FD_CLOSE during shutdown, "
"%d bytes lost, error %d",
stream->write_buf_len,
net_events.iErrorCode[FD_CLOSE_BIT]);
else
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
"to FD_CLOSE, error %d",
net_events.iErrorCode[FD_CLOSE_BIT]);
fatal_sock_shutdown(stream);
return false;
}
if (net_events.lNetworkEvents & FD_READ) {
char discard[16384];
int err_code;
bool fatal = false;
for (;;) {
int ret = recv(stream->rtmp.m_sb.sb_socket,
discard, sizeof(discard), 0);
if (ret == -1) {
err_code = WSAGetLastError();
if (err_code == WSAEWOULDBLOCK)
break;
fatal = true;
} else if (ret == 0) {
err_code = 0;
fatal = true;
}
if (fatal) {
blog(LOG_ERROR, "socket_thread_windows: "
"Socket error, recv() returned "
"%d, GetLastError() %d",
ret, err_code);
fatal_sock_shutdown(stream);
return false;
}
}
}
return true;
}
static void ideal_send_backlog_event(struct rtmp_stream *stream,
bool *can_write)
{
ULONG ideal_send_backlog;
int ret;
ret = idealsendbacklogquery(
stream->rtmp.m_sb.sb_socket,
&ideal_send_backlog);
if (ret == 0) {
int cur_tcp_bufsize;
int size = sizeof(cur_tcp_bufsize);
ret = getsockopt(stream->rtmp.m_sb.sb_socket,
SOL_SOCKET,
SO_SNDBUF,
(char *)&cur_tcp_bufsize,
&size);
if (ret == 0) {
if (cur_tcp_bufsize < (int)ideal_send_backlog) {
int bufsize = (int)ideal_send_backlog;
setsockopt(stream->rtmp.m_sb.sb_socket,
SOL_SOCKET,
SO_SNDBUF,
(const char *)&bufsize,
sizeof(bufsize));
blog(LOG_INFO, "socket_thread_windows: "
"Increasing send buffer to "
"ISB %d (buffer: %d / %d)",
ideal_send_backlog,
stream->write_buf_len,
stream->write_buf_size);
}
} else {
blog(LOG_ERROR, "socket_thread_windows: Got "
"send_backlog_event but "
"getsockopt() returned %d",
WSAGetLastError());
}
} else {
blog(LOG_ERROR, "socket_thread_windows: Got "
"send_backlog_event but WSAIoctl() "
"returned %d",
WSAGetLastError());
}
}
enum data_ret {
RET_BREAK,
RET_FATAL,
RET_CONTINUE
};
static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
uint64_t *last_send_time, size_t latency_packet_size,
int delay_time)
{
bool exit_loop = false;
pthread_mutex_lock(&stream->write_buf_mutex);
if (!stream->write_buf_len) {
/* this is now an expected occasional condition due to use of
* auto-reset events, we could end up emptying the buffer as
* it's filled in a previous loop cycle, especially if using
* low latency mode. */
pthread_mutex_unlock(&stream->write_buf_mutex);
/* blog(LOG_DEBUG, "socket_thread_windows: Trying to send, "
"but no data available"); */
return RET_BREAK;
}
int ret;
if (stream->low_latency_mode) {
size_t send_len =
min(latency_packet_size, stream->write_buf_len);
ret = send(stream->rtmp.m_sb.sb_socket,
(const char *)stream->write_buf,
(int)send_len, 0);
} else {
ret = send(stream->rtmp.m_sb.sb_socket,
(const char *)stream->write_buf,
(int)stream->write_buf_len, 0);
}
if (ret > 0) {
if (stream->write_buf_len - ret)
memmove(stream->write_buf,
stream->write_buf + ret,
stream->write_buf_len - ret);
stream->write_buf_len -= ret;
*last_send_time = os_gettime_ns() / 1000000;
os_event_signal(stream->buffer_space_available_event);
} else {
int err_code;
bool fatal_err = false;
if (ret == -1) {
err_code = WSAGetLastError();
if (err_code == WSAEWOULDBLOCK) {
*can_write = false;
pthread_mutex_unlock(&stream->write_buf_mutex);
return RET_BREAK;
}
fatal_err = true;
} else if (ret == 0) {
err_code = 0;
fatal_err = true;
}
if (fatal_err) {
/* connection closed, or connection was aborted /
* socket closed / etc, that's a fatal error. */
blog(LOG_ERROR, "socket_thread_windows: "
"Socket error, send() returned %d, "
"GetLastError() %d",
ret, err_code);
pthread_mutex_unlock(&stream->write_buf_mutex);
fatal_sock_shutdown(stream);
return RET_FATAL;
}
}
/* finish writing for now */
if (stream->write_buf_len <= 1000)
exit_loop = true;
pthread_mutex_unlock(&stream->write_buf_mutex);
if (delay_time)
os_sleep_ms(delay_time);
return exit_loop ? RET_BREAK : RET_CONTINUE;
}
static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
{
bool can_write = false;
int delay_time;
size_t latency_packet_size;
uint64_t last_send_time = 0;
HANDLE send_backlog_event;
OVERLAPPED send_backlog_overlapped;
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);
WSAEventSelect(stream->rtmp.m_sb.sb_socket,
stream->socket_available_event,
FD_READ|FD_WRITE|FD_CLOSE);
send_backlog_event = CreateEvent(NULL, true, false, NULL);
if (stream->low_latency_mode) {
delay_time = 1400.0f / (stream->write_buf_size / 1000.0f);
latency_packet_size = 1460;
} else {
latency_packet_size = stream->write_buf_size;
delay_time = 0;
}
if (!stream->disable_send_window_optimization) {
memset(&send_backlog_overlapped, 0,
sizeof(send_backlog_overlapped));
send_backlog_overlapped.hEvent = send_backlog_event;
idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
&send_backlog_overlapped, NULL);
} else {
blog(LOG_INFO, "socket_thread_windows: Send window "
"optimization disabled by user.");
}
HANDLE objs[3];
objs[0] = stream->socket_available_event;
objs[1] = stream->buffer_has_data_event;
objs[2] = send_backlog_event;
for (;;) {
if (os_event_try(stream->stop_event) != EAGAIN) {
pthread_mutex_lock(&stream->write_buf_mutex);
if (stream->write_buf_len == 0) {
//blog(LOG_DEBUG, "Exiting on empty buffer");
pthread_mutex_unlock(&stream->write_buf_mutex);
break;
}
pthread_mutex_unlock(&stream->write_buf_mutex);
}
int status = WaitForMultipleObjects(3, objs, false, INFINITE);
if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
blog(LOG_ERROR, "socket_thread_windows: Aborting due "
"to WaitForMultipleObjects failure");
fatal_sock_shutdown(stream);
return;
}
if (status == WAIT_OBJECT_0) {
/* Socket event */
if (!socket_event(stream, &can_write, last_send_time))
return;
} else if (status == WAIT_OBJECT_0 + 2) {
/* Ideal send backlog event */
ideal_send_backlog_event(stream, &can_write);
ResetEvent(send_backlog_event);
idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
&send_backlog_overlapped, NULL);
continue;
}
if (can_write) {
for (;;) {
enum data_ret ret = write_data(
stream,
&can_write,
&last_send_time,
latency_packet_size,
delay_time);
switch (ret) {
case RET_BREAK:
break;
case RET_FATAL:
return;
case RET_CONTINUE:;
}
}
}
}
blog(LOG_INFO, "socket_thread_windows: Normal exit");
}
void *socket_thread_windows(void *data)
{
struct rtmp_stream *stream = data;
socket_thread_windows_internal(stream);
return NULL;
}
#endif