LCOV - code coverage report
Current view: top level - netwerk/base - ThrottleQueue.cpp (source / functions) Hit Total Coverage
Test: output.info Lines: 0 178 0.0 %
Date: 2017-07-14 16:53:18 Functions: 0 29 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
       2             : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
       3             : /* This Source Code Form is subject to the terms of the Mozilla Public
       4             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       5             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
       6             : 
       7             : #include "ThrottleQueue.h"
       8             : #include "nsISeekableStream.h"
       9             : #include "nsIAsyncInputStream.h"
      10             : #include "nsStreamUtils.h"
      11             : #include "nsNetUtil.h"
      12             : 
      13             : namespace mozilla {
      14             : namespace net {
      15             : 
      16             : //-----------------------------------------------------------------------------
      17             : 
      18             : class ThrottleInputStream final
      19             :   : public nsIAsyncInputStream
      20             :   , public nsISeekableStream
      21             : {
      22             : public:
      23             : 
      24             :   ThrottleInputStream(nsIInputStream* aStream, ThrottleQueue* aQueue);
      25             : 
      26             :   NS_DECL_THREADSAFE_ISUPPORTS
      27             :   NS_DECL_NSIINPUTSTREAM
      28             :   NS_DECL_NSISEEKABLESTREAM
      29             :   NS_DECL_NSIASYNCINPUTSTREAM
      30             : 
      31             :   void AllowInput();
      32             : 
      33             : private:
      34             : 
      35             :   ~ThrottleInputStream();
      36             : 
      37             :   nsCOMPtr<nsIInputStream> mStream;
      38             :   RefPtr<ThrottleQueue> mQueue;
      39             :   nsresult mClosedStatus;
      40             : 
      41             :   nsCOMPtr<nsIInputStreamCallback> mCallback;
      42             :   nsCOMPtr<nsIEventTarget> mEventTarget;
      43             : };
      44             : 
      45           0 : NS_IMPL_ISUPPORTS(ThrottleInputStream, nsIAsyncInputStream, nsIInputStream, nsISeekableStream)
      46             : 
      47           0 : ThrottleInputStream::ThrottleInputStream(nsIInputStream *aStream, ThrottleQueue* aQueue)
      48             :   : mStream(aStream)
      49             :   , mQueue(aQueue)
      50           0 :   , mClosedStatus(NS_OK)
      51             : {
      52           0 :   MOZ_ASSERT(aQueue != nullptr);
      53           0 : }
      54             : 
      55           0 : ThrottleInputStream::~ThrottleInputStream()
      56             : {
      57           0 :   Close();
      58           0 : }
      59             : 
      60             : NS_IMETHODIMP
      61           0 : ThrottleInputStream::Close()
      62             : {
      63           0 :   if (NS_FAILED(mClosedStatus)) {
      64           0 :     return mClosedStatus;
      65             :   }
      66             : 
      67           0 :   if (mQueue) {
      68           0 :     mQueue->DequeueStream(this);
      69           0 :     mQueue = nullptr;
      70           0 :     mClosedStatus = NS_BASE_STREAM_CLOSED;
      71             :   }
      72           0 :   return mStream->Close();
      73             : }
      74             : 
      75             : NS_IMETHODIMP
      76           0 : ThrottleInputStream::Available(uint64_t* aResult)
      77             : {
      78           0 :   if (NS_FAILED(mClosedStatus)) {
      79           0 :     return mClosedStatus;
      80             :   }
      81             : 
      82           0 :   return mStream->Available(aResult);
      83             : }
      84             : 
      85             : NS_IMETHODIMP
      86           0 : ThrottleInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
      87             : {
      88           0 :   if (NS_FAILED(mClosedStatus)) {
      89           0 :     return mClosedStatus;
      90             :   }
      91             : 
      92             :   uint32_t realCount;
      93           0 :   nsresult rv = mQueue->Available(aCount, &realCount);
      94           0 :   if (NS_FAILED(rv)) {
      95           0 :     return rv;
      96             :   }
      97             : 
      98           0 :   if (realCount == 0) {
      99           0 :     return NS_BASE_STREAM_WOULD_BLOCK;
     100             :   }
     101             : 
     102           0 :   rv = mStream->Read(aBuf, realCount, aResult);
     103           0 :   if (NS_SUCCEEDED(rv) && *aResult > 0) {
     104           0 :     mQueue->RecordRead(*aResult);
     105             :   }
     106           0 :   return rv;
     107             : }
     108             : 
     109             : NS_IMETHODIMP
     110           0 : ThrottleInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
     111             :                                   uint32_t aCount, uint32_t* aResult)
     112             : {
     113           0 :   if (NS_FAILED(mClosedStatus)) {
     114           0 :     return mClosedStatus;
     115             :   }
     116             : 
     117             :   uint32_t realCount;
     118           0 :   nsresult rv = mQueue->Available(aCount, &realCount);
     119           0 :   if (NS_FAILED(rv)) {
     120           0 :     return rv;
     121             :   }
     122             : 
     123           0 :   if (realCount == 0) {
     124           0 :     return NS_BASE_STREAM_WOULD_BLOCK;
     125             :   }
     126             : 
     127           0 :   rv = mStream->ReadSegments(aWriter, aClosure, realCount, aResult);
     128           0 :   if (NS_SUCCEEDED(rv) && *aResult > 0) {
     129           0 :     mQueue->RecordRead(*aResult);
     130             :   }
     131           0 :   return rv;
     132             : }
     133             : 
     134             : NS_IMETHODIMP
     135           0 : ThrottleInputStream::IsNonBlocking(bool* aNonBlocking)
     136             : {
     137           0 :   *aNonBlocking = true;
     138           0 :   return NS_OK;
     139             : }
     140             : 
     141             : NS_IMETHODIMP
     142           0 : ThrottleInputStream::Seek(int32_t aWhence, int64_t aOffset)
     143             : {
     144           0 :   if (NS_FAILED(mClosedStatus)) {
     145           0 :     return mClosedStatus;
     146             :   }
     147             : 
     148           0 :   nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
     149           0 :   if (!sstream) {
     150           0 :     return NS_ERROR_FAILURE;
     151             :   }
     152             : 
     153           0 :   return sstream->Seek(aWhence, aOffset);
     154             : }
     155             : 
     156             : NS_IMETHODIMP
     157           0 : ThrottleInputStream::Tell(int64_t* aResult)
     158             : {
     159           0 :   if (NS_FAILED(mClosedStatus)) {
     160           0 :     return mClosedStatus;
     161             :   }
     162             : 
     163           0 :   nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
     164           0 :   if (!sstream) {
     165           0 :     return NS_ERROR_FAILURE;
     166             :   }
     167             : 
     168           0 :   return sstream->Tell(aResult);
     169             : }
     170             : 
     171             : NS_IMETHODIMP
     172           0 : ThrottleInputStream::SetEOF()
     173             : {
     174           0 :   if (NS_FAILED(mClosedStatus)) {
     175           0 :     return mClosedStatus;
     176             :   }
     177             : 
     178           0 :   nsCOMPtr<nsISeekableStream> sstream = do_QueryInterface(mStream);
     179           0 :   if (!sstream) {
     180           0 :     return NS_ERROR_FAILURE;
     181             :   }
     182             : 
     183           0 :   return sstream->SetEOF();
     184             : }
     185             : 
     186             : NS_IMETHODIMP
     187           0 : ThrottleInputStream::CloseWithStatus(nsresult aStatus)
     188             : {
     189           0 :   if (NS_FAILED(mClosedStatus)) {
     190             :     // Already closed, ignore.
     191           0 :     return NS_OK;
     192             :   }
     193           0 :   if (NS_SUCCEEDED(aStatus)) {
     194           0 :     aStatus = NS_BASE_STREAM_CLOSED;
     195             :   }
     196             : 
     197           0 :   mClosedStatus = Close();
     198           0 :   if (NS_SUCCEEDED(mClosedStatus)) {
     199           0 :     mClosedStatus = aStatus;
     200             :   }
     201           0 :   return NS_OK;
     202             : }
     203             : 
     204             : NS_IMETHODIMP
     205           0 : ThrottleInputStream::AsyncWait(nsIInputStreamCallback *aCallback,
     206             :                                uint32_t aFlags,
     207             :                                uint32_t aRequestedCount,
     208             :                                nsIEventTarget *aEventTarget)
     209             : {
     210           0 :   if (aFlags != 0) {
     211           0 :     return NS_ERROR_ILLEGAL_VALUE;
     212             :   }
     213             : 
     214           0 :   mCallback = aCallback;
     215           0 :   mEventTarget = aEventTarget;
     216           0 :   if (mCallback) {
     217           0 :     mQueue->QueueStream(this);
     218             :   } else {
     219           0 :     mQueue->DequeueStream(this);
     220             :   }
     221           0 :   return NS_OK;
     222             : }
     223             : 
     224             : void
     225           0 : ThrottleInputStream::AllowInput()
     226             : {
     227           0 :   MOZ_ASSERT(mCallback);
     228             :   nsCOMPtr<nsIInputStreamCallback> callbackEvent =
     229           0 :     NS_NewInputStreamReadyEvent("ThrottleInputStream::AllowInput",
     230           0 :                                 mCallback, mEventTarget);
     231           0 :   mCallback = nullptr;
     232           0 :   mEventTarget = nullptr;
     233           0 :   callbackEvent->OnInputStreamReady(this);
     234           0 : }
     235             : 
     236             : //-----------------------------------------------------------------------------
     237             : 
     238           0 : NS_IMPL_ISUPPORTS(ThrottleQueue, nsIInputChannelThrottleQueue, nsITimerCallback)
     239             : 
     240           0 : ThrottleQueue::ThrottleQueue()
     241             :   : mMeanBytesPerSecond(0)
     242             :   , mMaxBytesPerSecond(0)
     243             :   , mBytesProcessed(0)
     244           0 :   , mTimerArmed(false)
     245             : {
     246             :   nsresult rv;
     247           0 :   nsCOMPtr<nsIEventTarget> sts;
     248           0 :   nsCOMPtr<nsIIOService> ioService = do_GetIOService(&rv);
     249           0 :   if (NS_SUCCEEDED(rv))
     250           0 :     sts = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
     251           0 :   if (NS_SUCCEEDED(rv))
     252           0 :     mTimer = do_CreateInstance("@mozilla.org/timer;1");
     253           0 :   if (mTimer)
     254           0 :     mTimer->SetTarget(sts);
     255           0 : }
     256             : 
     257           0 : ThrottleQueue::~ThrottleQueue()
     258             : {
     259           0 :   if (mTimer && mTimerArmed) {
     260           0 :     mTimer->Cancel();
     261             :   }
     262           0 :   mTimer = nullptr;
     263           0 : }
     264             : 
     265             : NS_IMETHODIMP
     266           0 : ThrottleQueue::RecordRead(uint32_t aBytesRead)
     267             : {
     268           0 :   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
     269           0 :   ThrottleEntry entry;
     270           0 :   entry.mTime = TimeStamp::Now();
     271           0 :   entry.mBytesRead = aBytesRead;
     272           0 :   mReadEvents.AppendElement(entry);
     273           0 :   mBytesProcessed += aBytesRead;
     274           0 :   return NS_OK;
     275             : }
     276             : 
     277             : NS_IMETHODIMP
     278           0 : ThrottleQueue::Available(uint32_t aRemaining, uint32_t* aAvailable)
     279             : {
     280           0 :   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
     281           0 :   TimeStamp now = TimeStamp::Now();
     282           0 :   TimeStamp oneSecondAgo = now - TimeDuration::FromSeconds(1);
     283             :   size_t i;
     284             : 
     285             :   // Remove all stale events.
     286           0 :   for (i = 0; i < mReadEvents.Length(); ++i) {
     287           0 :     if (mReadEvents[i].mTime >= oneSecondAgo) {
     288           0 :       break;
     289             :     }
     290             :   }
     291           0 :   mReadEvents.RemoveElementsAt(0, i);
     292             : 
     293           0 :   uint32_t totalBytes = 0;
     294           0 :   for (i = 0; i < mReadEvents.Length(); ++i) {
     295           0 :     totalBytes += mReadEvents[i].mBytesRead;
     296             :   }
     297             : 
     298           0 :   uint32_t spread = mMaxBytesPerSecond - mMeanBytesPerSecond;
     299           0 :   double prob = static_cast<double>(rand()) / RAND_MAX;
     300           0 :   uint32_t thisSliceBytes = mMeanBytesPerSecond - spread +
     301           0 :     static_cast<uint32_t>(2 * spread * prob);
     302             : 
     303           0 :   if (totalBytes >= thisSliceBytes) {
     304           0 :     *aAvailable = 0;
     305             :   } else {
     306           0 :     *aAvailable = thisSliceBytes;
     307             :   }
     308           0 :   return NS_OK;
     309             : }
     310             : 
     311             : NS_IMETHODIMP
     312           0 : ThrottleQueue::Init(uint32_t aMeanBytesPerSecond, uint32_t aMaxBytesPerSecond)
     313             : {
     314             :   // Can be called on any thread.
     315           0 :   if (aMeanBytesPerSecond == 0 || aMaxBytesPerSecond == 0 || aMaxBytesPerSecond < aMeanBytesPerSecond) {
     316           0 :     return NS_ERROR_ILLEGAL_VALUE;
     317             :   }
     318             : 
     319           0 :   mMeanBytesPerSecond = aMeanBytesPerSecond;
     320           0 :   mMaxBytesPerSecond = aMaxBytesPerSecond;
     321           0 :   return NS_OK;
     322             : }
     323             : 
     324             : NS_IMETHODIMP
     325           0 : ThrottleQueue::BytesProcessed(uint64_t* aResult)
     326             : {
     327           0 :   *aResult = mBytesProcessed;
     328           0 :   return NS_OK;
     329             : }
     330             : 
     331             : NS_IMETHODIMP
     332           0 : ThrottleQueue::WrapStream(nsIInputStream* aInputStream, nsIAsyncInputStream** aResult)
     333             : {
     334           0 :   nsCOMPtr<nsIAsyncInputStream> result = new ThrottleInputStream(aInputStream, this);
     335           0 :   result.forget(aResult);
     336           0 :   return NS_OK;
     337             : }
     338             : 
     339             : NS_IMETHODIMP
     340           0 : ThrottleQueue::Notify(nsITimer* aTimer)
     341             : {
     342           0 :   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
     343             :   // A notified reader may need to push itself back on the queue.
     344             :   // Swap out the list of readers so that this works properly.
     345           0 :   nsTArray<RefPtr<ThrottleInputStream>> events;
     346           0 :   events.SwapElements(mAsyncEvents);
     347             : 
     348             :   // Optimistically notify all the waiting readers, and then let them
     349             :   // requeue if there isn't enough bandwidth.
     350           0 :   for (size_t i = 0; i < events.Length(); ++i) {
     351           0 :     events[i]->AllowInput();
     352             :   }
     353             : 
     354           0 :   mTimerArmed = false;
     355           0 :   return NS_OK;
     356             : }
     357             : 
     358             : void
     359           0 : ThrottleQueue::QueueStream(ThrottleInputStream* aStream)
     360             : {
     361           0 :   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
     362           0 :   if (mAsyncEvents.IndexOf(aStream) == mAsyncEvents.NoIndex) {
     363           0 :     mAsyncEvents.AppendElement(aStream);
     364             : 
     365           0 :     if (!mTimerArmed) {
     366           0 :       uint32_t ms = 1000;
     367           0 :       if (mReadEvents.Length() > 0) {
     368           0 :         TimeStamp t = mReadEvents[0].mTime + TimeDuration::FromSeconds(1);
     369           0 :         TimeStamp now = TimeStamp::Now();
     370             : 
     371           0 :         if (t > now) {
     372           0 :           ms = static_cast<uint32_t>((t - now).ToMilliseconds());
     373             :         } else {
     374           0 :           ms = 1;
     375             :         }
     376             :       }
     377             : 
     378           0 :       if (NS_SUCCEEDED(mTimer->InitWithCallback(this, ms, nsITimer::TYPE_ONE_SHOT))) {
     379           0 :         mTimerArmed = true;
     380             :       }
     381             :     }
     382             :   }
     383           0 : }
     384             : 
     385             : void
     386           0 : ThrottleQueue::DequeueStream(ThrottleInputStream* aStream)
     387             : {
     388           0 :   MOZ_ASSERT(OnSocketThread(), "not on socket thread");
     389           0 :   mAsyncEvents.RemoveElement(aStream);
     390           0 : }
     391             : 
     392             : }
     393             : }

Generated by: LCOV version 1.13