server: Run modules in separate threads

This commit is contained in:
Perttu Ahola 2014-10-26 15:09:58 +02:00
parent 89add42185
commit 4937a83ac5
21 changed files with 995 additions and 522 deletions

View File

@ -130,6 +130,7 @@ set(BUILDAT_CORE_SRCS
src/impl/compress.cpp
src/impl/thread_pool.cpp
src/impl/os.cpp
src/impl/thread.cpp
)
if(WIN32)
set(BUILDAT_CORE_SRCS ${BUILDAT_CORE_SRCS} src/impl/windows/file_watch.cpp)

View File

@ -7,6 +7,8 @@
#include "interface/sha1.h"
#include "interface/file_watch.h"
#include "interface/fs.h"
#include "interface/thread.h"
#include "interface/select_handler.h"
#include "client_file/api.h"
#include "network/api.h"
#include <cereal/archives/portable_binary.hpp>
@ -29,11 +31,27 @@ struct FileInfo {
namespace client_file {
struct Module;
struct FileWatchThread: public interface::ThreadedThing
{
static constexpr const char *MODULE = "client_file";
Module *m_module = nullptr;
FileWatchThread(Module *module):
m_module(module)
{}
void run(interface::Thread *thread);
};
struct Module: public interface::Module, public client_file::Interface
{
interface::Server *m_server;
sm_<ss_, sp_<FileInfo>> m_files;
sp_<interface::FileWatch> m_watch;
sp_<interface::Thread> m_thread;
Module(interface::Server *server):
interface::Module("client_file"),
@ -41,13 +59,14 @@ struct Module: public interface::Module, public client_file::Interface
m_watch(interface::createFileWatch())
{
log_d(MODULE, "client_file construct");
m_thread.reset(interface::createThread(new FileWatchThread(this)));
m_thread->start();
}
~Module()
{
log_d(MODULE, "client_file destruct");
for(int fd : m_watch->get_fds())
m_server->remove_socket_event(fd);
}
void init()
@ -61,10 +80,6 @@ struct Module: public interface::Module, public client_file::Interface
Event::t("network:packet_received/core:request_file"));
m_server->sub_event(this,
Event::t("network:packet_received/core:all_files_transferred"));
m_server->sub_event(this, Event::t("watch_file:watch_fd_event"));
for(int fd : m_watch->get_fds())
m_server->add_socket_event(fd, Event::t("watch_file:watch_fd_event"));
}
void event(const Event::Type &type, const Event::Private *p)
@ -78,8 +93,6 @@ struct Module: public interface::Module, public client_file::Interface
network::Packet)
EVENT_TYPEN("network:packet_received/core:all_files_transferred",
on_all_files_transferred, network::Packet)
EVENT_TYPEN("watch_file:watch_fd_event", on_watch_fd_event,
interface::SocketEvent);
}
void on_start()
@ -203,10 +216,17 @@ struct Module: public interface::Module, public client_file::Interface
new FilesTransmitted(packet.sender));
}
void on_watch_fd_event(const interface::SocketEvent &event)
// Interface for FileWatchThread
sv_<int> get_sockets()
{
log_d(MODULE, "on_watch_fd_event()");
m_watch->report_fd(event.fd);
return m_watch->get_fds();
}
void handle_active_socket(int fd)
{
log_d(MODULE, "handle_active_socket(): fd=%i", fd);
m_watch->report_fd(fd);
}
// Interface
@ -297,6 +317,34 @@ struct Module: public interface::Module, public client_file::Interface
}
};
void FileWatchThread::run(interface::Thread *thread)
{
interface::SelectHandler handler;
while(!thread->stop_requested()){
sv_<int> sockets;
// We can avoid implementing our own mutex locking in Module by using
// interface::Server::access_module() instead of directly accessing it.
client_file::access(m_module->m_server,
[&](client_file::Interface *iclient_file)
{
sockets = m_module->get_sockets();
});
sv_<int> active_sockets;
bool ok = handler.check(500000, sockets, active_sockets);
(void)ok; // Unused
client_file::access(m_module->m_server,
[&](client_file::Interface *iclient_file)
{
for(int fd : active_sockets){
m_module->handle_active_socket(fd);
}
});
}
}
extern "C" {
BUILDAT_EXPORT void* createModule_client_file(interface::Server *server){
return (void*)(new Module(server));

View File

@ -0,0 +1,45 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#pragma once
#include "interface/event.h"
#include "interface/server.h"
#include "interface/module.h"
#include <functional>
namespace Urho3D
{
class Context;
class Scene;
class StringHash;
}
namespace main_context
{
namespace magic = Urho3D;
using interface::Event;
struct Interface
{
// NOTE: Do not store Urho3D::SharedPtr<>s or any other kinds of pointers
// to Urho3D objects because accessing and deleting them is not
// thread-safe.
virtual magic::Context* get_context() = 0;
virtual magic::Scene* get_scene() = 0;
virtual void sub_magic_event(
const magic::StringHash &event_type,
const Event::Type &buildat_event_type) = 0;
};
inline bool access(interface::Server *server,
std::function<void(main_context::Interface*)> cb)
{
return server->access_module("main_context", [&](interface::Module *module){
cb((main_context::Interface*)module->check_interface());
});
}
}
// vim: set noet ts=4 sw=4:

View File

@ -0,0 +1,242 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#include "main_context/api.h"
#include "network/api.h"
#include "core/log.h"
#include "interface/module.h"
#include "interface/server.h"
#include "interface/server_config.h"
#include "interface/event.h"
#include "interface/magic_event_handler.h"
#include "interface/os.h"
#include "interface/fs.h"
#include <Variant.h>
#include <Context.h>
#include <Engine.h>
#include <Scene.h>
#include <SceneEvents.h>
#include <Component.h>
#include <ReplicationState.h>
#include <PhysicsWorld.h>
#include <ResourceCache.h>
#include <Octree.h>
#include <Profiler.h>
#include <Log.h>
#include <IOEvents.h> // E_LOGMESSAGE
#include <Thread.h>
#include <climits>
namespace main_context {
using interface::Event;
class BuildatResourceRouter: public magic::ResourceRouter
{
static constexpr const char *MODULE = "main_context";
OBJECT(BuildatResourceRouter);
interface::Server *m_server;
public:
BuildatResourceRouter(magic::Context *context, interface::Server *server):
magic::ResourceRouter(context),
m_server(server)
{}
void Route(magic::String &name, magic::ResourceRequest requestType)
{
ss_ path = m_server->get_file_path(name.CString());
if(path == ""){
log_v(MODULE, "Resource route access: %s (assuming local file)",
name.CString());
return;
}
log_v(MODULE, "Resource route access: %s -> %s",
name.CString(), cs(path));
name = path.c_str();
}
};
struct Module: public interface::Module, public main_context::Interface
{
interface::Server *m_server;
magic::SharedPtr<magic::Context> m_context;
magic::SharedPtr<magic::Engine> m_engine;
magic::SharedPtr<magic::Scene> m_scene;
sm_<Event::Type, magic::SharedPtr<interface::MagicEventHandler>>
m_magic_event_handlers;
uint64_t profiler_last_print_us = 0;
Module(interface::Server *server):
interface::Module("main_context"),
m_server(server),
profiler_last_print_us(interface::os::get_timeofday_us())
{
log_d(MODULE, "main_context construct");
}
~Module()
{
log_d(MODULE, "main_context destruct");
}
void init()
{
log_d(MODULE, "main_context init");
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, Event::t("core:unload"));
m_server->sub_event(this, Event::t("core:continue"));
m_server->sub_event(this, Event::t("core:tick"));
}
void event(const Event::Type &type, const Event::Private *p)
{
EVENT_VOIDN("core:start", on_start)
EVENT_VOIDN("core:unload", on_unload)
EVENT_VOIDN("core:continue", on_continue)
EVENT_TYPEN("core:tick", on_tick, interface::TickEvent)
EVENT_TYPEN("urho3d_log_redirect:message", on_message,
interface::MagicEvent)
}
void init_in_module_thread()
{
// Urho3D wants to know which is the main thread of the context. Module
// constructor and init() are called from a different thread than
// regular event(), so this is required.
// Initialize Urho3D
m_context = new magic::Context();
m_engine = new magic::Engine(m_context);
// Disable timestamps in Urho3D log message events
magic::Log *magic_log = m_context->GetSubsystem<magic::Log>();
magic_log->SetTimeStamp(false);
const interface::ServerConfig &server_config = m_server->get_config();
sv_<ss_> resource_paths = {
server_config.urho3d_path+"/Bin/CoreData",
server_config.urho3d_path+"/Bin/Data",
};
auto *fs = interface::getGlobalFilesystem();
ss_ resource_paths_s;
for(const ss_ &path : resource_paths){
if(!resource_paths_s.empty())
resource_paths_s += ";";
resource_paths_s += fs->get_absolute_path(path);
}
magic::VariantMap params;
params["ResourcePaths"] = resource_paths_s.c_str();
params["Headless"] = true;
params["LogName"] = ""; // Don't log to file
params["LogQuiet"] = true; // Don't log to stdout
if(!m_engine->Initialize(params))
throw Exception("Urho3D engine initialization failed");
m_scene = new magic::Scene(m_context);
auto *physics = m_scene->CreateComponent<magic::PhysicsWorld>(
magic::LOCAL);
physics->SetFps(30);
physics->SetInterpolation(false);
// Useless but gets rid of warnings like
// "ERROR: No Octree component in scene, drawable will not render"
m_scene->CreateComponent<magic::Octree>(magic::LOCAL);
magic::ResourceCache *magic_cache =
m_context->GetSubsystem<magic::ResourceCache>();
//magic_cache->SetAutoReloadResources(true);
magic_cache->SetResourceRouter(
new BuildatResourceRouter(m_context, m_server));
sub_magic_event(magic::E_LOGMESSAGE,
Event::t("urho3d_log_redirect:message"));
m_server->sub_event(this, Event::t("urho3d_log_redirect:message"));
}
void on_start()
{
log_v(MODULE, "main_context start");
init_in_module_thread();
}
void on_unload()
{
log_v(MODULE, "main_context unload");
}
void on_continue()
{
log_v(MODULE, "main_context continue");
init_in_module_thread();
}
void on_tick(const interface::TickEvent &event)
{
m_engine->SetNextTimeStep(event.dtime);
m_engine->RunFrame();
uint64_t current_us = interface::os::get_timeofday_us();
magic::Profiler *p = m_context->GetSubsystem<magic::Profiler>();
if(p && profiler_last_print_us < current_us - 10000000){
profiler_last_print_us = current_us;
magic::String s = p->GetData(false, false, UINT_MAX);
p->BeginInterval();
log_v(MODULE, "Urho3D profiler:\n%s", s.CString());
}
}
void on_message(const interface::MagicEvent &event)
{
int magic_level = event.magic_data.Find("Level")->second_.GetInt();
ss_ message = event.magic_data.Find("Message")->second_.
GetString().CString();
int core_level = LOG_ERROR;
if(magic_level == magic::LOG_DEBUG)
core_level = LOG_DEBUG;
else if(magic_level == magic::LOG_INFO)
core_level = LOG_VERBOSE;
else if(magic_level == magic::LOG_WARNING)
core_level = LOG_WARNING;
else if(magic_level == magic::LOG_ERROR)
core_level = LOG_ERROR;
log_(core_level, MODULE, "Urho3D %s", cs(message));
}
// Interface
magic::Context* get_context()
{
return m_context;
}
magic::Scene* get_scene()
{
return m_scene;
}
void sub_magic_event(
const magic::StringHash &event_type,
const Event::Type &buildat_event_type)
{
m_magic_event_handlers[buildat_event_type] =
new interface::MagicEventHandler(
m_context, m_server, event_type, buildat_event_type);
}
void* get_interface()
{
return dynamic_cast<Interface*>(this);
}
};
extern "C" {
BUILDAT_EXPORT void* createModule_main_context(interface::Server *server){
return (void*)(new Module(server));
}
}
}
// vim: set noet ts=4 sw=4:

View File

@ -0,0 +1,5 @@
{
"dependencies": [
]
}

View File

@ -7,21 +7,39 @@
#include "interface/event.h"
#include "interface/tcpsocket.h"
#include "interface/packet_stream.h"
#include "interface/thread.h"
#include "interface/select_handler.h"
#include <cereal/archives/portable_binary.hpp>
#include <cereal/types/vector.hpp>
#include <cereal/types/tuple.hpp>
#ifdef _WIN32
#include "ports/windows_sockets.h"
#include "ports/windows_compat.h" // usleep()
#else
#include <sys/socket.h>
#include <unistd.h> // usleep()
#endif
#include <deque>
#include <cstring> // strerror()
using interface::Event;
namespace network {
struct Module;
struct NetworkThread: public interface::ThreadedThing
{
static constexpr const char *MODULE = "network";
Module *m_module = nullptr;
NetworkThread(Module *module):
m_module(module)
{}
void run(interface::Thread *thread);
};
struct Peer
{
typedef size_t Id;
@ -44,6 +62,7 @@ struct Module: public interface::Module, public network::Interface
sm_<int, Peer*> m_peers_by_socket;
size_t m_next_peer_id = 1;
bool m_will_restore_after_unload = false;
sp_<interface::Thread> m_thread;
Module(interface::Server *server):
interface::Module("network"),
@ -51,22 +70,24 @@ struct Module: public interface::Module, public network::Interface
m_listening_socket(interface::createTCPSocket())
{
log_d(MODULE, "network construct");
m_thread.reset(interface::createThread(new NetworkThread(this)));
m_thread->start();
}
~Module()
{
log_d(MODULE, "network destruct");
if(m_listening_socket->good()){
m_server->remove_socket_event(m_listening_socket->fd());
if(m_will_restore_after_unload)
if(m_will_restore_after_unload){
if(m_listening_socket->good()){
m_listening_socket->release_fd();
}
for(auto pair : m_peers){
const Peer &peer = pair.second;
if(peer.socket->good()){
m_server->remove_socket_event(peer.socket->fd());
if(m_will_restore_after_unload)
}
for(auto pair : m_peers){
const Peer &peer = pair.second;
if(peer.socket->good()){
peer.socket->release_fd();
}
}
}
}
@ -77,8 +98,6 @@ struct Module: public interface::Module, public network::Interface
m_server->sub_event(this, Event::t("core:start"));
m_server->sub_event(this, Event::t("core:unload"));
m_server->sub_event(this, Event::t("core:continue"));
m_server->sub_event(this, Event::t("network:listen_event"));
m_server->sub_event(this, Event::t("network:incoming_data"));
}
void event(const Event::Type &type, const Event::Private *p)
@ -86,9 +105,6 @@ struct Module: public interface::Module, public network::Interface
EVENT_VOIDN("core:start", on_start)
EVENT_VOIDN("core:unload", on_unload)
EVENT_VOIDN("core:continue", on_continue)
EVENT_TYPEN("network:listen_event", on_listen_event, interface::SocketEvent)
EVENT_TYPEN("network:incoming_data", on_incoming_data,
interface::SocketEvent)
}
void on_start()
@ -109,9 +125,6 @@ struct Module: public interface::Module, public network::Interface
log_i(MODULE, "Listening at %s:%s, fd=%i", cs(address), cs(port),
m_listening_socket->fd());
}
m_server->add_socket_event(m_listening_socket->fd(),
Event::t("network:listen_event"));
}
void on_unload()
@ -159,14 +172,12 @@ struct Module: public interface::Module, public network::Interface
sp_<interface::TCPSocket> socket(interface::createTCPSocket(fd));
m_peers[peer_id] = Peer(peer_id, socket);
m_peers_by_socket[socket->fd()] = &m_peers[peer_id];
m_server->add_socket_event(socket->fd(),
Event::t("network:incoming_data"));
}
}
void on_listen_event(const interface::SocketEvent &event)
void on_listen_event(int event_fd)
{
log_v(MODULE, "network: on_listen_event(): fd=%i", event.fd);
log_v(MODULE, "network: on_listen_event(): fd=%i", event_fd);
// Create socket
sp_<interface::TCPSocket> socket(interface::createTCPSocket());
// Accept connection
@ -182,23 +193,21 @@ struct Module: public interface::Module, public network::Interface
pinfo.id = peer_id;
pinfo.address = socket->get_remote_address();
m_server->emit_event("network:client_connected", new NewClient(pinfo));
m_server->add_socket_event(socket->fd(),
Event::t("network:incoming_data"));
}
void on_incoming_data(const interface::SocketEvent &event)
void on_incoming_data(int event_fd)
{
log_v(MODULE, "network: on_incoming_data(): fd=%i", event.fd);
log_v(MODULE, "network: on_incoming_data(): fd=%i", event_fd);
auto it = m_peers_by_socket.find(event.fd);
auto it = m_peers_by_socket.find(event_fd);
if(it == m_peers_by_socket.end()){
log_w(MODULE, "network: Peer with fd=%i not found", event.fd);
log_w(MODULE, "network: Peer with fd=%i not found", event_fd);
return;
}
Peer &peer = *it->second;
int fd = peer.socket->fd();
if(fd != event.fd)
if(fd != event_fd)
throw Exception("on_incoming_data: fds don't match");
char buf[100000];
ssize_t r = recv(fd, buf, 100000, 0);
@ -219,7 +228,6 @@ struct Module: public interface::Module, public network::Interface
m_server->emit_event("network:client_disconnected",
new OldClient(pinfo));
m_server->remove_socket_event(peer.socket->fd());
m_peers_by_socket.erase(peer.socket->fd());
m_peers.erase(peer.id);
return;
@ -260,6 +268,28 @@ struct Module: public interface::Module, public network::Interface
send_u(peer, name, data);
}
// Interface for NetworkThread
sv_<int> get_sockets()
{
sv_<int> result;
result.push_back(m_listening_socket->fd());
for(auto &pair : m_peers){
Peer &peer = pair.second;
result.push_back(peer.socket->fd());
}
return result;
}
void handle_active_socket(int fd)
{
if(fd == m_listening_socket->fd()){
on_listen_event(fd);
} else {
on_incoming_data(fd);
}
}
// Interface
void send(PeerInfo::Id recipient, const ss_ &name, const ss_ &data)
@ -284,6 +314,30 @@ struct Module: public interface::Module, public network::Interface
}
};
void NetworkThread::run(interface::Thread *thread)
{
interface::SelectHandler handler;
while(!thread->stop_requested()){
sv_<int> sockets;
// We can avoid implementing our own mutex locking in Module by using
// interface::Server::access_module() instead of directly accessing it.
network::access(m_module->m_server, [&](network::Interface *inetwork){
sockets = m_module->get_sockets();
});
sv_<int> active_sockets;
bool ok = handler.check(500000, sockets, active_sockets);
(void)ok; // Unused
network::access(m_module->m_server, [&](network::Interface *inetwork){
for(int fd : active_sockets){
m_module->handle_active_socket(fd);
}
});
}
}
extern "C" {
BUILDAT_EXPORT void* createModule_network(interface::Server *server){
return (void*)(new Module(server));

View File

@ -1,5 +1,6 @@
{
"dependencies": [
{"module": "main_context"},
{"module": "network"}
]
}

View File

@ -1,8 +1,9 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#include "replicate/api.h"
#include "core/log.h"
#include "main_context/api.h"
#include "network/api.h"
#include "core/log.h"
#include "interface/module.h"
#include "interface/server.h"
#include "interface/event.h"
@ -69,7 +70,8 @@ struct Module: public interface::Module, public replicate::Interface
~Module()
{
log_d(MODULE, "replicate destruct");
m_server->access_scene([&](magic::Scene *scene){
main_context::access(m_server, [&](main_context::Interface *imc){
magic::Scene *scene = imc->get_scene();
for(auto &pair: m_scene_states){
magic::SceneReplicationState &scene_state = pair.second;
scene->CleanupConnection((magic::Connection*)&scene_state);
@ -124,7 +126,8 @@ struct Module: public interface::Module, public replicate::Interface
if(it == m_scene_states.end())
return;
magic::SceneReplicationState &scene_state = it->second;
m_server->access_scene([&](magic::Scene *scene){
main_context::access(m_server, [&](main_context::Interface *imc){
magic::Scene *scene = imc->get_scene();
// NOTE: We use pointers to SceneReplicationStates as Connection
// pointers in other replication states in order to
// scene->CleanupConnection() without an actual Connection object
@ -152,7 +155,8 @@ struct Module: public interface::Module, public replicate::Interface
network::access(m_server, [&](network::Interface *inetwork){
peers = inetwork->list_peers();
});
m_server->access_scene([&](magic::Scene *scene){
main_context::access(m_server, [&](main_context::Interface *imc){
magic::Scene *scene = imc->get_scene();
// For a reference implementation of this kind of network
// synchronization, see Urho3D's Network/Connection.cpp
@ -485,7 +489,8 @@ struct Module: public interface::Module, public replicate::Interface
network::access(m_server, [&](network::Interface *inetwork){
peers = inetwork->list_peers();
});
m_server->access_scene([&](magic::Scene *scene){
main_context::access(m_server, [&](main_context::Interface *imc){
magic::Scene *scene = imc->get_scene();
Node *n = scene->GetNode(node_id);
n->PrepareNetworkUpdate();
for(auto &peer: peers){

View File

@ -4,6 +4,7 @@
#include "network/api.h"
#include "client_file/api.h"
#include "replicate/api.h"
#include "main_context/api.h"
#include "core/log.h"
#include "interface/module.h"
#include "interface/server.h"
@ -164,8 +165,10 @@ ChunkBuffer& Section::get_buffer(const pv::Vector3DInt32 &chunk_p,
}
log_t(MODULE, "Loading chunk " PV3I_FORMAT " (node %i)",
PV3I_PARAMS(chunk_p), node_id);
server->access_scene([&](Scene *scene)
main_context::access(server, [&](main_context::Interface *imc)
{
Scene *scene = imc->get_scene();
Node *n = scene->GetNode(node_id);
if(!n){
log_w("voxelworld",
@ -265,9 +268,8 @@ struct Module: public interface::Module, public voxelworld::Interface
m_server->sub_event(this, Event::t(
"network:packet_received/voxelworld:get_section"));
m_server->access_scene([&](Scene *scene)
{
Context *context = scene->GetContext();
main_context::access(m_server, [&](main_context::Interface *imc){
Context *context = imc->get_context();
m_atlas_reg.reset(interface::createAtlasRegistry(context));
});
@ -335,8 +337,8 @@ struct Module: public interface::Module, public voxelworld::Interface
commit();
// Remove everything managed by us from the scene
m_server->access_scene([&](Scene *scene)
{
main_context::access(m_server, [&](main_context::Interface *imc){
Scene *scene = imc->get_scene();
size_t progress = 0;
for(auto &sector_pair: m_sections){
log_v(MODULE, "Unloading nodes... %i%%",
@ -397,9 +399,9 @@ struct Module: public interface::Module, public voxelworld::Interface
void on_tick(const interface::TickEvent &event)
{
m_server->access_scene([&](Scene *scene)
{
Context *context = scene->GetContext();
main_context::access(m_server, [&](main_context::Interface *imc){
Scene *scene = imc->get_scene();
Context *context = imc->get_context();
// Update node collision boxes
if(!m_nodes_needing_physics_update.empty()){
@ -613,8 +615,8 @@ struct Module: public interface::Module, public voxelworld::Interface
void create_section(Section &section)
{
m_server->access_scene([&](Scene *scene)
{
main_context::access(m_server, [&](main_context::Interface *imc){
Scene *scene = imc->get_scene();
auto lc = section.contained_chunks.getLowerCorner();
auto uc = section.contained_chunks.getUpperCorner();
for(int z = 0; z <= uc.getZ() - lc.getZ(); z++){
@ -682,7 +684,7 @@ struct Module: public interface::Module, public voxelworld::Interface
}
void run_commit_hooks_in_scene(
const pv::Vector3DInt32 &chunk_p, magic::Node *n)
const pv::Vector3DInt32 &chunk_p, Node *n)
{
for(up_<CommitHook> &hook : m_commit_hooks)
hook->in_scene(this, chunk_p, n);
@ -823,8 +825,8 @@ struct Module: public interface::Module, public voxelworld::Interface
// TODO: Commit only the current chunk
commit();
m_server->access_scene([&](Scene *scene)
{
main_context::access(m_server, [&](main_context::Interface *imc){
Scene *scene = imc->get_scene();
Node *n = scene->GetNode(node_id);
const Variant &var = n->GetVar(StringHash("buildat_voxel_data"));
const PODVector<unsigned char> &buf = var.GetBuffer();
@ -945,8 +947,8 @@ struct Module: public interface::Module, public voxelworld::Interface
ss_ new_data = interface::serialize_volume_compressed(
*chunk_buffer.volume);
m_server->access_scene([&](Scene *scene)
{
main_context::access(m_server, [&](main_context::Interface *imc){
Scene *scene = imc->get_scene();
Context *context = scene->GetContext();
Node *n = scene->GetNode(node_id);

View File

@ -3,6 +3,7 @@
#include "network/api.h"
#include "replicate/api.h"
#include "voxelworld/api.h"
#include "main_context/api.h"
#include "interface/module.h"
#include "interface/server.h"
#include "interface/event.h"
@ -217,13 +218,14 @@ struct Module: public interface::Module
}
});
m_server->access_scene([&](Scene *scene)
voxelworld::access(m_server, [&](voxelworld::Interface *ivoxelworld)
{
Context *context = scene->GetContext();
ResourceCache *cache = context->GetSubsystem<ResourceCache>();
voxelworld::access(m_server, [&](voxelworld::Interface *ivoxelworld)
main_context::access(m_server, [&](main_context::Interface *imc)
{
Scene *scene = imc->get_scene();
Context *context = imc->get_context();
ResourceCache *cache = context->GetSubsystem<ResourceCache>();
interface::VoxelRegistry *voxel_reg =
ivoxelworld->get_voxel_reg();
@ -278,14 +280,18 @@ struct Module: public interface::Module
void on_tick(const interface::TickEvent &event)
{
/*m_server->access_scene([&](Scene *scene){
/*main_context::access(m_server, [&](main_context::Interface *imc)
{
Scene *scene = imc->get_scene();
Node *n = scene->GetChild("Testbox");
auto p = n->GetPosition();
log_v(MODULE, "Testbox: (%f, %f, %f)", p.x_, p.y_, p.z_);
});*/
static uint a = 0;
if(((a++) % 150) == 0){
m_server->access_scene([&](Scene *scene){
main_context::access(m_server, [&](main_context::Interface *imc)
{
Scene *scene = imc->get_scene();
Node *n = scene->GetChild("Testbox");
n->SetRotation(Quaternion(30, 60, 90));
n->SetPosition(Vector3(30.0f, 30.0f, 40.0f));

120
src/impl/thread.cpp Normal file
View File

@ -0,0 +1,120 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#include "interface/thread.h"
#include "interface/mutex.h"
#include "core/log.h"
#include <c55/os.h>
#include <deque>
#ifdef _WIN32
#include "ports/windows_compat.h"
#else
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#endif
#define MODULE "thread_pool"
namespace interface {
struct CThread: public Thread
{
// TODO: Use std::atomic_bool
bool m_running = false;
bool m_stop_requested = false;
interface::Mutex m_mutex; // Protects each of the former variables
up_<ThreadedThing> m_thing;
pthread_t m_thread;
CThread(ThreadedThing *thing):
m_thing(thing)
{}
~CThread()
{
request_stop();
join();
}
static void* run_thread(void *arg)
{
log_d(MODULE, "Thread %p start", arg);
#ifndef _WIN32
// Disable all signals
sigset_t sigset;
sigemptyset(&sigset);
(void)pthread_sigmask(SIG_SETMASK, &sigset, NULL);
#endif
// Go on
CThread *thread = (CThread*)arg;
try {
if(thread->m_thing)
thread->m_thing->run(thread);
} catch(std::exception &e){
log_w(MODULE, "ThreadThing of thread %p failed: %s",
arg, e.what());
}
log_d(MODULE, "Thread %p exit", arg);
interface::MutexScope ms(thread->m_mutex);
thread->m_running = false;
pthread_exit(NULL);
}
// Interface
void start()
{
interface::MutexScope ms(m_mutex);
if(m_running){
log_w(MODULE, "CThread::start(): Already running");
return;
}
m_stop_requested = false;
if(pthread_create(&m_thread, NULL, run_thread, (void*)this)){
throw Exception("pthread_create() failed");
}
m_running = true;
}
bool is_running()
{
interface::MutexScope ms(m_mutex);
return m_running;
}
void request_stop()
{
interface::MutexScope ms(m_mutex);
m_stop_requested = true;
}
bool stop_requested()
{
interface::MutexScope ms(m_mutex);
return m_stop_requested;
}
void join()
{
{
interface::MutexScope ms(m_mutex);
if(!m_running){
return;
}
if(!m_stop_requested){
log_w(MODULE, "Joining a thread that was not requested "
"to stop");
}
}
pthread_join(m_thread, NULL);
}
};
Thread* createThread(ThreadedThing *thing)
{
return new CThread(thing);
}
}
// vim: set noet ts=4 sw=4:

View File

@ -33,7 +33,7 @@ namespace interface
virtual ~Private(){}
};
Type type;
const sp_<Private> p;
sp_<const Private> p;
Event():
type(0){}

View File

@ -9,7 +9,7 @@
#include <Node.h>
#include <Component.h>
namespace server
namespace interface
{
using interface::Event;
namespace magic = Urho3D;

View File

@ -0,0 +1,102 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#pragma once
#include "core/types.h"
#ifdef _WIN32
#include "ports/windows_sockets.h"
#include "ports/windows_compat.h" // usleep()
#else
#include <sys/socket.h>
#include <unistd.h> // usleep()
#endif
#include <cstring> // strerror()
namespace interface
{
struct SelectHandler
{
static constexpr const char *MODULE = "SelectHandler";
set_<int> attempt_bad_fds;
int last_added_attempt_bad_fd = -42;
set_<int> bad_fds;
size_t num_consequent_valid_selects = 0;
// Returns false if there is some sort of error (no errors are fatal)
bool check(int timeout_us, const sv_<int> &sockets,
sv_<int> &active_sockets)
{
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = timeout_us;
fd_set rfds;
FD_ZERO(&rfds);
int fd_max = 0;
if(!attempt_bad_fds.empty() || !bad_fds.empty()){
log_w(MODULE, "Ignoring fds %s and %s out of all %s",
cs(dump(attempt_bad_fds)), cs(dump(bad_fds)),
cs(dump(sockets)));
}
for(int fd : sockets){
if(attempt_bad_fds.count(fd) || bad_fds.count(fd))
continue;
FD_SET(fd, &rfds);
if(fd > fd_max)
fd_max = fd;
}
int r = select(fd_max + 1, &rfds, NULL, NULL, &tv);
if(r == -1){
if(errno == EINTR){
// The process is probably quitting
return false;
}
// Error
num_consequent_valid_selects = 0;
log_w(MODULE, "select() returned -1: %s (fds: %s)",
strerror(errno), cs(dump(sockets)));
if(errno == EBADF){
// These are temporary errors
// Try to find out which socket is doing this
if(attempt_bad_fds.size() == sockets.size()){
throw Exception("All fds are bad");
} else {
for(;;){
int fd = sockets[rand() % sockets.size()];
if(attempt_bad_fds.count(fd) == 0){
log_w(MODULE, "Trying to ignore fd=%i", fd);
attempt_bad_fds.insert(fd);
last_added_attempt_bad_fd = fd;
return false;
}
}
}
} else {
// Don't consume 100% CPU and flood logs
usleep(1000 * 100);
return false;
}
} else if(r == 0){
// Nothing happened
num_consequent_valid_selects++;
} else {
// Something happened
num_consequent_valid_selects++;
for(int fd : sockets){
if(FD_ISSET(fd, &rfds)){
log_d(MODULE, "FD_ISSET: %i", fd);
active_sockets.push_back(fd);
}
}
}
if(!attempt_bad_fds.empty() && num_consequent_valid_selects > 5){
log_w(MODULE, "Found bad fd: %d", last_added_attempt_bad_fd);
bad_fds.insert(last_added_attempt_bad_fd);
attempt_bad_fds.clear();
}
return true;
}
};
}
// vim: set noet ts=4 sw=4:

View File

@ -11,12 +11,18 @@ namespace Urho3D
class StringHash;
}
namespace server
{
struct Config;
}
namespace interface
{
namespace magic = Urho3D;
struct ModuleInfo;
struct Module;
typedef server::Config ServerConfig;
namespace thread_pool {
struct ThreadPool;
@ -27,11 +33,6 @@ namespace interface
TickEvent(float dtime): dtime(dtime){}
};
struct SocketEvent: public interface::Event::Private {
int fd;
SocketEvent(int fd): fd(fd){}
};
struct ModuleModifiedEvent: public interface::Event::Private {
ss_ name;
ss_ path;
@ -74,19 +75,15 @@ namespace interface
emit_event(std::move(Event(type, up_<Event::Private>(p))));
}
virtual void access_scene(std::function<void(magic::Scene*)> cb) = 0;
virtual void sub_magic_event(struct interface::Module *module,
const magic::StringHash &event_type,
const Event::Type &buildat_event_type) = 0;
virtual void add_socket_event(int fd, const Event::Type &event_type) = 0;
virtual void remove_socket_event(int fd) = 0;
virtual void tmp_store_data(const ss_ &name, const ss_ &data) = 0;
virtual ss_ tmp_restore_data(const ss_ &name) = 0;
// Add resource file path (to make a mirror of the client)
virtual void add_file_path(const ss_ &name, const ss_ &path) = 0;
// Returns "" if not found
virtual ss_ get_file_path(const ss_ &name) = 0;
virtual const ServerConfig& get_config() = 0;
virtual void access_thread_pool(std::function<void(
interface::thread_pool::ThreadPool*pool)> cb) = 0;

View File

@ -0,0 +1,12 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#pragma once
#include "core/types.h"
#include "server/config.h"
namespace interface
{
typedef server::Config ServerConfig;
}
// vim: set noet ts=4 sw=4:

29
src/interface/thread.h Normal file
View File

@ -0,0 +1,29 @@
// http://www.apache.org/licenses/LICENSE-2.0
// Copyright 2014 Perttu Ahola <celeron55@gmail.com>
#pragma once
#include "core/types.h"
namespace interface
{
struct Thread;
struct ThreadedThing
{
virtual ~ThreadedThing(){}
// Shall run until finished or until thread->stop_requested()
virtual void run(interface::Thread *thread) = 0;
};
struct Thread
{
virtual ~Thread(){}
virtual void start() = 0;
virtual bool is_running() = 0;
virtual void request_stop() = 0;
virtual bool stop_requested() = 0;
virtual void join() = 0;
};
Thread* createThread(ThreadedThing *thing);
}
// vim: set noet ts=4 sw=4:

View File

@ -18,19 +18,6 @@ namespace interface
virtual bool post() = 0;
};
/*struct Thread
{
virtual ~Thread(){}
virtual void add_task(up_<Task> task) = 0;
virtual void start() = 0;
virtual void request_stop() = 0;
virtual void join() = 0;
virtual bool is_running() = 0;
virtual void main() = 0; // Allow task to do stuff in main thread
};
Thread* createThread();*/
struct ThreadPool
{
virtual ~ThreadPool(){}

View File

@ -7,10 +7,6 @@
#include "interface/server.h"
#include <c55/getopt.h>
#include <c55/os.h>
#include <Context.h>
#include <Engine.h>
#include <Scene.h>
#include <Profiler.h>
#ifdef _WIN32
#include "ports/windows_sockets.h"
#include "ports/windows_compat.h"
@ -24,8 +20,6 @@
#include <time.h> // struct timeval
#define MODULE "main"
namespace magic = Urho3D;
server::Config g_server_config;
bool g_sigint_received = false;
@ -161,89 +155,13 @@ int main(int argc, char *argv[])
uint64_t next_tick_us = get_timeofday_us();
uint64_t t_per_tick = 1000000 / 30; // Same as physics FPS
set_<int> attempt_bad_fds;
int last_added_attempt_bad_fd = -42;
set_<int> bad_fds;
size_t num_consequent_valid_selects = 0;
uint64_t profiler_last_print_us = next_tick_us;
while(!g_sigint_received){
uint64_t current_us = get_timeofday_us();
int64_t delay_us = next_tick_us - current_us;
if(delay_us < 0)
delay_us = 0;
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = delay_us;
fd_set rfds;
FD_ZERO(&rfds);
sv_<int> sockets = state->get_sockets();
int fd_max = 0;
if(!attempt_bad_fds.empty() || !bad_fds.empty()){
log_w("main", "Ignoring fds %s and %s out of all %s",
cs(dump(attempt_bad_fds)), cs(dump(bad_fds)),
cs(dump(sockets)));
}
for(int fd : sockets){
if(attempt_bad_fds.count(fd) || bad_fds.count(fd))
continue;
FD_SET(fd, &rfds);
if(fd > fd_max)
fd_max = fd;
}
int r = select(fd_max + 1, &rfds, NULL, NULL, &tv);
if(r == -1){
if(errno == EINTR && g_sigint_received){
// Fine, we're quitting
break;
}
// Error
num_consequent_valid_selects = 0;
log_w("main", "select() returned -1: %s (fds: %s)",
strerror(errno), cs(dump(sockets)));
if(errno == EBADF || errno == EINTR){
// These are temporary errors
// Try to find out which socket is doing this
if(attempt_bad_fds.size() == sockets.size()){
throw Exception("All fds are bad");
} else {
for(;;){
int fd = sockets[rand() % sockets.size()];
if(attempt_bad_fds.count(fd) == 0){
log_w("main", "Trying to ignore fd=%i", fd);
attempt_bad_fds.insert(fd);
last_added_attempt_bad_fd = fd;
break;
}
}
}
} else {
// Don't consume 100% CPU and flood logs
usleep(1000 * 100);
return 1;
}
} else if(r == 0){
// Nothing happened
num_consequent_valid_selects++;
} else {
// Something happened
num_consequent_valid_selects++;
for(int fd : sockets){
if(FD_ISSET(fd, &rfds)){
log_d("main", "FD_ISSET: %i", fd);
state->emit_socket_event(fd);
}
}
}
if(!attempt_bad_fds.empty() && num_consequent_valid_selects > 5){
log_w("main", "Found bad fd: %d", last_added_attempt_bad_fd);
bad_fds.insert(last_added_attempt_bad_fd);
attempt_bad_fds.clear();
}
usleep(delay_us);
state->handle_events();
@ -256,22 +174,6 @@ int main(int argc, char *argv[])
interface::Event event("core:tick",
new interface::TickEvent(t_per_tick / 1e6));
state->emit_event(std::move(event));
state->access_scene([&](magic::Scene *scene)
{
magic::Context *context = scene->GetContext();
magic::Engine *engine = context->GetSubsystem<magic::Engine>();
engine->SetNextTimeStep(t_per_tick / 1e6);
engine->RunFrame();
magic::Profiler *p = context->GetSubsystem<magic::Profiler>();
if(p && profiler_last_print_us < current_us - 10000000){
profiler_last_print_us = current_us;
magic::String s = p->GetData(false, false, UINT_MAX);
p->BeginInterval();
log_v("main", "Urho3D profiler:\n%s", s.CString());
}
});
}
if(state->is_shutdown_requested(&exit_status, &shutdown_reason))

View File

@ -5,32 +5,21 @@
#include "rccpp.h"
#include "rccpp_util.h"
#include "config.h"
#include "magic_event_handler.h"
#include "urho3d_log_redirect.h"
#include "interface/module.h"
#include "interface/module_info.h"
#include "interface/server.h"
#include "interface/event.h"
#include "interface/file_watch.h"
#include "interface/fs.h"
#include "interface/magic_event.h"
#include "interface/sha1.h"
#include "interface/mutex.h"
#include "interface/thread_pool.h"
#include <Variant.h>
#include <Context.h>
#include <Engine.h>
#include <Scene.h>
#include <SceneEvents.h>
#include <Component.h>
#include <ReplicationState.h>
#include <PhysicsWorld.h>
#include <ResourceCache.h>
#include <Octree.h>
#include <Profiler.h>
#include "interface/thread.h"
#include "interface/semaphore.h"
#include <iostream>
#include <algorithm>
#include <fstream>
#include <deque>
#define MODULE "__state"
#ifdef _WIN32
@ -45,44 +34,114 @@ extern bool g_sigint_received;
namespace server {
using interface::Event;
namespace magic = Urho3D;
class BuildatResourceRouter: public magic::ResourceRouter
struct ModuleContainer;
struct ModuleThread: public interface::ThreadedThing
{
OBJECT(BuildatResourceRouter);
ModuleContainer *mc = nullptr;
server::State *m_server;
public:
BuildatResourceRouter(magic::Context *context, server::State *server):
magic::ResourceRouter(context),
m_server(server)
ModuleThread(ModuleContainer *mc):
mc(mc)
{}
void Route(magic::String &name, magic::ResourceRequest requestType)
{
ss_ path = m_server->get_file_path(name.CString());
if(path == ""){
log_v(MODULE, "Resource route access: %s (assuming local file)",
name.CString());
return;
void run(interface::Thread *thread);
};
struct ModuleContainer
{
up_<interface::Module> module;
interface::ModuleInfo info;
interface::Mutex mutex; // Protects each of the former variables
up_<interface::Thread> thread;
std::deque<Event> event_queue; // Push back, pop front
interface::Mutex event_queue_mutex;
interface::Semaphore event_queue_sem; // Counts queued events
ModuleContainer(interface::Module *module = NULL,
const interface::ModuleInfo &info = interface::ModuleInfo()):
module(module),
info(info)
{}
~ModuleContainer(){
log_t(MODULE, "ModuleContainer[%s]: Destructing", cs(info.name));
stop_and_delete_module();
}
void stop_and_delete_module(){
if(thread){
log_t(MODULE, "ModuleContainer[%s]: Asking thread to exit",
cs(info.name));
thread->request_stop();
event_queue_sem.post(); // Wake up thread so it can exit
log_t(MODULE, "ModuleContainer[%s]: Asked thread to exit; waiting",
cs(info.name));
thread->join();
thread.reset();
log_t(MODULE, "ModuleContainer[%s]: Thread exited", cs(info.name));
} else {
log_t(MODULE, "ModuleContainer[%s]: No thread", cs(info.name));
}
log_v(MODULE, "Resource route access: %s -> %s",
name.CString(), cs(path));
name = path.c_str();
// Module should have been deleted by the thread. In case the thread
// failed, delete it here.
module.reset();
}
void push_event(const Event &event){
interface::MutexScope ms(event_queue_mutex);
event_queue.push_back(event);
event_queue_sem.post();
}
void emit_event_sync(const Event &event){
interface::MutexScope ms(mutex);
module->event(event.type, event.p.get());
}
};
void ModuleThread::run(interface::Thread *thread)
{
for(;;){
// Wait for an event
mc->event_queue_sem.wait();
// Check if should stop
if(thread->stop_requested())
break;
// Grab an event from the queue
Event event;
{
interface::MutexScope ms(mc->event_queue_mutex);
if(mc->event_queue.empty())
continue;
event = mc->event_queue.front();
mc->event_queue.pop_front();
}
// Handle the event
interface::MutexScope ms(mc->mutex);
if(!mc->module){
log_w(MODULE, "ModuleContainer[%s]: Module is null; cannot"
" handle event", cs(mc->info.name));
continue;
}
try {
mc->module->event(event.type, event.p.get());
} catch(std::exception &e){
log_w(MODULE, "module->event() failed: %s", e.what());
}
}
// Delete module in this thread. This is important in case the destruction
// of some objects in the module is required to be done in the same thread
// as they were created in.
// It is also important to delete the module outside of mc->mutex, as doing
// it in the locked state will only cause deadlocks.
up_<interface::Module> module_moved;
{
interface::MutexScope ms(mc->mutex);
module_moved = std::move(mc->module);
}
module_moved.reset();
}
struct CState: public State, public interface::Server
{
struct ModuleContainer {
interface::Mutex mutex;
interface::Module *module;
interface::ModuleInfo info;
ModuleContainer(interface::Module *module = NULL,
const interface::ModuleInfo &info = interface::ModuleInfo()):
module(module), info(info){}
};
struct SocketState {
int fd = 0;
Event::Type event_type;
@ -95,18 +154,8 @@ struct CState: public State, public interface::Server
up_<rccpp::Compiler> m_compiler;
ss_ m_modules_path;
magic::SharedPtr<magic::Context> m_magic_context;
magic::SharedPtr<magic::Engine> m_magic_engine;
magic::SharedPtr<magic::Scene> m_magic_scene;
sm_<Event::Type, magic::SharedPtr<MagicEventHandler>> m_magic_event_handlers;
// NOTE: m_magic_mutex must be locked when constructing or destructing
// modules. In every other case modules must use access_scene().
// NOTE: If not locked, creating or destructing Urho3D Objects can cause a
// crash.
interface::Mutex m_magic_mutex; // Lock for all of Urho3D
sm_<ss_, interface::ModuleInfo> m_module_info; // Info of every seen module
sm_<ss_, ModuleContainer> m_modules; // Currently loaded modules
sm_<ss_, sp_<ModuleContainer>> m_modules; // Currently loaded modules
set_<ss_> m_unloads_requested;
sv_<interface::ModuleInfo> m_reloads_requested;
sm_<ss_, sp_<interface::FileWatch>> m_module_file_watches;
@ -117,17 +166,11 @@ struct CState: public State, public interface::Server
// TODO: Handle properly in reloads (unload by popping from top, then reload
// everything until top)
sv_<ss_> m_module_load_order;
sv_<sv_<wp_<ModuleContainer>>> m_event_subs;
// NOTE: You can make a copy of an sp_<ModuleContainer> and unlock this
// mutex for processing the module asynchronously (just lock mc->mutex)
interface::Mutex m_modules_mutex;
sv_<Event> m_event_queue;
interface::Mutex m_event_queue_mutex;
sv_<sv_<ModuleContainer* >> m_event_subs;
interface::Mutex m_event_subs_mutex;
sm_<int, SocketState> m_sockets;
interface::Mutex m_sockets_mutex;
sm_<ss_, ss_> m_tmp_data;
interface::Mutex m_tmp_data_mutex;
@ -175,74 +218,32 @@ struct CState: public State, public interface::Server
m_compiler->libraries.push_back("-lUrho3D");
m_compiler->include_directories.push_back(
g_server_config.urho3d_path+"/Source/ThirdParty/Bullet/src");
// Initialize Urho3D
m_magic_context = new magic::Context();
m_magic_engine = new magic::Engine(m_magic_context);
// Load hardcoded log redirection module
{
interface::Module *m = new urho3d_log_redirect::Module(this);
load_module_direct_u(m, "urho3d_log_redirect");
// Disable timestamps in Urho3D log message events
magic::Log *magic_log = m_magic_context->GetSubsystem<magic::Log>();
magic_log->SetTimeStamp(false);
}
sv_<ss_> resource_paths = {
g_server_config.urho3d_path+"/Bin/CoreData",
g_server_config.urho3d_path+"/Bin/Data",
};
auto *fs = interface::getGlobalFilesystem();
ss_ resource_paths_s;
for(const ss_ &path : resource_paths){
if(!resource_paths_s.empty())
resource_paths_s += ";";
resource_paths_s += fs->get_absolute_path(path);
}
magic::VariantMap params;
params["ResourcePaths"] = resource_paths_s.c_str();
params["Headless"] = true;
params["LogName"] = ""; // Don't log to file
params["LogQuiet"] = true; // Don't log to stdout
if(!m_magic_engine->Initialize(params))
throw Exception("Urho3D engine initialization failed");
m_magic_scene = new magic::Scene(m_magic_context);
auto *physics = m_magic_scene->CreateComponent<magic::PhysicsWorld>(
magic::LOCAL);
physics->SetFps(30);
physics->SetInterpolation(false);
// Useless but gets rid of warnings like
// "ERROR: No Octree component in scene, drawable will not render"
m_magic_scene->CreateComponent<magic::Octree>(magic::LOCAL);
magic::ResourceCache *magic_cache =
m_magic_context->GetSubsystem<magic::ResourceCache>();
//magic_cache->SetAutoReloadResources(true);
magic_cache->SetResourceRouter(
new BuildatResourceRouter(m_magic_context, this));
}
~CState()
{
interface::MutexScope ms(m_modules_mutex);
interface::MutexScope ms_magic(m_magic_mutex);
// Unload modules in reverse load order to make things work more
// predictably
for(auto name_it = m_module_load_order.rbegin();
name_it != m_module_load_order.rend(); ++name_it){
auto it2 = m_modules.find(*name_it);
if(it2 == m_modules.end())
continue;
ModuleContainer &mc = it2->second;
// Don't lock; it would only cause deadlocks
delete mc.module;
mc.module = nullptr;
sv_<sp_<ModuleContainer>> mcs;
{
// Don't have this locked when handling modules because it causes
// deadlocks
interface::MutexScope ms(m_modules_mutex);
for(auto name_it = m_module_load_order.rbegin();
name_it != m_module_load_order.rend(); ++name_it){
auto it2 = m_modules.find(*name_it);
if(it2 == m_modules.end())
continue;
sp_<ModuleContainer> &mc = it2->second;
mcs.push_back(mc);
}
}
for(sp_<ModuleContainer> &mc : mcs){
log_v(MODULE, "Destructing module %s", cs(mc->info.name));
mc->stop_and_delete_module();
// Remove our reference to the module container.
// This stops the module's main thread.
// (This is a shared pointer)
mc.reset();
}
}
@ -270,7 +271,6 @@ struct CState: public State, public interface::Server
return m_shutdown_requested;
}
// Call with m_modules_mutex and m_magic_mutex locked
interface::Module* build_module_u(const interface::ModuleInfo &info)
{
ss_ init_cpp_path = info.path+"/"+info.name+".cpp";
@ -387,7 +387,6 @@ struct CState: public State, public interface::Server
void load_module_direct_u(interface::Module *m, const ss_ &name)
{
interface::MutexScope ms(m_modules_mutex);
interface::MutexScope ms_magic(m_magic_mutex);
interface::ModuleInfo info;
info.name = name;
@ -397,19 +396,21 @@ struct CState: public State, public interface::Server
m_module_info[info.name] = info;
m_modules[info.name] = ModuleContainer(m, info);
m_modules[info.name] = sp_<ModuleContainer>(
new ModuleContainer(m, info));
m_module_load_order.push_back(info.name);
// Call init()
ModuleContainer &mc = m_modules[info.name];
interface::MutexScope ms2(mc.mutex);
mc.module->init();
sp_<ModuleContainer> mc = m_modules[info.name];
interface::MutexScope ms2(mc->mutex);
mc->module->init();
mc->thread.reset(interface::createThread(new ModuleThread(mc.get())));
mc->thread->start();
}
bool load_module(const interface::ModuleInfo &info)
{
interface::MutexScope ms(m_modules_mutex);
interface::MutexScope ms_magic(m_magic_mutex);
if(m_modules.find(info.name) != m_modules.end()){
log_w(MODULE, "Cannot load module %s from %s: Already loaded",
@ -431,15 +432,18 @@ struct CState: public State, public interface::Server
return false;
}
}
m_modules[info.name] = ModuleContainer(m, info);
m_modules[info.name] = sp_<ModuleContainer>(
new ModuleContainer(m, info));
m_module_load_order.push_back(info.name);
// Call init()
if(m){
ModuleContainer &mc = m_modules[info.name];
interface::MutexScope ms2(mc.mutex);
mc.module->init();
sp_<ModuleContainer> mc = m_modules[info.name];
interface::MutexScope ms2(mc->mutex);
mc->module->init();
mc->thread.reset(interface::createThread(new ModuleThread(mc.get())));
mc->thread->start();
}
emit_event(Event("core:module_loaded",
@ -460,16 +464,16 @@ struct CState: public State, public interface::Server
return;
}
// Allow loader to load other modules
emit_event(Event("core:load_modules"));
handle_events();
// Allow loader to load other modules.
// Emit synchronously because threading doesn't matter at this point in
// initialization and we have to wait for it to complete.
emit_event(Event("core:load_modules"), true);
if(is_shutdown_requested())
return;
// Now that everyone is listening, we can fire the start event
emit_event(Event("core:start"));
handle_events();
}
// interface::Server version; doesn't directly unload
@ -525,19 +529,19 @@ struct CState: public State, public interface::Server
log_w(MODULE, "unload_module_u: Module not found: %s", cs(module_name));
return;
}
ModuleContainer *mc = &it->second;
sp_<ModuleContainer> mc = it->second;
{
interface::MutexScope mc_ms(mc->mutex);
// Send core::unload directly to module
mc->module->event(Event::t("core:unload"), nullptr);
// Delete subscriptions
{
interface::MutexScope ms(m_event_subs_mutex);
for(Event::Type type = 0; type < m_event_subs.size(); type++){
sv_<ModuleContainer*> &sublist = m_event_subs[type];
sv_<ModuleContainer*> new_sublist;
for(ModuleContainer *mc1 : sublist){
if(mc1 != mc)
sv_<wp_<ModuleContainer>> &sublist = m_event_subs[type];
sv_<wp_<ModuleContainer>> new_sublist;
for(wp_<ModuleContainer> &mc1 : sublist){
if(sp_<ModuleContainer>(mc1.lock()).get() !=
mc.get())
new_sublist.push_back(mc1);
else
log_v(MODULE, "Removing %s subscription to event %zu",
@ -546,13 +550,11 @@ struct CState: public State, public interface::Server
sublist = new_sublist;
}
}
// Delete module
{
interface::MutexScope ms_magic(m_magic_mutex);
delete mc->module;
}
}
m_modules.erase(module_name);
{
// Delete module and container
m_modules.erase(module_name);
}
m_compiler->unload(module_name);
emit_event(Event("core:module_unloaded",
@ -575,7 +577,7 @@ struct CState: public State, public interface::Server
auto it = m_modules.find(module_name);
if(it == m_modules.end())
throw ModuleNotFoundException(ss_()+"Module not found: "+module_name);
ModuleContainer *mc = &it->second;
ModuleContainer *mc = it->second.get();
return mc->info.path;
}
@ -585,7 +587,7 @@ struct CState: public State, public interface::Server
auto it = m_modules.find(module_name);
if(it == m_modules.end())
return NULL;
return it->second.module;
return it->second->module.get();
}
interface::Module* check_module(const ss_ &module_name)
@ -615,14 +617,20 @@ struct CState: public State, public interface::Server
bool access_module(const ss_ &module_name,
std::function<void(interface::Module*)> cb)
{
// This prevents module from being deleted while it is being called
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(module_name);
if(it == m_modules.end())
return false;
ModuleContainer *mc = &it->second;
sp_<ModuleContainer> mc;
{
// This prevents module from being deleted while a reference is
// being copied
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(module_name);
if(it == m_modules.end())
return false;
mc = it->second;
}
interface::MutexScope mc_ms(mc->mutex);
cb(mc->module);
if(!mc->module)
return false;
cb(mc->module.get());
return true;
}
@ -632,12 +640,12 @@ struct CState: public State, public interface::Server
// Lock modules so that the subscribing one isn't removed asynchronously
interface::MutexScope ms(m_modules_mutex);
// Make sure module is a known instance
ModuleContainer *mc0 = NULL;
sp_<ModuleContainer> mc0;
ss_ module_name = "(unknown)";
for(auto &pair : m_modules){
ModuleContainer &mc = pair.second;
if(mc.module == module){
mc0 = &mc;
sp_<ModuleContainer> &mc = pair.second;
if(mc->module.get() == module){
mc0 = mc;
module_name = pair.first;
break;
}
@ -646,21 +654,28 @@ struct CState: public State, public interface::Server
log_w(MODULE, "sub_event(): Not a known module");
return;
}
interface::MutexScope ms2(m_event_subs_mutex);
if(m_event_subs.size() <= type + 1)
m_event_subs.resize(type + 1);
sv_<ModuleContainer*> &sublist = m_event_subs[type];
if(std::find(sublist.begin(), sublist.end(), mc0) != sublist.end()){
sv_<wp_<ModuleContainer>> &sublist = m_event_subs[type];
bool found = false;
for(wp_<ModuleContainer> &item : sublist){
if(item.lock() == mc0){
found = true;
break;
}
}
if(found){
log_w(MODULE, "sub_event(): Already on list: %s", cs(module_name));
return;
}
auto *evreg = interface::getGlobalEventRegistry();
log_d(MODULE, "sub_event(): %s subscribed to %s (%zu)",
cs(module_name), cs(evreg->name(type)), type);
sublist.push_back(mc0);
sublist.push_back(wp_<ModuleContainer>(mc0));
}
void emit_event(Event event)
// Do not use synchronous=true unless specifically needed in a special case.
void emit_event(Event event, bool synchronous)
{
if(log_get_max_level() >= LOG_TRACE){
auto *evreg = interface::getGlobalEventRegistry();
@ -668,37 +683,48 @@ struct CState: public State, public interface::Server
cs(evreg->name(event.type)), event.type);
}
// TODO: Run modules in threads and have a separate event queue in each
// of them, and copy the event to the queues in here according to
// subscriptions
interface::MutexScope ms(m_event_queue_mutex);
m_event_queue.push_back(std::move(event));
}
void access_scene(std::function<void(magic::Scene*)> cb)
{
interface::MutexScope ms(m_magic_mutex);
cb(m_magic_scene);
}
void sub_magic_event(struct interface::Module *module,
const magic::StringHash &event_type,
const Event::Type &buildat_event_type)
{
sv_<sv_<wp_<ModuleContainer>>> event_subs_snapshot;
{
interface::MutexScope ms(m_magic_mutex);
m_magic_event_handlers[buildat_event_type] = new MagicEventHandler(
m_magic_context, this, event_type, buildat_event_type);
interface::MutexScope ms(m_modules_mutex);
event_subs_snapshot = m_event_subs;
}
sub_event(module, buildat_event_type);
if(event.type >= event_subs_snapshot.size()){
log_t("state", "emit_event(): %zu: No subs", event.type);
return;
}
sv_<wp_<ModuleContainer>> &sublist = event_subs_snapshot[event.type];
if(sublist.empty()){
log_t("state", "emit_event(): %zu: No subs", event.type);
return;
}
if(log_get_max_level() >= LOG_TRACE){
auto *evreg = interface::getGlobalEventRegistry();
log_t("state", "emit_event(): %s (%zu): Pushing to %zu modules",
cs(evreg->name(event.type)), event.type, sublist.size());
}
for(wp_<ModuleContainer> &mc_weak : sublist){
sp_<ModuleContainer> mc(mc_weak.lock());
if(mc){
if(synchronous)
mc->emit_event_sync(event);
else
mc->push_event(event);
} else {
auto *evreg = interface::getGlobalEventRegistry();
log_t("state", "emit_event(): %s: (%zu): Subscriber weak pointer"
" is null", cs(evreg->name(event.type)), event.type);
}
}
}
void emit_event(Event event)
{
emit_event(event, false);
}
void handle_events()
{
magic::AutoProfileBlock profiler_block(m_magic_context->
GetSubsystem<magic::Profiler>(), "Buildat|handle_events");
// Get modified modules and push events to queue
{
interface::MutexScope ms(m_modules_mutex);
@ -714,48 +740,9 @@ struct CState: public State, public interface::Server
info.name, info.path)));
}
}
// Note: Locking m_modules_mutex here is not needed because no modules
// can be deleted while this is running, because modules are deleted
// only by this same thread.
for(size_t loop_i = 0;; loop_i++){
if(g_sigint_received){
// Get out fast
throw ServerShutdownRequest("Server shutdown requested via SIGINT");
}
sv_<Event> event_queue_snapshot;
sv_<sv_<ModuleContainer* >> event_subs_snapshot;
{
interface::MutexScope ms2(m_event_queue_mutex);
interface::MutexScope ms3(m_event_subs_mutex);
// Swap to clear queue
m_event_queue.swap(event_queue_snapshot);
// Copy to leave subscriptions active
event_subs_snapshot = m_event_subs;
}
if(event_queue_snapshot.empty()){
if(loop_i == 0)
log_t("state", "handle_events(); Nothing to do");
break;
}
for(const Event &event : event_queue_snapshot){
if(event.type >= event_subs_snapshot.size()){
log_t("state", "handle_events(): %zu: No subs", event.type);
continue;
}
sv_<ModuleContainer*> &sublist = event_subs_snapshot[event.type];
if(sublist.empty()){
log_t("state", "handle_events(): %zu: No subs", event.type);
continue;
}
log_t("state", "handle_events(): %zu: Handling (%zu handlers)",
event.type, sublist.size());
for(ModuleContainer *mc : sublist){
interface::MutexScope mc_ms(mc->mutex);
mc->module->event(event.type, event.p.get());
}
}
handle_unloads_and_reloads();
}
// Handle module unloads and reloads as requested
handle_unloads_and_reloads();
}
void handle_unloads_and_reloads()
@ -781,14 +768,19 @@ struct CState: public State, public interface::Server
load_module(info);
// Send core::continue directly to module
{
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(info.name);
if(it == m_modules.end()){
log_w(MODULE, "reload_module: Module not found: %s",
cs(info.name));
return;
sp_<ModuleContainer> mc;
{
// This prevents module from being deleted while a reference
// is being copied
interface::MutexScope ms(m_modules_mutex);
auto it = m_modules.find(info.name);
if(it == m_modules.end()){
log_w(MODULE, "reload_module: Module not found: %s",
cs(info.name));
return;
}
mc = it->second;
}
ModuleContainer *mc = &it->second;
interface::MutexScope mc_ms(mc->mutex);
mc->module->event(Event::t("core:continue"), nullptr);
}
@ -796,78 +788,6 @@ struct CState: public State, public interface::Server
m_reloads_requested.clear();
}
void add_socket_event(int fd, const Event::Type &event_type)
{
log_d("state", "add_socket_event(): fd=%i", fd);
interface::MutexScope ms(m_sockets_mutex);
auto it = m_sockets.find(fd);
if(it == m_sockets.end()){
SocketState s;
s.fd = fd;
s.event_type = event_type;
m_sockets[fd] = s;
return;
}
const SocketState &s = it->second;
if(s.event_type != event_type){
throw Exception("Socket events already requested with different"
" event type");
}
// Nothing to do; already set.
}
void remove_socket_event(int fd)
{
interface::MutexScope ms(m_sockets_mutex);
m_sockets.erase(fd);
}
sv_<int> get_sockets()
{
sv_<int> result;
{
interface::MutexScope ms(m_sockets_mutex);
for(auto &pair : m_sockets)
result.push_back(pair.second.fd);
}
{
interface::MutexScope ms(m_modules_mutex);
for(auto &pair : m_module_file_watches){
auto fds = pair.second->get_fds();
result.insert(result.end(), fds.begin(), fds.end());
}
}
return result;
}
void emit_socket_event(int fd)
{
log_d(MODULE, "emit_socket_event(): fd=%i", fd);
// Break if not found; return if found and handled.
do {
interface::MutexScope ms(m_modules_mutex);
for(auto &pair : m_module_file_watches){
sv_<int> fds = pair.second->get_fds();
if(std::find(fds.begin(), fds.end(), fd) != fds.end()){
pair.second->report_fd(fd);
return;
}
}
} while(0);
do {
interface::MutexScope ms(m_sockets_mutex);
auto it = m_sockets.find(fd);
if(it == m_sockets.end())
break;
SocketState &s = it->second;
// Create and emit event
interface::Event event(s.event_type,
new interface::SocketEvent(fd));
emit_event(std::move(event));
return;
} while(0);
}
void tmp_store_data(const ss_ &name, const ss_ &data)
{
interface::MutexScope ms(m_tmp_data_mutex);
@ -900,6 +820,11 @@ struct CState: public State, public interface::Server
return it->second;
}
const interface::ServerConfig& get_config()
{
return g_server_config;
}
void access_thread_pool(std::function<void(
interface::thread_pool::ThreadPool*pool)> cb)
{

View File

@ -14,15 +14,8 @@ namespace interface
}
}
namespace Urho3D
{
class Scene;
}
namespace server
{
namespace magic = Urho3D;
struct ServerShutdownRequest: public Exception {
ss_ msg;
ServerShutdownRequest(const ss_ &msg): Exception(msg){}
@ -46,11 +39,8 @@ namespace server
virtual void sub_event(struct interface::Module *module,
const interface::Event::Type &type) = 0;
virtual void emit_event(interface::Event event) = 0;
virtual void access_scene(std::function<void(magic::Scene*)> cb) = 0;
virtual void handle_events() = 0;
virtual sv_<int> get_sockets() = 0;
virtual void emit_socket_event(int fd) = 0;
// Add resource file path (to make a mirror of the client)
virtual void add_file_path(const ss_ &name, const ss_ &path) = 0;