Event loop works

This commit is contained in:
Perttu Ahola 2014-09-17 14:53:06 +03:00
parent 7e3dc3f5db
commit 220b858749
12 changed files with 175 additions and 43 deletions

View File

@ -1,5 +1,7 @@
#pragma once
#include <functional>
#include <cstring> // memset()
#include <unistd.h> // usleep()
#include "os.h" // get_timeofday_us()
#include "log.h"

View File

@ -28,8 +28,12 @@ struct Module: public interface::Module
} else {
std::cerr<<"Listening at "<<address<<":"<<port<<std::endl;
}
}
m_server->add_socket(m_socket);
void init()
{
std::cout<<"network init"<<std::endl;
//m_server->add_socket(m_socket);
}
~Module()

View File

@ -6,10 +6,15 @@ Event::Event(const ss_ &name, const sp_<Private> &p):
type(getGlobalEventRegistry()->type(name)), p(p)
{}
Event::Type Event::t(const ss_ &name)
{
return getGlobalEventRegistry()->type(name);
}
struct CEventRegistry: public EventRegistry
{
sm_<ss_, Event::Type> m_types;
Event::Type m_next_type = 0;
Event::Type m_next_type = 1;
Event::Type type(const ss_ &name)
{

View File

@ -10,12 +10,14 @@ namespace interface
virtual ~Private(){}
};
Type type;
sp_<Private> p;
sp_<Private> p; // TODO: up_<>
Event(): type(0){}
Event(const Type &type): type(type){}
Event(const Type &type, const sp_<Private> &p): type(type), p(p){}
Event(const ss_ &name, const sp_<Private> &p = NULL);
static Type t(const ss_ &name); // Shorthand function
};
struct EventRegistry

View File

@ -10,13 +10,8 @@ namespace interface
struct Module
{
virtual ~Module(){};
virtual void init() = 0;
// Never call directly; this is not thread-safe
virtual void event(const interface::Event &event) = 0;
};
struct SafeModule
{
virtual ~ModuleInterface(){};
virtual void event(const interface::Event &event) = 0;
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "core/types.h"
#include "interface/event.h"
namespace interface
{
@ -11,6 +12,8 @@ namespace interface
virtual void load_module(const ss_ &module_name, const ss_ &path) = 0;
virtual ss_ get_modules_path() = 0;
virtual ss_ get_builtin_modules_path() = 0;
virtual SafeModule* get_module(const ss_ &module_name) = 0;
virtual bool has_module(const ss_ &module_name) = 0;
virtual void sub_event(struct Module *module, const Event::Type &type) = 0;
virtual void emit_event(const Event &event) = 0;
};
}

View File

@ -1,14 +1,46 @@
#include "core/types.h"
#include "server/config.h"
#include "server/state.h"
#include <iostream>
#include <c55/getopt.h>
#include <c55/interval_loop.h>
#include <iostream>
#include <unistd.h>
#include <signal.h>
server::Config g_server_config;
bool g_sigint_received = false;
void sigint_handler(int sig)
{
if(!g_sigint_received){
fprintf(stdout, "\n"); // Newline after "^C"
log_i("process", "SIGINT");
g_sigint_received = true;
} else{
(void)signal(SIGINT, SIG_DFL);
}
}
void signal_handler_init()
{
(void)signal(SIGINT, sigint_handler);
}
void basic_init()
{
signal_handler_init();
// Force '.' as decimal point
std::locale::global(std::locale(std::locale(""), "C", std::locale::numeric));
setlocale(LC_NUMERIC, "C");
log_set_max_level(LOG_VERBOSE);
}
int main(int argc, char *argv[])
{
basic_init();
server::Config &config = g_server_config;
std::string module_path;
@ -64,11 +96,12 @@ int main(int argc, char *argv[])
up_<server::State> state(server::createState());
state->load_modules(module_path);
/*// Main loop
// Main loop
uint64_t master_t_per_tick = 100000L; // 10Hz
interval_loop(master_t_per_tick, [&](float load_avg){
state->handle_events();
return !g_sigint_received;
});*/
});
return 0;
}

