Line data Source code
1 : /*
2 : * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
3 : *
4 : * Use of this source code is governed by a BSD-style license
5 : * that can be found in the LICENSE file in the root of the source
6 : * tree. An additional intellectual property rights grant can be found
7 : * in the file PATENTS. All contributing project authors may
8 : * be found in the AUTHORS file in the root of the source tree.
9 : */
10 :
11 : #include "webrtc/modules/utility/source/process_thread_impl.h"
12 :
13 : #include "webrtc/base/checks.h"
14 : #include "webrtc/base/task_queue.h"
15 : #include "webrtc/base/timeutils.h"
16 : #include "webrtc/modules/include/module.h"
17 : #include "webrtc/system_wrappers/include/logging.h"
18 :
19 : namespace webrtc {
20 : namespace {
21 :
// Sentinel written into ModuleCallback::next_callback to signal that a module
// has asked for a callback right away. While this value is set, no call to
// TimeUntilNextProcess() should be made; Process() should be called directly.
constexpr int64_t kCallProcessImmediately = -1;
26 :
27 0 : int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
28 0 : int64_t interval = module->TimeUntilNextProcess();
29 0 : if (interval < 0) {
30 : // Falling behind, we should call the callback now.
31 0 : return time_now;
32 : }
33 0 : return time_now + interval;
34 : }
35 : }
36 :
37 0 : ProcessThread::~ProcessThread() {}
38 :
39 : // static
40 0 : std::unique_ptr<ProcessThread> ProcessThread::Create(
41 : const char* thread_name) {
42 0 : return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
43 : }
44 :
45 0 : ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
46 : : wake_up_(EventWrapper::Create()),
47 : stop_(false),
48 0 : thread_name_(thread_name) {}
49 :
50 0 : ProcessThreadImpl::~ProcessThreadImpl() {
51 0 : RTC_DCHECK(thread_checker_.CalledOnValidThread());
52 0 : RTC_DCHECK(!thread_.get());
53 0 : RTC_DCHECK(!stop_);
54 :
55 0 : while (!queue_.empty()) {
56 0 : delete queue_.front();
57 0 : queue_.pop();
58 : }
59 0 : }
60 :
61 0 : void ProcessThreadImpl::Start() {
62 0 : RTC_DCHECK(thread_checker_.CalledOnValidThread());
63 0 : RTC_DCHECK(!thread_.get());
64 0 : if (thread_.get())
65 0 : return;
66 :
67 0 : RTC_DCHECK(!stop_);
68 :
69 : {
70 : // TODO(tommi): Since DeRegisterModule is currently being called from
71 : // different threads in some cases (ChannelOwner), we need to lock access to
72 : // the modules_ collection even on the controller thread.
73 : // Once we've cleaned up those places, we can remove this lock.
74 0 : rtc::CritScope lock(&lock_);
75 0 : for (ModuleCallback& m : modules_)
76 0 : m.module->ProcessThreadAttached(this);
77 : }
78 :
79 0 : thread_.reset(
80 0 : new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
81 0 : thread_->Start();
82 : }
83 :
84 0 : void ProcessThreadImpl::Stop() {
85 0 : RTC_DCHECK(thread_checker_.CalledOnValidThread());
86 0 : if(!thread_.get())
87 0 : return;
88 :
89 : {
90 0 : rtc::CritScope lock(&lock_);
91 0 : stop_ = true;
92 : }
93 :
94 0 : wake_up_->Set();
95 :
96 0 : thread_->Stop();
97 0 : stop_ = false;
98 :
99 : // TODO(tommi): Since DeRegisterModule is currently being called from
100 : // different threads in some cases (ChannelOwner), we need to lock access to
101 : // the modules_ collection even on the controller thread.
102 : // Since DeRegisterModule also checks thread_, we also need to hold the
103 : // lock for the .reset() operation.
104 : // Once we've cleaned up those places, we can remove this lock.
105 0 : rtc::CritScope lock(&lock_);
106 0 : thread_.reset();
107 0 : for (ModuleCallback& m : modules_)
108 0 : m.module->ProcessThreadAttached(nullptr);
109 : }
110 :
111 0 : void ProcessThreadImpl::WakeUp(Module* module) {
112 : // Allowed to be called on any thread.
113 : {
114 0 : rtc::CritScope lock(&lock_);
115 0 : for (ModuleCallback& m : modules_) {
116 0 : if (m.module == module)
117 0 : m.next_callback = kCallProcessImmediately;
118 : }
119 : }
120 0 : wake_up_->Set();
121 0 : }
122 :
123 0 : void ProcessThreadImpl::PostTask(std::unique_ptr<rtc::QueuedTask> task) {
124 : // Allowed to be called on any thread.
125 : {
126 0 : rtc::CritScope lock(&lock_);
127 0 : queue_.push(task.release());
128 : }
129 0 : wake_up_->Set();
130 0 : }
131 :
132 0 : void ProcessThreadImpl::RegisterModule(Module* module) {
133 : // RTC_DCHECK(thread_checker_.CalledOnValidThread()); Not really needed
134 0 : RTC_DCHECK(module);
135 :
136 : #if RTC_DCHECK_IS_ON
137 : {
138 : // Catch programmer error.
139 0 : rtc::CritScope lock(&lock_);
140 0 : for (const ModuleCallback& mc : modules_)
141 0 : RTC_DCHECK(mc.module != module);
142 : }
143 : #endif
144 :
145 : // Now that we know the module isn't in the list, we'll call out to notify
146 : // the module that it's attached to the worker thread. We don't hold
147 : // the lock while we make this call.
148 0 : if (thread_.get())
149 0 : module->ProcessThreadAttached(this);
150 :
151 : {
152 0 : rtc::CritScope lock(&lock_);
153 0 : modules_.push_back(ModuleCallback(module));
154 : }
155 :
156 : // Wake the thread calling ProcessThreadImpl::Process() to update the
157 : // waiting time. The waiting time for the just registered module may be
158 : // shorter than all other registered modules.
159 0 : wake_up_->Set();
160 0 : }
161 :
162 0 : void ProcessThreadImpl::DeRegisterModule(Module* module) {
163 : // Allowed to be called on any thread.
164 : // TODO(tommi): Disallow this ^^^
165 0 : RTC_DCHECK(module);
166 :
167 : {
168 0 : rtc::CritScope lock(&lock_);
169 0 : modules_.remove_if([&module](const ModuleCallback& m) {
170 0 : return m.module == module;
171 0 : });
172 :
173 : // TODO(tommi): we currently need to hold the lock while calling out to
174 : // ProcessThreadAttached. This is to make sure that the thread hasn't been
175 : // destroyed while we attach the module. Once we can make sure
176 : // DeRegisterModule isn't being called on arbitrary threads, we can move the
177 : // |if (thread_.get())| check and ProcessThreadAttached() call outside the
178 : // lock scope.
179 :
180 : // Notify the module that it's been detached.
181 0 : if (thread_.get())
182 0 : module->ProcessThreadAttached(nullptr);
183 : }
184 0 : }
185 :
186 : // static
187 0 : bool ProcessThreadImpl::Run(void* obj) {
188 0 : return static_cast<ProcessThreadImpl*>(obj)->Process();
189 : }
190 :
191 0 : bool ProcessThreadImpl::Process() {
192 0 : int64_t now = rtc::TimeMillis();
193 0 : int64_t next_checkpoint = now + (1000 * 60);
194 :
195 : {
196 0 : rtc::CritScope lock(&lock_);
197 0 : if (stop_)
198 0 : return false;
199 0 : for (ModuleCallback& m : modules_) {
200 : // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
201 : // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
202 : // operation should not require taking a lock, so querying all modules
203 : // should run in a matter of nanoseconds.
204 0 : if (m.next_callback == 0)
205 0 : m.next_callback = GetNextCallbackTime(m.module, now);
206 :
207 0 : if (m.next_callback <= now ||
208 0 : m.next_callback == kCallProcessImmediately) {
209 0 : m.module->Process();
210 : // Use a new 'now' reference to calculate when the next callback
211 : // should occur. We'll continue to use 'now' above for the baseline
212 : // of calculating how long we should wait, to reduce variance.
213 0 : int64_t new_now = rtc::TimeMillis();
214 0 : m.next_callback = GetNextCallbackTime(m.module, new_now);
215 : }
216 :
217 0 : if (m.next_callback < next_checkpoint)
218 0 : next_checkpoint = m.next_callback;
219 : }
220 :
221 0 : while (!queue_.empty()) {
222 0 : rtc::QueuedTask* task = queue_.front();
223 0 : queue_.pop();
224 0 : lock_.Leave();
225 0 : task->Run();
226 0 : delete task;
227 0 : lock_.Enter();
228 : }
229 : }
230 :
231 0 : int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
232 0 : if (time_to_wait > 0)
233 0 : wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
234 :
235 0 : return true;
236 : }
237 : } // namespace webrtc
|