1
0

Shave off buffer copies in networking code (#11607)

This commit is contained in:
sfan5 2021-09-17 18:14:25 +02:00 committed by GitHub
parent ea250ff5c5
commit fd8a8501bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 126 additions and 92 deletions

View File

@ -200,17 +200,12 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
return i; return i;
} }
RPBSearchResult ReliablePacketBuffer::notFound()
{
return m_list.end();
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result) bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{ {
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
if (m_list.empty()) if (m_list.empty())
return false; return false;
const BufferedPacket &p = *m_list.begin(); const BufferedPacket &p = m_list.front();
result = readU16(&p.data[BASE_HEADER_SIZE + 1]); result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
return true; return true;
} }
@ -220,14 +215,14 @@ BufferedPacket ReliablePacketBuffer::popFirst()
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
if (m_list.empty()) if (m_list.empty())
throw NotFoundException("Buffer is empty"); throw NotFoundException("Buffer is empty");
BufferedPacket p = *m_list.begin(); BufferedPacket p = std::move(m_list.front());
m_list.erase(m_list.begin()); m_list.pop_front();
if (m_list.empty()) { if (m_list.empty()) {
m_oldest_non_answered_ack = 0; m_oldest_non_answered_ack = 0;
} else { } else {
m_oldest_non_answered_ack = m_oldest_non_answered_ack =
readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
} }
return p; return p;
} }
@ -241,15 +236,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
<< " not found in reliable buffer"<<std::endl); << " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer"); throw NotFoundException("seqnum not found in buffer");
} }
BufferedPacket p = *r; BufferedPacket p = std::move(*r);
RPBSearchResult next = r;
++next;
if (next != notFound()) {
u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
m_oldest_non_answered_ack = s;
}
m_list.erase(r); m_list.erase(r);
@ -257,12 +244,12 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
m_oldest_non_answered_ack = 0; m_oldest_non_answered_ack = 0;
} else { } else {
m_oldest_non_answered_ack = m_oldest_non_answered_ack =
readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
} }
return p; return p;
} }
void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
{ {
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) { if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
@ -355,7 +342,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
} }
/* update last packet number */ /* update last packet number */
m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
} }
void ReliablePacketBuffer::incrementTimeouts(float dtime) void ReliablePacketBuffer::incrementTimeouts(float dtime)
@ -367,17 +354,19 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
} }
} }
std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout, std::list<BufferedPacket>
unsigned int max_packets) ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{ {
MutexAutoLock listlock(m_list_mutex); MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs; std::list<BufferedPacket> timed_outs;
for (BufferedPacket &bufferedPacket : m_list) { for (BufferedPacket &bufferedPacket : m_list) {
if (bufferedPacket.time >= timeout) { if (bufferedPacket.time >= timeout) {
// caller will resend packet so reset time and increase counter
bufferedPacket.time = 0.0f;
bufferedPacket.resend_count++;
timed_outs.push_back(bufferedPacket); timed_outs.push_back(bufferedPacket);
//this packet will be sent right afterwards reset timeout here
bufferedPacket.time = 0.0f;
if (timed_outs.size() >= max_packets) if (timed_outs.size() >= max_packets)
break; break;
} }
@ -1051,20 +1040,20 @@ bool UDPPeer::processReliableSendCommand(
m_connection->GetProtocolID(), m_connection->GetPeerID(), m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum); c.channelnum);
toadd.push(p); toadd.push(std::move(p));
} }
if (have_sequence_number) { if (have_sequence_number) {
volatile u16 pcount = 0; volatile u16 pcount = 0;
while (!toadd.empty()) { while (!toadd.empty()) {
BufferedPacket p = toadd.front(); BufferedPacket p = std::move(toadd.front());
toadd.pop(); toadd.pop();
// LOG(dout_con<<connection->getDesc() // LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id // << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF) // << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl) // << std::endl)
chan.queued_reliables.push(p); chan.queued_reliables.push(std::move(p));
pcount++; pcount++;
} }
sanity_check(chan.queued_reliables.size() < 0xFFFF); sanity_check(chan.queued_reliables.size() < 0xFFFF);
@ -1208,12 +1197,19 @@ Connection::~Connection()
} }
/* Internal stuff */ /* Internal stuff */
void Connection::putEvent(ConnectionEvent &e)
void Connection::putEvent(const ConnectionEvent &e)
{ {
assert(e.type != CONNEVENT_NONE); // Pre-condition assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e); m_event_queue.push_back(e);
} }
void Connection::putEvent(ConnectionEvent &&e)
{
assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(std::move(e));
}
void Connection::TriggerSend() void Connection::TriggerSend()
{ {
m_sendThread->Trigger(); m_sendThread->Trigger();
@ -1299,7 +1295,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms)
} }
} }
void Connection::putCommand(ConnectionCommand &c) void Connection::putCommand(const ConnectionCommand &c)
{ {
if (!m_shutting_down) { if (!m_shutting_down) {
m_command_queue.push_back(c); m_command_queue.push_back(c);
@ -1307,6 +1303,14 @@ void Connection::putCommand(ConnectionCommand &c)
} }
} }
void Connection::putCommand(ConnectionCommand &&c)
{
if (!m_shutting_down) {
m_command_queue.push_back(std::move(c));
m_sendThread->Trigger();
}
}
void Connection::Serve(Address bind_addr) void Connection::Serve(Address bind_addr)
{ {
ConnectionCommand c; ConnectionCommand c;
@ -1408,7 +1412,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
ConnectionCommand c; ConnectionCommand c;
c.send(peer_id, channelnum, pkt, reliable); c.send(peer_id, channelnum, pkt, reliable);
putCommand(c); putCommand(std::move(c));
} }
Address Connection::GetPeerAddress(session_t peer_id) Address Connection::GetPeerAddress(session_t peer_id)
@ -1508,12 +1512,12 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl); << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
ConnectionCommand cmd; ConnectionCommand cmd;
SharedBuffer<u8> reply(4); Buffer<u8> reply(4);
writeU8(&reply[0], PACKET_TYPE_CONTROL); writeU8(&reply[0], PACKET_TYPE_CONTROL);
writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
writeU16(&reply[2], peer_id_new); writeU16(&reply[2], peer_id_new);
cmd.createPeer(peer_id_new,reply); cmd.createPeer(peer_id_new,reply);
putCommand(cmd); putCommand(std::move(cmd));
// Create peer addition event // Create peer addition event
ConnectionEvent e; ConnectionEvent e;
@ -1560,7 +1564,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
writeU16(&ack[2], seqnum); writeU16(&ack[2], seqnum);
c.ack(peer_id, channelnum, ack); c.ack(peer_id, channelnum, ack);
putCommand(c); putCommand(std::move(c));
m_sendThread->Trigger(); m_sendThread->Trigger();
} }

