Removed old threading code
parent
a4611031e1
commit
0e16193f92
|
@ -1,610 +0,0 @@
|
|||
#ifndef VOXEL_BLOCK_THREAD_MANAGER_H
|
||||
#define VOXEL_BLOCK_THREAD_MANAGER_H
|
||||
|
||||
#include "../math/rect3i.h"
|
||||
#include "../math/vector3i.h"
|
||||
#include "../util/array_slice.h"
|
||||
#include "../util/fixed_array.h"
|
||||
#include "../util/macros.h"
|
||||
#include "../util/utility.h"
|
||||
#include "../voxel_constants.h"
|
||||
|
||||
#include <core/os/os.h>
|
||||
#include <core/os/semaphore.h>
|
||||
#include <core/os/thread.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <vector>
|
||||
|
||||
// Base structure for an asynchronous block processing manager using threads.
|
||||
// It is the same for block loading and rendering, hence made a generic one.
|
||||
// - Push requests and pop requests in batch
|
||||
// - One or more threads can be used
|
||||
// - Minimizes sync points
|
||||
// - Orders blocks to process the closest ones first
|
||||
// - Merges duplicate requests
|
||||
// - Cancels requests that become out of range
|
||||
// - Takes some stats
|
||||
template <typename InputBlockData_T, typename OutputBlockData_T>
|
||||
class VoxelBlockThreadManager {
|
||||
public:
|
||||
static const int MAX_JOBS = 8; // Arbitrary, should be enough
|
||||
|
||||
// Specialization must be copyable
|
||||
struct InputBlock {
|
||||
InputBlockData_T data;
|
||||
Vector3i position; // In LOD-relative block coordinates
|
||||
uint8_t lod = 0;
|
||||
bool can_be_discarded = true; // If false, will always be processed, even if the thread is told to exit
|
||||
float sort_heuristic = 0;
|
||||
};
|
||||
|
||||
// Specialization must be copyable
|
||||
struct OutputBlock {
|
||||
OutputBlockData_T data;
|
||||
Vector3i position; // In LOD-relative block coordinates
|
||||
uint8_t lod = 0;
|
||||
// True if the block was actually dropped.
|
||||
// Ideally the requester will agree that it doesn't need that block anymore,
|
||||
// but in cases it still does (bad case), it will have to query it again.
|
||||
bool drop_hint = false;
|
||||
};
|
||||
|
||||
struct Input {
|
||||
std::vector<InputBlock> blocks;
|
||||
Vector3i priority_position; // In LOD0 block coordinates
|
||||
Vector3 priority_direction; // Where the viewer is looking at
|
||||
int exclusive_region_extent = 0; // Region beyond which the processor is allowed to discard requests
|
||||
int exclusive_region_max_lod = VoxelConstants::MAX_LOD; // LOD beyond which exclusive region won't be used
|
||||
bool use_exclusive_region = false;
|
||||
int max_lod_index = 0;
|
||||
|
||||
bool is_empty() const {
|
||||
return blocks.empty();
|
||||
}
|
||||
};
|
||||
|
||||
struct ProcessorStats {
|
||||
int file_openings = 0;
|
||||
int time_spent_opening_files = 0;
|
||||
};
|
||||
|
||||
struct Stats {
|
||||
// Generic stats
|
||||
bool first = true;
|
||||
uint64_t min_time = 0;
|
||||
uint64_t max_time = 0;
|
||||
uint64_t sorting_time = 0;
|
||||
uint32_t remaining_blocks[MAX_JOBS];
|
||||
uint32_t thread_count = 0;
|
||||
uint32_t dropped_count = 0;
|
||||
// Processor-specific
|
||||
ProcessorStats processor;
|
||||
|
||||
Stats() {
|
||||
for (int i = 0; i < MAX_JOBS; ++i) {
|
||||
remaining_blocks[i] = 0;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
struct Output {
|
||||
Vector<OutputBlock> blocks;
|
||||
Stats stats;
|
||||
};
|
||||
|
||||
typedef std::function<void(ArraySlice<InputBlock>, ArraySlice<OutputBlock>, ProcessorStats &)> BlockProcessingFunc;
|
||||
|
||||
// TODO Make job count dynamic, don't start threads in constructor
|
||||
|
||||
// Creates and starts jobs.
|
||||
// Processors are given as array because you could decide to either re-use the same one,
|
||||
// or have clones depending on them being stateless or not.
|
||||
VoxelBlockThreadManager(
|
||||
unsigned int job_count,
|
||||
unsigned int sync_interval_ms,
|
||||
ArraySlice<BlockProcessingFunc> processors,
|
||||
bool duplicate_rejection = true,
|
||||
unsigned int batch_count = 1) {
|
||||
|
||||
CRASH_COND(job_count < 1);
|
||||
CRASH_COND(job_count >= MAX_JOBS);
|
||||
_job_count = job_count;
|
||||
|
||||
CRASH_COND(batch_count == 0);
|
||||
|
||||
for (unsigned int i = 0; i < MAX_JOBS; ++i) {
|
||||
JobData &job = _jobs[i];
|
||||
job.job_index = i;
|
||||
job.duplicate_rejection = duplicate_rejection;
|
||||
job.sync_interval_ms = sync_interval_ms;
|
||||
job.batch_count = batch_count;
|
||||
}
|
||||
|
||||
for (unsigned int i = 0; i < _job_count; ++i) {
|
||||
JobData &job = _jobs[i];
|
||||
CRASH_COND(job.thread != nullptr);
|
||||
|
||||
job.input_mutex = Mutex::create();
|
||||
job.output_mutex = Mutex::create();
|
||||
job.semaphore = Semaphore::create();
|
||||
job.thread = Thread::create(_thread_func, &job);
|
||||
job.needs_sort = true;
|
||||
job.processor = processors[i];
|
||||
}
|
||||
}
|
||||
|
||||
~VoxelBlockThreadManager() {
|
||||
for (unsigned int i = 0; i < _job_count; ++i) {
|
||||
JobData &job = _jobs[i];
|
||||
job.thread_exit = true;
|
||||
job.semaphore->post();
|
||||
}
|
||||
|
||||
for (unsigned int i = 0; i < _job_count; ++i) {
|
||||
JobData &job = _jobs[i];
|
||||
CRASH_COND(job.thread == nullptr);
|
||||
|
||||
Thread::wait_to_finish(job.thread);
|
||||
|
||||
memdelete(job.thread);
|
||||
memdelete(job.semaphore);
|
||||
memdelete(job.input_mutex);
|
||||
memdelete(job.output_mutex);
|
||||
}
|
||||
}
|
||||
|
||||
void push(const Input &input) {
|
||||
CRASH_COND(_job_count < 1);
|
||||
|
||||
unsigned int replaced_blocks = 0;
|
||||
unsigned int highest_pending_count = 0;
|
||||
unsigned int lowest_pending_count = 0;
|
||||
|
||||
// Lock all inputs and gather their pending work counts
|
||||
for (unsigned int job_index = 0; job_index < _job_count; ++job_index) {
|
||||
JobData &job = _jobs[job_index];
|
||||
|
||||
job.input_mutex->lock();
|
||||
|
||||
highest_pending_count = MAX(highest_pending_count, job.shared_input.blocks.size());
|
||||
lowest_pending_count = MIN(lowest_pending_count, job.shared_input.blocks.size());
|
||||
}
|
||||
|
||||
unsigned int i = 0;
|
||||
|
||||
// We don't use a "weakest team gets it" dispatch for speed,
|
||||
// So prioritize only jobs under median workload count and not just the highest.
|
||||
unsigned int median_pending_count = lowest_pending_count + (highest_pending_count - lowest_pending_count) / 2;
|
||||
|
||||
// Dispatch to jobs with least pending requests
|
||||
for (unsigned int job_index = 0; job_index < _job_count && i < input.blocks.size(); ++job_index) {
|
||||
JobData &job = _jobs[job_index];
|
||||
unsigned int pending_count = job.shared_input.blocks.size();
|
||||
|
||||
unsigned int count = MIN(median_pending_count - pending_count, input.blocks.size());
|
||||
|
||||
if (count > 0) {
|
||||
if (i + count > input.blocks.size()) {
|
||||
count = input.blocks.size() - i;
|
||||
}
|
||||
replaced_blocks += push_block_requests(job, input.blocks, i, count);
|
||||
i += count;
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch equal count of remaining requests.
|
||||
// Remainder is dispatched too until consumed through the first jobs.
|
||||
unsigned int base_count = (input.blocks.size() - i) / _job_count;
|
||||
unsigned int remainder = (input.blocks.size() - i) % _job_count;
|
||||
for (unsigned int job_index = 0; job_index < _job_count && i < input.blocks.size(); ++job_index) {
|
||||
JobData &job = _jobs[job_index];
|
||||
|
||||
unsigned int count = base_count;
|
||||
if (remainder > 0) {
|
||||
++count;
|
||||
--remainder;
|
||||
}
|
||||
|
||||
if (i + count > input.blocks.size()) {
|
||||
replaced_blocks += push_block_requests(job, input.blocks, i, input.blocks.size() - i);
|
||||
} else {
|
||||
replaced_blocks += push_block_requests(job, input.blocks, i, count);
|
||||
i += count;
|
||||
}
|
||||
}
|
||||
|
||||
// Set remaining data on all jobs, unlock inputs and resume
|
||||
for (unsigned int job_index = 0; job_index < _job_count; ++job_index) {
|
||||
JobData &job = _jobs[job_index];
|
||||
|
||||
if (job.shared_input.priority_position != input.priority_position || input.blocks.size() > 0) {
|
||||
job.needs_sort = true;
|
||||
}
|
||||
|
||||
job.shared_input.priority_position = input.priority_position;
|
||||
|
||||
if (input.use_exclusive_region) {
|
||||
job.shared_input.use_exclusive_region = true;
|
||||
job.shared_input.exclusive_region_extent = input.exclusive_region_extent;
|
||||
job.shared_input.exclusive_region_max_lod = input.exclusive_region_max_lod;
|
||||
}
|
||||
|
||||
bool should_run = !job.shared_input.is_empty();
|
||||
|
||||
job.input_mutex->unlock();
|
||||
|
||||
if (should_run) {
|
||||
job.semaphore->post();
|
||||
}
|
||||
}
|
||||
|
||||
if (replaced_blocks > 0) {
|
||||
PRINT_VERBOSE(String("VoxelBlockProcessor: {1} blocks already in queue were replaced").format(varray(replaced_blocks)));
|
||||
}
|
||||
}
|
||||
|
||||
void pop(Output &output) {
|
||||
output.stats = Stats();
|
||||
output.stats.thread_count = _job_count;
|
||||
|
||||
// Harvest results from all jobs
|
||||
for (unsigned int i = 0; i < _job_count; ++i) {
|
||||
|
||||
JobData &job = _jobs[i];
|
||||
{
|
||||
MutexLock lock(job.output_mutex);
|
||||
|
||||
output.blocks.append_array(job.shared_output.blocks);
|
||||
merge_stats(output.stats, job.shared_output.stats, i);
|
||||
job.shared_output.blocks.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Dictionary to_dictionary(const Stats &stats) {
|
||||
Dictionary d;
|
||||
d["min_time"] = stats.min_time;
|
||||
d["max_time"] = stats.max_time;
|
||||
d["sorting_time"] = stats.sorting_time;
|
||||
d["dropped_count"] = stats.dropped_count;
|
||||
Array remaining_blocks;
|
||||
remaining_blocks.resize(stats.thread_count);
|
||||
for (unsigned int i = 0; i < stats.thread_count; ++i) {
|
||||
remaining_blocks[i] = stats.remaining_blocks[i];
|
||||
}
|
||||
d["remaining_blocks_per_thread"] = remaining_blocks;
|
||||
d["file_openings"] = stats.processor.file_openings;
|
||||
d["time_spent_opening_files"] = stats.processor.time_spent_opening_files;
|
||||
return d;
|
||||
}
|
||||
|
||||
private:
|
||||
struct JobData {
|
||||
// Data accessed from other threads, so they need mutexes
|
||||
//------------------------
|
||||
Input shared_input;
|
||||
Output shared_output;
|
||||
Mutex *input_mutex = nullptr;
|
||||
Mutex *output_mutex = nullptr;
|
||||
// Indexes which blocks are present in shared_input,
|
||||
// so if we push a duplicate request with the same coordinates, we can discard it without a linear search
|
||||
FixedArray<HashMap<Vector3i, int, Vector3iHasher>, VoxelConstants::MAX_LOD> shared_input_block_indexes;
|
||||
bool needs_sort = false;
|
||||
// Only read by the thread. Should not go back to `false` after being set to `true`.
|
||||
bool thread_exit = false;
|
||||
//------------------------
|
||||
|
||||
Input input;
|
||||
Output output;
|
||||
Semaphore *semaphore = nullptr;
|
||||
Thread *thread = nullptr;
|
||||
uint32_t sync_interval_ms = 100;
|
||||
uint32_t job_index = -1;
|
||||
bool duplicate_rejection = false;
|
||||
int batch_count = 0;
|
||||
|
||||
BlockProcessingFunc processor;
|
||||
};
|
||||
|
||||
static void merge_stats(Stats &a, const Stats &b, int job_index) {
|
||||
a.max_time = MAX(a.max_time, b.max_time);
|
||||
a.min_time = MIN(a.min_time, b.min_time);
|
||||
a.remaining_blocks[job_index] = b.remaining_blocks[job_index];
|
||||
a.sorting_time += b.sorting_time;
|
||||
a.dropped_count += b.dropped_count;
|
||||
|
||||
a.processor.file_openings += b.processor.file_openings;
|
||||
a.processor.time_spent_opening_files += b.processor.time_spent_opening_files;
|
||||
}
|
||||
|
||||
unsigned int push_block_requests(JobData &job, const std::vector<InputBlock> &input_blocks, int begin, int count) {
|
||||
// The job's input mutex must have been locked first!
|
||||
|
||||
unsigned int replaced_blocks = 0;
|
||||
unsigned int end = begin + count;
|
||||
CRASH_COND(end > input_blocks.size());
|
||||
|
||||
for (unsigned int i = begin; i < end; ++i) {
|
||||
const InputBlock &block = input_blocks[i];
|
||||
CRASH_COND(block.lod >= VoxelConstants::MAX_LOD);
|
||||
|
||||
if (job.duplicate_rejection) {
|
||||
int *index = job.shared_input_block_indexes[block.lod].getptr(block.position);
|
||||
|
||||
// TODO When using more than one thread, duplicate rejection is less effective... is it relevant to keep it at all?
|
||||
if (index) {
|
||||
// The block is already in the update queue, replace it
|
||||
++replaced_blocks;
|
||||
CRASH_COND(*index < 0 || *index >= (int)job.shared_input.blocks.size());
|
||||
job.shared_input.blocks[*index] = block;
|
||||
|
||||
} else {
|
||||
// Append new block request
|
||||
unsigned int j = job.shared_input.blocks.size();
|
||||
job.shared_input.blocks.push_back(block);
|
||||
job.shared_input_block_indexes[block.lod][block.position] = j;
|
||||
}
|
||||
|
||||
} else {
|
||||
job.shared_input.blocks.push_back(block);
|
||||
}
|
||||
}
|
||||
|
||||
return replaced_blocks;
|
||||
}
|
||||
|
||||
static void _thread_func(void *p_data) {
|
||||
JobData *data = reinterpret_cast<JobData *>(p_data);
|
||||
CRASH_COND(data == nullptr);
|
||||
thread_func(*data);
|
||||
}
|
||||
|
||||
static void thread_func(JobData &data) {
|
||||
while (true) {
|
||||
uint32_t sync_time = OS::get_singleton()->get_ticks_msec() + data.sync_interval_ms;
|
||||
|
||||
unsigned int queue_index = 0;
|
||||
Stats stats;
|
||||
|
||||
bool should_exit = false;
|
||||
if (data.thread_exit) {
|
||||
// This will be the last run, we must process all mandatory requests.
|
||||
// We can't check `thread_exit` afterwards because it could have become `true` after we sync,
|
||||
// hence we could miss some of the requests.
|
||||
should_exit = true;
|
||||
}
|
||||
|
||||
thread_sync(data, queue_index, stats, stats.sorting_time, stats.dropped_count);
|
||||
|
||||
// Continue to run as long as there are queries to process
|
||||
while (!data.input.blocks.empty()) {
|
||||
if (!data.input.blocks.empty()) {
|
||||
if (should_exit) {
|
||||
// Flush inputs we processed already
|
||||
shift_up(data.input.blocks, queue_index);
|
||||
queue_index = 0;
|
||||
|
||||
// Remove all remaining queries except those that can't be discarded.
|
||||
// Since the thread is exiting, we don't care anymore about sorting.
|
||||
unordered_remove_if(data.input.blocks,
|
||||
[](const InputBlock &b) {
|
||||
return b.can_be_discarded;
|
||||
});
|
||||
}
|
||||
|
||||
unsigned int input_begin = queue_index;
|
||||
unsigned int batch_count = data.batch_count;
|
||||
|
||||
if (input_begin + batch_count > data.input.blocks.size()) {
|
||||
batch_count = data.input.blocks.size() - input_begin;
|
||||
}
|
||||
|
||||
if (batch_count > 0) {
|
||||
uint64_t time_before = OS::get_singleton()->get_ticks_usec();
|
||||
|
||||
unsigned int output_begin = data.output.blocks.size();
|
||||
data.output.blocks.resize(data.output.blocks.size() + batch_count);
|
||||
|
||||
for (unsigned int i = 0; i < batch_count; ++i) {
|
||||
CRASH_COND(input_begin + i < 0 || input_begin + i >= data.input.blocks.size());
|
||||
InputBlock &ib = data.input.blocks[input_begin + i];
|
||||
OutputBlock &ob = data.output.blocks.write[output_begin + i];
|
||||
ob.position = ib.position;
|
||||
ob.lod = ib.lod;
|
||||
}
|
||||
|
||||
data.processor(
|
||||
ArraySlice<InputBlock>(data.input.blocks, input_begin, input_begin + batch_count),
|
||||
ArraySlice<OutputBlock>(&data.output.blocks.write[0], output_begin, output_begin + batch_count),
|
||||
stats.processor);
|
||||
|
||||
uint64_t time_taken = (OS::get_singleton()->get_ticks_usec() - time_before) / batch_count;
|
||||
|
||||
// Do some stats
|
||||
if (stats.first) {
|
||||
stats.first = false;
|
||||
stats.min_time = time_taken;
|
||||
stats.max_time = time_taken;
|
||||
} else {
|
||||
if (time_taken < stats.min_time) {
|
||||
stats.min_time = time_taken;
|
||||
}
|
||||
if (time_taken > stats.max_time) {
|
||||
stats.max_time = time_taken;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
queue_index += batch_count;
|
||||
if (queue_index >= data.input.blocks.size()) {
|
||||
data.input.blocks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t time = OS::get_singleton()->get_ticks_msec();
|
||||
if (time >= sync_time || data.input.blocks.empty()) {
|
||||
uint64_t sort_time;
|
||||
unsigned int dropped_count;
|
||||
thread_sync(data, queue_index, stats, sort_time, dropped_count);
|
||||
|
||||
sync_time = OS::get_singleton()->get_ticks_msec() + data.sync_interval_ms;
|
||||
queue_index = 0;
|
||||
stats = Stats();
|
||||
stats.sorting_time = sort_time;
|
||||
stats.dropped_count = dropped_count;
|
||||
}
|
||||
}
|
||||
|
||||
if (should_exit) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait for future wake-up
|
||||
data.semaphore->wait();
|
||||
}
|
||||
}
|
||||
|
||||
static inline float get_priority_heuristic(const InputBlock &a, const Vector3i &viewer_block_pos, const Vector3 &viewer_direction, int max_lod) {
|
||||
int f = 1 << a.lod;
|
||||
Vector3i p = a.position * f;
|
||||
float d = Math::sqrt(p.distance_sq(viewer_block_pos) + 0.1f);
|
||||
float dp = viewer_direction.dot(viewer_block_pos.to_vec3() / d);
|
||||
// Higher lod indexes come first to allow the octree to subdivide.
|
||||
// Then comes distance, which is modified by how much in view the block is
|
||||
return (max_lod - a.lod) * 10000.f + d + (1.f - dp) * 4.f * f;
|
||||
}
|
||||
|
||||
struct BlockUpdateComparator {
|
||||
inline bool operator()(const InputBlock &a, const InputBlock &b) const {
|
||||
return a.sort_heuristic < b.sort_heuristic;
|
||||
}
|
||||
};
|
||||
|
||||
static void thread_sync(JobData &data, unsigned int queue_index, Stats stats, uint64_t &out_sort_time, unsigned int &out_dropped_count) {
|
||||
if (!data.input.blocks.empty()) {
|
||||
// Cleanup input vector
|
||||
|
||||
if (queue_index >= data.input.blocks.size()) {
|
||||
data.input.blocks.clear();
|
||||
|
||||
} else if (queue_index > 0) {
|
||||
|
||||
// Shift up remaining items since we use a Vector
|
||||
shift_up(data.input.blocks, queue_index);
|
||||
}
|
||||
}
|
||||
|
||||
stats.remaining_blocks[data.job_index] = data.input.blocks.size();
|
||||
bool needs_sort;
|
||||
|
||||
// Get input
|
||||
{
|
||||
MutexLock lock(data.input_mutex);
|
||||
|
||||
// Copy requests from shared to internal
|
||||
append_array(data.input.blocks, data.shared_input.blocks);
|
||||
|
||||
data.input.priority_position = data.shared_input.priority_position;
|
||||
|
||||
if (data.shared_input.use_exclusive_region) {
|
||||
data.input.use_exclusive_region = true;
|
||||
data.input.exclusive_region_extent = data.shared_input.exclusive_region_extent;
|
||||
data.input.exclusive_region_max_lod = data.shared_input.exclusive_region_max_lod;
|
||||
}
|
||||
|
||||
data.shared_input.blocks.clear();
|
||||
|
||||
if (data.duplicate_rejection) {
|
||||
// We emptied shared input, empty shared_input_block_indexes then
|
||||
for (unsigned int lod_index = 0; lod_index < data.shared_input_block_indexes.size(); ++lod_index) {
|
||||
data.shared_input_block_indexes[lod_index].clear();
|
||||
}
|
||||
}
|
||||
|
||||
needs_sort = data.needs_sort;
|
||||
data.needs_sort = false;
|
||||
}
|
||||
|
||||
if (!data.output.blocks.empty()) {
|
||||
// print_line(String("VoxelMeshUpdater: posting {0} blocks, {1} remaining ; cost [{2}..{3}] usec")
|
||||
// .format(varray(_output.blocks.size(), _input.blocks.size(), stats.min_time, stats.max_time)));
|
||||
|
||||
// Copy output to shared
|
||||
MutexLock lock(data.output_mutex);
|
||||
data.shared_output.blocks.append_array(data.output.blocks);
|
||||
data.shared_output.stats = stats;
|
||||
data.output.blocks.clear();
|
||||
}
|
||||
|
||||
// Cancel blocks outside exclusive region.
|
||||
// We do this early because if the player keeps moving forward,
|
||||
// we would keep accumulating requests forever, and that means slower sorting and memory waste
|
||||
int dropped_count = 0;
|
||||
if (data.input.use_exclusive_region) {
|
||||
for (unsigned int i = 0; i < data.input.blocks.size(); ++i) {
|
||||
const InputBlock &ib = data.input.blocks[i];
|
||||
|
||||
if (!ib.can_be_discarded || ib.lod >= data.input.exclusive_region_max_lod) {
|
||||
continue;
|
||||
}
|
||||
|
||||
Rect3i box = Rect3i::from_center_extents(data.input.priority_position >> ib.lod, Vector3i(data.input.exclusive_region_extent));
|
||||
|
||||
if (!box.contains(ib.position)) {
|
||||
|
||||
// Indicate the caller that we dropped that block.
|
||||
// This can help troubleshoot bugs in some situations.
|
||||
OutputBlock ob;
|
||||
ob.position = ib.position;
|
||||
ob.lod = ib.lod;
|
||||
ob.drop_hint = true;
|
||||
data.output.blocks.push_back(ob);
|
||||
|
||||
// We'll put that block in replacement of the dropped one and pop the last cell,
|
||||
// so we don't need to shift every following blocks
|
||||
const InputBlock &shifted_block = data.input.blocks.back();
|
||||
|
||||
// Do this last because it invalidates `ib`
|
||||
data.input.blocks[i] = shifted_block;
|
||||
data.input.blocks.pop_back();
|
||||
|
||||
// Move back to redo this index, since we replaced the current block
|
||||
--i;
|
||||
|
||||
++dropped_count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (dropped_count > 0) {
|
||||
PRINT_VERBOSE(String("Dropped {0} blocks from thread").format(varray(dropped_count)));
|
||||
out_dropped_count = dropped_count;
|
||||
}
|
||||
|
||||
uint64_t time_before = OS::get_singleton()->get_ticks_usec();
|
||||
|
||||
if (!data.input.blocks.empty() && needs_sort) {
|
||||
for (auto it = data.input.blocks.begin(); it != data.input.blocks.end(); ++it) {
|
||||
InputBlock &ib = *it;
|
||||
// Set or override previous heuristic based on new infos
|
||||
ib.sort_heuristic = get_priority_heuristic(ib,
|
||||
data.input.priority_position,
|
||||
data.input.priority_direction,
|
||||
data.input.max_lod_index);
|
||||
}
|
||||
|
||||
// Re-sort priority
|
||||
SortArray<InputBlock, BlockUpdateComparator> sorter;
|
||||
sorter.sort(data.input.blocks.data(), data.input.blocks.size());
|
||||
}
|
||||
|
||||
out_sort_time = OS::get_singleton()->get_ticks_usec() - time_before;
|
||||
}
|
||||
|
||||
JobData _jobs[MAX_JOBS];
|
||||
unsigned int _job_count = 0;
|
||||
};
|
||||
|
||||
#endif // VOXEL_BLOCK_THREAD_MANAGER_H
|
|
@ -1,141 +0,0 @@
|
|||
#include "voxel_data_loader.h"
|
||||
#include "../streams/voxel_stream.h"
|
||||
#include "../util/macros.h"
|
||||
#include "../util/utility.h"
|
||||
|
||||
VoxelDataLoader::VoxelDataLoader(unsigned int thread_count, Ref<VoxelStream> stream, unsigned int block_size_pow2) {
|
||||
PRINT_VERBOSE("Constructing VoxelDataLoader");
|
||||
CRASH_COND(stream.is_null());
|
||||
|
||||
if (Engine::get_singleton()->is_editor_hint()) {
|
||||
// In the editor, we want extra safety.
|
||||
// Duplicate all data so modifications done by the user won't impact running threads.
|
||||
stream = stream->duplicate(true);
|
||||
}
|
||||
|
||||
// TODO I'm not sure it's worth to configure more than one thread for voxel streams
|
||||
|
||||
FixedArray<Mgr::BlockProcessingFunc, Mgr::MAX_JOBS> processors;
|
||||
|
||||
processors[0] = [this, stream](ArraySlice<InputBlock> inputs, ArraySlice<OutputBlock> outputs, Mgr::ProcessorStats &stats) {
|
||||
this->process_blocks_thread_func(inputs, outputs, stream, stats);
|
||||
};
|
||||
|
||||
if (thread_count > 1) {
|
||||
if (stream->is_thread_safe()) {
|
||||
|
||||
for (unsigned int i = 1; i < thread_count; ++i) {
|
||||
processors[i] = processors[0];
|
||||
}
|
||||
|
||||
} else if (stream->is_cloneable()) {
|
||||
|
||||
// Note: more than one thread can make sense for generators,
|
||||
// but won't be as useful for file and network streams
|
||||
for (unsigned int i = 1; i < thread_count; ++i) {
|
||||
stream = stream->duplicate();
|
||||
processors[i] = [this, stream](ArraySlice<InputBlock> inputs, ArraySlice<OutputBlock> outputs, Mgr::ProcessorStats &stats) {
|
||||
this->process_blocks_thread_func(inputs, outputs, stream, stats);
|
||||
};
|
||||
}
|
||||
|
||||
} else {
|
||||
ERR_PRINT("Thread count set to higher than 1, but the stream is neither thread-safe nor cloneable. Capping back to 1 thread.");
|
||||
thread_count = 1;
|
||||
}
|
||||
}
|
||||
|
||||
int batch_count = 128;
|
||||
int sync_interval_ms = 500;
|
||||
|
||||
_block_size_pow2 = block_size_pow2;
|
||||
_mgr = memnew(Mgr(thread_count, sync_interval_ms, processors, true, batch_count));
|
||||
}
|
||||
|
||||
VoxelDataLoader::~VoxelDataLoader() {
|
||||
PRINT_VERBOSE("Destroying VoxelDataLoader");
|
||||
if (_mgr) {
|
||||
memdelete(_mgr);
|
||||
}
|
||||
}
|
||||
|
||||
// Can run in multiple threads
|
||||
void VoxelDataLoader::process_blocks_thread_func(const ArraySlice<InputBlock> inputs, ArraySlice<OutputBlock> outputs, Ref<VoxelStream> stream, Mgr::ProcessorStats &stats) {
|
||||
|
||||
CRASH_COND(inputs.size() != outputs.size());
|
||||
|
||||
Vector<VoxelBlockRequest> emerge_requests;
|
||||
Vector<VoxelBlockRequest> immerge_requests;
|
||||
|
||||
for (size_t i = 0; i < inputs.size(); ++i) {
|
||||
|
||||
const InputBlock &ib = inputs[i];
|
||||
|
||||
int bs = 1 << _block_size_pow2;
|
||||
Vector3i block_origin_in_voxels = ib.position * (bs << ib.lod);
|
||||
|
||||
if (ib.data.voxels_to_save.is_null()) {
|
||||
|
||||
VoxelBlockRequest r;
|
||||
r.voxel_buffer.instance();
|
||||
r.voxel_buffer->create(bs, bs, bs);
|
||||
r.origin_in_voxels = block_origin_in_voxels;
|
||||
r.lod = ib.lod;
|
||||
emerge_requests.push_back(r);
|
||||
|
||||
} else {
|
||||
|
||||
VoxelBlockRequest r;
|
||||
r.voxel_buffer = ib.data.voxels_to_save;
|
||||
r.origin_in_voxels = block_origin_in_voxels;
|
||||
r.lod = ib.lod;
|
||||
immerge_requests.push_back(r);
|
||||
}
|
||||
}
|
||||
|
||||
stream->emerge_blocks(emerge_requests);
|
||||
stream->immerge_blocks(immerge_requests);
|
||||
|
||||
VoxelStream::Stats stream_stats = stream->get_statistics();
|
||||
stats.file_openings = stream_stats.file_openings;
|
||||
stats.time_spent_opening_files = stream_stats.time_spent_opening_files;
|
||||
|
||||
// Assumes the stream won't change output order
|
||||
int iload = 0;
|
||||
for (size_t i = 0; i < outputs.size(); ++i) {
|
||||
|
||||
const InputBlock &ib = inputs[i];
|
||||
OutputBlockData &output = outputs[i].data;
|
||||
|
||||
if (ib.data.voxels_to_save.is_null()) {
|
||||
output.type = TYPE_LOAD;
|
||||
output.voxels_loaded = emerge_requests.write[iload].voxel_buffer;
|
||||
CRASH_COND(output.voxels_loaded.is_null());
|
||||
++iload;
|
||||
|
||||
} else {
|
||||
output.type = TYPE_SAVE;
|
||||
}
|
||||
}
|
||||
|
||||
// If unordered responses were allowed
|
||||
//
|
||||
// size_t j = 0;
|
||||
// for (size_t i = 0; i < emerge_requests.size(); ++i) {
|
||||
// VoxelStream::BlockRequest &r = emerge_requests.write[i];
|
||||
// OutputBlock &ob = outputs[j];
|
||||
// ob.position = r.origin_in_voxels >> (_block_size_pow2 + r.lod);
|
||||
// ob.lod = r.lod;
|
||||
// ob.data.type = TYPE_LOAD;
|
||||
// ob.data.voxels_loaded = r.voxel_buffer;
|
||||
// ++j;
|
||||
// }
|
||||
// for (size_t i = 0; i < immerge_requests.size(); ++i) {
|
||||
// VoxelStream::BlockRequest &r = immerge_requests.write[i];
|
||||
// OutputBlock &ob = outputs[j];
|
||||
// ob.position = r.origin_in_voxels >> (_block_size_pow2 + r.lod);
|
||||
// ob.lod = r.lod;
|
||||
// ob.data.type = TYPE_SAVE;
|
||||
// ++j;
|
||||
// }
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
#ifndef VOXEL_DATA_LOADER_H
|
||||
#define VOXEL_DATA_LOADER_H
|
||||
|
||||
#include "block_thread_manager.h"
|
||||
|
||||
class VoxelStream;
|
||||
class VoxelBuffer;
|
||||
|
||||
class VoxelDataLoader {
|
||||
public:
|
||||
struct InputBlockData {
|
||||
Ref<VoxelBuffer> voxels_to_save;
|
||||
};
|
||||
|
||||
enum RequestType {
|
||||
TYPE_NOT_INITIALIZED = 0, // For error detection
|
||||
TYPE_SAVE,
|
||||
TYPE_LOAD
|
||||
};
|
||||
|
||||
struct OutputBlockData {
|
||||
RequestType type;
|
||||
Ref<VoxelBuffer> voxels_loaded;
|
||||
};
|
||||
|
||||
typedef VoxelBlockThreadManager<InputBlockData, OutputBlockData> Mgr;
|
||||
typedef Mgr::InputBlock InputBlock;
|
||||
typedef Mgr::OutputBlock OutputBlock;
|
||||
typedef Mgr::Input Input;
|
||||
typedef Mgr::Output Output;
|
||||
typedef Mgr::Stats Stats;
|
||||
|
||||
VoxelDataLoader(unsigned int thread_count, Ref<VoxelStream> stream, unsigned int block_size_pow2);
|
||||
~VoxelDataLoader();
|
||||
|
||||
void push(const Input &input) { _mgr->push(input); }
|
||||
void pop(Output &output) { _mgr->pop(output); }
|
||||
|
||||
private:
|
||||
void process_blocks_thread_func(const ArraySlice<InputBlock> inputs, ArraySlice<OutputBlock> outputs, Ref<VoxelStream> stream, Mgr::ProcessorStats &stats);
|
||||
|
||||
Mgr *_mgr = nullptr;
|
||||
int _block_size_pow2 = 0;
|
||||
};
|
||||
|
||||
#endif // VOXEL_DATA_LOADER_H
|
|
@ -1,86 +0,0 @@
|
|||
#include "voxel_mesh_updater.h"
|
||||
#include "../meshers/transvoxel/voxel_mesher_transvoxel.h"
|
||||
#include "../util/macros.h"
|
||||
#include "../util/utility.h"
|
||||
#include "voxel_lod_terrain.h"
|
||||
|
||||
VoxelMeshUpdater::VoxelMeshUpdater(unsigned int thread_count, MeshingParams params) {
|
||||
PRINT_VERBOSE("Constructing VoxelMeshUpdater");
|
||||
|
||||
Ref<VoxelMesherBlocky> blocky_mesher;
|
||||
Ref<VoxelMesherTransvoxel> smooth_mesher;
|
||||
|
||||
_minimum_padding = 0;
|
||||
_maximum_padding = 0;
|
||||
|
||||
if (params.library.is_valid()) {
|
||||
blocky_mesher.instance();
|
||||
blocky_mesher->set_library(params.library);
|
||||
blocky_mesher->set_occlusion_enabled(params.baked_ao);
|
||||
blocky_mesher->set_occlusion_darkness(params.baked_ao_darkness);
|
||||
_minimum_padding = max(_minimum_padding, blocky_mesher->get_minimum_padding());
|
||||
_maximum_padding = max(_maximum_padding, blocky_mesher->get_maximum_padding());
|
||||
}
|
||||
|
||||
if (params.smooth_surface) {
|
||||
smooth_mesher.instance();
|
||||
_minimum_padding = max(_minimum_padding, smooth_mesher->get_minimum_padding());
|
||||
_maximum_padding = max(_maximum_padding, smooth_mesher->get_maximum_padding());
|
||||
}
|
||||
|
||||
FixedArray<Mgr::BlockProcessingFunc, VoxelConstants::MAX_LOD> processors;
|
||||
|
||||
for (unsigned int i = 0; i < thread_count; ++i) {
|
||||
|
||||
if (i > 0) {
|
||||
// Need to clone them because they are not thread-safe due to memory pooling.
|
||||
// Also thanks to the wonders of ref_pointer() being private we trigger extra refs/unrefs for no reason
|
||||
if (blocky_mesher.is_valid()) {
|
||||
blocky_mesher = Ref<VoxelMesher>(blocky_mesher->clone());
|
||||
}
|
||||
if (smooth_mesher.is_valid()) {
|
||||
smooth_mesher = Ref<VoxelMesher>(smooth_mesher->clone());
|
||||
}
|
||||
}
|
||||
|
||||
processors[i] = [this, blocky_mesher, smooth_mesher](const ArraySlice<InputBlock> inputs, ArraySlice<OutputBlock> outputs, Mgr::ProcessorStats &_) {
|
||||
this->process_blocks_thread_func(inputs, outputs, blocky_mesher, smooth_mesher);
|
||||
};
|
||||
}
|
||||
|
||||
_mgr = memnew(Mgr(thread_count, 50, processors));
|
||||
}
|
||||
|
||||
VoxelMeshUpdater::~VoxelMeshUpdater() {
|
||||
PRINT_VERBOSE("Destroying VoxelMeshUpdater");
|
||||
if (_mgr) {
|
||||
memdelete(_mgr);
|
||||
}
|
||||
}
|
||||
|
||||
void VoxelMeshUpdater::process_blocks_thread_func(
|
||||
const ArraySlice<InputBlock> inputs,
|
||||
ArraySlice<OutputBlock> outputs,
|
||||
Ref<VoxelMesher> blocky_mesher,
|
||||
Ref<VoxelMesher> smooth_mesher) {
|
||||
|
||||
CRASH_COND(inputs.size() != outputs.size());
|
||||
|
||||
for (unsigned int i = 0; i < inputs.size(); ++i) {
|
||||
|
||||
const InputBlock &ib = inputs[i];
|
||||
const InputBlockData &block = ib.data;
|
||||
OutputBlockData &output = outputs[i].data;
|
||||
|
||||
CRASH_COND(block.voxels.is_null());
|
||||
|
||||
VoxelMesher::Input input = { **block.voxels, ib.lod };
|
||||
|
||||
if (blocky_mesher.is_valid()) {
|
||||
blocky_mesher->build(output.blocky_surfaces, input);
|
||||
}
|
||||
if (smooth_mesher.is_valid()) {
|
||||
smooth_mesher->build(output.smooth_surfaces, input);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
#ifndef VOXEL_MESH_UPDATER_H
|
||||
#define VOXEL_MESH_UPDATER_H
|
||||
|
||||
#include <core/os/semaphore.h>
|
||||
#include <core/os/thread.h>
|
||||
#include <core/vector.h>
|
||||
|
||||
#include "../meshers/blocky/voxel_mesher_blocky.h"
|
||||
#include "../voxel_buffer.h"
|
||||
|
||||
#include "block_thread_manager.h"
|
||||
|
||||
class VoxelMeshUpdater {
|
||||
public:
|
||||
struct InputBlockData {
|
||||
Ref<VoxelBuffer> voxels;
|
||||
};
|
||||
|
||||
struct OutputBlockData {
|
||||
VoxelMesher::Output blocky_surfaces;
|
||||
VoxelMesher::Output smooth_surfaces;
|
||||
};
|
||||
|
||||
struct MeshingParams {
|
||||
Ref<VoxelLibrary> library;
|
||||
bool baked_ao = true;
|
||||
float baked_ao_darkness = 0.8;
|
||||
bool smooth_surface = false;
|
||||
};
|
||||
|
||||
typedef VoxelBlockThreadManager<InputBlockData, OutputBlockData> Mgr;
|
||||
typedef Mgr::InputBlock InputBlock;
|
||||
typedef Mgr::OutputBlock OutputBlock;
|
||||
typedef Mgr::Input Input;
|
||||
typedef Mgr::Output Output;
|
||||
typedef Mgr::Stats Stats;
|
||||
|
||||
VoxelMeshUpdater(unsigned int thread_count, MeshingParams params);
|
||||
~VoxelMeshUpdater();
|
||||
|
||||
void push(const Input &input) { _mgr->push(input); }
|
||||
void pop(Output &output) { _mgr->pop(output); }
|
||||
|
||||
unsigned int get_minimum_padding() const { return _minimum_padding; }
|
||||
unsigned int get_maximum_padding() const { return _maximum_padding; }
|
||||
|
||||
private:
|
||||
void process_blocks_thread_func(const ArraySlice<InputBlock> inputs,
|
||||
ArraySlice<OutputBlock> outputs,
|
||||
Ref<VoxelMesher> blocky_mesher,
|
||||
Ref<VoxelMesher> smooth_mesher);
|
||||
|
||||
Mgr *_mgr = nullptr;
|
||||
unsigned int _minimum_padding = 0;
|
||||
unsigned int _maximum_padding = 0;
|
||||
};
|
||||
|
||||
#endif // VOXEL_MESH_UPDATER_H
|
Loading…
Reference in New Issue