/******************************************************************************** Copyright (C) 2012 Hugh Bailey This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ********************************************************************************/ #include "Main.h" #include "RTMPStuff.h" #include "RTMPPublisher.h" #define MAX_BUFFERED_PACKETS 10 String RTMPPublisher::strRTMPErrors; //QWORD totalCalls = 0, totalTime = 0; void rtmp_log_output(int level, const char *format, va_list vl) { int size = _vscprintf(format, vl); LPSTR lpTemp = (LPSTR)Allocate(size+1); vsprintf_s(lpTemp, size+1, format, vl); // OSDebugOut(TEXT("%S\r\n"), lpTemp); Log(TEXT("%S\r\n"), lpTemp); Free(lpTemp); } #ifdef _DEBUG DWORD quickHash (BYTE *data, UINT len) { DWORD hash = 276277; for (unsigned i=0; i RTMP_LOGERROR) return; vsnprintf(ansiStr, sizeof(ansiStr)-1, format, vl); ansiStr[sizeof(ansiStr)-1] = 0; MultiByteToWideChar(CP_UTF8, 0, ansiStr, -1, logStr, _countof(logStr)-1); Log (TEXT("librtmp error: %s"), logStr); strRTMPErrors << logStr << TEXT("\n"); } String RTMPPublisher::GetRTMPErrors() { return strRTMPErrors; } RTMPPublisher::RTMPPublisher() { //bufferedPackets.SetBaseSize(MAX_BUFFERED_PACKETS); bFirstKeyframe = true; hSendSempahore = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL); if(!hSendSempahore) CrashError(TEXT("RTMPPublisher: Could not create semaphore")); hDataMutex = OSCreateMutex(); if(!hDataMutex) CrashError(TEXT("RTMPPublisher: Could not create mutex")); //------------------------------------------ bframeDropThreshold = AppConfig->GetInt(TEXT("Publish"), TEXT("BFrameDropThreshold"), 400); if(bframeDropThreshold < 50) bframeDropThreshold = 50; else if(bframeDropThreshold > 1000) bframeDropThreshold = 1000; dropThreshold = AppConfig->GetInt(TEXT("Publish"), TEXT("FrameDropThreshold"), 600); if(dropThreshold < 50) dropThreshold = 50; else if(dropThreshold > 1000) dropThreshold = 1000; if (AppConfig->GetInt(TEXT("Publish"), TEXT("LowLatencyMode"), 0)) { if (AppConfig->GetInt(TEXT("Publish"), TEXT("LowLatencyMethod"), 0) == 0) { latencyFactor = AppConfig->GetInt(TEXT("Publish"), TEXT("LatencyFactor"), 20); if (latencyFactor < 3) latencyFactor = 3; lowLatencyMode = LL_MODE_FIXED; Log(TEXT("Using fixed low latency mode, factor %d"), latencyFactor); } else { lowLatencyMode = LL_MODE_AUTO; Log(TEXT("Using automatic low latency mode")); } } else lowLatencyMode = LL_MODE_NONE; bFastInitialKeyframe = AppConfig->GetInt(TEXT("Publish"), TEXT("FastInitialKeyframe"), 0) == 1; strRTMPErrors.Clear(); } bool RTMPPublisher::Init(RTMP *rtmpIn, UINT tcpBufferSize) { rtmp = rtmpIn; //------------------------------------------ //Log(TEXT("Using Send Buffer Size: %u"), sendBufferSize); rtmp->m_customSendFunc = (CUSTOMSEND)RTMPPublisher::BufferedSend; rtmp->m_customSendParam = this; rtmp->m_bCustomSend = TRUE; //------------------------------------------ int curTCPBufSize, curTCPBufSizeSize = sizeof(curTCPBufSize); getsockopt (rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&curTCPBufSize, &curTCPBufSizeSize); Log(TEXT("SO_SNDBUF was at %u"), curTCPBufSize); if(curTCPBufSize < int(tcpBufferSize)) { setsockopt (rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (const char *)&tcpBufferSize, sizeof(tcpBufferSize)); getsockopt (rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&curTCPBufSize, &curTCPBufSizeSize); if(curTCPBufSize != tcpBufferSize) Log(TEXT("Could not set SO_SNDBUF to %u, value is now %u"), tcpBufferSize, curTCPBufSize); } Log(TEXT("SO_SNDBUF is now %u"), tcpBufferSize); //------------------------------------------ hSendThread = OSCreateThread((XTHREAD)RTMPPublisher::SendThread, this); if(!hSendThread) CrashError(TEXT("RTMPPublisher: Could not create send thread")); hBufferEvent = CreateEvent(NULL, FALSE, FALSE, NULL); hBufferSpaceAvailableEvent = CreateEvent(NULL, FALSE, FALSE, NULL); hWriteEvent = CreateEvent(NULL, FALSE, FALSE, NULL); hSendLoopExit = CreateEvent(NULL, TRUE, FALSE, NULL); hSocketLoopExit = CreateEvent(NULL, TRUE, FALSE, NULL); hSendBacklogEvent = CreateEvent(NULL, FALSE, FALSE, NULL); hDataBufferMutex = OSCreateMutex(); dataBufferSize = (App->GetVideoEncoder()->GetBitRate() + App->GetAudioEncoder()->GetBitRate()) / 8 * 1024; if (dataBufferSize < 131072) dataBufferSize = 131072; dataBuffer = (BYTE *)Allocate(dataBufferSize); hSocketThread = OSCreateThread((XTHREAD)RTMPPublisher::SocketThread, this); if(!hSocketThread) CrashError(TEXT("RTMPPublisher: Could not create send thread")); //------------------------------------------ packetWaitType = 0; return true; } RTMPPublisher::~RTMPPublisher() { //OSDebugOut (TEXT("*** ~RTMPPublisher (%d queued, %d buffered, %d data)\n"), queuedPackets.Num(), bufferedPackets.Num(), curDataBufferLen); bStopping = true; //we're in the middle of connecting! wait for that to happen to avoid all manner of race conditions if (hConnectionThread) { WaitForSingleObject(hConnectionThread, INFINITE); OSCloseThread(hConnectionThread); } //send all remaining buffered packets, this may block since it respects timestamps FlushBufferedPackets (); //OSDebugOut (TEXT("%d queued after flush\n"), queuedPackets.Num()); if(hSendThread) { //this marks the thread to exit after current work is done SetEvent(hSendLoopExit); //these wake up the thread ReleaseSemaphore(hSendSempahore, 1, NULL); SetEvent(hBufferSpaceAvailableEvent); //wait 50 sec for all data to finish sending if (WaitForSingleObject(hSendThread, 50000) == WAIT_TIMEOUT) { Log(TEXT("~RTMPPublisher: Network appears stalled with %d / %d buffered, dropping connection!"), curDataBufferLen, dataBufferSize); FatalSocketShutdown(); //this will wake up and flush the sendloop if it's still trying to send out stuff ReleaseSemaphore(hSendSempahore, 1, NULL); SetEvent(hBufferSpaceAvailableEvent); } OSTerminateThread(hSendThread, 10000); } if(hSendSempahore) CloseHandle(hSendSempahore); //OSDebugOut (TEXT("*** ~RTMPPublisher hSendThread terminated (%d queued, %d buffered, %d data)\n"), queuedPackets.Num(), bufferedPackets.Num(), curDataBufferLen); if (hSocketThread) { //mark the socket loop to shut down after the buffer is empty SetEvent(hSocketLoopExit); //wake it up in case it already is empty SetEvent(hBufferEvent); //wait 60 sec for it to exit OSTerminateThread(hSocketThread, 60000); } //OSDebugOut (TEXT("*** ~RTMPPublisher hSocketThread terminated (%d queued, %d buffered, %d data)\n"), queuedPackets.Num(), bufferedPackets.Num(), curDataBufferLen); if(rtmp) { if (RTMP_IsConnected(rtmp)) { //at this point nothing should be in the buffer, flush out what remains to the net and make it blocking FlushDataBuffer(); //disable the buffered send, so RTMP_* functions write directly to the net (and thus block) rtmp->m_bCustomSend = 0; //manually shut down the stream and issue a graceful socket shutdown RTMP_DeleteStream(rtmp); shutdown(rtmp->m_sb.sb_socket, SD_SEND); //this waits for the socket shutdown to complete gracefully for (;;) { char buff[1024]; int ret; ret = recv(rtmp->m_sb.sb_socket, buff, sizeof(buff), 0); if (!ret) break; else if (ret == -1) { Log(TEXT("~RTMPublisher: Received error %d while waiting for graceful shutdown."), WSAGetLastError()); break; } } //OSDebugOut(TEXT("Graceful shutdown complete.\n")); } //this closes the socket if not already done RTMP_Close(rtmp); } if(hDataMutex) OSCloseMutex(hDataMutex); while (bufferedPackets.Num()) { //this should not happen any more... bufferedPackets[0].data.Clear(); bufferedPackets.Remove(0); } if (dataBuffer) Free(dataBuffer); if (hDataBufferMutex) OSCloseMutex(hDataBufferMutex); if (hBufferEvent) CloseHandle(hBufferEvent); if (hSendLoopExit) CloseHandle(hSendLoopExit); if (hSocketLoopExit) CloseHandle(hSocketLoopExit); if (hSendBacklogEvent) CloseHandle(hSendBacklogEvent); if (hBufferSpaceAvailableEvent) CloseHandle(hBufferSpaceAvailableEvent); if (hWriteEvent) CloseHandle(hWriteEvent); if(rtmp) { if (rtmp->Link.pubUser.av_val) Free(rtmp->Link.pubUser.av_val); if (rtmp->Link.pubPasswd.av_val) Free(rtmp->Link.pubPasswd.av_val); RTMP_Free(rtmp); } //-------------------------- for(UINT i=0; i timestamp) break; } return index; } UINT RTMPPublisher::FindClosestBufferIndex(DWORD timestamp) { UINT index; for (index=0; index timestamp) break; } return index; } void RTMPPublisher::InitializeBuffer() { bool bFirstAudio = true; for (UINT i=0; i= dropThreshold + audioTimeOffset) { minFramedropTimestsamp = queuedPackets.Last().timestamp; OSDebugOut(TEXT("dropped all at %u, threshold is %u, total duration is %u, %d in queue\r\n"), currentBufferSize, dropThreshold + audioTimeOffset, queueDuration, queuedPackets.Num()); //what the hell, just flush it all for now as a test and force a keyframe 1 second after while (DoIFrameDelay(false)); if(packetWaitType > PacketType_VideoLow) RequestKeyframe(1000); } else if (queueDuration >= bframeDropThreshold + audioTimeOffset && curTime-lastBFrameDropTime >= dropThreshold + audioTimeOffset) { OSDebugOut(TEXT("dropped b-frames at %u, threshold is %u, total duration is %u\r\n"), currentBufferSize, bframeDropThreshold + audioTimeOffset, queueDuration); while (DoIFrameDelay(true)); lastBFrameDropTime = curTime; } } } if(queuedPackets.Num()) ReleaseSemaphore(hSendSempahore, 1, NULL); } void RTMPPublisher::SendPacket(BYTE *data, UINT size, DWORD timestamp, PacketType type) { if(!bConnected && !bConnecting && !bStopping) { hConnectionThread = OSCreateThread((XTHREAD)CreateConnectionThread, this); bConnecting = true; } if (bFastInitialKeyframe) { if (!bConnected) { //while not connected, keep at most one keyframe buffered if (type != PacketType_VideoHighest) return; bufferedPackets.Clear(); } if (bConnected && bFirstKeyframe) { bFirstKeyframe = false; firstTimestamp = timestamp; //send out our buffered keyframe immediately, unless this packet happens to also be a keyframe if (type != PacketType_VideoHighest && bufferedPackets.Num() == 1) { TimedPacket packet; mcpy(&packet, &bufferedPackets[0], sizeof(TimedPacket)); bufferedPackets.Remove(0); packet.timestamp = 0; SendPacketForReal(packet.data.Array(), packet.data.Num(), packet.timestamp, packet.type); } else bufferedPackets.Clear(); } } else { if (bFirstKeyframe) { if (!bConnected || type != PacketType_VideoHighest) return; firstTimestamp = timestamp; bFirstKeyframe = false; } } //OSDebugOut (TEXT("%u: SendPacket (%d bytes - %08x @ %u)\n"), OSGetTime(), size, quickHash(data,size), timestamp); if (bufferedPackets.Num() == MAX_BUFFERED_PACKETS) { if (!bBufferFull) { InitializeBuffer(); bBufferFull = true; } TimedPacket packet; mcpy(&packet, &bufferedPackets[0], sizeof(TimedPacket)); bufferedPackets.Remove(0); SendPacketForReal(packet.data.Array(), packet.data.Num(), packet.timestamp, packet.type); } timestamp -= firstTimestamp; TimedPacket *packet; if (type == PacketType_Audio) { UINT newID; timestamp -= audioTimeOffset; newID = FindClosestBufferIndex(timestamp); packet = bufferedPackets.InsertNew(newID); } else { packet = bufferedPackets.CreateNew(); } packet->data.CopyArray(data, size); packet->timestamp = timestamp; packet->type = type; /*for (UINT i=0; i= packetWaitType) { if(type != PacketType_Audio) packetWaitType = PacketType_VideoDisposable; bAddPacket = true; } if(bAddPacket) { List paddedData; paddedData.SetSize(size+RTMP_MAX_HEADER_SIZE); mcpy(paddedData.Array()+RTMP_MAX_HEADER_SIZE, data, size); if(!bSentFirstKeyframe) { DataPacket sei; App->GetVideoEncoder()->GetSEI(sei); paddedData.InsertArray(RTMP_MAX_HEADER_SIZE+5, sei.lpPacket, sei.size); bSentFirstKeyframe = true; } currentBufferSize += paddedData.Num(); UINT droppedFrameVal = queuedPackets.Num() ? queuedPackets.Last().distanceFromDroppedFrame+1 : 10000; UINT id = FindClosestQueueIndex(timestamp); NetworkPacket *queuedPacket = queuedPackets.InsertNew(id); queuedPacket->distanceFromDroppedFrame = droppedFrameVal; queuedPacket->data.TransferFrom(paddedData); queuedPacket->timestamp = timestamp; queuedPacket->type = type; } else { if(type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; } } } OSLeaveMutex(hDataMutex); } void RTMPPublisher::BeginPublishingInternal() { RTMPPacket packet; char pbuf[2048], *pend = pbuf+sizeof(pbuf); packet.m_nChannel = 0x03; // control channel (invoke) packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_INFO; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = rtmp->m_stream_id; packet.m_hasAbsTimestamp = TRUE; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_setDataFrame); enc = AMF_EncodeString(enc, pend, &av_onMetaData); enc = App->EncMetaData(enc, pend); packet.m_nBodySize = enc - packet.m_body; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } //---------------------------------------------- List packetPadding; DataPacket mediaHeaders; //---------------------------------------------- packet.m_nChannel = 0x05; // source channel packet.m_packetType = RTMP_PACKET_TYPE_AUDIO; App->GetAudioHeaders(mediaHeaders); packetPadding.SetSize(RTMP_MAX_HEADER_SIZE); packetPadding.AppendArray(mediaHeaders.lpPacket, mediaHeaders.size); packet.m_body = (char*)packetPadding.Array()+RTMP_MAX_HEADER_SIZE; packet.m_nBodySize = mediaHeaders.size; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } //---------------------------------------------- packet.m_nChannel = 0x04; // source channel packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_VIDEO; App->GetVideoHeaders(mediaHeaders); packetPadding.SetSize(RTMP_MAX_HEADER_SIZE); packetPadding.AppendArray(mediaHeaders.lpPacket, mediaHeaders.size); packet.m_body = (char*)packetPadding.Array()+RTMP_MAX_HEADER_SIZE; packet.m_nBodySize = mediaHeaders.size; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } } void RTMPPublisher::BeginPublishing() { } void LogInterfaceType (RTMP *rtmp) { MIB_IPFORWARDROW route; DWORD destAddr; CHAR hostname[256]; if (rtmp->Link.hostname.av_len >= sizeof(hostname)-1) return; strncpy (hostname, rtmp->Link.hostname.av_val, sizeof(hostname)-1); hostname[rtmp->Link.hostname.av_len] = 0; HOSTENT *h = gethostbyname(hostname); if (!h) return; destAddr = *(DWORD *)h->h_addr_list[0]; if (!GetBestRoute (destAddr, rtmp->m_bindIP.addr.sin_addr.S_un.S_addr, &route)) { MIB_IFROW row; zero (&row, sizeof(row)); row.dwIndex = route.dwForwardIfIndex; if (!GetIfEntry (&row)) { DWORD speed = row.dwSpeed / 1000000; TCHAR *type; String otherType; if (row.dwType == IF_TYPE_ETHERNET_CSMACD) type = TEXT("ethernet"); else if (row.dwType == IF_TYPE_IEEE80211) type = TEXT("802.11"); else { otherType = FormattedString (TEXT("type %d"), row.dwType); type = otherType.Array(); } Log (TEXT(" Interface: %S (%s, %d mbps)"), row.bDescr, type, speed); } } } DWORD WINAPI RTMPPublisher::CreateConnectionThread(RTMPPublisher *publisher) { //------------------------------------------------------ // set up URL bool bRetry = false; bool bSuccess = false; bool bCanRetry = false; String failReason; String strBindIP; int serviceID = AppConfig->GetInt (TEXT("Publish"), TEXT("Service")); String strURL = AppConfig->GetString(TEXT("Publish"), TEXT("URL")); String strPlayPath = AppConfig->GetString(TEXT("Publish"), TEXT("PlayPath")); strURL.KillSpaces(); strPlayPath.KillSpaces(); LPSTR lpAnsiURL = NULL, lpAnsiPlaypath = NULL; RTMP *rtmp = NULL; //-------------------------------- // unbelievably disgusting hack for elgato devices String strOldDirectory; UINT dirSize = GetCurrentDirectory(0, 0); strOldDirectory.SetLength(dirSize); GetCurrentDirectory(dirSize, strOldDirectory.Array()); OSSetCurrentDirectory(API->GetAppPath()); //-------------------------------- if(!strURL.IsValid()) { failReason = TEXT("No server specified to connect to"); goto end; } if(serviceID != 0) { XConfig serverData; if(!serverData.Open(TEXT("services.xconfig"))) { failReason = TEXT("Could not open services.xconfig"); goto end; } XElement *services = serverData.GetElement(TEXT("services")); if(!services) { failReason = TEXT("Could not find any services in services.xconfig"); goto end; } XElement *service = NULL; DWORD numServices = services->NumElements(); for(UINT i=0; iGetElementByID(i); if(curService->GetInt(TEXT("id")) == serviceID) { service = curService; break; } } if(!service) { failReason = TEXT("Could not find the service specified in services.xconfig"); goto end; } XElement *servers = service->GetElement(TEXT("servers")); if(!servers) { failReason = TEXT("Could not find any servers for the service specified in services.xconfig"); goto end; } XDataItem *item = servers->GetDataItem(strURL); if(!item) item = servers->GetDataItemByID(0); strURL = item->GetData(); Log(TEXT("Using RTMP service: %s"), service->GetName()); Log(TEXT(" Server selection: %s"), strURL.Array()); } //------------------------------------------------------ // now back to the elgato directory if it needs the directory changed still to function *sigh* OSSetCurrentDirectory(strOldDirectory); //------------------------------------------------------ rtmp = RTMP_Alloc(); RTMP_Init(rtmp); RTMP_LogSetCallback(librtmpErrorCallback); //RTMP_LogSetLevel(RTMP_LOGERROR); lpAnsiURL = strURL.CreateUTF8String(); lpAnsiPlaypath = strPlayPath.CreateUTF8String(); if(!RTMP_SetupURL2(rtmp, lpAnsiURL, lpAnsiPlaypath)) { failReason = Str("Connection.CouldNotParseURL"); goto end; } char *rtmpUser = AppConfig->GetString(TEXT("Publish"), TEXT("Username")).CreateUTF8String(); char *rtmpPass = AppConfig->GetString(TEXT("Publish"), TEXT("Password")).CreateUTF8String(); if (rtmpUser) { rtmp->Link.pubUser.av_val = rtmpUser; rtmp->Link.pubUser.av_len = (int)strlen(rtmpUser); } if (rtmpPass) { rtmp->Link.pubPasswd.av_val = rtmpPass; rtmp->Link.pubPasswd.av_len = (int)strlen(rtmpPass); } RTMP_EnableWrite(rtmp); //set it to publish rtmp->Link.swfUrl.av_len = rtmp->Link.tcUrl.av_len; rtmp->Link.swfUrl.av_val = rtmp->Link.tcUrl.av_val; /*rtmp->Link.pageUrl.av_len = rtmp->Link.tcUrl.av_len; rtmp->Link.pageUrl.av_val = rtmp->Link.tcUrl.av_val;*/ rtmp->Link.flashVer.av_val = "FMLE/3.0 (compatible; FMSc/1.0)"; rtmp->Link.flashVer.av_len = (int)strlen(rtmp->Link.flashVer.av_val); //----------------------------------------- UINT tcpBufferSize = AppConfig->GetInt(TEXT("Publish"), TEXT("TCPBufferSize"), 64*1024); if(tcpBufferSize < 8192) tcpBufferSize = 8192; else if(tcpBufferSize > 1024*1024) tcpBufferSize = 1024*1024; rtmp->m_outChunkSize = 4096;//RTMP_DEFAULT_CHUNKSIZE;// rtmp->m_bSendChunkSizeInfo = TRUE; rtmp->m_bUseNagle = TRUE; strBindIP = AppConfig->GetString(TEXT("Publish"), TEXT("BindToIP"), TEXT("Default")); if (scmp(strBindIP, TEXT("Default"))) { rtmp->m_bindIP.addr.sin_family = AF_INET; rtmp->m_bindIP.addrLen = sizeof(rtmp->m_bindIP.addr); if (WSAStringToAddress(strBindIP.Array(), AF_INET, NULL, (LPSOCKADDR)&rtmp->m_bindIP.addr, &rtmp->m_bindIP.addrLen) == SOCKET_ERROR) { // no localization since this should rarely/never happen failReason = TEXT("WSAStringToAddress: Could not parse address"); goto end; } } LogInterfaceType(rtmp); //----------------------------------------- if(!RTMP_Connect(rtmp, NULL)) { failReason = Str("Connection.CouldNotConnect"); failReason << TEXT("\r\n\r\n") << RTMPPublisher::GetRTMPErrors(); bCanRetry = true; goto end; } if(!RTMP_ConnectStream(rtmp, 0)) { failReason = Str("Connection.InvalidStream"); failReason << TEXT("\r\n\r\n") << RTMPPublisher::GetRTMPErrors(); bCanRetry = true; goto end; } //----------------------------------------- OSDebugOut(TEXT("Connected: %u\r\n"), OSGetTime()); publisher->RequestKeyframe(1000); //----------------------------------------- bSuccess = true; end: if (lpAnsiURL) Free(lpAnsiURL); if (lpAnsiPlaypath) Free(lpAnsiPlaypath); if(!bSuccess) { if(rtmp) { RTMP_Close(rtmp); RTMP_Free(rtmp); } if(failReason.IsValid()) App->SetStreamReport(failReason); if(!publisher->bStopping) PostMessage(hwndMain, OBS_REQUESTSTOP, bCanRetry ? 0 : 1, 0); Log(TEXT("Connection to %s failed: %s"), strURL.Array(), failReason.Array()); publisher->bStopping = true; } else { publisher->Init(rtmp, tcpBufferSize); publisher->bConnected = true; publisher->bConnecting = false; } return 0; } double RTMPPublisher::GetPacketStrain() const { return (curDataBufferLen / (double)dataBufferSize) * 100.0; /*if(packetWaitType >= PacketType_VideoHigh) return min(100.0, dNetworkStrain*100.0); else if(bNetworkStrain) return dNetworkStrain*66.0; return dNetworkStrain*33.0;*/ } QWORD RTMPPublisher::GetCurrentSentBytes() { return bytesSent; } DWORD RTMPPublisher::NumDroppedFrames() const { return numBFramesDumped+numPFramesDumped; } int RTMPPublisher::FlushDataBuffer() { unsigned long zero = 0; //OSDebugOut (TEXT("*** ~RTMPPublisher FlushDataBuffer (%d)\n"), curDataBufferLen); //make it blocking again WSAEventSelect(rtmp->m_sb.sb_socket, NULL, 0); ioctlsocket(rtmp->m_sb.sb_socket, FIONBIO, &zero); OSEnterMutex(hDataBufferMutex); int ret = send(rtmp->m_sb.sb_socket, (const char *)dataBuffer, curDataBufferLen, 0); curDataBufferLen = 0; OSLeaveMutex(hDataBufferMutex); return ret; } void RTMPPublisher::SetupSendBacklogEvent() { OVERLAPPED overlapped; zero (&overlapped, sizeof(overlapped)); overlapped.hEvent = hSendBacklogEvent; idealsendbacklognotify(rtmp->m_sb.sb_socket, &overlapped, NULL); } void RTMPPublisher::FatalSocketShutdown() { //We close the socket manually to avoid trying to run cleanup code during the shutdown cycle since //if we're being called the socket is already in an unusable state. closesocket(rtmp->m_sb.sb_socket); rtmp->m_sb.sb_socket = -1; //anything buffered is invalid now curDataBufferLen = 0; App->PostStopMessage(); } void RTMPPublisher::SocketLoop() { bool canWrite = false; int delayTime; int latencyPacketSize; DWORD lastSendTime = 0; WSANETWORKEVENTS networkEvents; SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); WSAEventSelect(rtmp->m_sb.sb_socket, hWriteEvent, FD_READ|FD_WRITE|FD_CLOSE); //Low latency mode works by delaying delayTime ms between calls to send() and only sending //a buffer as large as latencyPacketSize at once. This causes keyframes and other data bursts //to be sent over several sends instead of one large one. if (lowLatencyMode == LL_MODE_AUTO) { //Auto mode aims for a constant rate of whatever the stream bitrate is and segments into //MTU sized packets (test packet captures indicated that despite nagling being enabled, //the size of the send() buffer is still important for some reason). Note that delays //become very short at this rate, and it can take a while for the buffer to empty after //a keyframe. delayTime = 1400.0f / (dataBufferSize / 1000.0f); latencyPacketSize = 1460; } else if (lowLatencyMode == LL_MODE_FIXED) { //We use latencyFactor - 2 to guarantee we're always sending at a slightly higher //rate than the maximum expected data rate so we don't get backed up latencyPacketSize = dataBufferSize / (latencyFactor - 2); delayTime = 1000 / latencyFactor; } else { latencyPacketSize = dataBufferSize; delayTime = 0; } SetupSendBacklogEvent (); HANDLE hObjects[3]; hObjects[0] = hWriteEvent; hObjects[1] = hBufferEvent; hObjects[2] = hSendBacklogEvent; for (;;) { if (bStopping && WaitForSingleObject(hSocketLoopExit, 0) != WAIT_TIMEOUT) { OSEnterMutex(hDataBufferMutex); if (curDataBufferLen == 0) { //OSDebugOut (TEXT("Exiting on empty buffer.\n")); OSLeaveMutex(hDataBufferMutex); break; } //OSDebugOut (TEXT("Want to exit, but %d bytes remain.\n"), curDataBufferLen); OSLeaveMutex(hDataBufferMutex); } int status = WaitForMultipleObjects (3, hObjects, FALSE, INFINITE); if (status == WAIT_ABANDONED || status == WAIT_FAILED) { Log(TEXT("RTMPPublisher::SocketLoop: Aborting due to WaitForMultipleObjects failure")); App->PostStopMessage(); return; } if (status == WAIT_OBJECT_0) { //Socket event if (WSAEnumNetworkEvents (rtmp->m_sb.sb_socket, NULL, &networkEvents)) { Log(TEXT("RTMPPublisher::SocketLoop: Aborting due to WSAEnumNetworkEvents failure, %d"), WSAGetLastError()); App->PostStopMessage(); return; } if (networkEvents.lNetworkEvents & FD_WRITE) canWrite = true; if (networkEvents.lNetworkEvents & FD_CLOSE) { if (bStopping) Log(TEXT("RTMPPublisher::SocketLoop: Aborting due to FD_CLOSE during shutdown, %d bytes lost, error %d"), curDataBufferLen, networkEvents.iErrorCode[FD_CLOSE_BIT]); else Log(TEXT("RTMPPublisher::SocketLoop: Aborting due to FD_CLOSE, error %d"), networkEvents.iErrorCode[FD_CLOSE_BIT]); FatalSocketShutdown (); return; } if (networkEvents.lNetworkEvents & FD_READ) { BYTE discard[16384]; int ret, errorCode; BOOL fatalError = FALSE; for (;;) { ret = recv(rtmp->m_sb.sb_socket, (char *)discard, sizeof(discard), 0); if (ret == -1) { errorCode = WSAGetLastError(); if (errorCode == WSAEWOULDBLOCK) break; fatalError = TRUE; } else if (ret == 0) { errorCode = 0; fatalError = TRUE; } if (fatalError) { Log(TEXT("RTMPPublisher::SocketLoop: Socket error, recv() returned %d, GetLastError() %d"), ret, errorCode); FatalSocketShutdown (); return; } } } } else if (status == WAIT_OBJECT_0 + 2) { //Ideal send backlog event ULONG idealSendBacklog; if (!idealsendbacklogquery(rtmp->m_sb.sb_socket, &idealSendBacklog)) { int curTCPBufSize, curTCPBufSizeSize = sizeof(curTCPBufSize); getsockopt (rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char *)&curTCPBufSize, &curTCPBufSizeSize); if (curTCPBufSize < (int)idealSendBacklog) { int bufferSize = (int)idealSendBacklog; setsockopt(rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (const char *)&bufferSize, sizeof(bufferSize)); Log(TEXT("RTMPPublisher::Socketloop: Increasing send buffer to ISB %d (buffer: %d / %d)"), idealSendBacklog, curDataBufferLen, dataBufferSize); } } SetupSendBacklogEvent (); continue; } if (canWrite) { bool exitLoop = false; do { OSEnterMutex(hDataBufferMutex); if (!curDataBufferLen) { //this is now an expected occasional condition due to use of auto-reset events, we could end up emptying the buffer //as it's filled in a previous loop cycle, especially if using low latency mode. OSLeaveMutex(hDataBufferMutex); //Log(TEXT("RTMPPublisher::SocketLoop: Trying to send, but no data available?!")); break; } int ret; if (lowLatencyMode != LL_MODE_NONE) { int sendLength = min (latencyPacketSize, curDataBufferLen); ret = send(rtmp->m_sb.sb_socket, (const char *)dataBuffer, sendLength, 0); } else { ret = send(rtmp->m_sb.sb_socket, (const char *)dataBuffer, curDataBufferLen, 0); } if (ret > 0) { if (curDataBufferLen - ret) memmove(dataBuffer, dataBuffer + ret, curDataBufferLen - ret); curDataBufferLen -= ret; bytesSent += ret; if (lastSendTime) { totalSendPeriod += OSGetTime() - lastSendTime; totalSendBytes += ret; totalSendCount++; } lastSendTime = OSGetTime(); SetEvent(hBufferSpaceAvailableEvent); } else { int errorCode; BOOL fatalError = FALSE; if (ret == -1) { errorCode = WSAGetLastError(); if (errorCode == WSAEWOULDBLOCK) { canWrite = false; OSLeaveMutex(hDataBufferMutex); break; } fatalError = TRUE; } else if (ret == 0) { errorCode = 0; fatalError = TRUE; } if (fatalError) { //connection closed, or connection was aborted / socket closed / etc, that's a fatal error for us. Log(TEXT("RTMPPublisher::SocketLoop: Socket error, send() returned %d, GetLastError() %d"), ret, errorCode); OSLeaveMutex(hDataBufferMutex); FatalSocketShutdown (); return; } } //finish writing for now if (curDataBufferLen <= 1000) exitLoop = true; OSLeaveMutex(hDataBufferMutex); if (delayTime) Sleep (delayTime); } while (!exitLoop); } } Log(TEXT("RTMPPublisher::SocketLoop: Graceful loop exit")); } void RTMPPublisher::SendLoop() { SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); while(WaitForSingleObject(hSendSempahore, INFINITE) == WAIT_OBJECT_0) { while(true) { OSEnterMutex(hDataMutex); if(queuedPackets.Num() == 0) { OSLeaveMutex(hDataMutex); break; } List packetData; PacketType type = queuedPackets[0].type; DWORD timestamp = queuedPackets[0].timestamp; packetData.TransferFrom(queuedPackets[0].data); currentBufferSize -= packetData.Num(); queuedPackets.Remove(0); OSLeaveMutex(hDataMutex); //-------------------------------------------- RTMPPacket packet; packet.m_nChannel = (type == PacketType_Audio) ? 0x5 : 0x4; packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; packet.m_packetType = (type == PacketType_Audio) ? RTMP_PACKET_TYPE_AUDIO : RTMP_PACKET_TYPE_VIDEO; packet.m_nTimeStamp = timestamp; packet.m_nInfoField2 = rtmp->m_stream_id; packet.m_hasAbsTimestamp = TRUE; packet.m_nBodySize = packetData.Num()-RTMP_MAX_HEADER_SIZE; packet.m_body = (char*)packetData.Array()+RTMP_MAX_HEADER_SIZE; //QWORD sendTimeStart = OSGetTimeMicroseconds(); if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { //should never reach here with the new shutdown sequence. RUNONCE Log(TEXT("RTMP_SendPacket failure, should not happen!")); if(!RTMP_IsConnected(rtmp)) { App->PostStopMessage(); break; } } //---------------------------------------------------------- /*outputRateSize += packetData.Num(); packetSizeRecord << PacketTimeSize(timestamp, packetData.Num()); if(packetSizeRecord.Num()) { UINT packetID=0; for(; packetIDSendLoop(); return 0; } DWORD RTMPPublisher::SocketThread(RTMPPublisher *publisher) { publisher->SocketLoop(); return 0; } void RTMPPublisher::DropFrame(UINT id) { NetworkPacket &dropPacket = queuedPackets[id]; currentBufferSize -= dropPacket.data.Num(); PacketType type = dropPacket.type; dropPacket.data.Clear(); if(dropPacket.type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; for(UINT i=id+1; i=0; i--) { UINT distance = (id-UINT(i)); if(queuedPackets[i].distanceFromDroppedFrame <= distance) break; queuedPackets[i].distanceFromDroppedFrame = distance; } bool bSetPriority = true; for(UINT i=id+1; i= PacketType_VideoHigh) { if(packet.type < PacketType_VideoHighest) { currentBufferSize -= packet.data.Num(); packet.data.Clear(); queuedPackets.Remove(i--); if(packet.type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; } else { bSetPriority = false; break; } } else { if(packet.type >= type) { bSetPriority = false; break; } } } } if(bSetPriority) { if(type >= PacketType_VideoHigh) packetWaitType = PacketType_VideoHighest; else { if(packetWaitType < type) packetWaitType = type; } } } //video packet count exceeding maximum. find lowest priority frame to dump bool RTMPPublisher::DoIFrameDelay(bool bBFramesOnly) { int curWaitType = PacketType_VideoDisposable; while(!bBFramesOnly && curWaitType < PacketType_VideoHighest || bBFramesOnly && curWaitType < PacketType_VideoHigh) { UINT bestPacket = INVALID; UINT bestPacketDistance = 0; if(curWaitType == PacketType_VideoHigh) { bool bFoundIFrame = false; for(int i=int(queuedPackets.Num())-1; i>=0; i--) { NetworkPacket &packet = queuedPackets[i]; if(packet.type == PacketType_Audio) continue; if(packet.type == curWaitType) { if(bFoundIFrame) { bestPacket = UINT(i); break; } else if(bestPacket == INVALID) bestPacket = UINT(i); } else if(packet.type == PacketType_VideoHighest) bFoundIFrame = true; } } else { for(UINT i=0; i bestPacketDistance) { bestPacket = i; bestPacketDistance = packet.distanceFromDroppedFrame; } } } } if(bestPacket != INVALID) { DropFrame(bestPacket); queuedPackets.Remove(bestPacket); return true; } curWaitType++; } return false; } void RTMPPublisher::RequestKeyframe(int waitTime) { App->RequestKeyframe(waitTime); } int RTMPPublisher::BufferedSend(RTMPSockBuf *sb, const char *buf, int len, RTMPPublisher *network) { bool bComplete = false; int fullLen = len; //NOTE: This function is called from the SendLoop thread, be careful of race conditions. retrySend: //We may have been disconnected mid-shutdown or something, just pretend we wrote the data //to avoid blocking if the socket loop exited. if (!RTMP_IsConnected(network->rtmp)) return len; OSEnterMutex(network->hDataBufferMutex); if (network->curDataBufferLen + len >= network->dataBufferSize) { //Log(TEXT("RTMPPublisher::BufferedSend: Socket buffer is full (%d / %d bytes), waiting to send %d bytes"), network->curDataBufferLen, network->dataBufferSize, len); ++network->totalTimesWaited; network->totalBytesWaited += len; OSLeaveMutex(network->hDataBufferMutex); int status = WaitForSingleObject(network->hBufferSpaceAvailableEvent, INFINITE); if (status == WAIT_ABANDONED || status == WAIT_FAILED) return 0; goto retrySend; } mcpy(network->dataBuffer + network->curDataBufferLen, buf, len); network->curDataBufferLen += len; OSLeaveMutex(network->hDataBufferMutex); SetEvent (network->hBufferEvent); return len; } NetworkStream* CreateRTMPPublisher() { return new RTMPPublisher; }