#ifdef _WIN32
#include "rtmp-stream.h"
/* NOTE(review): the header name of the following #include was missing from
 * the text under review; <winsock2.h> is presumed because the WSA* calls
 * below are declared there -- confirm against the repository. */
#include <winsock2.h>

/*
 * Tears the connection down after an unrecoverable socket error: closes the
 * socket, stores -1 as the socket value, drops whatever is still queued in
 * the write buffer (write_buf_len = 0) and signals
 * buffer_space_available_event (presumably to wake a producer waiting for
 * buffer room -- verify against the code that waits on that event).
 *
 * NOTE(review): write_buf_len is cleared here without taking
 * write_buf_mutex; callers in this file release the mutex before calling.
 */
static void fatal_sock_shutdown(struct rtmp_stream *stream)
{
	closesocket(stream->rtmp.m_sb.sb_socket);
	stream->rtmp.m_sb.sb_socket = -1;
	stream->write_buf_len = 0;
	os_event_signal(stream->buffer_space_available_event);
}

/*
 * Handles a signal on the socket event by querying which network events
 * occurred.
 *
 *   FD_WRITE - sets *can_write to true.
 *   FD_CLOSE - logs the circumstances, shuts the socket down, returns false.
 *   FD_READ  - drains and discards all readable data; a recv() error other
 *              than WSAEWOULDBLOCK, or recv() returning 0, is fatal.
 *
 * last_send_time is only used for the FD_CLOSE log message (milliseconds,
 * 0 meaning "nothing sent yet").
 *
 * Returns true if the caller may keep running, false if the socket was shut
 * down via fatal_sock_shutdown().
 */
static bool socket_event(struct rtmp_stream *stream, bool *can_write,
			 uint64_t last_send_time)
{
	WSANETWORKEVENTS net_events;
	bool success;

	success = !WSAEnumNetworkEvents(stream->rtmp.m_sb.sb_socket, NULL,
					&net_events);
	if (!success) {
		blog(LOG_ERROR,
		     "socket_thread_windows: Aborting due to "
		     "WSAEnumNetworkEvents failure, %d",
		     WSAGetLastError());
		fatal_sock_shutdown(stream);
		return false;
	}

	if (net_events.lNetworkEvents & FD_WRITE)
		*can_write = true;

	if (net_events.lNetworkEvents & FD_CLOSE) {
		if (last_send_time) {
			uint32_t diff = (os_gettime_ns() / 1000000) -
					last_send_time;

			/* NOTE(review): write_buf_len / write_buf_size are
			 * formatted with %d here and below; confirm their
			 * declared types match. */
			blog(LOG_ERROR,
			     "socket_thread_windows: Received "
			     "FD_CLOSE, %u ms since last send "
			     "(buffer: %d / %d)",
			     diff, stream->write_buf_len,
			     stream->write_buf_size);
		}

		/* Any result other than EAGAIN from os_event_try() on
		 * stop_event is treated as "a stop is in progress". */
		if (os_event_try(stream->stop_event) != EAGAIN)
			blog(LOG_ERROR,
			     "socket_thread_windows: Aborting due "
			     "to FD_CLOSE during shutdown, "
			     "%d bytes lost, error %d",
			     stream->write_buf_len,
			     net_events.iErrorCode[FD_CLOSE_BIT]);
		else
			blog(LOG_ERROR,
			     "socket_thread_windows: Aborting due "
			     "to FD_CLOSE, error %d",
			     net_events.iErrorCode[FD_CLOSE_BIT]);

		fatal_sock_shutdown(stream);
		return false;
	}

	if (net_events.lNetworkEvents & FD_READ) {
		char discard[16384];
		int err_code = 0;
		bool fatal = false;

		/* Keep reading until the socket reports it would block; the
		 * received bytes themselves are thrown away. */
		for (;;) {
			int ret = recv(stream->rtmp.m_sb.sb_socket, discard,
				       sizeof(discard), 0);

			if (ret == -1) {
				err_code = WSAGetLastError();
				if (err_code == WSAEWOULDBLOCK)
					break;

				fatal = true;
			} else if (ret == 0) {
				err_code = 0;
				fatal = true;
			}

			if (fatal) {
				blog(LOG_ERROR,
				     "socket_thread_windows: "
				     "Socket error, recv() returned "
				     "%d, GetLastError() %d",
				     ret, err_code);
				stream->rtmp.last_error_code = err_code;
				fatal_sock_shutdown(stream);
				return false;
			}
		}
	}

	return true;
}

/*
 * Handles an ideal-send-backlog notification: queries the current ideal
 * send backlog (ISB) and, if the socket's SO_SNDBUF is smaller, raises
 * SO_SNDBUF to the ISB value. Failures of the query or of getsockopt() are
 * logged and otherwise ignored.
 *
 * can_write is not used by this function.
 */
