Line data Source code
1 : /*
2 : * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 : *
4 : * Use of this source code is governed by a BSD-style license
5 : * that can be found in the LICENSE file in the root of the source
6 : * tree. An additional intellectual property rights grant can be found
7 : * in the file PATENTS. All contributing project authors may
8 : * be found in the AUTHORS file in the root of the source tree.
9 : */
10 :
11 : #ifndef WEBRTC_BASE_TASK_QUEUE_H_
12 : #define WEBRTC_BASE_TASK_QUEUE_H_
13 :
#include <list>
#include <memory>
#include <unordered_map>
#include <utility>
17 :
18 : #if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
19 : #include <dispatch/dispatch.h>
20 : #endif
21 :
22 : #include "webrtc/base/constructormagic.h"
23 : #include "webrtc/base/criticalsection.h"
24 :
25 : #if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
26 : #include "webrtc/base/platform_thread.h"
27 : #endif
28 :
29 : #if defined(WEBRTC_BUILD_LIBEVENT)
30 : struct event_base;
31 : struct event;
32 : #endif
33 :
34 : namespace rtc {
35 :
36 : // Base interface for asynchronously executed tasks.
37 : // The interface basically consists of a single function, Run(), that executes
38 : // on the target queue. For more details see the Run() method and TaskQueue.
39 : class QueuedTask {
40 : public:
41 0 : QueuedTask() {}
42 0 : virtual ~QueuedTask() {}
43 :
44 : // Main routine that will run when the task is executed on the desired queue.
45 : // The task should return |true| to indicate that it should be deleted or
46 : // |false| to indicate that the queue should consider ownership of the task
47 : // having been transferred. Returning |false| can be useful if a task has
48 : // re-posted itself to a different queue or is otherwise being re-used.
49 : virtual bool Run() = 0;
50 :
51 : private:
52 : RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
53 : };
54 :
55 : // Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
56 : template <class Closure>
57 0 : class ClosureTask : public QueuedTask {
58 : public:
59 0 : explicit ClosureTask(const Closure& closure) : closure_(closure) {}
60 :
61 : private:
62 0 : bool Run() override {
63 0 : closure_();
64 0 : return true;
65 : }
66 :
67 : Closure closure_;
68 : };
69 :
70 : // Extends ClosureTask to also allow specifying cleanup code.
71 : // This is useful when using lambdas if guaranteeing cleanup, even if a task
72 : // was dropped (queue is too full), is required.
73 : template <class Closure, class Cleanup>
74 : class ClosureTaskWithCleanup : public ClosureTask<Closure> {
75 : public:
76 : ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup)
77 : : ClosureTask<Closure>(closure), cleanup_(cleanup) {}
78 : ~ClosureTaskWithCleanup() { cleanup_(); }
79 :
80 : private:
81 : Cleanup cleanup_;
82 : };
83 :
84 : // Convenience function to construct closures that can be passed directly
85 : // to methods that support std::unique_ptr<QueuedTask> but not template
86 : // based parameters.
87 : template <class Closure>
88 : static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) {
89 : return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure));
90 : }
91 :
92 : template <class Closure, class Cleanup>
93 : static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
94 : const Cleanup& cleanup) {
95 : return std::unique_ptr<QueuedTask>(
96 : new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup));
97 : }
98 :
99 : // Implements a task queue that asynchronously executes tasks in a way that
100 : // guarantees that they're executed in FIFO order and that tasks never overlap.
101 : // Tasks may always execute on the same worker thread and they may not.
102 : // To DCHECK that tasks are executing on a known task queue, use IsCurrent().
103 : //
104 : // Here are some usage examples:
105 : //
106 : // 1) Asynchronously running a lambda:
107 : //
108 : // class MyClass {
109 : // ...
110 : // TaskQueue queue_("MyQueue");
111 : // };
112 : //
113 : // void MyClass::StartWork() {
114 : // queue_.PostTask([]() { Work(); });
115 : // ...
116 : //
117 : // 2) Doing work asynchronously on a worker queue and providing a notification
118 : // callback on the current queue, when the work has been done:
119 : //
120 : // void MyClass::StartWorkAndLetMeKnowWhenDone(
121 : // std::unique_ptr<QueuedTask> callback) {
122 : // DCHECK(TaskQueue::Current()) << "Need to be running on a queue";
123 : // queue_.PostTaskAndReply([]() { Work(); }, std::move(callback));
124 : // }
125 : // ...
126 : // my_class->StartWorkAndLetMeKnowWhenDone(
127 : // NewClosure([]() { LOG(INFO) << "The work is done!";}));
128 : //
129 : // 3) Posting a custom task on a timer. The task posts itself again after
130 : // every running:
131 : //
132 : // class TimerTask : public QueuedTask {
133 : // public:
134 : // TimerTask() {}
135 : // private:
136 : // bool Run() override {
137 : // ++count_;
138 : // TaskQueue::Current()->PostDelayedTask(
139 : // std::unique_ptr<QueuedTask>(this), 1000);
140 : // // Ownership has been transferred to the next occurance,
141 : // // so return false to prevent from being deleted now.
142 : // return false;
143 : // }
144 : // int count_ = 0;
145 : // };
146 : // ...
147 : // queue_.PostDelayedTask(
148 : // std::unique_ptr<QueuedTask>(new TimerTask()), 1000);
149 : //
150 : // For more examples, see task_queue_unittests.cc.
151 : //
152 : // A note on destruction:
153 : //
154 : // When a TaskQueue is deleted, pending tasks will not be executed but they will
155 : // be deleted. The deletion of tasks may happen asynchronously after the
156 : // TaskQueue itself has been deleted or it may happen synchronously while the
157 : // TaskQueue instance is being deleted. This may vary from one OS to the next
158 : // so assumptions about lifetimes of pending tasks should not be made.
159 : class LOCKABLE TaskQueue {
160 : public:
161 : explicit TaskQueue(const char* queue_name);
162 : // TODO(tommi): Implement move semantics?
163 : ~TaskQueue();
164 :
165 : static TaskQueue* Current();
166 :
167 : // Used for DCHECKing the current queue.
168 : static bool IsCurrent(const char* queue_name);
169 : bool IsCurrent() const;
170 :
171 : // TODO(tommi): For better debuggability, implement RTC_FROM_HERE.
172 :
173 : // Ownership of the task is passed to PostTask.
174 : void PostTask(std::unique_ptr<QueuedTask> task);
175 : void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
176 : std::unique_ptr<QueuedTask> reply,
177 : TaskQueue* reply_queue);
178 : void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
179 : std::unique_ptr<QueuedTask> reply);
180 :
181 : void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
182 :
183 : template <class Closure>
184 0 : void PostTask(const Closure& closure) {
185 0 : PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
186 0 : }
187 :
188 : template <class Closure>
189 0 : void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
190 0 : PostDelayedTask(
191 0 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
192 : milliseconds);
193 0 : }
194 :
195 : template <class Closure1, class Closure2>
196 : void PostTaskAndReply(const Closure1& task,
197 : const Closure2& reply,
198 : TaskQueue* reply_queue) {
199 : PostTaskAndReply(
200 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
201 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
202 : reply_queue);
203 : }
204 :
205 : template <class Closure>
206 : void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
207 : const Closure& reply) {
208 : PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>(
209 : new ClosureTask<Closure>(reply)));
210 : }
211 :
212 : template <class Closure>
213 : void PostTaskAndReply(const Closure& task,
214 : std::unique_ptr<QueuedTask> reply) {
215 : PostTaskAndReply(
216 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)),
217 : std::move(reply));
218 : }
219 :
220 : template <class Closure1, class Closure2>
221 : void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
222 : PostTaskAndReply(
223 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
224 : std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
225 : }
226 :
227 : private:
228 : #if defined(WEBRTC_BUILD_LIBEVENT)
229 : static bool ThreadMain(void* context);
230 : static void OnWakeup(int socket, short flags, void* context); // NOLINT
231 : static void RunTask(int fd, short flags, void* context); // NOLINT
232 : static void RunTimer(int fd, short flags, void* context); // NOLINT
233 :
234 : class PostAndReplyTask;
235 : class SetTimerTask;
236 :
237 : void PrepareReplyTask(PostAndReplyTask* reply_task);
238 : void ReplyTaskDone(PostAndReplyTask* reply_task);
239 :
240 : struct QueueContext;
241 :
242 : int wakeup_pipe_in_ = -1;
243 : int wakeup_pipe_out_ = -1;
244 : event_base* event_base_;
245 : std::unique_ptr<event> wakeup_event_;
246 : PlatformThread thread_;
247 : rtc::CriticalSection pending_lock_;
248 : std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
249 : std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
250 : #elif defined(WEBRTC_MAC)
251 : struct QueueContext;
252 : struct TaskContext;
253 : struct PostTaskAndReplyContext;
254 : dispatch_queue_t queue_;
255 : QueueContext* const context_;
256 : #elif defined(WEBRTC_WIN)
257 : typedef std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>>
258 : DelayedTasks;
259 : static bool ThreadMain(void* context);
260 : static bool ProcessQueuedMessages(DelayedTasks* delayed_tasks);
261 :
262 : class WorkerThread : public PlatformThread {
263 : public:
264 : WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
265 : : PlatformThread(func, obj, thread_name) {}
266 :
267 : bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
268 : return PlatformThread::QueueAPC(apc_function, data);
269 : }
270 : };
271 : WorkerThread thread_;
272 : #else
273 : #error not supported.
274 : #endif
275 :
276 : RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
277 : };
278 :
279 : } // namespace rtc
280 :
281 : #endif // WEBRTC_BASE_TASK_QUEUE_H_
|