From c1ca4d2d343e01ab9eb672bf6d3b2809e11efe19 Mon Sep 17 00:00:00 2001 From: Cyp Date: Fri, 16 Jul 2010 21:46:00 +0200 Subject: [PATCH] newnet: Don't send lots of small packets, fix "connection error"s when listing games. Since there is no advantage in sending data more than once per game tick, queue up data until the game time ticks. This may result in sligtly better compression in some cases, due to not flushing non-stop. Don't close a socket while data is queued to be sent, since this sometimes breaks listing games (where the list is queued, and the socket is closed). --- lib/netplay/netplay.c | 30 ++++++ lib/netplay/netplay.h | 1 + lib/netplay/netsocket.cpp | 190 +++++++++++++++++++++++++++----------- lib/netplay/netsocket.h | 2 + src/loop.c | 7 ++ src/wrappers.c | 1 + 6 files changed, 175 insertions(+), 56 deletions(-) diff --git a/lib/netplay/netplay.c b/lib/netplay/netplay.c index 55af5b238..8f00dc8be 100644 --- a/lib/netplay/netplay.c +++ b/lib/netplay/netplay.c @@ -1252,6 +1252,34 @@ BOOL NETsend(uint8_t player, NETMESSAGE message) return false; } +void NETflush() +{ + if (!NetPlay.bComms) + { + return; + } + + if (NetPlay.isHost) + { + int player; + for (player = 0; player < MAX_CONNECTED_PLAYERS; ++player) + { + // We are the host, send directly to player. + if (connected_bsocket[player] != NULL) + { + socketFlush(connected_bsocket[player]); + } + } + } + else + { + if (bsocket != NULL) + { + socketFlush(bsocket); + } + } +} + /////////////////////////////////////////////////////////////////////////// // Check if a message is a system message static BOOL NETprocessSystemMessage(NETQUEUE playerQueue, uint8_t type) @@ -2232,6 +2260,7 @@ static void NETallowJoining(void) NETbeginEncode(NETnetQueue(index), NET_REJECTED); NETuint8_t(&rejected); NETend(); + NETflush(); allow_joining = false; // no need to inform master server NET_DestroyPlayer(index); @@ -2673,6 +2702,7 @@ BOOL NETjoinGame(UDWORD gameNumber, const char* playername) NETstring(NetPlay.gamePassword, sizeof(NetPlay.gamePassword)); NETint32_t(&NETCODE_HASH); //unused NETend(); + socketFlush(bsocket); // Make sure the message was completely sent. i = SDL_GetTicks(); // Loop until we've been accepted into the game diff --git a/lib/netplay/netplay.h b/lib/netplay/netplay.h index 3357c1ca2..3e4d3fa56 100644 --- a/lib/netplay/netplay.h +++ b/lib/netplay/netplay.h @@ -297,6 +297,7 @@ extern int NETinit(BOOL bFirstCall); // init BOOL NETsend(uint8_t player, NETMESSAGE message); ///< send to player, or broadcast if player == NET_ALL_PLAYERS. extern BOOL NETrecvNet(NETQUEUE *queue, uint8_t *type); ///< recv a message from the net queues if possible. extern BOOL NETrecvGame(NETQUEUE *queue, uint8_t *type); ///< recv a message from the game queues which is sceduled to execute by time, if possible. +void NETflush(void); ///< Flushes any data stuck in compression buffers. extern UBYTE NETsendFile(char *fileName, UDWORD player); // send file chunk. extern UBYTE NETrecvFile(NETQUEUE queue); // recv file chunk diff --git a/lib/netplay/netsocket.cpp b/lib/netplay/netsocket.cpp index 734b338a0..d70123a13 100644 --- a/lib/netplay/netsocket.cpp +++ b/lib/netplay/netsocket.cpp @@ -97,18 +97,20 @@ struct Socket * * All non-listening sockets will only use the first socket handle. */ - Socket() : ready(false), writeError(false), isCompressed(false), readDisconnected(false) {} + Socket() : ready(false), writeError(false), deleteLater(false), isCompressed(false), readDisconnected(false), zDeflateInSize(0) {} ~Socket(); SOCKET fd[SOCK_COUNT]; bool ready; bool writeError; + bool deleteLater; char textAddress[40]; bool isCompressed; bool readDisconnected; ///< True iff a call to recv() returned 0. z_stream zDeflate; z_stream zInflate; + unsigned zDeflateInSize; std::vector zDeflateOutBuf; std::vector zInflateInBuf; }; @@ -127,6 +129,9 @@ typedef std::map > SocketThreadWriteMap; static SocketThreadWriteMap socketThreadWrites; +static void socketCloseNow(Socket *sock); + + bool socketReadReady(Socket const *sock) { return sock->ready; @@ -434,9 +439,10 @@ static int socketThreadFunction(void *) ++i; Socket *sock = w->first; - std::vector &writeQueue = socketThreadWrites[sock]; + std::vector &writeQueue = w->second; + ASSERT(!writeQueue.empty(), "writeQueue[sock] must not be empty."); - if (writeQueue.empty() || !FD_ISSET(sock->fd[SOCK_CONNECTION], &fds)) + if (!FD_ISSET(sock->fd[SOCK_CONNECTION], &fds)) { continue; // This socket is not ready for writing, or we don't have anything to write. } @@ -450,6 +456,10 @@ static int socketThreadFunction(void *) if (writeQueue.empty()) { socketThreadWrites.erase(w); // Nothing left to write, delete from pending list. + if (sock->deleteLater) + { + socketCloseNow(sock); + } } } else @@ -465,6 +475,10 @@ static int socketThreadFunction(void *) debug(LOG_NET, "Socket error"); sock->writeError = true; socketThreadWrites.erase(w); // Socket broken, don't try writing to it again. + if (sock->deleteLater) + { + socketCloseNow(sock); + } break; } case EINTR: @@ -477,6 +491,10 @@ static int socketThreadFunction(void *) default: sock->writeError = true; socketThreadWrites.erase(w); // Socket broken, don't try writing to it again. + if (sock->deleteLater) + { + socketCloseNow(sock); + } break; } } @@ -540,13 +558,17 @@ ssize_t readNoInt(Socket* sock, void* buf, size_t max_size) sock->zInflate.avail_out = max_size; int ret = inflate(&sock->zInflate, Z_NO_FLUSH); ASSERT(ret != Z_STREAM_ERROR, "zlib inflate not working!"); + char const *err = NULL; switch (ret) { - case Z_NEED_DICT: - case Z_DATA_ERROR: - case Z_MEM_ERROR: - debug(LOG_ERROR, "Couldn't decompress data from socket. zlib error %d", ret); - return -1; // Bad data! + case Z_NEED_DICT: err = "Z_NEED_DICT"; break; + case Z_DATA_ERROR: err = "Z_DATA_ERROR"; break; + case Z_MEM_ERROR: err = "Z_MEM_ERROR"; break; + } + if (err != NULL) + { + debug(LOG_ERROR, "Couldn't decompress data from socket. zlib error %s", err); + return -1; // Bad data! } return max_size - sock->zInflate.avail_out; // Got some data, return how much. @@ -595,43 +617,89 @@ ssize_t writeAll(Socket* sock, const void* buf, size_t size) if (size > 0) { - wzMutexLock(socketThreadMutex); - if (socketThreadWrites.empty()) - { - wzSemaphorePost(socketThreadSemaphore); - } - std::vector &writeQueue = socketThreadWrites[sock]; if (!sock->isCompressed) { + wzMutexLock(socketThreadMutex); + if (socketThreadWrites.empty()) + { + wzSemaphorePost(socketThreadSemaphore); + } + std::vector &writeQueue = socketThreadWrites[sock]; writeQueue.insert(writeQueue.end(), static_cast(buf), static_cast(buf) + size); + wzMutexUnlock(socketThreadMutex); } else { sock->zDeflate.next_in = (Bytef *)buf; sock->zDeflate.avail_in = size; - sock->zDeflateOutBuf.resize(size + 20); + sock->zDeflateInSize += sock->zDeflate.avail_in; do { - sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[0]; - sock->zDeflate.avail_out = sock->zDeflateOutBuf.size(); + size_t alreadyHave = sock->zDeflateOutBuf.size(); + sock->zDeflateOutBuf.resize(alreadyHave + size + 20); // A bit more than size should be enough to always do everything in one go. + sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[alreadyHave]; + sock->zDeflate.avail_out = sock->zDeflateOutBuf.size() - alreadyHave; - int ret = deflate(&sock->zDeflate, Z_SYNC_FLUSH); /* no bad return value */ + int ret = deflate(&sock->zDeflate, Z_NO_FLUSH); ASSERT(ret != Z_STREAM_ERROR, "zlib compression failed!"); - writeQueue.insert(writeQueue.end(), sock->zDeflateOutBuf.begin(), sock->zDeflateOutBuf.end() - sock->zDeflate.avail_out); - - // Primitive network logging, uncomment to use. - //printf("Size %3zu ->%3zu, buf =", size, sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out); - //for (unsigned n = 0; n < std::min(size, 30); ++n) printf(" %02X", static_cast(buf)[n]&0xFF); - //printf("\n"); + // Remove unused part of buffer. + sock->zDeflateOutBuf.resize(sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out); } while(sock->zDeflate.avail_out == 0); + + ASSERT(sock->zDeflate.avail_in == 0, "zlib didn't compress everything!"); } - wzMutexUnlock(socketThreadMutex); } return size; } +void socketFlush(Socket *sock) +{ + if (!sock->isCompressed) + { + return; // Not compressed, so don't mess with zlib. + } + + // Flush data out of zlib compression state. + do + { + size_t alreadyHave = sock->zDeflateOutBuf.size(); + sock->zDeflateOutBuf.resize(alreadyHave + 1000); // 100 bytes would probably be enough to flush the rest in one go. + sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[alreadyHave]; + sock->zDeflate.avail_out = sock->zDeflateOutBuf.size() - alreadyHave; + + int ret = deflate(&sock->zDeflate, Z_SYNC_FLUSH); + ASSERT(ret != Z_STREAM_ERROR, "zlib compression failed!"); + + // Remove unused part of buffer. + sock->zDeflateOutBuf.resize(sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out); + } while(sock->zDeflate.avail_out == 0); + + if (sock->zDeflateOutBuf.empty()) + { + return; // No data to flush out. + } + + wzMutexLock(socketThreadMutex); + if (socketThreadWrites.empty()) + { + wzSemaphorePost(socketThreadSemaphore); + } + std::vector &writeQueue = socketThreadWrites[sock]; + writeQueue.insert(writeQueue.end(), sock->zDeflateOutBuf.begin(), sock->zDeflateOutBuf.end()); + wzMutexUnlock(socketThreadMutex); + + // Primitive network logging, uncomment to use. + //printf("Size %3u ->%3zu, buf =", sock->zDeflateInSize, sock->zDeflateOutBuf.size()); + //for (unsigned n = 0; n < std::min(sock->zDeflateOutBuf.size(), 40); ++n) printf(" %02X", sock->zDeflateOutBuf[n]); + //printf("\n"); + + // Data sent, don't send again. + sock->zDeflateInSize = 0; + sock->zDeflateOutBuf.clear(); +} + void socketBeginCompression(Socket *sock) { if (sock->isCompressed) @@ -901,41 +969,51 @@ ssize_t readAll(Socket* sock, void* buf, size_t size, unsigned int timeout) return received; } +static void socketCloseNow(Socket *sock) +{ + for (unsigned i = 0; i < ARRAY_SIZE(sock->fd); ++i) + { + if (sock->fd[i] != INVALID_SOCKET) + { +#if defined(WZ_OS_WIN) + int err = closesocket(sock->fd[i]); +#else + int err = close(sock->fd[i]); +#endif + if (err) + { + debug(LOG_ERROR, "Failed to close socket %p: %s", sock, strSockError(getSockErr())); + } + + /* Make sure that dangling pointers to this + * structure don't think they've got their + * hands on a valid socket. + */ + sock->fd[i] = INVALID_SOCKET; + } + } + delete sock; +} + void socketClose(Socket* sock) { - unsigned int i; - int err = 0; - - if (sock) + if (sock == NULL) { - wzMutexLock(socketThreadMutex); - socketThreadWrites.erase(sock); - wzMutexUnlock(socketThreadMutex); - - for (i = 0; i < ARRAY_SIZE(sock->fd); ++i) - { - if (sock->fd[i] != INVALID_SOCKET) - { -#if defined(WZ_OS_WIN) - err = closesocket(sock->fd[i]); -#else - err = close(sock->fd[i]); -#endif - if (err) - { - debug(LOG_ERROR, "Failed to close socket %p: %s", sock, strSockError(getSockErr())); - } - - /* Make sure that dangling pointers to this - * structure don't think they've got their - * hands on a valid socket. - */ - sock->fd[i] = INVALID_SOCKET; - } - } - delete sock; - sock = NULL; + return; } + wzMutexLock(socketThreadMutex); + //Instead of socketThreadWrites.erase(sock);, try sending the data before actually deleting. + if (socketThreadWrites.find(sock) != socketThreadWrites.end()) + { + // Wait until the data is written, then delete the socket. + sock->deleteLater = true; + } + else + { + // Delete the socket. + socketCloseNow(sock); + } + wzMutexUnlock(socketThreadMutex); } Socket *socketAccept(Socket *sock) diff --git a/lib/netplay/netsocket.h b/lib/netplay/netsocket.h index 1b589a8f4..6c8056874 100644 --- a/lib/netplay/netsocket.h +++ b/lib/netplay/netsocket.h @@ -65,8 +65,10 @@ ssize_t readNoInt(Socket *sock, void *buf, size_t max_size); ///< Rea ssize_t readAll(Socket* sock, void *buf, size_t size, unsigned timeout);///< Reads exactly size bytes from the Socket, or blocks until the timeout expires. ssize_t writeAll(Socket *sock, const void* buf, size_t size); ///< Nonblocking write of size bytes to the Socket. All bytes will be written asynchronously, by a separate thread. +// Sockets, compressed. void socketBeginCompression(Socket *sock); ///< Makes future data sent compressed, and future data received expected to be compressed. bool socketReadDisconnected(Socket *sock); ///< If readNoInt returned 0, returns true if this is the result of a disconnect, or false if the input compressed data just hasn't produced any output bytes. +void socketFlush(Socket *sock); ///< Actually sends the data written with writeAll. Only useful on compressed sockets. Note that flushing too often makes compression less effective. // Socket sets. SocketSet *allocSocketSet(void); ///< Constructs a SocketSet. diff --git a/src/loop.c b/src/loop.c index a42611b5b..a370fc5f5 100644 --- a/src/loop.c +++ b/src/loop.c @@ -147,6 +147,7 @@ GAMECODE gameLoop(void) INT_RETVAL intRetVal; int clearMode = 0; bool gameTicked; // true iff we are doing a logical update. + uint32_t lastFlushTime = 0; // Receive NET_BLAH messages. // Receive GAME_BLAH messages, and if it's time, process exactly as many GAME_BLAH messages as required to be able to tick the gameTime. @@ -165,6 +166,12 @@ GAMECODE gameLoop(void) gameSRand(gameTime); // Brute force way of synchronising the random number generator, which can't go out of synch. } + if (gameTicked || realTime - lastFlushTime < 400u) + { + lastFlushTime = realTime; + NETflush(); // Make sure the game time tick message is really sent over the network, and that we aren't waiting too long to send data. + } + if (bMultiPlayer && !NetPlay.isHostAlive && NetPlay.bComms && !NetPlay.isHost) { intAddInGamePopup(); diff --git a/src/wrappers.c b/src/wrappers.c index 14c45d0ff..4bdea6795 100644 --- a/src/wrappers.c +++ b/src/wrappers.c @@ -258,6 +258,7 @@ TITLECODE titleLoop(void) debug( LOG_FATAL, "unknown title screen mode" ); abort(); } + NETflush(); // Send any pending network data. audio_Update();