newnet: Change NetQueue into a message queue instead of a byte queue and fix some things. Take into account that NETbeginDecode isn't always matched with NETend, just most of the time.

Apart from each player playing a completely different games, multiplayer seems to work now. The players can chat with each other.
So everything seems normal, just with slightly worse synchronisation; now with literally no synchronisation, instead of just practically no synchronisation.
Newly built trucks are doubled, due to listening to own game messages. Game messages ought to all be implied (and not sent), or sent but not executed until received.
master
Cyp 2010-02-10 23:45:52 +01:00
parent 8991ed6e76
commit 7e18c64533
9 changed files with 236 additions and 474 deletions

View File

@ -31,7 +31,7 @@
// Logging for degug only
// ////////////////////////////////////////////////////////////////////////
static const char *packetname[NUM_GAME_PACKETS] =
static const char *packetname[NUM_GAME_PACKETS + 1] =
{
"NET_DROID",
"NET_DROIDINFO",
@ -106,12 +106,13 @@ static const char *packetname[NUM_GAME_PACKETS] =
"NET_FUTURE3",
"NET_FILE_REQUESTED",
"NET_FILE_CANCELLED",
"NET_FILE_PAYLOAD"
"NET_FILE_PAYLOAD",
"NET_???"
};
static PHYSFS_file *pFileHandle = NULL;
static uint32_t packetcount[2][NUM_GAME_PACKETS];
static uint32_t packetsize[2][NUM_GAME_PACKETS];
static uint32_t packetcount[2][NUM_GAME_PACKETS + 1];
static uint32_t packetsize[2][NUM_GAME_PACKETS + 1];
BOOL NETstartLogging(void)
{
@ -121,7 +122,7 @@ BOOL NETstartLogging(void)
static char filename[256] = {'\0'};
int i;
for (i = 0; i < NUM_GAME_PACKETS; i++)
for (i = 0; i < NUM_GAME_PACKETS + 1; i++)
{
packetcount[0][i] = 0;
packetsize[0][i] = 0;
@ -157,7 +158,7 @@ BOOL NETstopLogging(void)
}
/* Output stats */
for (i = 0; i < NUM_GAME_PACKETS; i++)
for (i = 0; i < NUM_GAME_PACKETS + 1; i++)
{
snprintf(buf, sizeof(buf), "%s: received %u times, %u bytes; sent %u times, %u bytes\n", packetname[i],
packetcount[0][i], packetsize[0][i], packetcount[1][i], packetsize[1][i]);
@ -190,14 +191,14 @@ BOOL NETstopLogging(void)
return true;
}
/*void NETlogPacket(NETMSG *msg, BOOL received)
/*void NETlogPacket(NETQUEUE queue, BOOL received)
{
if (msg->type >= NUM_GAME_PACKETS)
if (NETmessageType(queue) >= NUM_GAME_PACKETS)
{
return;
NETmessageType(queue) = NUM_GAME_PACKETS;
}
packetcount[received][msg->type]++;
packetsize[received][msg->type] += msg->size;
packetcount[received][NETmessageType(queue)]++;
packetsize[received][NETmessageType(queue)] += NETmessageSize(queue);
}*/
BOOL NETlogEntry(const char *str,UDWORD a,UDWORD b)

View File

@ -2340,6 +2340,7 @@ static BOOL NETprocessSystemMessage(NETQUEUE playerQueue, uint8_t type)
return false;
}
NETpop(playerQueue);
return true;
}
@ -2399,7 +2400,7 @@ BOOL NETrecvNet(NETQUEUE *queue, uint8_t *type)
if (socket_set == NULL || checkSockets(socket_set, NET_READ_TIMEOUT) <= 0)
{
return false;
goto checkMessages;
}
for (current = 0; current < MAX_CONNECTED_PLAYERS; ++current)
@ -2445,6 +2446,7 @@ BOOL NETrecvNet(NETQUEUE *queue, uint8_t *type)
}
}
checkMessages:
for (current = 0; current < MAX_CONNECTED_PLAYERS; ++current)
{
*queue = NETnetQueue(current);
@ -3492,14 +3494,18 @@ connect_succesfull:
NETQUEUE queue;
uint8_t type = NUM_GAME_PACKETS;
NETrecvNet(&queue, &type);
// FIXME: shouldn't there be some sort of rejection message?
if (SDL_GetTicks() > i + 5000)
{
// timeout
return false;
}
if (!NETrecvNet(&queue, &type))
{
continue;
}
if (type == NET_ACCEPTED)
{
// :)
@ -3509,6 +3515,7 @@ connect_succesfull:
// Retrieve the player ID the game host arranged for us
NETuint8_t(&index);
NETend();
NETpop(queue);
selectedPlayer = index;
debug(LOG_NET, "NET_ACCEPTED received. Accepted into the game - I'm player %u using bsocket %p, tcp_socket=%p", (unsigned int)index, bsocket, tcp_socket);
@ -3537,11 +3544,14 @@ connect_succesfull:
// And why "wry"?
NETuint8_t(&rejection);
NETend();
NETpop(queue);
debug(LOG_NET, "NET_REJECTED received. Better luck next time?");
setLobbyError((LOBBY_ERROR_TYPES)rejection);
}
NETpop(queue);
}
}

