Accepting connections works

This commit is contained in:
Perttu Ahola 2014-09-17 18:26:58 +03:00
parent c8a932c59f
commit 8f8e2ba9be
10 changed files with 93 additions and 25 deletions

View File

@ -40,6 +40,7 @@ struct Module: public interface::Module
std::cout<<"network init"<<std::endl;
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, Event::t("network:send"));
m_server->sub_event(this, Event::t("network:listen_event"));
}
~Module()
@ -55,8 +56,8 @@ struct Module: public interface::Module
if(event.type == Event::t("network:send")){
on_send_packet(*static_cast<Packet*>(event.p.get()));
}
if(event.type == Event::t("network:listen")){
on_listen_event();
if(event.type == Event::t("network:listen_event")){
on_listen_event(*static_cast<interface::SocketEvent*>(event.p.get()));
}
}
@ -74,9 +75,6 @@ struct Module: public interface::Module
m_server->add_socket_event(m_listening_socket->fd(),
Event::t("network:listen_event"));
/*Peer::Id peer_id = m_next_peer_id++;
m_peers[peer_id] = Peer(peer_id, socket);*/
}
void on_send_packet(const Packet &packet)
@ -84,9 +82,13 @@ struct Module: public interface::Module
// TODO
}
void on_listen_event()
void on_listen_event(const interface::SocketEvent &event)
{
std::cerr<<"network: on_listen_event()"<<std::endl;
std::cerr<<"network: on_listen_event(): fd="<<event.fd<<std::endl;
sp_<interface::TCPSocket> socket(interface::createTCPSocket());
socket->accept_fd(*m_listening_socket.get());
Peer::Id peer_id = m_next_peer_id++;
m_peers[peer_id] = Peer(peer_id, socket);
}
};

View File

@ -330,6 +330,7 @@ int main(int argc, char *argv[])
if(!state->connect(config.server_address, "20000"))
return 1;
state->send("foo");
PolycodeView *view = new PolycodeView("Hello Polycode!");
HelloPolycodeApp *app = new HelloPolycodeApp(view);

View File

@ -23,6 +23,11 @@ struct CState: public State
<<address<<":"<<port<<")"<<std::endl;
return ok;
}
bool send(const ss_ &data)
{
return m_socket->send_fd(data);
}
};
State* createState()

View File

@ -11,6 +11,7 @@ namespace client
{
virtual ~State(){}
virtual bool connect(const ss_ &address, const ss_ &port) = 0;
virtual bool send(const ss_ &data) = 0;
};
State* createState();

View File