View File

@ -19,7 +19,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#pragma once #pragma once
#include "irrlichttypes_bloated.h" #include "irrlichttypes.h"
#include "peerhandler.h" #include "peerhandler.h"
#include "socket.h" #include "socket.h"
#include "constants.h" #include "constants.h"
@ -29,7 +29,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "util/numeric.h" #include "util/numeric.h"
#include "networkprotocol.h" #include "networkprotocol.h"
#include <iostream> #include <iostream>
#include <fstream>
#include <vector> #include <vector>
#include <map> #include <map>
@ -242,20 +241,19 @@ public:
BufferedPacket popFirst(); BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum); BufferedPacket popSeqnum(u16 seqnum);
void insert(BufferedPacket &p, u16 next_expected); void insert(const BufferedPacket &p, u16 next_expected);
void incrementTimeouts(float dtime); void incrementTimeouts(float dtime);
std::list<BufferedPacket> getTimedOuts(float timeout, std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
unsigned int max_packets);
void print(); void print();
bool empty(); bool empty();
RPBSearchResult notFound();
u32 size(); u32 size();
private: private:
RPBSearchResult findPacket(u16 seqnum); // does not perform locking RPBSearchResult findPacket(u16 seqnum); // does not perform locking
inline RPBSearchResult notFound() { return m_list.end(); }
std::list<BufferedPacket> m_list; std::list<BufferedPacket> m_list;
@ -329,18 +327,6 @@ struct ConnectionCommand
bool raw = false; bool raw = false;
ConnectionCommand() = default; ConnectionCommand() = default;
ConnectionCommand &operator=(const ConnectionCommand &other)
{
type = other.type;
address = other.address;
peer_id = other.peer_id;
channelnum = other.channelnum;
// We must copy the buffer here to prevent race condition
data = SharedBuffer<u8>(*other.data, other.data.getSize());
reliable = other.reliable;
raw = other.raw;
return *this;
}
void serve(Address address_) void serve(Address address_)
{ {
@ -364,7 +350,7 @@ struct ConnectionCommand
void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_); void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_) void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
{ {
type = CONCMD_ACK; type = CONCMD_ACK;
peer_id = peer_id_; peer_id = peer_id_;
@ -373,7 +359,7 @@ struct ConnectionCommand
reliable = false; reliable = false;
} }
void createPeer(session_t peer_id_, const SharedBuffer<u8> &data_) void createPeer(session_t peer_id_, const Buffer<u8> &data_)
{ {
type = CONCMD_CREATE_PEER; type = CONCMD_CREATE_PEER;
peer_id = peer_id_; peer_id = peer_id_;
@ -707,7 +693,7 @@ struct ConnectionEvent
ConnectionEvent() = default; ConnectionEvent() = default;
std::string describe() const char *describe() const
{ {
switch(type) { switch(type) {
case CONNEVENT_NONE: case CONNEVENT_NONE:
@ -724,7 +710,7 @@ struct ConnectionEvent
return "Invalid ConnectionEvent"; return "Invalid ConnectionEvent";
} }
void dataReceived(session_t peer_id_, const SharedBuffer<u8> &data_) void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
{ {
type = CONNEVENT_DATA_RECEIVED; type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_; peer_id = peer_id_;
@ -763,7 +749,9 @@ public:
/* Interface */ /* Interface */
ConnectionEvent waitEvent(u32 timeout_ms); ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c); // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
void putCommand(const ConnectionCommand &c);
void putCommand(ConnectionCommand &&c);
void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; } void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
void Serve(Address bind_addr); void Serve(Address bind_addr);
@ -802,11 +790,14 @@ protected:
} }
UDPSocket m_udpSocket; UDPSocket m_udpSocket;
// Command queue: user -> SendThread
MutexedQueue<ConnectionCommand> m_command_queue; MutexedQueue<ConnectionCommand> m_command_queue;
bool Receive(NetworkPacket *pkt, u32 timeout); bool Receive(NetworkPacket *pkt, u32 timeout);
void putEvent(ConnectionEvent &e); // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
void putEvent(const ConnectionEvent &e);
void putEvent(ConnectionEvent &&e);
void TriggerSend(); void TriggerSend();
@ -815,6 +806,7 @@ protected:
return getPeerNoEx(PEER_ID_SERVER) != nullptr; return getPeerNoEx(PEER_ID_SERVER) != nullptr;
} }
private: private:
// Event queue: ReceiveThread -> user
MutexedQueue<ConnectionEvent> m_event_queue; MutexedQueue<ConnectionEvent> m_event_queue;
session_t m_peer_id = 0; session_t m_peer_id = 0;