View File

@ -1,346 +1,144 @@
#include "netqueue.h"
#include "lib/framework/frame.h"
#ifdef USE_ZLIB
#include "zlib.h"
#endif //USE_ZLIB
NetQueue::NetQueue(UsagePattern p)
: deserialiseUnderflow(false)
, isCompressed(false)
, readOffset(0)
, readSuccessOffset(0)
, netOffset(0)
#ifdef USE_ZLIB
, stream(NULL)
#endif //USE_ZLIB
NetQueue::NetQueue()
: canReadRawData(true)
, canGetMessages(true)
{
switch (p)
{
case Unused: canSerialise = false; canDeserialise = false; canWriteRaw = false; canReadRaw = false; canCompress = false; break;
case NetSend: canSerialise = true; canDeserialise = false; canWriteRaw = false; canReadRaw = true; canCompress = true; break;
case NetReceive: canSerialise = false; canDeserialise = true; canWriteRaw = true; canReadRaw = false; canCompress = true; break;
case GameSend: canSerialise = true; canDeserialise = true; canWriteRaw = false; canReadRaw = true; canCompress = false; break;
case GameReceive: canSerialise = false; canDeserialise = true; canWriteRaw = true; canReadRaw = false; canCompress = false; break;
case GameSolo: canSerialise = true; canDeserialise = true; canWriteRaw = false; canReadRaw = false; canCompress = false; break;
}
ASSERT(!canSerialise || !canWriteRaw, "Can't insert both objects and raw data into the same NetQueue.");
ASSERT(!canWriteRaw || canDeserialise, "No point being able to write data into the NetQueue if we can't deserialise it.");
}
NetQueue::~NetQueue()
{
#ifdef USE_ZLIB
if (stream != NULL)
{
if (canWriteRaw)
{
// Was writing compressed data into this NetQueue.
inflateEnd(stream);
}
else
{
// Was reading compressed data from this NetQueue.
deflateEnd(stream);
}
delete stream;
}
#endif //USE_ZLIB
dataPos = messages.end();
messagePos = messages.end();
}
void NetQueue::writeRawData(const uint8_t *netData, size_t netLen)
{
ASSERT(canWriteRaw, "Wrong NetQueue type for writeRawData.");
#ifdef USE_ZLIB
if (stream != NULL)
{
compressedData.insert(compressedData.end(), netData, netData + netLen);
stream->next_in = &compressedData[0];
stream->avail_in = compressedData.size();
do
{
uint8_t tmp[1024];
memset(tmp, 0x00, sizeof(tmp));
stream->next_out = tmp;
stream->avail_out = sizeof(tmp);
int ret = inflate(stream, Z_NO_FLUSH);
ASSERT(ret == Z_OK || ret == Z_STREAM_END || ret == Z_BUF_ERROR, "Bad compressed data stream, zlib error, or out of memory.");
// Insert the data.
data.insert(data.end(), tmp, tmp + sizeof(tmp) - stream->avail_out);
} while (stream->avail_out == 0);
return;
}
#endif //USE_ZLIB
size_t used = 0;
std::vector<uint8_t> &buffer = incompleteReceivedMessageData; // Short alias.
// Insert the data.
data.insert(data.end(), netData, netData + netLen);
buffer.insert(buffer.end(), netData, netData + netLen);
// Extract the messages.
while (buffer.size() - used >= 5)
{
uint32_t len = buffer[used]<<24 | buffer[used + 1]<<16 | buffer[used + 2]<<8 | buffer[used + 3];
uint8_t type = buffer[used + 4];
if (buffer.size() - used - 5 < len)
{
break;
}
messages.push_front(NetMessage(type));
messages.front().data.assign(buffer.begin() + used + 5, buffer.begin() + used + 5 + len);
used += 5 + len;
}
// Recycle old data.
buffer.erase(buffer.begin(), buffer.begin() + used);
}
void NetQueue::willNeverReadRawData()
{
canReadRawData = false;
}
void NetQueue::readRawData(const uint8_t **netData, size_t *netLen)
{
ASSERT(canReadRaw, "Wrong NetQueue type for readRawData.");
ASSERT(canReadRawData, "Wrong NetQueue type for readRawData.");
// Return the data. (If not compressing.)
*netData = &data[netOffset];
*netLen = data.size() - netOffset;
std::vector<uint8_t> &buffer = unsentMessageData; // Short alias.
#ifdef USE_ZLIB
if (stream != NULL)
// Turn the messages into raw data.
while (dataPos != messages.begin())
{
stream->next_in = const_cast<uint8_t *>(*netData);
stream->avail_in = *netLen;
do
{
uint8_t tmp[1024];
stream->next_out = tmp;
stream->avail_out = sizeof(tmp);
int ret = deflate(stream, Z_SYNC_FLUSH); // Z_SYNC_FLUSH = squeeze out every last byte of output immediately, even if it hurts compression.
ASSERT(ret == Z_OK || ret == Z_STREAM_END || ret == Z_BUF_ERROR, "Bad compressed data stream, zlib error, or out of memory.");
compressedData.insert(compressedData.end(), tmp, tmp + sizeof(tmp) - stream->avail_out);
} while (stream->avail_out == 0);
netOffset += *netLen;
popOldData();
// Return the compressed data instead of the uncompressed data.
*netData = &compressedData[0];
*netLen = compressedData.size();
--dataPos;
buffer.push_back(dataPos->data.size()>>24);
buffer.push_back(dataPos->data.size()>>16);
buffer.push_back(dataPos->data.size()>> 8);
buffer.push_back(dataPos->data.size() );
buffer.push_back(dataPos->type);
buffer.insert(buffer.end(), dataPos->data.begin(), dataPos->data.end());
}
#endif //USE_ZLIB
// Recycle old data.
popOldMessages();
// Return the data.
*netData = &buffer[0];
*netLen = buffer.size();
}
void NetQueue::popRawData(size_t netLen)
{
ASSERT(canReadRaw, "Wrong NetQueue type for popRawData.");
ASSERT(canReadRawData, "Wrong NetQueue type for popRawData.");
ASSERT(netLen <= unsentMessageData.size(), "Popped too much data!");
#ifdef USE_ZLIB
if (stream != NULL)
{
ASSERT(netLen <= compressedData.size(), "Popped too much data!");
compressedData.erase(compressedData.begin(), compressedData.begin() + netLen);
return;
}
#endif //USE_ZLIB
netOffset += netLen;
ASSERT(netOffset <= data.size(), "Popped too much data!");
popOldData();
// Pop the data.
unsentMessageData.erase(unsentMessageData.begin(), unsentMessageData.begin() + netLen);
}
void NetQueue::endSerialiseLength()
void NetQueue::pushMessage(const NetMessage &message)
{
uint32_t len = data.size() - beginSerialiseOffset - 4;
data[beginSerialiseOffset ] = len>>24 & 0xFF;
data[beginSerialiseOffset+1] = len>>16 & 0xFF;
data[beginSerialiseOffset+2] = len>> 8 & 0xFF;
data[beginSerialiseOffset+3] = len & 0xFF;
messages.push_front(message);
}
bool NetQueue::endDeserialise()
void NetQueue::willNeverGetMessages()
{
if (!deserialiseUnderflow)
{
// Success.
readSuccessOffset = readOffset; // Advance readSuccessOffset.
popOldData();
return true;
}
else
{
// Failure.
readOffset = readSuccessOffset; // Reset readOffset.
deserialiseUnderflow = false; // Reset deserialiseUnderflow.
return false;
}
canGetMessages = false;
}
bool NetQueue::isDeserialiseError() const
bool NetQueue::haveMessage() const
{
return deserialiseUnderflow;
ASSERT(canGetMessages, "Wrong NetQueue type for haveMessage.");
return messagePos != messages.begin();
}
void NetQueue::serialiseLength()
const NetMessage &NetQueue::getMessage() const
{
beginSerialiseOffset = data.size();
ASSERT(canGetMessages, "Wrong NetQueue type for getMessage.");
ASSERT(messagePos != messages.begin(), "No message to get!");
// Reserve room for length.
data.resize(data.size() + 4);
// Return the message.
List::iterator i = messagePos;
--i;
return *i;
}
bool NetQueue::deserialiseHaveLength()
void NetQueue::popMessage()
{
if (readOffset + 4 > data.size())
{
return false;
}
uint32_t len = data[readOffset]<<24 | data[readOffset+1]<<16 | data[readOffset+2]<<8 | data[readOffset+3];
return len < data.size() && readOffset + 4 <= data.size() - len;
ASSERT(canGetMessages, "Wrong NetQueue type for popMessage.");
ASSERT(messagePos != messages.begin(), "No message to pop!");
// Pop the message.
--messagePos;
// Recycle old data.
popOldMessages();
}
uint8_t NetQueue::deserialiseGetType()
void NetQueue::popOldMessages()
{
return readOffset + 5 > data.size() ? 0 : data[readOffset+4];
if (!canReadRawData)
{
dataPos = messages.begin();
}
if (!canGetMessages)
{
messagePos = messages.begin();
}
List::iterator i = messages.end();
while (i != dataPos && i != messagePos)
{
--i;
}
if (i == dataPos)
{
dataPos = messages.end(); // Old iterator will become invalid.
}
if (i == messagePos)
{
messagePos = messages.end(); // Old iterator will become invalid.
}
messages.erase(i, messages.end());
}
void NetQueue::popOldData()
{
if (!canDeserialise)
{
readSuccessOffset = data.size();
}
if (!canReadRaw)
{
netOffset = data.size();
}
size_t pop = std::min(readSuccessOffset, netOffset);
if (pop <= data.size()/2)
{
return; // Wait until we have more old data (or less new data), and pop all at once, since popping from the beginning is an O(data.size() - pop) operation.
}
data.erase(data.begin(), data.begin() + pop);
readOffset -= pop; // readOffset unused if !canDeserialise.
readSuccessOffset -= pop; // readSuccessOffset unused if !canDeserialise.
netOffset -= pop; // netOffset unused if !canReadRaw.
}
void NetQueue::serialise(uint8_t v)
{
ASSERT(canSerialise, "Wrong NetQueue type for serialise.");
// Serialise.
data.push_back(v);
}
void NetQueue::deserialise(uint8_t &v)
{
ASSERT(canDeserialise, "Wrong NetQueue type for deserialise.");
if (canReadRaw && readOffset > netOffset)
{
debug(LOG_ERROR, "Deserialising data before it has been sent over the network, and that the other clients therefore haven't received yet.");
}
// Deserialise.
if (readOffset + 1 > data.size())
{
deserialiseUnderflow = true;
return; // Not enough data.
}
v = data[readOffset++];
}
void NetQueue::setCompression(uint32_t compressionMask)
{
ASSERT(canCompress, "Wrong NetQueue type for setCompression.");
#ifdef USE_ZLIB
if (!isCompressed && (compressionMask & CompressionZlib) != 0)
{
setCompressionZlib();
isCompressed = true;
}
#endif //USE_ZLIB
}
#ifdef USE_ZLIB
void NetQueue::setCompressionZlib()
{
stream = new z_stream;
stream->zalloc = NULL;
stream->zfree = NULL;
stream->opaque = NULL;
if (canWriteRaw)
{
// Reading compressed data from this NetQueue.
stream->avail_in = 0;
stream->next_in = NULL;
int ret = inflateInit(stream);
ASSERT(ret == Z_OK, "inflateInit failed. Bad zlib version?");
ASSERT(readSuccessOffset == readOffset, "Called setCompression() after beginDeserialise() without endDeserialise()");
// Reinsert the remaining data, now that we know that it's compressed.
std::vector<uint8_t> tmp(data.begin() + readSuccessOffset, data.end());
data.resize(readSuccessOffset);
writeRawData(&tmp[0], tmp.size());
}
else
{
// Writing compressed data into this NetQueue.
int ret = deflateInit(stream, Z_BEST_COMPRESSION);
ASSERT(ret == Z_OK, "deflateInit failed. Bad zlib version?");
// Don't compress data from before setCompression was called, since clients must agree on exactly when compression starts.
compressedData.assign(data.begin() + netOffset, data.end());
netOffset = data.size();
}
}
#endif //USE_ZLIB
#if 0
extern "C" void testNetQueue()
{
printf("Hello world!\n");
{
NetQueue read(NetQueue::NetReceive), write(NetQueue::NetSend);
// Generate the data.
uint8_t testString[] = "Hello world, this is a string which will be compressed. Compressed strings are good, because they use less space. Compressed strings use a lot less space, if they repeat things a lot, which is very good. As an example of repetition, remember the sentence 'Compressed strings use a lot less space, if they repeat things a lot, which is very good.'. You have seen it before, which means that 'Compressed strings use a lot less space, if they repeat things a lot, which is very good.' was repeated. We can even repeat 'Compressed strings are good, because they use less space.' if we want, and that will be compressed too. Because of repeating things like 'Compressed strings use a lot less space, if they repeat things a lot, which is very good.' and 'Compressed strings are good, because they use less space.' a lot, the compressed string is much shorter than the uncompressed string.";
printf("Test string size = %d\n", int(strlen((char *)testString) + 1));
for (int i = 0; true; ++i)
{
queue(write.beginSerialise(), testString[i]);
if (testString[i] == 0x00)
{
break;
}
if (testString[i] == ' ')
{
write.setCompression(CompressionMask);
}
printf("%c", testString[i]);
}
printf("\n");
// Send the data over the network.
const uint8_t *rawData;
size_t rawLen;
write.readRawData(&rawData, &rawLen);
printf("Raw data size = %d\n", int(rawLen));
for (size_t i = 0; i != rawLen; ++i)
{
printf("%02X ", rawData[i]); // Send this byte over the network.
}
printf("\n");
read.writeRawData(rawData, rawLen);
write.popRawData(rawLen);
// Parse the data.
while (true)
{
uint8_t result;
queue(read.beginDeserialise(), result);
read.endDeserialise();
if (result == 0x00)
{
break;
}
if (result == ' ')
{
read.setCompression(CompressionMask);
}
printf("%c", result);
}
printf("\n");
}
printf("End hello world!\n");
}
#endif

