LCOV - code coverage report
Current view: top level - media/webrtc/trunk/webrtc/base - task_queue_libevent.cc (source / functions) Hit Total Coverage
Test: output.info Lines: 0 175 0.0 %
Date: 2017-07-14 16:53:18 Functions: 0 31 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
       3             :  *
       4             :  *  Use of this source code is governed by a BSD-style license
       5             :  *  that can be found in the LICENSE file in the root of the source
       6             :  *  tree. An additional intellectual property rights grant can be found
       7             :  *  in the file PATENTS.  All contributing project authors may
       8             :  *  be found in the AUTHORS file in the root of the source tree.
       9             :  */
      10             : 
      11             : #include "webrtc/base/task_queue.h"
      12             : 
      13             : #include <fcntl.h>
      14             : #include <string.h>
      15             : #include <unistd.h>
      16             : 
      17             : #include "libevent/include/event.h"
      18             : #include "webrtc/base/checks.h"
      19             : #include "webrtc/base/logging.h"
      20             : #include "webrtc/base/task_queue_posix.h"
      21             : #include "webrtc/base/timeutils.h"
      22             : 
      23             : namespace rtc {
      24             : using internal::GetQueuePtrTls;
      25             : using internal::AutoSetCurrentQueuePtr;
      26             : 
      27             : namespace {
      28             : static const char kQuit = 1;
      29             : static const char kRunTask = 2;
      30             : 
      31             : struct TimerEvent {
      32           0 :   explicit TimerEvent(std::unique_ptr<QueuedTask> task)
      33           0 :       : task(std::move(task)) {}
      34           0 :   ~TimerEvent() { event_del(&ev); }
      35             :   event ev;
      36             :   std::unique_ptr<QueuedTask> task;
      37             : };
      38             : 
      39           0 : bool SetNonBlocking(int fd) {
      40           0 :   const int flags = fcntl(fd, F_GETFL);
      41           0 :   RTC_CHECK(flags != -1);
      42           0 :   return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
      43             : }
      44             : 
      45             : // TODO(tommi): This is a hack to support two versions of libevent that we're
      46             : // compatible with.  The method we really want to call is event_assign(),
      47             : // since event_set() has been marked as deprecated (and doesn't accept
      48             : // passing event_base__ as a parameter).  However, the version of libevent
      49             : // that we have in Chromium, doesn't have event_assign(), so we need to call
      50             : // event_set() there.
      51           0 : void EventAssign(struct event* ev,
      52             :                  struct event_base* base,
      53             :                  int fd,
      54             :                  short events,
      55             :                  void (*callback)(int, short, void*),
      56             :                  void* arg) {
      57             : #if defined(_EVENT2_EVENT_H_)
      58             :   RTC_CHECK_EQ(0, event_assign(ev, base, fd, events, callback, arg));
      59             : #else
      60           0 :   event_set(ev, fd, events, callback, arg);
      61           0 :   RTC_CHECK_EQ(0, event_base_set(base, ev));
      62             : #endif
      63           0 : }
      64             : }  // namespace
      65             : 
      66           0 : struct TaskQueue::QueueContext {
      67           0 :   explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
      68             :   TaskQueue* queue;
      69             :   bool is_active;
      70             :   // Holds a list of events pending timers for cleanup when the loop exits.
      71             :   std::list<TimerEvent*> pending_timers_;
      72             : };
      73             : 
      74             : class TaskQueue::PostAndReplyTask : public QueuedTask {
      75             :  public:
      76           0 :   PostAndReplyTask(std::unique_ptr<QueuedTask> task,
      77             :                    std::unique_ptr<QueuedTask> reply,
      78             :                    TaskQueue* reply_queue)
      79           0 :       : task_(std::move(task)),
      80           0 :         reply_(std::move(reply)),
      81           0 :         reply_queue_(reply_queue) {
      82           0 :     reply_queue->PrepareReplyTask(this);
      83           0 :   }
      84             : 
      85           0 :   ~PostAndReplyTask() override {
      86           0 :     CritScope lock(&lock_);
      87           0 :     if (reply_queue_)
      88           0 :       reply_queue_->ReplyTaskDone(this);
      89           0 :   }
      90             : 
      91           0 :   void OnReplyQueueGone() {
      92           0 :     CritScope lock(&lock_);
      93           0 :     reply_queue_ = nullptr;
      94           0 :   }
      95             : 
      96             :  private:
      97           0 :   bool Run() override {
      98           0 :     if (!task_->Run())
      99           0 :       task_.release();
     100             : 
     101           0 :     CritScope lock(&lock_);
     102           0 :     if (reply_queue_)
     103           0 :       reply_queue_->PostTask(std::move(reply_));
     104           0 :     return true;
     105             :   }
     106             : 
     107             :   CriticalSection lock_;
     108             :   std::unique_ptr<QueuedTask> task_;
     109             :   std::unique_ptr<QueuedTask> reply_;
     110             :   TaskQueue* reply_queue_ GUARDED_BY(lock_);
     111             : };
     112             : 
     113           0 : class TaskQueue::SetTimerTask : public QueuedTask {
     114             :  public:
     115           0 :   SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
     116           0 :       : task_(std::move(task)),
     117             :         milliseconds_(milliseconds),
     118           0 :         posted_(Time32()) {}
     119             : 
     120             :  private:
     121           0 :   bool Run() override {
     122             :     // Compensate for the time that has passed since construction
     123             :     // and until we got here.
     124           0 :     uint32_t post_time = Time32() - posted_;
     125           0 :     TaskQueue::Current()->PostDelayedTask(
     126           0 :         std::move(task_),
     127           0 :         post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
     128           0 :     return true;
     129             :   }
     130             : 
     131             :   std::unique_ptr<QueuedTask> task_;
     132             :   const uint32_t milliseconds_;
     133             :   const uint32_t posted_;
     134             : };
     135             : 
     136           0 : TaskQueue::TaskQueue(const char* queue_name)
     137           0 :     : event_base_(event_base_new()),
     138           0 :       wakeup_event_(new event()),
     139           0 :       thread_(&TaskQueue::ThreadMain, this, queue_name) {
     140           0 :   RTC_DCHECK(queue_name);
     141             :   int fds[2];
     142           0 :   RTC_CHECK(pipe(fds) == 0);
     143           0 :   SetNonBlocking(fds[0]);
     144           0 :   SetNonBlocking(fds[1]);
     145           0 :   wakeup_pipe_out_ = fds[0];
     146           0 :   wakeup_pipe_in_ = fds[1];
     147           0 :   EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
     148           0 :               EV_READ | EV_PERSIST, OnWakeup, this);
     149           0 :   event_add(wakeup_event_.get(), 0);
     150           0 :   thread_.Start();
     151           0 : }
     152             : 
     153           0 : TaskQueue::~TaskQueue() {
     154           0 :   RTC_DCHECK(!IsCurrent());
     155             :   struct timespec ts;
     156           0 :   char message = kQuit;
     157           0 :   while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
     158             :     // The queue is full, so we have no choice but to wait and retry.
     159           0 :     RTC_CHECK_EQ(EAGAIN, errno);
     160           0 :     ts.tv_sec = 0;
     161           0 :     ts.tv_nsec = 1000000;
     162           0 :     nanosleep(&ts, nullptr);
     163             :   }
     164             : 
     165           0 :   thread_.Stop();
     166             : 
     167           0 :   event_del(wakeup_event_.get());
     168           0 :   close(wakeup_pipe_in_);
     169           0 :   close(wakeup_pipe_out_);
     170           0 :   wakeup_pipe_in_ = -1;
     171           0 :   wakeup_pipe_out_ = -1;
     172             : 
     173             :   {
     174             :     // Synchronize against any pending reply tasks that might be running on
     175             :     // other queues.
     176           0 :     CritScope lock(&pending_lock_);
     177           0 :     for (auto* reply : pending_replies_)
     178           0 :       reply->OnReplyQueueGone();
     179           0 :     pending_replies_.clear();
     180             :   }
     181             : 
     182           0 :   event_base_free(event_base_);
     183           0 : }
     184             : 
     185             : // static
     186           0 : TaskQueue* TaskQueue::Current() {
     187             :   QueueContext* ctx =
     188           0 :       static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
     189           0 :   return ctx ? ctx->queue : nullptr;
     190             : }
     191             : 
     192             : // static
     193           0 : bool TaskQueue::IsCurrent(const char* queue_name) {
     194           0 :   TaskQueue* current = Current();
     195           0 :   return current && current->thread_.name().compare(queue_name) == 0;
     196             : }
     197             : 
     198           0 : bool TaskQueue::IsCurrent() const {
     199           0 :   return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
     200             : }
     201             : 
     202           0 : void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
     203           0 :   RTC_DCHECK(task.get());
     204             :   // libevent isn't thread safe.  This means that we can't use methods such
     205             :   // as event_base_once to post tasks to the worker thread from a different
     206             :   // thread.  However, we can use it when posting from the worker thread itself.
     207           0 :   if (IsCurrent()) {
     208           0 :     if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
     209           0 :                         task.get(), nullptr) == 0) {
     210           0 :       task.release();
     211             :     }
     212             :   } else {
     213           0 :     QueuedTask* task_id = task.get();  // Only used for comparison.
     214             :     {
     215           0 :       CritScope lock(&pending_lock_);
     216           0 :       pending_.push_back(std::move(task));
     217             :     }
     218           0 :     char message = kRunTask;
     219           0 :     if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
     220           0 :       LOG(WARNING) << "Failed to queue task.";
     221           0 :       CritScope lock(&pending_lock_);
     222           0 :       pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
     223           0 :         return t.get() == task_id;
     224           0 :       });
     225             :     }
     226             :   }
     227           0 : }
     228             : 
     229           0 : void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
     230             :                                 uint32_t milliseconds) {
     231           0 :   if (IsCurrent()) {
     232           0 :     TimerEvent* timer = new TimerEvent(std::move(task));
     233           0 :     EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
     234             :     QueueContext* ctx =
     235           0 :         static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
     236           0 :     ctx->pending_timers_.push_back(timer);
     237           0 :     timeval tv = {static_cast<time_t>(milliseconds) / 1000,
     238           0 :                   static_cast<suseconds_t>((milliseconds % 1000) * 1000)};
     239           0 :     event_add(&timer->ev, &tv);
     240             :   } else {
     241           0 :     PostTask(std::unique_ptr<QueuedTask>(
     242           0 :         new SetTimerTask(std::move(task), milliseconds)));
     243             :   }
     244           0 : }
     245             : 
     246           0 : void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
     247             :                                  std::unique_ptr<QueuedTask> reply,
     248             :                                  TaskQueue* reply_queue) {
     249             :   std::unique_ptr<QueuedTask> wrapper_task(
     250           0 :       new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
     251           0 :   PostTask(std::move(wrapper_task));
     252           0 : }
     253             : 
     254           0 : void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
     255             :                                  std::unique_ptr<QueuedTask> reply) {
     256           0 :   return PostTaskAndReply(std::move(task), std::move(reply), Current());
     257             : }
     258             : 
     259             : // static
     260           0 : bool TaskQueue::ThreadMain(void* context) {
     261           0 :   TaskQueue* me = static_cast<TaskQueue*>(context);
     262             : 
     263           0 :   QueueContext queue_context(me);
     264           0 :   pthread_setspecific(GetQueuePtrTls(), &queue_context);
     265             : 
     266           0 :   while (queue_context.is_active)
     267           0 :     event_base_loop(me->event_base_, 0);
     268             : 
     269           0 :   pthread_setspecific(GetQueuePtrTls(), nullptr);
     270             : 
     271           0 :   for (TimerEvent* timer : queue_context.pending_timers_)
     272           0 :     delete timer;
     273             : 
     274           0 :   return false;
     275             : }
     276             : 
     277             : // static
     278           0 : void TaskQueue::OnWakeup(int socket, short flags, void* context) {  // NOLINT
     279             :   QueueContext* ctx =
     280           0 :       static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
     281           0 :   RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
     282             :   char buf;
     283           0 :   RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
     284           0 :   switch (buf) {
     285             :     case kQuit:
     286           0 :       ctx->is_active = false;
     287           0 :       event_base_loopbreak(ctx->queue->event_base_);
     288           0 :       break;
     289             :     case kRunTask: {
     290           0 :       std::unique_ptr<QueuedTask> task;
     291             :       {
     292           0 :         CritScope lock(&ctx->queue->pending_lock_);
     293           0 :         RTC_DCHECK(!ctx->queue->pending_.empty());
     294           0 :         task = std::move(ctx->queue->pending_.front());
     295           0 :         ctx->queue->pending_.pop_front();
     296           0 :         RTC_DCHECK(task.get());
     297             :       }
     298           0 :       if (!task->Run())
     299           0 :         task.release();
     300           0 :       break;
     301             :     }
     302             :     default:
     303           0 :       RTC_NOTREACHED();
     304           0 :       break;
     305             :   }
     306           0 : }
     307             : 
     308             : // static
     309           0 : void TaskQueue::RunTask(int fd, short flags, void* context) {  // NOLINT
     310           0 :   auto* task = static_cast<QueuedTask*>(context);
     311           0 :   if (task->Run())
     312           0 :     delete task;
     313           0 : }
     314             : 
     315             : // static
     316           0 : void TaskQueue::RunTimer(int fd, short flags, void* context) {  // NOLINT
     317           0 :   TimerEvent* timer = static_cast<TimerEvent*>(context);
     318           0 :   if (!timer->task->Run())
     319           0 :     timer->task.release();
     320             :   QueueContext* ctx =
     321           0 :       static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
     322           0 :   ctx->pending_timers_.remove(timer);
     323           0 :   delete timer;
     324           0 : }
     325             : 
     326           0 : void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
     327           0 :   RTC_DCHECK(reply_task);
     328           0 :   CritScope lock(&pending_lock_);
     329           0 :   pending_replies_.push_back(reply_task);
     330           0 : }
     331             : 
     332           0 : void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
     333           0 :   CritScope lock(&pending_lock_);
     334           0 :   pending_replies_.remove(reply_task);
     335           0 : }
     336             : 
     337             : }  // namespace rtc

Generated by: LCOV version 1.13