View File

@ -174,6 +174,11 @@ void ConnectionSendThread::runTimeouts(float dtime)
std::vector<session_t> timeouted_peers; std::vector<session_t> timeouted_peers;
std::vector<session_t> peerIds = m_connection->getPeerIDs(); std::vector<session_t> peerIds = m_connection->getPeerIDs();
const u32 numpeers = m_connection->m_peers.size();
if (numpeers == 0)
return;
for (session_t &peerId : peerIds) { for (session_t &peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId); PeerHelper peer = m_connection->getPeerNoEx(peerId);
@ -209,7 +214,6 @@ void ConnectionSendThread::runTimeouts(float dtime)
float resend_timeout = udpPeer->getResendTimeout(); float resend_timeout = udpPeer->getResendTimeout();
bool retry_count_exceeded = false; bool retry_count_exceeded = false;
for (Channel &channel : udpPeer->channels) { for (Channel &channel : udpPeer->channels) {
std::list<BufferedPacket> timed_outs;
// Remove timed out incomplete unreliable split packets // Remove timed out incomplete unreliable split packets
channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
@ -217,13 +221,8 @@ void ConnectionSendThread::runTimeouts(float dtime)
// Increment reliable packet times // Increment reliable packet times
channel.outgoing_reliables_sent.incrementTimeouts(dtime); channel.outgoing_reliables_sent.incrementTimeouts(dtime);
unsigned int numpeers = m_connection->m_peers.size();
if (numpeers == 0)
return;
// Re-send timed out outgoing reliables // Re-send timed out outgoing reliables
timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout, auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
(m_max_data_packets_per_iteration / numpeers)); (m_max_data_packets_per_iteration / numpeers));
channel.UpdatePacketLossCounter(timed_outs.size()); channel.UpdatePacketLossCounter(timed_outs.size());
@ -231,16 +230,14 @@ void ConnectionSendThread::runTimeouts(float dtime)
m_iteration_packets_avaialble -= timed_outs.size(); m_iteration_packets_avaialble -= timed_outs.size();
for (std::list<BufferedPacket>::iterator k = timed_outs.begin(); for (const auto &k : timed_outs) {
k != timed_outs.end(); ++k) { session_t peer_id = readPeerId(*k.data);
session_t peer_id = readPeerId(*(k->data)); u8 channelnum = readChannel(*k.data);
u8 channelnum = readChannel(*(k->data)); u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
channel.UpdateBytesLost(k->data.getSize()); channel.UpdateBytesLost(k.data.getSize());
k->resend_count++;
if (k->resend_count > MAX_RELIABLE_RETRY) { if (k.resend_count > MAX_RELIABLE_RETRY) {
retry_count_exceeded = true; retry_count_exceeded = true;
timeouted_peers.push_back(peer->id); timeouted_peers.push_back(peer->id);
/* no need to check additional packets if a single one did timeout*/ /* no need to check additional packets if a single one did timeout*/
@ -249,14 +246,14 @@ void ConnectionSendThread::runTimeouts(float dtime)
LOG(derr_con << m_connection->getDesc() LOG(derr_con << m_connection->getDesc()
<< "RE-SENDING timed-out RELIABLE to " << "RE-SENDING timed-out RELIABLE to "
<< k->address.serializeString() << k.address.serializeString()
<< "(t/o=" << resend_timeout << "): " << "(t/o=" << resend_timeout << "): "
<< "from_peer_id=" << peer_id << "from_peer_id=" << peer_id
<< ", channel=" << ((int) channelnum & 0xff) << ", channel=" << ((int) channelnum & 0xff)
<< ", seqnum=" << seqnum << ", seqnum=" << seqnum
<< std::endl); << std::endl);
rawSend(*k); rawSend(k);
// do not handle rtt here as we can't decide if this packet was // do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit // lost or really takes more time to transmit
@ -375,7 +372,7 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
<< " INFO: queueing reliable packet for peer_id: " << peer_id << " INFO: queueing reliable packet for peer_id: " << peer_id
<< " channel: " << (u32)channelnum << " channel: " << (u32)channelnum
<< " seqnum: " << seqnum << std::endl); << " seqnum: " << seqnum << std::endl);
channel->queued_reliables.push(p); channel->queued_reliables.push(std::move(p));
return false; return false;
} }
@ -717,13 +714,15 @@ void ConnectionSendThread::sendPackets(float dtime)
channel.outgoing_reliables_sent.size() channel.outgoing_reliables_sent.size()
< channel.getWindowSize() && < channel.getWindowSize() &&
peer->m_increment_packets_remaining > 0) { peer->m_increment_packets_remaining > 0) {
BufferedPacket p = channel.queued_reliables.front(); BufferedPacket p = std::move(channel.queued_reliables.front());
channel.queued_reliables.pop(); channel.queued_reliables.pop();
LOG(dout_con << m_connection->getDesc() LOG(dout_con << m_connection->getDesc()
<< " INFO: sending a queued reliable packet " << " INFO: sending a queued reliable packet "
<< " channel: " << i << " channel: " << i
<< ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1]) << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
<< std::endl); << std::endl);
sendAsPacketReliable(p, &channel); sendAsPacketReliable(p, &channel);
peer->m_increment_packets_remaining--; peer->m_increment_packets_remaining--;
} }
@ -911,7 +910,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
if (data_left) { if (data_left) {
ConnectionEvent e; ConnectionEvent e;
e.dataReceived(peer_id, resultdata); e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e); m_connection->putEvent(std::move(e));
} }
} }
catch (ProcessedSilentlyException &e) { catch (ProcessedSilentlyException &e) {
@ -1022,7 +1021,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
ConnectionEvent e; ConnectionEvent e;
e.dataReceived(peer_id, resultdata); e.dataReceived(peer_id, resultdata);
m_connection->putEvent(e); m_connection->putEvent(std::move(e));
} }
catch (ProcessedSilentlyException &e) { catch (ProcessedSilentlyException &e) {
} }

