Extract JobQueue interface and change the former JobQueue to AsyncJobQueue.

master
Lars W. (lwho) 2014-04-18 17:05:15 +02:00
parent 5dd76306e0
commit 3df75f0fb1
7 changed files with 85 additions and 53 deletions

View File

@ -522,7 +522,7 @@ void GasGiant::GenerateTexture()
assert(!m_job[i].HasJob());
m_hasJobRequest[i] = true;
STextureFaceRequest *ssrd = new STextureFaceRequest(&s_patchFaces[i][0], GetSystemBody()->GetPath(), i, UV_DIMS, GetTerrain());
m_job[i] = Pi::Jobs()->Queue(new SingleTextureFaceJob(ssrd));
m_job[i] = Pi::GetAsyncJobQueue()->Queue(new SingleTextureFaceJob(ssrd));
}
}

View File

@ -176,7 +176,7 @@ void GeoPatch::LODUpdate(const vector3d &campos) {
SQuadSplitRequest *ssrd = new SQuadSplitRequest(v0, v1, v2, v3, centroid.Normalized(), m_depth,
geosphere->GetSystemBody()->GetPath(), mPatchID, ctx->edgeLen,
ctx->frac, geosphere->GetTerrain());
m_job = Pi::Jobs()->Queue(new QuadPatchJob(ssrd));
m_job = Pi::GetAsyncJobQueue()->Queue(new QuadPatchJob(ssrd));
} else {
for (int i=0; i<NUM_KIDS; i++) {
kids[i]->LODUpdate(campos);
@ -202,7 +202,7 @@ void GeoPatch::RequestSinglePatch()
mHasJobRequest = true;
SSingleSplitRequest *ssrd = new SSingleSplitRequest(v0, v1, v2, v3, centroid.Normalized(), m_depth,
geosphere->GetSystemBody()->GetPath(), mPatchID, ctx->edgeLen, ctx->frac, geosphere->GetTerrain());
m_job = Pi::Jobs()->Queue(new SinglePatchJob(ssrd));
m_job = Pi::GetAsyncJobQueue()->Queue(new SinglePatchJob(ssrd));
}
}

View File

