server: Packet buffering and handling

This commit is contained in:
Perttu Ahola 2014-09-18 01:35:53 +03:00
parent c2bd1b151e
commit 55541903ab
5 changed files with 80 additions and 6 deletions

View File

@ -7,7 +7,9 @@
#include "core/log.h"
//#include <cereal/archives/binary.hpp>
//#include <cereal/types/string.hpp>
#include <iostream>
#include <deque>
#include <sys/socket.h>
#include <cstring> // strerror()
using interface::Event;
@ -20,6 +22,7 @@ struct Peer
Id id = 0;
sp_<interface::TCPSocket> socket;
Packet::Type highest_known_type = 99;
std::deque<char> socket_buffer;
Peer(){}
Peer(Id id, sp_<interface::TCPSocket> socket):
@ -55,6 +58,7 @@ struct Module: public interface::Module, public network::Interface
interface::Server *m_server;
sp_<interface::TCPSocket> m_listening_socket;
sm_<Peer::Id, Peer> m_peers;
sm_<int, Peer*> m_peers_by_socket;
size_t m_next_peer_id = 1;
PacketTypeRegistry m_packet_types;
@ -73,6 +77,7 @@ struct Module: public interface::Module, public network::Interface
log_v(MODULE, "network init");
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, Event::t("network:listen_event"));
m_server->sub_event(this, Event::t("network:incoming_data"));
}
~Module()
@ -86,6 +91,7 @@ struct Module: public interface::Module, public network::Interface
EVENT_VOIDN("core:start", on_start)
EVENT_TYPEN("network:listen_event", on_listen_event, interface::SocketEvent)
EVENT_TYPEN("network:incoming_data", on_incoming_data, interface::SocketEvent)
}
void* get_interface()
@ -119,11 +125,68 @@ struct Module: public interface::Module, public network::Interface
// Store socket
Peer::Id peer_id = m_next_peer_id++;
m_peers[peer_id] = Peer(peer_id, socket);
m_peers_by_socket[socket->fd()] = &m_peers[peer_id];
// Emit event
PeerInfo pinfo;
pinfo.id = peer_id;
pinfo.address = socket->get_remote_address();
m_server->emit_event("network:new_client", new NewClient(pinfo));
m_server->add_socket_event(socket->fd(),
Event::t("network:incoming_data"));
}
void on_incoming_data(const interface::SocketEvent &event)
{
log_i(MODULE, "network: on_incoming_data(): fd=%i", event.fd);
auto it = m_peers_by_socket.find(event.fd);
if(it == m_peers_by_socket.end()){
log_w(MODULE, "network: Peer with fd=%i not found", event.fd);
return;
}
Peer &peer = *it->second;
int fd = peer.socket->fd();
if(fd != event.fd)
throw Exception("on_incoming_data: fds don't match");
char buf[100000];
ssize_t r = recv(fd, buf, 100000, 0);
if(r == -1)
throw Exception(ss_()+"Receive failed: "+strerror(errno));
if(r == 0){
log_i(MODULE, "Peer %zu disconnected", peer.id);
m_server->remove_socket_event(peer.socket->fd());
m_peers_by_socket.erase(peer.socket->fd());
m_peers.erase(peer.id);
return;
}
log_i(MODULE, "Received %zu bytes", r);
peer.socket_buffer.insert(peer.socket_buffer.end(), buf, buf + r);
for(;;){
if(peer.socket_buffer.size() < 6)
return;
size_t type =
peer.socket_buffer[0]<<0 |
peer.socket_buffer[1]<<8;
size_t size =
peer.socket_buffer[2]<<0 |
peer.socket_buffer[3]<<8 |
peer.socket_buffer[4]<<16 |
peer.socket_buffer[5]<<24;
log_i(MODULE, "size=%zu", size);
if(peer.socket_buffer.size() < 6 + size)
return;
log_i(MODULE, "Received full packet; type=%zu, length=6+%zu",
type, size);
ss_ data(&peer.socket_buffer[6], size);
peer.socket_buffer.erase(peer.socket_buffer.begin(),
peer.socket_buffer.begin() + 6 + size);
// Emit event
m_server->emit_event("network:packet_received",
new Packet(0, type, data));
}
}
void send_u(Peer &peer, const Packet::Type &type, const ss_ &data)

View File

@ -3,7 +3,6 @@
#include "interface/tcpsocket.h"
//#include <cereal/archives/binary.hpp>
//#include <cereal/types/string.hpp>
#include <iostream>
#include <cstring>
#include <deque>
#include <sys/socket.h>

View File

@ -125,9 +125,13 @@ int main(int argc, char *argv[])
if(r == -1){
// Error
log_w("main", "select() returned -1: %s", strerror(errno));
// Don't consume 100% CPU and flood logs
usleep(1000 * 100);
return 1; // Temporary return
if(errno == EBADF || errno == EINTR){
// These are temporary errors
} else {
// Don't consume 100% CPU and flood logs
usleep(1000 * 100);
return 1;
}
} else if(r == 0){
// Nothing happened
} else {

View File

@ -236,7 +236,7 @@ struct CState: public State, public interface::Server
void remove_socket_event(int fd)
{
interface::MutexScope ms(m_sockets_mutex);
// TODO
m_sockets.erase(fd);
}
sv_<int> get_sockets()

View File

@ -33,6 +33,7 @@ struct Module: public interface::Module
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, m_EventType_test1_thing);
m_server->sub_event(this, Event::t("network:new_client"));
m_server->sub_event(this, Event::t("network:packet_received"));
}
~Module()
@ -47,6 +48,7 @@ struct Module: public interface::Module
EVENT_VOIDN("core:start", on_start)
EVENT_TYPE(m_EventType_test1_thing, on_thing, Thing)
EVENT_TYPEN("network:new_client", on_new_client, network::NewClient)
EVENT_TYPEN("network:packet_received", on_packet_received, network::Packet)
}
void on_start()
@ -65,6 +67,12 @@ struct Module: public interface::Module
network::Interface *inetwork = network::get_interface(m_server);
inetwork->send(new_client.info.id, "test1:dummy", "dummy data");
}
void on_packet_received(const network::Packet &packet)
{
log_i(MODULE, "test1::on_packet_received: type=%zu, size=%zu",
packet.type, packet.data.size());
}
};
extern "C" {