@ -64,10 +64,15 @@ std::string address_bytes_to_string(const sv_<uchar> &ip)
struct CTCPSocket: public TCPSocket
{
int m_fd = 0;
int m_fd = -1;
CTCPSocket(int fd=0): m_fd(fd)
CTCPSocket(int fd=-1):
m_fd(fd)
{}
~CTCPSocket()
{
close_fd();
}
int fd() const
{
return m_fd;

View File

@ -6,6 +6,12 @@ namespace interface
{
struct Module;
struct SocketEvent: public interface::Event::Private
{
int fd;
SocketEvent(int fd): fd(fd){}
};
struct Server
{
virtual ~Server(){}

View File

@ -19,6 +19,6 @@ namespace interface
virtual ss_ get_remote_address() const = 0;
};
TCPSocket* createTCPSocket(int fd=0);
TCPSocket* createTCPSocket(int fd=-1);
}

View File

@ -2,10 +2,12 @@
#include "server/config.h"
#include "server/state.h"
#include <c55/getopt.h>
#include <c55/interval_loop.h>
#include <c55/os.h>
#include <c55/log.h>
#include <iostream>
#include <unistd.h>
#include <signal.h>
#include <string.h> // strerror()
server::Config g_server_config;
@ -101,11 +103,54 @@ int main(int argc, char *argv[])
state->load_modules(module_path);
// Main loop
uint64_t master_t_per_tick = 100000L; // 10Hz
interval_loop(master_t_per_tick, [&](float load_avg){
uint64_t next_tick_us = get_timeofday_us();
uint64_t t_per_tick = 1000*100;
while(!g_sigint_received){
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = t_per_tick;
fd_set rfds;
FD_ZERO(&rfds);
sv_<int> sockets = state->get_sockets();
int fd_max = 0;
for(int fd : sockets){
FD_SET(fd, &rfds);
if(fd > fd_max)
fd_max = fd;
}
int r = select(fd_max+1, &rfds, NULL, NULL, &tv);
if(r == -1){
// Error
log_w("main", "select() returned -1: %s", strerror(errno));
// Don't consume 100% CPU and flood logs
usleep(1000*100);
return 1; // Temporary return
} else if(r == 0){
// Nothing happened
} else {
// Something happened
for(int fd : sockets){
if(FD_ISSET(fd, &rfds)){
log_d("main", "FD_ISSET: %i", fd);
state->emit_socket_event(fd);
}
}
}
uint64_t current_us = get_timeofday_us();
if(current_us >= next_tick_us){
next_tick_us += t_per_tick;
if(next_tick_us < current_us - 1000*1000){
log_w("main", "Skipping %zuus", current_us - next_tick_us);
next_tick_us = current_us;
}
state->emit_event(interface::Event("core:tick"));
}
state->handle_events();
return !g_sigint_received;
});
}
return 0;
}

View File

@ -169,16 +169,14 @@ struct CState: public State, public interface::Server
void emit_event(const Event &event)
{
log_v("state", "emit_event(): type=%zu", event.type);
log_d("state", "emit_event(): type=%zu", event.type);
interface::MutexScope ms(m_event_queue_mutex);
m_event_queue.push_back(event);
}
void handle_events()
{
log_d("state", "handle_events()");
for(;;){
log_d("state", "m_event_subs.size()=%zu", m_event_subs.size());
for(size_t loop_i=0; ; loop_i++){
sv_<Event> event_queue_snapshot;
sv_<sv_<ModuleWithMutex*>> event_subs_snapshot;
{
@ -190,13 +188,13 @@ struct CState: public State, public interface::Server
event_subs_snapshot = m_event_subs;
}
if(event_queue_snapshot.empty()){
if(loop_i == 0)
log_d("state", "handle_events(); Nothing to do");
break;
}
for(const Event &event : event_queue_snapshot){
if(event.type >= event_subs_snapshot.size()){
log_d("state", "handle_events(): %zu: No subs "
"(event_subs_snapshot.size()=%zu)",
event.type, event_subs_snapshot.size());
log_d("state", "handle_events(): %zu: No subs", event.type);
continue;
}
sv_<ModuleWithMutex*> &sublist = event_subs_snapshot[event.type];
@ -204,7 +202,8 @@ struct CState: public State, public interface::Server
log_d("state", "handle_events(): %zu: No subs", event.type);
continue;
}
log_d("state", "handle_events(): %zu: Handling", event.type);
log_d("state", "handle_events(): %zu: Handling (%zu handlers)",
event.type, sublist.size());
for(ModuleWithMutex *mwm : sublist){
interface::MutexScope mwm_ms(mwm->mutex);
mwm->module->event(event);
@ -215,6 +214,7 @@ struct CState: public State, public interface::Server
void add_socket_event(int fd, const Event::Type &event_type)
{
log_d("state", "add_socket_event(): fd=%i", fd);
interface::MutexScope ms(m_sockets_mutex);
auto it = m_sockets.find(fd);
if(it == m_sockets.end()){
@ -258,7 +258,10 @@ struct CState: public State, public interface::Server
return;
}
SocketState &s = it->second;
emit_event(Event(s.event_type));
// Create and emit event
interface::Event event(s.event_type);
event.p.reset(new interface::SocketEvent(fd));
emit_event(event);
}
};

View File

@ -13,7 +13,7 @@ struct Module: public interface::Module
interface::Server *m_server;
Module(interface::Server *server):
m_server(server),
m_server(server)
{
std::cout<<"__loader construct"<<std::endl;
}