static void ideal_send_backlog_event(struct rtmp_stream *stream,
				     bool *can_write)
{
	ULONG ideal_send_backlog;
	int cur_tcp_bufsize;
	int size = sizeof(cur_tcp_bufsize);
	int bufsize;
	int ret;

	(void)can_write;

	ret = idealsendbacklogquery(stream->rtmp.m_sb.sb_socket,
				    &ideal_send_backlog);
	if (ret != 0) {
		blog(LOG_ERROR,
		     "socket_thread_windows: Got "
		     "send_backlog_event but WSAIoctl() "
		     "returned %d",
		     WSAGetLastError());
		return;
	}

	ret = getsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
			 (char *)&cur_tcp_bufsize, &size);
	if (ret != 0) {
		blog(LOG_ERROR,
		     "socket_thread_windows: Got "
		     "send_backlog_event but "
		     "getsockopt() returned %d",
		     WSAGetLastError());
		return;
	}

	bufsize = (int)ideal_send_backlog;
	if (cur_tcp_bufsize >= bufsize)
		return;

	setsockopt(stream->rtmp.m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF,
		   (const char *)&bufsize, sizeof(bufsize));

	/* Log the int copy rather than the ULONG so the argument type
	 * matches the %d conversion. */
	blog(LOG_INFO,
	     "socket_thread_windows: "
	     "Increasing send buffer to "
	     "ISB %d (buffer: %d / %d)",
	     bufsize, stream->write_buf_len, stream->write_buf_size);
}

/* Result of one write_data() pass, telling the caller what to do next. */
enum data_ret {
	RET_BREAK,   /* stop writing for now, keep the thread running */
	RET_FATAL,   /* socket was shut down, the thread must exit */
	RET_CONTINUE /* more data is queued, call write_data() again */
};

/*
 * Performs one send pass over the stream's write buffer while holding
 * write_buf_mutex.
 *
 * In low latency mode at most latency_packet_size bytes are handed to
 * RTMPSockBuf_Send(); otherwise the whole buffer is. Bytes that were sent
 * are removed from the front of the buffer, *last_send_time is updated
 * (milliseconds) and buffer_space_available_event is signalled.
 *
 * If delay_time is non-zero the function sleeps that many milliseconds
 * before returning RET_BREAK/RET_CONTINUE from the normal path.
 *
 * Returns:
 *   RET_BREAK    - buffer was empty, the send would block (*can_write is
 *                  cleared), or 1000 bytes or fewer remain queued.
 *   RET_FATAL    - send failed or returned 0; the socket has been shut down.
 *   RET_CONTINUE - more than 1000 bytes remain queued.
 */
static enum data_ret write_data(struct rtmp_stream *stream, bool *can_write,
				uint64_t *last_send_time,
				size_t latency_packet_size, int delay_time)
{
	bool exit_loop = false;
	int ret;

	pthread_mutex_lock(&stream->write_buf_mutex);

	if (!stream->write_buf_len) {
		/* this is now an expected occasional condition due to use of
		 * auto-reset events, we could end up emptying the buffer as
		 * it's filled in a previous loop cycle, especially if using
		 * low latency mode. */
		pthread_mutex_unlock(&stream->write_buf_mutex);
		return RET_BREAK;
	}

	if (stream->low_latency_mode) {
		size_t send_len =
			min(latency_packet_size, stream->write_buf_len);

		ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
				       (const char *)stream->write_buf,
				       (int)send_len);
	} else {
		ret = RTMPSockBuf_Send(&stream->rtmp.m_sb,
				       (const char *)stream->write_buf,
				       (int)stream->write_buf_len);
	}

	if (ret > 0) {
		/* Shift the unsent remainder to the front of the buffer. */
		if (stream->write_buf_len - ret)
			memmove(stream->write_buf, stream->write_buf + ret,
				stream->write_buf_len - ret);
		stream->write_buf_len -= ret;

		*last_send_time = os_gettime_ns() / 1000000;

		os_event_signal(stream->buffer_space_available_event);
	} else {
		int err_code = 0;
		bool fatal_err = false;

		if (ret == -1) {
			err_code = WSAGetLastError();

			if (err_code == WSAEWOULDBLOCK) {
				*can_write = false;
				pthread_mutex_unlock(&stream->write_buf_mutex);
				return RET_BREAK;
			}

			fatal_err = true;
		} else if (ret == 0) {
			err_code = 0;
			fatal_err = true;
		}

		if (fatal_err) {
			/* connection closed, or connection was aborted /
			 * socket closed / etc, that's a fatal error. */
			blog(LOG_ERROR,
			     "socket_thread_windows: "
			     "Socket error, send() returned %d, "
			     "GetLastError() %d",
			     ret, err_code);

			pthread_mutex_unlock(&stream->write_buf_mutex);
			stream->rtmp.last_error_code = err_code;
			fatal_sock_shutdown(stream);
			return RET_FATAL;
		}
	}

	/* finish writing for now */
	if (stream->write_buf_len <= 1000)
		exit_loop = true;

	pthread_mutex_unlock(&stream->write_buf_mutex);

	if (delay_time)
		os_sleep_ms(delay_time);

	return exit_loop ? RET_BREAK : RET_CONTINUE;
}

#define LATENCY_FACTOR 20

