LCOV - code coverage report
Current view: top level - media/webrtc/trunk/webrtc/modules/utility/source - process_thread_impl.cc (source / functions) Hit Total Coverage
Test: output.info Lines: 0 104 0.0 %
Date: 2017-07-14 16:53:18 Functions: 0 16 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
       3             :  *
       4             :  *  Use of this source code is governed by a BSD-style license
       5             :  *  that can be found in the LICENSE file in the root of the source
       6             :  *  tree. An additional intellectual property rights grant can be found
       7             :  *  in the file PATENTS.  All contributing project authors may
       8             :  *  be found in the AUTHORS file in the root of the source tree.
       9             :  */
      10             : 
      11             : #include "webrtc/modules/utility/source/process_thread_impl.h"
      12             : 
      13             : #include "webrtc/base/checks.h"
      14             : #include "webrtc/base/task_queue.h"
      15             : #include "webrtc/base/timeutils.h"
      16             : #include "webrtc/modules/include/module.h"
      17             : #include "webrtc/system_wrappers/include/logging.h"
      18             : 
      19             : namespace webrtc {
      20             : namespace {
      21             : 
      22             : // We use this constant internally to signal that a module has requested
      23             : // a callback right away.  When this is set, no call to TimeUntilNextProcess
      24             : // should be made, but Process() should be called directly.
                       // In this file the value is stored into ModuleCallback::next_callback by
                       // ProcessThreadImpl::WakeUp() and tested in ProcessThreadImpl::Process().
      25             : const int64_t kCallProcessImmediately = -1;
      26             : 
      27           0 : int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
                         // Turns the interval reported by |module| into an absolute callback
                         // time based on |time_now|.  A negative interval is not propagated:
                         // the result is |time_now| itself, so it is never earlier than
                         // |time_now|.  Callers in this file pass rtc::TimeMillis() values;
                         // the interval is presumably in the same unit - confirm in module.h.
      28           0 :   int64_t interval = module->TimeUntilNextProcess();
      29           0 :   if (interval < 0) {
      30             :     // Falling behind, we should call the callback now.
      31           0 :     return time_now;
      32             :   }
      33           0 :   return time_now + interval;
      34             : }
      35             : }
      36             : 
      37           0 : ProcessThread::~ProcessThread() {}  // Out-of-line definition with an empty body.
      38             : 
      39             : // static
                       // Factory: allocates a ProcessThreadImpl constructed with |thread_name|
                       // and returns it as a std::unique_ptr<ProcessThread>.  The thread itself
                       // is not started here; that happens in ProcessThreadImpl::Start().
      40           0 : std::unique_ptr<ProcessThread> ProcessThread::Create(
      41             :     const char* thread_name) {
      42           0 :   return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
      43             : }
      44             : 
      45           0 : ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
                           // Creates the wake-up event, clears |stop_| and stores the
                           // |thread_name| pointer in |thread_name_|.  No thread object is
                           // created here; |thread_| is only set in Start().
      46             :     : wake_up_(EventWrapper::Create()),
      47             :       stop_(false),
      48           0 :       thread_name_(thread_name) {}
      49             : 
      50           0 : ProcessThreadImpl::~ProcessThreadImpl() {
                         // Preconditions checked with RTC_DCHECK: called on the thread accepted
                         // by |thread_checker_|, no thread object left (Stop() resets |thread_|)
                         // and |stop_| cleared (Stop() sets it back to false).
      51           0 :   RTC_DCHECK(thread_checker_.CalledOnValidThread());
      52           0 :   RTC_DCHECK(!thread_.get());
      53           0 :   RTC_DCHECK(!stop_);
      54             : 
                         // PostTask() stores released raw pointers in |queue_|, so any task that
                         // was never run by Process() is deleted here without being run.
      55           0 :   while (!queue_.empty()) {
      56           0 :     delete queue_.front();
      57           0 :     queue_.pop();
      58             :   }
      59           0 : }
      60             : 
      61           0 : void ProcessThreadImpl::Start() {
                         // Notifies every registered module that it is attached to this
                         // process thread, then creates and starts the rtc::PlatformThread
                         // whose entry point is ProcessThreadImpl::Run().
      62           0 :   RTC_DCHECK(thread_checker_.CalledOnValidThread());
      63           0 :   RTC_DCHECK(!thread_.get());
                         // Same condition as the RTC_DCHECK above, repeated as a runtime
                         // early return so that a second Start() does nothing when the check
                         // does not abort (presumably when DCHECKs are compiled out).
      64           0 :   if (thread_.get())
      65           0 :     return;
      66             : 
      67           0 :   RTC_DCHECK(!stop_);
      68             : 
      69             :   {
      70             :     // TODO(tommi): Since DeRegisterModule is currently being called from
      71             :     // different threads in some cases (ChannelOwner), we need to lock access to
      72             :     // the modules_ collection even on the controller thread.
      73             :     // Once we've cleaned up those places, we can remove this lock.
      74           0 :     rtc::CritScope lock(&lock_);
      75           0 :     for (ModuleCallback& m : modules_)
      76           0 :       m.module->ProcessThreadAttached(this);
      77             :   }
      78             : 
                         // |this| is handed to the thread as the opaque argument that Run()
                         // casts back to ProcessThreadImpl*.
      79           0 :   thread_.reset(
      80           0 :       new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
      81           0 :   thread_->Start();
      82             : }
      83             : 
      84           0 : void ProcessThreadImpl::Stop() {
                         // Asks the worker to exit, stops the platform thread, destroys the
                         // thread object and tells every registered module it is detached.
                         // Does nothing when no thread object exists.
      85           0 :   RTC_DCHECK(thread_checker_.CalledOnValidThread());
      86           0 :   if(!thread_.get())
      87           0 :     return;
      88             : 
      89             :   {
                           // Process() reads |stop_| under the same lock and returns false
                           // once it sees the flag set.
      90           0 :     rtc::CritScope lock(&lock_);
      91           0 :     stop_ = true;
      92             :   }
      93             : 
                         // Process() may be blocked in wake_up_->Wait(); signal the event so it
                         // returns and the flag is observed on the next Process() call.
      94           0 :   wake_up_->Set();
      95             : 
      96           0 :   thread_->Stop();
                         // Cleared again so the RTC_DCHECK(!stop_) in Start() and in the
                         // destructor holds afterwards.
                         // NOTE(review): written without holding |lock_|; presumably safe
                         // because thread_->Stop() has returned - confirm that Stop() joins.
      97           0 :   stop_ = false;
      98             : 
      99             :   // TODO(tommi): Since DeRegisterModule is currently being called from
     100             :   // different threads in some cases (ChannelOwner), we need to lock access to
     101             :   // the modules_ collection even on the controller thread.
     102             :   // Since DeRegisterModule also checks thread_, we also need to hold the
     103             :   // lock for the .reset() operation.
     104             :   // Once we've cleaned up those places, we can remove this lock.
     105           0 :   rtc::CritScope lock(&lock_);
     106           0 :   thread_.reset();
                         // A null argument is the detach notification (DeRegisterModule() uses
                         // the same call to tell a module it has been detached).
     107           0 :   for (ModuleCallback& m : modules_)
     108           0 :     m.module->ProcessThreadAttached(nullptr);
     109             : }
     110             : 
     111           0 : void ProcessThreadImpl::WakeUp(Module* module) {
                         // Marks every entry in |modules_| that refers to |module| with
                         // kCallProcessImmediately, then signals the wake-up event.  If no
                         // entry matches, only the event is signalled.
     112             :   // Allowed to be called on any thread.
     113             :   {
     114           0 :     rtc::CritScope lock(&lock_);
     115           0 :     for (ModuleCallback& m : modules_) {
     116           0 :       if (m.module == module)
     117           0 :         m.next_callback = kCallProcessImmediately;
     118             :     }
     119             :   }
                         // Signalled after the lock scope has ended; Process() waits on this
                         // event between iterations.
     120           0 :   wake_up_->Set();
     121           0 : }
     122             : 
     123           0 : void ProcessThreadImpl::PostTask(std::unique_ptr<rtc::QueuedTask> task) {
                         // Appends |task| to |queue_| under the lock and signals the wake-up
                         // event.  Process() later pops the task, calls Run() on it and
                         // deletes it.
     124             :   // Allowed to be called on any thread.
     125             :   {
     126           0 :     rtc::CritScope lock(&lock_);
                           // Ownership leaves the unique_ptr here: the queue holds a raw
                           // pointer that is deleted either by Process() after running the
                           // task or by the destructor if it was never run.
     127           0 :     queue_.push(task.release());
     128             :   }
     129           0 :   wake_up_->Set();
     130           0 : }
     131             : 
     132           0 : void ProcessThreadImpl::RegisterModule(Module* module) {
                         // Adds |module| to |modules_| and wakes the worker.  If a thread
                         // object already exists, the module is first told that it is attached
                         // to this process thread.  |module| must be non-null and must not
                         // already be registered (both enforced only through RTC_DCHECK).
     133             :   // RTC_DCHECK(thread_checker_.CalledOnValidThread());  Not really needed
     134           0 :   RTC_DCHECK(module);
     135             : 
     136             : #if RTC_DCHECK_IS_ON
     137             :   {
     138             :     // Catch programmer error.
     139           0 :     rtc::CritScope lock(&lock_);
     140           0 :     for (const ModuleCallback& mc : modules_)
     141           0 :       RTC_DCHECK(mc.module != module);
     142             :   }
     143             : #endif
     144             : 
     145             :   // Now that we know the module isn't in the list, we'll call out to notify
     146             :   // the module that it's attached to the worker thread.  We don't hold
     147             :   // the lock while we make this call.
                         // NOTE(review): |thread_| is read here without |lock_|, while Stop()
                         // resets it under the lock - presumably callers keep RegisterModule()
                         // and Stop() on the same thread; confirm.
     148           0 :   if (thread_.get())
     149           0 :     module->ProcessThreadAttached(this);
     150             : 
     151             :   {
     152           0 :     rtc::CritScope lock(&lock_);
                           // Process() computes the first callback time for entries whose
                           // next_callback is 0; presumably ModuleCallback(module) starts
                           // with that value - confirm in process_thread_impl.h.
     153           0 :     modules_.push_back(ModuleCallback(module));
     154             :   }
     155             : 
     156             :   // Wake the thread calling ProcessThreadImpl::Process() to update the
     157             :   // waiting time. The waiting time for the just registered module may be
     158             :   // shorter than all other registered modules.
     159           0 :   wake_up_->Set();
     160           0 : }
     161             : 
     162           0 : void ProcessThreadImpl::DeRegisterModule(Module* module) {
                         // Removes every entry in |modules_| that refers to |module| and, if a
                         // thread object exists, notifies the module that it is detached.
                         // Both steps happen while |lock_| is held.  The notification is sent
                         // whether or not a matching entry was found in |modules_|.
     163             :   // Allowed to be called on any thread.
     164             :   // TODO(tommi): Disallow this ^^^
     165           0 :   RTC_DCHECK(module);
     166             : 
     167             :   {
     168           0 :     rtc::CritScope lock(&lock_);
     169           0 :     modules_.remove_if([&module](const ModuleCallback& m) {
     170           0 :         return m.module == module;
     171           0 :       });
     172             : 
     173             :     // TODO(tommi): we currently need to hold the lock while calling out to
     174             :     // ProcessThreadAttached.  This is to make sure that the thread hasn't been
     175             :     // destroyed while we detach the module.  Once we can make sure
     176             :     // DeRegisterModule isn't being called on arbitrary threads, we can move the
     177             :     // |if (thread_.get())| check and ProcessThreadAttached() call outside the
     178             :     // lock scope.
     179             : 
     180             :     // Notify the module that it's been detached.
     181           0 :     if (thread_.get())
     182           0 :       module->ProcessThreadAttached(nullptr);
     183             :   }
     184           0 : }
     185             : 
     186             : // static
                       // Thread entry point handed to rtc::PlatformThread in Start().  |obj| is
                       // the ProcessThreadImpl instance passed there as |this|; the result of one
                       // Process() call is forwarded as the return value (presumably false tells
                       // the platform thread to stop calling - confirm in platform_thread.h).
     187           0 : bool ProcessThreadImpl::Run(void* obj) {
     188           0 :   return static_cast<ProcessThreadImpl*>(obj)->Process();
     189             : }
     190             : 
     191           0 : bool ProcessThreadImpl::Process() {
                         // One iteration of the worker loop: calls Process() on every module
                         // that is due, runs all queued tasks, then waits on the wake-up event
                         // until the earliest upcoming callback time.  Returns false when
                         // |stop_| is set, true otherwise.
     192           0 :   int64_t now = rtc::TimeMillis();
                         // Upper bound for the wait: 60 seconds from |now| unless some module
                         // is due earlier.
     193           0 :   int64_t next_checkpoint = now + (1000 * 60);
     194             : 
     195             :   {
     196           0 :     rtc::CritScope lock(&lock_);
     197           0 :     if (stop_)
     198           0 :       return false;
     199           0 :     for (ModuleCallback& m : modules_) {
     200             :       // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
     201             :       // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
     202             :       // operation should not require taking a lock, so querying all modules
     203             :       // should run in a matter of nanoseconds.
                             // A next_callback of 0 is treated as "no callback time computed
                             // yet": ask the module for its first one.
     204           0 :       if (m.next_callback == 0)
     205           0 :         m.next_callback = GetNextCallbackTime(m.module, now);
     206             : 
                             // The module is due, or WakeUp() requested an immediate call.
                             // Note that the module's Process() runs while |lock_| is held.
     207           0 :       if (m.next_callback <= now ||
     208           0 :           m.next_callback == kCallProcessImmediately) {
     209           0 :         m.module->Process();
     210             :         // Use a new 'now' reference to calculate when the next callback
     211             :         // should occur.  We'll continue to use 'now' above for the baseline
     212             :         // of calculating how long we should wait, to reduce variance.
     213           0 :         int64_t new_now = rtc::TimeMillis();
     214           0 :         m.next_callback = GetNextCallbackTime(m.module, new_now);
     215             :       }
     216             : 
                             // Track the earliest callback time over all modules.
     217           0 :       if (m.next_callback < next_checkpoint)
     218           0 :         next_checkpoint = m.next_callback;
     219             :     }
     220             : 
                           // Drain the task queue.  Each task is popped while the lock is
                           // held, then run and deleted with the lock released; the lock is
                           // re-entered before |queue_| is inspected again, so it is held
                           // again when the enclosing CritScope ends.
     221           0 :     while (!queue_.empty()) {
     222           0 :       rtc::QueuedTask* task = queue_.front();
     223           0 :       queue_.pop();
     224           0 :       lock_.Leave();
     225           0 :       task->Run();
     226           0 :       delete task;
     227           0 :       lock_.Enter();
     228             :     }
     229             :   }
     230             : 
                         // Measured against a fresh clock reading so that time spent in the
                         // module callbacks and tasks above shortens the wait; no wait at all
                         // when the checkpoint has already passed.
     231           0 :   int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
     232           0 :   if (time_to_wait > 0)
     233           0 :     wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
     234             : 
     235           0 :   return true;
     236             : }
     237             : }  // namespace webrtc

Generated by: LCOV version 1.13