/******************************************************************************** Copyright (C) 2012 Hugh Bailey This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. ********************************************************************************/ #include "Main.h" #include "RTMPStuff.h" #include "RTMPPublisher.h" void rtmp_log_output(int level, const char *format, va_list vl) { int size = _vscprintf(format, vl); LPSTR lpTemp = (LPSTR)Allocate(size+1); vsprintf_s(lpTemp, size+1, format, vl); // OSDebugOut(TEXT("%S\r\n"), lpTemp); Log(TEXT("%S\r\n"), lpTemp); Free(lpTemp); } RTMPPublisher::RTMPPublisher(RTMP *rtmpIn, BOOL bUseSendBuffer, UINT sendBufferSize) { rtmp = rtmpIn; //------------------------------------------ sendBuffer.SetSize(sendBufferSize); curSendBufferLen = 0; this->bUseSendBuffer = bUseSendBuffer; if(bUseSendBuffer) { Log(TEXT("Send Buffer Size: %u"), sendBufferSize); rtmp->m_customSendFunc = (CUSTOMSEND)RTMPPublisher::BufferedSend; rtmp->m_customSendParam = this; rtmp->m_bCustomSend = TRUE; //todo: this is a test setsockopt(rtmp->m_sb.sb_socket, SOL_SOCKET, SO_SNDBUF, (char*)&sendBufferSize, sizeof(sendBufferSize)); } else Log(TEXT("Not using send buffering")); //------------------------------------------ bufferTime = (DWORD)AppConfig->GetInt(TEXT("Publish"), TEXT("FrameDropBufferTime"), 2); if(bufferTime < 1) bufferTime = 1; else if(bufferTime > 10) bufferTime = 10; bufferTime *= 1000; dropThreshold = (DWORD)AppConfig->GetInt(TEXT("Publish"), TEXT("FrameDropThreshold"), 400); if(dropThreshold < 50) dropThreshold = 50; else if(dropThreshold > 1000) dropThreshold = 1000; dropThreshold += bufferTime; Log(TEXT("bufferTime: %u, dropThreshold: %u"), bufferTime, dropThreshold); //------------------------------------------ hSendSempahore = CreateSemaphore(NULL, 0, 0x7FFFFFFFL, NULL); if(!hSendSempahore) CrashError(TEXT("RTMPPublisher: Could not create semaphore")); hDataMutex = OSCreateMutex(); if(!hDataMutex) CrashError(TEXT("RTMPPublisher: Could not create mutex")); hSendThread = OSCreateThread((XTHREAD)RTMPPublisher::SendThread, this); if(!hSendThread) CrashError(TEXT("RTMPPublisher: Could not create thread")); //------------------------------------------ packetWaitType = 0; } RTMPPublisher::~RTMPPublisher() { bStopping = true; ReleaseSemaphore(hSendSempahore, 1, NULL); OSTerminateThread(hSendThread, 20000); CloseHandle(hSendSempahore); OSCloseMutex(hDataMutex); //flush the last of the data if(bUseSendBuffer && RTMP_IsConnected(rtmp)) FlushSendBuffer(); //-------------------------- for(UINT i=0; i dropThreshold && currentBufferSize > outputRateSize) { Log(TEXT("killing frames")); while(currentBufferSize > outputRateSize) { if(!DoIFrameDelay(false)) break; } ignoreCount = int(queuedPackets.Num()); } } if(timestamp >= bufferTime) { sendTime = timestamp-bufferTime; ReleaseSemaphore(hSendSempahore, 1, NULL); } } void RTMPPublisher::SendPacket(BYTE *data, UINT size, DWORD timestamp, PacketType type) { if(!bStopping) { bool bAddPacket = false; if(type >= packetWaitType) { if(type != PacketType_Audio) packetWaitType = PacketType_VideoDisposable; bAddPacket = true; } OSEnterMutex(hDataMutex); if(bAddPacket) { List paddedData; paddedData.SetSize(size+RTMP_MAX_HEADER_SIZE); mcpy(paddedData.Array()+RTMP_MAX_HEADER_SIZE, data, size); currentBufferSize += paddedData.Num(); UINT droppedFrameVal = queuedPackets.Num() ? queuedPackets.Last().distanceFromDroppedFrame+1 : 10000; NetworkPacket *queuedPacket = queuedPackets.CreateNew(); queuedPacket->distanceFromDroppedFrame = droppedFrameVal; queuedPacket->data.TransferFrom(paddedData); queuedPacket->timestamp = timestamp; queuedPacket->type = type; } else { if(type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; } ProcessPackets(timestamp); OSLeaveMutex(hDataMutex); } } void RTMPPublisher::BeginPublishing() { RTMPPacket packet; char pbuf[2048], *pend = pbuf+sizeof(pbuf); packet.m_nChannel = 0x03; // control channel (invoke) packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_INFO; packet.m_nTimeStamp = 0; packet.m_nInfoField2 = rtmp->m_stream_id; packet.m_hasAbsTimestamp = TRUE; packet.m_body = pbuf + RTMP_MAX_HEADER_SIZE; char *enc = packet.m_body; enc = AMF_EncodeString(enc, pend, &av_setDataFrame); enc = AMF_EncodeString(enc, pend, &av_onMetaData); enc = App->EncMetaData(enc, pend); packet.m_nBodySize = enc - packet.m_body; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } //---------------------------------------------- List packetPadding; DataPacket mediaHeaders; //---------------------------------------------- packet.m_nChannel = 0x05; // source channel packet.m_packetType = RTMP_PACKET_TYPE_AUDIO; App->GetAudioHeaders(mediaHeaders); packetPadding.SetSize(RTMP_MAX_HEADER_SIZE); packetPadding.AppendArray(mediaHeaders.lpPacket, mediaHeaders.size); packet.m_body = (char*)packetPadding.Array()+RTMP_MAX_HEADER_SIZE; packet.m_nBodySize = mediaHeaders.size; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } //---------------------------------------------- packet.m_nChannel = 0x04; // source channel packet.m_headerType = RTMP_PACKET_SIZE_LARGE; packet.m_packetType = RTMP_PACKET_TYPE_VIDEO; App->GetVideoHeaders(mediaHeaders); packetPadding.SetSize(RTMP_MAX_HEADER_SIZE); packetPadding.AppendArray(mediaHeaders.lpPacket, mediaHeaders.size); packet.m_body = (char*)packetPadding.Array()+RTMP_MAX_HEADER_SIZE; packet.m_nBodySize = mediaHeaders.size; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { App->PostStopMessage(); return; } } double RTMPPublisher::GetPacketStrain() const { if(packetWaitType > PacketType_VideoLow) return 100.0; else if(packetWaitType > PacketType_VideoDisposable) return 50.0; return 0.0; } QWORD RTMPPublisher::GetCurrentSentBytes() { return bytesSent; } DWORD RTMPPublisher::NumDroppedFrames() const { return numBFramesDumped+numPFramesDumped; } void RTMPPublisher::SendLoop() { SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); while(WaitForSingleObject(hSendSempahore, INFINITE) == WAIT_OBJECT_0) { /*//-------------------------------------------- // read DWORD pendingBytes = 0; ioctlsocket(rtmp->m_sb.sb_socket, FIONREAD, &pendingBytes); if(pendingBytes) { RTMPPacket packet; zero(&packet, sizeof(packet)); while(RTMP_ReadPacket(rtmp, &packet) && !RTMPPacket_IsReady(&packet) && RTMP_IsConnected(rtmp)); if(!RTMP_IsConnected(rtmp)) { App->PostStopMessage(); bStopping = true; break; } RTMPPacket_Free(&packet); }*/ //-------------------------------------------- // send while(true) { if(bStopping || !RTMP_IsConnected(rtmp)) return; OSEnterMutex(hDataMutex); if(queuedPackets.Num() == 0) { OSLeaveMutex(hDataMutex); break; } bool bFoundPacket = false; if(queuedPackets[0].timestamp < sendTime) { NetworkPacket &packet = queuedPackets[0]; bFoundPacket = true; currentBufferSize -= packet.data.Num(); } else { OSLeaveMutex(hDataMutex); break; } List packetData; PacketType type = queuedPackets[0].type; DWORD timestamp = queuedPackets[0].timestamp; packetData.TransferFrom(queuedPackets[0].data); queuedPackets.Remove(0); OSLeaveMutex(hDataMutex); //-------------------------------------------- if(ignoreCount > 0) { if(--ignoreCount == 0) Log(TEXT("ignore complete")); } //-------------------------------------------- RTMPPacket packet; packet.m_nChannel = (type == PacketType_Audio) ? 0x5 : 0x4; packet.m_headerType = RTMP_PACKET_SIZE_MEDIUM; packet.m_packetType = (type == PacketType_Audio) ? RTMP_PACKET_TYPE_AUDIO : RTMP_PACKET_TYPE_VIDEO; packet.m_nTimeStamp = timestamp; packet.m_nInfoField2 = rtmp->m_stream_id; packet.m_hasAbsTimestamp = TRUE; packet.m_nBodySize = packetData.Num()-RTMP_MAX_HEADER_SIZE; packet.m_body = (char*)packetData.Array()+RTMP_MAX_HEADER_SIZE; if(!RTMP_SendPacket(rtmp, &packet, FALSE)) { if(!RTMP_IsConnected(rtmp)) { App->PostStopMessage(); bStopping = true; break; } } //---------------------------------------------------------- outputRateSize += packetData.Num(); packetSizeRecord << PacketTimeSize(timestamp, packetData.Num()); if(packetSizeRecord.Num()) { UINT packetID=0; for(; packetID maxBufferTime) { if(!FlushSendBuffer()) { App->PostStopMessage(); bStopping = true; break; } firstBufferedVideoFrameTimestamp = timestamp; } } bytesSent += packetData.Num(); } } } DWORD RTMPPublisher::SendThread(RTMPPublisher *publisher) { publisher->SendLoop(); return 0; } void RTMPPublisher::DropFrame(UINT id) { NetworkPacket &dropPacket = queuedPackets[id]; currentBufferSize -= dropPacket.data.Num(); PacketType type = dropPacket.type; dropPacket.data.Clear(); if(dropPacket.type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; for(UINT i=id+1; i=0; i--) { UINT distance = (id-UINT(i)); if(queuedPackets[i].distanceFromDroppedFrame <= distance) break; queuedPackets[i].distanceFromDroppedFrame = distance; } bool bSetPriority = true; for(UINT i=id+1; i= PacketType_VideoHigh) { if(packet.type < PacketType_VideoHighest) { currentBufferSize -= packet.data.Num(); packet.data.Clear(); queuedPackets.Remove(i--); if(packet.type < PacketType_VideoHigh) numBFramesDumped++; else numPFramesDumped++; } else { bSetPriority = false; break; } } else { if(packet.type >= type) { bSetPriority = false; break; } } } } if(bSetPriority) { if(type >= PacketType_VideoHigh) packetWaitType = PacketType_VideoHighest; else { if(packetWaitType < type) packetWaitType = type; } } } //video packet count exceeding maximum. find lowest priority frame to dump bool RTMPPublisher::DoIFrameDelay(bool bBFramesOnly) { int curWaitType = PacketType_VideoDisposable; while(!bBFramesOnly && curWaitType < PacketType_VideoHighest || bBFramesOnly && curWaitType < PacketType_VideoHigh) { UINT bestPacket = INVALID; UINT bestPacketDistance = 0; if(curWaitType == PacketType_VideoHigh) { bool bFoundIFrame = false; for(int i=int(queuedPackets.Num())-1; i>=0; i--) { NetworkPacket &packet = queuedPackets[i]; if(packet.type == PacketType_Audio) continue; if(packet.type == curWaitType) { if(bFoundIFrame) { bestPacket = UINT(i); break; } else if(bestPacket == INVALID) bestPacket = UINT(i); } else if(packet.type == PacketType_VideoHighest) bFoundIFrame = true; } } else { for(UINT i=0; i bestPacketDistance) { bestPacket = i; bestPacketDistance = packet.distanceFromDroppedFrame; } } } } if(bestPacket != INVALID) { DropFrame(bestPacket); queuedPackets.Remove(bestPacket); return true; } curWaitType++; } return false; } int RTMPPublisher::FlushSendBuffer() { if(!curSendBufferLen) return 1; SOCKET sb_socket = rtmp->m_sb.sb_socket; BYTE *lpTemp = sendBuffer.Array(); int totalBytesSent = curSendBufferLen; while(totalBytesSent > 0) { int nBytes = send(sb_socket, (const char*)lpTemp, totalBytesSent, 0); if(nBytes < 0) return nBytes; if(nBytes == 0) return 0; totalBytesSent -= nBytes; lpTemp += nBytes; } int prevSendBufferSize = curSendBufferLen; curSendBufferLen = 0; return prevSendBufferSize; } int RTMPPublisher::BufferedSend(RTMPSockBuf *sb, const char *buf, int len, RTMPPublisher *network) { bool bComplete = false; int fullLen = len; do { int newTotal = network->curSendBufferLen+len; //buffer full, send if(newTotal >= int(network->sendBuffer.Num())) { int pendingBytes = newTotal-network->sendBuffer.Num(); int copyCount = network->sendBuffer.Num()-network->curSendBufferLen; mcpy(network->sendBuffer.Array()+network->curSendBufferLen, buf, copyCount); BYTE *lpTemp = network->sendBuffer.Array(); int totalBytesSent = network->sendBuffer.Num(); while(totalBytesSent > 0) { int nBytes = send(sb->sb_socket, (const char*)lpTemp, totalBytesSent, 0); if(nBytes < 0) return nBytes; if(nBytes == 0) return 0; totalBytesSent -= nBytes; lpTemp += nBytes; } network->curSendBufferLen = 0; if(pendingBytes) { buf += copyCount; len -= copyCount; } else bComplete = true; } else { if(len) { mcpy(network->sendBuffer.Array()+network->curSendBufferLen, buf, len); network->curSendBufferLen = newTotal; } bComplete = true; } } while(!bComplete); return fullLen; } NetworkStream* CreateRTMPPublisher(String &failReason, bool &bCanRetry) { //------------------------------------------------------ // set up URL bCanRetry = false; int serviceID = AppConfig->GetInt (TEXT("Publish"), TEXT("Service")); String strURL = AppConfig->GetString(TEXT("Publish"), TEXT("URL")); String strPlayPath = AppConfig->GetString(TEXT("Publish"), TEXT("PlayPath")); if(!strURL.IsValid()) { failReason = TEXT("No server specified to connect to"); return NULL; } if(serviceID != 0) { XConfig serverData; if(!serverData.Open(TEXT("services.xconfig"))) { failReason = TEXT("Could not open services.xconfig"); return NULL; } XElement *services = serverData.GetElement(TEXT("services")); if(!services) { failReason = TEXT("Could not any services in services.xconfig"); return NULL; } XElement *service = services->GetElementByID(serviceID-1); if(!service) { failReason = TEXT("Could not find the service specified in services.xconfig"); return NULL; } XElement *servers = service->GetElement(TEXT("servers")); if(!servers) { failReason = TEXT("Could not find any servers for the service specified in services.xconfig"); return NULL; } XDataItem *item = servers->GetDataItem(strURL); if(!item) item = servers->GetDataItemByID(0); strURL = item->GetData(); } //------------------------------------------------------ RTMP *rtmp = RTMP_Alloc(); RTMP_Init(rtmp); /*RTMP_LogSetCallback(rtmp_log_output); RTMP_LogSetLevel(RTMP_LOGDEBUG2);*/ LPSTR lpAnsiURL = strURL.CreateUTF8String(); LPSTR lpAnsiPlaypath = strPlayPath.CreateUTF8String(); if(!RTMP_SetupURL2(rtmp, lpAnsiURL, lpAnsiPlaypath)) { failReason = Str("Connection.CouldNotParseURL"); RTMP_Free(rtmp); Free(lpAnsiURL); Free(lpAnsiPlaypath); return NULL; } RTMP_EnableWrite(rtmp); //set it to publish /*rtmp->Link.swfUrl.av_len = rtmp->Link.tcUrl.av_len; rtmp->Link.swfUrl.av_val = rtmp->Link.tcUrl.av_val; rtmp->Link.pageUrl.av_len = rtmp->Link.tcUrl.av_len; rtmp->Link.pageUrl.av_val = rtmp->Link.tcUrl.av_val;*/ rtmp->Link.flashVer.av_val = "FMLE/3.0 (compatible; FMSc/1.0)"; rtmp->Link.flashVer.av_len = (int)strlen(rtmp->Link.flashVer.av_val); BOOL bUseSendBuffer = AppConfig->GetInt(TEXT("Publish"), TEXT("UseSendBuffer"), 1); UINT sendBufferSize = AppConfig->GetInt(TEXT("Publish"), TEXT("SendBufferSize"), 1460); if(sendBufferSize > 32120) sendBufferSize = 32120; else if(sendBufferSize < 536) sendBufferSize = 536; rtmp->m_outChunkSize = 4096;//RTMP_DEFAULT_CHUNKSIZE;// rtmp->m_bSendChunkSizeInfo = TRUE; if (!bUseSendBuffer) rtmp->m_bUseNagle = TRUE; if(!RTMP_Connect(rtmp, NULL)) { failReason = Str("Connection.CouldNotConnect"); RTMP_Free(rtmp); bCanRetry = true; Free(lpAnsiURL); Free(lpAnsiPlaypath); return NULL; } if(!RTMP_ConnectStream(rtmp, 0)) { failReason = Str("Connection.InvalidStream"); RTMP_Close(rtmp); RTMP_Free(rtmp); Free(lpAnsiURL); Free(lpAnsiPlaypath); return NULL; } Free(lpAnsiURL); Free(lpAnsiPlaypath); return new RTMPPublisher(rtmp, bUseSendBuffer, sendBufferSize); }