/*
 * Body of the Windows socket send thread.
 *
 * Registers the socket for FD_READ/FD_WRITE/FD_CLOSE notifications on
 * socket_available_event, optionally arms ideal-send-backlog notifications,
 * then loops waiting on three events:
 *
 *   objs[0] socket_available_event - network event, see socket_event()
 *   objs[1] buffer_has_data_event  - new data was queued for sending
 *   objs[2] send_backlog_event     - ideal send backlog changed
 *
 * and drains the write buffer with write_data() whenever the socket is
 * writable. The loop ends normally once send_thread_signaled_exit is set and
 * the write buffer is empty, or early after any fatal socket error.
 *
 * The locally created send_backlog_event is closed on every exit path.
 */
static inline void socket_thread_windows_internal(struct rtmp_stream *stream)
{
	bool can_write = false;

	int delay_time;
	size_t latency_packet_size;
	uint64_t last_send_time = 0;

	HANDLE send_backlog_event;
	OVERLAPPED send_backlog_overlapped;
	HANDLE objs[3];

	SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);

	WSAEventSelect(stream->rtmp.m_sb.sb_socket,
		       stream->socket_available_event,
		       FD_READ | FD_WRITE | FD_CLOSE);

	/* Manual-reset, initially non-signalled event. */
	send_backlog_event = CreateEvent(NULL, true, false, NULL);
	if (!send_backlog_event) {
		/* Without this check the NULL handle would only surface as a
		 * WaitForMultipleObjects failure further down. */
		blog(LOG_ERROR,
		     "socket_thread_windows: Aborting due "
		     "to CreateEvent failure, %lu",
		     (unsigned long)GetLastError());
		fatal_sock_shutdown(stream);
		return;
	}

	if (stream->low_latency_mode) {
		delay_time = 1000 / LATENCY_FACTOR;
		latency_packet_size =
			stream->write_buf_size / (LATENCY_FACTOR - 2);
	} else {
		latency_packet_size = stream->write_buf_size;
		delay_time = 0;
	}

	if (!stream->disable_send_window_optimization) {
		memset(&send_backlog_overlapped, 0,
		       sizeof(send_backlog_overlapped));
		send_backlog_overlapped.hEvent = send_backlog_event;
		idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
				       &send_backlog_overlapped, NULL);
	} else {
		blog(LOG_INFO, "socket_thread_windows: Send window "
			       "optimization disabled by user.");
	}

	objs[0] = stream->socket_available_event;
	objs[1] = stream->buffer_has_data_event;
	objs[2] = send_backlog_event;

	for (;;) {
		DWORD status;

		/* Once exit has been requested, leave as soon as the write
		 * buffer has been fully flushed. */
		if (os_event_try(stream->send_thread_signaled_exit) !=
		    EAGAIN) {
			bool buffer_empty;

			pthread_mutex_lock(&stream->write_buf_mutex);
			buffer_empty = stream->write_buf_len == 0;
			pthread_mutex_unlock(&stream->write_buf_mutex);

			if (buffer_empty) {
				os_event_reset(
					stream->send_thread_signaled_exit);
				break;
			}
		}

		status = WaitForMultipleObjects(3, objs, false, INFINITE);
		if (status == WAIT_ABANDONED || status == WAIT_FAILED) {
			blog(LOG_ERROR, "socket_thread_windows: Aborting due "
					"to WaitForMultipleObjects failure");
			fatal_sock_shutdown(stream);
			goto close_event;
		}

		if (status == WAIT_OBJECT_0) {
			/* Socket event */
			if (!socket_event(stream, &can_write, last_send_time))
				goto close_event;

		} else if (status == WAIT_OBJECT_0 + 2) {
			/* Ideal send backlog event */
			ideal_send_backlog_event(stream, &can_write);

			/* The event is manual-reset, so clear it before
			 * re-arming the notification. */
			ResetEvent(send_backlog_event);
			idealsendbacklognotify(stream->rtmp.m_sb.sb_socket,
					       &send_backlog_overlapped, NULL);
			continue;
		}

		if (can_write) {
			enum data_ret ret;

			do {
				ret = write_data(stream, &can_write,
						 &last_send_time,
						 latency_packet_size,
						 delay_time);
			} while (ret == RET_CONTINUE);

			if (ret == RET_FATAL)
				goto close_event;
		}
	}

	if (stream->rtmp.m_sb.sb_socket != INVALID_SOCKET)
		WSAEventSelect(stream->rtmp.m_sb.sb_socket,
			       stream->socket_available_event, 0);

	blog(LOG_INFO, "socket_thread_windows: Normal exit");

close_event:
	/* Every exit path after a successful CreateEvent() ends up here so
	 * the event handle is not leaked. */
	CloseHandle(send_backlog_event);
}

/*
 * Thread entry point: data is the struct rtmp_stream to service. Always
 * returns NULL.
 */
void *socket_thread_windows(void *data)
{
	struct rtmp_stream *stream = data;
	socket_thread_windows_internal(stream);
	return NULL;
}
#endif