View File

@ -3,143 +3,97 @@
#include "lib/framework/types.h"
// TODO Add zlib to build scripts and use it.
// TODO But first move it directly on top of the sockets, seems it didn't really belong here. NetQueue
// was originally intended to be just a stream of bytes, which would be synchronised among all players.
// It ended up being a stream of messages prefixed with message size. And thanks to the need for
// broadcasting, the message streams can get mixed together, which isn't good if the message streams are
// individually zlib-compressed. While it would be possible to enable compression if being careful which
// queues it's enabled on, the compression code doesn't belong here anymore. Leaving it here for now,
// since the compression code is tested, and works. At the time of writing, the compression code is the
// only part of the code which is currently known to work well. It should be moved to a CompressPipe
// class, or something similar.
//#don't define USE_ZLIB
#ifdef __cplusplus
#include <vector>
#include <list>
#include <deque>
// At game level:
// There should be a NetQueue representing each client.
// Each client should set their own queue to GameSend, or GameSolo if there are no other clients.
// The other queues should be set to GameReceive, or GameSolo if the queue belongs to an AI.
// Clients should serialise messages to their own queue (the one set to GameSend).
// Clients should serialise messages to their own queue.
// Clients should call readRawData on their own queue, and send it over the network. (And popRawData when done.)
// Clients should receive data from remote clients, and call writeRawData on the corresponding queues.
// Clients should deserialise messages from all queues (including their own), such that all clients are processing the same data in the same way.
// At socket level:
// There should be two NetQueues per socket, one set to NetSend, the other set to NetReceive.
// There should be a NetQueuePair per socket.
class NetMessage
{
public:
NetMessage(uint8_t type_ = 0xFF) : type(type_) {}
uint8_t type;
std::vector<uint8_t> data;
};
class MessageWriter
{
public:
enum { Read, Write, Direction = Write };
MessageWriter(NetMessage *m = NULL) : message(m) {}
MessageWriter(NetMessage &m) : message(&m) {}
void byte(uint8_t v) const { message->data.push_back(v); }
NetMessage *message;
};
class MessageReader
{
public:
enum { Read, Write, Direction = Read };
MessageReader(const NetMessage *m = NULL) : message(m), index(0) {}
MessageReader(const NetMessage &m) : message(&m), index(0) {}
void byte(uint8_t &v) const { v = index >= message->data.size() ? 0x00 : message->data[index]; ++index; }
bool valid() const { return index <= message->data.size(); }
const NetMessage *message;
mutable size_t index;
};
class NetQueue
{
public:
enum UsagePattern
{
Unused, ///< NetQueue is not used for anything.
NetQueue();
NetSend, ///< For use with outgoing sockets. Will serialise and read raw data.
NetReceive, ///< For use with incoming sockets. Will write raw data and deserialise.
GameSend, ///< For use as a game order queue. Will serialise, deserialise and read raw data.
GameReceive, ///< For use as a game order queue. Will deserialise and write raw data.
GameSolo ///< For use as a game order queue. Will serialise and deserialise.
};
enum Compression
{
CompressionZlib = 0x00000001, ///< Zlib compression.
// TODO? CompressionLzma = 0x00000002, ///< Lzma compression.
CompressionMask = 0x00000000 ///< All compression types we support.
#ifdef USE_ZLIB
| CompressionZlib
#endif //USE_ZLIB
};
class Writer
{
public:
enum { Read, Write, Direction = Write };
Writer(NetQueue *q = NULL) : queue(q) {}
Writer(NetQueue &q) : queue(&q) {}
void byte(uint8_t v) const { queue->serialise(v); }
NetQueue *queue;
};
class Reader
{
public:
enum { Read, Write, Direction = Read };
Reader(NetQueue *q = NULL) : queue(q) {}
Reader(NetQueue &q) : queue(&q) {}
void byte(uint8_t &v) const { queue->deserialise(v); }
NetQueue *queue;
};
NetQueue(UsagePattern p = Unused);
~NetQueue();
// Network related
// Network related, receiving
void writeRawData(const uint8_t *netData, size_t netLen); ///< Inserts data from the network into the NetQueue.
// Network related, sending
void willNeverReadRawData(); ///< Marks that we will not be sending this data over the network.
void readRawData(const uint8_t **netData, size_t *netLen); ///< Extracts data from the NetQueue to send over the network.
void popRawData(size_t netLen); ///< Pops the extracted data, so that future readRawData calls do not return that data.
// Serialise/deserialise related. All game clients should deserialise all queues, including their own.
void endSerialiseLength(); ///< Must call after serialiseLength, but not call otherwise.
bool endDeserialise(); ///< Returns true if deserialise succeeded, data has been consumed. Returns false if deserialise failed, due to not enough data yet.
bool isDeserialiseError() const; ///< Returns true if deserialise has failed.
void serialiseLength(); ///< Do not call readRawData or beginDeserialise before calling endSerialiseLength! Makes room for the length, which will actually be serialised there when calling endSerialise.
bool deserialiseHaveLength(); ///< Checks the length, and returns true iff that much data is available. Length will still be returned by deserialise.
uint8_t deserialiseGetType(); ///< Returns the byte immediately after the length.
void serialise(uint8_t v); ///< Serialise a byte.
void deserialise(uint8_t &v); ///< Deserialise a byte.
void setCompression(uint32_t compressionMask); ///< Enable compression if possible. Should call after serialising or deserialising an appropriate message.
// All game clients should check game messages from all queues, including their own, and only the net messages sent to them.
// Message related, storing.
void pushMessage(const NetMessage &message); ///< Adds a message to the queue.
// Message related, extracting.
void willNeverGetMessages(); ///< Marks that we will not be reading any of the messages (only sending over the network).
bool haveMessage() const; ///< Return true if we have a message ready to return.
const NetMessage &getMessage() const; ///< Returns a message.
void popMessage(); ///< Pops the last returned message.
private:
#ifdef USE_ZLIB
void setCompressionZlib(); ///< Enable zlib compression.
#endif //USE_ZLIB
void popOldData(); ///< Pops any data that is no longer needed.
void popOldMessages(); ///< Pops any messages that are no longer needed.
// Disable copy constructor and assignment operator.
NetQueue(const NetQueue &);
void operator =(const NetQueue &);
bool canSerialise; ///< True if we are the producer of data for this NetQueue.
bool canDeserialise; ///< True if we are the producer of data for this NetQueue.
bool canWriteRaw; ///< True if we will call getDataForNetwork.
bool canReadRaw; ///< True if we will call getDataForNetwork.
bool deserialiseUnderflow; ///< We ran out of data when deserialising.
bool canCompress; ///< True if this is a NetQueue for a socket.
bool isCompressed; ///< True if we turned compression on already.
unsigned readOffset; ///< Offset in data for reading.
unsigned readSuccessOffset; ///< Offset in data for reading, when success was last called.
unsigned netOffset; ///< Offset in data for writing to network.
unsigned beginSerialiseOffset; ///< Offset in data when serialiseLength was called.
std::vector<uint8_t> data; ///< Decompressed serialised data.
std::vector<uint8_t> compressedData; ///< Compressed data.
bool canReadRawData; ///< True if we will send the messages over the network, false if we don't.
bool canGetMessages; ///< True if we will get the messages, false if we don't use them ourselves.
#ifdef USE_ZLIB
struct z_stream_s *stream; ///< Non-null iff we are using zlib compression.
#endif //USE_ZLIB
typedef std::list<NetMessage> List;
List::iterator dataPos; ///< Last message which was sent over the network.
List::iterator messagePos; ///< Last message which was popped.
List messages; ///< List of messages. Messages are added to the front and read from the back.
std::vector<uint8_t> incompleteReceivedMessageData; ///< Data from network which has not yet formed an entire message.
std::vector<uint8_t> unsentMessageData; ///< Data for network which has been requested but not yet popped.
};
class NetQueuePair
{
public:
NetQueuePair() : send(NetQueue::NetSend), receive(NetQueue::NetReceive) {}
NetQueuePair() { send.willNeverGetMessages(); receive.willNeverReadRawData(); }
NetQueue send;
NetQueue receive;
};
extern "C"
{
#else
#error There isn''t currently any C interface to this.
#endif //__cplusplus
#ifdef __cplusplus
}
#endif //__cplusplus
#endif //_NET_QUEUE_H_

