From b7f2ba0da288cc1bf7fdd6073491ede8ed7a23f3 Mon Sep 17 00:00:00 2001 From: sfan5 Date: Fri, 17 Sep 2021 18:14:25 +0200 Subject: [PATCH] Shave off buffer copies in networking code (#11607) --- src/network/connection.cpp | 72 ++++++++++++++++--------------- src/network/connection.h | 40 +++++++---------- src/network/connectionthreads.cpp | 41 +++++++++--------- src/network/networkpacket.cpp | 9 ++-- src/network/networkpacket.h | 3 +- src/unittest/test_connection.cpp | 12 +++--- src/util/container.h | 7 +++ src/util/pointer.h | 34 +++++++++++++++ 8 files changed, 126 insertions(+), 92 deletions(-) diff --git a/src/network/connection.cpp b/src/network/connection.cpp index d4813b955..060cbcb18 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -200,17 +200,12 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum) return i; } -RPBSearchResult ReliablePacketBuffer::notFound() -{ - return m_list.end(); -} - bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; - const BufferedPacket &p = *m_list.begin(); + const BufferedPacket &p = m_list.front(); result = readU16(&p.data[BASE_HEADER_SIZE + 1]); return true; } @@ -220,14 +215,14 @@ BufferedPacket ReliablePacketBuffer::popFirst() MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); - BufferedPacket p = *m_list.begin(); - m_list.erase(m_list.begin()); + BufferedPacket p = std::move(m_list.front()); + m_list.pop_front(); if (m_list.empty()) { m_oldest_non_answered_ack = 0; } else { m_oldest_non_answered_ack = - readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); + readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]); } return p; } @@ -241,15 +236,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) << " not found in reliable buffer"<data[BASE_HEADER_SIZE+1])); - m_oldest_non_answered_ack = s; - } + BufferedPacket p = std::move(*r); m_list.erase(r); @@ -257,12 +244,12 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) m_oldest_non_answered_ack = 0; } else { m_oldest_non_answered_ack = - readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]); + readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]); } return p; } -void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) +void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected) { MutexAutoLock listlock(m_list_mutex); if (p.data.getSize() < BASE_HEADER_SIZE + 3) { @@ -355,7 +342,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected) } /* update last packet number */ - m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); + m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]); } void ReliablePacketBuffer::incrementTimeouts(float dtime) @@ -367,17 +354,19 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) } } -std::list ReliablePacketBuffer::getTimedOuts(float timeout, - unsigned int max_packets) +std::list + ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets) { MutexAutoLock listlock(m_list_mutex); std::list timed_outs; for (BufferedPacket &bufferedPacket : m_list) { if (bufferedPacket.time >= timeout) { + // caller will resend packet so reset time and increase counter + bufferedPacket.time = 0.0f; + bufferedPacket.resend_count++; + timed_outs.push_back(bufferedPacket); - //this packet will be sent right afterwards reset timeout here - bufferedPacket.time = 0.0f; if (timed_outs.size() >= max_packets) break; } @@ -1053,20 +1042,20 @@ bool UDPPeer::processReliableSendCommand( m_connection->GetProtocolID(), m_connection->GetPeerID(), c.channelnum); - toadd.push(p); + toadd.push(std::move(p)); } if (have_sequence_number) { volatile u16 pcount = 0; while (!toadd.empty()) { - BufferedPacket p = toadd.front(); + BufferedPacket p = std::move(toadd.front()); toadd.pop(); // LOG(dout_con<getDesc() // << " queuing reliable packet for peer_id: " << c.peer_id // << " channel: " << (c.channelnum&0xFF) // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) // << std::endl) - chan.queued_reliables.push(p); + chan.queued_reliables.push(std::move(p)); pcount++; } sanity_check(chan.queued_reliables.size() < 0xFFFF); @@ -1210,12 +1199,19 @@ Connection::~Connection() } /* Internal stuff */ -void Connection::putEvent(ConnectionEvent &e) + +void Connection::putEvent(const ConnectionEvent &e) { assert(e.type != CONNEVENT_NONE); // Pre-condition m_event_queue.push_back(e); } +void Connection::putEvent(ConnectionEvent &&e) +{ + assert(e.type != CONNEVENT_NONE); // Pre-condition + m_event_queue.push_back(std::move(e)); +} + void Connection::TriggerSend() { m_sendThread->Trigger(); @@ -1301,7 +1297,7 @@ ConnectionEvent Connection::waitEvent(u32 timeout_ms) } } -void Connection::putCommand(ConnectionCommand &c) +void Connection::putCommand(const ConnectionCommand &c) { if (!m_shutting_down) { m_command_queue.push_back(c); @@ -1309,6 +1305,14 @@ void Connection::putCommand(ConnectionCommand &c) } } +void Connection::putCommand(ConnectionCommand &&c) +{ + if (!m_shutting_down) { + m_command_queue.push_back(std::move(c)); + m_sendThread->Trigger(); + } +} + void Connection::Serve(Address bind_addr) { ConnectionCommand c; @@ -1410,7 +1414,7 @@ void Connection::Send(session_t peer_id, u8 channelnum, ConnectionCommand c; c.send(peer_id, channelnum, pkt, reliable); - putCommand(c); + putCommand(std::move(c)); } Address Connection::GetPeerAddress(session_t peer_id) @@ -1510,12 +1514,12 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) << "createPeer(): giving peer_id=" << peer_id_new << std::endl); ConnectionCommand cmd; - SharedBuffer reply(4); + Buffer reply(4); writeU8(&reply[0], PACKET_TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID); writeU16(&reply[2], peer_id_new); cmd.createPeer(peer_id_new,reply); - putCommand(cmd); + putCommand(std::move(cmd)); // Create peer addition event ConnectionEvent e; @@ -1562,7 +1566,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) writeU16(&ack[2], seqnum); c.ack(peer_id, channelnum, ack); - putCommand(c); + putCommand(std::move(c)); m_sendThread->Trigger(); } diff --git a/src/network/connection.h b/src/network/connection.h index 346c0f5dc..e6330affd 100644 --- a/src/network/connection.h +++ b/src/network/connection.h @@ -19,7 +19,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #pragma once -#include "irrlichttypes_bloated.h" +#include "irrlichttypes.h" #include "peerhandler.h" #include "socket.h" #include "constants.h" @@ -29,7 +29,6 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "util/numeric.h" #include "networkprotocol.h" #include -#include #include #include @@ -243,20 +242,19 @@ public: BufferedPacket popFirst(); BufferedPacket popSeqnum(u16 seqnum); - void insert(BufferedPacket &p, u16 next_expected); + void insert(const BufferedPacket &p, u16 next_expected); void incrementTimeouts(float dtime); - std::list getTimedOuts(float timeout, - unsigned int max_packets); + std::list getTimedOuts(float timeout, u32 max_packets); void print(); bool empty(); - RPBSearchResult notFound(); u32 size(); private: RPBSearchResult findPacket(u16 seqnum); // does not perform locking + inline RPBSearchResult notFound() { return m_list.end(); } std::list m_list; @@ -331,18 +329,6 @@ struct ConnectionCommand bool raw = false; ConnectionCommand() = default; - ConnectionCommand &operator=(const ConnectionCommand &other) - { - type = other.type; - address = other.address; - peer_id = other.peer_id; - channelnum = other.channelnum; - // We must copy the buffer here to prevent race condition - data = SharedBuffer(*other.data, other.data.getSize()); - reliable = other.reliable; - raw = other.raw; - return *this; - } void serve(Address address_) { @@ -366,7 +352,7 @@ struct ConnectionCommand void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_); - void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer &data_) + void ack(session_t peer_id_, u8 channelnum_, const Buffer &data_) { type = CONCMD_ACK; peer_id = peer_id_; @@ -375,7 +361,7 @@ struct ConnectionCommand reliable = false; } - void createPeer(session_t peer_id_, const SharedBuffer &data_) + void createPeer(session_t peer_id_, const Buffer &data_) { type = CONCMD_CREATE_PEER; peer_id = peer_id_; @@ -719,7 +705,7 @@ struct ConnectionEvent ConnectionEvent() = default; - std::string describe() + const char *describe() const { switch(type) { case CONNEVENT_NONE: @@ -736,7 +722,7 @@ struct ConnectionEvent return "Invalid ConnectionEvent"; } - void dataReceived(session_t peer_id_, const SharedBuffer &data_) + void dataReceived(session_t peer_id_, const Buffer &data_) { type = CONNEVENT_DATA_RECEIVED; peer_id = peer_id_; @@ -775,7 +761,9 @@ public: /* Interface */ ConnectionEvent waitEvent(u32 timeout_ms); - void putCommand(ConnectionCommand &c); + // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible + void putCommand(const ConnectionCommand &c); + void putCommand(ConnectionCommand &&c); void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; } void Serve(Address bind_addr); @@ -814,11 +802,14 @@ protected: } UDPSocket m_udpSocket; + // Command queue: user -> SendThread MutexedQueue m_command_queue; bool Receive(NetworkPacket *pkt, u32 timeout); - void putEvent(ConnectionEvent &e); + // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible + void putEvent(const ConnectionEvent &e); + void putEvent(ConnectionEvent &&e); void TriggerSend(); @@ -827,6 +818,7 @@ protected: return getPeerNoEx(PEER_ID_SERVER) != nullptr; } private: + // Event queue: ReceiveThread -> user MutexedQueue m_event_queue; session_t m_peer_id = 0; diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index f7a8aadf4..d048ccdf4 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -174,6 +174,11 @@ void ConnectionSendThread::runTimeouts(float dtime) std::vector timeouted_peers; std::vector peerIds = m_connection->getPeerIDs(); + const u32 numpeers = m_connection->m_peers.size(); + + if (numpeers == 0) + return; + for (session_t &peerId : peerIds) { PeerHelper peer = m_connection->getPeerNoEx(peerId); @@ -209,7 +214,6 @@ void ConnectionSendThread::runTimeouts(float dtime) float resend_timeout = udpPeer->getResendTimeout(); bool retry_count_exceeded = false; for (Channel &channel : udpPeer->channels) { - std::list timed_outs; // Remove timed out incomplete unreliable split packets channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); @@ -217,13 +221,8 @@ void ConnectionSendThread::runTimeouts(float dtime) // Increment reliable packet times channel.outgoing_reliables_sent.incrementTimeouts(dtime); - unsigned int numpeers = m_connection->m_peers.size(); - - if (numpeers == 0) - return; - // Re-send timed out outgoing reliables - timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout, + auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout, (m_max_data_packets_per_iteration / numpeers)); channel.UpdatePacketLossCounter(timed_outs.size()); @@ -231,16 +230,14 @@ void ConnectionSendThread::runTimeouts(float dtime) m_iteration_packets_avaialble -= timed_outs.size(); - for (std::list::iterator k = timed_outs.begin(); - k != timed_outs.end(); ++k) { - session_t peer_id = readPeerId(*(k->data)); - u8 channelnum = readChannel(*(k->data)); - u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1])); + for (const auto &k : timed_outs) { + session_t peer_id = readPeerId(*k.data); + u8 channelnum = readChannel(*k.data); + u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1])); - channel.UpdateBytesLost(k->data.getSize()); - k->resend_count++; + channel.UpdateBytesLost(k.data.getSize()); - if (k->resend_count > MAX_RELIABLE_RETRY) { + if (k.resend_count > MAX_RELIABLE_RETRY) { retry_count_exceeded = true; timeouted_peers.push_back(peer->id); /* no need to check additional packets if a single one did timeout*/ @@ -249,14 +246,14 @@ void ConnectionSendThread::runTimeouts(float dtime) LOG(derr_con << m_connection->getDesc() << "RE-SENDING timed-out RELIABLE to " - << k->address.serializeString() + << k.address.serializeString() << "(t/o=" << resend_timeout << "): " << "from_peer_id=" << peer_id << ", channel=" << ((int) channelnum & 0xff) << ", seqnum=" << seqnum << std::endl); - rawSend(*k); + rawSend(k); // do not handle rtt here as we can't decide if this packet was // lost or really takes more time to transmit @@ -375,7 +372,7 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, << " INFO: queueing reliable packet for peer_id: " << peer_id << " channel: " << (u32)channelnum << " seqnum: " << seqnum << std::endl); - channel->queued_reliables.push(p); + channel->queued_reliables.push(std::move(p)); return false; } @@ -726,13 +723,15 @@ void ConnectionSendThread::sendPackets(float dtime) channel.outgoing_reliables_sent.size() < channel.getWindowSize() && peer->m_increment_packets_remaining > 0) { - BufferedPacket p = channel.queued_reliables.front(); + BufferedPacket p = std::move(channel.queued_reliables.front()); channel.queued_reliables.pop(); + LOG(dout_con << m_connection->getDesc() << " INFO: sending a queued reliable packet " << " channel: " << i << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1]) << std::endl); + sendAsPacketReliable(p, &channel); peer->m_increment_packets_remaining--; } @@ -920,7 +919,7 @@ void ConnectionReceiveThread::receive(SharedBuffer &packetdata, if (data_left) { ConnectionEvent e; e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); + m_connection->putEvent(std::move(e)); } } catch (ProcessedSilentlyException &e) { @@ -1031,7 +1030,7 @@ void ConnectionReceiveThread::receive(SharedBuffer &packetdata, ConnectionEvent e; e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); + m_connection->putEvent(std::move(e)); } catch (ProcessedSilentlyException &e) { } diff --git a/src/network/networkpacket.cpp b/src/network/networkpacket.cpp index e75214075..78151df94 100644 --- a/src/network/networkpacket.cpp +++ b/src/network/networkpacket.cpp @@ -559,14 +559,11 @@ NetworkPacket& NetworkPacket::operator<<(video::SColor src) return *this; } -SharedBuffer NetworkPacket::oldForgePacket() +Buffer NetworkPacket::oldForgePacket() { - SharedBuffer sb(m_datasize + 2); + Buffer sb(m_datasize + 2); writeU16(&sb[0], m_command); + memcpy(&sb[2], m_data.data(), m_datasize); - u8* datas = getU8Ptr(0); - - if (datas != NULL) - memcpy(&sb[2], datas, m_datasize); return sb; } diff --git a/src/network/networkpacket.h b/src/network/networkpacket.h index 04bb19fe4..6a486767f 100644 --- a/src/network/networkpacket.h +++ b/src/network/networkpacket.h @@ -117,7 +117,8 @@ public: NetworkPacket &operator<<(video::SColor src); // Temp, we remove SharedBuffer when migration finished - SharedBuffer oldForgePacket(); + // ^ this comment has been here for 4 years + Buffer oldForgePacket(); inline void setProtocolVersion(const u16 protocol_version) { diff --git a/src/unittest/test_connection.cpp b/src/unittest/test_connection.cpp index c6d9b1bbc..13b72a4a6 100644 --- a/src/unittest/test_connection.cpp +++ b/src/unittest/test_connection.cpp @@ -88,7 +88,7 @@ void TestConnection::testNetworkPacketSerialize() }; if (sizeof(wchar_t) == 2) - warningstream << __func__ << " may fail on this platform." << std::endl; + warningstream << __FUNCTION__ << " may fail on this platform." << std::endl; { NetworkPacket pkt(123, 0); @@ -96,7 +96,7 @@ void TestConnection::testNetworkPacketSerialize() // serializing wide strings should do surrogate encoding, we test that here pkt << std::wstring(L"\U00020b9a"); - SharedBuffer buf = pkt.oldForgePacket(); + auto buf = pkt.oldForgePacket(); UASSERTEQ(int, buf.getSize(), sizeof(expected)); UASSERT(!memcmp(expected, &buf[0], buf.getSize())); } @@ -280,7 +280,7 @@ void TestConnection::testConnectSendReceive() NetworkPacket pkt; pkt.putRawPacket((u8*) "Hello World !", 14, 0); - SharedBuffer sentdata = pkt.oldForgePacket(); + auto sentdata = pkt.oldForgePacket(); infostream<<"** running client.Send()"< sentdata = pkt.oldForgePacket(); + auto sentdata = pkt.oldForgePacket(); server.Send(peer_id_client, 0, &pkt, true); //sleep_ms(3000); - SharedBuffer recvdata; + Buffer recvdata; infostream << "** running client.Receive()" << std::endl; session_t peer_id = 132; u16 size = 0; diff --git a/src/util/container.h b/src/util/container.h index be427ecd2..d22c7d6f0 100644 --- a/src/util/container.h +++ b/src/util/container.h @@ -143,6 +143,13 @@ public: m_signal.post(); } + void push_back(T &&t) + { + MutexAutoLock lock(m_mutex); + m_queue.push_back(std::move(t)); + m_signal.post(); + } + /* this version of pop_front returns a empty element of T on timeout. * Make sure default constructor of T creates a recognizable "empty" element */ diff --git a/src/util/pointer.h b/src/util/pointer.h index 4d1a96c5a..d7fcc1c7e 100644 --- a/src/util/pointer.h +++ b/src/util/pointer.h @@ -51,6 +51,19 @@ public: else data = NULL; } + Buffer(Buffer &&buffer) + { + m_size = buffer.m_size; + if(m_size != 0) + { + data = buffer.data; + buffer.data = nullptr; + buffer.m_size = 0; + } + else + data = nullptr; + } + // Copies whole buffer Buffer(const T *t, unsigned int size) { m_size = size; @@ -62,10 +75,12 @@ public: else data = NULL; } + ~Buffer() { drop(); } + Buffer& operator=(const Buffer &buffer) { if(this == &buffer) @@ -81,6 +96,23 @@ public: data = NULL; return *this; } + Buffer& operator=(Buffer &&buffer) + { + if(this == &buffer) + return *this; + drop(); + m_size = buffer.m_size; + if(m_size != 0) + { + data = buffer.data; + buffer.data = nullptr; + buffer.m_size = 0; + } + else + data = nullptr; + return *this; + } + T & operator[](unsigned int i) const { return data[i]; @@ -89,10 +121,12 @@ public: { return data; } + unsigned int getSize() const { return m_size; } + private: void drop() {