View File

@ -4,9 +4,11 @@
#include "interface/module.h"
#include "interface/server.h"
#include "interface/event.h"
#include "interface/thread.h"
//#include "interface/thread.h"
#include "interface/mutex.h"
#include <c55/log.h>
#include <iostream>
#include <algorithm>
extern server::Config g_server_config;
@ -18,7 +20,7 @@ struct CState: public State, public interface::Server
interface::Mutex mutex;
interface::Module *module;
ModuleWithMutex(interface::Module *module): module(module){}
ModuleWithMutex(interface::Module *module=NULL): module(module){}
};
up_<rccpp::Compiler> m_compiler;
@ -27,6 +29,12 @@ struct CState: public State, public interface::Server
sm_<ss_, ModuleWithMutex> m_modules;
interface::Mutex m_modules_mutex;
sv_<interface::Event> m_event_queue;
interface::Mutex m_event_queue_mutex;
sv_<sv_<ModuleWithMutex*>> m_event_subs;
interface::Mutex m_event_subs_mutex;
CState():
m_compiler(rccpp::createCompiler())
{
@ -62,11 +70,11 @@ struct CState: public State, public interface::Server
m_compiler->construct(module_name.c_str(), this));
m_modules[module_name] = ModuleWithMutex(m);
send_event_u(m,
m->event(interface::Event("core:load_modules"));
m->event(interface::Event("core:start"));
{
ModuleWithMutex &mwm = m_modules[module_name];
interface::MutexScope ms2(mwm.mutex);
mwm.module->init();
}
}
void load_modules(const ss_ &path)
@ -74,6 +82,10 @@ struct CState: public State, public interface::Server
m_modules_path = path;
ss_ first_module_path = path+"/__loader";
load_module("__loader", first_module_path);
log_v("state", "asd");
emit_event(interface::Event("core:load_modules"));
emit_event(interface::Event("core:start"));
log_v("state", "asd2");
}
ss_ get_modules_path()
@ -86,7 +98,7 @@ struct CState: public State, public interface::Server
return g_server_config.share_path+"/builtin";
}
interface::Module* get_module(const ss_ &module_name)
/*interface::Module* get_module_u(const ss_ &module_name)
{
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(module_name);
@ -95,28 +107,85 @@ struct CState: public State, public interface::Server
return it->second;
}
interface::Module* check_module(const ss_ &module_name)
interface::Module* check_module_u(const ss_ &module_name)
{
interface::Module *m = get_module(module_name);
if(m) return m;
throw ModuleNotFoundException(ss_()+"Module not found: "+module_name);
}
}*/
interface::SafeModule interface::SafeModule &get_module( get_module(const ss_ &module_name)
{
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(module_name);
if(it == m_modules.end())
return NULL;
return it->second;
}
bool module_available(const ss_ &module_name)
bool has_module(const ss_ &module_name)
{
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(module_name);
return (it != m_modules.end());
}
void sub_event(struct interface::Module *module,
const interface::Event::Type &type)
{
// Lock modules so that the subscribing one isn't removed asynchronously
interface::MutexScope ms(m_modules_mutex);
// Make sure module is a known instance
ModuleWithMutex *mwm0 = NULL;
ss_ module_name = "(unknown)";
for(auto &pair : m_modules){
ModuleWithMutex &mwm = pair.second;
if(mwm.module == module){
mwm0 = &mwm;
module_name = pair.first;
break;
}
}
if(mwm0 == nullptr){
std::cerr<<"sub_event(): Not a known module"<<std::endl;
return;
}
interface::MutexScope ms2(m_event_subs_mutex);
if(m_event_subs.size() <= type)
m_event_subs.resize(type+1);
sv_<ModuleWithMutex*> &sublist = m_event_subs[type];
if(std::find(sublist.begin(), sublist.end(), mwm0) != sublist.end()){
std::cerr<<"sub_event(): Already on list: "<<module_name<<std::endl;
return;
}
std::cerr<<"sub_event(): "<<module_name<<" subscribed to "<<type<<std::endl;
sublist.push_back(mwm0);
}
void emit_event(const interface::Event &event)
{
log_v("state", "emit_event(): type=%zu", event.type);
interface::MutexScope ms(m_event_queue_mutex);
m_event_queue.push_back(event);
}
void handle_events()
{
log_d("state", "handle_events()");
interface::MutexScope ms(m_modules_mutex);
for(;;){
sv_<interface::Event> event_queue_snapshot;
sv_<sv_<ModuleWithMutex*>> event_subs_snapshot;
{
interface::MutexScope ms2(m_event_queue_mutex);
interface::MutexScope ms3(m_event_subs_mutex);
m_event_queue.swap(event_queue_snapshot);
m_event_subs.swap(event_subs_snapshot);
}
if(event_queue_snapshot.empty()){
break;
}
for(const interface::Event &event : event_queue_snapshot){
if(event_subs_snapshot.size() <= event.type)
continue;
sv_<ModuleWithMutex*> &sublist = event_subs_snapshot[event.type];
for(ModuleWithMutex *mwm : sublist){
mwm->module->event(event);
}
}
}
}
};
State* createState()

