Manage sockets somehow sanely (WIP)

This commit is contained in:
Perttu Ahola 2014-09-17 16:43:05 +03:00
parent a3892bad3f
commit f62a6b59b7
9 changed files with 144 additions and 38 deletions

View File

@ -0,0 +1,29 @@
#include "interface/event.h"
namespace network
{
struct PeerInfo
{
typedef size_t Id;
Id id = 0;
ss_ address;
};
struct Packet: public interface::Event::Private
{
typedef size_t Type;
Type type;
ss_ data;
Packet(const Type &type, const ss_ &data): type(type), data(data){}
};
inline void send(interface::Server *server,
const Packet::Type &type, const ss_ &data)
{
interface::Event event("network:send");
event.p.reset(new network::Packet(type, data));
server->emit_event(event);
}
}

View File

@ -2,21 +2,35 @@
#include "interface/server.h"
#include "interface/event.h"
#include "interface/tcpsocket.h"
//#include "network/include/api.h"
#include "network/include/api.h"
#include <iostream>
using interface::Event;
namespace network {
struct Peer
{
typedef size_t Id;
Id id = 0;
sp_<interface::TCPSocket> socket;
Peer(){}
Peer(Id id, sp_<interface::TCPSocket> socket):
id(id), socket(socket){}
};
struct Module: public interface::Module
{
interface::Server *m_server;
sp_<interface::TCPSocket> m_socket;
sp_<interface::TCPSocket> m_listening_socket;
sm_<Peer::Id, Peer> m_peers;
size_t m_next_peer_id = 1;
Module(interface::Server *server):
m_server(server),
m_socket(interface::createTCPSocket())
m_listening_socket(interface::createTCPSocket())
{
std::cout<<"network construct"<<std::endl;
}
@ -24,7 +38,8 @@ struct Module: public interface::Module
void init()
{
std::cout<<"network init"<<std::endl;
m_server->sub_event(this, interface::Event::t("core:start"));
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, Event::t("network:send"));
}
~Module()
@ -32,25 +47,46 @@ struct Module: public interface::Module
std::cout<<"network destruct"<<std::endl;
}
void event(const interface::Event &event)
void event(const Event &event)
{
if(event.type == interface::Event::t("core:start")){
start();
if(event.type == Event::t("core:start")){
on_start();
}
if(event.type == Event::t("network:send")){
on_send_packet(*static_cast<Packet*>(event.p.get()));
}
if(event.type == Event::t("network:listen")){
on_listen_event();
}
}
void start()
void on_start()
{
ss_ address = "any4";
ss_ port = "20000";
if(!m_socket->bind_fd(address, port) ||
!m_socket->listen_fd()){
if(!m_listening_socket->bind_fd(address, port) ||
!m_listening_socket->listen_fd()){
std::cerr<<"Failed to bind to "<<address<<":"<<port<<std::endl;
} else {
std::cerr<<"Listening at "<<address<<":"<<port<<std::endl;
}
//m_server->add_socket(m_socket);
m_server->add_socket_event(m_listening_socket->fd(),
Event::t("network:listen_event"));
/*Peer::Id peer_id = m_next_peer_id++;
m_peers[peer_id] = Peer(peer_id, socket);*/
}
void on_send_packet(const Packet &packet)
{
// TODO
}
void on_listen_event()
{
std::cerr<<"network: on_listen_event()"<<std::endl;
}
};

View File

@ -66,6 +66,8 @@ struct CTCPSocket: public TCPSocket
{
int m_fd = 0;
CTCPSocket(int fd=0): m_fd(fd)
{}
int fd() const
{
return m_fd;
@ -288,9 +290,9 @@ struct CTCPSocket: public TCPSocket
}
};
TCPSocket* createTCPSocket()
TCPSocket* createTCPSocket(int fd)
{
return new CTCPSocket();
return new CTCPSocket(fd);
}
}

View File

@ -9,11 +9,16 @@ namespace interface
struct Server
{
virtual ~Server(){}
virtual void load_module(const ss_ &module_name, const ss_ &path) = 0;
virtual ss_ get_modules_path() = 0;
virtual ss_ get_builtin_modules_path() = 0;
virtual bool has_module(const ss_ &module_name) = 0;
virtual void sub_event(struct Module *module, const Event::Type &type) = 0;
virtual void emit_event(const Event &event) = 0;
virtual void add_socket_event(int fd, const Event::Type &event_type) = 0;
virtual void remove_socket_event(int fd) = 0;
};
}

View File

@ -19,6 +19,6 @@ namespace interface
virtual ss_ get_remote_address() const = 0;
};
TCPSocket* createTCPSocket();
TCPSocket* createTCPSocket(int fd=0);
}

View File

