LCOV - code coverage report
Current view: top level - xpcom/io - nsMultiplexInputStream.cpp (source / functions) Hit Total Coverage
Test: output.info Lines: 67 504 13.3 %
Date: 2017-07-14 16:53:18 Functions: 12 49 24.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
       2             : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
       3             : /* This Source Code Form is subject to the terms of the Mozilla Public
       4             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       5             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
       6             : 
       7             : /**
       8             :  * The multiplex stream concatenates a list of input streams into a single
       9             :  * stream.
      10             :  */
      11             : 
      12             : #include "mozilla/Attributes.h"
      13             : #include "mozilla/MathAlgorithms.h"
      14             : #include "mozilla/Mutex.h"
      15             : #include "mozilla/SystemGroup.h"
      16             : 
      17             : #include "base/basictypes.h"
      18             : 
      19             : #include "nsMultiplexInputStream.h"
      20             : #include "nsICloneableInputStream.h"
      21             : #include "nsIMultiplexInputStream.h"
      22             : #include "nsISeekableStream.h"
      23             : #include "nsCOMPtr.h"
      24             : #include "nsCOMArray.h"
      25             : #include "nsIClassInfoImpl.h"
      26             : #include "nsIIPCSerializableInputStream.h"
      27             : #include "mozilla/ipc/InputStreamUtils.h"
      28             : #include "nsIAsyncInputStream.h"
      29             : 
      30             : using namespace mozilla;
      31             : using namespace mozilla::ipc;
      32             : 
      33             : using mozilla::DeprecatedAbs;
      34             : using mozilla::Maybe;
      35             : using mozilla::Nothing;
      36             : using mozilla::Some;
      37             : 
      38             : class nsMultiplexInputStream final // Presents the streams in mStreams, in order, as one input stream.
      39             :   : public nsIMultiplexInputStream
      40             :   , public nsISeekableStream // answered by QueryInterface only when IsSeekable()
      41             :   , public nsIIPCSerializableInputStream // answered only when IsIPCSerializable()
      42             :   , public nsICloneableInputStream // answered only when IsCloneable()
      43             :   , public nsIAsyncInputStream // answered only when IsAsyncInputStream()
      44             : {
      45             : public:
      46             :   nsMultiplexInputStream();
      47             : 
      48             :   NS_DECL_THREADSAFE_ISUPPORTS
      49             :   NS_DECL_NSIINPUTSTREAM
      50             :   NS_DECL_NSIMULTIPLEXINPUTSTREAM
      51             :   NS_DECL_NSISEEKABLESTREAM
      52             :   NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
      53             :   NS_DECL_NSICLONEABLEINPUTSTREAM
      54             :   NS_DECL_NSIASYNCINPUTSTREAM
      55             : 
      56             :   void AsyncWaitCompleted();
      57             : 
      58             : private:
      59           1 :   ~nsMultiplexInputStream() // private: the object is destroyed through Release()
      60           1 :   {
      61           1 :   }
      62             : 
      63           0 :   struct MOZ_STACK_CLASS ReadSegmentsState // per-call context ReadSegments() hands to ReadSegCb as aClosure
      64             :   {
      65             :     nsCOMPtr<nsIInputStream> mThisStream; // stream passed to the caller's writer (this object)
      66             :     uint32_t mOffset; // bytes already consumed from earlier sub-streams in this call
      67             :     nsWriteSegmentFun mWriter; // the caller's writer callback
      68             :     void* mClosure; // the caller's closure, forwarded untouched to mWriter
      69             :     bool mDone; // set by ReadSegCb once mWriter returns a failure code
      70             :   };
      71             : 
      72             :   static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
      73             :                             const char* aFromRawSegment, uint32_t aToOffset,
      74             :                             uint32_t aCount, uint32_t* aWriteCount);
      75             : 
      76             :   bool IsSeekable() const; // these four predicates gate the conditional QueryInterface entries
      77             :   bool IsIPCSerializable() const;
      78             :   bool IsCloneable() const;
      79             :   bool IsAsyncInputStream() const;
      80             : 
      81             :   Mutex mLock; // Protects access to all data members.
      82             :   nsTArray<nsCOMPtr<nsIInputStream>> mStreams; // sub-streams, in the order they are read
      83             :   uint32_t mCurrentStream; // index into mStreams of the next stream to read; Read() advances it to Length() at the end
      84             :   bool mStartedReadingCurrent; // true once mStreams[mCurrentStream] has been read from or seeked into
      85             :   nsresult mStatus; // NS_OK, NS_BASE_STREAM_CLOSED after Close(), or a failure recorded by Available()
      86             :   nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback; // cleared by Close(); NOTE(review): presumably set by AsyncWait, not visible here
      87             : };
      88             : 
      89           5 : NS_IMPL_ADDREF(nsMultiplexInputStream) // reference counting for the class
      90           6 : NS_IMPL_RELEASE(nsMultiplexInputStream)
      91             : 
      92           3 : NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
      93             :                   NS_MULTIPLEXINPUTSTREAM_CID)
      94             : 
      95           5 : NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream) // QueryInterface table; entries are listed in lookup order
      96           5 :   NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
      97           3 :   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIMultiplexInputStream)
      98           3 :   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable()) // conditional entries depend on the predicate's value at query time
      99           3 :   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
     100             :                                      IsIPCSerializable())
     101           2 :   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
     102             :                                      IsCloneable())
     103           2 :   NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
     104             :                                      IsAsyncInputStream())
     105           2 :   NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
     106           2 :   NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
     107           2 : NS_INTERFACE_MAP_END
     108             : 
     109           0 : NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
     110             :                             nsIMultiplexInputStream,
     111             :                             nsIInputStream,
     112             :                             nsISeekableStream) // NOTE(review): listed unconditionally although QueryInterface gates it on IsSeekable() - confirm intended
     113             : 
     114             : static nsresult
     115           0 : AvailableMaybeSeek(nsIInputStream* aStream, uint64_t* aResult)
     116             : {
     117           0 :   nsresult rv = aStream->Available(aResult);
     118           0 :   if (rv == NS_BASE_STREAM_CLOSED) {
     119             :     // Blindly seek to the current position if Available() returns
     120             :     // NS_BASE_STREAM_CLOSED.
     121             :     // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
     122             :     // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
     123           0 :     nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
     124           0 :     if (seekable) {
     125           0 :       nsresult rvSeek = seekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
     126           0 :       if (NS_SUCCEEDED(rvSeek)) {
     127           0 :         rv = aStream->Available(aResult);
     128             :       }
     129             :     }
     130             :   }
     131           0 :   return rv;
     132             : }
     133             : 
     134             : static nsresult
     135           0 : TellMaybeSeek(nsISeekableStream* aSeekable, int64_t* aResult)
     136             : {
     137           0 :   nsresult rv = aSeekable->Tell(aResult);
     138           0 :   if (rv == NS_BASE_STREAM_CLOSED) {
     139             :     // Blindly seek to the current position if Tell() returns
     140             :     // NS_BASE_STREAM_CLOSED.
     141             :     // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
     142             :     // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
     143           0 :     nsresult rvSeek = aSeekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
     144           0 :     if (NS_SUCCEEDED(rvSeek)) {
     145           0 :       rv = aSeekable->Tell(aResult);
     146             :     }
     147             :   }
     148           0 :   return rv;
     149             : }
     150             : 
     151           1 : nsMultiplexInputStream::nsMultiplexInputStream()
     152             :   : mLock("nsMultiplexInputStream lock"),
     153             :     mCurrentStream(0),
     154             :     mStartedReadingCurrent(false),
     155           1 :     mStatus(NS_OK)
     156             : {
     157           1 : }
     158             : 
     159             : NS_IMETHODIMP
     160           0 : nsMultiplexInputStream::GetCount(uint32_t* aCount)
     161             : {
     162           0 :   MutexAutoLock lock(mLock);
     163           0 :   *aCount = mStreams.Length();
     164           0 :   return NS_OK;
     165             : }
     166             : 
     167             : NS_IMETHODIMP
     168           2 : nsMultiplexInputStream::AppendStream(nsIInputStream* aStream)
     169             : {
     170           4 :   MutexAutoLock lock(mLock);
     171           4 :   return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
     172             : }
     173             : 
     174             : NS_IMETHODIMP
     175           0 : nsMultiplexInputStream::InsertStream(nsIInputStream* aStream, uint32_t aIndex)
     176             : {
     177           0 :   MutexAutoLock lock(mLock);
     178           0 :   mStreams.InsertElementAt(aIndex, aStream);
     179           0 :   if (mCurrentStream > aIndex ||
     180           0 :       (mCurrentStream == aIndex && mStartedReadingCurrent)) {
     181           0 :     ++mCurrentStream;
     182             :   }
     183           0 :   return NS_OK;
     184             : }
     185             : 
     186             : NS_IMETHODIMP
     187           0 : nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
     188             : {
     189           0 :   MutexAutoLock lock(mLock);
     190           0 :   mStreams.RemoveElementAt(aIndex);
     191           0 :   if (mCurrentStream > aIndex) {
     192           0 :     --mCurrentStream;
     193           0 :   } else if (mCurrentStream == aIndex) {
     194           0 :     mStartedReadingCurrent = false;
     195             :   }
     196             : 
     197           0 :   return NS_OK;
     198             : }
     199             : 
     200             : NS_IMETHODIMP
     201           0 : nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult)
     202             : {
     203           0 :   MutexAutoLock lock(mLock);
     204           0 :   *aResult = mStreams.SafeElementAt(aIndex, nullptr);
     205           0 :   if (NS_WARN_IF(!*aResult)) {
     206           0 :     return NS_ERROR_NOT_AVAILABLE;
     207             :   }
     208             : 
     209           0 :   NS_ADDREF(*aResult);
     210           0 :   return NS_OK;
     211             : }
     212             : 
     213             : NS_IMETHODIMP
     214           0 : nsMultiplexInputStream::Close()
     215             : {
     216           0 :   MutexAutoLock lock(mLock);
     217           0 :   mStatus = NS_BASE_STREAM_CLOSED;
     218             : 
     219           0 :   nsresult rv = NS_OK;
     220             : 
     221           0 :   uint32_t len = mStreams.Length();
     222           0 :   for (uint32_t i = 0; i < len; ++i) {
     223           0 :     nsresult rv2 = mStreams[i]->Close();
     224             :     // We still want to close all streams, but we should return an error
     225           0 :     if (NS_FAILED(rv2)) {
     226           0 :       rv = rv2;
     227             :     }
     228             :   }
     229             : 
     230           0 :   mAsyncWaitCallback = nullptr;
     231             : 
     232           0 :   return rv;
     233             : }
     234             : 
     235             : NS_IMETHODIMP
     236           0 : nsMultiplexInputStream::Available(uint64_t* aResult)
     237             : {
     238           0 :   MutexAutoLock lock(mLock);
     239           0 :   if (NS_FAILED(mStatus)) {
     240           0 :     return mStatus;
     241             :   }
     242             : 
     243           0 :   uint64_t avail = 0;
     244             : 
     245           0 :   uint32_t len = mStreams.Length();
     246           0 :   for (uint32_t i = mCurrentStream; i < len; i++) {
     247             :     uint64_t streamAvail;
     248           0 :     mStatus = AvailableMaybeSeek(mStreams[i], &streamAvail);
     249           0 :     if (NS_WARN_IF(NS_FAILED(mStatus))) {
     250           0 :       return mStatus;
     251             :     }
     252           0 :     avail += streamAvail;
     253             :   }
     254           0 :   *aResult = avail;
     255           0 :   return NS_OK;
     256             : }
     257             : 
     258             : NS_IMETHODIMP
     259           3 : nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
     260             : {
     261           6 :   MutexAutoLock lock(mLock);
     262             :   // It is tempting to implement this method in terms of ReadSegments, but
     263             :   // that would prevent this class from being used with streams that only
     264             :   // implement Read (e.g., file streams).
     265             : 
     266           3 :   *aResult = 0;
     267             : 
     268           3 :   if (mStatus == NS_BASE_STREAM_CLOSED) {
     269           0 :     return NS_OK;
     270             :   }
     271           3 :   if (NS_FAILED(mStatus)) {
     272           0 :     return mStatus;
     273             :   }
     274             : 
     275           3 :   nsresult rv = NS_OK;
     276             : 
     277           3 :   uint32_t len = mStreams.Length();
     278          11 :   while (mCurrentStream < len && aCount) {
     279             :     uint32_t read;
     280           4 :     rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
     281             : 
     282             :     // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
     283             :     // (This is a bug in those stream implementations)
     284           4 :     if (rv == NS_BASE_STREAM_CLOSED) {
     285           0 :       NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
     286           0 :       rv = NS_OK;
     287           0 :       read = 0;
     288           4 :     } else if (NS_FAILED(rv)) {
     289           0 :       break;
     290             :     }
     291             : 
     292           4 :     if (read == 0) {
     293           2 :       ++mCurrentStream;
     294           2 :       mStartedReadingCurrent = false;
     295             :     } else {
     296           2 :       NS_ASSERTION(aCount >= read, "Read more than requested");
     297           2 :       *aResult += read;
     298           2 :       aCount -= read;
     299           2 :       aBuf += read;
     300           2 :       mStartedReadingCurrent = true;
     301             :     }
     302             :   }
     303           3 :   return *aResult ? NS_OK : rv;
     304             : }
     305             : 
     306             : NS_IMETHODIMP
     307           0 : nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
     308             :                                      uint32_t aCount, uint32_t* aResult)
     309             : {
     310           0 :   MutexAutoLock lock(mLock);
     311             : 
     312           0 :   if (mStatus == NS_BASE_STREAM_CLOSED) {
     313           0 :     *aResult = 0;
     314           0 :     return NS_OK;
     315             :   }
     316           0 :   if (NS_FAILED(mStatus)) {
     317           0 :     return mStatus;
     318             :   }
     319             : 
     320           0 :   NS_ASSERTION(aWriter, "missing aWriter");
     321             : 
     322           0 :   nsresult rv = NS_OK;
     323           0 :   ReadSegmentsState state;
     324           0 :   state.mThisStream = static_cast<nsIMultiplexInputStream*>(this);
     325           0 :   state.mOffset = 0;
     326           0 :   state.mWriter = aWriter;
     327           0 :   state.mClosure = aClosure;
     328           0 :   state.mDone = false;
     329             : 
     330           0 :   uint32_t len = mStreams.Length();
     331           0 :   while (mCurrentStream < len && aCount) {
     332             :     uint32_t read;
     333           0 :     rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
     334             : 
     335             :     // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
     336             :     // (This is a bug in those stream implementations)
     337           0 :     if (rv == NS_BASE_STREAM_CLOSED) {
     338           0 :       NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
     339           0 :       rv = NS_OK;
     340           0 :       read = 0;
     341             :     }
     342             : 
     343             :     // if |aWriter| decided to stop reading segments...
     344           0 :     if (state.mDone || NS_FAILED(rv)) {
     345           0 :       break;
     346             :     }
     347             : 
     348             :     // if stream is empty, then advance to the next stream.
     349           0 :     if (read == 0) {
     350           0 :       ++mCurrentStream;
     351           0 :       mStartedReadingCurrent = false;
     352             :     } else {
     353           0 :       NS_ASSERTION(aCount >= read, "Read more than requested");
     354           0 :       state.mOffset += read;
     355           0 :       aCount -= read;
     356           0 :       mStartedReadingCurrent = true;
     357             :     }
     358             :   }
     359             : 
     360             :   // if we successfully read some data, then this call succeeded.
     361           0 :   *aResult = state.mOffset;
     362           0 :   return state.mOffset ? NS_OK : rv;
     363             : }
     364             : 
     365             : nsresult
     366           0 : nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
     367             :                                   const char* aFromRawSegment,
     368             :                                   uint32_t aToOffset, uint32_t aCount,
     369             :                                   uint32_t* aWriteCount)
     370             : {
     371             :   nsresult rv;
     372           0 :   ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
     373           0 :   rv = (state->mWriter)(state->mThisStream,
     374             :                         state->mClosure,
     375             :                         aFromRawSegment,
     376           0 :                         aToOffset + state->mOffset,
     377             :                         aCount,
     378           0 :                         aWriteCount);
     379           0 :   if (NS_FAILED(rv)) {
     380           0 :     state->mDone = true;
     381             :   }
     382           0 :   return rv;
     383             : }
     384             : 
     385             : NS_IMETHODIMP
     386           0 : nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking)
     387             : {
     388           0 :   MutexAutoLock lock(mLock);
     389             : 
     390           0 :   uint32_t len = mStreams.Length();
     391           0 :   if (len == 0) {
     392             :     // Claim to be non-blocking, since we won't block the caller.
     393             :     // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
     394             :     // so maybe we should claim to be blocking?  It probably doesn't
     395             :     // matter in practice.
     396           0 :     *aNonBlocking = true;
     397           0 :     return NS_OK;
     398             :   }
     399           0 :   for (uint32_t i = 0; i < len; ++i) {
     400           0 :     nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
     401           0 :     if (NS_WARN_IF(NS_FAILED(rv))) {
     402           0 :       return rv;
     403             :     }
     404             :     // If one is non-blocking the entire stream becomes non-blocking
     405             :     // (except that we don't implement nsIAsyncInputStream, so there's
     406             :     //  not much for the caller to do if Read returns "would block")
     407           0 :     if (*aNonBlocking) {
     408           0 :       return NS_OK;
     409             :     }
     410             :   }
     411           0 :   return NS_OK;
     412             : }
     413             : 
     414             : NS_IMETHODIMP
     415           0 : nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
     416             : {
     417           0 :   MutexAutoLock lock(mLock);
     418             : 
     419           0 :   if (NS_FAILED(mStatus)) {
     420           0 :     return mStatus;
     421             :   }
     422             : 
     423             :   nsresult rv;
     424             : 
     425           0 :   uint32_t oldCurrentStream = mCurrentStream;
     426           0 :   bool oldStartedReadingCurrent = mStartedReadingCurrent;
     427             : 
     428           0 :   if (aWhence == NS_SEEK_SET) {
     429           0 :     int64_t remaining = aOffset;
     430           0 :     if (aOffset == 0) {
     431           0 :       mCurrentStream = 0;
     432             :     }
     433           0 :     for (uint32_t i = 0; i < mStreams.Length(); ++i) {
     434             :       nsCOMPtr<nsISeekableStream> stream =
     435           0 :         do_QueryInterface(mStreams[i]);
     436           0 :       if (!stream) {
     437           0 :         return NS_ERROR_FAILURE;
     438             :       }
     439             : 
     440             :       // See if all remaining streams should be rewound
     441           0 :       if (remaining == 0) {
     442           0 :         if (i < oldCurrentStream ||
     443           0 :             (i == oldCurrentStream && oldStartedReadingCurrent)) {
     444           0 :           rv = stream->Seek(NS_SEEK_SET, 0);
     445           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     446           0 :             return rv;
     447             :           }
     448           0 :           continue;
     449             :         } else {
     450             :           break;
     451             :         }
     452             :       }
     453             : 
     454             :       // Get position in current stream
     455             :       int64_t streamPos;
     456           0 :       if (i > oldCurrentStream ||
     457           0 :           (i == oldCurrentStream && !oldStartedReadingCurrent)) {
     458           0 :         streamPos = 0;
     459             :       } else {
     460           0 :         rv = TellMaybeSeek(stream, &streamPos);
     461           0 :         if (NS_WARN_IF(NS_FAILED(rv))) {
     462           0 :           return rv;
     463             :         }
     464             :       }
     465             : 
     466             :       // See if we need to seek current stream forward or backward
     467           0 :       if (remaining < streamPos) {
     468           0 :         rv = stream->Seek(NS_SEEK_SET, remaining);
     469           0 :         if (NS_WARN_IF(NS_FAILED(rv))) {
     470           0 :           return rv;
     471             :         }
     472             : 
     473           0 :         mCurrentStream = i;
     474           0 :         mStartedReadingCurrent = remaining != 0;
     475             : 
     476           0 :         remaining = 0;
     477           0 :       } else if (remaining > streamPos) {
     478           0 :         if (i < oldCurrentStream) {
     479             :           // We're already at end so no need to seek this stream
     480           0 :           remaining -= streamPos;
     481           0 :           NS_ASSERTION(remaining >= 0, "Remaining invalid");
     482             :         } else {
     483             :           uint64_t avail;
     484           0 :           rv = AvailableMaybeSeek(mStreams[i], &avail);
     485           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     486           0 :             return rv;
     487             :           }
     488             : 
     489           0 :           int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
     490             : 
     491           0 :           rv = stream->Seek(NS_SEEK_SET, newPos);
     492           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     493           0 :             return rv;
     494             :           }
     495             : 
     496           0 :           mCurrentStream = i;
     497           0 :           mStartedReadingCurrent = true;
     498             : 
     499           0 :           remaining -= newPos;
     500           0 :           NS_ASSERTION(remaining >= 0, "Remaining invalid");
     501             :         }
     502             :       } else {
     503           0 :         NS_ASSERTION(remaining == streamPos, "Huh?");
     504           0 :         MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
     505           0 :         remaining = 0;
     506           0 :         mCurrentStream = i;
     507           0 :         mStartedReadingCurrent = true;
     508             :       }
     509             :     }
     510             : 
     511           0 :     return NS_OK;
     512             :   }
     513             : 
     514           0 :   if (aWhence == NS_SEEK_CUR && aOffset > 0) {
     515           0 :     int64_t remaining = aOffset;
     516           0 :     for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
     517             :       nsCOMPtr<nsISeekableStream> stream =
     518           0 :         do_QueryInterface(mStreams[i]);
     519             : 
     520             :       uint64_t avail;
     521           0 :       rv = AvailableMaybeSeek(mStreams[i], &avail);
     522           0 :       if (NS_WARN_IF(NS_FAILED(rv))) {
     523           0 :         return rv;
     524             :       }
     525             : 
     526           0 :       int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
     527             : 
     528           0 :       rv = stream->Seek(NS_SEEK_CUR, seek);
     529           0 :       if (NS_WARN_IF(NS_FAILED(rv))) {
     530           0 :         return rv;
     531             :       }
     532             : 
     533           0 :       mCurrentStream = i;
     534           0 :       mStartedReadingCurrent = true;
     535             : 
     536           0 :       remaining -= seek;
     537             :     }
     538             : 
     539           0 :     return NS_OK;
     540             :   }
     541             : 
     542           0 :   if (aWhence == NS_SEEK_CUR && aOffset < 0) {
     543           0 :     int64_t remaining = -aOffset;
     544           0 :     for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
     545             :       nsCOMPtr<nsISeekableStream> stream =
     546           0 :         do_QueryInterface(mStreams[i]);
     547             : 
     548             :       int64_t pos;
     549           0 :       rv = TellMaybeSeek(stream, &pos);
     550           0 :       if (NS_WARN_IF(NS_FAILED(rv))) {
     551           0 :         return rv;
     552             :       }
     553             : 
     554           0 :       int64_t seek = XPCOM_MIN(pos, remaining);
     555             : 
     556           0 :       rv = stream->Seek(NS_SEEK_CUR, -seek);
     557           0 :       if (NS_WARN_IF(NS_FAILED(rv))) {
     558           0 :         return rv;
     559             :       }
     560             : 
     561           0 :       mCurrentStream = i;
     562           0 :       mStartedReadingCurrent = seek != -pos;
     563             : 
     564           0 :       remaining -= seek;
     565             :     }
     566             : 
     567           0 :     return NS_OK;
     568             :   }
     569             : 
     570           0 :   if (aWhence == NS_SEEK_CUR) {
     571           0 :     NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
     572             : 
     573           0 :     return NS_OK;
     574             :   }
     575             : 
     576           0 :   if (aWhence == NS_SEEK_END) {
     577           0 :     if (aOffset > 0) {
     578           0 :       return NS_ERROR_INVALID_ARG;
     579             :     }
     580           0 :     int64_t remaining = aOffset;
     581           0 :     for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
     582             :       nsCOMPtr<nsISeekableStream> stream =
     583           0 :         do_QueryInterface(mStreams[i]);
     584             : 
     585             :       // See if all remaining streams should be seeked to end
     586           0 :       if (remaining == 0) {
     587           0 :         if (i >= oldCurrentStream) {
     588           0 :           rv = stream->Seek(NS_SEEK_END, 0);
     589           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     590           0 :             return rv;
     591             :           }
     592             :         } else {
     593           0 :           break;
     594             :         }
     595             :       }
     596             : 
     597             :       // Get position in current stream
     598             :       int64_t streamPos;
     599           0 :       if (i < oldCurrentStream) {
     600           0 :         streamPos = 0;
     601             :       } else {
     602             :         uint64_t avail;
     603           0 :         rv = AvailableMaybeSeek(mStreams[i], &avail);
     604           0 :         if (NS_WARN_IF(NS_FAILED(rv))) {
     605           0 :           return rv;
     606             :         }
     607             : 
     608           0 :         streamPos = avail;
     609             :       }
     610             : 
     611             :       // See if we have enough data in the current stream.
     612           0 :       if (DeprecatedAbs(remaining) < streamPos) {
     613           0 :         rv = stream->Seek(NS_SEEK_END, remaining);
     614           0 :         if (NS_WARN_IF(NS_FAILED(rv))) {
     615           0 :           return rv;
     616             :         }
     617             : 
     618           0 :         mCurrentStream = i;
     619           0 :         mStartedReadingCurrent = true;
     620             : 
     621           0 :         remaining = 0;
     622           0 :       } else if (DeprecatedAbs(remaining) > streamPos) {
     623           0 :         if (i > oldCurrentStream ||
     624           0 :             (i == oldCurrentStream && !oldStartedReadingCurrent)) {
     625             :           // We're already at start so no need to seek this stream
     626           0 :           remaining += streamPos;
     627             :         } else {
     628             :           int64_t avail;
     629           0 :           rv = TellMaybeSeek(stream, &avail);
     630           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     631           0 :             return rv;
     632             :           }
     633             : 
     634           0 :           int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
     635             : 
     636           0 :           rv = stream->Seek(NS_SEEK_END, -newPos);
     637           0 :           if (NS_WARN_IF(NS_FAILED(rv))) {
     638           0 :             return rv;
     639             :           }
     640             : 
     641           0 :           mCurrentStream = i;
     642           0 :           mStartedReadingCurrent = true;
     643             : 
     644           0 :           remaining += newPos;
     645             :         }
     646             :       } else {
     647           0 :         NS_ASSERTION(remaining == streamPos, "Huh?");
     648           0 :         remaining = 0;
     649             :       }
     650             :     }
     651             : 
     652           0 :     return NS_OK;
     653             :   }
     654             : 
     655             :   // other Seeks not implemented yet
     656           0 :   return NS_ERROR_NOT_IMPLEMENTED;
     657             : }
     658             : 
                       // Reports the read position within the concatenated stream.
                       //
                       // The result is the sum of the positions that TellMaybeSeek() reports for
                       // the leading sub-streams: indices [0, mCurrentStream) when the current
                       // sub-stream has not been started, or [0, mCurrentStream] when it has.
                       // mLock is held for the whole call.
                       //
                       // @param aResult  Receives the summed position; written only on success.
                       // @return mStatus if it is already a failure code, NS_ERROR_NO_INTERFACE
                       //         if a visited sub-stream does not QI to nsISeekableStream, the
                       //         failure code from TellMaybeSeek(), or NS_OK.
     659             : NS_IMETHODIMP
     660           0 : nsMultiplexInputStream::Tell(int64_t* aResult)
     661             : {
     662           0 :   MutexAutoLock lock(mLock);
     663             : 
     664           0 :   if (NS_FAILED(mStatus)) {
     665           0 :     return mStatus;
     666             :   }
     667             : 
     668             :   nsresult rv;
     669           0 :   int64_t ret64 = 0;
     670             :   uint32_t i, last;
                         // Include the current sub-stream only once reading from it has begun.
     671           0 :   last = mStartedReadingCurrent ? mCurrentStream + 1 : mCurrentStream;
                         // NOTE(review): mStreams[i] is indexed without a bounds check below; this
                         // relies on `last` never exceeding mStreams.Length() -- confirm invariant.
     672           0 :   for (i = 0; i < last; ++i) {
     673           0 :     nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
     674           0 :     if (NS_WARN_IF(!stream)) {
     675           0 :       return NS_ERROR_NO_INTERFACE;
     676             :     }
     677             : 
     678             :     int64_t pos;
     679           0 :     rv = TellMaybeSeek(stream, &pos);
     680           0 :     if (NS_WARN_IF(NS_FAILED(rv))) {
     681           0 :       return rv;
     682             :     }
     683           0 :     ret64 += pos;
     684             :   }
     685           0 :   *aResult =  ret64;
     686             : 
     687           0 :   return NS_OK;
     688             : }
     689             : 
                       // Not supported by the multiplex stream: unconditionally returns
                       // NS_ERROR_NOT_IMPLEMENTED.
     690             : NS_IMETHODIMP
     691           0 : nsMultiplexInputStream::SetEOF()
     692             : {
     693           0 :   return NS_ERROR_NOT_IMPLEMENTED;
     694             : }
     695             : 
                       // Closes the stream by forwarding to Close() and returns its result.
                       //
                       // @param aStatus  Not used by this implementation; the supplied status is
                       //                 neither stored nor passed on.
     696             : NS_IMETHODIMP
     697           0 : nsMultiplexInputStream::CloseWithStatus(nsresult aStatus)
     698             : {
     699           0 :   return Close();
     700             : }
     701             : 
     702             : // This class is used to inform nsMultiplexInputStream that it's time to execute
     703             : // the asyncWait callback.
                       // It keeps a RefPtr to the stream, and running it only calls
                       // nsMultiplexInputStream::AsyncWaitCompleted() on that stream.
     704           0 : class AsyncWaitRunnable final : public Runnable
     705             : {
                         // Stream to notify when the runnable executes.
     706             :   RefPtr<nsMultiplexInputStream> mStream;
     707             : 
     708             : public:
                         // aStream is expected to be non-null (checked with MOZ_ASSERT).
     709           0 :   explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
     710           0 :     : Runnable("AsyncWaitRunnable")
     711           0 :     , mStream(aStream)
     712             :   {
     713           0 :     MOZ_ASSERT(aStream);
     714           0 :   }
     715             : 
                         // Notifies the stream and always returns NS_OK.
     716             :   NS_IMETHOD
     717           0 :   Run() override
     718             :   {
     719           0 :     mStream->AsyncWaitCompleted();
     720           0 :     return NS_OK;
     721             :   }
     722             : };
     723             : 
     724             : // This helper class processes an array of nsIAsyncInputStreams, calling
     725             : // AsyncWait() for each one of them. When all of them have answered, this helper
     726             : // dispatches an AsyncWaitRunnable object. If there is an error calling
     727             : // AsyncWait(), AsyncWaitRunnable is not dispatched.
                       //
                       // The helper registers itself as the nsIInputStreamCallback of every pending
                       // sub-stream; mMutex guards mPendingStreams and mValid.
     728             : class AsyncStreamHelper final : public nsIInputStreamCallback
     729             : {
     730             : public:
     731             :   NS_DECL_THREADSAFE_ISUPPORTS
     732             : 
                         // Creates a helper for aStream and starts waiting on aAsyncStreams.
                         //
                         // aAsyncStreams must be non-empty; its elements are swapped into the
                         // helper, so the caller's array no longer holds them afterwards.
                         // aFlags and aRequestedCount are passed unchanged to each sub-stream's
                         // AsyncWait(); aEventTarget (non-null) is used both for those calls and
                         // for dispatching the final AsyncWaitRunnable.
                         // Returns the result of Run().
     733             :   static nsresult
     734           0 :   Process(nsMultiplexInputStream* aStream,
     735             :           nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
     736             :           uint32_t aFlags, uint32_t aRequestedCount,
     737             :           nsIEventTarget* aEventTarget)
     738             :   {
     739           0 :     MOZ_ASSERT(aStream);
     740           0 :     MOZ_ASSERT(!aAsyncStreams.IsEmpty());
     741           0 :     MOZ_ASSERT(aEventTarget);
     742             : 
     743             :     RefPtr<AsyncStreamHelper> helper =
     744           0 :       new AsyncStreamHelper(aStream, aAsyncStreams, aEventTarget);
     745           0 :     return helper->Run(aFlags, aRequestedCount);
     746             :   }
     747             : 
     748             : private:
                         // Private: instances are only created through Process().
     749           0 :   AsyncStreamHelper(nsMultiplexInputStream* aStream,
     750             :                     nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
     751             :                     nsIEventTarget* aEventTarget)
     752           0 :     : mMutex("AsyncStreamHelper::mMutex")
     753             :     , mStream(aStream)
     754             :     , mEventTarget(aEventTarget)
     755           0 :     , mValid(true)
     756             :   {
                           // Take over the caller's list instead of copying it.
     757           0 :     mPendingStreams.SwapElements(aAsyncStreams);
     758           0 :   }
     759             : 
     760           0 :   ~AsyncStreamHelper() = default;
     761             : 
                         // Calls AsyncWait() on every pending sub-stream with this helper as the
                         // callback. On the first failure the helper is marked invalid (so later
                         // OnInputStreamReady() calls are ignored) and the failure code is
                         // returned; otherwise returns NS_OK.
                         //
                         // NOTE(review): mMutex is held across the AsyncWait() calls, and
                         // OnInputStreamReady() takes the same mutex. Presumably passing
                         // mEventTarget keeps the callback from being invoked synchronously from
                         // inside AsyncWait() -- confirm.
     762             :   nsresult
     763           0 :   Run(uint32_t aFlags, uint32_t aRequestedCount)
     764             :   {
     765           0 :     MutexAutoLock lock(mMutex);
     766             : 
     767           0 :     for (uint32_t i = 0; i < mPendingStreams.Length(); ++i) {
     768             :       nsresult rv =
     769           0 :         mPendingStreams[i]->AsyncWait(this, aFlags, aRequestedCount,
     770           0 :                                       mEventTarget);
     771           0 :       if (NS_WARN_IF(NS_FAILED(rv))) {
     772           0 :         mValid = false;
     773           0 :         return rv;
     774             :       }
     775             :     }
     776             : 
     777           0 :     return NS_OK;
     778             :   }
     779             : 
                         // Callback from a sub-stream. Removes aStream from the pending list and,
                         // once the list is empty, dispatches an AsyncWaitRunnable for mStream to
                         // mEventTarget, returning the result of that Dispatch(). Returns NS_OK in
                         // all other cases, including when the helper was invalidated by Run().
     780             :   NS_IMETHOD
     781           0 :   OnInputStreamReady(nsIAsyncInputStream* aStream) override
     782             :   {
     783           0 :     MOZ_ASSERT(aStream, "This cannot be one of ours.");
     784             : 
     785           0 :     MutexAutoLock lock(mMutex);
     786             : 
     787             :     // We failed during the Run().
     788           0 :     if (!mValid) {
     789           0 :       return NS_OK;
     790             :     }
     791             : 
     792           0 :     MOZ_ASSERT(mPendingStreams.Contains(aStream));
     793           0 :     mPendingStreams.RemoveElement(aStream);
     794             : 
     795             :     // The last asyncStream answered. We can inform nsMultiplexInputStream.
     796           0 :     if (mPendingStreams.IsEmpty()) {
     797           0 :       RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(mStream);
     798           0 :       return mEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
     799             :     }
     800             : 
     801           0 :     return NS_OK;
     802             :   }
     803             : 
                         // Guards mPendingStreams and mValid.
     804             :   Mutex mMutex;
                         // Multiplex stream to notify once every sub-stream has answered.
     805             :   RefPtr<nsMultiplexInputStream> mStream;
                         // Sub-streams that have not yet called OnInputStreamReady().
     806             :   nsTArray<nsCOMPtr<nsIAsyncInputStream>> mPendingStreams;
                         // Target for the sub-stream callbacks and for the final runnable.
     807             :   nsCOMPtr<nsIEventTarget> mEventTarget;
                         // Cleared when an AsyncWait() call fails in Run().
     808             :   bool mValid;
     809             : };
     810             : 
     811           0 : NS_IMPL_ISUPPORTS(AsyncStreamHelper, nsIInputStreamCallback)
     812             : 
                       // Registers aCallback to be notified through OnInputStreamReady().
                       //
                       // @param aCallback        Callback to register. Passing null clears any
                       //                         pending callback and returns NS_OK.
                       // @param aFlags           Forwarded to the sub-streams' AsyncWait().
                       // @param aRequestedCount  Forwarded to the sub-streams' AsyncWait().
                       // @param aEventTarget     Where notifications are dispatched; when null,
                       //                         SystemGroup::EventTargetFor(TaskCategory::Other)
                       //                         is used.
                       // @return mStatus if it is already a failure code, NS_ERROR_FAILURE if a
                       //         callback is already pending and aCallback is non-null, otherwise
                       //         the result of dispatching the runnable or of
                       //         AsyncStreamHelper::Process().
                       //
                       // NOTE(review): mAsyncWaitCallback is stored before the dispatch/Process
                       // step and is not reset if that step fails, so a later AsyncWait() with a
                       // non-null callback would return NS_ERROR_FAILURE -- confirm intended.
     813             : NS_IMETHODIMP
     814           0 : nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
     815             :                                   uint32_t aFlags,
     816             :                                   uint32_t aRequestedCount,
     817             :                                   nsIEventTarget* aEventTarget)
     818             : {
     819             :   // When AsyncWait() is called, we also call AsyncWait() on every remaining
     820             :   // sub-stream that is a valid nsIAsyncInputStream instance. In this way, when
     821             :   // they all call OnInputStreamReady(), we can proceed with the Read().
     822             : 
     823           0 :   MutexAutoLock lock(mLock);
     824             : 
     825           0 :   if (NS_FAILED(mStatus)) {
     826           0 :     return mStatus;
     827             :   }
     828             : 
                         // Only one callback may be pending at a time.
     829           0 :   if (mAsyncWaitCallback && aCallback) {
     830           0 :     return NS_ERROR_FAILURE;
     831             :   }
     832             : 
     833           0 :   mAsyncWaitCallback = aCallback;
     834             : 
                         // A null callback only clears the pending one; nothing left to wait for.
     835           0 :   if (!mAsyncWaitCallback) {
     836           0 :       return NS_OK;
     837             :   }
     838             : 
                         // Collect the async-capable sub-streams from the current one onwards.
     839           0 :   nsTArray<nsCOMPtr<nsIAsyncInputStream>> asyncStreams;
     840           0 :   for (uint32_t i = mCurrentStream; i < mStreams.Length(); ++i) {
     841             :     nsCOMPtr<nsIAsyncInputStream> asyncStream =
     842           0 :       do_QueryInterface(mStreams.SafeElementAt(i, nullptr));
     843           0 :     if (asyncStream) {
     844           0 :       asyncStreams.AppendElement(asyncStream);
     845             :     }
     846             :   }
     847             : 
     848           0 :   if (!aEventTarget) {
     849           0 :     aEventTarget = SystemGroup::EventTargetFor(TaskCategory::Other);
     850             :   }
     851             : 
                         // No async sub-stream to wait on: notify through a runnable right away.
     852           0 :   if (asyncStreams.IsEmpty()) {
     853           0 :     RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(this);
     854           0 :     return aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
     855             :   }
     856             : 
     857             :   return AsyncStreamHelper::Process(this, asyncStreams, aFlags, aRequestedCount,
     858           0 :                                     aEventTarget);
     859             : }
     860             : 
                       // Delivers the pending asyncWait notification, if there still is one.
                       //
                       // The registered callback is moved out of mAsyncWaitCallback while mLock is
                       // held (leaving the member null) and is invoked only after the lock has
                       // been released. Does nothing when no callback is registered.
     861             : void
     862           0 : nsMultiplexInputStream::AsyncWaitCompleted()
     863             : {
     864           0 :   nsCOMPtr<nsIInputStreamCallback> callback;
     865             : 
     866             :   {
     867           0 :     MutexAutoLock lock(mLock);
     868             : 
     869             :     // The callback has been nullified in the meantime.
     870           0 :     if (!mAsyncWaitCallback) {
     871           0 :       return;
     872             :     }
     873             : 
     874           0 :     mAsyncWaitCallback.swap(callback);
     875             :   }
     876             : 
                         // mLock is no longer held here.
     877           0 :   callback->OnInputStreamReady(this);
     878             : }
     879             : 
                       // Factory function: creates a new nsMultiplexInputStream and returns the
                       // interface requested by the caller.
                       //
                       // @param aOuter   Must be null; aggregation is not supported.
                       // @param aIID     Interface to query on the new instance.
                       // @param aResult  Set to null first, then filled in by QueryInterface().
                       // @return NS_ERROR_NO_AGGREGATION if aOuter is non-null, otherwise the
                       //         result of QueryInterface().
     880             : nsresult
     881           1 : nsMultiplexInputStreamConstructor(nsISupports* aOuter,
     882             :                                   REFNSIID aIID,
     883             :                                   void** aResult)
     884             : {
     885           1 :   *aResult = nullptr;
     886             : 
     887           1 :   if (aOuter) {
     888           0 :     return NS_ERROR_NO_AGGREGATION;
     889             :   }
     890             : 
     891           2 :   RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
     892             : 
     893           1 :   return inst->QueryInterface(aIID, aResult);
     894             : }
     895             : 
                       // Writes the state of this stream into aParams as a
                       // MultiplexInputStreamParams value.
                       //
                       // Every sub-stream is serialized, in order, with
                       // InputStreamHelper::SerializeInputStream(); mCurrentStream, mStatus and
                       // mStartedReadingCurrent are copied alongside. mLock is held for the whole
                       // call.
                       //
                       // @param aParams           Receives the serialized parameters.
                       // @param aFileDescriptors  Passed to each sub-stream's serialization.
     896             : void
     897           0 : nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
     898             :                                   FileDescriptorArray& aFileDescriptors)
     899             : {
     900           0 :   MutexAutoLock lock(mLock);
     901             : 
     902           0 :   MultiplexInputStreamParams params;
     903             : 
     904           0 :   uint32_t streamCount = mStreams.Length();
     905             : 
     906           0 :   if (streamCount) {
     907           0 :     InfallibleTArray<InputStreamParams>& streams = params.streams();
     908             : 
                           // Reserve room for all children up front.
     909           0 :     streams.SetCapacity(streamCount);
     910           0 :     for (uint32_t index = 0; index < streamCount; index++) {
     911           0 :       InputStreamParams childStreamParams;
     912           0 :       InputStreamHelper::SerializeInputStream(mStreams[index],
     913             :                                               childStreamParams,
     914           0 :                                               aFileDescriptors);
     915             : 
     916           0 :       streams.AppendElement(childStreamParams);
     917             :     }
     918             :   }
     919             : 
     920           0 :   params.currentStream() = mCurrentStream;
     921           0 :   params.status() = mStatus;
     922           0 :   params.startedReadingCurrent() = mStartedReadingCurrent;
     923             : 
     924           0 :   aParams = params;
     925           0 : }
     926             : 
                       // Rebuilds this stream from parameters produced by Serialize().
                       //
                       // Each serialized child is turned back into a stream with
                       // InputStreamHelper::DeserializeInputStream() and appended through
                       // AppendStream(); afterwards mCurrentStream, mStatus and
                       // mStartedReadingCurrent are restored from the parameters.
                       //
                       // @param aParams           Must hold a MultiplexInputStreamParams value.
                       // @param aFileDescriptors  Passed to each child's deserialization.
                       // @return false if aParams has a different type, if a child cannot be
                       //         deserialized, or if AppendStream() fails; true otherwise.
                       //         Children appended before a failure are not removed here.
                       //
                       // NOTE(review): this function does not take mLock itself when writing the
                       // three state fields, and currentStream() is stored without being checked
                       // against the number of streams appended above -- confirm both are safe
                       // for data received from another process.
     927             : bool
     928           0 : nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
     929             :                                     const FileDescriptorArray& aFileDescriptors)
     930             : {
     931           0 :   if (aParams.type() !=
     932             :       InputStreamParams::TMultiplexInputStreamParams) {
     933           0 :     NS_ERROR("Received unknown parameters from the other process!");
     934           0 :     return false;
     935             :   }
     936             : 
     937             :   const MultiplexInputStreamParams& params =
     938           0 :     aParams.get_MultiplexInputStreamParams();
     939             : 
     940           0 :   const InfallibleTArray<InputStreamParams>& streams = params.streams();
     941             : 
     942           0 :   uint32_t streamCount = streams.Length();
     943           0 :   for (uint32_t index = 0; index < streamCount; index++) {
     944             :     nsCOMPtr<nsIInputStream> stream =
     945           0 :       InputStreamHelper::DeserializeInputStream(streams[index],
     946           0 :                                                 aFileDescriptors);
     947           0 :     if (!stream) {
     948           0 :       NS_WARNING("Deserialize failed!");
     949           0 :       return false;
     950             :     }
     951             : 
     952           0 :     if (NS_FAILED(AppendStream(stream))) {
     953           0 :       NS_WARNING("AppendStream failed!");
     954           0 :       return false;
     955             :     }
     956             :   }
     957             : 
     958           0 :   mCurrentStream = params.currentStream();
     959           0 :   mStatus = params.status();
     960           0 :   mStartedReadingCurrent = params.startedReadingCurrent();
     961             : 
     962           0 :   return true;
     963             : }
     964             : 
                       // Sums the expected serialized lengths reported by the sub-streams.
                       //
                       // Sub-streams that do not QI to nsIIPCSerializableInputStream, or that
                       // report Nothing(), are skipped and contribute nothing to the total.
                       // mLock is held for the whole call.
                       //
                       // @return Some(total) if at least one sub-stream reported a length,
                       //         Nothing() otherwise (including when there are no sub-streams).
     965             : Maybe<uint64_t>
     966           0 : nsMultiplexInputStream::ExpectedSerializedLength()
     967             : {
     968           0 :   MutexAutoLock lock(mLock);
     969             : 
     970           0 :   bool lengthValueExists = false;
     971           0 :   uint64_t expectedLength = 0;
     972           0 :   uint32_t streamCount = mStreams.Length();
     973           0 :   for (uint32_t index = 0; index < streamCount; index++) {
     974           0 :     nsCOMPtr<nsIIPCSerializableInputStream> stream = do_QueryInterface(mStreams[index]);
     975           0 :     if (!stream) {
     976           0 :       continue;
     977             :     }
     978           0 :     Maybe<uint64_t> length = stream->ExpectedSerializedLength();
     979           0 :     if (length.isNothing()) {
     980           0 :       continue;
     981             :     }
     982           0 :     lengthValueExists = true;
     983           0 :     expectedLength += length.value();
     984             :   }
     985           0 :   return lengthValueExists ? Some(expectedLength) : Nothing();
     986             : }
     987             : 
                       // Reports whether Clone() can currently be expected to succeed.
                       //
                       // *aCloneable is set to false if reading has already started (a sub-stream
                       // was passed, or the current one was started), or if any sub-stream either
                       // does not QI to nsICloneableInputStream or reports itself as not
                       // cloneable. Otherwise it is set to true, which includes the case of an
                       // empty stream list. mLock is held for the whole call.
                       //
                       // @param aCloneable  Receives the answer.
                       // @return Always NS_OK.
     988             : NS_IMETHODIMP
     989           0 : nsMultiplexInputStream::GetCloneable(bool* aCloneable)
     990             : {
     991           0 :   MutexAutoLock lock(mLock);
     992             :   //XXXnsm Cloning a multiplex stream which has started reading is not permitted
     993             :   //right now.
     994           0 :   if (mCurrentStream > 0 || mStartedReadingCurrent) {
     995           0 :     *aCloneable = false;
     996           0 :     return NS_OK;
     997             :   }
     998             : 
     999           0 :   uint32_t len = mStreams.Length();
    1000           0 :   for (uint32_t i = 0; i < len; ++i) {
    1001           0 :     nsCOMPtr<nsICloneableInputStream> cis = do_QueryInterface(mStreams[i]);
    1002           0 :     if (!cis || !cis->GetCloneable()) {
    1003           0 :       *aCloneable = false;
    1004           0 :       return NS_OK;
    1005             :     }
    1006             :   }
    1007             : 
    1008           0 :   *aCloneable = true;
    1009           0 :   return NS_OK;
    1010             : }
    1011             : 
                       // Creates a new multiplex stream whose sub-streams are clones of this
                       // stream's sub-streams, in the same order.
                       //
                       // mLock is held for the whole call. *aClone is written only on success.
                       //
                       // @param aClone  Receives the new stream.
                       // @return NS_ERROR_FAILURE if reading has already started or if a
                       //         sub-stream does not QI to nsICloneableInputStream; the failure
                       //         code from a sub-stream's Clone() or from AppendStream() on the
                       //         new stream; NS_OK on success.
    1012             : NS_IMETHODIMP
    1013           0 : nsMultiplexInputStream::Clone(nsIInputStream** aClone)
    1014             : {
    1015           0 :   MutexAutoLock lock(mLock);
    1016             : 
    1017             :   //XXXnsm Cloning a multiplex stream which has started reading is not permitted
    1018             :   //right now.
    1019           0 :   if (mCurrentStream > 0 || mStartedReadingCurrent) {
    1020           0 :     return NS_ERROR_FAILURE;
    1021             :   }
    1022             : 
    1023           0 :   nsCOMPtr<nsIMultiplexInputStream> clone = new nsMultiplexInputStream();
    1024             : 
    1025             :   nsresult rv;
    1026           0 :   uint32_t len = mStreams.Length();
    1027           0 :   for (uint32_t i = 0; i < len; ++i) {
    1028           0 :     nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
    1029           0 :     if (NS_WARN_IF(!substream)) {
    1030           0 :       return NS_ERROR_FAILURE;
    1031             :     }
    1032             : 
    1033           0 :     nsCOMPtr<nsIInputStream> clonedSubstream;
    1034           0 :     rv = substream->Clone(getter_AddRefs(clonedSubstream));
    1035           0 :     if (NS_WARN_IF(NS_FAILED(rv))) {
    1036           0 :       return rv;
    1037             :     }
    1038             : 
    1039           0 :     rv = clone->AppendStream(clonedSubstream);
    1040           0 :     if (NS_WARN_IF(NS_FAILED(rv))) {
    1041           0 :       return rv;
    1042             :     }
    1043             :   }
    1044             : 
                         // Hand the finished clone over to the caller.
    1045           0 :   clone.forget(aClone);
    1046           0 :   return NS_OK;
    1047             : }
    1048             : 
                       // Returns true only if every sub-stream QIs to nsISeekableStream; an empty
                       // stream list therefore yields true.
                       // NOTE(review): mLock is not taken in this function -- presumably the
                       // caller is responsible for synchronization; confirm.
    1049             : bool
    1050           3 : nsMultiplexInputStream::IsSeekable() const
    1051             : {
    1052           9 :   for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
    1053          12 :     nsCOMPtr<nsISeekableStream> substream = do_QueryInterface(mStreams[i]);
    1054           6 :     if (!substream) {
    1055           0 :       return false;
    1056             :     }
    1057             :   }
    1058           3 :   return true;
    1059             : }
    1060             : 
                       // Returns true only if every sub-stream QIs to
                       // nsIIPCSerializableInputStream; an empty stream list therefore yields true.
                       // NOTE(review): mLock is not taken in this function -- presumably the
                       // caller is responsible for synchronization; confirm.
    1061             : bool
    1062           3 : nsMultiplexInputStream::IsIPCSerializable() const
    1063             : {
    1064           9 :   for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
    1065          12 :     nsCOMPtr<nsIIPCSerializableInputStream> substream = do_QueryInterface(mStreams[i]);
    1066           6 :     if (!substream) {
    1067           0 :       return false;
    1068             :     }
    1069             :   }
    1070           3 :   return true;
    1071             : }
    1072             : 
                       // Returns true only if every sub-stream QIs to nsICloneableInputStream; an
                       // empty stream list therefore yields true. Unlike GetCloneable(), this only
                       // checks for the interface and does not ask each sub-stream whether it is
                       // currently cloneable.
                       // NOTE(review): mLock is not taken in this function -- presumably the
                       // caller is responsible for synchronization; confirm.
    1073             : bool
    1074           2 : nsMultiplexInputStream::IsCloneable() const
    1075             : {
    1076           6 :   for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
    1077           8 :     nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
    1078           4 :     if (!substream) {
    1079           0 :       return false;
    1080             :     }
    1081             :   }
    1082           2 :   return true;
    1083             : }
    1084             : 
                       // Returns true as soon as one sub-stream QIs to nsIAsyncInputStream, and
                       // false if none does (including when there are no sub-streams).
                       // NOTE(review): mLock is not taken in this function -- presumably the
                       // caller is responsible for synchronization; confirm.
    1085             : bool
    1086           2 : nsMultiplexInputStream::IsAsyncInputStream() const
    1087             : {
    1088             :   // nsMultiplexInputStream is nsIAsyncInputStream if at least one of the
    1089             :   // substreams implements that interface.
    1090           6 :   for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
    1091           8 :     nsCOMPtr<nsIAsyncInputStream> substream = do_QueryInterface(mStreams[i]);
    1092           4 :     if (substream) {
    1093           0 :       return true;
    1094             :     }
    1095             :   }
    1096           2 :   return false;
    1097             : }

Generated by: LCOV version 1.13