diff --git a/share/builtin/network/server/init.cpp b/share/builtin/network/server/init.cpp index 5cec89b..a3ef85e 100644 --- a/share/builtin/network/server/init.cpp +++ b/share/builtin/network/server/init.cpp @@ -40,6 +40,7 @@ struct Module: public interface::Module std::cout<<"network init"<sub_event(this, Event::t("core:start")); m_server->sub_event(this, Event::t("network:send")); + m_server->sub_event(this, Event::t("network:listen_event")); } ~Module() @@ -55,8 +56,8 @@ struct Module: public interface::Module if(event.type == Event::t("network:send")){ on_send_packet(*static_cast(event.p.get())); } - if(event.type == Event::t("network:listen")){ - on_listen_event(); + if(event.type == Event::t("network:listen_event")){ + on_listen_event(*static_cast(event.p.get())); } } @@ -74,9 +75,6 @@ struct Module: public interface::Module m_server->add_socket_event(m_listening_socket->fd(), Event::t("network:listen_event")); - - /*Peer::Id peer_id = m_next_peer_id++; - m_peers[peer_id] = Peer(peer_id, socket);*/ } void on_send_packet(const Packet &packet) @@ -84,9 +82,13 @@ struct Module: public interface::Module // TODO } - void on_listen_event() + void on_listen_event(const interface::SocketEvent &event) { - std::cerr<<"network: on_listen_event()"<send("foo"); PolycodeView *view = new PolycodeView("Hello Polycode!"); HelloPolycodeApp *app = new HelloPolycodeApp(view); diff --git a/src/client/state.cpp b/src/client/state.cpp index ad0752f..fa295fa 100644 --- a/src/client/state.cpp +++ b/src/client/state.cpp @@ -23,6 +23,11 @@ struct CState: public State <send_fd(data); + } }; State* createState() diff --git a/src/client/state.h b/src/client/state.h index c080c40..df9f2e6 100644 --- a/src/client/state.h +++ b/src/client/state.h @@ -11,6 +11,7 @@ namespace client { virtual ~State(){} virtual bool connect(const ss_ &address, const ss_ &port) = 0; + virtual bool send(const ss_ &data) = 0; }; State* createState(); diff --git a/src/impl/tcpsocket.cpp b/src/impl/tcpsocket.cpp index d7d1bec..d643d19 100644 --- a/src/impl/tcpsocket.cpp +++ b/src/impl/tcpsocket.cpp @@ -64,10 +64,15 @@ std::string address_bytes_to_string(const sv_ &ip) struct CTCPSocket: public TCPSocket { - int m_fd = 0; + int m_fd = -1; - CTCPSocket(int fd=0): m_fd(fd) + CTCPSocket(int fd=-1): + m_fd(fd) {} + ~CTCPSocket() + { + close_fd(); + } int fd() const { return m_fd; diff --git a/src/interface/server.h b/src/interface/server.h index 5d12a25..66a62a5 100644 --- a/src/interface/server.h +++ b/src/interface/server.h @@ -6,6 +6,12 @@ namespace interface { struct Module; + struct SocketEvent: public interface::Event::Private + { + int fd; + SocketEvent(int fd): fd(fd){} + }; + struct Server { virtual ~Server(){} diff --git a/src/interface/tcpsocket.h b/src/interface/tcpsocket.h index 3ff1ac5..0279542 100644 --- a/src/interface/tcpsocket.h +++ b/src/interface/tcpsocket.h @@ -19,6 +19,6 @@ namespace interface virtual ss_ get_remote_address() const = 0; }; - TCPSocket* createTCPSocket(int fd=0); + TCPSocket* createTCPSocket(int fd=-1); } diff --git a/src/server/main.cpp b/src/server/main.cpp index 1cfb8a9..35349ae 100644 --- a/src/server/main.cpp +++ b/src/server/main.cpp @@ -2,10 +2,12 @@ #include "server/config.h" #include "server/state.h" #include -#include +#include +#include #include #include #include +#include // strerror() server::Config g_server_config; @@ -101,11 +103,54 @@ int main(int argc, char *argv[]) state->load_modules(module_path); // Main loop - uint64_t master_t_per_tick = 100000L; // 10Hz - interval_loop(master_t_per_tick, [&](float load_avg){ + uint64_t next_tick_us = get_timeofday_us(); + uint64_t t_per_tick = 1000*100; + while(!g_sigint_received){ + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = t_per_tick; + + fd_set rfds; + FD_ZERO(&rfds); + sv_ sockets = state->get_sockets(); + int fd_max = 0; + for(int fd : sockets){ + FD_SET(fd, &rfds); + if(fd > fd_max) + fd_max = fd; + } + + int r = select(fd_max+1, &rfds, NULL, NULL, &tv); + if(r == -1){ + // Error + log_w("main", "select() returned -1: %s", strerror(errno)); + // Don't consume 100% CPU and flood logs + usleep(1000*100); + return 1; // Temporary return + } else if(r == 0){ + // Nothing happened + } else { + // Something happened + for(int fd : sockets){ + if(FD_ISSET(fd, &rfds)){ + log_d("main", "FD_ISSET: %i", fd); + state->emit_socket_event(fd); + } + } + } + + uint64_t current_us = get_timeofday_us(); + if(current_us >= next_tick_us){ + next_tick_us += t_per_tick; + if(next_tick_us < current_us - 1000*1000){ + log_w("main", "Skipping %zuus", current_us - next_tick_us); + next_tick_us = current_us; + } + state->emit_event(interface::Event("core:tick")); + } + state->handle_events(); - return !g_sigint_received; - }); + } return 0; } diff --git a/src/server/state.cpp b/src/server/state.cpp index c29d188..d7b81a0 100644 --- a/src/server/state.cpp +++ b/src/server/state.cpp @@ -169,16 +169,14 @@ struct CState: public State, public interface::Server void emit_event(const Event &event) { - log_v("state", "emit_event(): type=%zu", event.type); + log_d("state", "emit_event(): type=%zu", event.type); interface::MutexScope ms(m_event_queue_mutex); m_event_queue.push_back(event); } void handle_events() { - log_d("state", "handle_events()"); - for(;;){ - log_d("state", "m_event_subs.size()=%zu", m_event_subs.size()); + for(size_t loop_i=0; ; loop_i++){ sv_ event_queue_snapshot; sv_> event_subs_snapshot; { @@ -190,13 +188,13 @@ struct CState: public State, public interface::Server event_subs_snapshot = m_event_subs; } if(event_queue_snapshot.empty()){ + if(loop_i == 0) + log_d("state", "handle_events(); Nothing to do"); break; } for(const Event &event : event_queue_snapshot){ if(event.type >= event_subs_snapshot.size()){ - log_d("state", "handle_events(): %zu: No subs " - "(event_subs_snapshot.size()=%zu)", - event.type, event_subs_snapshot.size()); + log_d("state", "handle_events(): %zu: No subs", event.type); continue; } sv_ &sublist = event_subs_snapshot[event.type]; @@ -204,7 +202,8 @@ struct CState: public State, public interface::Server log_d("state", "handle_events(): %zu: No subs", event.type); continue; } - log_d("state", "handle_events(): %zu: Handling", event.type); + log_d("state", "handle_events(): %zu: Handling (%zu handlers)", + event.type, sublist.size()); for(ModuleWithMutex *mwm : sublist){ interface::MutexScope mwm_ms(mwm->mutex); mwm->module->event(event); @@ -215,6 +214,7 @@ struct CState: public State, public interface::Server void add_socket_event(int fd, const Event::Type &event_type) { + log_d("state", "add_socket_event(): fd=%i", fd); interface::MutexScope ms(m_sockets_mutex); auto it = m_sockets.find(fd); if(it == m_sockets.end()){ @@ -258,7 +258,10 @@ struct CState: public State, public interface::Server return; } SocketState &s = it->second; - emit_event(Event(s.event_type)); + // Create and emit event + interface::Event event(s.event_type); + event.p.reset(new interface::SocketEvent(fd)); + emit_event(event); } }; diff --git a/test/testmodules/__loader/server/init.cpp b/test/testmodules/__loader/server/init.cpp index 742282e..4836ce5 100644 --- a/test/testmodules/__loader/server/init.cpp +++ b/test/testmodules/__loader/server/init.cpp @@ -13,7 +13,7 @@ struct Module: public interface::Module interface::Server *m_server; Module(interface::Server *server): - m_server(server), + m_server(server) { std::cout<<"__loader construct"<