View File

@ -40,10 +40,12 @@ static NetQueue *gameQueues[MAX_PLAYERS + 1] = {NULL, NULL, NULL, NULL, NULL, NU
static NetQueuePair *netQueues[MAX_CONNECTED_PLAYERS] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL};
static NetQueuePair *tmpQueues[MAX_TMP_SOCKETS] = {NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL};
static NetQueue *broadcastQueue = NULL;
static NetQueue::Writer writer;
static NetQueue::Reader reader;
static int writeSocketIndex = -1;
// Only used between NETbegin{Encode,Decode} and NETend calls.
static MessageWriter writer;
static MessageReader reader;
static NetMessage message;
static NETQUEUE queueInfo;
static PACKETDIR NetDir;
static void NETsetPacketDir(PACKETDIR dir)
@ -256,12 +258,12 @@ void NETinsertRawData(NETQUEUE queue, uint8_t *data, size_t dataLen)
BOOL NETisMessageReady(NETQUEUE queue)
{
return receiveQueue(queue)->deserialiseHaveLength();
return receiveQueue(queue)->haveMessage();
}
uint8_t NETmessageType(NETQUEUE queue)
{
return receiveQueue(queue)->deserialiseGetType();
return receiveQueue(queue)->getMessage().type;
}
/*
@ -273,13 +275,14 @@ void NETinitQueue(NETQUEUE queue)
if (queue.queueType == QUEUE_BROADCAST)
{
delete broadcastQueue;
broadcastQueue = new NetQueue(NetQueue::NetSend);
broadcastQueue = new NetQueue;
broadcastQueue->willNeverGetMessages();
return;
}
else if (queue.queueType == QUEUE_GAME)
{
delete gameQueues[queue.index];
gameQueues[queue.index] = new NetQueue(NetQueue::GameSolo); // TODO Should sometimes be NetQueue::GameSend or NetQueue::GameReceive.
gameQueues[queue.index] = new NetQueue; // TODO Call .willNeverReadRawData() if not sending over net.
return;
}
else
@ -298,82 +301,70 @@ void NETmoveQueue(NETQUEUE src, NETQUEUE dst)
std::swap(pairQueue(src), pairQueue(dst));
}
void NETbeginEncode(NETQUEUE cqueue, uint8_t type)
void NETbeginEncode(NETQUEUE queue, uint8_t type)
{
NETsetPacketDir(PACKET_ENCODE);
writer = sendQueue(cqueue);
if (cqueue.queueType == QUEUE_NET || cqueue.queueType == QUEUE_BROADCAST)
{
writeSocketIndex = cqueue.index;
}
else
{
writeSocketIndex = -1;
}
writer.queue->serialiseLength();
queue(writer, type);
queueInfo = queue;
message = type;
writer = MessageWriter(message);
}
void NETbeginDecode(NETQUEUE cqueue, uint8_t type)
void NETbeginDecode(NETQUEUE queue, uint8_t type)
{
NETsetPacketDir(PACKET_DECODE);
reader = receiveQueue(cqueue);
queueInfo = queue;
message = receiveQueue(queueInfo)->getMessage();
reader = MessageReader(message);
uint32_t len;
uint8_t readType;
queue(reader, len);
queue(reader, readType);
assert(type == readType);
assert(type == message.type);
}
BOOL NETend()
{
assert(NETgetPacketDir() != PACKET_INVALID);
// If we are encoding just return true
if (NETgetPacketDir() == PACKET_ENCODE)
{
writer.queue->endSerialiseLength();
if (writeSocketIndex >= 0)
// Push the message onto the list.
NetQueue *queue = sendQueue(queueInfo);
queue->pushMessage(message);
if (queueInfo.queueType == QUEUE_NET || queueInfo.queueType == QUEUE_BROADCAST)
{
const uint8_t *data;
size_t dataLen;
NetQueue *queue = writeSocketIndex == NET_ALL_PLAYERS ? broadcastQueue : &netQueues[writeSocketIndex]->send;
queue->readRawData(&data, &dataLen);
NETsend(writeSocketIndex, data, dataLen);
NETsend(queueInfo.index, data, dataLen);
queue->popRawData(dataLen);
}
// We have ended the serialisation, so mark the direction invalid
NETsetPacketDir(PACKET_INVALID);
return true;
return true; // Serialising never fails.
}
bool ret = reader.queue->endDeserialise();
// We have ended the deserialisation, so mark the direction invalid
NETsetPacketDir(PACKET_INVALID);
return ret;
/*
// If the packet is invalid or failed to compile
if (NETgetPacketDir() != PACKET_ENCODE || !NetMsg.status)
if (NETgetPacketDir() == PACKET_DECODE)
{
return false;
bool ret = reader.valid();
//receiveQueue(queueInfo)->popMessage(); // Moved to NETpop(), since some messages call NETbeginEncode but not NETend, and others call neither.
// We have ended the deserialisation, so mark the direction invalid
NETsetPacketDir(PACKET_INVALID);
return ret;
}
// Send the packet, updating the send functions is on my todo list!
if (NetMsg.destination == NET_ALL_PLAYERS)
{
return NETbcast(&NetMsg);
}
else
{
return NETsend(&NetMsg, NetMsg.destination);
}
*/
assert(false && false && false);
return false;
}
void NETpop(NETQUEUE queue)
{
receiveQueue(queue)->popMessage();
}
template<class T>

