newnet: Don't send lots of small packets, fix "connection error"s when listing games.

Since there is no advantage in sending data more than once per game tick, queue up data until the game time ticks. This may result in sligtly better compression in some cases, due to not flushing non-stop.
Don't close a socket while data is queued to be sent, since this sometimes breaks listing games (where the list is queued, and the socket is closed).
master
Cyp 2010-07-16 21:46:00 +02:00
parent 8e93542537
commit c1ca4d2d34
6 changed files with 175 additions and 56 deletions

View File

@ -1252,6 +1252,34 @@ BOOL NETsend(uint8_t player, NETMESSAGE message)
return false;
}
void NETflush()
{
if (!NetPlay.bComms)
{
return;
}
if (NetPlay.isHost)
{
int player;
for (player = 0; player < MAX_CONNECTED_PLAYERS; ++player)
{
// We are the host, send directly to player.
if (connected_bsocket[player] != NULL)
{
socketFlush(connected_bsocket[player]);
}
}
}
else
{
if (bsocket != NULL)
{
socketFlush(bsocket);
}
}
}
///////////////////////////////////////////////////////////////////////////
// Check if a message is a system message
static BOOL NETprocessSystemMessage(NETQUEUE playerQueue, uint8_t type)
@ -2232,6 +2260,7 @@ static void NETallowJoining(void)
NETbeginEncode(NETnetQueue(index), NET_REJECTED);
NETuint8_t(&rejected);
NETend();
NETflush();
allow_joining = false; // no need to inform master server
NET_DestroyPlayer(index);
@ -2673,6 +2702,7 @@ BOOL NETjoinGame(UDWORD gameNumber, const char* playername)
NETstring(NetPlay.gamePassword, sizeof(NetPlay.gamePassword));
NETint32_t(&NETCODE_HASH); //unused
NETend();
socketFlush(bsocket); // Make sure the message was completely sent.
i = SDL_GetTicks();
// Loop until we've been accepted into the game

View File

@ -297,6 +297,7 @@ extern int NETinit(BOOL bFirstCall); // init
BOOL NETsend(uint8_t player, NETMESSAGE message); ///< send to player, or broadcast if player == NET_ALL_PLAYERS.
extern BOOL NETrecvNet(NETQUEUE *queue, uint8_t *type); ///< recv a message from the net queues if possible.
extern BOOL NETrecvGame(NETQUEUE *queue, uint8_t *type); ///< recv a message from the game queues which is sceduled to execute by time, if possible.
void NETflush(void); ///< Flushes any data stuck in compression buffers.
extern UBYTE NETsendFile(char *fileName, UDWORD player); // send file chunk.
extern UBYTE NETrecvFile(NETQUEUE queue); // recv file chunk

View File