View File

@ -549,14 +549,11 @@ NetworkPacket& NetworkPacket::operator<<(video::SColor src)
return *this; return *this;
} }
SharedBuffer<u8> NetworkPacket::oldForgePacket() Buffer<u8> NetworkPacket::oldForgePacket()
{ {
SharedBuffer<u8> sb(m_datasize + 2); Buffer<u8> sb(m_datasize + 2);
writeU16(&sb[0], m_command); writeU16(&sb[0], m_command);
memcpy(&sb[2], m_data.data(), m_datasize);
u8* datas = getU8Ptr(0);
if (datas != NULL)
memcpy(&sb[2], datas, m_datasize);
return sb; return sb;
} }

View File

@ -115,7 +115,8 @@ public:
NetworkPacket &operator<<(video::SColor src); NetworkPacket &operator<<(video::SColor src);
// Temp, we remove SharedBuffer when migration finished // Temp, we remove SharedBuffer when migration finished
SharedBuffer<u8> oldForgePacket(); // ^ this comment has been here for 4 years
Buffer<u8> oldForgePacket();
private: private:
void checkReadOffset(u32 from_offset, u32 field_size); void checkReadOffset(u32 from_offset, u32 field_size);

View File

@ -88,7 +88,7 @@ void TestConnection::testNetworkPacketSerialize()
}; };
if (sizeof(wchar_t) == 2) if (sizeof(wchar_t) == 2)
warningstream << __func__ << " may fail on this platform." << std::endl; warningstream << __FUNCTION__ << " may fail on this platform." << std::endl;
{ {
NetworkPacket pkt(123, 0); NetworkPacket pkt(123, 0);
@ -96,7 +96,7 @@ void TestConnection::testNetworkPacketSerialize()
// serializing wide strings should do surrogate encoding, we test that here // serializing wide strings should do surrogate encoding, we test that here
pkt << std::wstring(L"\U00020b9a"); pkt << std::wstring(L"\U00020b9a");
SharedBuffer<u8> buf = pkt.oldForgePacket(); auto buf = pkt.oldForgePacket();
UASSERTEQ(int, buf.getSize(), sizeof(expected)); UASSERTEQ(int, buf.getSize(), sizeof(expected));
UASSERT(!memcmp(expected, &buf[0], buf.getSize())); UASSERT(!memcmp(expected, &buf[0], buf.getSize()));
} }
@ -280,7 +280,7 @@ void TestConnection::testConnectSendReceive()
NetworkPacket pkt; NetworkPacket pkt;
pkt.putRawPacket((u8*) "Hello World !", 14, 0); pkt.putRawPacket((u8*) "Hello World !", 14, 0);
SharedBuffer<u8> sentdata = pkt.oldForgePacket(); auto sentdata = pkt.oldForgePacket();
infostream<<"** running client.Send()"<<std::endl; infostream<<"** running client.Send()"<<std::endl;
client.Send(PEER_ID_SERVER, 0, &pkt, true); client.Send(PEER_ID_SERVER, 0, &pkt, true);
@ -295,7 +295,7 @@ void TestConnection::testConnectSendReceive()
<< ", data=" << (const char*)pkt.getU8Ptr(0) << ", data=" << (const char*)pkt.getU8Ptr(0)
<< std::endl; << std::endl;
SharedBuffer<u8> recvdata = pkt.oldForgePacket(); auto recvdata = pkt.oldForgePacket();
UASSERT(memcmp(*sentdata, *recvdata, recvdata.getSize()) == 0); UASSERT(memcmp(*sentdata, *recvdata, recvdata.getSize()) == 0);
} }
@ -324,13 +324,13 @@ void TestConnection::testConnectSendReceive()
infostream << "..."; infostream << "...";
infostream << std::endl; infostream << std::endl;
SharedBuffer<u8> sentdata = pkt.oldForgePacket(); auto sentdata = pkt.oldForgePacket();
server.Send(peer_id_client, 0, &pkt, true); server.Send(peer_id_client, 0, &pkt, true);
//sleep_ms(3000); //sleep_ms(3000);
SharedBuffer<u8> recvdata; Buffer<u8> recvdata;
infostream << "** running client.Receive()" << std::endl; infostream << "** running client.Receive()" << std::endl;
session_t peer_id = 132; session_t peer_id = 132;
u16 size = 0; u16 size = 0;

