Added a low-priority queue to TimeSpreadTaskRunner

This commit is contained in:
Marc Gilleron 2022-06-22 22:01:10 +01:00
parent c51f3d8005
commit 76de7a439b
3 changed files with 57 additions and 26 deletions

View File

@ -119,7 +119,8 @@ public:
_world.viewers.for_each_with_id(f);
}
void push_main_thread_time_spread_task(ITimeSpreadTask *task);
void push_main_thread_time_spread_task(
ITimeSpreadTask *task, TimeSpreadTaskRunner::Priority priority = TimeSpreadTaskRunner::PRIORITY_NORMAL);
int get_main_thread_time_budget_usec() const;
void set_main_thread_time_budget_usec(unsigned int usec);

View File

@ -10,15 +10,17 @@ TimeSpreadTaskRunner::~TimeSpreadTaskRunner() {
flush();
}
void TimeSpreadTaskRunner::push(ITimeSpreadTask *task) {
MutexLock lock(_tasks_mutex);
_tasks.push(task);
void TimeSpreadTaskRunner::push(ITimeSpreadTask *task, Priority priority) {
Queue &queue = _queues[priority];
MutexLock lock(queue.tasks_mutex);
queue.tasks.push(task);
}
void TimeSpreadTaskRunner::push(Span<ITimeSpreadTask *> tasks) {
MutexLock lock(_tasks_mutex);
void TimeSpreadTaskRunner::push(Span<ITimeSpreadTask *> tasks, Priority priority) {
Queue &queue = _queues[priority];
MutexLock lock(queue.tasks_mutex);
for (unsigned int i = 0; i < tasks.size(); ++i) {
_tasks.push(tasks[i]);
queue.tasks.push(tasks[i]);
}
}
@ -26,28 +28,38 @@ void TimeSpreadTaskRunner::process(uint64_t time_budget_usec) {
ZN_PROFILE_SCOPE();
const Time &time = *Time::get_singleton();
static thread_local std::vector<ITimeSpreadTask *> tls_postponed_tasks;
ZN_ASSERT(tls_postponed_tasks.size() == 0);
static thread_local FixedArray<std::vector<ITimeSpreadTask *>, PRIORITY_COUNT> tls_postponed_tasks;
for (unsigned int i = 0; i < tls_postponed_tasks.size(); ++i) {
ZN_ASSERT(tls_postponed_tasks[i].size() == 0);
}
const uint64_t time_before = time.get_ticks_usec();
// Do at least one task
do {
ITimeSpreadTask *task;
{
MutexLock lock(_tasks_mutex);
if (_tasks.size() == 0) {
ITimeSpreadTask *task = nullptr;
// Consume from high priority queues first
unsigned int queue_index;
for (queue_index = 0; queue_index < _queues.size(); ++queue_index) {
Queue &queue = _queues[queue_index];
MutexLock lock(queue.tasks_mutex);
if (queue.tasks.size() != 0) {
task = queue.tasks.front();
queue.tasks.pop();
break;
}
task = _tasks.front();
_tasks.pop();
}
if (task == nullptr) {
break;
}
TimeSpreadTaskContext ctx;
task->run(ctx);
if (ctx.postpone) {
tls_postponed_tasks.push_back(task);
tls_postponed_tasks[queue_index].push_back(task);
} else {
// TODO Call recycling function instead?
ZN_DELETE(task);
@ -55,9 +67,13 @@ void TimeSpreadTaskRunner::process(uint64_t time_budget_usec) {
} while (time.get_ticks_usec() - time_before < time_budget_usec);
if (tls_postponed_tasks.size() > 0) {
push(to_span(tls_postponed_tasks));
tls_postponed_tasks.clear();
// Push postponed task back into queues
for (unsigned int queue_index = 0; queue_index < tls_postponed_tasks.size(); ++queue_index) {
std::vector<ITimeSpreadTask *> &tasks = tls_postponed_tasks[queue_index];
if (tasks.size() > 0) {
push(to_span(tasks), Priority(queue_index));
tasks.clear();
}
}
}
@ -71,8 +87,13 @@ void TimeSpreadTaskRunner::flush() {
}
unsigned int TimeSpreadTaskRunner::get_pending_count() const {
MutexLock lock(_tasks_mutex);
return _tasks.size();
unsigned int count = 0;
for (unsigned int queue_index = 0; queue_index < _queues.size(); ++queue_index) {
const Queue &queue = _queues[queue_index];
MutexLock lock(queue.tasks_mutex);
count += queue.tasks.size();
}
return count;
}
} // namespace zylann

View File

@ -24,20 +24,29 @@ public:
// Runs tasks in the caller thread, within a time budget per call. Kind of like coroutines.
class TimeSpreadTaskRunner {
public:
enum Priority { //
PRIORITY_NORMAL = 0,
PRIORITY_LOW = 1,
PRIORITY_COUNT
};
~TimeSpreadTaskRunner();
// Pushing is thread-safe.
void push(ITimeSpreadTask *task);
void push(Span<ITimeSpreadTask *> tasks);
void push(ITimeSpreadTask *task, Priority priority = PRIORITY_NORMAL);
void push(Span<ITimeSpreadTask *> tasks, Priority priority = PRIORITY_NORMAL);
void process(uint64_t time_budget_usec);
void flush();
unsigned int get_pending_count() const;
private:
// TODO Optimization: naive thread safety. Should be enough for now.
std::queue<ITimeSpreadTask *> _tasks;
BinaryMutex _tasks_mutex;
struct Queue {
std::queue<ITimeSpreadTask *> tasks;
// TODO Optimization: naive thread safety. Should be enough for now.
BinaryMutex tasks_mutex;
};
FixedArray<Queue, PRIORITY_COUNT> _queues;
};
} // namespace zylann