server/state: Fix file watching and module reload

This commit is contained in:
Perttu Ahola 2014-10-28 15:34:50 +02:00
parent 17faa14416
commit e630a27fec
3 changed files with 103 additions and 25 deletions

View File

@ -334,6 +334,9 @@ void FileWatchThread::run(interface::Thread *thread)
bool ok = handler.check(500000, sockets, active_sockets); bool ok = handler.check(500000, sockets, active_sockets);
(void)ok; // Unused (void)ok; // Unused
if(active_sockets.empty())
continue;
client_file::access(m_module->m_server, client_file::access(m_module->m_server,
[&](client_file::Interface *iclient_file) [&](client_file::Interface *iclient_file)
{ {

View File

@ -329,6 +329,9 @@ void NetworkThread::run(interface::Thread *thread)
bool ok = handler.check(500000, sockets, active_sockets); bool ok = handler.check(500000, sockets, active_sockets);
(void)ok; // Unused (void)ok; // Unused
if(active_sockets.empty())
continue;
network::access(m_module->m_server, [&](network::Interface *inetwork){ network::access(m_module->m_server, [&](network::Interface *inetwork){
for(int fd: active_sockets){ for(int fd: active_sockets){
m_module->handle_active_socket(fd); m_module->handle_active_socket(fd);

View File

@ -17,6 +17,7 @@
#include "interface/thread.h" #include "interface/thread.h"
#include "interface/semaphore.h" #include "interface/semaphore.h"
#include "interface/debug.h" #include "interface/debug.h"
#include "interface/select_handler.h"
#include <iostream> #include <iostream>
#include <algorithm> #include <algorithm>
#include <fstream> #include <fstream>
@ -239,13 +240,21 @@ void ModuleThread::run(interface::Thread *thread)
module_moved.reset(); module_moved.reset();
} }
struct CState;
struct FileWatchThread: public interface::ThreadedThing
{
CState *m_server;
FileWatchThread(CState *server):
m_server(server)
{}
void run(interface::Thread *thread);
};
struct CState: public State, public interface::Server struct CState: public State, public interface::Server
{ {
struct SocketState {
int fd = 0;
Event::Type event_type;
};
bool m_shutdown_requested = false; bool m_shutdown_requested = false;
int m_shutdown_exit_status = 0; int m_shutdown_exit_status = 0;
ss_ m_shutdown_reason; ss_ m_shutdown_reason;
@ -283,12 +292,20 @@ struct CState: public State, public interface::Server
sp_<interface::thread_pool::ThreadPool> m_thread_pool; sp_<interface::thread_pool::ThreadPool> m_thread_pool;
interface::Mutex m_thread_pool_mutex; interface::Mutex m_thread_pool_mutex;
// Must come after the members this will access, which are m_modules_mutex
// and m_module_file_watches.
sp_<interface::Thread> m_file_watch_thread;
CState(): CState():
m_compiler(rccpp::createCompiler(g_server_config.compiler_command)), m_compiler(rccpp::createCompiler(g_server_config.compiler_command)),
m_thread_pool(interface::thread_pool::createThreadPool()) m_thread_pool(interface::thread_pool::createThreadPool())
{ {
m_thread_pool->start(4); // TODO: Configurable m_thread_pool->start(4); // TODO: Configurable
m_file_watch_thread.reset(interface::createThread(
new FileWatchThread(this)));
m_file_watch_thread->start();
// Set basic RCC++ include directories // Set basic RCC++ include directories
// We don't want to directly add the interface path as it contains // We don't want to directly add the interface path as it contains
@ -627,15 +644,20 @@ struct CState: public State, public interface::Server
// Get and lock module // Get and lock module
auto it = m_modules.find(module_name); auto it = m_modules.find(module_name);
if(it == m_modules.end()){ if(it == m_modules.end()){
log_w(MODULE, "unload_module_u: Module not found: %s", cs(module_name)); log_w(MODULE, "unload_module_u(): Module not found: %s",
cs(module_name));
return; return;
} }
sp_<ModuleContainer> mc = it->second; sp_<ModuleContainer> mc = it->second;
{ {
interface::MutexScope mc_ms(mc->mutex); interface::MutexScope mc_ms(mc->mutex);
// Send core::unload directly to module // Send core::unload directly to module
log_t(MODULE, "unload_module_u[%s]: Directly sending core:unload",
cs(module_name));
mc->module->event(Event::t("core:unload"), nullptr); mc->module->event(Event::t("core:unload"), nullptr);
// Delete subscriptions // Delete subscriptions
log_t(MODULE, "unload_module_u[%s]: Deleting subscriptions",
cs(module_name));
{ {
for(Event::Type type = 0; type < m_event_subs.size(); type++){ for(Event::Type type = 0; type < m_event_subs.size(); type++){
sv_<wp_<ModuleContainer>> &sublist = m_event_subs[type]; sv_<wp_<ModuleContainer>> &sublist = m_event_subs[type];
@ -651,11 +673,24 @@ struct CState: public State, public interface::Server
sublist = new_sublist; sublist = new_sublist;
} }
} }
} // Remove server-wide reference to module container
{
// Delete module and container
m_modules.erase(module_name); m_modules.erase(module_name);
} }
// Destruct module
log_t(MODULE, "unload_module_u[%s]: Deleting module", cs(module_name));
mc->stop_and_delete_module();
// So, hopefully this is the last reference because we're going to
// unload the shared executable...
if(!mc.unique())
log_w(MODULE, "unload_module_u[%s]: This is not the last container"
" reference; unloading shared executable is probably unsafe",
cs(module_name));
// Drop reference to container
log_t(MODULE, "unload_module_u[%s]: Dropping container", cs(module_name));
mc.reset();
// Unload shared executable
log_t(MODULE, "unload_module_u[%s]: Unloading shared executable",
cs(module_name));
m_compiler->unload(module_name); m_compiler->unload(module_name);
emit_event(Event("core:module_unloaded", emit_event(Event("core:module_unloaded",
@ -927,23 +962,29 @@ struct CState: public State, public interface::Server
void handle_unloads_and_reloads() void handle_unloads_and_reloads()
{ {
interface::MutexScope ms(m_modules_mutex); sv_<interface::ModuleInfo> reloads_requested;
// Unload according to unload requests {
for(auto it = m_unloads_requested.begin(); // Unload using unload_module_u with m_modules_mutex locked
it != m_unloads_requested.end();){ interface::MutexScope ms(m_modules_mutex);
ss_ module_name = *it; // Copy // Unload according to unload requests
it++; for(auto it = m_unloads_requested.begin();
log_i("state", "Unloading %s (unload requested)", cs(module_name)); it != m_unloads_requested.end();){
m_unloads_requested.erase(module_name); ss_ module_name = *it; // Copy
unload_module_u(module_name); it++;
} log_i("state", "Unloading %s (unload requested)", cs(module_name));
// Unload according to reload requests m_unloads_requested.erase(module_name);
for(const interface::ModuleInfo &info : m_reloads_requested){ unload_module_u(module_name);
log_i("state", "Unloading %s (reload requested)", cs(info.name)); }
unload_module_u(info.name); // Unload according to reload requests
for(const interface::ModuleInfo &info : m_reloads_requested){
log_i("state", "Unloading %s (reload requested)", cs(info.name));
unload_module_u(info.name);
}
// Grab reload requests out from this mutex scope
reloads_requested.swap(m_reloads_requested);
} }
// Load according to reload requests // Load according to reload requests
for(const interface::ModuleInfo &info : m_reloads_requested){ for(const interface::ModuleInfo &info : reloads_requested){
log_i("state", "Loading %s (reload requested)", cs(info.name)); log_i("state", "Loading %s (reload requested)", cs(info.name));
load_module(info); load_module(info);
// Send core::continue directly to module // Send core::continue directly to module
@ -965,7 +1006,6 @@ struct CState: public State, public interface::Server
mc->module->event(Event::t("core:continue"), nullptr); mc->module->event(Event::t("core:continue"), nullptr);
} }
} }
m_reloads_requested.clear();
} }
void tmp_store_data(const ss_ &name, const ss_ &data) void tmp_store_data(const ss_ &name, const ss_ &data)
@ -1013,6 +1053,38 @@ struct CState: public State, public interface::Server
} }
}; };
void FileWatchThread::run(interface::Thread *thread)
{
interface::SelectHandler handler;
while(!thread->stop_requested()){
sv_<int> sockets;
{
interface::MutexScope ms(m_server->m_modules_mutex);
for(auto &pair : m_server->m_module_file_watches){
sv_<int> fds = pair.second->get_fds();
sockets.insert(sockets.begin(), fds.begin(), fds.end());
}
}
sv_<int> active_sockets;
bool ok = handler.check(500000, sockets, active_sockets);
(void)ok; // Unused
if(active_sockets.empty())
continue;
{
interface::MutexScope ms(m_server->m_modules_mutex);
for(auto &pair : m_server->m_module_file_watches){
for(int fd: active_sockets){
pair.second->report_fd(fd);
}
}
}
}
}
State* createState() State* createState()
{ {
return new CState(); return new CState();