View File

@ -140,6 +140,13 @@ public:
m_signal.post(); m_signal.post();
} }
void push_back(T &&t)
{
MutexAutoLock lock(m_mutex);
m_queue.push_back(std::move(t));
m_signal.post();
}
/* this version of pop_front returns a empty element of T on timeout. /* this version of pop_front returns a empty element of T on timeout.
* Make sure default constructor of T creates a recognizable "empty" element * Make sure default constructor of T creates a recognizable "empty" element
*/ */

View File

@ -51,6 +51,19 @@ public:
else else
data = NULL; data = NULL;
} }
Buffer(Buffer &&buffer)
{
m_size = buffer.m_size;
if(m_size != 0)
{
data = buffer.data;
buffer.data = nullptr;
buffer.m_size = 0;
}
else
data = nullptr;
}
// Copies whole buffer
Buffer(const T *t, unsigned int size) Buffer(const T *t, unsigned int size)
{ {
m_size = size; m_size = size;
@ -62,10 +75,12 @@ public:
else else
data = NULL; data = NULL;
} }
~Buffer() ~Buffer()
{ {
drop(); drop();
} }
Buffer& operator=(const Buffer &buffer) Buffer& operator=(const Buffer &buffer)
{ {
if(this == &buffer) if(this == &buffer)
@ -81,6 +96,23 @@ public:
data = NULL; data = NULL;
return *this; return *this;
} }
Buffer& operator=(Buffer &&buffer)
{
if(this == &buffer)
return *this;
drop();
m_size = buffer.m_size;
if(m_size != 0)
{
data = buffer.data;
buffer.data = nullptr;
buffer.m_size = 0;
}
else
data = nullptr;
return *this;
}
T & operator[](unsigned int i) const T & operator[](unsigned int i) const
{ {
return data[i]; return data[i];
@ -89,10 +121,12 @@ public:
{ {
return data; return data;
} }
unsigned int getSize() const unsigned int getSize() const
{ {
return m_size; return m_size;
} }
private: private:
void drop() void drop()
{ {