Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : /**
8 : * The multiplex stream concatenates a list of input streams into a single
9 : * stream.
10 : */
11 :
12 : #include "mozilla/Attributes.h"
13 : #include "mozilla/MathAlgorithms.h"
14 : #include "mozilla/Mutex.h"
15 : #include "mozilla/SystemGroup.h"
16 :
17 : #include "base/basictypes.h"
18 :
19 : #include "nsMultiplexInputStream.h"
20 : #include "nsICloneableInputStream.h"
21 : #include "nsIMultiplexInputStream.h"
22 : #include "nsISeekableStream.h"
23 : #include "nsCOMPtr.h"
24 : #include "nsCOMArray.h"
25 : #include "nsIClassInfoImpl.h"
26 : #include "nsIIPCSerializableInputStream.h"
27 : #include "mozilla/ipc/InputStreamUtils.h"
28 : #include "nsIAsyncInputStream.h"
29 :
30 : using namespace mozilla;
31 : using namespace mozilla::ipc;
32 :
33 : using mozilla::DeprecatedAbs;
34 : using mozilla::Maybe;
35 : using mozilla::Nothing;
36 : using mozilla::Some;
37 :
38 : class nsMultiplexInputStream final
39 : : public nsIMultiplexInputStream
40 : , public nsISeekableStream
41 : , public nsIIPCSerializableInputStream
42 : , public nsICloneableInputStream
43 : , public nsIAsyncInputStream
44 : {
45 : public:
46 : nsMultiplexInputStream();
47 :
48 : NS_DECL_THREADSAFE_ISUPPORTS
49 : NS_DECL_NSIINPUTSTREAM
50 : NS_DECL_NSIMULTIPLEXINPUTSTREAM
51 : NS_DECL_NSISEEKABLESTREAM
52 : NS_DECL_NSIIPCSERIALIZABLEINPUTSTREAM
53 : NS_DECL_NSICLONEABLEINPUTSTREAM
54 : NS_DECL_NSIASYNCINPUTSTREAM
55 :
56 : void AsyncWaitCompleted();
57 :
58 : private:
59 1 : ~nsMultiplexInputStream()
60 1 : {
61 1 : }
62 :
63 0 : struct MOZ_STACK_CLASS ReadSegmentsState
64 : {
65 : nsCOMPtr<nsIInputStream> mThisStream;
66 : uint32_t mOffset;
67 : nsWriteSegmentFun mWriter;
68 : void* mClosure;
69 : bool mDone;
70 : };
71 :
72 : static nsresult ReadSegCb(nsIInputStream* aIn, void* aClosure,
73 : const char* aFromRawSegment, uint32_t aToOffset,
74 : uint32_t aCount, uint32_t* aWriteCount);
75 :
76 : bool IsSeekable() const;
77 : bool IsIPCSerializable() const;
78 : bool IsCloneable() const;
79 : bool IsAsyncInputStream() const;
80 :
81 : Mutex mLock; // Protects access to all data members.
82 : nsTArray<nsCOMPtr<nsIInputStream>> mStreams;
83 : uint32_t mCurrentStream;
84 : bool mStartedReadingCurrent;
85 : nsresult mStatus;
86 : nsCOMPtr<nsIInputStreamCallback> mAsyncWaitCallback;
87 : };
88 :
// XPCOM boilerplate for nsMultiplexInputStream: reference counting, class
// info, and the QueryInterface table.
89 5 : NS_IMPL_ADDREF(nsMultiplexInputStream)
90 6 : NS_IMPL_RELEASE(nsMultiplexInputStream)
91 :
// Class info: registered under NS_MULTIPLEXINPUTSTREAM_CID and flagged
// nsIClassInfo::THREADSAFE.
92 3 : NS_IMPL_CLASSINFO(nsMultiplexInputStream, nullptr, nsIClassInfo::THREADSAFE,
93 : NS_MULTIPLEXINPUTSTREAM_CID)
94 :
// QueryInterface table. The seekable / IPC-serializable / cloneable / async
// entries are conditional: each is guarded by the corresponding Is*()
// predicate, which inspects the sub-streams currently held in mStreams.
95 5 : NS_INTERFACE_MAP_BEGIN(nsMultiplexInputStream)
96 5 : NS_INTERFACE_MAP_ENTRY(nsIMultiplexInputStream)
97 3 : NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIMultiplexInputStream)
98 3 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream, IsSeekable())
99 3 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
100 : IsIPCSerializable())
101 2 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
102 : IsCloneable())
103 2 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
104 : IsAsyncInputStream())
105 2 : NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIMultiplexInputStream)
106 2 : NS_IMPL_QUERY_CLASSINFO(nsMultiplexInputStream)
107 2 : NS_INTERFACE_MAP_END
108 :
// Interfaces advertised through class info. NOTE(review): only three
// interfaces are listed here; the other conditionally supported ones are
// not advertised -- confirm this is intentional.
109 0 : NS_IMPL_CI_INTERFACE_GETTER(nsMultiplexInputStream,
110 : nsIMultiplexInputStream,
111 : nsIInputStream,
112 : nsISeekableStream)
113 :
114 : static nsresult
115 0 : AvailableMaybeSeek(nsIInputStream* aStream, uint64_t* aResult)
116 : {
117 0 : nsresult rv = aStream->Available(aResult);
118 0 : if (rv == NS_BASE_STREAM_CLOSED) {
119 : // Blindly seek to the current position if Available() returns
120 : // NS_BASE_STREAM_CLOSED.
121 : // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
122 : // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
123 0 : nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(aStream);
124 0 : if (seekable) {
125 0 : nsresult rvSeek = seekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
126 0 : if (NS_SUCCEEDED(rvSeek)) {
127 0 : rv = aStream->Available(aResult);
128 : }
129 : }
130 : }
131 0 : return rv;
132 : }
133 :
134 : static nsresult
135 0 : TellMaybeSeek(nsISeekableStream* aSeekable, int64_t* aResult)
136 : {
137 0 : nsresult rv = aSeekable->Tell(aResult);
138 0 : if (rv == NS_BASE_STREAM_CLOSED) {
139 : // Blindly seek to the current position if Tell() returns
140 : // NS_BASE_STREAM_CLOSED.
141 : // If nsIFileInputStream is closed in Read() due to CLOSE_ON_EOF flag,
142 : // Seek() could reopen the file if REOPEN_ON_REWIND flag is set.
143 0 : nsresult rvSeek = aSeekable->Seek(nsISeekableStream::NS_SEEK_CUR, 0);
144 0 : if (NS_SUCCEEDED(rvSeek)) {
145 0 : rv = aSeekable->Tell(aResult);
146 : }
147 : }
148 0 : return rv;
149 : }
150 :
151 1 : nsMultiplexInputStream::nsMultiplexInputStream()
152 : : mLock("nsMultiplexInputStream lock"),
153 : mCurrentStream(0),
154 : mStartedReadingCurrent(false),
155 1 : mStatus(NS_OK)
156 : {
157 1 : }
158 :
159 : NS_IMETHODIMP
160 0 : nsMultiplexInputStream::GetCount(uint32_t* aCount)
161 : {
162 0 : MutexAutoLock lock(mLock);
163 0 : *aCount = mStreams.Length();
164 0 : return NS_OK;
165 : }
166 :
167 : NS_IMETHODIMP
168 2 : nsMultiplexInputStream::AppendStream(nsIInputStream* aStream)
169 : {
170 4 : MutexAutoLock lock(mLock);
171 4 : return mStreams.AppendElement(aStream) ? NS_OK : NS_ERROR_OUT_OF_MEMORY;
172 : }
173 :
174 : NS_IMETHODIMP
175 0 : nsMultiplexInputStream::InsertStream(nsIInputStream* aStream, uint32_t aIndex)
176 : {
177 0 : MutexAutoLock lock(mLock);
178 0 : mStreams.InsertElementAt(aIndex, aStream);
179 0 : if (mCurrentStream > aIndex ||
180 0 : (mCurrentStream == aIndex && mStartedReadingCurrent)) {
181 0 : ++mCurrentStream;
182 : }
183 0 : return NS_OK;
184 : }
185 :
186 : NS_IMETHODIMP
187 0 : nsMultiplexInputStream::RemoveStream(uint32_t aIndex)
188 : {
189 0 : MutexAutoLock lock(mLock);
190 0 : mStreams.RemoveElementAt(aIndex);
191 0 : if (mCurrentStream > aIndex) {
192 0 : --mCurrentStream;
193 0 : } else if (mCurrentStream == aIndex) {
194 0 : mStartedReadingCurrent = false;
195 : }
196 :
197 0 : return NS_OK;
198 : }
199 :
200 : NS_IMETHODIMP
201 0 : nsMultiplexInputStream::GetStream(uint32_t aIndex, nsIInputStream** aResult)
202 : {
203 0 : MutexAutoLock lock(mLock);
204 0 : *aResult = mStreams.SafeElementAt(aIndex, nullptr);
205 0 : if (NS_WARN_IF(!*aResult)) {
206 0 : return NS_ERROR_NOT_AVAILABLE;
207 : }
208 :
209 0 : NS_ADDREF(*aResult);
210 0 : return NS_OK;
211 : }
212 :
213 : NS_IMETHODIMP
214 0 : nsMultiplexInputStream::Close()
215 : {
216 0 : MutexAutoLock lock(mLock);
217 0 : mStatus = NS_BASE_STREAM_CLOSED;
218 :
219 0 : nsresult rv = NS_OK;
220 :
221 0 : uint32_t len = mStreams.Length();
222 0 : for (uint32_t i = 0; i < len; ++i) {
223 0 : nsresult rv2 = mStreams[i]->Close();
224 : // We still want to close all streams, but we should return an error
225 0 : if (NS_FAILED(rv2)) {
226 0 : rv = rv2;
227 : }
228 : }
229 :
230 0 : mAsyncWaitCallback = nullptr;
231 :
232 0 : return rv;
233 : }
234 :
235 : NS_IMETHODIMP
236 0 : nsMultiplexInputStream::Available(uint64_t* aResult)
237 : {
238 0 : MutexAutoLock lock(mLock);
239 0 : if (NS_FAILED(mStatus)) {
240 0 : return mStatus;
241 : }
242 :
243 0 : uint64_t avail = 0;
244 :
245 0 : uint32_t len = mStreams.Length();
246 0 : for (uint32_t i = mCurrentStream; i < len; i++) {
247 : uint64_t streamAvail;
248 0 : mStatus = AvailableMaybeSeek(mStreams[i], &streamAvail);
249 0 : if (NS_WARN_IF(NS_FAILED(mStatus))) {
250 0 : return mStatus;
251 : }
252 0 : avail += streamAvail;
253 : }
254 0 : *aResult = avail;
255 0 : return NS_OK;
256 : }
257 :
258 : NS_IMETHODIMP
259 3 : nsMultiplexInputStream::Read(char* aBuf, uint32_t aCount, uint32_t* aResult)
260 : {
261 6 : MutexAutoLock lock(mLock);
262 : // It is tempting to implement this method in terms of ReadSegments, but
263 : // that would prevent this class from being used with streams that only
264 : // implement Read (e.g., file streams).
265 :
266 3 : *aResult = 0;
267 :
268 3 : if (mStatus == NS_BASE_STREAM_CLOSED) {
269 0 : return NS_OK;
270 : }
271 3 : if (NS_FAILED(mStatus)) {
272 0 : return mStatus;
273 : }
274 :
275 3 : nsresult rv = NS_OK;
276 :
277 3 : uint32_t len = mStreams.Length();
278 11 : while (mCurrentStream < len && aCount) {
279 : uint32_t read;
280 4 : rv = mStreams[mCurrentStream]->Read(aBuf, aCount, &read);
281 :
282 : // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
283 : // (This is a bug in those stream implementations)
284 4 : if (rv == NS_BASE_STREAM_CLOSED) {
285 0 : NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
286 0 : rv = NS_OK;
287 0 : read = 0;
288 4 : } else if (NS_FAILED(rv)) {
289 0 : break;
290 : }
291 :
292 4 : if (read == 0) {
293 2 : ++mCurrentStream;
294 2 : mStartedReadingCurrent = false;
295 : } else {
296 2 : NS_ASSERTION(aCount >= read, "Read more than requested");
297 2 : *aResult += read;
298 2 : aCount -= read;
299 2 : aBuf += read;
300 2 : mStartedReadingCurrent = true;
301 : }
302 : }
303 3 : return *aResult ? NS_OK : rv;
304 : }
305 :
306 : NS_IMETHODIMP
307 0 : nsMultiplexInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
308 : uint32_t aCount, uint32_t* aResult)
309 : {
310 0 : MutexAutoLock lock(mLock);
311 :
312 0 : if (mStatus == NS_BASE_STREAM_CLOSED) {
313 0 : *aResult = 0;
314 0 : return NS_OK;
315 : }
316 0 : if (NS_FAILED(mStatus)) {
317 0 : return mStatus;
318 : }
319 :
320 0 : NS_ASSERTION(aWriter, "missing aWriter");
321 :
322 0 : nsresult rv = NS_OK;
323 0 : ReadSegmentsState state;
324 0 : state.mThisStream = static_cast<nsIMultiplexInputStream*>(this);
325 0 : state.mOffset = 0;
326 0 : state.mWriter = aWriter;
327 0 : state.mClosure = aClosure;
328 0 : state.mDone = false;
329 :
330 0 : uint32_t len = mStreams.Length();
331 0 : while (mCurrentStream < len && aCount) {
332 : uint32_t read;
333 0 : rv = mStreams[mCurrentStream]->ReadSegments(ReadSegCb, &state, aCount, &read);
334 :
335 : // XXX some streams return NS_BASE_STREAM_CLOSED to indicate EOF.
336 : // (This is a bug in those stream implementations)
337 0 : if (rv == NS_BASE_STREAM_CLOSED) {
338 0 : NS_NOTREACHED("Input stream's Read method returned NS_BASE_STREAM_CLOSED");
339 0 : rv = NS_OK;
340 0 : read = 0;
341 : }
342 :
343 : // if |aWriter| decided to stop reading segments...
344 0 : if (state.mDone || NS_FAILED(rv)) {
345 0 : break;
346 : }
347 :
348 : // if stream is empty, then advance to the next stream.
349 0 : if (read == 0) {
350 0 : ++mCurrentStream;
351 0 : mStartedReadingCurrent = false;
352 : } else {
353 0 : NS_ASSERTION(aCount >= read, "Read more than requested");
354 0 : state.mOffset += read;
355 0 : aCount -= read;
356 0 : mStartedReadingCurrent = true;
357 : }
358 : }
359 :
360 : // if we successfully read some data, then this call succeeded.
361 0 : *aResult = state.mOffset;
362 0 : return state.mOffset ? NS_OK : rv;
363 : }
364 :
365 : nsresult
366 0 : nsMultiplexInputStream::ReadSegCb(nsIInputStream* aIn, void* aClosure,
367 : const char* aFromRawSegment,
368 : uint32_t aToOffset, uint32_t aCount,
369 : uint32_t* aWriteCount)
370 : {
371 : nsresult rv;
372 0 : ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
373 0 : rv = (state->mWriter)(state->mThisStream,
374 : state->mClosure,
375 : aFromRawSegment,
376 0 : aToOffset + state->mOffset,
377 : aCount,
378 0 : aWriteCount);
379 0 : if (NS_FAILED(rv)) {
380 0 : state->mDone = true;
381 : }
382 0 : return rv;
383 : }
384 :
385 : NS_IMETHODIMP
386 0 : nsMultiplexInputStream::IsNonBlocking(bool* aNonBlocking)
387 : {
388 0 : MutexAutoLock lock(mLock);
389 :
390 0 : uint32_t len = mStreams.Length();
391 0 : if (len == 0) {
392 : // Claim to be non-blocking, since we won't block the caller.
393 : // On the other hand we'll never return NS_BASE_STREAM_WOULD_BLOCK,
394 : // so maybe we should claim to be blocking? It probably doesn't
395 : // matter in practice.
396 0 : *aNonBlocking = true;
397 0 : return NS_OK;
398 : }
399 0 : for (uint32_t i = 0; i < len; ++i) {
400 0 : nsresult rv = mStreams[i]->IsNonBlocking(aNonBlocking);
401 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
402 0 : return rv;
403 : }
404 : // If one is non-blocking the entire stream becomes non-blocking
405 : // (except that we don't implement nsIAsyncInputStream, so there's
406 : // not much for the caller to do if Read returns "would block")
407 0 : if (*aNonBlocking) {
408 0 : return NS_OK;
409 : }
410 : }
411 0 : return NS_OK;
412 : }
413 :
414 : NS_IMETHODIMP
415 0 : nsMultiplexInputStream::Seek(int32_t aWhence, int64_t aOffset)
416 : {
417 0 : MutexAutoLock lock(mLock);
418 :
419 0 : if (NS_FAILED(mStatus)) {
420 0 : return mStatus;
421 : }
422 :
423 : nsresult rv;
424 :
425 0 : uint32_t oldCurrentStream = mCurrentStream;
426 0 : bool oldStartedReadingCurrent = mStartedReadingCurrent;
427 :
428 0 : if (aWhence == NS_SEEK_SET) {
429 0 : int64_t remaining = aOffset;
430 0 : if (aOffset == 0) {
431 0 : mCurrentStream = 0;
432 : }
433 0 : for (uint32_t i = 0; i < mStreams.Length(); ++i) {
434 : nsCOMPtr<nsISeekableStream> stream =
435 0 : do_QueryInterface(mStreams[i]);
436 0 : if (!stream) {
437 0 : return NS_ERROR_FAILURE;
438 : }
439 :
440 : // See if all remaining streams should be rewound
441 0 : if (remaining == 0) {
442 0 : if (i < oldCurrentStream ||
443 0 : (i == oldCurrentStream && oldStartedReadingCurrent)) {
444 0 : rv = stream->Seek(NS_SEEK_SET, 0);
445 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
446 0 : return rv;
447 : }
448 0 : continue;
449 : } else {
450 : break;
451 : }
452 : }
453 :
454 : // Get position in current stream
455 : int64_t streamPos;
456 0 : if (i > oldCurrentStream ||
457 0 : (i == oldCurrentStream && !oldStartedReadingCurrent)) {
458 0 : streamPos = 0;
459 : } else {
460 0 : rv = TellMaybeSeek(stream, &streamPos);
461 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
462 0 : return rv;
463 : }
464 : }
465 :
466 : // See if we need to seek current stream forward or backward
467 0 : if (remaining < streamPos) {
468 0 : rv = stream->Seek(NS_SEEK_SET, remaining);
469 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
470 0 : return rv;
471 : }
472 :
473 0 : mCurrentStream = i;
474 0 : mStartedReadingCurrent = remaining != 0;
475 :
476 0 : remaining = 0;
477 0 : } else if (remaining > streamPos) {
478 0 : if (i < oldCurrentStream) {
479 : // We're already at end so no need to seek this stream
480 0 : remaining -= streamPos;
481 0 : NS_ASSERTION(remaining >= 0, "Remaining invalid");
482 : } else {
483 : uint64_t avail;
484 0 : rv = AvailableMaybeSeek(mStreams[i], &avail);
485 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
486 0 : return rv;
487 : }
488 :
489 0 : int64_t newPos = XPCOM_MIN(remaining, streamPos + (int64_t)avail);
490 :
491 0 : rv = stream->Seek(NS_SEEK_SET, newPos);
492 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
493 0 : return rv;
494 : }
495 :
496 0 : mCurrentStream = i;
497 0 : mStartedReadingCurrent = true;
498 :
499 0 : remaining -= newPos;
500 0 : NS_ASSERTION(remaining >= 0, "Remaining invalid");
501 : }
502 : } else {
503 0 : NS_ASSERTION(remaining == streamPos, "Huh?");
504 0 : MOZ_ASSERT(remaining != 0, "Zero remaining should be handled earlier");
505 0 : remaining = 0;
506 0 : mCurrentStream = i;
507 0 : mStartedReadingCurrent = true;
508 : }
509 : }
510 :
511 0 : return NS_OK;
512 : }
513 :
514 0 : if (aWhence == NS_SEEK_CUR && aOffset > 0) {
515 0 : int64_t remaining = aOffset;
516 0 : for (uint32_t i = mCurrentStream; remaining && i < mStreams.Length(); ++i) {
517 : nsCOMPtr<nsISeekableStream> stream =
518 0 : do_QueryInterface(mStreams[i]);
519 :
520 : uint64_t avail;
521 0 : rv = AvailableMaybeSeek(mStreams[i], &avail);
522 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
523 0 : return rv;
524 : }
525 :
526 0 : int64_t seek = XPCOM_MIN((int64_t)avail, remaining);
527 :
528 0 : rv = stream->Seek(NS_SEEK_CUR, seek);
529 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
530 0 : return rv;
531 : }
532 :
533 0 : mCurrentStream = i;
534 0 : mStartedReadingCurrent = true;
535 :
536 0 : remaining -= seek;
537 : }
538 :
539 0 : return NS_OK;
540 : }
541 :
542 0 : if (aWhence == NS_SEEK_CUR && aOffset < 0) {
543 0 : int64_t remaining = -aOffset;
544 0 : for (uint32_t i = mCurrentStream; remaining && i != (uint32_t)-1; --i) {
545 : nsCOMPtr<nsISeekableStream> stream =
546 0 : do_QueryInterface(mStreams[i]);
547 :
548 : int64_t pos;
549 0 : rv = TellMaybeSeek(stream, &pos);
550 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
551 0 : return rv;
552 : }
553 :
554 0 : int64_t seek = XPCOM_MIN(pos, remaining);
555 :
556 0 : rv = stream->Seek(NS_SEEK_CUR, -seek);
557 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
558 0 : return rv;
559 : }
560 :
561 0 : mCurrentStream = i;
562 0 : mStartedReadingCurrent = seek != -pos;
563 :
564 0 : remaining -= seek;
565 : }
566 :
567 0 : return NS_OK;
568 : }
569 :
570 0 : if (aWhence == NS_SEEK_CUR) {
571 0 : NS_ASSERTION(aOffset == 0, "Should have handled all non-zero values");
572 :
573 0 : return NS_OK;
574 : }
575 :
576 0 : if (aWhence == NS_SEEK_END) {
577 0 : if (aOffset > 0) {
578 0 : return NS_ERROR_INVALID_ARG;
579 : }
580 0 : int64_t remaining = aOffset;
581 0 : for (uint32_t i = mStreams.Length() - 1; i != (uint32_t)-1; --i) {
582 : nsCOMPtr<nsISeekableStream> stream =
583 0 : do_QueryInterface(mStreams[i]);
584 :
585 : // See if all remaining streams should be seeked to end
586 0 : if (remaining == 0) {
587 0 : if (i >= oldCurrentStream) {
588 0 : rv = stream->Seek(NS_SEEK_END, 0);
589 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
590 0 : return rv;
591 : }
592 : } else {
593 0 : break;
594 : }
595 : }
596 :
597 : // Get position in current stream
598 : int64_t streamPos;
599 0 : if (i < oldCurrentStream) {
600 0 : streamPos = 0;
601 : } else {
602 : uint64_t avail;
603 0 : rv = AvailableMaybeSeek(mStreams[i], &avail);
604 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
605 0 : return rv;
606 : }
607 :
608 0 : streamPos = avail;
609 : }
610 :
611 : // See if we have enough data in the current stream.
612 0 : if (DeprecatedAbs(remaining) < streamPos) {
613 0 : rv = stream->Seek(NS_SEEK_END, remaining);
614 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
615 0 : return rv;
616 : }
617 :
618 0 : mCurrentStream = i;
619 0 : mStartedReadingCurrent = true;
620 :
621 0 : remaining = 0;
622 0 : } else if (DeprecatedAbs(remaining) > streamPos) {
623 0 : if (i > oldCurrentStream ||
624 0 : (i == oldCurrentStream && !oldStartedReadingCurrent)) {
625 : // We're already at start so no need to seek this stream
626 0 : remaining += streamPos;
627 : } else {
628 : int64_t avail;
629 0 : rv = TellMaybeSeek(stream, &avail);
630 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
631 0 : return rv;
632 : }
633 :
634 0 : int64_t newPos = streamPos + XPCOM_MIN(avail, DeprecatedAbs(remaining));
635 :
636 0 : rv = stream->Seek(NS_SEEK_END, -newPos);
637 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
638 0 : return rv;
639 : }
640 :
641 0 : mCurrentStream = i;
642 0 : mStartedReadingCurrent = true;
643 :
644 0 : remaining += newPos;
645 : }
646 : } else {
647 0 : NS_ASSERTION(remaining == streamPos, "Huh?");
648 0 : remaining = 0;
649 : }
650 : }
651 :
652 0 : return NS_OK;
653 : }
654 :
655 : // other Seeks not implemented yet
656 0 : return NS_ERROR_NOT_IMPLEMENTED;
657 : }
658 :
659 : NS_IMETHODIMP
660 0 : nsMultiplexInputStream::Tell(int64_t* aResult)
661 : {
662 0 : MutexAutoLock lock(mLock);
663 :
664 0 : if (NS_FAILED(mStatus)) {
665 0 : return mStatus;
666 : }
667 :
668 : nsresult rv;
669 0 : int64_t ret64 = 0;
670 : uint32_t i, last;
671 0 : last = mStartedReadingCurrent ? mCurrentStream + 1 : mCurrentStream;
672 0 : for (i = 0; i < last; ++i) {
673 0 : nsCOMPtr<nsISeekableStream> stream = do_QueryInterface(mStreams[i]);
674 0 : if (NS_WARN_IF(!stream)) {
675 0 : return NS_ERROR_NO_INTERFACE;
676 : }
677 :
678 : int64_t pos;
679 0 : rv = TellMaybeSeek(stream, &pos);
680 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
681 0 : return rv;
682 : }
683 0 : ret64 += pos;
684 : }
685 0 : *aResult = ret64;
686 :
687 0 : return NS_OK;
688 : }
689 :
690 : NS_IMETHODIMP
691 0 : nsMultiplexInputStream::SetEOF()
692 : {
693 0 : return NS_ERROR_NOT_IMPLEMENTED;
694 : }
695 :
696 : NS_IMETHODIMP
697 0 : nsMultiplexInputStream::CloseWithStatus(nsresult aStatus)
698 : {
699 0 : return Close();
700 : }
701 :
702 : // This class is used to inform nsMultiplexInputStream that it's time to execute
703 : // the asyncWait callback.
704 0 : class AsyncWaitRunnable final : public Runnable
705 : {
706 : RefPtr<nsMultiplexInputStream> mStream;
707 :
708 : public:
709 0 : explicit AsyncWaitRunnable(nsMultiplexInputStream* aStream)
710 0 : : Runnable("AsyncWaitRunnable")
711 0 : , mStream(aStream)
712 : {
713 0 : MOZ_ASSERT(aStream);
714 0 : }
715 :
716 : NS_IMETHOD
717 0 : Run() override
718 : {
719 0 : mStream->AsyncWaitCompleted();
720 0 : return NS_OK;
721 : }
722 : };
723 :
724 : // This helper class processes an array of nsIAsyncInputStreams, calling
725 : // AsyncWait() for each one of them. When all of them have answered, this helper
726 : // dispatches a AsyncWaitRunnable object. If there is an error calling
727 : // AsyncWait(), AsyncWaitRunnable is not dispatched.
728 : class AsyncStreamHelper final : public nsIInputStreamCallback
729 : {
730 : public:
731 : NS_DECL_THREADSAFE_ISUPPORTS
732 :
733 : static nsresult
734 0 : Process(nsMultiplexInputStream* aStream,
735 : nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
736 : uint32_t aFlags, uint32_t aRequestedCount,
737 : nsIEventTarget* aEventTarget)
738 : {
739 0 : MOZ_ASSERT(aStream);
740 0 : MOZ_ASSERT(!aAsyncStreams.IsEmpty());
741 0 : MOZ_ASSERT(aEventTarget);
742 :
743 : RefPtr<AsyncStreamHelper> helper =
744 0 : new AsyncStreamHelper(aStream, aAsyncStreams, aEventTarget);
745 0 : return helper->Run(aFlags, aRequestedCount);
746 : }
747 :
748 : private:
749 0 : AsyncStreamHelper(nsMultiplexInputStream* aStream,
750 : nsTArray<nsCOMPtr<nsIAsyncInputStream>>& aAsyncStreams,
751 : nsIEventTarget* aEventTarget)
752 0 : : mMutex("AsyncStreamHelper::mMutex")
753 : , mStream(aStream)
754 : , mEventTarget(aEventTarget)
755 0 : , mValid(true)
756 : {
757 0 : mPendingStreams.SwapElements(aAsyncStreams);
758 0 : }
759 :
760 0 : ~AsyncStreamHelper() = default;
761 :
762 : nsresult
763 0 : Run(uint32_t aFlags, uint32_t aRequestedCount)
764 : {
765 0 : MutexAutoLock lock(mMutex);
766 :
767 0 : for (uint32_t i = 0; i < mPendingStreams.Length(); ++i) {
768 : nsresult rv =
769 0 : mPendingStreams[i]->AsyncWait(this, aFlags, aRequestedCount,
770 0 : mEventTarget);
771 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
772 0 : mValid = false;
773 0 : return rv;
774 : }
775 : }
776 :
777 0 : return NS_OK;
778 : }
779 :
780 : NS_IMETHOD
781 0 : OnInputStreamReady(nsIAsyncInputStream* aStream) override
782 : {
783 0 : MOZ_ASSERT(aStream, "This cannot be one of ours.");
784 :
785 0 : MutexAutoLock lock(mMutex);
786 :
787 : // We failed during the Run().
788 0 : if (!mValid) {
789 0 : return NS_OK;
790 : }
791 :
792 0 : MOZ_ASSERT(mPendingStreams.Contains(aStream));
793 0 : mPendingStreams.RemoveElement(aStream);
794 :
795 : // The last asyncStream answered. We can inform nsMultiplexInputStream.
796 0 : if (mPendingStreams.IsEmpty()) {
797 0 : RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(mStream);
798 0 : return mEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
799 : }
800 :
801 0 : return NS_OK;
802 : }
803 :
804 : Mutex mMutex;
805 : RefPtr<nsMultiplexInputStream> mStream;
806 : nsTArray<nsCOMPtr<nsIAsyncInputStream>> mPendingStreams;
807 : nsCOMPtr<nsIEventTarget> mEventTarget;
808 : bool mValid;
809 : };
810 :
811 0 : NS_IMPL_ISUPPORTS(AsyncStreamHelper, nsIInputStreamCallback)
812 :
813 : NS_IMETHODIMP
814 0 : nsMultiplexInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
815 : uint32_t aFlags,
816 : uint32_t aRequestedCount,
817 : nsIEventTarget* aEventTarget)
818 : {
819 : // When AsyncWait() is called, it's better to call AsyncWait() to any sub
820 : // stream if they are valid nsIAsyncInputStream instances. In this way, when
821 : // they all call OnInputStreamReady(), we can proceed with the Read().
822 :
823 0 : MutexAutoLock lock(mLock);
824 :
825 0 : if (NS_FAILED(mStatus)) {
826 0 : return mStatus;
827 : }
828 :
829 0 : if (mAsyncWaitCallback && aCallback) {
830 0 : return NS_ERROR_FAILURE;
831 : }
832 :
833 0 : mAsyncWaitCallback = aCallback;
834 :
835 0 : if (!mAsyncWaitCallback) {
836 0 : return NS_OK;
837 : }
838 :
839 0 : nsTArray<nsCOMPtr<nsIAsyncInputStream>> asyncStreams;
840 0 : for (uint32_t i = mCurrentStream; i < mStreams.Length(); ++i) {
841 : nsCOMPtr<nsIAsyncInputStream> asyncStream =
842 0 : do_QueryInterface(mStreams.SafeElementAt(i, nullptr));
843 0 : if (asyncStream) {
844 0 : asyncStreams.AppendElement(asyncStream);
845 : }
846 : }
847 :
848 0 : if (!aEventTarget) {
849 0 : aEventTarget = SystemGroup::EventTargetFor(TaskCategory::Other);
850 : }
851 :
852 0 : if (asyncStreams.IsEmpty()) {
853 0 : RefPtr<AsyncWaitRunnable> runnable = new AsyncWaitRunnable(this);
854 0 : return aEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
855 : }
856 :
857 : return AsyncStreamHelper::Process(this, asyncStreams, aFlags, aRequestedCount,
858 0 : aEventTarget);
859 : }
860 :
861 : void
862 0 : nsMultiplexInputStream::AsyncWaitCompleted()
863 : {
864 0 : nsCOMPtr<nsIInputStreamCallback> callback;
865 :
866 : {
867 0 : MutexAutoLock lock(mLock);
868 :
869 : // The callback has been nullified in the meantime.
870 0 : if (!mAsyncWaitCallback) {
871 0 : return;
872 : }
873 :
874 0 : mAsyncWaitCallback.swap(callback);
875 : }
876 :
877 0 : callback->OnInputStreamReady(this);
878 : }
879 :
880 : nsresult
881 1 : nsMultiplexInputStreamConstructor(nsISupports* aOuter,
882 : REFNSIID aIID,
883 : void** aResult)
884 : {
885 1 : *aResult = nullptr;
886 :
887 1 : if (aOuter) {
888 0 : return NS_ERROR_NO_AGGREGATION;
889 : }
890 :
891 2 : RefPtr<nsMultiplexInputStream> inst = new nsMultiplexInputStream();
892 :
893 1 : return inst->QueryInterface(aIID, aResult);
894 : }
895 :
896 : void
897 0 : nsMultiplexInputStream::Serialize(InputStreamParams& aParams,
898 : FileDescriptorArray& aFileDescriptors)
899 : {
900 0 : MutexAutoLock lock(mLock);
901 :
902 0 : MultiplexInputStreamParams params;
903 :
904 0 : uint32_t streamCount = mStreams.Length();
905 :
906 0 : if (streamCount) {
907 0 : InfallibleTArray<InputStreamParams>& streams = params.streams();
908 :
909 0 : streams.SetCapacity(streamCount);
910 0 : for (uint32_t index = 0; index < streamCount; index++) {
911 0 : InputStreamParams childStreamParams;
912 0 : InputStreamHelper::SerializeInputStream(mStreams[index],
913 : childStreamParams,
914 0 : aFileDescriptors);
915 :
916 0 : streams.AppendElement(childStreamParams);
917 : }
918 : }
919 :
920 0 : params.currentStream() = mCurrentStream;
921 0 : params.status() = mStatus;
922 0 : params.startedReadingCurrent() = mStartedReadingCurrent;
923 :
924 0 : aParams = params;
925 0 : }
926 :
927 : bool
928 0 : nsMultiplexInputStream::Deserialize(const InputStreamParams& aParams,
929 : const FileDescriptorArray& aFileDescriptors)
930 : {
931 0 : if (aParams.type() !=
932 : InputStreamParams::TMultiplexInputStreamParams) {
933 0 : NS_ERROR("Received unknown parameters from the other process!");
934 0 : return false;
935 : }
936 :
937 : const MultiplexInputStreamParams& params =
938 0 : aParams.get_MultiplexInputStreamParams();
939 :
940 0 : const InfallibleTArray<InputStreamParams>& streams = params.streams();
941 :
942 0 : uint32_t streamCount = streams.Length();
943 0 : for (uint32_t index = 0; index < streamCount; index++) {
944 : nsCOMPtr<nsIInputStream> stream =
945 0 : InputStreamHelper::DeserializeInputStream(streams[index],
946 0 : aFileDescriptors);
947 0 : if (!stream) {
948 0 : NS_WARNING("Deserialize failed!");
949 0 : return false;
950 : }
951 :
952 0 : if (NS_FAILED(AppendStream(stream))) {
953 0 : NS_WARNING("AppendStream failed!");
954 0 : return false;
955 : }
956 : }
957 :
958 0 : mCurrentStream = params.currentStream();
959 0 : mStatus = params.status();
960 0 : mStartedReadingCurrent = params.startedReadingCurrent();
961 :
962 0 : return true;
963 : }
964 :
965 : Maybe<uint64_t>
966 0 : nsMultiplexInputStream::ExpectedSerializedLength()
967 : {
968 0 : MutexAutoLock lock(mLock);
969 :
970 0 : bool lengthValueExists = false;
971 0 : uint64_t expectedLength = 0;
972 0 : uint32_t streamCount = mStreams.Length();
973 0 : for (uint32_t index = 0; index < streamCount; index++) {
974 0 : nsCOMPtr<nsIIPCSerializableInputStream> stream = do_QueryInterface(mStreams[index]);
975 0 : if (!stream) {
976 0 : continue;
977 : }
978 0 : Maybe<uint64_t> length = stream->ExpectedSerializedLength();
979 0 : if (length.isNothing()) {
980 0 : continue;
981 : }
982 0 : lengthValueExists = true;
983 0 : expectedLength += length.value();
984 : }
985 0 : return lengthValueExists ? Some(expectedLength) : Nothing();
986 : }
987 :
988 : NS_IMETHODIMP
989 0 : nsMultiplexInputStream::GetCloneable(bool* aCloneable)
990 : {
991 0 : MutexAutoLock lock(mLock);
992 : //XXXnsm Cloning a multiplex stream which has started reading is not permitted
993 : //right now.
994 0 : if (mCurrentStream > 0 || mStartedReadingCurrent) {
995 0 : *aCloneable = false;
996 0 : return NS_OK;
997 : }
998 :
999 0 : uint32_t len = mStreams.Length();
1000 0 : for (uint32_t i = 0; i < len; ++i) {
1001 0 : nsCOMPtr<nsICloneableInputStream> cis = do_QueryInterface(mStreams[i]);
1002 0 : if (!cis || !cis->GetCloneable()) {
1003 0 : *aCloneable = false;
1004 0 : return NS_OK;
1005 : }
1006 : }
1007 :
1008 0 : *aCloneable = true;
1009 0 : return NS_OK;
1010 : }
1011 :
1012 : NS_IMETHODIMP
1013 0 : nsMultiplexInputStream::Clone(nsIInputStream** aClone)
1014 : {
1015 0 : MutexAutoLock lock(mLock);
1016 :
1017 : //XXXnsm Cloning a multiplex stream which has started reading is not permitted
1018 : //right now.
1019 0 : if (mCurrentStream > 0 || mStartedReadingCurrent) {
1020 0 : return NS_ERROR_FAILURE;
1021 : }
1022 :
1023 0 : nsCOMPtr<nsIMultiplexInputStream> clone = new nsMultiplexInputStream();
1024 :
1025 : nsresult rv;
1026 0 : uint32_t len = mStreams.Length();
1027 0 : for (uint32_t i = 0; i < len; ++i) {
1028 0 : nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
1029 0 : if (NS_WARN_IF(!substream)) {
1030 0 : return NS_ERROR_FAILURE;
1031 : }
1032 :
1033 0 : nsCOMPtr<nsIInputStream> clonedSubstream;
1034 0 : rv = substream->Clone(getter_AddRefs(clonedSubstream));
1035 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
1036 0 : return rv;
1037 : }
1038 :
1039 0 : rv = clone->AppendStream(clonedSubstream);
1040 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
1041 0 : return rv;
1042 : }
1043 : }
1044 :
1045 0 : clone.forget(aClone);
1046 0 : return NS_OK;
1047 : }
1048 :
1049 : bool
1050 3 : nsMultiplexInputStream::IsSeekable() const
1051 : {
1052 9 : for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1053 12 : nsCOMPtr<nsISeekableStream> substream = do_QueryInterface(mStreams[i]);
1054 6 : if (!substream) {
1055 0 : return false;
1056 : }
1057 : }
1058 3 : return true;
1059 : }
1060 :
1061 : bool
1062 3 : nsMultiplexInputStream::IsIPCSerializable() const
1063 : {
1064 9 : for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1065 12 : nsCOMPtr<nsIIPCSerializableInputStream> substream = do_QueryInterface(mStreams[i]);
1066 6 : if (!substream) {
1067 0 : return false;
1068 : }
1069 : }
1070 3 : return true;
1071 : }
1072 :
1073 : bool
1074 2 : nsMultiplexInputStream::IsCloneable() const
1075 : {
1076 6 : for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1077 8 : nsCOMPtr<nsICloneableInputStream> substream = do_QueryInterface(mStreams[i]);
1078 4 : if (!substream) {
1079 0 : return false;
1080 : }
1081 : }
1082 2 : return true;
1083 : }
1084 :
1085 : bool
1086 2 : nsMultiplexInputStream::IsAsyncInputStream() const
1087 : {
1088 : // nsMultiplexInputStream is nsIAsyncInputStream if at least 1 of the
1089 : // substream implements that interface.
1090 6 : for (uint32_t i = 0, len = mStreams.Length(); i < len; ++i) {
1091 8 : nsCOMPtr<nsIAsyncInputStream> substream = do_QueryInterface(mStreams[i]);
1092 4 : if (substream) {
1093 0 : return true;
1094 : }
1095 : }
1096 2 : return false;
1097 : }
|