#include "voxel_thread_pool.h"
|
2020-08-30 03:59:02 +01:00
|
|
|
#include "../util/profiling.h"
|
2020-08-24 01:49:23 +01:00
|
|
|
|
|
|
|
#include <core/os/os.h>
|
|
|
|
#include <core/os/semaphore.h>
|
|
|
|
#include <core/os/thread.h>
|
|
|
|
|
// template <typename T>
// static bool contains(const std::vector<T> vec, T v) {
//     for (size_t i = 0; i < vec.size(); ++i) {
//         if (vec[i] == v) {
//             return true;
//         }
//     }
//     return false;
// }

VoxelThreadPool::VoxelThreadPool() {
}

VoxelThreadPool::~VoxelThreadPool() {
    destroy_all_threads();

    if (_completed_tasks.size() != 0) {
        // We don't have ownership over tasks, so it's an error to destroy the pool without handling them
        ERR_PRINT("There are unhandled completed tasks remaining!");
    }
}

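// Prepares one thread slot and launches its worker thread.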
void VoxelThreadPool::create_thread(ThreadData &d, uint32_t i) {
    d.pool = this;
    d.stop = false;
    d.waiting = false;
    d.index = i;
    if (!_name.empty()) {
        d.name = String("{0} {1}").format(varray(_name, i));
    }
    d.thread.start(thread_func_static, &d);
}

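// Stops every worker thread and joins them. Tasks a thread was already running still complete normally.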
void VoxelThreadPool::destroy_all_threads() {
    // We have only one semaphore to signal threads to resume, and one `post()` lets only one pass.
    // We cannot tell one single thread to stop, because when we post and other threads are waiting, we can't guarantee
    // the one to pass will be the one we want.
    // So we can only choose to stop ALL threads, and then start them again if we want to adjust their count.
    // Also, it shouldn't drop tasks. Any tasks the thread was working on should still complete normally.
    for (size_t i = 0; i < _thread_count; ++i) {
        ThreadData &d = _threads[i];
        d.stop = true;
    }
    for (size_t i = 0; i < _thread_count; ++i) {
        _tasks_semaphore.post();
    }
    for (size_t i = 0; i < _thread_count; ++i) {
        ThreadData &d = _threads[i];
        d.wait_to_finish_and_reset();
    }
}

void VoxelThreadPool::set_name(String name) {
    _name = name;
}

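// Stops all current workers and re-creates them for the new count (clamped to MAX_THREADS).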
void VoxelThreadPool::set_thread_count(uint32_t count) {
    if (count > MAX_THREADS) {
        count = MAX_THREADS;
    }
    destroy_all_threads();
    // destroy_all_threads() stopped every thread, so all of them have to be re-created, not just the new ones
    for (uint32_t i = 0; i < count; ++i) {
        ThreadData &d = _threads[i];
        create_thread(d, i);
    }
    _thread_count = count;
}

void VoxelThreadPool::set_batch_count(uint32_t count) {
    _batch_count = count;
}

void VoxelThreadPool::set_priority_update_period(uint32_t milliseconds) {
    _priority_update_period = milliseconds;
}

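// Schedules one task. The pool does not take ownership: once the task completes (or is cancelled),
// it is moved to the completed list and the caller remains responsible for it.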
void VoxelThreadPool::enqueue(IVoxelTask *task) {
    CRASH_COND(task == nullptr);
    {
        MutexLock lock(_tasks_mutex);
        TaskItem t;
        t.task = task;
        _tasks.push_back(t);
        ++_debug_received_tasks;
    }
    // TODO Do I need to post a certain amount of times?
    _tasks_semaphore.post();
}

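// Same as above, but schedules several tasks at once and posts the semaphore once per task.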
void VoxelThreadPool::enqueue(ArraySlice<IVoxelTask *> tasks) {
    {
        MutexLock lock(_tasks_mutex);
        for (size_t i = 0; i < tasks.size(); ++i) {
            TaskItem t;
            t.task = tasks[i];
            CRASH_COND(t.task == nullptr);
            _tasks.push_back(t);
            ++_debug_received_tasks;
        }
    }
    // TODO Do I need to post a certain amount of times?
    for (size_t i = 0; i < tasks.size(); ++i) {
        _tasks_semaphore.post();
    }
}

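// Thread entry point: names the thread for the OS and the profiler, then runs the worker loop.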
void VoxelThreadPool::thread_func_static(void *p_data) {
    ThreadData &data = *static_cast<ThreadData *>(p_data);
    VoxelThreadPool &pool = *data.pool;

    if (!data.name.empty()) {
        Thread::set_name(data.name);

#ifdef VOXEL_PROFILER_ENABLED
        CharString thread_name = data.name.utf8();
        VOXEL_PROFILE_SET_THREAD_NAME(thread_name.get_data());
#endif
    }

    pool.thread_func(data);
}

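// Worker loop: repeatedly picks up to `_batch_count` of the highest-priority tasks (lower value = higher priority),
// runs them, and moves them to the completed list, sleeping on the semaphore when the queue is empty.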
void VoxelThreadPool::thread_func(ThreadData &data) {
    data.debug_state = STATE_RUNNING;

    std::vector<TaskItem> tasks;
    std::vector<IVoxelTask *> cancelled_tasks;

    while (!data.stop) {
        {
            VOXEL_PROFILE_SCOPE();

            data.debug_state = STATE_PICKING;
            const uint32_t now = OS::get_singleton()->get_ticks_msec();

            MutexLock lock(_tasks_mutex);

            // Pick best tasks
            for (uint32_t bi = 0; bi < _batch_count && _tasks.size() != 0; ++bi) {
                size_t best_index = 0; // Take first by default, this is a valid index
                int best_priority = 999999;

                // TODO This takes a lot of time when there are many queued tasks. Use a better container?
                for (size_t i = 0; i < _tasks.size(); ++i) {
                    TaskItem &item = _tasks[i];
                    CRASH_COND(item.task == nullptr);

                    if (now - item.last_priority_update_time > _priority_update_period) {
                        // Calling `get_priority()` first since it can update cancellation
                        // (not a clear API though, might review that in the future)
                        item.cached_priority = item.task->get_priority();

                        if (item.task->is_cancelled()) {
                            cancelled_tasks.push_back(item.task);
                            // Swap-remove, then step back so the element swapped into `i` gets visited too
                            _tasks[i] = _tasks.back();
                            _tasks.pop_back();
                            --i;
                            continue;
                        }

                        item.last_priority_update_time = now;
                    }

                    if (item.cached_priority < best_priority) {
                        best_priority = item.cached_priority;
                        best_index = i;
                    }
                }

                // If not all tasks were cancelled
                if (_tasks.size() != 0) {
                    CRASH_COND(best_index >= _tasks.size());
                    tasks.push_back(_tasks[best_index]);
                    _tasks[best_index] = _tasks.back();
                    _tasks.pop_back();
                }
            }
        }

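        // Cancelled tasks still go through the completed list, so their owner can reclaim and delete them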
        if (cancelled_tasks.size() > 0) {
            MutexLock lock(_completed_tasks_mutex);
            for (size_t i = 0; i < cancelled_tasks.size(); ++i) {
                _completed_tasks.push_back(cancelled_tasks[i]);
                ++_debug_completed_tasks;
            }
        }
        cancelled_tasks.clear();

        //print_line(String("Processing {0} tasks").format(varray(tasks.size())));

        if (tasks.empty()) {
            data.debug_state = STATE_WAITING;

            // Wait for more tasks
            data.waiting = true;
            _tasks_semaphore.wait();
            data.waiting = false;

        } else {
            data.debug_state = STATE_RUNNING;

            for (size_t i = 0; i < tasks.size(); ++i) {
                TaskItem &item = tasks[i];
                if (!item.task->is_cancelled()) {
                    VoxelTaskContext ctx;
                    ctx.thread_index = data.index;
                    item.task->run(ctx);
                }
            }

            {
                MutexLock lock(_completed_tasks_mutex);
                for (size_t i = 0; i < tasks.size(); ++i) {
                    TaskItem &item = tasks[i];
                    _completed_tasks.push_back(item.task);
                    ++_debug_completed_tasks;
                }
            }

            tasks.clear();
        }
    }

    data.debug_state = STATE_STOPPED;
}

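// Blocks until the task queue is empty and every worker thread is idle, printing an error if either
// step takes suspiciously long (10 seconds).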
void VoxelThreadPool::wait_for_all_tasks() {
    const uint32_t suspicious_delay_msec = 10000;

    uint32_t before = OS::get_singleton()->get_ticks_msec();
    bool error1_reported = false;

    // Wait until all tasks have been taken
    while (true) {
        {
            MutexLock lock(_tasks_mutex);
            if (_tasks.size() == 0) {
                break;
            }
        }

        OS::get_singleton()->delay_usec(2000);

        if (!error1_reported && OS::get_singleton()->get_ticks_msec() - before > suspicious_delay_msec) {
            ERR_PRINT("Waiting for all tasks to be picked is taking too long");
            error1_reported = true;
        }
    }

    before = OS::get_singleton()->get_ticks_msec();
    bool error2_reported = false;

    // Wait until all threads have done all their tasks
    bool any_working_thread = true;
    while (any_working_thread) {
        any_working_thread = false;
        for (size_t i = 0; i < _thread_count; ++i) {
            const ThreadData &t = _threads[i];
            if (t.waiting == false) {
                any_working_thread = true;
                break;
            }
        }

        OS::get_singleton()->delay_usec(2000);

        if (!error2_reported && OS::get_singleton()->get_ticks_msec() - before > suspicious_delay_msec) {
            ERR_PRINT("Waiting for all tasks to be completed is taking too long");
            error2_reported = true;
        }
    }
}

// Debug information can be wrong on rare occasions.
// The variables should be safely updated, but computing or reading from them is not thread-safe.
// It wasn't considered worth locking just for debugging.

VoxelThreadPool::State VoxelThreadPool::get_thread_debug_state(uint32_t i) const {
    return _threads[i].debug_state;
}

unsigned int VoxelThreadPool::get_debug_remaining_tasks() const {
    return _debug_received_tasks - _debug_completed_tasks;
}
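
// The sketch below is a rough usage example, kept as a comment since it is not part of this file's build.
// It assumes a task type deriving from IVoxelTask; the exact virtual signatures live in the header, and
// `MyTask`, `priority` and the surrounding setup are hypothetical names for illustration only.
//
// struct MyTask : public IVoxelTask {
//     int priority = 0;
//     void run(VoxelTaskContext ctx) override {
//         // Do the actual work here; `ctx.thread_index` tells which worker is running it
//     }
//     int get_priority() override {
//         return priority; // Lower values are picked first
//     }
//     bool is_cancelled() override {
//         return false;
//     }
// };
//
// VoxelThreadPool pool;
// pool.set_name("MyPool");
// pool.set_thread_count(2);
// pool.set_batch_count(1);
// pool.set_priority_update_period(200);
//
// MyTask *task = memnew(MyTask);
// pool.enqueue(task);
// // ...
// pool.wait_for_all_tasks();
// // The pool never owns tasks: completed ones must be collected and deleted by the caller.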