// Copyright © 2008-2020 Pioneer Developers. See AUTHORS.txt for details
// Licensed under the terms of the GPL v3. See licenses/GPL-3.txt

#ifndef JOBQUEUE_H
#define JOBQUEUE_H

#include "SDL_thread.h"
#include <cassert>
#include <cstdint>
#include <deque>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Fixed capacity of the per-thread arrays declared in AsyncJobQueue
// (m_finished and m_finishedLock are both sized by this constant).
static const Uint32 MAX_THREADS = 64;

// Forward declarations: Job::Handle stores pointers to both of these types
// before their full definitions appear further down in this header.
class JobClient;
class JobQueue;

// represents a single unit of work that you want done
|
|
// subclass and implement:
|
|
//
|
|
// OnRun: called from worker thread, and does the actual stuff you want done.
|
|
// store all your data in the object itself.
|
|
//
|
|
// OnFinish: called from the main thread once the worker completes the job.
|
|
// this is where you deliver the results from the worker
|
|
//
|
|
// OnCancel: optional. called from the main thread to tell the job that its
|
|
// results are not wanted. it should arrange for OnRun to return
|
|
// as quickly as possible. OnFinish will not be called for the job
|
|
class Job {
|
|
public:
|
|
// This is the RAII handle for a queued Job. A job is cancelled when the
|
|
// Job::Handle is destroyed. There is at most one Job::Handle for each Job
|
|
// (non-queued Jobs have no handle). Job::Handle is not copyable only
|
|
// moveable.
|
|
class Handle {
|
|
public:
|
|
Handle() :
|
|
m_id(++s_nextId),
|
|
m_job(nullptr),
|
|
m_queue(nullptr),
|
|
m_client(nullptr) {}
|
|
Handle(Handle &&other);
|
|
Handle &operator=(Handle &&other);
|
|
~Handle();
|
|
|
|
Handle(const Handle &) = delete;
|
|
Handle &operator=(const Handle &) = delete;
|
|
|
|
bool HasJob() const { return m_job != nullptr; }
|
|
Job *GetJob() const { return m_job; }
|
|
|
|
bool operator<(const Handle &other) const { return m_id < other.m_id; }
|
|
|
|
private:
|
|
friend class Job;
|
|
friend class AsyncJobQueue;
|
|
friend class SyncJobQueue;
|
|
|
|
Handle(Job *job, JobQueue *queue, JobClient *client);
|
|
void Unlink();
|
|
|
|
static unsigned long long s_nextId;
|
|
|
|
unsigned long long m_id;
|
|
Job *m_job;
|
|
JobQueue *m_queue;
|
|
JobClient *m_client;
|
|
};
|
|
|
|
public:
|
|
Job() :
|
|
cancelled(false),
|
|
m_handle(nullptr) {}
|
|
virtual ~Job();
|
|
|
|
Job(const Job &) = delete;
|
|
Job &operator=(const Job &) = delete;
|
|
|
|
virtual void OnRun() = 0;
|
|
virtual void OnFinish() = 0;
|
|
virtual void OnCancel() {}
|
|
|
|
private:
|
|
friend class AsyncJobQueue;
|
|
friend class SyncJobQueue;
|
|
friend class JobRunner;
|
|
|
|
void UnlinkHandle();
|
|
const Handle *GetHandle() const { return m_handle; }
|
|
void SetHandle(Handle *handle) { m_handle = handle; }
|
|
void ClearHandle() { m_handle = nullptr; }
|
|
|
|
bool cancelled;
|
|
Handle *m_handle;
|
|
};
|
|
|
|
// the queue management class. create one from the main thread, and feed your
|
|
// jobs do it. it will take care of the rest
|
|
class JobQueue {
|
|
public:
|
|
JobQueue() = default;
|
|
JobQueue(const JobQueue &) = delete;
|
|
JobQueue &operator=(const JobQueue &) = delete;
|
|
|
|
// numRunners is the number of jobs to run in parallel. right now its the
|
|
// same as the number of threads, but there's no reason that it has to be
|
|
virtual ~JobQueue() {}
|
|
|
|
// call from the main thread to add a job to the queue. the job should be
|
|
// allocated with new. the queue will delete it once its its completed
|
|
virtual Job::Handle Queue(Job *job, JobClient *client = nullptr) = 0;
|
|
|
|
// call from the main thread to cancel a job. one of three things will happen
|
|
//
|
|
// - the job hasn't run yet. it will never be run, and neither OnFinished nor
|
|
// OnCancel will be called. the job will be deleted on the next call to
|
|
// FinishJobs
|
|
//
|
|
// - the job has finished. neither onFinished not onCancel will be called.
|
|
// the job will be deleted on the next call to FinishJobs
|
|
//
|
|
// - the job is running. OnCancel will be called
|
|
virtual void Cancel(Job *job) = 0;
|
|
|
|
// call from the main loop. this will call OnFinish for any finished jobs,
|
|
// and then delete all finished and cancelled jobs. returns the number of
|
|
// finished jobs (not cancelled)
|
|
virtual Uint32 FinishJobs() = 0;
|
|
};
|
|
|
|
// the queue management class. create one from the main thread, and feed your
|
|
// jobs do it. it will take care of the rest
|
|
class AsyncJobQueue : public JobQueue {
|
|
public:
|
|
// numRunners is the number of jobs to run in parallel. right now its the
|
|
// same as the number of threads, but there's no reason that it has to be
|
|
AsyncJobQueue(Uint32 numRunners);
|
|
virtual ~AsyncJobQueue();
|
|
|
|
// call from the main thread to add a job to the queue. the job should be
|
|
// allocated with new. the queue will delete it once its its completed
|
|
virtual Job::Handle Queue(Job *job, JobClient *client = nullptr) override;
|
|
|
|
// call from the main thread to cancel a job. one of three things will happen
|
|
//
|
|
// - the job hasn't run yet. it will never be run, and neither OnFinished nor
|
|
// OnCancel will be called. the job will be deleted on the next call to
|
|
// FinishJobs
|
|
//
|
|
// - the job has finished. neither onFinished not onCancel will be called.
|
|
// the job will be deleted on the next call to FinishJobs
|
|
//
|
|
// - the job is running. OnCancel will be called
|
|
virtual void Cancel(Job *job) override;
|
|
|
|
// call from the main loop. this will call OnFinish for any finished jobs,
|
|
// and then delete all finished and cancelled jobs. returns the number of
|
|
// finished jobs (not cancelled)
|
|
virtual Uint32 FinishJobs() override;
|
|
|
|
private:
|
|
// a runner wraps a single thread, and calls into the queue when its ready for
|
|
// a new job. no user-servicable parts inside!
|
|
class JobRunner {
|
|
public:
|
|
JobRunner(AsyncJobQueue *jq, const uint8_t idx);
|
|
~JobRunner();
|
|
SDL_mutex *GetQueueDestroyingLock();
|
|
void SetQueueDestroyed();
|
|
|
|
private:
|
|
static int Trampoline(void *);
|
|
void Main();
|
|
|
|
AsyncJobQueue *m_jobQueue;
|
|
|
|
Job *m_job;
|
|
SDL_mutex *m_jobLock;
|
|
SDL_mutex *m_queueDestroyingLock;
|
|
|
|
SDL_Thread *m_threadId;
|
|
|
|
uint8_t m_threadIdx;
|
|
std::string m_threadName;
|
|
bool m_queueDestroyed;
|
|
};
|
|
|
|
Job *GetJob();
|
|
void Finish(Job *job, const uint8_t threadIdx);
|
|
|
|
std::deque<Job *> m_queue;
|
|
SDL_mutex *m_queueLock;
|
|
SDL_cond *m_queueWaitCond;
|
|
|
|
std::deque<Job *> m_finished[MAX_THREADS];
|
|
SDL_mutex *m_finishedLock[MAX_THREADS];
|
|
|
|
std::vector<JobRunner *> m_runners;
|
|
|
|
bool m_shutdown;
|
|
};
|
|
|
|
class SyncJobQueue : public JobQueue {
|
|
public:
|
|
SyncJobQueue() = default;
|
|
virtual ~SyncJobQueue();
|
|
|
|
// call from the main thread to add a job to the queue. the job should be
|
|
// allocated with new. the queue will delete it once its its completed
|
|
virtual Job::Handle Queue(Job *job, JobClient *client = nullptr) override;
|
|
|
|
// call from the main thread to cancel a job. one of three things will happen
|
|
//
|
|
// - the job hasn't run yet. it will never be run, and neither OnFinished nor
|
|
// OnCancel will be called. the job will be deleted on the next call to
|
|
// FinishJobs
|
|
//
|
|
// - the job has finished. neither onFinished not onCancel will be called.
|
|
// the job will be deleted on the next call to FinishJobs
|
|
//
|
|
// - the job is running. OnCancel will be called
|
|
virtual void Cancel(Job *job) override;
|
|
|
|
// call from the main loop. this will call OnFinish for any finished jobs,
|
|
// and then delete all finished and cancelled jobs. returns the number of
|
|
// finished jobs (not cancelled)
|
|
virtual Uint32 FinishJobs() override;
|
|
|
|
Uint32 RunJobs(Uint32 count = 1);
|
|
|
|
private:
|
|
std::deque<Job *> m_queue;
|
|
std::deque<Job *> m_finished;
|
|
};
|
|
|
|
class JobClient {
|
|
public:
|
|
virtual void Order(Job *job) = 0;
|
|
virtual void RemoveJob(Job::Handle *handle) = 0;
|
|
virtual ~JobClient() {}
|
|
};
|
|
|
|
class JobSet : public JobClient {
|
|
public:
|
|
JobSet(JobQueue *queue) :
|
|
m_queue(queue) {}
|
|
JobSet(JobSet &&other) :
|
|
m_queue(other.m_queue),
|
|
m_jobs(std::move(other.m_jobs)) { other.m_queue = nullptr; }
|
|
JobSet &operator=(JobSet &&other)
|
|
{
|
|
m_queue = other.m_queue;
|
|
m_jobs = std::move(other.m_jobs);
|
|
other.m_queue = nullptr;
|
|
return *this;
|
|
}
|
|
|
|
JobSet(const JobSet &) = delete;
|
|
JobSet &operator=(const JobSet &other) = delete;
|
|
|
|
virtual void Order(Job *job)
|
|
{
|
|
auto x = m_jobs.insert(m_queue->Queue(job, this));
|
|
assert(x.second);
|
|
}
|
|
virtual void RemoveJob(Job::Handle *handle) { m_jobs.erase(*handle); }
|
|
|
|
bool IsEmpty() const { return m_jobs.empty(); }
|
|
|
|
private:
|
|
JobQueue *m_queue;
|
|
std::set<Job::Handle> m_jobs;
|
|
};
|
|
|
|
#endif // JOBQUEUE_H