@ -16,7 +16,7 @@ Job::~Job()
UnlinkHandle();
}
JobRunner::JobRunner(JobQueue *jq, const uint8_t idx) :
AsyncJobQueue::JobRunner::JobRunner(AsyncJobQueue *jq, const uint8_t idx) :
m_jobQueue(jq),
m_job(0),
m_threadIdx(idx),
@ -28,7 +28,7 @@ JobRunner::JobRunner(JobQueue *jq, const uint8_t idx) :
m_threadId = SDL_CreateThread(&JobRunner::Trampoline, m_threadName.c_str(), this);
}
JobRunner::~JobRunner()
AsyncJobQueue::JobRunner::~JobRunner()
{
// if we have a job running, cancel it. the worker will return it to the
// finish queue, where it will be deleted later, so we don't need to do that
@ -46,14 +46,14 @@ JobRunner::~JobRunner()
}
// entry point for SDL thread. we simply get back onto a method. convenience mostly
int JobRunner::Trampoline(void *data)
int AsyncJobQueue::JobRunner::Trampoline(void *data)
{
JobRunner *jr = static_cast<JobRunner*>(data);
jr->Main();
return 0;
}
void JobRunner::Main()
void AsyncJobQueue::JobRunner::Main()
{
Job *job;
@ -101,12 +101,12 @@ void JobRunner::Main()
}
}
SDL_mutex *JobRunner::GetQueueDestroyingLock()
SDL_mutex *AsyncJobQueue::JobRunner::GetQueueDestroyingLock()
{
return m_queueDestroyingLock;
}
void JobRunner::SetQueueDestroyed()
void AsyncJobQueue::JobRunner::SetQueueDestroyed()
{
m_queueDestroyed = true;
}
@ -178,7 +178,7 @@ JobHandle::~JobHandle()
}
JobQueue::JobQueue(Uint32 numRunners) :
AsyncJobQueue::AsyncJobQueue(Uint32 numRunners) :
m_shutdown(false)
{
// Want to limit this for now to the maximum number of threads defined in the class
@ -193,7 +193,7 @@ JobQueue::JobQueue(Uint32 numRunners) :
}
}
JobQueue::~JobQueue()
AsyncJobQueue::~AsyncJobQueue()
{
// flag shutdown. protected by the queue lock for convenience in GetJob
SDL_LockMutex(m_queueLock);
@ -236,7 +236,7 @@ JobQueue::~JobQueue()
SDL_DestroyMutex(m_queueLock);
}
JobHandle JobQueue::Queue(Job *job, JobClient *client)
JobHandle AsyncJobQueue::Queue(Job *job, JobClient *client)
{
JobHandle handle(job, this, client);
@ -251,7 +251,7 @@ JobHandle JobQueue::Queue(Job *job, JobClient *client)
}
// called by the runner to get a new job
Job *JobQueue::GetJob()
Job *AsyncJobQueue::GetJob()
{
SDL_LockMutex(m_queueLock);
@ -282,7 +282,7 @@ Job *JobQueue::GetJob()
}
// called by the runner when a job completes
void JobQueue::Finish(Job *job, const uint8_t threadIdx)
void AsyncJobQueue::Finish(Job *job, const uint8_t threadIdx)
{
SDL_LockMutex(m_finishedLock[threadIdx]);
m_finished[threadIdx].push_back(job);
@ -290,7 +290,7 @@ void JobQueue::Finish(Job *job, const uint8_t threadIdx)
}
// call OnFinish methods for completed jobs, and clean up
Uint32 JobQueue::FinishJobs()
Uint32 AsyncJobQueue::FinishJobs()
{
PROFILE_SCOPED()
Uint32 finished = 0;
@ -321,7 +321,7 @@ Uint32 JobQueue::FinishJobs()
return finished;
}
void JobQueue::Cancel(Job *job) {
void AsyncJobQueue::Cancel(Job *job) {
// lock both queues, so we know that all jobs will stay put
SDL_LockMutex(m_queueLock);
const uint32_t numRunners = m_runners.size();

View File

@ -13,10 +13,9 @@
static const Uint32 MAX_THREADS = 64;
class JobQueue;
class JobRunner;
class JobHandle;
class JobClient;
class JobQueue;
// represents a single unit of work that you want done
// subclass and implement:
@ -43,7 +42,7 @@ public:
virtual void OnCancel() {}
private:
friend class JobQueue;
friend class AsyncJobQueue;
friend class JobHandle;
friend class JobRunner;
@ -57,30 +56,38 @@ private:
};
// a runner wraps a single thread, and calls into the queue when its ready for
// a new job. no user-servicable parts inside!
class JobRunner {
// the queue management class. create one from the main thread, and feed your
// jobs do it. it will take care of the rest
class JobQueue {
public:
JobRunner(JobQueue *jq, const uint8_t idx);
~JobRunner();
SDL_mutex *GetQueueDestroyingLock();
void SetQueueDestroyed();
JobQueue() = default;
JobQueue(const JobQueue&) = delete;
JobQueue& operator=(const JobQueue&) = delete;
private:
static int Trampoline(void *);
void Main();
// numRunners is the number of jobs to run in parallel. right now its the
// same as the number of threads, but there's no reason that it has to be
virtual ~JobQueue() { }
JobQueue *m_jobQueue;
// call from the main thread to add a job to the queue. the job should be
// allocated with new. the queue will delete it once its its completed
virtual JobHandle Queue(Job *job, JobClient *client = nullptr) = 0;
Job *m_job;
SDL_mutex *m_jobLock;
SDL_mutex *m_queueDestroyingLock;
// call from the main thread to cancel a job. one of three things will happen
//
// - the job hasn't run yet. it will never be run, and neither OnFinished nor
// OnCancel will be called. the job will be deleted on the next call to
// FinishJobs
//
// - the job has finished. neither onFinished not onCancel will be called.
// the job will be deleted on the next call to FinishJobs
//
// - the job is running. OnCancel will be called
virtual void Cancel(Job *job) = 0;
SDL_Thread *m_threadId;
uint8_t m_threadIdx;
std::string m_threadName;
bool m_queueDestroyed;
// call from the main loop. this will call OnFinish for any finished jobs,
// and then delete all finished and cancelled jobs. returns the number of
// finished jobs (not cancelled)
virtual Uint32 FinishJobs() = 0;
};
// This is the RAII handle for a queued Job. A job is cancelled when the
@ -103,7 +110,7 @@ public:
bool operator<(const JobHandle& other) const { return m_id < other.m_id; }
private:
friend class JobQueue;
friend class AsyncJobQueue;
friend class Job;
friend class JobRunner;
@ -120,16 +127,16 @@ private:
// the queue management class. create one from the main thread, and feed your
// jobs do it. it will take care of the rest
class JobQueue {
class AsyncJobQueue : public JobQueue {
public:
// numRunners is the number of jobs to run in parallel. right now its the
// same as the number of threads, but there's no reason that it has to be
JobQueue(Uint32 numRunners);
~JobQueue();
AsyncJobQueue(Uint32 numRunners);
virtual ~AsyncJobQueue();
// call from the main thread to add a job to the queue. the job should be
// allocated with new. the queue will delete it once its its completed
JobHandle Queue(Job *job, JobClient *client = nullptr);
virtual JobHandle Queue(Job *job, JobClient *client = nullptr) override;
// call from the main thread to cancel a job. one of three things will happen
//
@ -141,15 +148,40 @@ public:
// the job will be deleted on the next call to FinishJobs
//
// - the job is running. OnCancel will be called
void Cancel(Job *job);
virtual void Cancel(Job *job) override;
// call from the main loop. this will call OnFinish for any finished jobs,
// and then delete all finished and cancelled jobs. returns the number of
// finished jobs (not cancelled)
Uint32 FinishJobs();
virtual Uint32 FinishJobs() override;
private:
friend class JobRunner;
// a runner wraps a single thread, and calls into the queue when its ready for
// a new job. no user-servicable parts inside!
class JobRunner {
public:
JobRunner(AsyncJobQueue *jq, const uint8_t idx);
~JobRunner();
SDL_mutex *GetQueueDestroyingLock();
void SetQueueDestroyed();
private:
static int Trampoline(void *);
void Main();
AsyncJobQueue *m_jobQueue;
Job *m_job;
SDL_mutex *m_jobLock;
SDL_mutex *m_queueDestroyingLock;
SDL_Thread *m_threadId;
uint8_t m_threadIdx;
std::string m_threadName;
bool m_queueDestroyed;
};
Job *GetJob();
void Finish(Job *job, const uint8_t threadIdx);

View File

@ -148,7 +148,7 @@ ObjectViewerView *Pi::objectViewerView;
#endif
Sound::MusicPlayer Pi::musicPlayer;
std::unique_ptr<JobQueue> Pi::jobQueue;
std::unique_ptr<AsyncJobQueue> Pi::asyncJobQueue;
// XXX enabling this breaks UI gauge rendering. see #2627
#define USE_RTT 0
@ -440,7 +440,7 @@ void Pi::Init(const std::map<std::string,std::string> &options, bool no_gui)
const int numCores = OS::GetNumCores();
assert(numCores > 0);
if (numThreads == 0) numThreads = std::max(Uint32(numCores) - 1, 1U);
jobQueue.reset(new JobQueue(numThreads));
asyncJobQueue.reset(new AsyncJobQueue(numThreads));
Output("started %d worker threads\n", numThreads);
// XXX early, Lua init needs it
@ -678,7 +678,7 @@ void Pi::Quit()
StarSystemCache::ShrinkCache(SystemPath(), true);
SDL_Quit();
FileSystem::Uninit();
jobQueue.reset();
asyncJobQueue.reset();
exit(0);
}
@ -1282,7 +1282,7 @@ void Pi::MainLoop()
cpan->Update();
musicPlayer.Update();
jobQueue->FinishJobs();
asyncJobQueue->FinishJobs();
#if WITH_DEVKEYS
if (Pi::showDebugInfo && SDL_GetTicks() - last_stats > 1000) {

View File

@ -173,7 +173,7 @@ public:
static struct DetailLevel detail;
static GameConfig *config;
static JobQueue *Jobs() { return jobQueue.get();}
static JobQueue *GetAsyncJobQueue() { return asyncJobQueue.get();}
static bool DrawGUI;
@ -181,7 +181,7 @@ private:
static void HandleEvents();
static void InitJoysticks();
static std::unique_ptr<JobQueue> jobQueue;
static std::unique_ptr<AsyncJobQueue> asyncJobQueue;
static bool menuDone;

View File

@ -91,7 +91,7 @@ RefCountedPtr<SectorCache::Slave> SectorCache::NewSlaveCache()
return RefCountedPtr<Slave>(new Slave);
}
SectorCache::Slave::Slave() : m_jobs(Pi::Jobs())
SectorCache::Slave::Slave() : m_jobs(Pi::GetAsyncJobQueue())
{
Sector::cache.m_slaves.insert(this);
}