View File

@ -64,6 +64,7 @@ NETQUEUE NETbroadcastQueue(void); ///< The queue for sending data di
void NETinsertRawData(NETQUEUE queue, uint8_t *data, size_t dataLen); /// Dump raw data from sockets and raw data sent via host here.
BOOL NETisMessageReady(NETQUEUE queue);
uint8_t NETmessageType(NETQUEUE queue);
uint32_t NETmessageSize(NETQUEUE queue);
void NETinitQueue(NETQUEUE queue);
void NETmoveQueue(NETQUEUE src, NETQUEUE dst);
@ -71,6 +72,7 @@ void NETmoveQueue(NETQUEUE src, NETQUEUE dst);
void NETbeginEncode(NETQUEUE queue, uint8_t type);
void NETbeginDecode(NETQUEUE queue, uint8_t type);
BOOL NETend(void);
void NETpop(NETQUEUE queue);
void NETint8_t(int8_t *ip);
void NETuint8_t(uint8_t *ip);

View File

@ -3076,6 +3076,8 @@ void frontendMultiMessages(void)
}
break;
}
NETpop(queue);
}
}

View File

@ -200,6 +200,8 @@ void recvOptions(NETQUEUE queue)
NETuint8_t(&ingame.pStructureLimits[i].limit);
}
NETend();
// Do the skirmish slider settings if they are up
for (i = 0; i < MAX_PLAYERS; i++)
{

View File

@ -771,6 +771,8 @@ BOOL recvMessage(void)
default:
break;
}
NETpop(queue);
}
return true;