Line data Source code
1 : /* -*- Mode: C++; tab-width: 20; indent-tabs-mode: nil; c-basic-offset: 2 -*-
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 :
6 : #include "JobScheduler.h"
7 : #include "Logging.h"
8 :
9 : namespace mozilla {
10 : namespace gfx {
11 :
12 : JobScheduler* JobScheduler::sSingleton = nullptr;
13 :
14 0 : bool JobScheduler::Init(uint32_t aNumThreads, uint32_t aNumQueues)
15 : {
16 0 : MOZ_ASSERT(!sSingleton);
17 0 : MOZ_ASSERT(aNumThreads >= aNumQueues);
18 :
19 0 : sSingleton = new JobScheduler();
20 0 : sSingleton->mNextQueue = 0;
21 :
22 0 : for (uint32_t i = 0; i < aNumQueues; ++i) {
23 0 : sSingleton->mDrawingQueues.push_back(new MultiThreadedJobQueue());
24 : }
25 :
26 0 : for (uint32_t i = 0; i < aNumThreads; ++i) {
27 0 : sSingleton->mWorkerThreads.push_back(WorkerThread::Create(sSingleton->mDrawingQueues[i%aNumQueues]));
28 : }
29 0 : return true;
30 : }
31 :
32 0 : void JobScheduler::ShutDown()
33 : {
34 0 : MOZ_ASSERT(IsEnabled());
35 0 : if (!IsEnabled()) {
36 0 : return;
37 : }
38 :
39 0 : for (auto queue : sSingleton->mDrawingQueues) {
40 0 : queue->ShutDown();
41 0 : delete queue;
42 : }
43 :
44 0 : for (WorkerThread* thread : sSingleton->mWorkerThreads) {
45 : // this will block until the thread is joined.
46 0 : delete thread;
47 : }
48 :
49 0 : sSingleton->mWorkerThreads.clear();
50 0 : delete sSingleton;
51 0 : sSingleton = nullptr;
52 : }
53 :
54 : JobStatus
55 0 : JobScheduler::ProcessJob(Job* aJob)
56 : {
57 0 : MOZ_ASSERT(aJob);
58 0 : auto status = aJob->Run();
59 0 : if (status == JobStatus::Error || status == JobStatus::Complete) {
60 0 : delete aJob;
61 : }
62 0 : return status;
63 : }
64 :
65 : void
66 0 : JobScheduler::SubmitJob(Job* aJob)
67 : {
68 0 : MOZ_ASSERT(aJob);
69 0 : RefPtr<SyncObject> start = aJob->GetStartSync();
70 0 : if (start && start->Register(aJob)) {
71 : // The Job buffer starts with a non-signaled sync object, it
72 : // is now registered in the list of task buffers waiting on the
73 : // sync object, so we should not place it in the queue.
74 0 : return;
75 : }
76 :
77 0 : GetQueueForJob(aJob)->SubmitJob(aJob);
78 : }
79 :
80 : void
81 0 : JobScheduler::Join(SyncObject* aCompletion)
82 : {
83 0 : RefPtr<EventObject> waitForCompletion = new EventObject();
84 0 : JobScheduler::SubmitJob(new SetEventJob(waitForCompletion, aCompletion));
85 0 : waitForCompletion->Wait();
86 0 : }
87 :
88 : MultiThreadedJobQueue*
89 0 : JobScheduler::GetQueueForJob(Job* aJob)
90 : {
91 0 : return aJob->IsPinnedToAThread() ? aJob->GetWorkerThread()->GetJobQueue()
92 0 : : GetDrawingQueue();
93 : }
94 :
95 0 : Job::Job(SyncObject* aStart, SyncObject* aCompletion, WorkerThread* aThread)
96 : : mNextWaitingJob(nullptr)
97 : , mStartSync(aStart)
98 : , mCompletionSync(aCompletion)
99 0 : , mPinToThread(aThread)
100 : {
101 0 : if (mStartSync) {
102 0 : mStartSync->AddSubsequent(this);
103 : }
104 0 : if (mCompletionSync) {
105 0 : mCompletionSync->AddPrerequisite(this);
106 : }
107 0 : }
108 :
109 0 : Job::~Job()
110 : {
111 0 : if (mCompletionSync) {
112 : //printf(" -- Job %p dtor completion %p\n", this, mCompletionSync);
113 0 : mCompletionSync->Signal();
114 0 : mCompletionSync = nullptr;
115 : }
116 0 : }
117 :
118 : JobStatus
119 0 : SetEventJob::Run()
120 : {
121 0 : mEvent->Set();
122 0 : return JobStatus::Complete;
123 : }
124 :
125 0 : SetEventJob::SetEventJob(EventObject* aEvent,
126 : SyncObject* aStart, SyncObject* aCompletion,
127 0 : WorkerThread* aWorker)
128 : : Job(aStart, aCompletion, aWorker)
129 0 : , mEvent(aEvent)
130 0 : {}
131 :
132 0 : SetEventJob::~SetEventJob()
133 0 : {}
134 :
135 0 : SyncObject::SyncObject(uint32_t aNumPrerequisites)
136 : : mSignals(aNumPrerequisites)
137 : , mFirstWaitingJob(nullptr)
138 : #ifdef DEBUG
139 : , mNumPrerequisites(aNumPrerequisites)
140 0 : , mAddedPrerequisites(0)
141 : #endif
142 0 : {}
143 :
144 0 : SyncObject::~SyncObject()
145 : {
146 0 : MOZ_ASSERT(mFirstWaitingJob == nullptr);
147 0 : }
148 :
149 : bool
150 0 : SyncObject::Register(Job* aJob)
151 : {
152 0 : MOZ_ASSERT(aJob);
153 :
154 : // For now, ensure that when we schedule the first subsequent, we have already
155 : // created all of the prerequisites. This is an arbitrary restriction because
156 : // we specify the number of prerequisites in the constructor, but in the typical
157 : // scenario, if the assertion FreezePrerequisite blows up here it probably means
158 : // we got the initial nmber of prerequisites wrong. We can decide to remove
159 : // this restriction if needed.
160 0 : FreezePrerequisites();
161 :
162 0 : int32_t signals = mSignals;
163 :
164 0 : if (signals > 0) {
165 0 : AddWaitingJob(aJob);
166 : // Since Register and Signal can be called concurrently, it can happen that
167 : // reading mSignals in Register happens before decrementing mSignals in Signal,
168 : // but SubmitWaitingJobs happens before AddWaitingJob. This ordering means
169 : // the SyncObject ends up in the signaled state with a task sitting in the
170 : // waiting list. To prevent that we check mSignals a second time and submit
171 : // again if signals reached zero in the mean time.
172 : // We do this instead of holding a mutex around mSignals+mJobs to reduce
173 : // lock contention.
174 0 : int32_t signals2 = mSignals;
175 0 : if (signals2 == 0) {
176 0 : SubmitWaitingJobs();
177 : }
178 0 : return true;
179 : }
180 :
181 0 : return false;
182 : }
183 :
184 : void
185 0 : SyncObject::Signal()
186 : {
187 0 : int32_t signals = --mSignals;
188 0 : MOZ_ASSERT(signals >= 0);
189 :
190 0 : if (signals == 0) {
191 0 : SubmitWaitingJobs();
192 : }
193 0 : }
194 :
195 : void
196 0 : SyncObject::AddWaitingJob(Job* aJob)
197 : {
198 : // Push (using atomics) the task into the list of waiting tasks.
199 : for (;;) {
200 0 : Job* first = mFirstWaitingJob;
201 0 : aJob->mNextWaitingJob = first;
202 0 : if (mFirstWaitingJob.compareExchange(first, aJob)) {
203 0 : break;
204 : }
205 0 : }
206 0 : }
207 :
208 0 : void SyncObject::SubmitWaitingJobs()
209 : {
210 : // Scheduling the tasks can cause code that modifies <this>'s reference
211 : // count to run concurrently, and cause the caller of this function to
212 : // be owned by another thread. We need to make sure the reference count
213 : // does not reach 0 on another thread before the end of this method, so
214 : // hold a strong ref to prevent that!
215 0 : RefPtr<SyncObject> kungFuDeathGrip(this);
216 :
217 : // First atomically swap mFirstWaitingJob and waitingJobs...
218 0 : Job* waitingJobs = nullptr;
219 : for (;;) {
220 0 : waitingJobs = mFirstWaitingJob;
221 0 : if (mFirstWaitingJob.compareExchange(waitingJobs, nullptr)) {
222 0 : break;
223 : }
224 : }
225 :
226 : // ... and submit all of the waiting tasks in waitingJob now that they belong
227 : // to this thread.
228 0 : while (waitingJobs) {
229 0 : Job* next = waitingJobs->mNextWaitingJob;
230 0 : waitingJobs->mNextWaitingJob = nullptr;
231 0 : JobScheduler::GetQueueForJob(waitingJobs)->SubmitJob(waitingJobs);
232 0 : waitingJobs = next;
233 : }
234 0 : }
235 :
236 : bool
237 0 : SyncObject::IsSignaled()
238 : {
239 0 : return mSignals == 0;
240 : }
241 :
242 : void
243 0 : SyncObject::FreezePrerequisites()
244 : {
245 0 : MOZ_ASSERT(mAddedPrerequisites == mNumPrerequisites);
246 0 : }
247 :
248 : void
249 0 : SyncObject::AddPrerequisite(Job* aJob)
250 : {
251 0 : MOZ_ASSERT(++mAddedPrerequisites <= mNumPrerequisites);
252 0 : }
253 :
254 : void
255 0 : SyncObject::AddSubsequent(Job* aJob)
256 : {
257 0 : }
258 :
259 0 : WorkerThread::WorkerThread(MultiThreadedJobQueue* aJobQueue)
260 0 : : mQueue(aJobQueue)
261 : {
262 0 : aJobQueue->RegisterThread();
263 0 : }
264 :
265 : void
266 0 : WorkerThread::Run()
267 : {
268 0 : SetName("gfx worker");
269 :
270 : for (;;) {
271 0 : Job* commands = nullptr;
272 0 : if (!mQueue->WaitForJob(commands)) {
273 0 : mQueue->UnregisterThread();
274 0 : return;
275 : }
276 :
277 0 : JobStatus status = JobScheduler::ProcessJob(commands);
278 :
279 0 : if (status == JobStatus::Error) {
280 : // Don't try to handle errors for now, but that's open to discussions.
281 : // I expect errors to be mostly OOM issues.
282 0 : gfxDevCrash(LogReason::JobStatusError) << "Invalid job status " << (int)status;
283 : }
284 0 : }
285 : }
286 :
287 : } //namespace
288 : } //namespace
|