Line data Source code
1 : /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "EventTokenBucket.h"
8 :
9 : #include "nsICancelable.h"
10 : #include "nsIIOService.h"
11 : #include "nsNetCID.h"
12 : #include "nsNetUtil.h"
13 : #include "nsServiceManagerUtils.h"
14 : #include "nsSocketTransportService2.h"
15 : #ifdef DEBUG
16 : #include "MainThreadUtils.h"
17 : #endif
18 : #include "mozilla/SizePrintfMacros.h"
19 :
20 : #ifdef XP_WIN
21 : #include <windows.h>
22 : #include <mmsystem.h>
23 : #endif
24 :
25 : namespace mozilla {
26 : namespace net {
27 :
28 : ////////////////////////////////////////////
29 : // EventTokenBucketCancelable
30 : ////////////////////////////////////////////
31 :
32 : class TokenBucketCancelable : public nsICancelable
33 : {
34 : public:
35 : NS_DECL_THREADSAFE_ISUPPORTS
36 : NS_DECL_NSICANCELABLE
37 :
38 : explicit TokenBucketCancelable(class ATokenBucketEvent *event);
39 : void Fire();
40 :
41 : private:
42 9 : virtual ~TokenBucketCancelable() {}
43 :
44 : friend class EventTokenBucket;
45 : ATokenBucketEvent *mEvent;
46 : };
47 :
48 12 : NS_IMPL_ISUPPORTS(TokenBucketCancelable, nsICancelable)
49 :
50 3 : TokenBucketCancelable::TokenBucketCancelable(ATokenBucketEvent *event)
51 3 : : mEvent(event)
52 : {
53 3 : }
54 :
55 : NS_IMETHODIMP
56 0 : TokenBucketCancelable::Cancel(nsresult reason)
57 : {
58 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
59 0 : mEvent = nullptr;
60 0 : return NS_OK;
61 : }
62 :
63 : void
64 3 : TokenBucketCancelable::Fire()
65 : {
66 3 : if (!mEvent)
67 0 : return;
68 :
69 3 : ATokenBucketEvent *event = mEvent;
70 3 : mEvent = nullptr;
71 3 : event->OnTokenBucketAdmitted();
72 : }
73 :
74 : ////////////////////////////////////////////
75 : // EventTokenBucket
76 : ////////////////////////////////////////////
77 :
78 5 : NS_IMPL_ISUPPORTS(EventTokenBucket, nsITimerCallback)
79 :
80 : // by default 1hz with no burst
81 1 : EventTokenBucket::EventTokenBucket(uint32_t eventsPerSecond,
82 1 : uint32_t burstSize)
83 : : mUnitCost(kUsecPerSec)
84 : , mMaxCredit(kUsecPerSec)
85 : , mCredit(kUsecPerSec)
86 : , mPaused(false)
87 : , mStopped(false)
88 1 : , mTimerArmed(false)
89 : #ifdef XP_WIN
90 : , mFineGrainTimerInUse(false)
91 : , mFineGrainResetTimerArmed(false)
92 : #endif
93 : {
94 1 : mLastUpdate = TimeStamp::Now();
95 :
96 1 : MOZ_ASSERT(NS_IsMainThread());
97 :
98 : nsresult rv;
99 2 : nsCOMPtr<nsIEventTarget> sts;
100 2 : nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
101 1 : if (NS_SUCCEEDED(rv))
102 1 : sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
103 1 : if (NS_SUCCEEDED(rv))
104 1 : mTimer = do_CreateInstance("@mozilla.org/timer;1");
105 1 : if (mTimer)
106 1 : mTimer->SetTarget(sts);
107 1 : SetRate(eventsPerSecond, burstSize);
108 1 : }
109 :
110 0 : EventTokenBucket::~EventTokenBucket()
111 : {
112 0 : SOCKET_LOG(("EventTokenBucket::dtor %p events=%" PRIuSIZE "\n",
113 : this, mEvents.GetSize()));
114 :
115 0 : CleanupTimers();
116 :
117 : // Complete any queued events to prevent hangs
118 0 : while (mEvents.GetSize()) {
119 : RefPtr<TokenBucketCancelable> cancelable =
120 0 : dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
121 0 : cancelable->Fire();
122 : }
123 0 : }
124 :
125 : void
126 0 : EventTokenBucket::CleanupTimers()
127 : {
128 0 : if (mTimer && mTimerArmed) {
129 0 : mTimer->Cancel();
130 : }
131 0 : mTimer = nullptr;
132 0 : mTimerArmed = false;
133 :
134 : #ifdef XP_WIN
135 : NormalTimers();
136 : if (mFineGrainResetTimer && mFineGrainResetTimerArmed) {
137 : mFineGrainResetTimer->Cancel();
138 : }
139 : mFineGrainResetTimer = nullptr;
140 : mFineGrainResetTimerArmed = false;
141 : #endif
142 0 : }
143 :
144 : void
145 1 : EventTokenBucket::SetRate(uint32_t eventsPerSecond,
146 : uint32_t burstSize)
147 : {
148 1 : SOCKET_LOG(("EventTokenBucket::SetRate %p %u %u\n",
149 : this, eventsPerSecond, burstSize));
150 :
151 1 : if (eventsPerSecond > kMaxHz) {
152 0 : eventsPerSecond = kMaxHz;
153 0 : SOCKET_LOG((" eventsPerSecond out of range\n"));
154 : }
155 :
156 1 : if (!eventsPerSecond) {
157 0 : eventsPerSecond = 1;
158 0 : SOCKET_LOG((" eventsPerSecond out of range\n"));
159 : }
160 :
161 1 : mUnitCost = kUsecPerSec / eventsPerSecond;
162 1 : mMaxCredit = mUnitCost * burstSize;
163 1 : if (mMaxCredit > kUsecPerSec * 60 * 15) {
164 0 : SOCKET_LOG((" burstSize out of range\n"));
165 0 : mMaxCredit = kUsecPerSec * 60 * 15;
166 : }
167 1 : mCredit = mMaxCredit;
168 1 : mLastUpdate = TimeStamp::Now();
169 1 : }
170 :
171 : void
172 0 : EventTokenBucket::ClearCredits()
173 : {
174 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
175 0 : SOCKET_LOG(("EventTokenBucket::ClearCredits %p\n", this));
176 0 : mCredit = 0;
177 0 : }
178 :
179 : uint32_t
180 0 : EventTokenBucket::BurstEventsAvailable()
181 : {
182 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
183 0 : return static_cast<uint32_t>(mCredit / mUnitCost);
184 : }
185 :
186 : uint32_t
187 0 : EventTokenBucket::QueuedEvents()
188 : {
189 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
190 0 : return mEvents.GetSize();
191 : }
192 :
193 : void
194 0 : EventTokenBucket::Pause()
195 : {
196 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
197 0 : SOCKET_LOG(("EventTokenBucket::Pause %p\n", this));
198 0 : if (mPaused || mStopped)
199 0 : return;
200 :
201 0 : mPaused = true;
202 0 : if (mTimerArmed) {
203 0 : mTimer->Cancel();
204 0 : mTimerArmed = false;
205 : }
206 : }
207 :
208 : void
209 0 : EventTokenBucket::UnPause()
210 : {
211 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
212 0 : SOCKET_LOG(("EventTokenBucket::UnPause %p\n", this));
213 0 : if (!mPaused || mStopped)
214 0 : return;
215 :
216 0 : mPaused = false;
217 0 : DispatchEvents();
218 0 : UpdateTimer();
219 : }
220 :
221 : void
222 0 : EventTokenBucket::Stop()
223 : {
224 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
225 0 : SOCKET_LOG(("EventTokenBucket::Stop %p armed=%d\n", this, mTimerArmed));
226 0 : mStopped = true;
227 0 : CleanupTimers();
228 :
229 : // Complete any queued events to prevent hangs
230 0 : while (mEvents.GetSize()) {
231 : RefPtr<TokenBucketCancelable> cancelable =
232 0 : dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
233 0 : cancelable->Fire();
234 : }
235 0 : }
236 :
237 : nsresult
238 3 : EventTokenBucket::SubmitEvent(ATokenBucketEvent *event, nsICancelable **cancelable)
239 : {
240 3 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
241 3 : SOCKET_LOG(("EventTokenBucket::SubmitEvent %p\n", this));
242 :
243 3 : if (mStopped || !mTimer)
244 0 : return NS_ERROR_FAILURE;
245 :
246 3 : UpdateCredits();
247 :
248 6 : RefPtr<TokenBucketCancelable> cancelEvent = new TokenBucketCancelable(event);
249 : // When this function exits the cancelEvent needs 2 references, one for the
250 : // mEvents queue and one for the caller of SubmitEvent()
251 :
252 3 : NS_ADDREF(*cancelable = cancelEvent.get());
253 :
254 3 : if (mPaused || !TryImmediateDispatch(cancelEvent.get())) {
255 : // queue it
256 0 : SOCKET_LOG((" queued\n"));
257 0 : mEvents.Push(cancelEvent.forget().take());
258 0 : UpdateTimer();
259 : }
260 : else {
261 3 : SOCKET_LOG((" dispatched synchronously\n"));
262 : }
263 :
264 3 : return NS_OK;
265 : }
266 :
267 : bool
268 3 : EventTokenBucket::TryImmediateDispatch(TokenBucketCancelable *cancelable)
269 : {
270 3 : if (mCredit < mUnitCost)
271 0 : return false;
272 :
273 3 : mCredit -= mUnitCost;
274 3 : cancelable->Fire();
275 3 : return true;
276 : }
277 :
278 : void
279 0 : EventTokenBucket::DispatchEvents()
280 : {
281 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
282 0 : SOCKET_LOG(("EventTokenBucket::DispatchEvents %p %d\n", this, mPaused));
283 0 : if (mPaused || mStopped)
284 0 : return;
285 :
286 0 : while (mEvents.GetSize() && mUnitCost <= mCredit) {
287 : RefPtr<TokenBucketCancelable> cancelable =
288 0 : dont_AddRef(static_cast<TokenBucketCancelable *>(mEvents.PopFront()));
289 0 : if (cancelable->mEvent) {
290 0 : SOCKET_LOG(("EventTokenBucket::DispachEvents [%p] "
291 : "Dispatching queue token bucket event cost=%" PRIu64 " credit=%" PRIu64 "\n",
292 : this, mUnitCost, mCredit));
293 0 : mCredit -= mUnitCost;
294 0 : cancelable->Fire();
295 : }
296 : }
297 :
298 : #ifdef XP_WIN
299 : if (!mEvents.GetSize())
300 : WantNormalTimers();
301 : #endif
302 : }
303 :
304 : void
305 0 : EventTokenBucket::UpdateTimer()
306 : {
307 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
308 0 : if (mTimerArmed || mPaused || mStopped || !mEvents.GetSize() || !mTimer)
309 0 : return;
310 :
311 0 : if (mCredit >= mUnitCost)
312 0 : return;
313 :
314 : // determine the time needed to wait to accumulate enough credits to admit
315 : // one more event and set the timer for that point. Always round it
316 : // up because firing early doesn't help.
317 : //
318 0 : uint64_t deficit = mUnitCost - mCredit;
319 0 : uint64_t msecWait = (deficit + (kUsecPerMsec - 1)) / kUsecPerMsec;
320 :
321 0 : if (msecWait < 4) // minimum wait
322 0 : msecWait = 4;
323 0 : else if (msecWait > 60000) // maximum wait
324 0 : msecWait = 60000;
325 :
326 : #ifdef XP_WIN
327 : FineGrainTimers();
328 : #endif
329 :
330 0 : SOCKET_LOG(("EventTokenBucket::UpdateTimer %p for %" PRIu64 "ms\n",
331 : this, msecWait));
332 0 : nsresult rv = mTimer->InitWithCallback(this, static_cast<uint32_t>(msecWait),
333 0 : nsITimer::TYPE_ONE_SHOT);
334 0 : mTimerArmed = NS_SUCCEEDED(rv);
335 : }
336 :
337 : NS_IMETHODIMP
338 0 : EventTokenBucket::Notify(nsITimer *timer)
339 : {
340 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
341 :
342 : #ifdef XP_WIN
343 : if (timer == mFineGrainResetTimer) {
344 : FineGrainResetTimerNotify();
345 : return NS_OK;
346 : }
347 : #endif
348 :
349 0 : SOCKET_LOG(("EventTokenBucket::Notify() %p\n", this));
350 0 : mTimerArmed = false;
351 0 : if (mStopped)
352 0 : return NS_OK;
353 :
354 0 : UpdateCredits();
355 0 : DispatchEvents();
356 0 : UpdateTimer();
357 :
358 0 : return NS_OK;
359 : }
360 :
361 : void
362 3 : EventTokenBucket::UpdateCredits()
363 : {
364 3 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
365 :
366 3 : TimeStamp now = TimeStamp::Now();
367 3 : TimeDuration elapsed = now - mLastUpdate;
368 3 : mLastUpdate = now;
369 :
370 3 : mCredit += static_cast<uint64_t>(elapsed.ToMicroseconds());
371 3 : if (mCredit > mMaxCredit)
372 3 : mCredit = mMaxCredit;
373 3 : SOCKET_LOG(("EventTokenBucket::UpdateCredits %p to %" PRIu64 " (%" PRIu64 " each.. %3.2f)\n",
374 : this, mCredit, mUnitCost, (double)mCredit / mUnitCost));
375 3 : }
376 :
377 : #ifdef XP_WIN
378 : void
379 : EventTokenBucket::FineGrainTimers()
380 : {
381 : SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p mFineGrainTimerInUse=%d\n",
382 : this, mFineGrainTimerInUse));
383 :
384 : mLastFineGrainTimerUse = TimeStamp::Now();
385 :
386 : if (mFineGrainTimerInUse)
387 : return;
388 :
389 : if (mUnitCost > kCostFineGrainThreshold)
390 : return;
391 :
392 : SOCKET_LOG(("EventTokenBucket::FineGrainTimers %p timeBeginPeriod()\n",
393 : this));
394 :
395 : mFineGrainTimerInUse = true;
396 : timeBeginPeriod(1);
397 : }
398 :
399 : void
400 : EventTokenBucket::NormalTimers()
401 : {
402 : if (!mFineGrainTimerInUse)
403 : return;
404 : mFineGrainTimerInUse = false;
405 :
406 : SOCKET_LOG(("EventTokenBucket::NormalTimers %p timeEndPeriod()\n", this));
407 : timeEndPeriod(1);
408 : }
409 :
410 : void
411 : EventTokenBucket::WantNormalTimers()
412 : {
413 : if (!mFineGrainTimerInUse)
414 : return;
415 : if (mFineGrainResetTimerArmed)
416 : return;
417 :
418 : TimeDuration elapsed(TimeStamp::Now() - mLastFineGrainTimerUse);
419 : static const TimeDuration fiveSeconds = TimeDuration::FromSeconds(5);
420 :
421 : if (elapsed >= fiveSeconds) {
422 : NormalTimers();
423 : return;
424 : }
425 :
426 : if (!mFineGrainResetTimer)
427 : mFineGrainResetTimer = do_CreateInstance("@mozilla.org/timer;1");
428 :
429 : // if we can't delay the reset, just do it now
430 : if (!mFineGrainResetTimer) {
431 : NormalTimers();
432 : return;
433 : }
434 :
435 : // pad the callback out 100ms to avoid having to round trip this again if the
436 : // timer calls back just a tad early.
437 : SOCKET_LOG(("EventTokenBucket::WantNormalTimers %p "
438 : "Will reset timer granularity after delay", this));
439 :
440 : mFineGrainResetTimer->InitWithCallback(
441 : this,
442 : static_cast<uint32_t>((fiveSeconds - elapsed).ToMilliseconds()) + 100,
443 : nsITimer::TYPE_ONE_SHOT);
444 : mFineGrainResetTimerArmed = true;
445 : }
446 :
447 : void
448 : EventTokenBucket::FineGrainResetTimerNotify()
449 : {
450 : SOCKET_LOG(("EventTokenBucket::FineGrainResetTimerNotify() events = %d\n",
451 : this, mEvents.GetSize()));
452 : mFineGrainResetTimerArmed = false;
453 :
454 : // If we are currently processing events then wait for the queue to drain
455 : // before trying to reset back to normal timers again
456 : if (!mEvents.GetSize())
457 : WantNormalTimers();
458 : }
459 :
460 : #endif
461 :
462 : } // namespace net
463 : } // namespace mozilla
|