Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "ThrottleQueue.h"
8 : #include "nsISeekableStream.h"
9 : #include "nsIAsyncInputStream.h"
10 : #include "nsStreamUtils.h"
11 : #include "nsNetUtil.h"
12 :
13 : namespace mozilla {
14 : namespace net {
15 :
16 : //-----------------------------------------------------------------------------
17 :
18 : class ThrottleInputStream final
19 : : public nsIAsyncInputStream
20 : , public nsISeekableStream
21 : {
22 : public:
23 :
24 : ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
25 :
26 : NS_DECL_THREADSAFE_ISUPPORTS
27 : NS_DECL_NSIINPUTSTREAM
28 : NS_DECL_NSISEEKABLESTREAM
29 : NS_DECL_NSIASYNCINPUTSTREAM
30 :
31 : void AllowInput();
32 :
33 : private:
34 :
35 : ~ThrottleInputStream();
36 :
37 : nsCOMPtr<nsIInputStream> mStream;
38 : RefPtr<ThrottleQueue> mQueue;
39 : nsresult mClosedStatus;
40 :
41 : nsCOMPtr<nsIInputStreamCallback> mCallback;
42 : nsCOMPtr<nsIEventTarget> mEventTarget;
43 : };
44 :
45 0 : NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
46 :
47 0 : ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
48 : : mStream(aStream)
49 : , mQueue(aQueue)
50 0 : , mClosedStatus(NS_OK)
51 : {
52 0 : MOZ_ASSERT(aQueue != nullptr);
53 0 : }
54 :
55 0 : ThrottleInputStream::~ThrottleInputStream()
56 : {
57 0 : Close();
58 0 : }
59 :
60 : NS_IMETHODIMP
61 0 : ThrottleInputStream::Close()
62 : {
63 0 : if (NS_FAILED(mClosedStatus)) {
64 0 : return mClosedStatus;
65 : }
66 :
67 0 : if (mQueue) {
68 0 : mQueue->DequeueStream(this);
69 0 : mQueue = nullptr;
70 0 : mClosedStatus = NS_BASE_STREAM_CLOSED;
71 : }
72 0 : return mStream->Close();
73 : }
74 :
75 : NS_IMETHODIMP
76 0 : ThrottleInputStream::Available(uint64_t* aResult)
77 : {
78 0 : if (NS_FAILED(mClosedStatus)) {
79 0 : return mClosedStatus;
80 : }
81 :
82 0 : return mStream->Available(aResult);
83 : }
84 :
85 : NS_IMETHODIMP
86 0 : ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
87 : {
88 0 : if (NS_FAILED(mClosedStatus)) {
89 0 : return mClosedStatus;
90 : }
91 :
92 : uint32_t realCount;
93 0 : nsresult rv = mQueue->Available(aCount, &realCount);
94 0 : if (NS_FAILED(rv)) {
95 0 : return rv;
96 : }
97 :
98 0 : if (realCount == 0) {
99 0 : return NS_BASE_STREAM_WOULD_BLOCK;
100 : }
101 :
102 0 : rv = mStream->Read(aBuf, realCount, aResult);
103 0 : if (NS_SUCCEEDED(rv) && *aResult > 0) {
104 0 : mQueue->RecordRead(*aResult);
105 : }
106 0 : return rv;
107 : }
108 :
109 : NS_IMETHODIMP
110 0 : ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
111 : uint32_t aCount, uint32_t* aResult)
112 : {
113 0 : if (NS_FAILED(mClosedStatus)) {
114 0 : return mClosedStatus;
115 : }
116 :
117 : uint32_t realCount;
118 0 : nsresult rv = mQueue->Available(aCount, &realCount);
119 0 : if (NS_FAILED(rv)) {
120 0 : return rv;
121 : }
122 :
123 0 : if (realCount == 0) {
124 0 : return NS_BASE_STREAM_WOULD_BLOCK;
125 : }
126 :
127 0 : rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
128 0 : if (NS_SUCCEEDED(rv) && *aResult > 0) {
129 0 : mQueue->RecordRead(*aResult);
130 : }
131 0 : return rv;
132 : }
133 :
134 : NS_IMETHODIMP
135 0 : ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
136 : {
137 0 : *aNonBlocking = true;
138 0 : return NS_OK;
139 : }
140 :
141 : NS_IMETHODIMP
142 0 : ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
143 : {
144 0 : if (NS_FAILED(mClosedStatus)) {
145 0 : return mClosedStatus;
146 : }
147 :
148 0 : nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
149 0 : if (!sstream) {
150 0 : return NS_ERROR_FAILURE;
151 : }
152 :
153 0 : return sstream->Seek(aWhence, aOffset);
154 : }
155 :
156 : NS_IMETHODIMP
157 0 : ThrottleInputStream::Tell(int64_t* aResult)
158 : {
159 0 : if (NS_FAILED(mClosedStatus)) {
160 0 : return mClosedStatus;
161 : }
162 :
163 0 : nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
164 0 : if (!sstream) {
165 0 : return NS_ERROR_FAILURE;
166 : }
167 :
168 0 : return sstream->Tell(aResult);
169 : }
170 :
171 : NS_IMETHODIMP
172 0 : ThrottleInputStream::SetEOF()
173 : {
174 0 : if (NS_FAILED(mClosedStatus)) {
175 0 : return mClosedStatus;
176 : }
177 :
178 0 : nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
179 0 : if (!sstream) {
180 0 : return NS_ERROR_FAILURE;
181 : }
182 :
183 0 : return sstream->SetEOF();
184 : }
185 :
186 : NS_IMETHODIMP
187 0 : ThrottleInputStream::CloseWithStatus(nsresult aStatus)
188 : {
189 0 : if (NS_FAILED(mClosedStatus)) {
190 : // Already closed, ignore.
191 0 : return NS_OK;
192 : }
193 0 : if (NS_SUCCEEDED(aStatus)) {
194 0 : aStatus = NS_BASE_STREAM_CLOSED;
195 : }
196 :
197 0 : mClosedStatus = Close();
198 0 : if (NS_SUCCEEDED(mClosedStatus)) {
199 0 : mClosedStatus = aStatus;
200 : }
201 0 : return NS_OK;
202 : }
203 :
204 : NS_IMETHODIMP
205 0 : ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
206 : uint32_t aFlags,
207 : uint32_t aRequestedCount,
208 : nsIEventTarget *aEventTarget)
209 : {
210 0 : if (aFlags != 0) {
211 0 : return NS_ERROR_ILLEGAL_VALUE;
212 : }
213 :
214 0 : mCallback = aCallback;
215 0 : mEventTarget = aEventTarget;
216 0 : if (mCallback) {
217 0 : mQueue->QueueStream(this);
218 : } else {
219 0 : mQueue->DequeueStream(this);
220 : }
221 0 : return NS_OK;
222 : }
223 :
224 : void
225 0 : ThrottleInputStream::AllowInput()
226 : {
227 0 : MOZ_ASSERT(mCallback);
228 : nsCOMPtr<nsIInputStreamCallback> callbackEvent =
229 0 : NS_NewInputStreamReadyEvent("ThrottleInputStream::AllowInput",
230 0 : mCallback, mEventTarget);
231 0 : mCallback = nullptr;
232 0 : mEventTarget = nullptr;
233 0 : callbackEvent->OnInputStreamReady(this);
234 0 : }
235 :
236 : //-----------------------------------------------------------------------------
237 :
238 0 : NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
239 :
240 0 : ThrottleQueue::ThrottleQueue()
241 : : mMeanBytesPerSecond(0)
242 : , mMaxBytesPerSecond(0)
243 : , mBytesProcessed(0)
244 0 : , mTimerArmed(false)
245 : {
246 : nsresult rv;
247 0 : nsCOMPtr<nsIEventTarget> sts;
248 0 : nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
249 0 : if (NS_SUCCEEDED(rv))
250 0 : sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
251 0 : if (NS_SUCCEEDED(rv))
252 0 : mTimer = do_CreateInstance("@mozilla.org/timer;1");
253 0 : if (mTimer)
254 0 : mTimer->SetTarget(sts);
255 0 : }
256 :
257 0 : ThrottleQueue::~ThrottleQueue()
258 : {
259 0 : if (mTimer && mTimerArmed) {
260 0 : mTimer->Cancel();
261 : }
262 0 : mTimer = nullptr;
263 0 : }
264 :
265 : NS_IMETHODIMP
266 0 : ThrottleQueue::RecordRead(uint32_t aBytesRead)
267 : {
268 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
269 0 : ThrottleEntry entry;
270 0 : entry.mTime = TimeStamp::Now();
271 0 : entry.mBytesRead = aBytesRead;
272 0 : mReadEvents.AppendElement(entry);
273 0 : mBytesProcessed += aBytesRead;
274 0 : return NS_OK;
275 : }
276 :
277 : NS_IMETHODIMP
278 0 : ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
279 : {
280 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
281 0 : TimeStamp now = TimeStamp::Now();
282 0 : TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
283 : size_t i;
284 :
285 : // Remove all stale events.
286 0 : for (i = 0; i < mReadEvents.Length(); ++i) {
287 0 : if (mReadEvents[i].mTime >= oneSecondAgo) {
288 0 : break;
289 : }
290 : }
291 0 : mReadEvents.RemoveElementsAt(0, i);
292 :
293 0 : uint32_t totalBytes = 0;
294 0 : for (i = 0; i < mReadEvents.Length(); ++i) {
295 0 : totalBytes += mReadEvents[i].mBytesRead;
296 : }
297 :
298 0 : uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
299 0 : double prob = static_cast<double>(rand()) / RAND_MAX;
300 0 : uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
301 0 : static_cast<uint32_t>(2 * spread * prob);
302 :
303 0 : if (totalBytes >= thisSliceBytes) {
304 0 : *aAvailable = 0;
305 : } else {
306 0 : *aAvailable = thisSliceBytes;
307 : }
308 0 : return NS_OK;
309 : }
310 :
311 : NS_IMETHODIMP
312 0 : ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
313 : {
314 : // Can be called on any thread.
315 0 : if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
316 0 : return NS_ERROR_ILLEGAL_VALUE;
317 : }
318 :
319 0 : mMeanBytesPerSecond = aMeanBytesPerSecond;
320 0 : mMaxBytesPerSecond = aMaxBytesPerSecond;
321 0 : return NS_OK;
322 : }
323 :
324 : NS_IMETHODIMP
325 0 : ThrottleQueue::BytesProcessed(uint64_t* aResult)
326 : {
327 0 : *aResult = mBytesProcessed;
328 0 : return NS_OK;
329 : }
330 :
331 : NS_IMETHODIMP
332 0 : ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
333 : {
334 0 : nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
335 0 : result.forget(aResult);
336 0 : return NS_OK;
337 : }
338 :
339 : NS_IMETHODIMP
340 0 : ThrottleQueue::Notify(nsITimer* aTimer)
341 : {
342 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
343 : // A notified reader may need to push itself back on the queue.
344 : // Swap out the list of readers so that this works properly.
345 0 : nsTArray<RefPtr<ThrottleInputStream>> events;
346 0 : events.SwapElements(mAsyncEvents);
347 :
348 : // Optimistically notify all the waiting readers, and then let them
349 : // requeue if there isn't enough bandwidth.
350 0 : for (size_t i = 0; i < events.Length(); ++i) {
351 0 : events[i]->AllowInput();
352 : }
353 :
354 0 : mTimerArmed = false;
355 0 : return NS_OK;
356 : }
357 :
358 : void
359 0 : ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
360 : {
361 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
362 0 : if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
363 0 : mAsyncEvents.AppendElement(aStream);
364 :
365 0 : if (!mTimerArmed) {
366 0 : uint32_t ms = 1000;
367 0 : if (mReadEvents.Length() > 0) {
368 0 : TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
369 0 : TimeStamp now = TimeStamp::Now();
370 :
371 0 : if (t > now) {
372 0 : ms = static_cast<uint32_t>((t - now).ToMilliseconds());
373 : } else {
374 0 : ms = 1;
375 : }
376 : }
377 :
378 0 : if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
379 0 : mTimerArmed = true;
380 : }
381 : }
382 : }
383 0 : }
384 :
385 : void
386 0 : ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
387 : {
388 0 : MOZ_ASSERT(OnSocketThread(), "not on socket thread");
389 0 : mAsyncEvents.RemoveElement(aStream);
390 0 : }
391 :
392 : }
393 : }
|