LCOV - code coverage report
Current view: top level - ipc/glue - IPCStreamSource.cpp (source / functions) Hit Total Coverage
Test: output.info Lines: 1 136 0.7 %
Date: 2017-07-14 16:53:18 Functions: 2 25 8.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
       2             : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
       3             : /* This Source Code Form is subject to the terms of the Mozilla Public
       4             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       5             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
       6             : 
       7             : #include "IPCStreamSource.h"
       8             : #include "nsIAsyncInputStream.h"
       9             : #include "nsICancelableRunnable.h"
      10             : #include "nsIRunnable.h"
      11             : #include "nsISerialEventTarget.h"
      12             : #include "nsStreamUtils.h"
      13             : #include "nsThreadUtils.h"
      14             : 
      15             : using mozilla::dom::workers::Canceling;
      16             : using mozilla::dom::workers::GetCurrentThreadWorkerPrivate;
      17             : using mozilla::dom::workers::Status;
      18             : using mozilla::dom::workers::WorkerPrivate;
      19             : 
      20             : namespace mozilla {
      21             : namespace ipc {
      22             : 
      23             : class IPCStreamSource::Callback final : public nsIInputStreamCallback
      24             :                                       , public nsIRunnable
      25             :                                       , public nsICancelableRunnable
      26             : {
      27             : public:
      28           0 :   explicit Callback(IPCStreamSource* aSource)
      29           0 :     : mSource(aSource)
      30           0 :     , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
      31             :   {
      32           0 :     MOZ_ASSERT(mSource);
      33           0 :   }
      34             : 
      35             :   NS_IMETHOD
      36           0 :   OnInputStreamReady(nsIAsyncInputStream* aStream) override
      37             :   {
      38             :     // any thread
      39           0 :     if (mOwningEventTarget->IsOnCurrentThread()) {
      40           0 :       return Run();
      41             :     }
      42             : 
      43             :     // If this fails, then it means the owning thread is a Worker that has
      44             :     // been shutdown.  Its ok to lose the event in this case because the
      45             :     // IPCStreamChild listens for this event through the WorkerHolder.
      46           0 :     nsresult rv = mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
      47           0 :     if (NS_FAILED(rv)) {
      48           0 :       NS_WARNING("Failed to dispatch stream readable event to owning thread");
      49             :     }
      50             : 
      51           0 :     return NS_OK;
      52             :   }
      53             : 
      54             :   NS_IMETHOD
      55           0 :   Run() override
      56             :   {
      57           0 :     MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
      58           0 :     if (mSource) {
      59           0 :       mSource->OnStreamReady(this);
      60             :     }
      61           0 :     return NS_OK;
      62             :   }
      63             : 
      64             :   nsresult
      65           0 :   Cancel() override
      66             :   {
      67             :     // Cancel() gets called when the Worker thread is being shutdown.  We have
      68             :     // nothing to do here because IPCStreamChild handles this case via
      69             :     // the WorkerHolder.
      70           0 :     return NS_OK;
      71             :   }
      72             : 
      73             :   void
      74           0 :   ClearSource()
      75             :   {
      76           0 :     MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
      77           0 :     MOZ_ASSERT(mSource);
      78           0 :     mSource = nullptr;
      79           0 :   }
      80             : 
      81             : private:
      82           0 :   ~Callback()
      83           0 :   {
      84             :     // called on any thread
      85             : 
      86             :     // ClearSource() should be called before the Callback is destroyed
      87           0 :     MOZ_ASSERT(!mSource);
      88           0 :   }
      89             : 
      90             :   // This is a raw pointer because the source keeps alive the callback and,
      91             :   // before beeing destroyed, it nullifies this pointer (this happens when
      92             :   // ActorDestroyed() is called).
      93             :   IPCStreamSource* mSource;
      94             : 
      95             :   nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
      96             : 
      97             :   NS_DECL_THREADSAFE_ISUPPORTS
      98             : };
      99             : 
     100           0 : NS_IMPL_ISUPPORTS(IPCStreamSource::Callback, nsIInputStreamCallback,
     101             :                                              nsIRunnable,
     102             :                                              nsICancelableRunnable);
     103             : 
     104           0 : IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
     105             :   : mStream(aInputStream)
     106             :   , mWorkerPrivate(nullptr)
     107           0 :   , mState(ePending)
     108             : {
     109           0 :   MOZ_ASSERT(aInputStream);
     110           0 : }
     111             : 
     112           0 : IPCStreamSource::~IPCStreamSource()
     113             : {
     114           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     115           0 :   MOZ_ASSERT(mState == eClosed);
     116           0 :   MOZ_ASSERT(!mCallback);
     117           0 :   MOZ_ASSERT(!mWorkerPrivate);
     118           0 : }
     119             : 
     120             : bool
     121           0 : IPCStreamSource::Initialize()
     122             : {
     123           0 :   bool nonBlocking = false;
     124           0 :   MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
     125             :   // IPCStreamChild reads in the current thread, so it is only supported on
     126             :   // non-blocking, async channels
     127           0 :   if (!nonBlocking) {
     128           0 :     return false;
     129             :   }
     130             : 
     131             :   // A source can be used on any thread, but we only support IPCStream on
     132             :   // main thread, Workers and PBackground thread right now.  This is due
     133             :   // to the requirement  that the thread be guaranteed to live long enough to
     134             :   // receive messages. We can enforce this guarantee with a WorkerHolder on
     135             :   // worker threads, but not other threads. Main-thread and PBackground thread
     136             :   // do not need anything special in order to be kept alive.
     137           0 :   WorkerPrivate* workerPrivate = nullptr;
     138           0 :   if (!NS_IsMainThread()) {
     139           0 :     workerPrivate = GetCurrentThreadWorkerPrivate();
     140           0 :     if (workerPrivate) {
     141           0 :       bool result = HoldWorker(workerPrivate, Canceling);
     142           0 :       if (!result) {
     143           0 :         return false;
     144             :       }
     145             : 
     146           0 :       mWorkerPrivate = workerPrivate;
     147             :     } else {
     148           0 :       AssertIsOnBackgroundThread();
     149             :     }
     150             :   }
     151             : 
     152           0 :   return true;
     153             : }
     154             : 
     155             : void
     156           0 : IPCStreamSource::ActorConstructed()
     157             : {
     158           0 :   MOZ_ASSERT(mState == ePending);
     159           0 :   mState = eActorConstructed;
     160           0 : }
     161             : 
     162             : bool
     163           0 : IPCStreamSource::Notify(Status aStatus)
     164             : {
     165           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     166             : 
     167             :   // Keep the worker thread alive until the stream is finished.
     168           0 :   return true;
     169             : }
     170             : 
     171             : void
     172           0 : IPCStreamSource::ActorDestroyed()
     173             : {
     174           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     175             : 
     176           0 :   mState = eClosed;
     177             : 
     178           0 :   if (mCallback) {
     179           0 :     mCallback->ClearSource();
     180           0 :     mCallback = nullptr;
     181             :   }
     182             : 
     183           0 :   if (mWorkerPrivate) {
     184           0 :     ReleaseWorker();
     185           0 :     mWorkerPrivate = nullptr;
     186             :   }
     187           0 : }
     188             : 
     189             : void
     190           0 : IPCStreamSource::Start()
     191             : {
     192           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     193           0 :   DoRead();
     194           0 : }
     195             : 
     196             : void
     197           0 : IPCStreamSource::StartDestroy()
     198             : {
     199           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     200           0 :   OnEnd(NS_ERROR_ABORT);
     201           0 : }
     202             : 
     203             : void
     204           0 : IPCStreamSource::DoRead()
     205             : {
     206           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     207           0 :   MOZ_ASSERT(mState == eActorConstructed);
     208           0 :   MOZ_ASSERT(!mCallback);
     209             : 
     210             :   // The input stream (likely a pipe) probably uses a segment size of
     211             :   // 4kb.  If there is data already buffered it would be nice to aggregate
     212             :   // multiple segments into a single IPC call.  Conversely, don't send too
     213             :   // too large of a buffer in a single call to avoid spiking memory.
     214             :   static const uint64_t kMaxBytesPerMessage = 32 * 1024;
     215             :   static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
     216             :                 "kMaxBytesPerMessage must cleanly cast to uint32_t");
     217             : 
     218             :   while (true) {
     219             :     // It should not be possible to transition to closed state without
     220             :     // this loop terminating via a return.
     221           0 :     MOZ_ASSERT(mState == eActorConstructed);
     222             : 
     223             :     // Use non-auto here as we're unlikely to hit stack storage with the
     224             :     // sizes we are sending.  Also, it would be nice to avoid another copy
     225             :     // to the IPC layer which we avoid if we use COW strings.  Unfortunately
     226             :     // IPC does not seem to support passing dependent storage types.
     227           0 :     nsCString buffer;
     228             : 
     229           0 :     uint64_t available = 0;
     230           0 :     nsresult rv = mStream->Available(&available);
     231           0 :     if (NS_FAILED(rv)) {
     232           0 :       OnEnd(rv);
     233           0 :       return;
     234             :     }
     235             : 
     236           0 :     if (available == 0) {
     237           0 :       Wait();
     238           0 :       return;
     239             :     }
     240             : 
     241             :     uint32_t expectedBytes =
     242           0 :       static_cast<uint32_t>(std::min(available, kMaxBytesPerMessage));
     243             : 
     244           0 :     buffer.SetLength(expectedBytes);
     245             : 
     246           0 :     uint32_t bytesRead = 0;
     247           0 :     rv = mStream->Read(buffer.BeginWriting(), buffer.Length(), &bytesRead);
     248           0 :     MOZ_ASSERT_IF(NS_FAILED(rv), bytesRead == 0);
     249           0 :     buffer.SetLength(bytesRead);
     250             : 
     251             :     // If we read any data from the stream, send it across.
     252           0 :     if (!buffer.IsEmpty()) {
     253           0 :       SendData(buffer);
     254             :     }
     255             : 
     256           0 :     if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
     257           0 :       Wait();
     258           0 :       return;
     259             :     }
     260             : 
     261             :     // Any other error or zero-byte read indicates end-of-stream
     262           0 :     if (NS_FAILED(rv) || buffer.IsEmpty()) {
     263           0 :       OnEnd(rv);
     264           0 :       return;
     265             :     }
     266           0 :   }
     267             : }
     268             : 
     269             : void
     270           0 : IPCStreamSource::Wait()
     271             : {
     272           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     273           0 :   MOZ_ASSERT(mState == eActorConstructed);
     274           0 :   MOZ_ASSERT(!mCallback);
     275             : 
     276             :   // Set mCallback immediately instead of waiting for success.  Its possible
     277             :   // AsyncWait() will callback synchronously.
     278           0 :   mCallback = new Callback(this);
     279           0 :   nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
     280           0 :   if (NS_FAILED(rv)) {
     281           0 :     OnEnd(rv);
     282           0 :     return;
     283             :   }
     284             : }
     285             : 
     286             : void
     287           0 : IPCStreamSource::OnStreamReady(Callback* aCallback)
     288             : {
     289           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     290           0 :   MOZ_ASSERT(mCallback);
     291           0 :   MOZ_ASSERT(aCallback == mCallback);
     292           0 :   mCallback->ClearSource();
     293           0 :   mCallback = nullptr;
     294           0 :   DoRead();
     295           0 : }
     296             : 
     297             : void
     298           0 : IPCStreamSource::OnEnd(nsresult aRv)
     299             : {
     300           0 :   NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
     301           0 :   MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
     302             : 
     303           0 :   if (mState == eClosed) {
     304           0 :     return;
     305             :   }
     306             : 
     307           0 :   mState = eClosed;
     308             : 
     309           0 :   mStream->CloseWithStatus(aRv);
     310             : 
     311           0 :   if (aRv == NS_BASE_STREAM_CLOSED) {
     312           0 :     aRv = NS_OK;
     313             :   }
     314             : 
     315             :   // This will trigger an ActorDestroy() from the other side
     316           0 :   Close(aRv);
     317             : }
     318             : 
     319             : } // namespace ipc
     320           9 : } // namespace mozilla

Generated by: LCOV version 1.13