@ -97,18 +97,20 @@ struct Socket
*
* All non-listening sockets will only use the first socket handle.
*/
Socket() : ready(false), writeError(false), isCompressed(false), readDisconnected(false) {}
Socket() : ready(false), writeError(false), deleteLater(false), isCompressed(false), readDisconnected(false), zDeflateInSize(0) {}
~Socket();
SOCKET fd[SOCK_COUNT];
bool ready;
bool writeError;
bool deleteLater;
char textAddress[40];
bool isCompressed;
bool readDisconnected; ///< True iff a call to recv() returned 0.
z_stream zDeflate;
z_stream zInflate;
unsigned zDeflateInSize;
std::vector<uint8_t> zDeflateOutBuf;
std::vector<uint8_t> zInflateInBuf;
};
@ -127,6 +129,9 @@ typedef std::map<Socket *, std::vector<uint8_t> > SocketThreadWriteMap;
static SocketThreadWriteMap socketThreadWrites;
static void socketCloseNow(Socket *sock);
bool socketReadReady(Socket const *sock)
{
return sock->ready;
@ -434,9 +439,10 @@ static int socketThreadFunction(void *)
++i;
Socket *sock = w->first;
std::vector<uint8_t> &writeQueue = socketThreadWrites[sock];
std::vector<uint8_t> &writeQueue = w->second;
ASSERT(!writeQueue.empty(), "writeQueue[sock] must not be empty.");
if (writeQueue.empty() || !FD_ISSET(sock->fd[SOCK_CONNECTION], &fds))
if (!FD_ISSET(sock->fd[SOCK_CONNECTION], &fds))
{
continue; // This socket is not ready for writing, or we don't have anything to write.
}
@ -450,6 +456,10 @@ static int socketThreadFunction(void *)
if (writeQueue.empty())
{
socketThreadWrites.erase(w); // Nothing left to write, delete from pending list.
if (sock->deleteLater)
{
socketCloseNow(sock);
}
}
}
else
@ -465,6 +475,10 @@ static int socketThreadFunction(void *)
debug(LOG_NET, "Socket error");
sock->writeError = true;
socketThreadWrites.erase(w); // Socket broken, don't try writing to it again.
if (sock->deleteLater)
{
socketCloseNow(sock);
}
break;
}
case EINTR:
@ -477,6 +491,10 @@ static int socketThreadFunction(void *)
default:
sock->writeError = true;
socketThreadWrites.erase(w); // Socket broken, don't try writing to it again.
if (sock->deleteLater)
{
socketCloseNow(sock);
}
break;
}
}
@ -540,13 +558,17 @@ ssize_t readNoInt(Socket* sock, void* buf, size_t max_size)
sock->zInflate.avail_out = max_size;
int ret = inflate(&sock->zInflate, Z_NO_FLUSH);
ASSERT(ret != Z_STREAM_ERROR, "zlib inflate not working!");
char const *err = NULL;
switch (ret)
{
case Z_NEED_DICT:
case Z_DATA_ERROR:
case Z_MEM_ERROR:
debug(LOG_ERROR, "Couldn't decompress data from socket. zlib error %d", ret);
return -1; // Bad data!
case Z_NEED_DICT: err = "Z_NEED_DICT"; break;
case Z_DATA_ERROR: err = "Z_DATA_ERROR"; break;
case Z_MEM_ERROR: err = "Z_MEM_ERROR"; break;
}
if (err != NULL)
{
debug(LOG_ERROR, "Couldn't decompress data from socket. zlib error %s", err);
return -1; // Bad data!
}
return max_size - sock->zInflate.avail_out; // Got some data, return how much.
@ -595,43 +617,89 @@ ssize_t writeAll(Socket* sock, const void* buf, size_t size)
if (size > 0)
{
wzMutexLock(socketThreadMutex);
if (socketThreadWrites.empty())
{
wzSemaphorePost(socketThreadSemaphore);
}
std::vector<uint8_t> &writeQueue = socketThreadWrites[sock];
if (!sock->isCompressed)
{
wzMutexLock(socketThreadMutex);
if (socketThreadWrites.empty())
{
wzSemaphorePost(socketThreadSemaphore);
}
std::vector<uint8_t> &writeQueue = socketThreadWrites[sock];
writeQueue.insert(writeQueue.end(), static_cast<char const *>(buf), static_cast<char const *>(buf) + size);
wzMutexUnlock(socketThreadMutex);
}
else
{
sock->zDeflate.next_in = (Bytef *)buf;
sock->zDeflate.avail_in = size;
sock->zDeflateOutBuf.resize(size + 20);
sock->zDeflateInSize += sock->zDeflate.avail_in;
do
{
sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[0];
sock->zDeflate.avail_out = sock->zDeflateOutBuf.size();
size_t alreadyHave = sock->zDeflateOutBuf.size();
sock->zDeflateOutBuf.resize(alreadyHave + size + 20); // A bit more than size should be enough to always do everything in one go.
sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[alreadyHave];
sock->zDeflate.avail_out = sock->zDeflateOutBuf.size() - alreadyHave;
int ret = deflate(&sock->zDeflate, Z_SYNC_FLUSH); /* no bad return value */
int ret = deflate(&sock->zDeflate, Z_NO_FLUSH);
ASSERT(ret != Z_STREAM_ERROR, "zlib compression failed!");
writeQueue.insert(writeQueue.end(), sock->zDeflateOutBuf.begin(), sock->zDeflateOutBuf.end() - sock->zDeflate.avail_out);
// Primitive network logging, uncomment to use.
//printf("Size %3zu ->%3zu, buf =", size, sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out);
//for (unsigned n = 0; n < std::min<unsigned>(size, 30); ++n) printf(" %02X", static_cast<char const *>(buf)[n]&0xFF);
//printf("\n");
// Remove unused part of buffer.
sock->zDeflateOutBuf.resize(sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out);
} while(sock->zDeflate.avail_out == 0);
ASSERT(sock->zDeflate.avail_in == 0, "zlib didn't compress everything!");
}
wzMutexUnlock(socketThreadMutex);
}
return size;
}
void socketFlush(Socket *sock)
{
if (!sock->isCompressed)
{
return; // Not compressed, so don't mess with zlib.
}
// Flush data out of zlib compression state.
do
{
size_t alreadyHave = sock->zDeflateOutBuf.size();
sock->zDeflateOutBuf.resize(alreadyHave + 1000); // 100 bytes would probably be enough to flush the rest in one go.
sock->zDeflate.next_out = (Bytef *)&sock->zDeflateOutBuf[alreadyHave];
sock->zDeflate.avail_out = sock->zDeflateOutBuf.size() - alreadyHave;
int ret = deflate(&sock->zDeflate, Z_SYNC_FLUSH);
ASSERT(ret != Z_STREAM_ERROR, "zlib compression failed!");
// Remove unused part of buffer.
sock->zDeflateOutBuf.resize(sock->zDeflateOutBuf.size() - sock->zDeflate.avail_out);
} while(sock->zDeflate.avail_out == 0);
if (sock->zDeflateOutBuf.empty())
{
return; // No data to flush out.
}
wzMutexLock(socketThreadMutex);
if (socketThreadWrites.empty())
{
wzSemaphorePost(socketThreadSemaphore);
}
std::vector<uint8_t> &writeQueue = socketThreadWrites[sock];
writeQueue.insert(writeQueue.end(), sock->zDeflateOutBuf.begin(), sock->zDeflateOutBuf.end());
wzMutexUnlock(socketThreadMutex);
// Primitive network logging, uncomment to use.
//printf("Size %3u ->%3zu, buf =", sock->zDeflateInSize, sock->zDeflateOutBuf.size());
//for (unsigned n = 0; n < std::min<unsigned>(sock->zDeflateOutBuf.size(), 40); ++n) printf(" %02X", sock->zDeflateOutBuf[n]);
//printf("\n");
// Data sent, don't send again.
sock->zDeflateInSize = 0;
sock->zDeflateOutBuf.clear();
}
void socketBeginCompression(Socket *sock)
{
if (sock->isCompressed)
@ -901,41 +969,51 @@ ssize_t readAll(Socket* sock, void* buf, size_t size, unsigned int timeout)
return received;
}
static void socketCloseNow(Socket *sock)
{
for (unsigned i = 0; i < ARRAY_SIZE(sock->fd); ++i)
{
if (sock->fd[i] != INVALID_SOCKET)
{
#if defined(WZ_OS_WIN)
int err = closesocket(sock->fd[i]);
#else
int err = close(sock->fd[i]);
#endif
if (err)
{
debug(LOG_ERROR, "Failed to close socket %p: %s", sock, strSockError(getSockErr()));
}
/* Make sure that dangling pointers to this
* structure don't think they've got their
* hands on a valid socket.
*/
sock->fd[i] = INVALID_SOCKET;
}
}
delete sock;
}
void socketClose(Socket* sock)
{
unsigned int i;
int err = 0;
if (sock)
if (sock == NULL)
{
wzMutexLock(socketThreadMutex);
socketThreadWrites.erase(sock);
wzMutexUnlock(socketThreadMutex);
for (i = 0; i < ARRAY_SIZE(sock->fd); ++i)
{
if (sock->fd[i] != INVALID_SOCKET)
{
#if defined(WZ_OS_WIN)
err = closesocket(sock->fd[i]);
#else
err = close(sock->fd[i]);
#endif
if (err)
{
debug(LOG_ERROR, "Failed to close socket %p: %s", sock, strSockError(getSockErr()));
}
/* Make sure that dangling pointers to this
* structure don't think they've got their
* hands on a valid socket.
*/
sock->fd[i] = INVALID_SOCKET;
}
}
delete sock;
sock = NULL;
return;
}
wzMutexLock(socketThreadMutex);
//Instead of socketThreadWrites.erase(sock);, try sending the data before actually deleting.
if (socketThreadWrites.find(sock) != socketThreadWrites.end())
{
// Wait until the data is written, then delete the socket.
sock->deleteLater = true;
}
else
{
// Delete the socket.
socketCloseNow(sock);
}
wzMutexUnlock(socketThreadMutex);
}
Socket *socketAccept(Socket *sock)

