Use generic block manager for loading and fix bugs:

- VoxelProviderThread is now VoxelDataLoader and uses the generic block manager
- Fixed threads locking up: the input mutex was not always unlocked because a for loop could exit early
- Fixed blocks failing to load: region rejection was using an invalid pointer
- Fixed block shifting in region rejection: it was replacing the wrong index in `block_indexes`
- Fixed block region rejection missing blocks: the loop was not stepping back after shifting blocks (see the sketch after the commit metadata below)
- Added an output message to notify the requester of a block rejection
Parent: ceb7f47fa1
Commit: d8cab2d32b
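The region-rejection fixes boil down to a swap-remove pass over the pending block queue plus an index-map fixup. Below is a minimal, self-contained sketch of that pattern; it is not the engine code: `Block`, `reject_outside_region` and the 1-D positions are illustrative stand-ins for the real `Vector3i` positions and the per-LOD `block_indexes` hash maps.

// Minimal sketch of the swap-remove rejection pattern this commit fixes.
// NOT the engine code; names and the 1-D positions are illustrative.
#include <cstdio>
#include <unordered_map>
#include <vector>

struct Block {
	int position; // stand-in for a Vector3i block position
};

// Drops queued blocks whose position falls outside [min_pos, max_pos],
// keeping `indexes` (position -> slot in `blocks`) consistent.
void reject_outside_region(std::vector<Block> &blocks,
		std::unordered_map<int, int> &indexes,
		int min_pos, int max_pos) {

	for (int i = 0; i < static_cast<int>(blocks.size()); ++i) {
		const Block b = blocks[i]; // copy, the slot may be overwritten below

		if (b.position >= min_pos && b.position <= max_pos) {
			continue; // inside the region, keep it
		}

		printf("Rejected block %d\n", b.position);

		// Erase the dropped block's entry; updating the wrong key here
		// was the "replacing wrong index" bug.
		indexes.erase(b.position);

		// Swap-remove: overwrite slot i with the last block and pop the
		// back, so the blocks in between don't have to be shifted.
		blocks[i] = blocks.back();
		blocks.pop_back();

		if (i < static_cast<int>(blocks.size())) {
			// Re-point the shifted block to its new slot...
			indexes[blocks[i].position] = i;
			// ...and step back so slot i is re-examined next iteration;
			// skipping this was the "missing blocks" bug.
			--i;
		}
	}
}

int main() {
	std::vector<Block> blocks = { { 1 }, { 9 }, { 4 }, { 8 }, { 2 } };
	std::unordered_map<int, int> indexes;
	for (int i = 0; i < static_cast<int>(blocks.size()); ++i) {
		indexes[blocks[i].position] = i;
	}

	reject_outside_region(blocks, indexes, 0, 5); // drops 9 and 8

	for (const Block &b : blocks) {
		printf("kept %d at index %d\n", b.position, indexes[b.position]);
	}
	return 0;
}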
@@ -36,6 +36,10 @@ public:
 		OutputBlockData_T data;
 		Vector3i position; // In LOD0 block coordinates
 		unsigned int lod = 0;
+		// True if the block was actually dropped.
+		// Ideally the requester will agree that it doesn't need that block anymore,
+		// but in cases it still does (bad case), it will have to query it again.
+		bool drop_hint = false;
 	};
 
 	struct Input {
@@ -70,14 +74,17 @@ public:
 	// Creates and starts jobs.
 	// Processors are given as array because you could decide to either re-use the same one,
 	// or have clones depending on them being stateless or not.
-	VoxelBlockThreadManager(unsigned int job_count, unsigned int sync_interval_ms, Processor_T *processors) {
+	VoxelBlockThreadManager(unsigned int job_count, unsigned int sync_interval_ms, Processor_T *processors, bool duplicate_rejection = true) {
 
 		CRASH_COND(job_count < 1);
 		CRASH_COND(job_count >= MAX_JOBS);
 		_job_count = job_count;
 
 		for (unsigned int i = 0; i < MAX_JOBS; ++i) {
-			_jobs[i].job_index = i;
+			JobData &job = _jobs[i];
+			job.job_index = i;
+			job.duplicate_rejection = duplicate_rejection;
+			job.sync_interval_ms = sync_interval_ms;
 		}
 
 		for (unsigned int i = 0; i < _job_count; ++i) {
@@ -90,7 +97,6 @@ public:
 			job.semaphore = Semaphore::create();
 			job.thread = Thread::create(_thread_func, &job);
 			job.needs_sort = true;
-			job.sync_interval_ms = sync_interval_ms;
 			job.processor = processors[i];
 		}
 	}
@@ -161,7 +167,6 @@ public:
 
 		// Dispatch equal count of remaining requests.
 		// Remainder is dispatched too until consumed through the first jobs.
-		// Then unlock each job.
 		int base_count = (input.blocks.size() - i) / _job_count;
 		int remainder = (input.blocks.size() - i) % _job_count;
 		for (int job_index = 0; job_index < _job_count && i < input.blocks.size(); ++job_index) {
@@ -180,6 +185,12 @@ public:
 				replaced_blocks += push_block_requests(job, input.blocks, i, count);
 				i += count;
 			}
+		}
+
+		// Set remaining data on all jobs, unlock inputs and resume
+		for (int job_index = 0; job_index < _job_count; ++job_index) {
+
+			JobData &job = _jobs[job_index];
 
 			if (job.shared_input.priority_position != input.priority_position || input.blocks.size() > 0) {
 				job.needs_sort = true;
@@ -191,12 +202,6 @@ public:
 				job.shared_input.use_exclusive_region = true;
 				job.shared_input.exclusive_region_extent = input.exclusive_region_extent;
 			}
-		}
-
-		// Unlock inputs and resume
-		for (int job_index = 0; job_index < _job_count; ++job_index) {
-
-			JobData &job = _jobs[job_index];
 
 			bool should_run = !job.shared_input.is_empty();
 
@@ -266,6 +271,7 @@ private:
 		Thread *thread = nullptr;
 		uint32_t sync_interval_ms = 100;
 		uint32_t job_index = -1;
+		bool duplicate_rejection = false;
 
 		Processor_T processor;
 	};
@@ -285,22 +291,29 @@ private:
 		CRASH_COND(end > input_blocks.size());
 
 		for (int i = begin; i < end; ++i) {
 
 			const InputBlock &block = input_blocks[i];
 
 			CRASH_COND(block.lod >= MAX_LOD)
-			int *index = job.block_indexes[block.lod].getptr(block.position);
-
-			// TODO When using more than one thread, duplicate rejection is less effective... is it relevant to keep it at all?
-			if (index) {
-				// The block is already in the update queue, replace it
-				++replaced_blocks;
-				job.shared_input.blocks[*index] = block;
+			if (job.duplicate_rejection) {
+
+				int *index = job.block_indexes[block.lod].getptr(block.position);
+
+				// TODO When using more than one thread, duplicate rejection is less effective... is it relevant to keep it at all?
+				if (index) {
+					// The block is already in the update queue, replace it
+					++replaced_blocks;
+					job.shared_input.blocks[*index] = block;
+
+				} else {
+					// Append new block request
+					int j = job.shared_input.blocks.size();
+					job.shared_input.blocks.push_back(block);
+					job.block_indexes[block.lod][block.position] = j;
+				}
+
 			} else {
-				// Append new block request
-				int j = job.shared_input.blocks.size();
 				job.shared_input.blocks.push_back(block);
-				job.block_indexes[block.lod][block.position] = j;
 			}
 		}
 
@@ -434,8 +447,10 @@ private:
 
 			data.shared_input.blocks.clear();
 
-			for (unsigned int lod_index = 0; lod_index < MAX_LOD; ++lod_index) {
-				data.block_indexes[lod_index].clear();
+			if (data.duplicate_rejection) {
+				for (unsigned int lod_index = 0; lod_index < MAX_LOD; ++lod_index) {
+					data.block_indexes[lod_index].clear();
+				}
 			}
 
 			needs_sort = data.needs_sort;
@@ -466,13 +481,29 @@ private:
 
 				if (!box.contains(ib.position)) {
 
-					Vector3i shifted_block_pos = data.input.blocks.back().position;
+					// Indicate the caller that we dropped that block.
+					// This can help troubleshoot bugs in some situations.
+					OutputBlock ob;
+					ob.position = ib.position;
+					ob.lod = ib.lod;
+					ob.drop_hint = true;
+					data.output.blocks.push_back(ob);
 
-					data.input.blocks[i] = data.input.blocks.back();
+					// We'll put that block in replacement of the dropped one and pop the last cell,
+					// so we don't need to shift every following blocks
+					const InputBlock &shifted_block = data.input.blocks.back();
+
+					if (data.duplicate_rejection) {
+						data.block_indexes[ib.lod].erase(ib.position);
+						data.block_indexes[shifted_block.lod][shifted_block.position] = i;
+					}
+
+					// Do this last because it invalidates `ib`
+					data.input.blocks[i] = shifted_block;
 					data.input.blocks.pop_back();
 
-					data.block_indexes[ib.lod].erase(ib.position);
-					data.block_indexes[ib.lod][shifted_block_pos] = i;
+					// Move back to redo this index, since we replaced the current block
+					--i;
 
 					//++dropped_count;
 				}
@@ -62,7 +62,7 @@ void VoxelLodTerrain::set_provider(Ref<VoxelProvider> p_provider) {
 	}
 
 	_provider = p_provider;
-	_provider_thread = memnew(VoxelProviderThread(_provider, get_block_size_pow2()));
+	_provider_thread = memnew(VoxelDataLoader(1, _provider, get_block_size_pow2()));
 
 	// The whole map might change, so make all area dirty
 	// TODO Actually, we should regenerate the whole map, not just update all its blocks
@@ -526,15 +526,15 @@ void VoxelLodTerrain::_process() {
 		_lod_octree.update(viewer_pos, subdivide_action, unsubdivide_action);
 
 		// Ideally, this stat should stabilize to zero.
-		// If not, something in the meshing process prevents LODs to properly show up and should be fixed.
+		// If not, something in block management prevents LODs to properly show up and should be fixed.
 		_stats.blocked_lods = subdivide_action.blocked_count + unsubdivide_action.blocked_count;
 	}
 
 	// Send block loading requests
 	{
-		VoxelProviderThread::InputData input;
-		input.priority_block_position = viewer_block_pos;
+		VoxelDataLoader::Input input;
+		input.priority_position = viewer_block_pos;
 		input.priority_direction = viewer_direction;
 		input.use_exclusive_region = true;
 		input.exclusive_region_extent = get_block_region_extent();
 
@@ -542,10 +542,10 @@ void VoxelLodTerrain::_process() {
 		Lod &lod = _lods[lod_index];
 
 		for (int i = 0; i < lod.blocks_to_load.size(); ++i) {
-			VoxelProviderThread::EmergeInput input_block;
-			input_block.block_position = lod.blocks_to_load[i];
+			VoxelDataLoader::InputBlock input_block;
+			input_block.position = lod.blocks_to_load[i];
 			input_block.lod = lod_index;
-			input.blocks_to_emerge.push_back(input_block);
+			input.blocks.push_back(input_block);
 		}
 
 		lod.blocks_to_load.clear();
@@ -561,25 +561,25 @@ void VoxelLodTerrain::_process() {
 	// Note: if block loading is too fast, this can cause stutters.
 	// It should only happen on first load, though.
 	{
-		VoxelProviderThread::OutputData output;
+		VoxelDataLoader::Output output;
 		_provider_thread->pop(output);
 		_stats.provider = output.stats;
 
 		//print_line(String("Loaded {0} blocks").format(varray(output.emerged_blocks.size())));
 
-		for (int i = 0; i < output.emerged_blocks.size(); ++i) {
+		for (int i = 0; i < output.blocks.size(); ++i) {
 
-			const VoxelProviderThread::EmergeOutput &eo = output.emerged_blocks[i];
+			const VoxelDataLoader::OutputBlock &ob = output.blocks[i];
 
-			if (eo.lod >= get_lod_count()) {
+			if (ob.lod >= get_lod_count()) {
 				// That block was requested at a time where LOD was higher... drop it
 				++_stats.dropped_block_loads;
 				continue;
 			}
 
-			Lod &lod = _lods[eo.lod];
+			Lod &lod = _lods[ob.lod];
 
-			Set<Vector3i>::Element *E = lod.loading_blocks.find(eo.block_position);
+			Set<Vector3i>::Element *E = lod.loading_blocks.find(ob.position);
 			if (E == nullptr) {
 				// That block was not requested, or is no longer needed. drop it...
 				++_stats.dropped_block_loads;
@@ -588,7 +588,17 @@ void VoxelLodTerrain::_process() {
 
 			lod.loading_blocks.erase(E);
 
-			if (eo.voxels->get_size() != lod.map->get_block_size()) {
+			if (ob.drop_hint) {
+				// That block was dropped by the data loader thread, but we were still expecting it...
+				// This is not good, because it means the loader is out of sync due to a bug.
+				// We can recover with the removal from `loading_blocks` so it will be re-queried again later...
+				print_line(String("Received a block loading drop while we were still expecting it: lod{0} ({1}, {2}, {3})")
+								   .format(varray(ob.lod, ob.position.x, ob.position.y, ob.position.z)));
+				++_stats.dropped_block_loads;
+				continue;
+			}
+
+			if (ob.data.voxels_loaded->get_size() != lod.map->get_block_size()) {
 				// Voxel block size is incorrect, drop it
 				ERR_PRINT("Block size obtained from provider is different from expected size");
 				++_stats.dropped_block_loads;
@@ -596,7 +606,7 @@ void VoxelLodTerrain::_process() {
 			}
 
 			// Store buffer
-			VoxelBlock *block = lod.map->set_block_buffer(eo.block_position, eo.voxels);
+			VoxelBlock *block = lod.map->set_block_buffer(ob.position, ob.data.voxels_loaded);
 			//print_line(String("Adding block {0} at lod {1}").format(varray(eo.block_position.to_vec3(), eo.lod)));
 			// The block will be made visible and meshed only by LodOctree
 			block->set_visible(false);
@@ -706,6 +716,14 @@ void VoxelLodTerrain::_process() {
 				continue;
 			}
 
+			if (ob.drop_hint) {
+				// That block is loaded, but its meshing request was dropped.
+				// TODO Not sure what to do in this case, the code sending update queries has to be tweaked
+				print_line("Received a block mesh drop while we were still expecting it");
+				++_stats.dropped_block_meshs;
+				continue;
+			}
+
 			if (block->get_mesh_state() == VoxelBlock::MESH_UPDATE_SENT) {
 				block->set_mesh_state(VoxelBlock::MESH_UP_TO_DATE);
 			}
@@ -755,7 +773,7 @@ Dictionary VoxelLodTerrain::get_stats() const {
 	process["time_process_lod"] = _stats.time_process_lod;
 
 	Dictionary d;
-	d["provider"] = VoxelProviderThread::to_dictionary(_stats.provider);
+	d["provider"] = VoxelDataLoader::Mgr::to_dictionary(_stats.provider);
 	d["updater"] = VoxelMeshUpdater::Mgr::to_dictionary(_stats.updater);
 	d["process"] = process;
 	d["blocked_lods"] = _stats.blocked_lods;
@@ -47,7 +47,7 @@ public:
 
 	struct Stats {
 		VoxelMeshUpdater::Stats updater;
-		VoxelProviderThread::Stats provider;
+		VoxelDataLoader::Stats provider;
 		uint64_t time_request_blocks_to_load = 0;
 		uint64_t time_process_load_responses = 0;
 		uint64_t time_request_blocks_to_update = 0;
@@ -93,7 +93,7 @@ private:
 	NodePath _viewer_path;
 
 	Ref<VoxelProvider> _provider;
-	VoxelProviderThread *_provider_thread = nullptr;
+	VoxelDataLoader *_provider_thread = nullptr;
 	VoxelMeshUpdater *_block_updater = nullptr;
 	std::vector<VoxelMeshUpdater::OutputBlock> _blocks_pending_main_thread_update;
 
@@ -1,263 +1,42 @@
 #include "voxel_provider_thread.h"
 #include "../providers/voxel_provider.h"
 #include "../util/utility.h"
 #include "voxel_map.h"
 
 #include <core/os/os.h>
 #include <core/os/semaphore.h>
 #include <core/os/thread.h>
 
-VoxelProviderThread::VoxelProviderThread(Ref<VoxelProvider> provider, int block_size_pow2) {
+VoxelDataLoader::VoxelDataLoader(int thread_count, Ref<VoxelProvider> provider, int block_size_pow2) {
+
+	Processor processors[Mgr::MAX_JOBS];
 
 	CRASH_COND(provider.is_null());
 	CRASH_COND(block_size_pow2 <= 0);
 
-	_voxel_provider = provider;
-	_block_size_pow2 = block_size_pow2;
-	_input_mutex = Mutex::create();
-	_output_mutex = Mutex::create();
-	_semaphore = Semaphore::create();
-	_thread_exit = false;
-	_thread = Thread::create(_thread_func, this);
-}
-
-VoxelProviderThread::~VoxelProviderThread() {
-
-	_thread_exit = true;
-	_semaphore->post();
-	Thread::wait_to_finish(_thread);
-
-	memdelete(_thread);
-	memdelete(_semaphore);
-	memdelete(_input_mutex);
-	memdelete(_output_mutex);
-}
-
-void VoxelProviderThread::push(const InputData &input) {
-
-	bool should_run = false;
-
-	{
-		MutexLock lock(_input_mutex);
-
-		// TODO If the same request is sent twice, keep only the latest one
-
-		append_array(_shared_input.blocks_to_emerge, input.blocks_to_emerge);
-		append_array(_shared_input.blocks_to_immerge, input.blocks_to_immerge);
-		_shared_input.priority_block_position = input.priority_block_position;
-
-		if (input.use_exclusive_region) {
-			_shared_input.use_exclusive_region = true;
-			_shared_input.exclusive_region_extent = input.exclusive_region_extent;
-		}
-
-		should_run = !_shared_input.is_empty();
-	}
-
-	// Notify the thread it should run
-	if (should_run) {
-		_semaphore->post();
-	}
-}
-
-void VoxelProviderThread::pop(OutputData &out_data) {
-
-	MutexLock lock(_output_mutex);
-
-	out_data.emerged_blocks.append_array(_shared_output);
-	out_data.stats = _shared_stats;
-	_shared_output.clear();
-}
-
-void VoxelProviderThread::_thread_func(void *p_self) {
-	VoxelProviderThread *self = reinterpret_cast<VoxelProviderThread *>(p_self);
-	self->thread_func();
-}
-
-void VoxelProviderThread::thread_func() {
-
-	while (!_thread_exit) {
-
-		uint32_t sync_interval = 500.0; // milliseconds
-		uint32_t sync_time = OS::get_singleton()->get_ticks_msec() + sync_interval;
-
-		int emerge_index = 0;
-		Stats stats;
-
-		thread_sync(emerge_index, stats, stats.sort_time);
-
-		while (!_input.is_empty() && !_thread_exit) {
-			//print_line(String("Thread runs: {0}").format(varray(_input.blocks_to_emerge.size())));
-
-			// TODO Block saving
-			_input.blocks_to_immerge.clear();
-
-			if (!_input.blocks_to_emerge.empty()) {
-
-				EmergeInput block = _input.blocks_to_emerge[emerge_index];
-				++emerge_index;
-
-				if (emerge_index >= _input.blocks_to_emerge.size()) {
-					_input.blocks_to_emerge.clear();
-				}
-
-				int bs = 1 << _block_size_pow2;
-				Ref<VoxelBuffer> buffer = Ref<VoxelBuffer>(memnew(VoxelBuffer));
-				buffer->create(bs, bs, bs);
-
-				// Query voxel provider
-				Vector3i block_origin_in_voxels = block.block_position * (bs << block.lod);
-				uint64_t time_before = OS::get_singleton()->get_ticks_usec();
-				_voxel_provider->emerge_block(buffer, block_origin_in_voxels, block.lod);
-				uint64_t time_taken = OS::get_singleton()->get_ticks_usec() - time_before;
-
-				// Do some stats
-				if (stats.first) {
-					stats.first = false;
-					stats.min_time = time_taken;
-					stats.max_time = time_taken;
-				} else {
-					if (time_taken < stats.min_time) {
-						stats.min_time = time_taken;
-					}
-					if (time_taken > stats.max_time) {
-						stats.max_time = time_taken;
-					}
-				}
-
-				EmergeOutput eo;
-				eo.origin_in_voxels = block_origin_in_voxels;
-				eo.block_position = block.block_position;
-				eo.voxels = buffer;
-				eo.lod = block.lod;
-				_output.push_back(eo);
-			}
-
-			uint32_t time = OS::get_singleton()->get_ticks_msec();
-			if (time >= sync_time || _input.is_empty()) {
-
-				uint64_t sort_time;
-				thread_sync(emerge_index, stats, sort_time);
-
-				sync_time = OS::get_singleton()->get_ticks_msec() + sync_interval;
-				emerge_index = 0;
-				stats = Stats();
-				stats.sort_time = sort_time;
-			}
-		}
-
-		if (_thread_exit) {
-			break;
-		}
-
-		// Wait for future wake-up
-		_semaphore->wait();
-	}
-
-	print_line("Thread exits");
-}
-
-// Sorts distance to viewer
-// The closest block will be the first one in the array
-struct BlockPositionComparator {
-	// In LOD0 block coordinates
-	Vector3i center;
-	inline bool operator()(const VoxelProviderThread::EmergeInput &a, const VoxelProviderThread::EmergeInput &b) const {
-		if (a.lod == b.lod) {
-			int da = (a.block_position * (1 << a.lod)).distance_sq(center);
-			int db = (b.block_position * (1 << b.lod)).distance_sq(center);
-			return da < db;
-		} else {
-			// Load highest lods first because they are needed for the octree to subdivide
-			return a.lod > b.lod;
-		}
-	}
-};
-
-void VoxelProviderThread::thread_sync(int emerge_index, Stats stats, uint64_t &out_sort_time) {
-
-	if (!_input.blocks_to_emerge.empty()) {
-		// Cleanup emerge vector
-
-		if (emerge_index >= _input.blocks_to_emerge.size()) {
-			_input.blocks_to_emerge.clear();
-
-		} else if (emerge_index > 0) {
-
-			// Shift up remaining items since we use a Vector
-			shift_up(_input.blocks_to_emerge, emerge_index);
-		}
-	}
-
-	{
-		// Get input
-		MutexLock lock(_input_mutex);
-
-		append_array(_input.blocks_to_emerge, _shared_input.blocks_to_emerge);
-		append_array(_input.blocks_to_immerge, _shared_input.blocks_to_immerge);
-		_input.priority_block_position = _shared_input.priority_block_position;
-
-		if (_shared_input.use_exclusive_region) {
-			_input.use_exclusive_region = true;
-			_input.exclusive_region_extent = _shared_input.exclusive_region_extent;
-		}
-
-		_shared_input.blocks_to_emerge.clear();
-		_shared_input.blocks_to_immerge.clear();
-		_shared_input.use_exclusive_region = false;
-	}
-
-	stats.remaining_blocks = _input.blocks_to_emerge.size();
-
-	// print_line(String("VoxelProviderThread: posting {0} blocks, {1} remaining ; cost [{2}..{3}] usec")
-	// 		.format(varray(_output.size(), _input.blocks_to_emerge.size(), stats.min_time, stats.max_time)));
-
-	{
-		// Post output
-		MutexLock lock(_output_mutex);
-		_shared_output.append_array(_output);
-		_shared_stats = stats;
-		_output.clear();
-	}
-
-	// Cancel blocks outside exclusive region
-	//int dropped_count = 0;
-	if (_input.use_exclusive_region) {
-		for (int i = 0; i < _input.blocks_to_emerge.size(); ++i) {
-			const EmergeInput &ei = _input.blocks_to_emerge[i];
-
-			Rect3i box = Rect3i::from_center_extents(_input.priority_block_position >> ei.lod, Vector3i(_input.exclusive_region_extent));
-
-			if (!box.contains(ei.block_position)) {
-				_input.blocks_to_emerge[i] = _input.blocks_to_emerge.back();
-				_input.blocks_to_emerge.pop_back();
-				//++dropped_count;
-			}
-		}
-	}
-
-	// if (dropped_count > 0) {
-	// 	print_line(String("Dropped {0} blocks to load from thread").format(varray(dropped_count)));
-	// }
-
-	uint64_t time_before = OS::get_singleton()->get_ticks_usec();
-
-	if (!_input.blocks_to_emerge.empty()) {
-		// Re-sort priority
-
-		SortArray<EmergeInput, BlockPositionComparator> sorter;
-		sorter.compare.center = _input.priority_block_position;
-		sorter.sort(_input.blocks_to_emerge.data(), _input.blocks_to_emerge.size());
-	}
-
-	out_sort_time = OS::get_singleton()->get_ticks_usec() - time_before;
-}
-
-Dictionary VoxelProviderThread::to_dictionary(const Stats &stats) {
-	Dictionary d;
-	d["min_time"] = stats.min_time;
-	d["max_time"] = stats.max_time;
-	d["remaining_blocks"] = stats.remaining_blocks;
-	d["sort_time"] = stats.sort_time;
-	return d;
-}
+
+	// Note: more than one thread can make sense for generators,
+	// but won't be as useful for file and network streams
+	for (int i = 0; i < thread_count; ++i) {
+		Processor &p = processors[i];
+		p.block_size_pow2 = block_size_pow2;
+		if (i == 0) {
+			p.provider = provider;
+		} else {
+			p.provider = provider->duplicate();
+		}
+	}
+
+	// TODO Re-enable duplicate rejection, was turned off to investigate some bugs
+	_mgr = memnew(Mgr(thread_count, 500, processors, false));
+}
+
+VoxelDataLoader::~VoxelDataLoader() {
+	if (_mgr) {
+		memdelete(_mgr);
+	}
+}
+
+void VoxelDataLoader::Processor::process_block(const InputBlockData &input, OutputBlockData &output, Vector3i block_position, unsigned int lod) {
+
+	int bs = 1 << block_size_pow2;
+	Ref<VoxelBuffer> buffer;
+	buffer.instance();
+	buffer->create(bs, bs, bs);
+
+	Vector3i block_origin_in_voxels = block_position * (bs << lod);
+	provider->emerge_block(buffer, block_origin_in_voxels, lod);
+
+	output.voxels_loaded = buffer;
+}
@@ -1,90 +1,44 @@
-#ifndef VOXEL_PROVIDER_THREAD_H
-#define VOXEL_PROVIDER_THREAD_H
+#ifndef VOXEL_DATA_LOADER_H
+#define VOXEL_DATA_LOADER_H
 
 #include "../math/rect3i.h"
 #include <core/resource.h>
 #include <vector>
+#include "block_thread_manager.h"
 
 class VoxelProvider;
 class VoxelBuffer;
-class Thread;
-class Semaphore;
 
-class VoxelProviderThread {
+// TODO Rename file
+class VoxelDataLoader {
 public:
-	struct ImmergeInput {
-		Vector3i origin;
-		Ref<VoxelBuffer> voxels;
-		int lod = 0;
+	struct InputBlockData {
+		Ref<VoxelBuffer> voxels_to_save;
 	};
 
-	struct EmergeInput {
-		Vector3i block_position;
-		int lod = 0;
+	struct OutputBlockData {
+		Ref<VoxelBuffer> voxels_loaded;
 	};
 
-	struct InputData {
-		std::vector<ImmergeInput> blocks_to_immerge;
-		std::vector<EmergeInput> blocks_to_emerge;
-		Vector3i priority_block_position; // In LOD0 block coordinates
-		int exclusive_region_extent = 0;
-		bool use_exclusive_region = false;
+	struct Processor {
+		void process_block(const InputBlockData &input, OutputBlockData &output, Vector3i block_position, unsigned int lod);
 
-		inline bool is_empty() {
-			return blocks_to_emerge.empty() && blocks_to_immerge.empty();
-		}
+		Ref<VoxelProvider> provider;
+		int block_size_pow2 = 0;
 	};
 
-	struct EmergeOutput {
-		Ref<VoxelBuffer> voxels;
-		Vector3i origin_in_voxels; // TODO Remove this, redundant now
-		Vector3i block_position;
-		int lod = 0;
-	};
+	typedef VoxelBlockThreadManager<InputBlockData, OutputBlockData, Processor> Mgr;
+	typedef Mgr::InputBlock InputBlock;
+	typedef Mgr::OutputBlock OutputBlock;
+	typedef Mgr::Input Input;
+	typedef Mgr::Output Output;
+	typedef Mgr::Stats Stats;
 
-	struct Stats {
-		bool first = true;
-		uint64_t min_time = 0;
-		uint64_t max_time = 0;
-		int remaining_blocks = 0;
-		uint64_t sort_time = 0;
-	};
+	VoxelDataLoader(int thread_count, Ref<VoxelProvider> provider, int block_size_pow2);
+	~VoxelDataLoader();
 
-	struct OutputData {
-		Vector<EmergeOutput> emerged_blocks;
-		Stats stats;
-	};
-
-	VoxelProviderThread(Ref<VoxelProvider> provider, int block_size_pow2);
-	~VoxelProviderThread();
-
-	void push(const InputData &input);
-	void pop(OutputData &out_data);
-
-	static Dictionary to_dictionary(const Stats &stats);
+	void push(const Input &input) { _mgr->push(input); }
+	void pop(Output &output) { _mgr->pop(output); }
 
 private:
-	static void _thread_func(void *p_self);
-
-	void thread_func();
-	void thread_sync(int emerge_index, Stats stats, uint64_t &out_sort_time);
-
-private:
-	InputData _shared_input;
-	Mutex *_input_mutex;
-
-	Vector<EmergeOutput> _shared_output;
-	Stats _shared_stats;
-	Mutex *_output_mutex;
-
-	Semaphore *_semaphore;
-	bool _thread_exit;
-	Thread *_thread;
-	InputData _input;
-	Vector<EmergeOutput> _output;
-	int _block_size_pow2;
-
-	Ref<VoxelProvider> _voxel_provider;
+	Mgr *_mgr = nullptr;
 };
 
-#endif // VOXEL_PROVIDER_THREAD_H
+#endif // VOXEL_DATA_LOADER_H
@@ -77,10 +77,7 @@ void VoxelTerrain::set_provider(Ref<VoxelProvider> provider) {
 	}
 
 	_provider = provider;
-	_provider_thread = memnew(VoxelProviderThread(_provider, _map->get_block_size_pow2()));
-	// Ref<VoxelProviderTest> test;
-	// test.instance();
-	// _provider_thread = memnew(VoxelProviderThread(test, _map->get_block_size_pow2()));
+	_provider_thread = memnew(VoxelDataLoader(1, _provider, _map->get_block_size_pow2()));
 
 	// The whole map might change, so make all area dirty
 	// TODO Actually, we should regenerate the whole map, not just update all its blocks
@@ -218,7 +215,7 @@ void VoxelTerrain::immerge_block(Vector3i bpos) {
 
 Dictionary VoxelTerrain::get_statistics() const {
 
-	Dictionary provider = VoxelProviderThread::to_dictionary(_stats.provider);
+	Dictionary provider = VoxelDataLoader::Mgr::to_dictionary(_stats.provider);
 	provider["dropped_blocks"] = _stats.dropped_provider_blocks;
 
 	Dictionary updater = VoxelMeshUpdater::Mgr::to_dictionary(_stats.updater);
@@ -607,15 +604,16 @@ void VoxelTerrain::_process() {
 
 	// Send block loading requests
 	{
-		VoxelProviderThread::InputData input;
+		VoxelDataLoader::Input input;
 
-		input.priority_block_position = viewer_block_pos;
+		input.priority_position = viewer_block_pos;
+		input.priority_direction = viewer_direction;
 
 		for (int i = 0; i < _blocks_pending_load.size(); ++i) {
-			VoxelProviderThread::EmergeInput input_block;
-			input_block.block_position = _blocks_pending_load[i];
+			VoxelDataLoader::InputBlock input_block;
+			input_block.position = _blocks_pending_load[i];
 			input_block.lod = 0;
-			input.blocks_to_emerge.push_back(input_block);
+			input.blocks.push_back(input_block);
 		}
 
 		//print_line(String("Sending {0} block requests").format(varray(input.blocks_to_emerge.size())));
@@ -633,17 +631,17 @@ void VoxelTerrain::_process() {
 		const unsigned int bs = _map->get_block_size();
 		const Vector3i block_size(bs, bs, bs);
 
-		VoxelProviderThread::OutputData output;
+		VoxelDataLoader::Output output;
 		_provider_thread->pop(output);
 		//print_line(String("Receiving {0} blocks").format(varray(output.emerged_blocks.size())));
 
 		_stats.provider = output.stats;
 		_stats.dropped_provider_blocks = 0;
 
-		for (int i = 0; i < output.emerged_blocks.size(); ++i) {
+		for (int i = 0; i < output.blocks.size(); ++i) {
 
-			const VoxelProviderThread::EmergeOutput &o = output.emerged_blocks[i];
-			Vector3i block_pos = _map->voxel_to_block(o.origin_in_voxels);
+			const VoxelDataLoader::OutputBlock &ob = output.blocks[i];
+			Vector3i block_pos = ob.position;
 
 			{
 				VoxelTerrain::BlockDirtyState *state = _dirty_blocks.getptr(block_pos);
@@ -654,15 +652,25 @@ void VoxelTerrain::_process() {
 				}
 			}
 
+			if (ob.drop_hint) {
+				// That block was dropped by the data loader thread, but we were still expecting it...
+				// This is not good, because it means the loader is out of sync due to a bug.
+				// TODO Implement recovery like `VoxelLodTerrain`?
+				print_line(String("Received a block loading drop while we were still expecting it: lod{0} ({1}, {2}, {3})")
+								   .format(varray(ob.lod, ob.position.x, ob.position.y, ob.position.z)));
+				++_stats.dropped_provider_blocks;
+				continue;
+			}
+
 			// Check return
 			// TODO Shouldn't halt execution though, as it can bring the map in an invalid state!
-			ERR_FAIL_COND(o.voxels->get_size() != block_size);
+			ERR_FAIL_COND(ob.data.voxels_loaded->get_size() != block_size);
 
 			// TODO Discard blocks out of range
 
 			// Store buffer
 			bool update_neighbors = !_map->has_block(block_pos);
-			_map->set_block_buffer(block_pos, o.voxels);
+			_map->set_block_buffer(block_pos, ob.data.voxels_loaded);
 
 			// Trigger mesh updates
 			if (update_neighbors) {
@@ -812,6 +820,14 @@ void VoxelTerrain::_process() {
 				continue;
 			}
 
+			if (ob.drop_hint) {
+				// That block is loaded, but its meshing request was dropped.
+				// TODO Not sure what to do in this case, the code sending update queries has to be tweaked
+				print_line("Received a block mesh drop while we were still expecting it");
+				++_stats.dropped_updater_blocks;
+				continue;
+			}
+
 			Ref<ArrayMesh> mesh;
 			mesh.instance();
 
@@ -62,7 +62,7 @@ public:
 
 	struct Stats {
 		VoxelMeshUpdater::Stats updater;
-		VoxelProviderThread::Stats provider;
+		VoxelDataLoader::Stats provider;
 		uint32_t mesh_alloc_time;
 		int updated_blocks;
 		int dropped_provider_blocks;
@@ -135,7 +135,7 @@ private:
 	Vector<VoxelMeshUpdater::OutputBlock> _blocks_pending_main_thread_update;
 
 	Ref<VoxelProvider> _provider;
-	VoxelProviderThread *_provider_thread;
+	VoxelDataLoader *_provider_thread;
 
 	Ref<VoxelLibrary> _library;
 	VoxelMeshUpdater *_block_updater;