Clean up ClientInterface locking

This commit is contained in:
Jude Melton-Houghton 2022-02-01 20:49:19 -05:00 committed by sfan5
parent d387e9b6d3
commit 1c73902005
3 changed files with 127 additions and 140 deletions

View File

@ -27,6 +27,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "network/networkprotocol.h"
#include "network/address.h"
#include "porting.h"
#include "threading/mutex_auto_lock.h"
#include <list>
#include <vector>
@ -503,9 +504,13 @@ public:
static std::string state2Name(ClientState state);
protected:
//TODO find way to avoid this functions
void lock() { m_clients_mutex.lock(); }
void unlock() { m_clients_mutex.unlock(); }
class AutoLock {
public:
AutoLock(ClientInterface &iface): m_lock(iface.m_clients_mutex) {}
private:
RecursiveMutexAutoLock m_lock;
};
RemoteClientMap& getClientList() { return m_clients; }

View File

@ -452,7 +452,7 @@ void Server::handleCommand_GotBlocks(NetworkPacket* pkt)
("GOTBLOCKS length is too short");
}
m_clients.lock();
ClientInterface::AutoLock lock(m_clients);
RemoteClient *client = m_clients.lockedGetClientNoEx(pkt->getPeerId());
for (u16 i = 0; i < count; i++) {
@ -460,7 +460,6 @@ void Server::handleCommand_GotBlocks(NetworkPacket* pkt)
*pkt >> p;
client->GotBlock(p);
}
m_clients.unlock();
}
void Server::process_PlayerPos(RemotePlayer *player, PlayerSAO *playersao,

View File

@ -723,28 +723,29 @@ void Server::AsyncRunStep(bool initial_step)
//infostream<<"Server: Checking added and deleted active objects"<<std::endl;
MutexAutoLock envlock(m_env_mutex);
m_clients.lock();
const RemoteClientMap &clients = m_clients.getClientList();
ScopeProfiler sp(g_profiler, "Server: update objects within range");
{
ClientInterface::AutoLock clientlock(m_clients);
const RemoteClientMap &clients = m_clients.getClientList();
ScopeProfiler sp(g_profiler, "Server: update objects within range");
m_player_gauge->set(clients.size());
for (const auto &client_it : clients) {
RemoteClient *client = client_it.second;
m_player_gauge->set(clients.size());
for (const auto &client_it : clients) {
RemoteClient *client = client_it.second;
if (client->getState() < CS_DefinitionsSent)
continue;
if (client->getState() < CS_DefinitionsSent)
continue;
// This can happen if the client times out somehow
if (!m_env->getPlayer(client->peer_id))
continue;
// This can happen if the client times out somehow
if (!m_env->getPlayer(client->peer_id))
continue;
PlayerSAO *playersao = getPlayerSAO(client->peer_id);
if (!playersao)
continue;
PlayerSAO *playersao = getPlayerSAO(client->peer_id);
if (!playersao)
continue;
SendActiveObjectRemoveAdd(client, playersao);
SendActiveObjectRemoveAdd(client, playersao);
}
}
m_clients.unlock();
// Write changes to the mod storage
m_mod_storage_save_timer -= dtime;
@ -787,63 +788,64 @@ void Server::AsyncRunStep(bool initial_step)
m_aom_buffer_counter->increment(aom_count);
m_clients.lock();
const RemoteClientMap &clients = m_clients.getClientList();
// Route data to every client
std::string reliable_data, unreliable_data;
for (const auto &client_it : clients) {
reliable_data.clear();
unreliable_data.clear();
RemoteClient *client = client_it.second;
PlayerSAO *player = getPlayerSAO(client->peer_id);
// Go through all objects in message buffer
for (const auto &buffered_message : buffered_messages) {
// If object does not exist or is not known by client, skip it
u16 id = buffered_message.first;
ServerActiveObject *sao = m_env->getActiveObject(id);
if (!sao || client->m_known_objects.find(id) == client->m_known_objects.end())
continue;
{
ClientInterface::AutoLock clientlock(m_clients);
const RemoteClientMap &clients = m_clients.getClientList();
// Route data to every client
std::string reliable_data, unreliable_data;
for (const auto &client_it : clients) {
reliable_data.clear();
unreliable_data.clear();
RemoteClient *client = client_it.second;
PlayerSAO *player = getPlayerSAO(client->peer_id);
// Go through all objects in message buffer
for (const auto &buffered_message : buffered_messages) {
// If object does not exist or is not known by client, skip it
u16 id = buffered_message.first;
ServerActiveObject *sao = m_env->getActiveObject(id);
if (!sao || client->m_known_objects.find(id) == client->m_known_objects.end())
continue;
// Get message list of object
std::vector<ActiveObjectMessage>* list = buffered_message.second;
// Go through every message
for (const ActiveObjectMessage &aom : *list) {
// Send position updates to players who do not see the attachment
if (aom.datastring[0] == AO_CMD_UPDATE_POSITION) {
if (sao->getId() == player->getId())
continue;
// Get message list of object
std::vector<ActiveObjectMessage>* list = buffered_message.second;
// Go through every message
for (const ActiveObjectMessage &aom : *list) {
// Send position updates to players who do not see the attachment
if (aom.datastring[0] == AO_CMD_UPDATE_POSITION) {
if (sao->getId() == player->getId())
continue;
// Do not send position updates for attached players
// as long the parent is known to the client
ServerActiveObject *parent = sao->getParent();
if (parent && client->m_known_objects.find(parent->getId()) !=
client->m_known_objects.end())
continue;
// Do not send position updates for attached players
// as long the parent is known to the client
ServerActiveObject *parent = sao->getParent();
if (parent && client->m_known_objects.find(parent->getId()) !=
client->m_known_objects.end())
continue;
}
// Add full new data to appropriate buffer
std::string &buffer = aom.reliable ? reliable_data : unreliable_data;
char idbuf[2];
writeU16((u8*) idbuf, aom.id);
// u16 id
// std::string data
buffer.append(idbuf, sizeof(idbuf));
buffer.append(serializeString16(aom.datastring));
}
}
/*
reliable_data and unreliable_data are now ready.
Send them.
*/
if (!reliable_data.empty()) {
SendActiveObjectMessages(client->peer_id, reliable_data);
}
// Add full new data to appropriate buffer
std::string &buffer = aom.reliable ? reliable_data : unreliable_data;
char idbuf[2];
writeU16((u8*) idbuf, aom.id);
// u16 id
// std::string data
buffer.append(idbuf, sizeof(idbuf));
buffer.append(serializeString16(aom.datastring));
if (!unreliable_data.empty()) {
SendActiveObjectMessages(client->peer_id, unreliable_data, false);
}
}
/*
reliable_data and unreliable_data are now ready.
Send them.
*/
if (!reliable_data.empty()) {
SendActiveObjectMessages(client->peer_id, reliable_data);
}
if (!unreliable_data.empty()) {
SendActiveObjectMessages(client->peer_id, unreliable_data, false);
}
}
m_clients.unlock();
// Clear buffered_messages
for (auto &buffered_message : buffered_messages) {
@ -1050,18 +1052,14 @@ PlayerSAO* Server::StageTwoClientInit(session_t peer_id)
{
std::string playername;
PlayerSAO *playersao = NULL;
m_clients.lock();
try {
{
ClientInterface::AutoLock clientlock(m_clients);
RemoteClient* client = m_clients.lockedGetClientNoEx(peer_id, CS_InitDone);
if (client) {
playername = client->getName();
playersao = emergePlayer(playername.c_str(), peer_id, client->net_proto_version);
}
} catch (std::exception &e) {
m_clients.unlock();
throw;
}
m_clients.unlock();
RemotePlayer *player = m_env->getPlayer(playername.c_str());
@ -1233,13 +1231,12 @@ void Server::onMapEditEvent(const MapEditEvent &event)
void Server::SetBlocksNotSent(std::map<v3s16, MapBlock *>& block)
{
std::vector<session_t> clients = m_clients.getClientIDs();
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
// Set the modified blocks unsent for all the clients
for (const session_t client_id : clients) {
if (RemoteClient *client = m_clients.lockedGetClientNoEx(client_id))
client->SetBlocksNotSent(block);
}
m_clients.unlock();
}
void Server::peerAdded(con::Peer *peer)
@ -1267,13 +1264,11 @@ bool Server::getClientConInfo(session_t peer_id, con::rtt_stat_type type, float*
bool Server::getClientInfo(session_t peer_id, ClientInfo &ret)
{
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
RemoteClient* client = m_clients.lockedGetClientNoEx(peer_id, CS_Invalid);
if (!client) {
m_clients.unlock();
if (!client)
return false;
}
ret.state = client->getState();
ret.addr = client->getAddress();
@ -1288,8 +1283,6 @@ bool Server::getClientInfo(session_t peer_id, ClientInfo &ret)
ret.lang_code = client->getLangCode();
m_clients.unlock();
return true;
}
@ -2218,7 +2211,7 @@ void Server::sendRemoveNode(v3s16 p, std::unordered_set<u16> *far_players,
pkt << p;
std::vector<session_t> clients = m_clients.getClientIDs();
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
for (session_t client_id : clients) {
RemoteClient *client = m_clients.lockedGetClientNoEx(client_id);
@ -2241,8 +2234,6 @@ void Server::sendRemoveNode(v3s16 p, std::unordered_set<u16> *far_players,
// Send as reliable
m_clients.send(client_id, 0, &pkt, true);
}
m_clients.unlock();
}
void Server::sendAddNode(v3s16 p, MapNode n, std::unordered_set<u16> *far_players,
@ -2257,7 +2248,7 @@ void Server::sendAddNode(v3s16 p, MapNode n, std::unordered_set<u16> *far_player
<< (u8) (remove_metadata ? 0 : 1);
std::vector<session_t> clients = m_clients.getClientIDs();
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
for (session_t client_id : clients) {
RemoteClient *client = m_clients.lockedGetClientNoEx(client_id);
@ -2280,8 +2271,6 @@ void Server::sendAddNode(v3s16 p, MapNode n, std::unordered_set<u16> *far_player
// Send as reliable
m_clients.send(client_id, 0, &pkt, true);
}
m_clients.unlock();
}
void Server::sendMetadataChanged(const std::list<v3s16> &meta_updates, float far_d_nodes)
@ -2290,7 +2279,7 @@ void Server::sendMetadataChanged(const std::list<v3s16> &meta_updates, float far
NodeMetadataList meta_updates_list(false);
std::vector<session_t> clients = m_clients.getClientIDs();
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
for (session_t i : clients) {
RemoteClient *client = m_clients.lockedGetClientNoEx(i);
@ -2331,8 +2320,6 @@ void Server::sendMetadataChanged(const std::list<v3s16> &meta_updates, float far
meta_updates_list.clear();
}
m_clients.unlock();
}
void Server::SendBlockNoLock(session_t peer_id, MapBlock *block, u8 ver,
@ -2368,7 +2355,7 @@ void Server::SendBlocks(float dtime)
std::vector<session_t> clients = m_clients.getClientIDs();
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
for (const session_t client_id : clients) {
RemoteClient *client = m_clients.lockedGetClientNoEx(client_id, CS_Active);
@ -2378,7 +2365,6 @@ void Server::SendBlocks(float dtime)
total_sending += client->getSendingCount();
client->GetNextBlocks(m_env,m_emerge, dtime, queue);
}
m_clients.unlock();
}
// Sort.
@ -2386,7 +2372,7 @@ void Server::SendBlocks(float dtime)
// Lowest is most important.
std::sort(queue.begin(), queue.end());
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
// Maximal total count calculation
// The per-client block sends is halved with the maximal online users
@ -2415,7 +2401,6 @@ void Server::SendBlocks(float dtime)
client->SentBlock(block_to_send.pos);
total_sending++;
}
m_clients.unlock();
}
bool Server::SendBlock(session_t peer_id, const v3s16 &blockpos)
@ -2424,15 +2409,12 @@ bool Server::SendBlock(session_t peer_id, const v3s16 &blockpos)
if (!block)
return false;
m_clients.lock();
ClientInterface::AutoLock clientlock(m_clients);
RemoteClient *client = m_clients.lockedGetClientNoEx(peer_id, CS_Active);
if (!client || client->isBlockSent(blockpos)) {
m_clients.unlock();
if (!client || client->isBlockSent(blockpos))
return false;
}
SendBlockNoLock(peer_id, block, client->serialization_version,
client->net_proto_version);
m_clients.unlock();
return true;
}
@ -3534,48 +3516,49 @@ bool Server::dynamicAddMedia(std::string filepath,
legacy_pkt.putLongString(filedata);
std::unordered_set<session_t> delivered, waiting;
m_clients.lock();
for (auto &pair : m_clients.getClientList()) {
if (pair.second->getState() == CS_DefinitionsSent && !ephemeral) {
/*
If a client is in the DefinitionsSent state it is too late to
transfer the file via sendMediaAnnouncement() but at the same
time the client cannot accept a media push yet.
Short of artificially delaying the joining process there is no
way for the server to resolve this so we (currently) opt not to.
*/
warningstream << "The media \"" << filename << "\" (dynamic) could "
"not be delivered to " << pair.second->getName()
<< " due to a race condition." << std::endl;
continue;
}
if (pair.second->getState() < CS_Active)
continue;
{
ClientInterface::AutoLock clientlock(m_clients);
for (auto &pair : m_clients.getClientList()) {
if (pair.second->getState() == CS_DefinitionsSent && !ephemeral) {
/*
If a client is in the DefinitionsSent state it is too late to
transfer the file via sendMediaAnnouncement() but at the same
time the client cannot accept a media push yet.
Short of artificially delaying the joining process there is no
way for the server to resolve this so we (currently) opt not to.
*/
warningstream << "The media \"" << filename << "\" (dynamic) could "
"not be delivered to " << pair.second->getName()
<< " due to a race condition." << std::endl;
continue;
}
if (pair.second->getState() < CS_Active)
continue;
const auto proto_ver = pair.second->net_proto_version;
if (proto_ver < 39)
continue;
const auto proto_ver = pair.second->net_proto_version;
if (proto_ver < 39)
continue;
const session_t peer_id = pair.second->peer_id;
if (!to_player.empty() && getPlayerName(peer_id) != to_player)
continue;
const session_t peer_id = pair.second->peer_id;
if (!to_player.empty() && getPlayerName(peer_id) != to_player)
continue;
if (proto_ver < 40) {
delivered.emplace(peer_id);
/*
The network layer only guarantees ordered delivery inside a channel.
Since the very next packet could be one that uses the media, we have
to push the media over ALL channels to ensure it is processed before
it is used. In practice this means channels 1 and 0.
*/
m_clients.send(peer_id, 1, &legacy_pkt, true);
m_clients.send(peer_id, 0, &legacy_pkt, true);
} else {
waiting.emplace(peer_id);
Send(peer_id, &pkt);
if (proto_ver < 40) {
delivered.emplace(peer_id);
/*
The network layer only guarantees ordered delivery inside a channel.
Since the very next packet could be one that uses the media, we have
to push the media over ALL channels to ensure it is processed before
it is used. In practice this means channels 1 and 0.
*/
m_clients.send(peer_id, 1, &legacy_pkt, true);
m_clients.send(peer_id, 0, &legacy_pkt, true);
} else {
waiting.emplace(peer_id);
Send(peer_id, &pkt);
}
}
}
m_clients.unlock();
// Run callback for players that already had the file delivered (legacy-only)
for (session_t peer_id : delivered) {