@ -10,6 +10,8 @@
#include <iostream>
#include <algorithm>
using interface::Event;
extern server::Config g_server_config;
namespace server {
@ -23,18 +25,26 @@ struct CState: public State, public interface::Server
ModuleWithMutex(interface::Module *module=NULL): module(module){}
};
struct SocketState {
int fd = 0;
Event::Type event_type;
};
up_<rccpp::Compiler> m_compiler;
ss_ m_modules_path;
sm_<ss_, ModuleWithMutex> m_modules;
interface::Mutex m_modules_mutex;
sv_<interface::Event> m_event_queue;
sv_<Event> m_event_queue;
interface::Mutex m_event_queue_mutex;
sv_<sv_<ModuleWithMutex*>> m_event_subs;
interface::Mutex m_event_subs_mutex;
sm_<int, SocketState> m_sockets;
interface::Mutex m_sockets_mutex;
CState():
m_compiler(rccpp::createCompiler())
{
@ -44,6 +54,8 @@ struct CState: public State, public interface::Server
g_server_config.interface_path+"/..");
m_compiler->include_directories.push_back(
g_server_config.interface_path+"/../../3rdparty/cereal/include");
m_compiler->include_directories.push_back(
g_server_config.share_path+"/builtin");
}
~CState()
{
@ -83,10 +95,10 @@ struct CState: public State, public interface::Server
ss_ first_module_path = path+"/__loader";
load_module("__loader", first_module_path);
// Allow loader load other modules
emit_event(interface::Event("core:load_modules"));
emit_event(Event("core:load_modules"));
handle_events();
// Now that everyone is listening, we can fire the start event
emit_event(interface::Event("core:start"));
emit_event(Event("core:start"));
handle_events();
}
@ -124,7 +136,7 @@ struct CState: public State, public interface::Server
}
void sub_event(struct interface::Module *module,
const interface::Event::Type &type)
const Event::Type &type)
{
// Lock modules so that the subscribing one isn't removed asynchronously
interface::MutexScope ms(m_modules_mutex);
@ -155,7 +167,7 @@ struct CState: public State, public interface::Server
sublist.push_back(mwm0);
}
void emit_event(const interface::Event &event)
void emit_event(const Event &event)
{
log_v("state", "emit_event(): type=%zu", event.type);
interface::MutexScope ms(m_event_queue_mutex);
@ -167,7 +179,7 @@ struct CState: public State, public interface::Server
log_d("state", "handle_events()");
for(;;){
log_d("state", "m_event_subs.size()=%zu", m_event_subs.size());
sv_<interface::Event> event_queue_snapshot;
sv_<Event> event_queue_snapshot;
sv_<sv_<ModuleWithMutex*>> event_subs_snapshot;
{
interface::MutexScope ms2(m_event_queue_mutex);
@ -180,7 +192,7 @@ struct CState: public State, public interface::Server
if(event_queue_snapshot.empty()){
break;
}
for(const interface::Event &event : event_queue_snapshot){
for(const Event &event : event_queue_snapshot){
if(event.type >= event_subs_snapshot.size()){
log_d("state", "handle_events(): %zu: No subs "
"(event_subs_snapshot.size()=%zu)",
@ -200,6 +212,30 @@ struct CState: public State, public interface::Server
}
}
}
void add_socket_event(int fd, const Event::Type &event_type)
{
interface::MutexScope ms(m_sockets_mutex);
auto it = m_sockets.find(fd);
if(it == m_sockets.end()){
SocketState s;
s.fd = fd;
s.event_type = event_type;
m_sockets[fd] = s;
return;
}
const SocketState &s = it->second;
if(s.event_type != event_type){
throw Exception("Socket events already requested with different"
" event type");
}
// Nothing to do; already set.
}
void remove_socket_event(int fd)
{
// TODO
}
};
State* createState()

View File

@ -11,11 +11,9 @@ namespace __loader {
struct Module: public interface::Module
{
interface::Server *m_server;
Event::Type m_EventType_core_load_modules;
Module(interface::Server *server):
m_server(server),
m_EventType_core_load_modules(interface::Event::t("core:load_modules"))
{
std::cout<<"__loader construct"<<std::endl;
}
@ -23,7 +21,7 @@ struct Module: public interface::Module
void init()
{
std::cout<<"__loader init"<<std::endl;
m_server->sub_event(this, m_EventType_core_load_modules);
m_server->sub_event(this, Event::t("core:load_modules"));
}
~Module()
@ -31,14 +29,14 @@ struct Module: public interface::Module
std::cout<<"__loader destruct"<<std::endl;
}
void event(const interface::Event &event)
void event(const Event &event)
{
if(event.type == m_EventType_core_load_modules){
load_modules();
if(event.type == Event::t("core:load_modules")){
on_load_modules();
}
}
void load_modules()
void on_load_modules()
{
m_server->load_module("network",
m_server->get_builtin_modules_path()+"/network");

View File

@ -15,7 +15,7 @@ struct Module: public interface::Module
Module(interface::Server *server):
m_server(server),
m_EventType_test1_thing(interface::Event::t("test1:thing"))
m_EventType_test1_thing(Event::t("test1:thing"))
{
std::cout<<"test1 construct"<<std::endl;
}
@ -31,16 +31,16 @@ struct Module: public interface::Module
std::cout<<"test1 destruct"<<std::endl;
}
void event(const interface::Event &event)
void event(const Event &event)
{
if(event.type == m_EventType_test1_thing){
thing(static_cast<Thing*>(event.p.get()));
on_thing(*static_cast<Thing*>(event.p.get()));
}
}
void thing(const Thing *thing)
void on_thing(const Thing &thing)
{
std::cout<<"test1.thing: some_data="<<thing->some_data<<std::endl;
std::cout<<"test1.thing: some_data="<<thing.some_data<<std::endl;
}
};

View File

@ -15,7 +15,7 @@ struct Module: public interface::Module
Module(interface::Server *server):
m_server(server),
m_EventType_core_start(interface::Event::t("core:start"))
m_EventType_core_start(Event::t("core:start"))
{
std::cout<<"test2 construct"<<std::endl;
}
@ -31,19 +31,19 @@ struct Module: public interface::Module
std::cout<<"test2 destruct"<<std::endl;
}
void event(const interface::Event &event)
void event(const Event &event)
{
if(event.type == m_EventType_core_start){
start();
on_start();
}
}
void start()
void on_start()
{
std::cout<<"test2 start(): Calling test1"<<std::endl;
// Basic way
interface::Event event("test1:thing");
Event event("test1:thing");
event.p.reset(new test1::Thing("Nakki"));
m_server->emit_event(event);