289 lines
7.0 KiB
C++
289 lines
7.0 KiB
C++
/* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#include "JobScheduler.h"
|
|
#include "Logging.h"
|
|
|
|
namespace mozilla {
|
|
namespace gfx {
|
|
|
|
// The singleton scheduler instance. It is allocated by Init() and deleted and
// reset to nullptr by ShutDown().
JobScheduler* JobScheduler::sSingleton = nullptr;
|
|
|
|
// Creates the scheduler singleton with aNumQueues drawing queues and
// aNumThreads worker threads. Worker number i is attached to queue number
// (i % aNumQueues), which spreads the workers round-robin over the queues.
//
// Returns true on success. Returns false, without creating the singleton,
// when worker threads are requested but there is no queue to attach them to.
bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
{
  MOZ_ASSERT(!sSingleton);
  MOZ_ASSERT(aNumThreads >= aNumQueues);

  // The assertion above does not reject aNumQueues == 0. In that case the
  // modulo used to pick a queue for each worker would divide by zero, so bail
  // out before allocating anything.
  if (aNumQueues == 0 && aNumThreads > 0) {
    return false;
  }

  sSingleton = new JobScheduler();
  sSingleton->mNextQueue = 0;

  for (uint32_t i = 0; i < aNumQueues; ++i) {
    sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
  }

  for (uint32_t i = 0; i < aNumThreads; ++i) {
    sSingleton->mWorkerThreads.push_back(
      WorkerThread::Create(sSingleton->mDrawingQueues[i % aNumQueues]));
  }

  return true;
}
|
|
|
|
void JobScheduler::ShutDown()
|
|
{
|
|
MOZ_ASSERT(IsEnabled());
|
|
if (!IsEnabled()) {
|
|
return;
|
|
}
|
|
|
|
for (auto queue : sSingleton->mDrawingQueues) {
|
|
queue->ShutDown();
|
|
delete queue;
|
|
}
|
|
|
|
for (WorkerThread* thread : sSingleton->mWorkerThreads) {
|
|
// this will block until the thread is joined.
|
|
delete thread;
|
|
}
|
|
|
|
sSingleton->mWorkerThreads.clear();
|
|
delete sSingleton;
|
|
sSingleton = nullptr;
|
|
}
|
|
|
|
// Runs aJob and returns the status it reported. The job is deleted here when
// it finished, i.e. when the status is Complete or Error; for any other status
// the job is left alive.
JobStatus JobScheduler::ProcessJob(Job* aJob)
{
  MOZ_ASSERT(aJob);

  const JobStatus result = aJob->Run();

  switch (result) {
    case JobStatus::Complete:
    case JobStatus::Error:
      delete aJob;
      break;
    default:
      break;
  }

  return result;
}
|
|
|
|
void
|
|
JobScheduler::SubmitJob(Job* aJob)
|
|
{
|
|
MOZ_ASSERT(aJob);
|
|
RefPtr<SyncObject> start = aJob->GetStartSync();
|
|
if (start && start->Register(aJob)) {
|
|
// The Job buffer starts with a non-signaled sync object, it
|
|
// is now registered in the list of task buffers waiting on the
|
|
// sync object, so we should not place it in the queue.
|
|
return;
|
|
}
|
|
|
|
GetQueueForJob(aJob)->SubmitJob(aJob);
|
|
}
|
|
|
|
void
|
|
JobScheduler::Join(SyncObject* aCompletion)
|
|
{
|
|
RefPtr<EventObject> waitForCompletion = new EventObject();
|
|
JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
|
|
waitForCompletion->Wait();
|
|
}
|
|
|
|
// Selects the queue aJob must be submitted to: the queue of the worker thread
// the job is pinned to if there is one, otherwise the queue returned by
// GetDrawingQueue().
MultiThreadedJobQueue* JobScheduler::GetQueueForJob(Job* aJob)
{
  if (aJob->IsPinnedToAThread()) {
    return aJob->GetWorkerThread()->GetJobQueue();
  }

  return GetDrawingQueue();
}
|
|
|
|
// Builds a job that starts after aStart (if any), counts as a prerequisite of
// aCompletion (if any), and is optionally pinned to aThread. The job announces
// itself to both sync objects at construction time.
Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
  : mNextWaitingJob(nullptr)
  , mStartSync(aStart)
  , mCompletionSync(aCompletion)
  , mPinToThread(aThread)
{
  if (aStart) {
    aStart->AddSubsequent(this);
  }

  if (aCompletion) {
    aCompletion->AddPrerequisite(this);
  }
}
|
|
|
|
// Destroying a job signals its completion sync object, if it has one, and
// then drops the reference to it.
Job::~Job()
{
  if (!mCompletionSync) {
    return;
  }

  //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
  mCompletionSync->Signal();
  mCompletionSync = nullptr;
}
|
|
|
|
// Sets the event this job was created with and reports the job as complete.
JobStatus SetEventJob::Run()
{
  mEvent->Set();

  return JobStatus::Complete;
}
|
|
|
|
// A job whose only work is to set aEvent when it runs. The sync objects and
// the optional worker thread are forwarded unchanged to the Job base class.
SetEventJob::SetEventJob(EventObject* aEvent,
                         SyncObject* aStart,
                         SyncObject* aCompletion,
                         WorkerThread* aWorker)
  : Job(aStart, aCompletion, aWorker)
  , mEvent(aEvent)
{
}
|
|
|
|
// Nothing to release beyond what the members and the Job base class handle.
SetEventJob::~SetEventJob() = default;
|
|
|
|
// Creates a sync object that needs aNumPrerequisites calls to Signal() before
// its signal counter reaches zero. Debug builds additionally track how many
// prerequisites were announced versus how many have been added so far.
SyncObject::SyncObject(uint32_t aNumPrerequisites)
  : mSignals(aNumPrerequisites)
  , mFirstWaitingJob(nullptr)
#ifdef DEBUG
  , mNumPrerequisites(aNumPrerequisites)
  , mAddedPrerequisites(0)
#endif
{
}
|
|
|
|
// A sync object must not be destroyed while jobs are still sitting in its
// waiting list; debug builds assert that the list is empty.
SyncObject::~SyncObject()
{
  MOZ_ASSERT(mFirstWaitingJob == nullptr);
}
|
|
|
|
bool
|
|
SyncObject::Register(Job* aJob)
|
|
{
|
|
MOZ_ASSERT(aJob);
|
|
|
|
// For now, ensure that when we schedule the first subsequent, we have already
|
|
// created all of the prerequisites. This is an arbitrary restriction because
|
|
// we specify the number of prerequisites in the constructor, but in the typical
|
|
// scenario, if the assertion FreezePrerequisite blows up here it probably means
|
|
// we got the initial nmber of prerequisites wrong. We can decide to remove
|
|
// this restriction if needed.
|
|
FreezePrerequisites();
|
|
|
|
int32_t signals = mSignals;
|
|
|
|
if (signals > 0) {
|
|
AddWaitingJob(aJob);
|
|
// Since Register and Signal can be called concurrently, it can happen that
|
|
// reading mSignals in Register happens before decrementing mSignals in Signal,
|
|
// but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
|
|
// the SyncObject ends up in the signaled state with a task sitting in the
|
|
// waiting list. To prevent that we check mSignals a second time and submit
|
|
// again if signals reached zero in the mean time.
|
|
// We do this instead of holding a mutex around mSignals+mJobs to reduce
|
|
// lock contention.
|
|
int32_t signals2 = mSignals;
|
|
if (signals2 == 0) {
|
|
SubmitWaitingJobs();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
void
|
|
SyncObject::Signal()
|
|
{
|
|
int32_t signals = --mSignals;
|
|
MOZ_ASSERT(signals >= 0);
|
|
|
|
if (signals == 0) {
|
|
SubmitWaitingJobs();
|
|
}
|
|
}
|
|
|
|
void
|
|
SyncObject::AddWaitingJob(Job* aJob)
|
|
{
|
|
// Push (using atomics) the task into the list of waiting tasks.
|
|
for (;;) {
|
|
Job* first = mFirstWaitingJob;
|
|
aJob->mNextWaitingJob = first;
|
|
if (mFirstWaitingJob.compareExchange(first, aJob)) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
void SyncObject::SubmitWaitingJobs()
|
|
{
|
|
// Scheduling the tasks can cause code that modifies <this>'s reference
|
|
// count to run concurrently, and cause the caller of this function to
|
|
// be owned by another thread. We need to make sure the reference count
|
|
// does not reach 0 on another thread before the end of this method, so
|
|
// hold a strong ref to prevent that!
|
|
RefPtr<SyncObject> kungFuDeathGrip(this);
|
|
|
|
// First atomically swap mFirstWaitingJob and waitingJobs...
|
|
Job* waitingJobs = nullptr;
|
|
for (;;) {
|
|
waitingJobs = mFirstWaitingJob;
|
|
if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
// ... and submit all of the waiting tasks in waitingJob now that they belong
|
|
// to this thread.
|
|
while (waitingJobs) {
|
|
Job* next = waitingJobs->mNextWaitingJob;
|
|
waitingJobs->mNextWaitingJob = nullptr;
|
|
JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
|
|
waitingJobs = next;
|
|
}
|
|
}
|
|
|
|
bool
|
|
SyncObject::IsSignaled()
|
|
{
|
|
return mSignals == 0;
|
|
}
|
|
|
|
void
|
|
SyncObject::FreezePrerequisites()
|
|
{
|
|
MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
|
|
}
|
|
|
|
void
|
|
SyncObject::AddPrerequisite(Job* aJob)
|
|
{
|
|
MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
|
|
}
|
|
|
|
void
|
|
SyncObject::AddSubsequent(Job* aJob)
|
|
{
|
|
}
|
|
|
|
// Binds the worker to aJobQueue and registers it with that queue.
WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
  : mQueue(aJobQueue)
{
  mQueue->RegisterThread();
}
|
|
|
|
void
|
|
WorkerThread::Run()
|
|
{
|
|
SetName("gfx worker");
|
|
|
|
for (;;) {
|
|
Job* commands = nullptr;
|
|
if (!mQueue->WaitForJob(commands)) {
|
|
mQueue->UnregisterThread();
|
|
return;
|
|
}
|
|
|
|
JobStatus status = JobScheduler::ProcessJob(commands);
|
|
|
|
if (status == JobStatus::Error) {
|
|
// Don't try to handle errors for now, but that's open to discussions.
|
|
// I expect errors to be mostly OOM issues.
|
|
gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
|
|
}
|
|
}
|
|
}
|
|
|
|
} //namespace
|
|
} //namespace
|