View File

@ -65,8 +65,10 @@ ssize_t readNoInt(Socket *sock, void *buf, size_t max_size); ///< Rea
ssize_t readAll(Socket* sock, void *buf, size_t size, unsigned timeout);///< Reads exactly size bytes from the Socket, or blocks until the timeout expires.
ssize_t writeAll(Socket *sock, const void* buf, size_t size); ///< Nonblocking write of size bytes to the Socket. All bytes will be written asynchronously, by a separate thread.
// Sockets, compressed.
void socketBeginCompression(Socket *sock); ///< Makes future data sent compressed, and future data received expected to be compressed.
bool socketReadDisconnected(Socket *sock); ///< If readNoInt returned 0, returns true if this is the result of a disconnect, or false if the input compressed data just hasn't produced any output bytes.
void socketFlush(Socket *sock); ///< Actually sends the data written with writeAll. Only useful on compressed sockets. Note that flushing too often makes compression less effective.
// Socket sets.
SocketSet *allocSocketSet(void); ///< Constructs a SocketSet.

View File

@ -147,6 +147,7 @@ GAMECODE gameLoop(void)
INT_RETVAL intRetVal;
int clearMode = 0;
bool gameTicked; // true iff we are doing a logical update.
uint32_t lastFlushTime = 0;
// Receive NET_BLAH messages.
// Receive GAME_BLAH messages, and if it's time, process exactly as many GAME_BLAH messages as required to be able to tick the gameTime.
@ -165,6 +166,12 @@ GAMECODE gameLoop(void)
gameSRand(gameTime); // Brute force way of synchronising the random number generator, which can't go out of synch.
}
if (gameTicked || realTime - lastFlushTime < 400u)
{
lastFlushTime = realTime;
NETflush(); // Make sure the game time tick message is really sent over the network, and that we aren't waiting too long to send data.
}
if (bMultiPlayer && !NetPlay.isHostAlive && NetPlay.bComms && !NetPlay.isHost)
{
intAddInGamePopup();

View File

@ -258,6 +258,7 @@ TITLECODE titleLoop(void)
debug( LOG_FATAL, "unknown title screen mode" );
abort();
}
NETflush(); // Send any pending network data.
audio_Update();