Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #if !defined(TaskDispatcher_h_)
8 : #define TaskDispatcher_h_
9 :
10 : #include "mozilla/AbstractThread.h"
11 : #include "mozilla/Maybe.h"
12 : #include "mozilla/UniquePtr.h"
13 : #include "mozilla/Unused.h"
14 :
15 : #include "nsISupportsImpl.h"
16 : #include "nsTArray.h"
17 : #include "nsThreadUtils.h"
18 :
19 : #include <queue>
20 :
21 : namespace mozilla {
22 :
23 : /*
24 : * A classic approach to cross-thread communication is to dispatch asynchronous
25 : * runnables to perform updates on other threads. This generally works well, but
26 : * there are sometimes reasons why we might want to delay the actual dispatch of
27 : * these tasks until a specified moment. At present, this is primarily useful to
28 : * ensure that mirrored state gets updated atomically - but there may be other
29 : * applications as well.
30 : *
31 : * TaskDispatcher is a general abstract class that accepts tasks and dispatches
32 : * them at some later point. These groups of tasks are per-target-thread, and
33 : * contain separate queues for several kinds of tasks (see comments below),
34 : * including "state change tasks" (which run first, and are intended to be
35 : * used to update the value held by mirrors), and regular tasks, which are other
36 : * arbitrary operations that are gated to run after all the state changes have completed.
37 : */
// Abstract interface for queuing direct, state change and regular tasks so
// that they can be run or dispatched at a later point. Every queuing and
// dispatching operation is pure virtual; AutoTaskDispatcher in this file is
// a concrete implementation.
38 : class TaskDispatcher
39 : {
40 : public:
41 0 : TaskDispatcher() {}
42 0 : virtual ~TaskDispatcher() {}
43 :
44 : // Direct tasks are run directly (rather than dispatched asynchronously) when
45 : // the tail dispatcher fires. A direct task may cause other tasks to be added
46 : // to the tail dispatcher.
47 : virtual void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) = 0;
48 :
49 : // State change tasks are dispatched asynchronously, and always run before
50 : // regular tasks. They are intended to be used to update the value held by
51 : // mirrors before any other dispatched tasks are run on the target thread.
52 : virtual void AddStateChangeTask(AbstractThread* aThread,
53 : already_AddRefed<nsIRunnable> aRunnable) = 0;
54 :
55 : // Regular tasks are dispatched asynchronously, and run after state change
56 : // tasks.
// NOTE: the default value of aFailureHandling is declared only on this base
// declaration. The AutoTaskDispatcher override in this file declares no
// default, and default arguments are resolved from the static type of the
// callee, so the default only applies to calls made through a
// TaskDispatcher-typed reference or pointer.
57 : virtual void AddTask(AbstractThread* aThread,
58 : already_AddRefed<nsIRunnable> aRunnable,
59 : AbstractThread::DispatchFailureHandling aFailureHandling = AbstractThread::AssertDispatchSuccess) = 0;
60 :
// Dispatches the tasks queued for |aThread|. In AutoTaskDispatcher this
// dispatches, and then forgets, every queued group targeting |aThread|.
61 : virtual void DispatchTasksFor(AbstractThread* aThread) = 0;
// Reports whether tasks are pending for |aThread|. In AutoTaskDispatcher this
// also counts pending direct tasks when |aThread| is the current thread.
62 : virtual bool HasTasksFor(AbstractThread* aThread) = 0;
// Runs the queued direct tasks. AutoTaskDispatcher keeps running them until
// its direct task queue is empty.
63 : virtual void DrainDirectTasks() = 0;
64 : };
65 :
66 : /*
67 : * AutoTaskDispatcher is a stack-scoped TaskDispatcher implementation that fires
68 : * its queued tasks when it is popped off the stack.
69 : */
// Concrete TaskDispatcher that queues tasks into per-thread groups
// (mTaskGroups) plus one lazily-created queue of direct tasks (mDirectTasks),
// and dispatches whatever groups remain when it is destroyed.
70 : class AutoTaskDispatcher : public TaskDispatcher
71 : {
72 : public:
// |aIsTailDispatcher| only selects the DispatchReason passed to
// AbstractThread::Dispatch() in DispatchTaskGroup() (TailDispatch versus
// NormalDispatch).
73 0 : explicit AutoTaskDispatcher(bool aIsTailDispatcher = false)
74 0 : : mIsTailDispatcher(aIsTailDispatcher)
75 0 : {}
76 :
// Asserts that no direct tasks are pending, then dispatches every task group
// that is still queued, in queue order.
77 0 : ~AutoTaskDispatcher()
78 0 : {
79 : // Given that direct tasks may trigger other code that uses the tail
80 : // dispatcher, it's better to avoid processing them in the tail dispatcher's
81 : // destructor. So we require TailDispatchers to manually invoke
82 : // DrainDirectTasks before the AutoTaskDispatcher gets destroyed. In truth,
83 : // this is only necessary in the case where this AutoTaskDispatcher can be
84 : // accessed by the direct tasks it dispatches (true for TailDispatchers, but
85 : // potentially not true for other hypothetical AutoTaskDispatchers). Feel
86 : // free to loosen this restriction to apply only to mIsTailDispatcher if a
87 : // use-case requires it.
88 0 : MOZ_ASSERT(!HaveDirectTasks());
89 :
90 0 : for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
91 0 : DispatchTaskGroup(Move(mTaskGroups[i]));
92 : }
93 0 : }
94 :
// Returns true only if the direct task queue has been created and is
// non-empty.
95 0 : bool HaveDirectTasks() const
96 : {
97 0 : return mDirectTasks.isSome() && !mDirectTasks->empty();
98 : }
99 :
// Runs direct tasks in FIFO order until none are left. Each runnable is held
// in a local strong reference and popped from the queue before it runs, and
// the loop condition is re-evaluated after every run, so direct tasks added
// by a running task are drained by this same call.
100 0 : void DrainDirectTasks() override
101 : {
102 0 : while (HaveDirectTasks()) {
103 0 : nsCOMPtr<nsIRunnable> r = mDirectTasks->front();
104 0 : mDirectTasks->pop();
105 0 : r->Run();
106 : }
107 0 : }
108 :
// Queues |aRunnable| as a direct task, constructing the queue on first use.
109 0 : void AddDirectTask(already_AddRefed<nsIRunnable> aRunnable) override
110 : {
111 0 : if (mDirectTasks.isNothing()) {
112 0 : mDirectTasks.emplace();
113 : }
114 0 : mDirectTasks->push(Move(aRunnable));
115 0 : }
116 :
// Queues |aRunnable| (release-asserted to be non-null) as a state change task
// for |aThread|. The task goes into the first existing group that targets
// |aThread| (see EnsureTaskGroup/GetTaskGroup); a new group is appended only
// if no such group exists. This differs from AddTask(), which only ever
// looks at the last group.
117 0 : void AddStateChangeTask(AbstractThread* aThread,
118 : already_AddRefed<nsIRunnable> aRunnable) override
119 : {
120 0 : nsCOMPtr<nsIRunnable> r = aRunnable;
121 0 : MOZ_RELEASE_ASSERT(r);
122 0 : EnsureTaskGroup(aThread).mStateChangeTasks.AppendElement(r.forget());
123 0 : }
124 :
// Queues |aRunnable| (release-asserted to be non-null) as a regular task for
// |aThread|. Note that this override declares no default for
// |aFailureHandling|.
125 0 : void AddTask(AbstractThread* aThread,
126 : already_AddRefed<nsIRunnable> aRunnable,
127 : AbstractThread::DispatchFailureHandling aFailureHandling) override
128 : {
129 0 : nsCOMPtr<nsIRunnable> r = aRunnable;
130 0 : MOZ_RELEASE_ASSERT(r);
131 : // To preserve the event order, we need to append a new group if the last
132 : // group is not targeted for |aThread|.
133 : // See https://bugzilla.mozilla.org/show_bug.cgi?id=1318226&mark=0-3#c0
134 : // for the details of the issue.
135 0 : if (mTaskGroups.Length() == 0 || mTaskGroups.LastElement()->mThread != aThread) {
136 0 : mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
137 : }
138 :
139 0 : PerThreadTaskGroup& group = *mTaskGroups.LastElement();
140 0 : group.mRegularTasks.AppendElement(r.forget());
141 :
142 : // The task group needs to assert dispatch success if any of the runnables
143 : // it's dispatching want to assert it.
// Groups start out as DontAssertDispatchSuccess and are only ever upgraded
// here, never downgraded.
144 0 : if (aFailureHandling == AbstractThread::AssertDispatchSuccess) {
145 0 : group.mFailureHandling = AbstractThread::AssertDispatchSuccess;
146 : }
147 0 : }
148 :
// True if a task group exists for |aThread|, or if |aThread| is the current
// AbstractThread and direct tasks are pending.
149 0 : bool HasTasksFor(AbstractThread* aThread) override
150 : {
151 0 : return !!GetTaskGroup(aThread) ||
152 0 : (aThread == AbstractThread::GetCurrent() && HaveDirectTasks());
153 : }
154 :
// Dispatches every queued group whose target is |aThread| and removes it
// from mTaskGroups; groups for other threads are left queued.
155 0 : void DispatchTasksFor(AbstractThread* aThread) override
156 : {
157 : // Dispatch all groups that match |aThread|.
158 0 : for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
159 0 : if (mTaskGroups[i]->mThread == aThread) {
160 0 : DispatchTaskGroup(Move(mTaskGroups[i]));
// Step the index back after the removal so that the loop's ++i revisits
// this slot, which now holds the following element. When i is 0 the
// unsigned decrement wraps around and ++i wraps it back to 0.
161 0 : mTaskGroups.RemoveElementAt(i--);
162 : }
163 : }
164 0 : }
165 :
166 : private:
167 :
// The tasks queued for a single target thread, together with the failure
// handling to use when the group is dispatched.
168 : struct PerThreadTaskGroup
169 : {
170 : public:
171 0 : explicit PerThreadTaskGroup(AbstractThread* aThread)
172 0 : : mThread(aThread), mFailureHandling(AbstractThread::DontAssertDispatchSuccess)
173 : {
174 0 : MOZ_COUNT_CTOR(PerThreadTaskGroup);
175 0 : }
176 :
177 0 : ~PerThreadTaskGroup() { MOZ_COUNT_DTOR(PerThreadTaskGroup); }
178 :
// Target thread of this group (strong reference).
179 : RefPtr<AbstractThread> mThread;
// Tasks that TaskGroupRunnable::Run() runs first, all together.
180 : nsTArray<nsCOMPtr<nsIRunnable>> mStateChangeTasks;
// Tasks that TaskGroupRunnable::Run() runs after the state change tasks.
181 : nsTArray<nsCOMPtr<nsIRunnable>> mRegularTasks;
182 : AbstractThread::DispatchFailureHandling mFailureHandling;
183 : };
184 :
// Runnable that owns one PerThreadTaskGroup and runs its tasks when invoked.
185 0 : class TaskGroupRunnable : public Runnable
186 : {
187 : public:
188 0 : explicit TaskGroupRunnable(UniquePtr<PerThreadTaskGroup>&& aTasks)
189 0 : : Runnable("AutoTaskDispatcher::TaskGroupRunnable")
190 0 : , mTasks(Move(aTasks))
191 : {
192 0 : }
193 :
// Runs the state change tasks, then the regular tasks, draining direct
// tasks in between as described below. The results of the individual Run()
// calls are not inspected; this always returns NS_OK.
194 0 : NS_IMETHOD Run() override
195 : {
196 : // State change tasks get run all together before any code is run, so
197 : // that all state changes are made in an atomic unit.
198 0 : for (size_t i = 0; i < mTasks->mStateChangeTasks.Length(); ++i) {
199 0 : mTasks->mStateChangeTasks[i]->Run();
200 : }
201 :
202 : // Once the state changes have completed, drain any direct tasks
203 : // generated by those state changes (i.e. watcher notification tasks).
204 : // This needs to be outside the loop because we don't want to run code
205 : // that might observe intermediate states.
206 0 : MaybeDrainDirectTasks();
207 :
208 0 : for (size_t i = 0; i < mTasks->mRegularTasks.Length(); ++i) {
209 0 : mTasks->mRegularTasks[i]->Run();
210 :
211 : // Scope direct tasks tightly to the task that generated them.
212 0 : MaybeDrainDirectTasks();
213 : }
214 :
215 0 : return NS_OK;
216 : }
217 :
218 : private:
// Drains the direct tasks of the current AbstractThread's tail dispatcher.
// Does nothing when AbstractThread::GetCurrent() returns null.
219 0 : void MaybeDrainDirectTasks()
220 : {
221 0 : AbstractThread* currentThread = AbstractThread::GetCurrent();
222 0 : if (currentThread) {
223 0 : currentThread->TailDispatcher().DrainDirectTasks();
224 : }
225 0 : }
226 :
227 : UniquePtr<PerThreadTaskGroup> mTasks;
228 : };
229 :
// Returns the first existing group targeting |aThread|, appending a new empty
// group at the end of mTaskGroups if there is none.
230 0 : PerThreadTaskGroup& EnsureTaskGroup(AbstractThread* aThread)
231 : {
232 0 : PerThreadTaskGroup* existing = GetTaskGroup(aThread);
233 0 : if (existing) {
234 0 : return *existing;
235 : }
236 :
237 0 : mTaskGroups.AppendElement(new PerThreadTaskGroup(aThread));
238 0 : return *mTaskGroups.LastElement();
239 : }
240 :
// Linear search for the first group targeting |aThread|. Returns a non-owning
// pointer, or nullptr if no group matches.
241 0 : PerThreadTaskGroup* GetTaskGroup(AbstractThread* aThread)
242 : {
243 0 : for (size_t i = 0; i < mTaskGroups.Length(); ++i) {
244 0 : if (mTaskGroups[i]->mThread == aThread) {
245 0 : return mTaskGroups[i].get();
246 : }
247 : }
248 :
249 : // Not found.
250 0 : return nullptr;
251 : }
252 :
// Wraps |aGroup| in a TaskGroupRunnable and dispatches it to the group's
// target thread.
253 0 : void DispatchTaskGroup(UniquePtr<PerThreadTaskGroup> aGroup)
254 : {
// The target thread and failure handling are copied out first, because
// |aGroup| is moved into the runnable below and cannot be read afterwards.
255 0 : RefPtr<AbstractThread> thread = aGroup->mThread;
256 :
257 0 : AbstractThread::DispatchFailureHandling failureHandling = aGroup->mFailureHandling;
258 0 : AbstractThread::DispatchReason reason = mIsTailDispatcher ? AbstractThread::TailDispatch
259 0 : : AbstractThread::NormalDispatch;
260 0 : nsCOMPtr<nsIRunnable> r = new TaskGroupRunnable(Move(aGroup));
261 0 : thread->Dispatch(r.forget(), failureHandling, reason);
262 0 : }
263 :
264 : // Direct tasks. We use a Maybe<> because (a) this class is hot, (b)
265 : // mDirectTasks often doesn't get anything put into it, and (c) the
266 : // std::queue implementation in GNU libstdc++ does two largish heap
267 : // allocations when creating a new std::queue.
268 : mozilla::Maybe<std::queue<nsCOMPtr<nsIRunnable>>> mDirectTasks;
269 :
270 : // Task groups, organized by thread.
// Several groups may target the same thread: AddTask() appends a new group
// whenever the last group targets a different thread.
271 : nsTArray<UniquePtr<PerThreadTaskGroup>> mTaskGroups;
272 :
273 : // True if this TaskDispatcher represents the tail dispatcher for the thread
274 : // upon which it runs.
275 : const bool mIsTailDispatcher;
276 : };
277 :
278 : // Little utility class to allow declaring AutoTaskDispatcher as a default
279 : // parameter for methods that take a TaskDispatcher&.
// Holds a default-constructed T by value and converts implicitly to T&.
280 : template<typename T>
281 : class PassByRef
282 : {
283 : public:
284 : PassByRef() {}
// Non-const conversion: yields a mutable reference to the wrapped value,
// which lives only as long as this PassByRef object.
285 : operator T&() { return mVal; }
286 : private:
287 : T mVal;
288 : };
289 :
290 : } // namespace mozilla
291 :
292 : #endif
|