Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "mozilla/Mutex.h"
8 : #include "mozilla/Attributes.h"
9 : #include "nsStreamUtils.h"
10 : #include "nsAutoPtr.h"
11 : #include "nsCOMPtr.h"
12 : #include "nsIPipe.h"
13 : #include "nsICloneableInputStream.h"
14 : #include "nsIEventTarget.h"
15 : #include "nsICancelableRunnable.h"
16 : #include "nsISafeOutputStream.h"
17 : #include "nsString.h"
18 : #include "nsIAsyncInputStream.h"
19 : #include "nsIAsyncOutputStream.h"
20 : #include "nsIBufferedStreams.h"
21 : #include "nsNetCID.h"
22 : #include "nsServiceManagerUtils.h"
23 : #include "nsThreadUtils.h"
24 :
25 : using namespace mozilla;
26 :
27 : //-----------------------------------------------------------------------------
28 :
29 : // This is a nsICancelableRunnable because we can dispatch it to Workers and
30 : // those can be shut down at any time, and in these cases, Cancel() is called
31 : // instead of Run().
32 : class nsInputStreamReadyEvent final
33 : : public CancelableRunnable
34 : , public nsIInputStreamCallback
35 : {
36 : public:
37 : NS_DECL_ISUPPORTS_INHERITED
38 :
39 81 : nsInputStreamReadyEvent(const char* aName,
40 : nsIInputStreamCallback* aCallback,
41 : nsIEventTarget* aTarget)
42 81 : : CancelableRunnable(aName)
43 : , mCallback(aCallback)
44 81 : , mTarget(aTarget)
45 : {
46 81 : }
47 :
48 : private:
49 162 : ~nsInputStreamReadyEvent()
50 162 : {
51 81 : if (!mCallback) {
52 81 : return;
53 : }
54 : //
55 : // whoa!! looks like we never posted this event. take care to
56 : // release mCallback on the correct thread. if mTarget lives on the
57 : // calling thread, then we are ok. otherwise, we have to try to
58 : // proxy the Release over the right thread. if that thread is dead,
59 : // then there's nothing we can do... better to leak than crash.
60 : //
61 : bool val;
62 0 : nsresult rv = mTarget->IsOnCurrentThread(&val);
63 0 : if (NS_FAILED(rv) || !val) {
64 : nsCOMPtr<nsIInputStreamCallback> event =
65 0 : NS_NewInputStreamReadyEvent("~nsInputStreamReadyEvent", mCallback, mTarget);
66 0 : mCallback = nullptr;
67 0 : if (event) {
68 0 : rv = event->OnInputStreamReady(nullptr);
69 0 : if (NS_FAILED(rv)) {
70 0 : NS_NOTREACHED("leaking stream event");
71 0 : nsISupports* sup = event;
72 0 : NS_ADDREF(sup);
73 : }
74 : }
75 : }
76 243 : }
77 :
78 : public:
79 81 : NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aStream) override
80 : {
81 81 : mStream = aStream;
82 :
83 : nsresult rv =
84 81 : mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
85 81 : if (NS_FAILED(rv)) {
86 0 : NS_WARNING("Dispatch failed");
87 0 : return NS_ERROR_FAILURE;
88 : }
89 :
90 81 : return NS_OK;
91 : }
92 :
93 81 : NS_IMETHOD Run() override
94 : {
95 81 : if (mCallback) {
96 81 : if (mStream) {
97 81 : mCallback->OnInputStreamReady(mStream);
98 : }
99 81 : mCallback = nullptr;
100 : }
101 81 : return NS_OK;
102 : }
103 :
104 0 : nsresult Cancel() override
105 : {
106 0 : mCallback = nullptr;
107 0 : return NS_OK;
108 : }
109 :
110 : private:
111 : nsCOMPtr<nsIAsyncInputStream> mStream;
112 : nsCOMPtr<nsIInputStreamCallback> mCallback;
113 : nsCOMPtr<nsIEventTarget> mTarget;
114 : };
115 :
116 2417 : NS_IMPL_ISUPPORTS_INHERITED(nsInputStreamReadyEvent, CancelableRunnable,
117 : nsIInputStreamCallback)
118 :
119 : //-----------------------------------------------------------------------------
120 :
121 : // This is a nsICancelableRunnable because we can dispatch it to Workers and
122 : // those can be shut down at any time, and in these cases, Cancel() is called
123 : // instead of Run().
124 : class nsOutputStreamReadyEvent final
125 : : public CancelableRunnable
126 : , public nsIOutputStreamCallback
127 : {
128 : public:
129 : NS_DECL_ISUPPORTS_INHERITED
130 :
131 0 : nsOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
132 : nsIEventTarget* aTarget)
133 0 : : CancelableRunnable("nsOutputStreamReadyEvent")
134 : , mCallback(aCallback)
135 0 : , mTarget(aTarget)
136 : {
137 0 : }
138 :
139 : private:
140 0 : ~nsOutputStreamReadyEvent()
141 0 : {
142 0 : if (!mCallback) {
143 0 : return;
144 : }
145 : //
146 : // whoa!! looks like we never posted this event. take care to
147 : // release mCallback on the correct thread. if mTarget lives on the
148 : // calling thread, then we are ok. otherwise, we have to try to
149 : // proxy the Release over the right thread. if that thread is dead,
150 : // then there's nothing we can do... better to leak than crash.
151 : //
152 : bool val;
153 0 : nsresult rv = mTarget->IsOnCurrentThread(&val);
154 0 : if (NS_FAILED(rv) || !val) {
155 : nsCOMPtr<nsIOutputStreamCallback> event =
156 0 : NS_NewOutputStreamReadyEvent(mCallback, mTarget);
157 0 : mCallback = nullptr;
158 0 : if (event) {
159 0 : rv = event->OnOutputStreamReady(nullptr);
160 0 : if (NS_FAILED(rv)) {
161 0 : NS_NOTREACHED("leaking stream event");
162 0 : nsISupports* sup = event;
163 0 : NS_ADDREF(sup);
164 : }
165 : }
166 : }
167 0 : }
168 :
169 : public:
170 0 : NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aStream) override
171 : {
172 0 : mStream = aStream;
173 :
174 : nsresult rv =
175 0 : mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
176 0 : if (NS_FAILED(rv)) {
177 0 : NS_WARNING("PostEvent failed");
178 0 : return NS_ERROR_FAILURE;
179 : }
180 :
181 0 : return NS_OK;
182 : }
183 :
184 0 : NS_IMETHOD Run() override
185 : {
186 0 : if (mCallback) {
187 0 : if (mStream) {
188 0 : mCallback->OnOutputStreamReady(mStream);
189 : }
190 0 : mCallback = nullptr;
191 : }
192 0 : return NS_OK;
193 : }
194 :
195 0 : nsresult Cancel() override
196 : {
197 0 : mCallback = nullptr;
198 0 : return NS_OK;
199 : }
200 :
201 : private:
202 : nsCOMPtr<nsIAsyncOutputStream> mStream;
203 : nsCOMPtr<nsIOutputStreamCallback> mCallback;
204 : nsCOMPtr<nsIEventTarget> mTarget;
205 : };
206 :
207 0 : NS_IMPL_ISUPPORTS_INHERITED(nsOutputStreamReadyEvent, CancelableRunnable,
208 : nsIOutputStreamCallback)
209 :
210 : //-----------------------------------------------------------------------------
211 :
212 : already_AddRefed<nsIInputStreamCallback>
213 81 : NS_NewInputStreamReadyEvent(const char* aName,
214 : nsIInputStreamCallback* aCallback,
215 : nsIEventTarget* aTarget)
216 : {
217 81 : NS_ASSERTION(aCallback, "null callback");
218 81 : NS_ASSERTION(aTarget, "null target");
219 : RefPtr<nsInputStreamReadyEvent> ev =
220 162 : new nsInputStreamReadyEvent(aName, aCallback, aTarget);
221 162 : return ev.forget();
222 : }
223 :
224 : already_AddRefed<nsIOutputStreamCallback>
225 0 : NS_NewOutputStreamReadyEvent(nsIOutputStreamCallback* aCallback,
226 : nsIEventTarget* aTarget)
227 : {
228 0 : NS_ASSERTION(aCallback, "null callback");
229 0 : NS_ASSERTION(aTarget, "null target");
230 : RefPtr<nsOutputStreamReadyEvent> ev =
231 0 : new nsOutputStreamReadyEvent(aCallback, aTarget);
232 0 : return ev.forget();
233 : }
234 :
235 : //-----------------------------------------------------------------------------
236 : // NS_AsyncCopy implementation
237 :
238 : // abstract stream copier...
239 : class nsAStreamCopier
240 : : public nsIInputStreamCallback
241 : , public nsIOutputStreamCallback
242 : , public CancelableRunnable
243 : {
244 : public:
245 : NS_DECL_ISUPPORTS_INHERITED
246 :
247 66 : nsAStreamCopier()
248 66 : : CancelableRunnable("nsAStreamCopier")
249 : , mLock("nsAStreamCopier.mLock")
250 : , mCallback(nullptr)
251 : , mProgressCallback(nullptr)
252 : , mClosure(nullptr)
253 : , mChunkSize(0)
254 : , mEventInProcess(false)
255 : , mEventIsPending(false)
256 : , mCloseSource(true)
257 : , mCloseSink(true)
258 : , mCanceled(false)
259 66 : , mCancelStatus(NS_OK)
260 : {
261 66 : }
262 :
263 : // kick off the async copy...
264 66 : nsresult Start(nsIInputStream* aSource,
265 : nsIOutputStream* aSink,
266 : nsIEventTarget* aTarget,
267 : nsAsyncCopyCallbackFun aCallback,
268 : void* aClosure,
269 : uint32_t aChunksize,
270 : bool aCloseSource,
271 : bool aCloseSink,
272 : nsAsyncCopyProgressFun aProgressCallback)
273 : {
274 66 : mSource = aSource;
275 66 : mSink = aSink;
276 66 : mTarget = aTarget;
277 66 : mCallback = aCallback;
278 66 : mClosure = aClosure;
279 66 : mChunkSize = aChunksize;
280 66 : mCloseSource = aCloseSource;
281 66 : mCloseSink = aCloseSink;
282 66 : mProgressCallback = aProgressCallback;
283 :
284 66 : mAsyncSource = do_QueryInterface(mSource);
285 66 : mAsyncSink = do_QueryInterface(mSink);
286 :
287 66 : return PostContinuationEvent();
288 : }
289 :
290 : // implemented by subclasses, returns number of bytes copied and
291 : // sets source and sink condition before returning.
292 : virtual uint32_t DoCopy(nsresult* aSourceCondition,
293 : nsresult* aSinkCondition) = 0;
294 :
295 66 : void Process()
296 : {
297 66 : if (!mSource || !mSink) {
298 0 : return;
299 : }
300 :
301 : nsresult cancelStatus;
302 : bool canceled;
303 : {
304 132 : MutexAutoLock lock(mLock);
305 66 : canceled = mCanceled;
306 66 : cancelStatus = mCancelStatus;
307 : }
308 :
309 : // If the copy was canceled before Process() was even called, then
310 : // sourceCondition and sinkCondition should be set to error results to
311 : // ensure we don't call Finish() on a canceled nsISafeOutputStream.
312 66 : MOZ_ASSERT(NS_FAILED(cancelStatus) == canceled, "cancel needs an error");
313 66 : nsresult sourceCondition = cancelStatus;
314 66 : nsresult sinkCondition = cancelStatus;
315 :
316 : // Copy data from the source to the sink until we hit failure or have
317 : // copied all the data.
318 : for (;;) {
319 : // Note: copyFailed will be true if the source or the sink have
320 : // reported an error, or if we failed to write any bytes
321 : // because we have consumed all of our data.
322 81 : bool copyFailed = false;
323 81 : if (!canceled) {
324 81 : uint32_t n = DoCopy(&sourceCondition, &sinkCondition);
325 81 : if (n > 0 && mProgressCallback) {
326 0 : mProgressCallback(mClosure, n);
327 : }
328 177 : copyFailed = NS_FAILED(sourceCondition) ||
329 96 : NS_FAILED(sinkCondition) || n == 0;
330 :
331 162 : MutexAutoLock lock(mLock);
332 81 : canceled = mCanceled;
333 81 : cancelStatus = mCancelStatus;
334 : }
335 81 : if (copyFailed && !canceled) {
336 66 : if (sourceCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSource) {
337 : // need to wait for more data from source. while waiting for
338 : // more source data, be sure to observe failures on output end.
339 0 : mAsyncSource->AsyncWait(this, 0, 0, nullptr);
340 :
341 0 : if (mAsyncSink)
342 0 : mAsyncSink->AsyncWait(this,
343 : nsIAsyncOutputStream::WAIT_CLOSURE_ONLY,
344 0 : 0, nullptr);
345 0 : break;
346 66 : } else if (sinkCondition == NS_BASE_STREAM_WOULD_BLOCK && mAsyncSink) {
347 : // need to wait for more room in the sink. while waiting for
348 : // more room in the sink, be sure to observer failures on the
349 : // input end.
350 0 : mAsyncSink->AsyncWait(this, 0, 0, nullptr);
351 :
352 0 : if (mAsyncSource)
353 0 : mAsyncSource->AsyncWait(this,
354 : nsIAsyncInputStream::WAIT_CLOSURE_ONLY,
355 0 : 0, nullptr);
356 0 : break;
357 : }
358 : }
359 81 : if (copyFailed || canceled) {
360 66 : if (mCloseSource) {
361 : // close source
362 66 : if (mAsyncSource)
363 0 : mAsyncSource->CloseWithStatus(
364 0 : canceled ? cancelStatus : sinkCondition);
365 : else {
366 66 : mSource->Close();
367 : }
368 : }
369 66 : mAsyncSource = nullptr;
370 66 : mSource = nullptr;
371 :
372 66 : if (mCloseSink) {
373 : // close sink
374 66 : if (mAsyncSink)
375 132 : mAsyncSink->CloseWithStatus(
376 132 : canceled ? cancelStatus : sourceCondition);
377 : else {
378 : // If we have an nsISafeOutputStream, and our
379 : // sourceCondition and sinkCondition are not set to a
380 : // failure state, finish writing.
381 : nsCOMPtr<nsISafeOutputStream> sostream =
382 0 : do_QueryInterface(mSink);
383 0 : if (sostream && NS_SUCCEEDED(sourceCondition) &&
384 0 : NS_SUCCEEDED(sinkCondition)) {
385 0 : sostream->Finish();
386 : } else {
387 0 : mSink->Close();
388 : }
389 : }
390 : }
391 66 : mAsyncSink = nullptr;
392 66 : mSink = nullptr;
393 :
394 : // notify state complete...
395 66 : if (mCallback) {
396 : nsresult status;
397 0 : if (!canceled) {
398 0 : status = sourceCondition;
399 0 : if (NS_SUCCEEDED(status)) {
400 0 : status = sinkCondition;
401 : }
402 0 : if (status == NS_BASE_STREAM_CLOSED) {
403 0 : status = NS_OK;
404 : }
405 : } else {
406 0 : status = cancelStatus;
407 : }
408 0 : mCallback(mClosure, status);
409 : }
410 66 : break;
411 : }
412 15 : }
413 : }
414 :
415 0 : nsresult Cancel(nsresult aReason)
416 : {
417 0 : MutexAutoLock lock(mLock);
418 0 : if (mCanceled) {
419 0 : return NS_ERROR_FAILURE;
420 : }
421 :
422 0 : if (NS_SUCCEEDED(aReason)) {
423 0 : NS_WARNING("cancel with non-failure status code");
424 0 : aReason = NS_BASE_STREAM_CLOSED;
425 : }
426 :
427 0 : mCanceled = true;
428 0 : mCancelStatus = aReason;
429 0 : return NS_OK;
430 : }
431 :
432 0 : NS_IMETHOD OnInputStreamReady(nsIAsyncInputStream* aSource) override
433 : {
434 0 : PostContinuationEvent();
435 0 : return NS_OK;
436 : }
437 :
438 0 : NS_IMETHOD OnOutputStreamReady(nsIAsyncOutputStream* aSink) override
439 : {
440 0 : PostContinuationEvent();
441 0 : return NS_OK;
442 : }
443 :
444 : // continuation event handler
445 66 : NS_IMETHOD Run() override
446 : {
447 66 : Process();
448 :
449 : // clear "in process" flag and post any pending continuation event
450 132 : MutexAutoLock lock(mLock);
451 66 : mEventInProcess = false;
452 66 : if (mEventIsPending) {
453 0 : mEventIsPending = false;
454 0 : PostContinuationEvent_Locked();
455 : }
456 :
457 132 : return NS_OK;
458 : }
459 :
460 : nsresult Cancel() MOZ_MUST_OVERRIDE override = 0;
461 :
462 66 : nsresult PostContinuationEvent()
463 : {
464 : // we cannot post a continuation event if there is currently
465 : // an event in process. doing so could result in Process being
466 : // run simultaneously on multiple threads, so we mark the event
467 : // as pending, and if an event is already in process then we
468 : // just let that existing event take care of posting the real
469 : // continuation event.
470 :
471 132 : MutexAutoLock lock(mLock);
472 132 : return PostContinuationEvent_Locked();
473 : }
474 :
475 66 : nsresult PostContinuationEvent_Locked()
476 : {
477 66 : nsresult rv = NS_OK;
478 66 : if (mEventInProcess) {
479 0 : mEventIsPending = true;
480 : } else {
481 66 : rv = mTarget->Dispatch(this, NS_DISPATCH_NORMAL);
482 66 : if (NS_SUCCEEDED(rv)) {
483 66 : mEventInProcess = true;
484 : } else {
485 0 : NS_WARNING("unable to post continuation event");
486 : }
487 : }
488 66 : return rv;
489 : }
490 :
491 : protected:
492 : nsCOMPtr<nsIInputStream> mSource;
493 : nsCOMPtr<nsIOutputStream> mSink;
494 : nsCOMPtr<nsIAsyncInputStream> mAsyncSource;
495 : nsCOMPtr<nsIAsyncOutputStream> mAsyncSink;
496 : nsCOMPtr<nsIEventTarget> mTarget;
497 : Mutex mLock;
498 : nsAsyncCopyCallbackFun mCallback;
499 : nsAsyncCopyProgressFun mProgressCallback;
500 : void* mClosure;
501 : uint32_t mChunkSize;
502 : bool mEventInProcess;
503 : bool mEventIsPending;
504 : bool mCloseSource;
505 : bool mCloseSink;
506 : bool mCanceled;
507 : nsresult mCancelStatus;
508 :
509 : // virtual since subclasses call superclass Release()
510 66 : virtual ~nsAStreamCopier()
511 66 : {
512 66 : }
513 : };
514 :
515 858 : NS_IMPL_ISUPPORTS_INHERITED(nsAStreamCopier,
516 : CancelableRunnable,
517 : nsIInputStreamCallback,
518 : nsIOutputStreamCallback)
519 :
520 : class nsStreamCopierIB final : public nsAStreamCopier
521 : {
522 : public:
523 0 : nsStreamCopierIB() : nsAStreamCopier()
524 : {
525 0 : }
526 0 : virtual ~nsStreamCopierIB()
527 0 : {
528 0 : }
529 :
530 : struct MOZ_STACK_CLASS ReadSegmentsState
531 : {
532 : // the nsIOutputStream will outlive the ReadSegmentsState on the stack
533 : nsIOutputStream* MOZ_NON_OWNING_REF mSink;
534 : nsresult mSinkCondition;
535 : };
536 :
537 0 : static nsresult ConsumeInputBuffer(nsIInputStream* aInStr,
538 : void* aClosure,
539 : const char* aBuffer,
540 : uint32_t aOffset,
541 : uint32_t aCount,
542 : uint32_t* aCountWritten)
543 : {
544 0 : ReadSegmentsState* state = (ReadSegmentsState*)aClosure;
545 :
546 0 : nsresult rv = state->mSink->Write(aBuffer, aCount, aCountWritten);
547 0 : if (NS_FAILED(rv)) {
548 0 : state->mSinkCondition = rv;
549 0 : } else if (*aCountWritten == 0) {
550 0 : state->mSinkCondition = NS_BASE_STREAM_CLOSED;
551 : }
552 :
553 0 : return state->mSinkCondition;
554 : }
555 :
556 0 : uint32_t DoCopy(nsresult* aSourceCondition,
557 : nsresult* aSinkCondition) override
558 : {
559 : ReadSegmentsState state;
560 0 : state.mSink = mSink;
561 0 : state.mSinkCondition = NS_OK;
562 :
563 : uint32_t n;
564 0 : *aSourceCondition =
565 0 : mSource->ReadSegments(ConsumeInputBuffer, &state, mChunkSize, &n);
566 0 : *aSinkCondition = state.mSinkCondition;
567 0 : return n;
568 : }
569 :
570 0 : nsresult Cancel() override
571 : {
572 0 : return NS_OK;
573 : }
574 : };
575 :
576 : class nsStreamCopierOB final : public nsAStreamCopier
577 : {
578 : public:
579 66 : nsStreamCopierOB() : nsAStreamCopier()
580 : {
581 66 : }
582 132 : virtual ~nsStreamCopierOB()
583 66 : {
584 198 : }
585 :
586 : struct MOZ_STACK_CLASS WriteSegmentsState
587 : {
588 : // the nsIInputStream will outlive the WriteSegmentsState on the stack
589 : nsIInputStream* MOZ_NON_OWNING_REF mSource;
590 : nsresult mSourceCondition;
591 : };
592 :
593 144 : static nsresult FillOutputBuffer(nsIOutputStream* aOutStr,
594 : void* aClosure,
595 : char* aBuffer,
596 : uint32_t aOffset,
597 : uint32_t aCount,
598 : uint32_t* aCountRead)
599 : {
600 144 : WriteSegmentsState* state = (WriteSegmentsState*)aClosure;
601 :
602 144 : nsresult rv = state->mSource->Read(aBuffer, aCount, aCountRead);
603 144 : if (NS_FAILED(rv)) {
604 0 : state->mSourceCondition = rv;
605 144 : } else if (*aCountRead == 0) {
606 66 : state->mSourceCondition = NS_BASE_STREAM_CLOSED;
607 : }
608 :
609 144 : return state->mSourceCondition;
610 : }
611 :
612 81 : uint32_t DoCopy(nsresult* aSourceCondition,
613 : nsresult* aSinkCondition) override
614 : {
615 : WriteSegmentsState state;
616 81 : state.mSource = mSource;
617 81 : state.mSourceCondition = NS_OK;
618 :
619 : uint32_t n;
620 81 : *aSinkCondition =
621 81 : mSink->WriteSegments(FillOutputBuffer, &state, mChunkSize, &n);
622 81 : *aSourceCondition = state.mSourceCondition;
623 81 : return n;
624 : }
625 :
626 0 : nsresult Cancel() override
627 : {
628 0 : return NS_OK;
629 : }
630 : };
631 :
632 : //-----------------------------------------------------------------------------
633 :
634 : nsresult
635 66 : NS_AsyncCopy(nsIInputStream* aSource,
636 : nsIOutputStream* aSink,
637 : nsIEventTarget* aTarget,
638 : nsAsyncCopyMode aMode,
639 : uint32_t aChunkSize,
640 : nsAsyncCopyCallbackFun aCallback,
641 : void* aClosure,
642 : bool aCloseSource,
643 : bool aCloseSink,
644 : nsISupports** aCopierCtx,
645 : nsAsyncCopyProgressFun aProgressCallback)
646 : {
647 66 : NS_ASSERTION(aTarget, "non-null target required");
648 :
649 : nsresult rv;
650 : nsAStreamCopier* copier;
651 :
652 66 : if (aMode == NS_ASYNCCOPY_VIA_READSEGMENTS) {
653 0 : copier = new nsStreamCopierIB();
654 : } else {
655 66 : copier = new nsStreamCopierOB();
656 : }
657 :
658 : // Start() takes an owning ref to the copier...
659 66 : NS_ADDREF(copier);
660 66 : rv = copier->Start(aSource, aSink, aTarget, aCallback, aClosure, aChunkSize,
661 66 : aCloseSource, aCloseSink, aProgressCallback);
662 :
663 66 : if (aCopierCtx) {
664 0 : *aCopierCtx = static_cast<nsISupports*>(static_cast<nsIRunnable*>(copier));
665 0 : NS_ADDREF(*aCopierCtx);
666 : }
667 66 : NS_RELEASE(copier);
668 :
669 66 : return rv;
670 : }
671 :
672 : //-----------------------------------------------------------------------------
673 :
674 : nsresult
675 0 : NS_CancelAsyncCopy(nsISupports* aCopierCtx, nsresult aReason)
676 : {
677 : nsAStreamCopier* copier =
678 0 : static_cast<nsAStreamCopier*>(static_cast<nsIRunnable *>(aCopierCtx));
679 0 : return copier->Cancel(aReason);
680 : }
681 :
682 : //-----------------------------------------------------------------------------
683 :
684 : nsresult
685 4 : NS_ConsumeStream(nsIInputStream* aStream, uint32_t aMaxCount,
686 : nsACString& aResult)
687 : {
688 4 : nsresult rv = NS_OK;
689 4 : aResult.Truncate();
690 :
691 6 : while (aMaxCount) {
692 : uint64_t avail64;
693 4 : rv = aStream->Available(&avail64);
694 4 : if (NS_FAILED(rv)) {
695 0 : if (rv == NS_BASE_STREAM_CLOSED) {
696 0 : rv = NS_OK;
697 : }
698 3 : break;
699 : }
700 4 : if (avail64 == 0) {
701 3 : break;
702 : }
703 :
704 1 : uint32_t avail = (uint32_t)XPCOM_MIN<uint64_t>(avail64, aMaxCount);
705 :
706 : // resize aResult buffer
707 1 : uint32_t length = aResult.Length();
708 1 : if (avail > UINT32_MAX - length) {
709 0 : return NS_ERROR_FILE_TOO_BIG;
710 : }
711 :
712 1 : aResult.SetLength(length + avail);
713 1 : if (aResult.Length() != (length + avail)) {
714 0 : return NS_ERROR_OUT_OF_MEMORY;
715 : }
716 1 : char* buf = aResult.BeginWriting() + length;
717 :
718 : uint32_t n;
719 1 : rv = aStream->Read(buf, avail, &n);
720 1 : if (NS_FAILED(rv)) {
721 0 : break;
722 : }
723 1 : if (n != avail) {
724 0 : aResult.SetLength(length + n);
725 : }
726 1 : if (n == 0) {
727 0 : break;
728 : }
729 1 : aMaxCount -= n;
730 : }
731 :
732 4 : return rv;
733 : }
734 :
735 : //-----------------------------------------------------------------------------
736 :
737 : static nsresult
738 3 : TestInputStream(nsIInputStream* aInStr,
739 : void* aClosure,
740 : const char* aBuffer,
741 : uint32_t aOffset,
742 : uint32_t aCount,
743 : uint32_t* aCountWritten)
744 : {
745 3 : bool* result = static_cast<bool*>(aClosure);
746 3 : *result = true;
747 3 : return NS_ERROR_ABORT; // don't call me anymore
748 : }
749 :
750 : bool
751 140 : NS_InputStreamIsBuffered(nsIInputStream* aStream)
752 : {
753 280 : nsCOMPtr<nsIBufferedInputStream> bufferedIn = do_QueryInterface(aStream);
754 140 : if (bufferedIn) {
755 91 : return true;
756 : }
757 :
758 49 : bool result = false;
759 : uint32_t n;
760 49 : nsresult rv = aStream->ReadSegments(TestInputStream, &result, 1, &n);
761 49 : return result || NS_SUCCEEDED(rv);
762 : }
763 :
764 : static nsresult
765 0 : TestOutputStream(nsIOutputStream* aOutStr,
766 : void* aClosure,
767 : char* aBuffer,
768 : uint32_t aOffset,
769 : uint32_t aCount,
770 : uint32_t* aCountRead)
771 : {
772 0 : bool* result = static_cast<bool*>(aClosure);
773 0 : *result = true;
774 0 : return NS_ERROR_ABORT; // don't call me anymore
775 : }
776 :
777 : bool
778 0 : NS_OutputStreamIsBuffered(nsIOutputStream* aStream)
779 : {
780 0 : nsCOMPtr<nsIBufferedOutputStream> bufferedOut = do_QueryInterface(aStream);
781 0 : if (bufferedOut) {
782 0 : return true;
783 : }
784 :
785 0 : bool result = false;
786 : uint32_t n;
787 0 : aStream->WriteSegments(TestOutputStream, &result, 1, &n);
788 0 : return result;
789 : }
790 :
791 : //-----------------------------------------------------------------------------
792 :
793 : nsresult
794 1 : NS_CopySegmentToStream(nsIInputStream* aInStr,
795 : void* aClosure,
796 : const char* aBuffer,
797 : uint32_t aOffset,
798 : uint32_t aCount,
799 : uint32_t* aCountWritten)
800 : {
801 1 : nsIOutputStream* outStr = static_cast<nsIOutputStream*>(aClosure);
802 1 : *aCountWritten = 0;
803 3 : while (aCount) {
804 : uint32_t n;
805 1 : nsresult rv = outStr->Write(aBuffer, aCount, &n);
806 1 : if (NS_FAILED(rv)) {
807 0 : return rv;
808 : }
809 1 : aBuffer += n;
810 1 : aCount -= n;
811 1 : *aCountWritten += n;
812 : }
813 1 : return NS_OK;
814 : }
815 :
816 : nsresult
817 40097 : NS_CopySegmentToBuffer(nsIInputStream* aInStr,
818 : void* aClosure,
819 : const char* aBuffer,
820 : uint32_t aOffset,
821 : uint32_t aCount,
822 : uint32_t* aCountWritten)
823 : {
824 40097 : char* toBuf = static_cast<char*>(aClosure);
825 40097 : memcpy(&toBuf[aOffset], aBuffer, aCount);
826 40097 : *aCountWritten = aCount;
827 40097 : return NS_OK;
828 : }
829 :
830 : nsresult
831 0 : NS_CopySegmentToBuffer(nsIOutputStream* aOutStr,
832 : void* aClosure,
833 : char* aBuffer,
834 : uint32_t aOffset,
835 : uint32_t aCount,
836 : uint32_t* aCountRead)
837 : {
838 0 : const char* fromBuf = static_cast<const char*>(aClosure);
839 0 : memcpy(aBuffer, &fromBuf[aOffset], aCount);
840 0 : *aCountRead = aCount;
841 0 : return NS_OK;
842 : }
843 :
844 : nsresult
845 0 : NS_DiscardSegment(nsIInputStream* aInStr,
846 : void* aClosure,
847 : const char* aBuffer,
848 : uint32_t aOffset,
849 : uint32_t aCount,
850 : uint32_t* aCountWritten)
851 : {
852 0 : *aCountWritten = aCount;
853 0 : return NS_OK;
854 : }
855 :
856 : //-----------------------------------------------------------------------------
857 :
858 : nsresult
859 0 : NS_WriteSegmentThunk(nsIInputStream* aInStr,
860 : void* aClosure,
861 : const char* aBuffer,
862 : uint32_t aOffset,
863 : uint32_t aCount,
864 : uint32_t* aCountWritten)
865 : {
866 0 : nsWriteSegmentThunk* thunk = static_cast<nsWriteSegmentThunk*>(aClosure);
867 0 : return thunk->mFun(thunk->mStream, thunk->mClosure, aBuffer, aOffset, aCount,
868 0 : aCountWritten);
869 : }
870 :
871 : nsresult
872 53 : NS_FillArray(FallibleTArray<char>& aDest, nsIInputStream* aInput,
873 : uint32_t aKeep, uint32_t* aNewBytes)
874 : {
875 53 : MOZ_ASSERT(aInput, "null stream");
876 53 : MOZ_ASSERT(aKeep <= aDest.Length(), "illegal keep count");
877 :
878 53 : char* aBuffer = aDest.Elements();
879 53 : int64_t keepOffset = int64_t(aDest.Length()) - aKeep;
880 53 : if (aKeep != 0 && keepOffset > 0) {
881 0 : memmove(aBuffer, aBuffer + keepOffset, aKeep);
882 : }
883 :
884 : nsresult rv =
885 53 : aInput->Read(aBuffer + aKeep, aDest.Capacity() - aKeep, aNewBytes);
886 53 : if (NS_FAILED(rv)) {
887 0 : *aNewBytes = 0;
888 : }
889 : // NOTE: we rely on the fact that the new slots are NOT initialized by
890 : // SetLengthAndRetainStorage here, see nsTArrayElementTraits::Construct()
891 : // in nsTArray.h:
892 53 : aDest.SetLengthAndRetainStorage(aKeep + *aNewBytes);
893 :
894 53 : MOZ_ASSERT(aDest.Length() <= aDest.Capacity(), "buffer overflow");
895 53 : return rv;
896 : }
897 :
898 : bool
899 0 : NS_InputStreamIsCloneable(nsIInputStream* aSource)
900 : {
901 0 : if (!aSource) {
902 0 : return false;
903 : }
904 :
905 0 : nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
906 0 : return cloneable && cloneable->GetCloneable();
907 : }
908 :
909 : nsresult
910 0 : NS_CloneInputStream(nsIInputStream* aSource, nsIInputStream** aCloneOut,
911 : nsIInputStream** aReplacementOut)
912 : {
913 0 : if (NS_WARN_IF(!aSource)) {
914 0 : return NS_ERROR_FAILURE;
915 : }
916 :
917 : // Attempt to perform the clone directly on the source stream
918 0 : nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(aSource);
919 0 : if (cloneable && cloneable->GetCloneable()) {
920 0 : if (aReplacementOut) {
921 0 : *aReplacementOut = nullptr;
922 : }
923 0 : return cloneable->Clone(aCloneOut);
924 : }
925 :
926 : // If we failed the clone and the caller does not want to replace their
927 : // original stream, then we are done. Return error.
928 0 : if (!aReplacementOut) {
929 0 : return NS_ERROR_FAILURE;
930 : }
931 :
932 : // The caller has opted-in to the fallback clone support that replaces
933 : // the original stream. Copy the data to a pipe and return two cloned
934 : // input streams.
935 :
936 0 : nsCOMPtr<nsIInputStream> reader;
937 0 : nsCOMPtr<nsIInputStream> readerClone;
938 0 : nsCOMPtr<nsIOutputStream> writer;
939 :
940 0 : nsresult rv = NS_NewPipe(getter_AddRefs(reader), getter_AddRefs(writer),
941 : 0, 0, // default segment size and max size
942 0 : true, true); // non-blocking
943 0 : if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
944 :
945 0 : cloneable = do_QueryInterface(reader);
946 0 : MOZ_ASSERT(cloneable && cloneable->GetCloneable());
947 :
948 0 : rv = cloneable->Clone(getter_AddRefs(readerClone));
949 0 : if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
950 :
951 : nsCOMPtr<nsIEventTarget> target =
952 0 : do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
953 0 : if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
954 :
955 0 : rv = NS_AsyncCopy(aSource, writer, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS);
956 0 : if (NS_WARN_IF(NS_FAILED(rv))) { return rv; }
957 :
958 0 : readerClone.forget(aCloneOut);
959 0 : reader.forget(aReplacementOut);
960 :
961 0 : return NS_OK;
962 : }
|