View File

@ -1,5 +1,6 @@
#pragma once
#include "core/types.h"
#include "interface/event.h"
namespace interface
{
@ -18,9 +19,12 @@ namespace server
virtual ~State(){}
virtual void load_module(const ss_ &module_name, const ss_ &path) = 0;
virtual void load_modules(const ss_ &path) = 0;
virtual interface::Module* get_module_u(const ss_ &module_name) = 0;
virtual interface::Module* check_module_u(const ss_ &module_name) = 0;
virtual void tick(float dtime) = 0;
//virtual interface::Module* get_module_u(const ss_ &module_name) = 0;
//virtual interface::Module* check_module_u(const ss_ &module_name) = 0;
virtual void sub_event(struct interface::Module *module,
const interface::Event::Type &type) = 0;
virtual void emit_event(const interface::Event &event) = 0;
virtual void handle_events() = 0;
};
State* createState();

View File

@ -15,12 +15,17 @@ struct Module: public interface::Module
Module(interface::Server *server):
m_server(server),
m_EventType_core_load_modules(
interface::getGlobalEventRegistry()->type("core:load_modules"))
m_EventType_core_load_modules(interface::Event::t("core:load_modules"))
{
std::cout<<"__loader construct"<<std::endl;
}
void init()
{
std::cout<<"__loader init"<<std::endl;
m_server->sub_event(this, m_EventType_core_load_modules);
}
~Module()
{
std::cout<<"__loader destruct"<<std::endl;

View File

@ -15,12 +15,17 @@ struct Module: public interface::Module
Module(interface::Server *server):
m_server(server),
m_EventType_test1_thing(
interface::getGlobalEventRegistry()->type("test1:thing"))
m_EventType_test1_thing(interface::Event::t("test1:thing"))
{
std::cout<<"test1 construct"<<std::endl;
}
void init()
{
std::cout<<"test1 init"<<std::endl;
m_server->sub_event(this, m_EventType_test1_thing);
}
~Module()
{
std::cout<<"test1 destruct"<<std::endl;

View File

@ -15,11 +15,17 @@ struct Module: public interface::Module
Module(interface::Server *server):
m_server(server),
m_EventType_core_start(interface::getGlobalEventRegistry()->type("core:start"))
m_EventType_core_start(interface::Event::t("core:start"))
{
std::cout<<"test2 construct"<<std::endl;
}
void init()
{
std::cout<<"test2 init"<<std::endl;
m_server->sub_event(this, m_EventType_core_start);
}
~Module()
{
std::cout<<"test2 destruct"<<std::endl;
@ -34,10 +40,9 @@ struct Module: public interface::Module
void start()
{
interface::Module *m = m_server->check_module("test1");
interface::Event event("test1:thing");
event.p.reset(new test1::Thing("Nakki"));
m->event(event);
m_server->emit_event(event);
}
};