jp9000 ea69f9cf61 obs-outputs: Call recv when data received
Not calling recv when data is received will accumulate data in the
internal recveive buffer until it's full, in which case is will stop
acknowledging.  This can lead to unjustified disconnections.
2015-11-11 17:45:02 -08:00

855 lines
21 KiB
C

/******************************************************************************
Copyright (C) 2014 by Hugh Bailey <obs.jim@gmail.com>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
#include <obs-module.h>
#include <obs-avc.h>
#include <util/platform.h>
#include <util/circlebuf.h>
#include <util/dstr.h>
#include <util/threading.h>
#include <inttypes.h>
#include "librtmp/rtmp.h"
#include "librtmp/log.h"
#include "flv-mux.h"
#ifdef _WIN32
#include <Iphlpapi.h>
#else
#include <sys/ioctl.h>
#endif
#define do_log(level, format, ...) \
blog(level, "[rtmp stream: '%s'] " format, \
obs_output_get_name(stream->output), ##__VA_ARGS__)
#define warn(format, ...) do_log(LOG_WARNING, format, ##__VA_ARGS__)
#define info(format, ...) do_log(LOG_INFO, format, ##__VA_ARGS__)
#define debug(format, ...) do_log(LOG_DEBUG, format, ##__VA_ARGS__)
#define OPT_DROP_THRESHOLD "drop_threshold_ms"
#define OPT_MAX_SHUTDOWN_TIME_SEC "max_shutdown_time_sec"
//#define TEST_FRAMEDROPS
struct rtmp_stream {
obs_output_t *output;
pthread_mutex_t packets_mutex;
struct circlebuf packets;
bool sent_headers;
volatile bool connecting;
pthread_t connect_thread;
bool active;
volatile bool disconnected;
pthread_t send_thread;
pthread_t stop_thread;
int max_shutdown_time_sec;
os_sem_t *send_sem;
os_event_t *stop_event;
struct dstr path, key;
struct dstr username, password;
struct dstr encoder_name;
/* frame drop variables */
int64_t drop_threshold_usec;
int64_t min_drop_dts_usec;
int min_priority;
int64_t last_dts_usec;
uint64_t total_bytes_sent;
int dropped_frames;
RTMP rtmp;
};
static const char *rtmp_stream_getname(void *unused)
{
UNUSED_PARAMETER(unused);
return obs_module_text("RTMPStream");
}
static void log_rtmp(int level, const char *format, va_list args)
{
if (level > RTMP_LOGWARNING)
return;
blogva(LOG_INFO, format, args);
}
static inline size_t num_buffered_packets(struct rtmp_stream *stream);
static inline void free_packets(struct rtmp_stream *stream)
{
size_t num_packets;
pthread_mutex_lock(&stream->packets_mutex);
num_packets = num_buffered_packets(stream);
info("Freeing %d remaining packets", (int)num_packets);
while (stream->packets.size) {
struct encoder_packet packet;
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
obs_free_encoder_packet(&packet);
}
pthread_mutex_unlock(&stream->packets_mutex);
}
static inline bool stopping(struct rtmp_stream *stream)
{
return os_event_try(stream->stop_event) != EAGAIN;
}
static void *rtmp_stream_actual_stop(void *data);
static void rtmp_stream_destroy(void *data)
{
struct rtmp_stream *stream = data;
if (stopping(stream) && !stream->connecting) {
pthread_join(stream->stop_thread, NULL);
} else if (stream->connecting || stream->active) {
if (stream->connecting)
pthread_join(stream->connect_thread, NULL);
os_event_signal(stream->stop_event);
if (stream->active) {
os_sem_post(stream->send_sem);
obs_output_end_data_capture(stream->output);
}
rtmp_stream_actual_stop(data);
}
if (stream) {
free_packets(stream);
dstr_free(&stream->path);
dstr_free(&stream->key);
dstr_free(&stream->username);
dstr_free(&stream->password);
dstr_free(&stream->encoder_name);
os_event_destroy(stream->stop_event);
os_sem_destroy(stream->send_sem);
pthread_mutex_destroy(&stream->packets_mutex);
circlebuf_free(&stream->packets);
bfree(stream);
}
}
static void *rtmp_stream_create(obs_data_t *settings, obs_output_t *output)
{
struct rtmp_stream *stream = bzalloc(sizeof(struct rtmp_stream));
stream->output = output;
pthread_mutex_init_value(&stream->packets_mutex);
RTMP_Init(&stream->rtmp);
RTMP_LogSetCallback(log_rtmp);
RTMP_LogSetLevel(RTMP_LOGWARNING);
if (pthread_mutex_init(&stream->packets_mutex, NULL) != 0)
goto fail;
if (os_event_init(&stream->stop_event, OS_EVENT_TYPE_MANUAL) != 0)
goto fail;
UNUSED_PARAMETER(settings);
return stream;
fail:
rtmp_stream_destroy(stream);
return NULL;
}
static void *rtmp_stream_actual_stop(void *data)
{
struct rtmp_stream *stream = data;
void *ret;
if (stream->active) {
pthread_join(stream->send_thread, &ret);
RTMP_Close(&stream->rtmp);
}
os_event_reset(stream->stop_event);
stream->sent_headers = false;
return NULL;
}
static void rtmp_stream_stop(void *data)
{
struct rtmp_stream *stream = data;
int ret;
if (stopping(stream))
return;
if (stream->connecting)
pthread_join(stream->connect_thread, NULL);
os_event_signal(stream->stop_event);
if (stream->active) {
os_sem_post(stream->send_sem);
obs_output_end_data_capture(stream->output);
}
ret = pthread_create(&stream->stop_thread, NULL,
rtmp_stream_actual_stop, stream);
if (ret != 0) {
warn("Could not create stop thread! Stopping directly");
rtmp_stream_actual_stop(stream);
}
}
static inline void set_rtmp_str(AVal *val, const char *str)
{
bool valid = (str && *str);
val->av_val = valid ? (char*)str : NULL;
val->av_len = valid ? (int)strlen(str) : 0;
}
static inline void set_rtmp_dstr(AVal *val, struct dstr *str)
{
bool valid = !dstr_is_empty(str);
val->av_val = valid ? str->array : NULL;
val->av_len = valid ? (int)str->len : 0;
}
static inline bool get_next_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
bool new_packet = false;
pthread_mutex_lock(&stream->packets_mutex);
if (stream->packets.size) {
circlebuf_pop_front(&stream->packets, packet,
sizeof(struct encoder_packet));
new_packet = true;
}
pthread_mutex_unlock(&stream->packets_mutex);
return new_packet;
}
static void discard_recv_data(RTMP *rtmp, size_t size)
{
uint8_t buf[512];
do {
size_t bytes = size > 512 ? 512 : size;
size -= bytes;
#ifdef _WIN32
recv(rtmp->m_sb.sb_socket, buf, (int)bytes, 0);
#else
recv(rtmp->m_sb.sb_socket, buf, bytes, 0);
#endif
} while (size > 0);
}
static int send_packet(struct rtmp_stream *stream,
struct encoder_packet *packet, bool is_header, size_t idx)
{
uint8_t *data;
size_t size;
int recv_size = 0;
int ret = 0;
#ifdef _WIN32
ret = ioctlsocket(stream->rtmp.m_sb.sb_socket, FIONREAD,
(u_long*)&recv_size);
#else
ret = ioctl(stream->rtmp.m_sb.sb_socket, FIONREAD, &recv_size);
#endif
if (ret >= 0 && recv_size > 0)
discard_recv_data(&stream->rtmp, (size_t)recv_size);
flv_packet_mux(packet, &data, &size, is_header);
#ifdef TEST_FRAMEDROPS
os_sleep_ms(rand() % 40);
#endif
ret = RTMP_Write(&stream->rtmp, (char*)data, (int)size, (int)idx);
bfree(data);
obs_free_encoder_packet(packet);
stream->total_bytes_sent += size;
return ret;
}
static inline void send_headers(struct rtmp_stream *stream);
static bool send_remaining_packets(struct rtmp_stream *stream)
{
struct encoder_packet packet;
uint64_t max_ns = (uint64_t)stream->max_shutdown_time_sec * 1000000000;
uint64_t begin_time_ns = os_gettime_ns();
if (!stream->sent_headers)
send_headers(stream);
while (get_next_packet(stream, &packet)) {
if (send_packet(stream, &packet, false, packet.track_idx) < 0)
return false;
/* Just disconnect if it takes too long to shut down */
if ((os_gettime_ns() - begin_time_ns) > max_ns) {
info("Took longer than %d second(s) to shut down, "
"automatically stopping connection",
stream->max_shutdown_time_sec);
return false;
}
}
return true;
}
static void *send_thread(void *data)
{
struct rtmp_stream *stream = data;
os_set_thread_name("rtmp-stream: send_thread");
while (os_sem_wait(stream->send_sem) == 0) {
struct encoder_packet packet;
if (stopping(stream))
break;
if (!get_next_packet(stream, &packet))
continue;
if (!stream->sent_headers)
send_headers(stream);
if (send_packet(stream, &packet, false, packet.track_idx) < 0) {
os_atomic_set_bool(&stream->disconnected, true);
break;
}
}
if (!stream->disconnected && !send_remaining_packets(stream))
os_atomic_set_bool(&stream->disconnected, true);
if (stream->disconnected) {
info("Disconnected from %s", stream->path.array);
free_packets(stream);
} else {
info("User stopped the stream");
}
if (!stopping(stream)) {
pthread_detach(stream->send_thread);
obs_output_signal_stop(stream->output, OBS_OUTPUT_DISCONNECTED);
}
stream->active = false;
stream->sent_headers = false;
return NULL;
}
static bool send_meta_data(struct rtmp_stream *stream, size_t idx)
{
uint8_t *meta_data;
size_t meta_data_size;
bool success = flv_meta_data(stream->output, &meta_data,
&meta_data_size, false, idx);
if (success) {
RTMP_Write(&stream->rtmp, (char*)meta_data,
(int)meta_data_size, (int)idx);
bfree(meta_data);
}
return success;
}
static bool send_audio_header(struct rtmp_stream *stream, size_t idx)
{
obs_output_t *context = stream->output;
obs_encoder_t *aencoder = obs_output_get_audio_encoder(context, idx);
uint8_t *header;
struct encoder_packet packet = {
.type = OBS_ENCODER_AUDIO,
.timebase_den = 1
};
if (!aencoder)
return false;
obs_encoder_get_extra_data(aencoder, &header, &packet.size);
packet.data = bmemdup(header, packet.size);
send_packet(stream, &packet, true, idx);
return true;
}
static void send_video_header(struct rtmp_stream *stream)
{
obs_output_t *context = stream->output;
obs_encoder_t *vencoder = obs_output_get_video_encoder(context);
uint8_t *header;
size_t size;
struct encoder_packet packet = {
.type = OBS_ENCODER_VIDEO,
.timebase_den = 1,
.keyframe = true
};
obs_encoder_get_extra_data(vencoder, &header, &size);
packet.size = obs_parse_avc_header(&packet.data, header, size);
send_packet(stream, &packet, true, 0);
}
static inline void send_headers(struct rtmp_stream *stream)
{
stream->sent_headers = true;
size_t i = 0;
send_audio_header(stream, i++);
send_video_header(stream);
while (send_audio_header(stream, i++));
}
static inline bool reset_semaphore(struct rtmp_stream *stream)
{
os_sem_destroy(stream->send_sem);
return os_sem_init(&stream->send_sem, 0) == 0;
}
#ifdef _WIN32
#define socklen_t int
#endif
#define MIN_SENDBUF_SIZE 65535
static void adjust_sndbuf_size(struct rtmp_stream *stream, int new_size)
{
int cur_sendbuf_size = new_size;
socklen_t int_size = sizeof(int);
getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
(char*)&cur_sendbuf_size, &int_size);
if (cur_sendbuf_size < new_size) {
cur_sendbuf_size = new_size;
setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
(const char*)&cur_sendbuf_size, int_size);
}
}
static int init_send(struct rtmp_stream *stream)
{
int ret;
size_t idx = 0;
#if defined(_WIN32)
adjust_sndbuf_size(stream, MIN_SENDBUF_SIZE);
#endif
reset_semaphore(stream);
ret = pthread_create(&stream->send_thread, NULL, send_thread, stream);
if (ret != 0) {
RTMP_Close(&stream->rtmp);
warn("Failed to create send thread");
return OBS_OUTPUT_ERROR;
}
stream->active = true;
while (send_meta_data(stream, idx++));
obs_output_begin_data_capture(stream->output, 0);
return OBS_OUTPUT_SUCCESS;
}
#ifdef _WIN32
static void win32_log_interface_type(struct rtmp_stream *stream)
{
RTMP *rtmp = &stream->rtmp;
MIB_IPFORWARDROW route;
uint32_t dest_addr, source_addr;
char hostname[256];
HOSTENT *h;
if (rtmp->Link.hostname.av_len >= sizeof(hostname) - 1)
return;
strncpy(hostname, rtmp->Link.hostname.av_val, sizeof(hostname));
hostname[rtmp->Link.hostname.av_len] = 0;
h = gethostbyname(hostname);
if (!h)
return;
dest_addr = *(uint32_t*)h->h_addr_list[0];
if (rtmp->m_bindIP.addrLen == 0)
source_addr = 0;
else if (rtmp->m_bindIP.addr.ss_family = AF_INET)
source_addr = (*(struct sockaddr_in*)&rtmp->m_bindIP)
.sin_addr.S_un.S_addr;
else
return;
if (!GetBestRoute(dest_addr, source_addr, &route)) {
MIB_IFROW row;
memset(&row, 0, sizeof(row));
row.dwIndex = route.dwForwardIfIndex;
if (!GetIfEntry(&row)) {
uint32_t speed =row.dwSpeed / 1000000;
char *type;
struct dstr other = {0};
if (row.dwType == IF_TYPE_ETHERNET_CSMACD) {
type = "ethernet";
} else if (row.dwType == IF_TYPE_IEEE80211) {
type = "802.11";
} else {
dstr_printf(&other, "type %lu", row.dwType);
type = other.array;
}
info("Interface: %s (%s, %lu mbps)", row.bDescr, type,
speed);
dstr_free(&other);
}
}
}
#endif
static int try_connect(struct rtmp_stream *stream)
{
if (dstr_is_empty(&stream->path)) {
warn("URL is empty");
return OBS_OUTPUT_BAD_PATH;
}
info("Connecting to RTMP URL %s...", stream->path.array);
memset(&stream->rtmp.Link, 0, sizeof(stream->rtmp.Link));
if (!RTMP_SetupURL(&stream->rtmp, stream->path.array))
return OBS_OUTPUT_BAD_PATH;
RTMP_EnableWrite(&stream->rtmp);
dstr_copy(&stream->encoder_name, "FMLE/3.0 (compatible; obs-studio/");
#ifdef HAVE_OBSCONFIG_H
dstr_cat(&stream->encoder_name, OBS_VERSION);
#else
dstr_catf(&stream->encoder_name, "%d.%d.%d",
LIBOBS_API_MAJOR_VER,
LIBOBS_API_MINOR_VER,
LIBOBS_API_PATCH_VER);
#endif
dstr_cat(&stream->encoder_name, "; FMSc/1.0)");
set_rtmp_dstr(&stream->rtmp.Link.pubUser, &stream->username);
set_rtmp_dstr(&stream->rtmp.Link.pubPasswd, &stream->password);
set_rtmp_dstr(&stream->rtmp.Link.flashVer, &stream->encoder_name);
stream->rtmp.Link.swfUrl = stream->rtmp.Link.tcUrl;
RTMP_AddStream(&stream->rtmp, stream->key.array);
for (size_t idx = 1;; idx++) {
obs_encoder_t *encoder = obs_output_get_audio_encoder(
stream->output, idx);
const char *encoder_name;
if (!encoder)
break;
encoder_name = obs_encoder_get_name(encoder);
RTMP_AddStream(&stream->rtmp, encoder_name);
}
stream->rtmp.m_outChunkSize = 4096;
stream->rtmp.m_bSendChunkSizeInfo = true;
stream->rtmp.m_bUseNagle = true;
#ifdef _WIN32
win32_log_interface_type(stream);
#endif
if (!RTMP_Connect(&stream->rtmp, NULL))
return OBS_OUTPUT_CONNECT_FAILED;
if (!RTMP_ConnectStream(&stream->rtmp, 0))
return OBS_OUTPUT_INVALID_STREAM;
info("Connection to %s successful", stream->path.array);
return init_send(stream);
}
static bool init_connect(struct rtmp_stream *stream)
{
obs_service_t *service;
obs_data_t *settings;
if (stopping(stream))
pthread_join(stream->stop_thread, NULL);
service = obs_output_get_service(stream->output);
if (!service)
return false;
os_atomic_set_bool(&stream->disconnected, false);
stream->total_bytes_sent = 0;
stream->dropped_frames = 0;
stream->min_drop_dts_usec= 0;
stream->min_priority = 0;
settings = obs_output_get_settings(stream->output);
dstr_copy(&stream->path, obs_service_get_url(service));
dstr_copy(&stream->key, obs_service_get_key(service));
dstr_copy(&stream->username, obs_service_get_username(service));
dstr_copy(&stream->password, obs_service_get_password(service));
stream->drop_threshold_usec =
(int64_t)obs_data_get_int(settings, OPT_DROP_THRESHOLD) * 1000;
stream->max_shutdown_time_sec =
(int)obs_data_get_int(settings, OPT_MAX_SHUTDOWN_TIME_SEC);
obs_data_release(settings);
return true;
}
static void *connect_thread(void *data)
{
struct rtmp_stream *stream = data;
int ret;
os_set_thread_name("rtmp-stream: connect_thread");
if (!init_connect(stream)) {
obs_output_signal_stop(stream->output, OBS_OUTPUT_BAD_PATH);
return NULL;
}
ret = try_connect(stream);
if (ret != OBS_OUTPUT_SUCCESS) {
obs_output_signal_stop(stream->output, ret);
info("Connection to %s failed: %d", stream->path.array, ret);
}
if (!stopping(stream))
pthread_detach(stream->connect_thread);
os_atomic_set_bool(&stream->connecting, false);
return NULL;
}
static bool rtmp_stream_start(void *data)
{
struct rtmp_stream *stream = data;
if (!obs_output_can_begin_data_capture(stream->output, 0))
return false;
if (!obs_output_initialize_encoders(stream->output, 0))
return false;
os_atomic_set_bool(&stream->connecting, true);
return pthread_create(&stream->connect_thread, NULL, connect_thread,
stream) == 0;
}
static inline bool add_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
circlebuf_push_back(&stream->packets, packet,
sizeof(struct encoder_packet));
stream->last_dts_usec = packet->dts_usec;
return true;
}
static inline size_t num_buffered_packets(struct rtmp_stream *stream)
{
return stream->packets.size / sizeof(struct encoder_packet);
}
static void drop_frames(struct rtmp_stream *stream)
{
struct circlebuf new_buf = {0};
int drop_priority = 0;
uint64_t last_drop_dts_usec = 0;
int num_frames_dropped = 0;
debug("Previous packet count: %d", (int)num_buffered_packets(stream));
circlebuf_reserve(&new_buf, sizeof(struct encoder_packet) * 8);
while (stream->packets.size) {
struct encoder_packet packet;
circlebuf_pop_front(&stream->packets, &packet, sizeof(packet));
last_drop_dts_usec = packet.dts_usec;
/* do not drop audio data or video keyframes */
if (packet.type == OBS_ENCODER_AUDIO ||
packet.drop_priority == OBS_NAL_PRIORITY_HIGHEST) {
circlebuf_push_back(&new_buf, &packet, sizeof(packet));
} else {
if (drop_priority < packet.drop_priority)
drop_priority = packet.drop_priority;
num_frames_dropped++;
obs_free_encoder_packet(&packet);
}
}
circlebuf_free(&stream->packets);
stream->packets = new_buf;
stream->min_priority = drop_priority;
stream->min_drop_dts_usec = last_drop_dts_usec;
stream->dropped_frames += num_frames_dropped;
debug("New packet count: %d", (int)num_buffered_packets(stream));
}
static void check_to_drop_frames(struct rtmp_stream *stream)
{
struct encoder_packet first;
int64_t buffer_duration_usec;
if (num_buffered_packets(stream) < 5)
return;
circlebuf_peek_front(&stream->packets, &first, sizeof(first));
/* do not drop frames if frames were just dropped within this time */
if (first.dts_usec < stream->min_drop_dts_usec)
return;
/* if the amount of time stored in the buffered packets waiting to be
* sent is higher than threshold, drop frames */
buffer_duration_usec = stream->last_dts_usec - first.dts_usec;
if (buffer_duration_usec > stream->drop_threshold_usec) {
drop_frames(stream);
debug("dropping %" PRId64 " worth of frames",
buffer_duration_usec);
}
}
static bool add_video_packet(struct rtmp_stream *stream,
struct encoder_packet *packet)
{
check_to_drop_frames(stream);
/* if currently dropping frames, drop packets until it reaches the
* desired priority */
if (packet->priority < stream->min_priority) {
stream->dropped_frames++;
return false;
} else {
stream->min_priority = 0;
}
return add_packet(stream, packet);
}
static void rtmp_stream_data(void *data, struct encoder_packet *packet)
{
struct rtmp_stream *stream = data;
struct encoder_packet new_packet;
bool added_packet;
if (stream->disconnected)
return;
if (packet->type == OBS_ENCODER_VIDEO)
obs_parse_avc_packet(&new_packet, packet);
else
obs_duplicate_encoder_packet(&new_packet, packet);
pthread_mutex_lock(&stream->packets_mutex);
added_packet = (packet->type == OBS_ENCODER_VIDEO) ?
add_video_packet(stream, &new_packet) :
add_packet(stream, &new_packet);
pthread_mutex_unlock(&stream->packets_mutex);
if (added_packet)
os_sem_post(stream->send_sem);
else
obs_free_encoder_packet(&new_packet);
}
static void rtmp_stream_defaults(obs_data_t *defaults)
{
obs_data_set_default_int(defaults, OPT_DROP_THRESHOLD, 600);
obs_data_set_default_int(defaults, OPT_MAX_SHUTDOWN_TIME_SEC, 5);
}
static obs_properties_t *rtmp_stream_properties(void *unused)
{
UNUSED_PARAMETER(unused);
obs_properties_t *props = obs_properties_create();
obs_properties_add_int(props, OPT_DROP_THRESHOLD,
obs_module_text("RTMPStream.DropThreshold"),
200, 10000, 100);
return props;
}
static uint64_t rtmp_stream_total_bytes_sent(void *data)
{
struct rtmp_stream *stream = data;
return stream->total_bytes_sent;
}
static int rtmp_stream_dropped_frames(void *data)
{
struct rtmp_stream *stream = data;
return stream->dropped_frames;
}
struct obs_output_info rtmp_output_info = {
.id = "rtmp_output",
.flags = OBS_OUTPUT_AV |
OBS_OUTPUT_ENCODED |
OBS_OUTPUT_SERVICE |
OBS_OUTPUT_MULTI_TRACK,
.get_name = rtmp_stream_getname,
.create = rtmp_stream_create,
.destroy = rtmp_stream_destroy,
.start = rtmp_stream_start,
.stop = rtmp_stream_stop,
.encoded_packet = rtmp_stream_data,
.get_defaults = rtmp_stream_defaults,
.get_properties = rtmp_stream_properties,
.get_total_bytes = rtmp_stream_total_bytes_sent,
.get_dropped_frames = rtmp_stream_dropped_frames
};