Line data Source code
1 : /*
2 : * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 : *
4 : * Use of this source code is governed by a BSD-style license
5 : * that can be found in the LICENSE file in the root of the source
6 : * tree. An additional intellectual property rights grant can be found
7 : * in the file PATENTS. All contributing project authors may
8 : * be found in the AUTHORS file in the root of the source tree.
9 : */
10 :
11 : #include "webrtc/base/task_queue.h"
12 :
13 : #include <fcntl.h>
14 : #include <string.h>
15 : #include <unistd.h>
16 :
17 : #include "libevent/include/event.h"
18 : #include "webrtc/base/checks.h"
19 : #include "webrtc/base/logging.h"
20 : #include "webrtc/base/task_queue_posix.h"
21 : #include "webrtc/base/timeutils.h"
22 :
23 : namespace rtc {
24 : using internal::GetQueuePtrTls;
25 : using internal::AutoSetCurrentQueuePtr;
26 :
27 : namespace {
28 : static const char kQuit = 1;
29 : static const char kRunTask = 2;
30 :
31 : struct TimerEvent {
32 0 : explicit TimerEvent(std::unique_ptr<QueuedTask> task)
33 0 : : task(std::move(task)) {}
34 0 : ~TimerEvent() { event_del(&ev); }
35 : event ev;
36 : std::unique_ptr<QueuedTask> task;
37 : };
38 :
39 0 : bool SetNonBlocking(int fd) {
40 0 : const int flags = fcntl(fd, F_GETFL);
41 0 : RTC_CHECK(flags != -1);
42 0 : return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
43 : }
44 :
45 : // TODO(tommi): This is a hack to support two versions of libevent that we're
46 : // compatible with. The method we really want to call is event_assign(),
47 : // since event_set() has been marked as deprecated (and doesn't accept
48 : // passing event_base__ as a parameter). However, the version of libevent
49 : // that we have in Chromium, doesn't have event_assign(), so we need to call
50 : // event_set() there.
51 0 : void EventAssign(struct event* ev,
52 : struct event_base* base,
53 : int fd,
54 : short events,
55 : void (*callback)(int, short, void*),
56 : void* arg) {
57 : #if defined(_EVENT2_EVENT_H_)
58 : RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
59 : #else
60 0 : event_set(ev, fd, events, callback, arg);
61 0 : RTC_CHECK_EQ(0, event_base_set(base, ev));
62 : #endif
63 0 : }
64 : } // namespace
65 :
66 0 : struct TaskQueue::QueueContext {
67 0 : explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
68 : TaskQueue* queue;
69 : bool is_active;
70 : // Holds a list of events pending timers for cleanup when the loop exits.
71 : std::list<TimerEvent*> pending_timers_;
72 : };
73 :
74 : class TaskQueue::PostAndReplyTask : public QueuedTask {
75 : public:
76 0 : PostAndReplyTask(std::unique_ptr<QueuedTask> task,
77 : std::unique_ptr<QueuedTask> reply,
78 : TaskQueue* reply_queue)
79 0 : : task_(std::move(task)),
80 0 : reply_(std::move(reply)),
81 0 : reply_queue_(reply_queue) {
82 0 : reply_queue->PrepareReplyTask(this);
83 0 : }
84 :
85 0 : ~PostAndReplyTask() override {
86 0 : CritScope lock(&lock_);
87 0 : if (reply_queue_)
88 0 : reply_queue_->ReplyTaskDone(this);
89 0 : }
90 :
91 0 : void OnReplyQueueGone() {
92 0 : CritScope lock(&lock_);
93 0 : reply_queue_ = nullptr;
94 0 : }
95 :
96 : private:
97 0 : bool Run() override {
98 0 : if (!task_->Run())
99 0 : task_.release();
100 :
101 0 : CritScope lock(&lock_);
102 0 : if (reply_queue_)
103 0 : reply_queue_->PostTask(std::move(reply_));
104 0 : return true;
105 : }
106 :
107 : CriticalSection lock_;
108 : std::unique_ptr<QueuedTask> task_;
109 : std::unique_ptr<QueuedTask> reply_;
110 : TaskQueue* reply_queue_ GUARDED_BY(lock_);
111 : };
112 :
113 0 : class TaskQueue::SetTimerTask : public QueuedTask {
114 : public:
115 0 : SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
116 0 : : task_(std::move(task)),
117 : milliseconds_(milliseconds),
118 0 : posted_(Time32()) {}
119 :
120 : private:
121 0 : bool Run() override {
122 : // Compensate for the time that has passed since construction
123 : // and until we got here.
124 0 : uint32_t post_time = Time32() - posted_;
125 0 : TaskQueue::Current()->PostDelayedTask(
126 0 : std::move(task_),
127 0 : post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
128 0 : return true;
129 : }
130 :
131 : std::unique_ptr<QueuedTask> task_;
132 : const uint32_t milliseconds_;
133 : const uint32_t posted_;
134 : };
135 :
136 0 : TaskQueue::TaskQueue(const char* queue_name)
137 0 : : event_base_(event_base_new()),
138 0 : wakeup_event_(new event()),
139 0 : thread_(&TaskQueue::ThreadMain, this, queue_name) {
140 0 : RTC_DCHECK(queue_name);
141 : int fds[2];
142 0 : RTC_CHECK(pipe(fds) == 0);
143 0 : SetNonBlocking(fds[0]);
144 0 : SetNonBlocking(fds[1]);
145 0 : wakeup_pipe_out_ = fds[0];
146 0 : wakeup_pipe_in_ = fds[1];
147 0 : EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
148 0 : EV_READ | EV_PERSIST, OnWakeup, this);
149 0 : event_add(wakeup_event_.get(), 0);
150 0 : thread_.Start();
151 0 : }
152 :
153 0 : TaskQueue::~TaskQueue() {
154 0 : RTC_DCHECK(!IsCurrent());
155 : struct timespec ts;
156 0 : char message = kQuit;
157 0 : while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
158 : // The queue is full, so we have no choice but to wait and retry.
159 0 : RTC_CHECK_EQ(EAGAIN, errno);
160 0 : ts.tv_sec = 0;
161 0 : ts.tv_nsec = 1000000;
162 0 : nanosleep(&ts, nullptr);
163 : }
164 :
165 0 : thread_.Stop();
166 :
167 0 : event_del(wakeup_event_.get());
168 0 : close(wakeup_pipe_in_);
169 0 : close(wakeup_pipe_out_);
170 0 : wakeup_pipe_in_ = -1;
171 0 : wakeup_pipe_out_ = -1;
172 :
173 : {
174 : // Synchronize against any pending reply tasks that might be running on
175 : // other queues.
176 0 : CritScope lock(&pending_lock_);
177 0 : for (auto* reply : pending_replies_)
178 0 : reply->OnReplyQueueGone();
179 0 : pending_replies_.clear();
180 : }
181 :
182 0 : event_base_free(event_base_);
183 0 : }
184 :
185 : // static
186 0 : TaskQueue* TaskQueue::Current() {
187 : QueueContext* ctx =
188 0 : static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
189 0 : return ctx ? ctx->queue : nullptr;
190 : }
191 :
192 : // static
193 0 : bool TaskQueue::IsCurrent(const char* queue_name) {
194 0 : TaskQueue* current = Current();
195 0 : return current && current->thread_.name().compare(queue_name) == 0;
196 : }
197 :
198 0 : bool TaskQueue::IsCurrent() const {
199 0 : return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
200 : }
201 :
202 0 : void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
203 0 : RTC_DCHECK(task.get());
204 : // libevent isn't thread safe. This means that we can't use methods such
205 : // as event_base_once to post tasks to the worker thread from a different
206 : // thread. However, we can use it when posting from the worker thread itself.
207 0 : if (IsCurrent()) {
208 0 : if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
209 0 : task.get(), nullptr) == 0) {
210 0 : task.release();
211 : }
212 : } else {
213 0 : QueuedTask* task_id = task.get(); // Only used for comparison.
214 : {
215 0 : CritScope lock(&pending_lock_);
216 0 : pending_.push_back(std::move(task));
217 : }
218 0 : char message = kRunTask;
219 0 : if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
220 0 : LOG(WARNING) << "Failed to queue task.";
221 0 : CritScope lock(&pending_lock_);
222 0 : pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
223 0 : return t.get() == task_id;
224 0 : });
225 : }
226 : }
227 0 : }
228 :
229 0 : void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
230 : uint32_t milliseconds) {
231 0 : if (IsCurrent()) {
232 0 : TimerEvent* timer = new TimerEvent(std::move(task));
233 0 : EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
234 : QueueContext* ctx =
235 0 : static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
236 0 : ctx->pending_timers_.push_back(timer);
237 0 : timeval tv = {static_cast<time_t>(milliseconds) / 1000,
238 0 : static_cast<suseconds_t>((milliseconds % 1000) * 1000)};
239 0 : event_add(&timer->ev, &tv);
240 : } else {
241 0 : PostTask(std::unique_ptr<QueuedTask>(
242 0 : new SetTimerTask(std::move(task), milliseconds)));
243 : }
244 0 : }
245 :
246 0 : void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
247 : std::unique_ptr<QueuedTask> reply,
248 : TaskQueue* reply_queue) {
249 : std::unique_ptr<QueuedTask> wrapper_task(
250 0 : new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
251 0 : PostTask(std::move(wrapper_task));
252 0 : }
253 :
254 0 : void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
255 : std::unique_ptr<QueuedTask> reply) {
256 0 : return PostTaskAndReply(std::move(task), std::move(reply), Current());
257 : }
258 :
259 : // static
260 0 : bool TaskQueue::ThreadMain(void* context) {
261 0 : TaskQueue* me = static_cast<TaskQueue*>(context);
262 :
263 0 : QueueContext queue_context(me);
264 0 : pthread_setspecific(GetQueuePtrTls(), &queue_context);
265 :
266 0 : while (queue_context.is_active)
267 0 : event_base_loop(me->event_base_, 0);
268 :
269 0 : pthread_setspecific(GetQueuePtrTls(), nullptr);
270 :
271 0 : for (TimerEvent* timer : queue_context.pending_timers_)
272 0 : delete timer;
273 :
274 0 : return false;
275 : }
276 :
277 : // static
278 0 : void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
279 : QueueContext* ctx =
280 0 : static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
281 0 : RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
282 : char buf;
283 0 : RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
284 0 : switch (buf) {
285 : case kQuit:
286 0 : ctx->is_active = false;
287 0 : event_base_loopbreak(ctx->queue->event_base_);
288 0 : break;
289 : case kRunTask: {
290 0 : std::unique_ptr<QueuedTask> task;
291 : {
292 0 : CritScope lock(&ctx->queue->pending_lock_);
293 0 : RTC_DCHECK(!ctx->queue->pending_.empty());
294 0 : task = std::move(ctx->queue->pending_.front());
295 0 : ctx->queue->pending_.pop_front();
296 0 : RTC_DCHECK(task.get());
297 : }
298 0 : if (!task->Run())
299 0 : task.release();
300 0 : break;
301 : }
302 : default:
303 0 : RTC_NOTREACHED();
304 0 : break;
305 : }
306 0 : }
307 :
308 : // static
309 0 : void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
310 0 : auto* task = static_cast<QueuedTask*>(context);
311 0 : if (task->Run())
312 0 : delete task;
313 0 : }
314 :
315 : // static
316 0 : void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
317 0 : TimerEvent* timer = static_cast<TimerEvent*>(context);
318 0 : if (!timer->task->Run())
319 0 : timer->task.release();
320 : QueueContext* ctx =
321 0 : static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
322 0 : ctx->pending_timers_.remove(timer);
323 0 : delete timer;
324 0 : }
325 :
326 0 : void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
327 0 : RTC_DCHECK(reply_task);
328 0 : CritScope lock(&pending_lock_);
329 0 : pending_replies_.push_back(reply_task);
330 0 : }
331 :
332 0 : void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
333 0 : CritScope lock(&pending_lock_);
334 0 : pending_replies_.remove(reply_task);
335 0 : }
336 :
337 : } // namespace rtc
|