PERSISTENCE: fixed ConnectionPool usage

using it as a thread_local produced a lot of problems for the async running
PersistenceMgr. That's why the connection pool is now a member of the DBHandler
and holds a mutex to lock the access to the connections
Martin Gerhardy 2019-11-16 13:03:27 +01:00
parent 7349bba327
commit 90097fd682
14 changed files with 59 additions and 37 deletions

View File

@ -3,8 +3,6 @@
*/
#include "BindParam.h"
#include "ScopedConnection.h"
#include "ConnectionPool.h"
#include "Connection.h"
#include "Model.h"
#include "FieldType.h"

View File

@ -3,9 +3,7 @@
*/
#include "Connection.h"
#include "ConnectionPool.h"
#include "core/Log.h"
#include "core/Singleton.h"
#include "postgres/PQSymbol.h"
namespace persistence {
@ -130,8 +128,4 @@ void Connection::disconnect() {
_preparedStatements.clear();
}
void Connection::close() {
core::Singleton<ConnectionPool>::getInstance().giveBack(this);
}
}

View File

@ -41,8 +41,6 @@ public:
bool connect();
public:
void close();
ConnectionType* connection() const;
};

View File

@ -18,8 +18,12 @@ ConnectionPool::~ConnectionPool() {
}
bool ConnectionPool::init() {
_min = core::Var::getSafe(cfg::DatabaseMinConnections)->intVal();
_max = core::Var::getSafe(cfg::DatabaseMaxConnections)->intVal();
std::unique_lock lock(_mutex);
_minConnections = core::Var::getSafe(cfg::DatabaseMinConnections);
_maxConnections = core::Var::getSafe(cfg::DatabaseMaxConnections);
_min = _minConnections->intVal();
_max = _maxConnections->intVal();
if (_min > _max) {
Log::error("The min connection amount must be smaller or equal to the max connection amount");
@ -45,6 +49,7 @@ bool ConnectionPool::init() {
}
void ConnectionPool::shutdown() {
std::unique_lock lock(_mutex);
while (!_connections.empty()) {
Connection* c = _connections.front();
c->disconnect();
@ -83,13 +88,29 @@ void ConnectionPool::giveBack(Connection* c) {
if (c == nullptr) {
return;
}
std::unique_lock lock(_mutex);
_connections.push(c);
}
Connection* ConnectionPool::connection() {
std::unique_lock lock(_mutex);
if (_connections.empty()) {
if (_minConnections->isDirty()) {
const int newMin = _minConnections->intVal();
if (newMin > 0 && newMin <= _max) {
_min = newMin;
}
_minConnections->markClean();
}
if (_maxConnections->isDirty()) {
const int newMax = _maxConnections->intVal();
if (newMax > 0 && newMax >= _min) {
_max = newMax;
}
_maxConnections->markClean();
}
if (_connectionAmount >= _max) {
Log::warn("Could not acquire pooled connection, max limit hit");
Log::warn("Could not acquire pooled connection, max limit (%i) hit", _max);
return nullptr;
}
Connection* newC = addConnection();

View File

@ -6,8 +6,10 @@
#include "Connection.h"
#include "core/Var.h"
#include "core/Trace.h"
#include "core/IComponent.h"
#include <queue>
#include <mutex>
namespace persistence {
@ -16,6 +18,7 @@ namespace persistence {
*/
class ConnectionPool : public core::IComponent {
friend class Connection;
friend class ScopedConnection;
protected:
int _min = -1;
int _max = -1;
@ -24,6 +27,9 @@ protected:
core::VarPtr _dbHost;
core::VarPtr _dbUser;
core::VarPtr _dbPw;
core::VarPtr _minConnections;
core::VarPtr _maxConnections;
mutable core_trace_mutex(std::mutex, _mutex);
std::queue<Connection*> _connections;

View File

@ -3,9 +3,7 @@
*/
#include "DBHandler.h"
#include "core/Singleton.h"
#include "core/Assert.h"
#include "ConnectionPool.h"
#include "core/Log.h"
#include "postgres/PQSymbol.h"
@ -27,7 +25,7 @@ bool DBHandler::init() {
Log::error("Database driver initialization failed.");
return false;
}
if (!core::Singleton<ConnectionPool>::getInstance().init()) {
if (!_connectionPool.init()) {
Log::error(logid, "Failed to init the connection pool");
return false;
}
@ -37,12 +35,12 @@ bool DBHandler::init() {
void DBHandler::shutdown() {
_initialized = false;
core::Singleton<ConnectionPool>::getInstance().shutdown();
_connectionPool.shutdown();
postgresShutdown();
}
Connection* DBHandler::connection() const {
return core::Singleton<ConnectionPool>::getInstance().connection();
return _connectionPool.connection();
}
bool DBHandler::update(Model& model, const DBCondition& condition) const {
@ -182,7 +180,7 @@ bool DBHandler::exec(const std::string& query) const {
}
State DBHandler::execInternal(const std::string& query) const {
ScopedConnection scoped(connection());
ScopedConnection scoped(_connectionPool, connection());
if (!scoped) {
Log::error(logid, "Could not execute query '%s' - could not acquire connection", query.c_str());
return State();
@ -198,7 +196,7 @@ State DBHandler::execInternal(const std::string& query) const {
State DBHandler::execInternalWithCondition(const std::string& query, BindParam& params, int conditionOffset, const DBCondition& condition) const {
Log::debug(logid, "Execute query '%s'", query.c_str());
ScopedConnection scoped(connection());
ScopedConnection scoped(_connectionPool, connection());
if (!scoped) {
Log::error("Could not execute query '%s' - could not acquire connection", query.c_str());
return State();
@ -225,7 +223,7 @@ State DBHandler::execInternalWithCondition(const std::string& query, BindParam&
}
State DBHandler::execInternalWithParameters(const std::string& query, Model& model, const BindParam& param) const {
ScopedConnection scoped(connection());
ScopedConnection scoped(_connectionPool, connection());
if (!scoped) {
Log::error(logid, "Could not execute query '%s' - could not acquire connection", query.c_str());
return State();
@ -244,7 +242,7 @@ State DBHandler::execInternalWithParameters(const std::string& query, Model& mod
}
State DBHandler::execInternalWithParameters(const std::string& query, const BindParam& param) const {
ScopedConnection scoped(connection());
ScopedConnection scoped(_connectionPool, connection());
if (!scoped) {
Log::error(logid, "Could not execute query '%s' - could not acquire connection", query.c_str());
return State();

View File

@ -10,6 +10,7 @@
#pragma once
#include "PersistenceModels.h"
#include "ConnectionPool.h"
#include "Model.h"
#include "MassQuery.h"
#include "core/String.h"
@ -41,6 +42,8 @@ private:
State execInternalWithCondition(const std::string& query, BindParam& params, int conditionOffset, const DBCondition& condition) const;
State execInternalWithParameters(const std::string& query, const BindParam& param) const;
mutable ConnectionPool _connectionPool;
Connection* connection() const;
bool insertMetadata(const Model& model) const;
@ -49,7 +52,7 @@ private:
template<class FUNC, class MODEL>
bool select(const std::string& query, int conditionAmount, MODEL& model, const DBCondition& condition, FUNC&& func) const {
Log::debug(logid, "Execute query '%s'", query.c_str());
ScopedConnection scoped(connection());
ScopedConnection scoped(_connectionPool, connection());
if (!scoped) {
Log::error("Could not execute query '%s' - could not acquire connection", query.c_str());
return false;

View File

@ -3,8 +3,6 @@
*/
#include "Model.h"
#include "ConnectionPool.h"
#include "ScopedConnection.h"
#include "ConstraintType.h"
#include "core/Log.h"
#include "core/String.h"

View File

@ -4,11 +4,12 @@
#include "ScopedConnection.h"
#include "Connection.h"
#include "ConnectionPool.h"
namespace persistence {
ScopedConnection::~ScopedConnection() {
_c->close();
_connectionPool.giveBack(_c);
}
}

View File

@ -8,12 +8,15 @@
namespace persistence {
class ConnectionPool;
class ScopedConnection {
private:
ConnectionPool& _connectionPool;
Connection* _c;
public:
ScopedConnection(Connection* c) :
_c(c) {
ScopedConnection(ConnectionPool& connectionPool, Connection* c) :
_connectionPool(connectionPool), _c(c) {
}
inline operator Connection*() {

View File

@ -13,26 +13,30 @@ private:
using Super = AbstractDatabaseTest;
protected:
bool _supported = true;
ConnectionPool _connectionPool;
public:
void SetUp() override {
Super::SetUp();
ConnectionPool& pool = core::Singleton<ConnectionPool>::getInstance();
_supported = pool.init();
_supported = _connectionPool.init();
if (!_supported) {
Log::warn("ConnectionPoolTest is skipped");
}
}
void TearDown() override {
Super::TearDown();
_connectionPool.shutdown();
}
};
TEST_F(ConnectionPoolTest, testConnectionPoolGetConnection) {
if (!_supported) {
return;
}
ConnectionPool& pool = core::Singleton<ConnectionPool>::getInstance();
ASSERT_TRUE(pool.init());
Connection* c = pool.connection();
ASSERT_TRUE(_connectionPool.init());
Connection* c = _connectionPool.connection();
ASSERT_NE(nullptr, c);
pool.shutdown();
_connectionPool.shutdown();
}
}

View File

@ -4,7 +4,6 @@
#include "AbstractDatabaseTest.h"
#include "TestModel.h"
#include "persistence/ConnectionPool.h"
#include "persistence/DBHandler.h"
#include "engine-config.h"

View File

@ -4,7 +4,6 @@
#include "AbstractDatabaseTest.h"
#include "TestModels.h"
#include "persistence/ConnectionPool.h"
#include "persistence/DBHandler.h"
#include "engine-config.h"

View File

@ -52,7 +52,7 @@ core::AppState Server::onConstruct() {
core::Var::get(cfg::ServerSeed, "1");
core::Var::get(cfg::VoxelMeshSize, "16", core::CV_READONLY);
core::Var::get(cfg::DatabaseMinConnections, "2");
core::Var::get(cfg::DatabaseMaxConnections, "10");
core::Var::get(cfg::DatabaseMaxConnections, "100");
_serverLoop->construct();