Line data Source code
1 : /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /* vim:set ts=4 sts=4 sw=4 et cin: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "nsIOService.h"
8 : #include "nsInputStreamPump.h"
9 : #include "nsIStreamTransportService.h"
10 : #include "nsISeekableStream.h"
11 : #include "nsITransport.h"
12 : #include "nsIThreadRetargetableStreamListener.h"
13 : #include "nsThreadUtils.h"
14 : #include "nsCOMPtr.h"
15 : #include "mozilla/Logging.h"
16 : #include "GeckoProfiler.h"
17 : #include "nsIStreamListener.h"
18 : #include "nsILoadGroup.h"
19 : #include "nsNetCID.h"
20 : #include "nsStreamUtils.h"
21 : #include <algorithm>
22 :
// Class ID passed to do_GetService() in AsyncRead() to obtain the stream
// transport service.
23 : static NS_DEFINE_CID(kStreamTransportServiceCID, NS_STREAMTRANSPORTSERVICE_CID);
24 :
25 : //
26 : // MOZ_LOG=nsStreamPump:5
27 : //
// Log module used by every LOG() call in this file; the comment above gives
// the MOZ_LOG setting that enables it.
28 : static mozilla::LazyLogModule gStreamPumpLog("nsStreamPump");
// Replace any LOG macro defined earlier with one that logs to gStreamPumpLog
// at Debug level.
29 : #undef LOG
30 : #define LOG(args) MOZ_LOG(gStreamPumpLog, mozilla::LogLevel::Debug, args)
31 :
32 : //-----------------------------------------------------------------------------
33 : // nsInputStreamPump methods
34 : //-----------------------------------------------------------------------------
35 :
36 73 : nsInputStreamPump::nsInputStreamPump()
37 : : mState(STATE_IDLE)
38 : , mStreamOffset(0)
39 : , mStreamLength(UINT64_MAX)
40 : , mStatus(NS_OK)
41 : , mSuspendCount(0)
42 : , mLoadFlags(LOAD_NORMAL)
43 : , mProcessingCallbacks(false)
44 : , mWaitingForInputStreamReady(false)
45 : , mCloseWhenDone(false)
46 : , mRetargeting(false)
47 73 : , mMonitor("nsInputStreamPump")
48 : {
49 73 : }
50 :
51 73 : nsInputStreamPump::~nsInputStreamPump()
52 : {
53 73 : }
54 :
55 : nsresult
56 72 : nsInputStreamPump::Create(nsInputStreamPump **result,
57 : nsIInputStream *stream,
58 : int64_t streamPos,
59 : int64_t streamLen,
60 : uint32_t segsize,
61 : uint32_t segcount,
62 : bool closeWhenDone,
63 : nsIEventTarget *mainThreadTarget)
64 : {
65 72 : nsresult rv = NS_ERROR_OUT_OF_MEMORY;
66 144 : RefPtr<nsInputStreamPump> pump = new nsInputStreamPump();
67 72 : if (pump) {
68 72 : rv = pump->Init(stream, streamPos, streamLen,
69 72 : segsize, segcount, closeWhenDone, mainThreadTarget);
70 72 : if (NS_SUCCEEDED(rv)) {
71 72 : pump.forget(result);
72 : }
73 : }
74 144 : return rv;
75 : }
76 :
77 : struct PeekData {
78 3 : PeekData(nsInputStreamPump::PeekSegmentFun fun, void* closure)
79 3 : : mFunc(fun), mClosure(closure) {}
80 :
81 : nsInputStreamPump::PeekSegmentFun mFunc;
82 : void* mClosure;
83 : };
84 :
85 : static nsresult
86 3 : CallPeekFunc(nsIInputStream *aInStream, void *aClosure,
87 : const char *aFromSegment, uint32_t aToOffset, uint32_t aCount,
88 : uint32_t *aWriteCount)
89 : {
90 3 : NS_ASSERTION(aToOffset == 0, "Called more than once?");
91 3 : NS_ASSERTION(aCount > 0, "Called without data?");
92 :
93 3 : PeekData* data = static_cast<PeekData*>(aClosure);
94 3 : data->mFunc(data->mClosure,
95 3 : reinterpret_cast<const uint8_t*>(aFromSegment), aCount);
96 3 : return NS_BINDING_ABORTED;
97 : }
98 :
99 : nsresult
100 6 : nsInputStreamPump::PeekStream(PeekSegmentFun callback, void* closure)
101 : {
102 12 : ReentrantMonitorAutoEnter mon(mMonitor);
103 :
104 6 : NS_ASSERTION(mAsyncStream, "PeekStream called without stream");
105 :
106 6 : nsresult rv = CreateBufferedStreamIfNeeded();
107 6 : NS_ENSURE_SUCCESS(rv, rv);
108 :
109 : // See if the pipe is closed by checking the return of Available.
110 : uint64_t dummy64;
111 6 : rv = mBufferedStream->Available(&dummy64);
112 6 : if (NS_FAILED(rv))
113 3 : return rv;
114 3 : uint32_t dummy = (uint32_t)std::min(dummy64, (uint64_t)UINT32_MAX);
115 :
116 3 : PeekData data(callback, closure);
117 6 : return mBufferedStream->ReadSegments(CallPeekFunc,
118 : &data,
119 : nsIOService::gDefaultSegmentSize,
120 6 : &dummy);
121 : }
122 :
123 : nsresult
124 279 : nsInputStreamPump::EnsureWaiting()
125 : {
126 279 : mMonitor.AssertCurrentThreadIn();
127 :
128 : // no need to worry about multiple threads... an input stream pump lives
129 : // on only one thread at a time.
130 279 : MOZ_ASSERT(mAsyncStream);
131 279 : if (!mWaitingForInputStreamReady && !mProcessingCallbacks) {
132 : // Ensure OnStateStop is called on the main thread.
133 81 : if (mState == STATE_STOP) {
134 : nsCOMPtr<nsIEventTarget> mainThread = mLabeledMainThreadTarget
135 : ? mLabeledMainThreadTarget
136 8 : : do_AddRef(GetMainThreadEventTarget());
137 4 : if (mTargetThread != mainThread) {
138 4 : mTargetThread = do_QueryInterface(mainThread);
139 : }
140 : }
141 81 : MOZ_ASSERT(mTargetThread);
142 81 : nsresult rv = mAsyncStream->AsyncWait(this, 0, 0, mTargetThread);
143 81 : if (NS_FAILED(rv)) {
144 0 : NS_ERROR("AsyncWait failed");
145 0 : return rv;
146 : }
147 : // Any retargeting during STATE_START or START_TRANSFER is complete
148 : // after the call to AsyncWait; next callback wil be on mTargetThread.
149 81 : mRetargeting = false;
150 81 : mWaitingForInputStreamReady = true;
151 : }
152 279 : return NS_OK;
153 : }
154 :
155 : //-----------------------------------------------------------------------------
156 : // nsInputStreamPump::nsISupports
157 : //-----------------------------------------------------------------------------
158 :
159 : // Although this class can only be accessed from one thread at a time, we do
160 : // allow its ownership to move from thread to thread, assuming the consumer
161 : // understands the limitations of this.
// The interfaces listed below are the ones this file implements for the
// pump: nsIRequest, nsIThreadRetargetableRequest, nsIInputStreamCallback and
// nsIInputStreamPump.
162 813 : NS_IMPL_ISUPPORTS(nsInputStreamPump,
163 : nsIRequest,
164 : nsIThreadRetargetableRequest,
165 : nsIInputStreamCallback,
166 : nsIInputStreamPump)
167 :
168 : //-----------------------------------------------------------------------------
169 : // nsInputStreamPump::nsIRequest
170 : //-----------------------------------------------------------------------------
171 :
172 : NS_IMETHODIMP
173 0 : nsInputStreamPump::GetName(nsACString &result)
174 : {
175 0 : ReentrantMonitorAutoEnter mon(mMonitor);
176 :
177 0 : result.Truncate();
178 0 : return NS_OK;
179 : }
180 :
181 : NS_IMETHODIMP
182 0 : nsInputStreamPump::IsPending(bool *result)
183 : {
184 0 : ReentrantMonitorAutoEnter mon(mMonitor);
185 :
186 0 : *result = (mState != STATE_IDLE);
187 0 : return NS_OK;
188 : }
189 :
190 : NS_IMETHODIMP
191 76 : nsInputStreamPump::GetStatus(nsresult *status)
192 : {
193 152 : ReentrantMonitorAutoEnter mon(mMonitor);
194 :
195 76 : *status = mStatus;
196 152 : return NS_OK;
197 : }
198 :
199 : NS_IMETHODIMP
200 3 : nsInputStreamPump::Cancel(nsresult status)
201 : {
202 3 : MOZ_ASSERT(NS_IsMainThread());
203 :
204 6 : ReentrantMonitorAutoEnter mon(mMonitor);
205 :
206 3 : LOG(("nsInputStreamPump::Cancel [this=%p status=%" PRIx32 "]\n",
207 : this, static_cast<uint32_t>(status)));
208 :
209 3 : if (NS_FAILED(mStatus)) {
210 1 : LOG((" already canceled\n"));
211 1 : return NS_OK;
212 : }
213 :
214 2 : NS_ASSERTION(NS_FAILED(status), "cancel with non-failure status code");
215 2 : mStatus = status;
216 :
217 : // close input stream
218 2 : if (mAsyncStream) {
219 2 : mAsyncStream->CloseWithStatus(status);
220 2 : if (mSuspendCount == 0)
221 2 : EnsureWaiting();
222 : // Otherwise, EnsureWaiting will be called by Resume().
223 : // Note that while suspended, OnInputStreamReady will
224 : // not do anything, and also note that calling asyncWait
225 : // on a closed stream works and will dispatch an event immediately.
226 : }
227 2 : return NS_OK;
228 : }
229 :
230 : NS_IMETHODIMP
231 256 : nsInputStreamPump::Suspend()
232 : {
233 512 : ReentrantMonitorAutoEnter mon(mMonitor);
234 :
235 256 : LOG(("nsInputStreamPump::Suspend [this=%p]\n", this));
236 256 : NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
237 256 : ++mSuspendCount;
238 256 : return NS_OK;
239 : }
240 :
241 : NS_IMETHODIMP
242 256 : nsInputStreamPump::Resume()
243 : {
244 512 : ReentrantMonitorAutoEnter mon(mMonitor);
245 :
246 256 : LOG(("nsInputStreamPump::Resume [this=%p]\n", this));
247 256 : NS_ENSURE_TRUE(mSuspendCount > 0, NS_ERROR_UNEXPECTED);
248 256 : NS_ENSURE_TRUE(mState != STATE_IDLE, NS_ERROR_UNEXPECTED);
249 :
250 : // There is a brief in-between state when we null out mAsyncStream in
251 : // OnStateStop() before calling OnStopRequest, and only afterwards set
252 : // STATE_IDLE, which we need to handle gracefully.
253 256 : if (--mSuspendCount == 0 && mAsyncStream)
254 196 : EnsureWaiting();
255 256 : return NS_OK;
256 : }
257 :
258 : NS_IMETHODIMP
259 0 : nsInputStreamPump::GetLoadFlags(nsLoadFlags *aLoadFlags)
260 : {
261 0 : ReentrantMonitorAutoEnter mon(mMonitor);
262 :
263 0 : *aLoadFlags = mLoadFlags;
264 0 : return NS_OK;
265 : }
266 :
267 : NS_IMETHODIMP
268 0 : nsInputStreamPump::SetLoadFlags(nsLoadFlags aLoadFlags)
269 : {
270 0 : ReentrantMonitorAutoEnter mon(mMonitor);
271 :
272 0 : mLoadFlags = aLoadFlags;
273 0 : return NS_OK;
274 : }
275 :
276 : NS_IMETHODIMP
277 0 : nsInputStreamPump::GetLoadGroup(nsILoadGroup **aLoadGroup)
278 : {
279 0 : ReentrantMonitorAutoEnter mon(mMonitor);
280 :
281 0 : NS_IF_ADDREF(*aLoadGroup = mLoadGroup);
282 0 : return NS_OK;
283 : }
284 :
285 : NS_IMETHODIMP
286 0 : nsInputStreamPump::SetLoadGroup(nsILoadGroup *aLoadGroup)
287 : {
288 0 : ReentrantMonitorAutoEnter mon(mMonitor);
289 :
290 0 : mLoadGroup = aLoadGroup;
291 0 : return NS_OK;
292 : }
293 :
294 : //-----------------------------------------------------------------------------
295 : // nsInputStreamPump::nsIInputStreamPump implementation
296 : //-----------------------------------------------------------------------------
297 :
298 : NS_IMETHODIMP
299 73 : nsInputStreamPump::Init(nsIInputStream *stream,
300 : int64_t streamPos, int64_t streamLen,
301 : uint32_t segsize, uint32_t segcount,
302 : bool closeWhenDone, nsIEventTarget *mainThreadTarget)
303 : {
304 73 : NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
305 :
306 73 : mStreamOffset = uint64_t(streamPos);
307 73 : if (int64_t(streamLen) >= int64_t(0))
308 0 : mStreamLength = uint64_t(streamLen);
309 73 : mStream = stream;
310 73 : mSegSize = segsize;
311 73 : mSegCount = segcount;
312 73 : mCloseWhenDone = closeWhenDone;
313 73 : mLabeledMainThreadTarget = mainThreadTarget;
314 :
315 73 : return NS_OK;
316 : }
317 :
318 : NS_IMETHODIMP
319 73 : nsInputStreamPump::AsyncRead(nsIStreamListener *listener, nsISupports *ctxt)
320 : {
321 146 : ReentrantMonitorAutoEnter mon(mMonitor);
322 :
323 73 : NS_ENSURE_TRUE(mState == STATE_IDLE, NS_ERROR_IN_PROGRESS);
324 73 : NS_ENSURE_ARG_POINTER(listener);
325 73 : MOZ_ASSERT(NS_IsMainThread(), "nsInputStreamPump should be read from the "
326 : "main thread only.");
327 :
328 : //
329 : // OK, we need to use the stream transport service if
330 : //
331 : // (1) the stream is blocking
332 : // (2) the stream does not support nsIAsyncInputStream
333 : //
334 :
335 : bool nonBlocking;
336 73 : nsresult rv = mStream->IsNonBlocking(&nonBlocking);
337 73 : if (NS_FAILED(rv)) return rv;
338 :
339 73 : if (nonBlocking) {
340 10 : mAsyncStream = do_QueryInterface(mStream);
341 : //
342 : // if the stream supports nsIAsyncInputStream, and if we need to seek
343 : // to a starting offset, then we must do so here. in the non-async
344 : // stream case, the stream transport service will take care of seeking
345 : // for us.
346 : //
347 10 : if (mAsyncStream && (mStreamOffset != UINT64_MAX)) {
348 0 : nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mStream);
349 0 : if (seekable)
350 0 : seekable->Seek(nsISeekableStream::NS_SEEK_SET, mStreamOffset);
351 : }
352 : }
353 :
354 73 : if (!mAsyncStream) {
355 : // ok, let's use the stream transport service to read this stream.
356 : nsCOMPtr<nsIStreamTransportService> sts =
357 132 : do_GetService(kStreamTransportServiceCID, &rv);
358 66 : if (NS_FAILED(rv)) return rv;
359 :
360 132 : nsCOMPtr<nsITransport> transport;
361 198 : rv = sts->CreateInputTransport(mStream, mStreamOffset, mStreamLength,
362 198 : mCloseWhenDone, getter_AddRefs(transport));
363 66 : if (NS_FAILED(rv)) return rv;
364 :
365 132 : nsCOMPtr<nsIInputStream> wrapper;
366 66 : rv = transport->OpenInputStream(0, mSegSize, mSegCount, getter_AddRefs(wrapper));
367 66 : if (NS_FAILED(rv)) return rv;
368 :
369 66 : mAsyncStream = do_QueryInterface(wrapper, &rv);
370 66 : if (NS_FAILED(rv)) return rv;
371 : }
372 :
373 : // release our reference to the original stream. from this point forward,
374 : // we only reference the "stream" via mAsyncStream.
375 73 : mStream = nullptr;
376 :
377 : // mStreamOffset now holds the number of bytes currently read. we use this
378 : // to enforce the mStreamLength restriction.
379 73 : mStreamOffset = 0;
380 :
381 : // grab event queue (we must do this here by contract, since all notifications
382 : // must go to the thread which called AsyncRead)
383 73 : if (NS_IsMainThread() && mLabeledMainThreadTarget) {
384 67 : mTargetThread = mLabeledMainThreadTarget;
385 : } else {
386 6 : mTargetThread = GetCurrentThreadEventTarget();
387 : }
388 73 : NS_ENSURE_STATE(mTargetThread);
389 :
390 73 : rv = EnsureWaiting();
391 73 : if (NS_FAILED(rv)) return rv;
392 :
393 73 : if (mLoadGroup)
394 0 : mLoadGroup->AddRequest(this, nullptr);
395 :
396 73 : mState = STATE_START;
397 73 : mListener = listener;
398 73 : mListenerContext = ctxt;
399 73 : return NS_OK;
400 : }
401 :
402 : //-----------------------------------------------------------------------------
403 : // nsInputStreamPump::nsIInputStreamCallback implementation
404 : //-----------------------------------------------------------------------------
405 :
// nsIInputStreamCallback::OnInputStreamReady. Drives the pump's state
// machine: each loop iteration takes the monitor, runs the handler for the
// current state (OnStateStart, OnStateTransfer or OnStateStop) and then
// either moves on to the state that handler returned or leaves the loop.
//
// The loop is left when
//   - another iteration is already in progress (mProcessingCallbacks),
//   - the pump is suspended or idle,
//   - event delivery is being retargeted while the pump is suspended, or
//   - a new asynchronous wait was armed successfully via EnsureWaiting().
//
// Returns NS_OK, except for the unknown-state path which returns
// NS_ERROR_UNEXPECTED.
406 : NS_IMETHODIMP
407 81 : nsInputStreamPump::OnInputStreamReady(nsIAsyncInputStream *stream)
408 : {
409 81 : LOG(("nsInputStreamPump::OnInputStreamReady [this=%p]\n", this));
410 :
411 162 : AUTO_PROFILER_LABEL("nsInputStreamPump::OnInputStreamReady", NETWORK);
412 :
413 : // this function has been called from a PLEvent, so we can safely call
414 : // any listener or progress sink methods directly from here.
415 :
416 : for (;;) {
417 : // There should only be one iteration of this loop happening at a time.
418 : // To prevent AsyncWait() (called during callbacks or on other threads)
419 : // from creating a parallel OnInputStreamReady(), we use:
420 : // -- a monitor; and
421 : // -- a boolean mProcessingCallbacks to detect parallel loops
422 : // when exiting the monitor for callbacks.
423 495 : ReentrantMonitorAutoEnter lock(mMonitor);
424 :
425 : // Prevent parallel execution during callbacks, while out of monitor.
// The assertion below sits inside the branch it contradicts, so it fires
// whenever this branch is reached in a build where MOZ_ASSERT is active.
426 288 : if (mProcessingCallbacks) {
427 0 : MOZ_ASSERT(!mProcessingCallbacks);
428 0 : break;
429 : }
430 288 : mProcessingCallbacks = true;
// Suspended or idle: clear both flags and leave without running any state
// handler.
431 288 : if (mSuspendCount || mState == STATE_IDLE) {
432 73 : mWaitingForInputStreamReady = false;
433 73 : mProcessingCallbacks = false;
434 73 : break;
435 : }
436 :
437 : uint32_t nextState;
438 215 : switch (mState) {
439 : case STATE_START:
440 73 : nextState = OnStateStart();
441 73 : break;
442 : case STATE_TRANSFER:
443 69 : nextState = OnStateTransfer();
444 69 : break;
445 : case STATE_STOP:
446 73 : mRetargeting = false;
447 73 : nextState = OnStateStop();
448 73 : break;
// NOTE(review): this path returns with mProcessingCallbacks still true (it
// was set above and is only cleared further down) -- confirm whether that is
// intended for an unknown state.
449 : default:
450 0 : nextState = 0;
451 0 : NS_NOTREACHED("Unknown enum value.");
452 0 : return NS_ERROR_UNEXPECTED;
453 : }
454 :
// mState still holds the state that was just processed; it is only updated
// to nextState further down.
455 215 : bool stillTransferring = (mState == STATE_TRANSFER &&
456 215 : nextState == STATE_TRANSFER);
457 215 : if (stillTransferring) {
458 0 : NS_ASSERTION(NS_SUCCEEDED(mStatus),
459 : "Should not have failed status for ongoing transfer");
460 : } else {
461 215 : NS_ASSERTION(mState != nextState,
462 : "Only OnStateTransfer can be called more than once.");
463 : }
464 215 : if (mRetargeting) {
465 4 : NS_ASSERTION(mState != STATE_STOP,
466 : "Retargeting should not happen during OnStateStop.");
467 : }
468 :
469 : // Set mRetargeting so EnsureWaiting will be called. It ensures that
470 : // OnStateStop is called on the main thread.
471 215 : if (nextState == STATE_STOP && !NS_IsMainThread()) {
472 4 : mRetargeting = true;
473 : }
474 :
475 : // Unset mProcessingCallbacks here (while we have lock) so our own call to
476 : // EnsureWaiting isn't blocked by it.
477 215 : mProcessingCallbacks = false;
478 :
479 : // We must break the loop when we're switching event delivery to another
480 : // thread and the input stream pump is suspended, otherwise
481 : // OnStateStop() might be called off the main thread. See bug 1026951
482 : // comment #107 for the exact scenario.
483 215 : if (mSuspendCount && mRetargeting) {
484 0 : mState = nextState;
485 0 : mWaitingForInputStreamReady = false;
486 0 : break;
487 : }
488 :
489 : // Wait asynchronously if there is still data to transfer, or we're
490 : // switching event delivery to another thread.
491 215 : if (!mSuspendCount && (stillTransferring || mRetargeting)) {
492 8 : mState = nextState;
493 8 : mWaitingForInputStreamReady = false;
494 8 : nsresult rv = EnsureWaiting();
495 8 : if (NS_SUCCEEDED(rv))
496 8 : break;
497 :
498 : // Failure to start asynchronous wait: stop transfer.
499 : // Do not set mStatus if it was previously set to report a failure.
500 0 : if (NS_SUCCEEDED(mStatus)) {
501 0 : mStatus = rv;
502 : }
503 0 : nextState = STATE_STOP;
504 : }
505 :
// No wait was armed: commit the new state and run its handler in the next
// iteration.
506 207 : mState = nextState;
507 207 : }
508 81 : return NS_OK;
509 : }
510 :
511 : uint32_t
512 73 : nsInputStreamPump::OnStateStart()
513 : {
514 73 : mMonitor.AssertCurrentThreadIn();
515 :
516 146 : AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStart", NETWORK);
517 :
518 73 : LOG((" OnStateStart [this=%p]\n", this));
519 :
520 : nsresult rv;
521 :
522 : // need to check the reason why the stream is ready. this is required
523 : // so our listener can check our status from OnStartRequest.
524 : // XXX async streams should have a GetStatus method!
525 73 : if (NS_SUCCEEDED(mStatus)) {
526 : uint64_t avail;
527 71 : rv = mAsyncStream->Available(&avail);
528 71 : if (NS_FAILED(rv) && rv != NS_BASE_STREAM_CLOSED)
529 0 : mStatus = rv;
530 : }
531 :
532 : {
533 : // Note: Must exit monitor for call to OnStartRequest to avoid
534 : // deadlocks when calls to RetargetDeliveryTo for multiple
535 : // nsInputStreamPumps are needed (e.g. nsHttpChannel).
536 73 : mMonitor.Exit();
537 73 : rv = mListener->OnStartRequest(this, mListenerContext);
538 73 : mMonitor.Enter();
539 : }
540 :
541 : // an error returned from OnStartRequest should cause us to abort; however,
542 : // we must not stomp on mStatus if already canceled.
543 73 : if (NS_FAILED(rv) && NS_SUCCEEDED(mStatus))
544 2 : mStatus = rv;
545 :
546 146 : return NS_SUCCEEDED(mStatus) ? STATE_TRANSFER : STATE_STOP;
547 : }
548 :
549 : uint32_t
550 69 : nsInputStreamPump::OnStateTransfer()
551 : {
552 69 : mMonitor.AssertCurrentThreadIn();
553 :
554 138 : AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateTransfer", NETWORK);
555 :
556 69 : LOG((" OnStateTransfer [this=%p]\n", this));
557 :
558 : // if canceled, go directly to STATE_STOP...
559 69 : if (NS_FAILED(mStatus))
560 0 : return STATE_STOP;
561 :
562 69 : nsresult rv = CreateBufferedStreamIfNeeded();
563 69 : if (NS_WARN_IF(NS_FAILED(rv))) {
564 0 : return STATE_STOP;
565 : }
566 :
567 : uint64_t avail;
568 69 : rv = mBufferedStream->Available(&avail);
569 69 : LOG((" Available returned [stream=%p rv=%" PRIx32 " avail=%" PRIu64 "]\n", mAsyncStream.get(),
570 : static_cast<uint32_t>(rv), avail));
571 :
572 69 : if (rv == NS_BASE_STREAM_CLOSED) {
573 1 : rv = NS_OK;
574 1 : avail = 0;
575 : }
576 68 : else if (NS_SUCCEEDED(rv) && avail) {
577 : // figure out how much data to report (XXX detect overflow??)
578 68 : if (avail > mStreamLength - mStreamOffset)
579 0 : avail = mStreamLength - mStreamOffset;
580 :
581 68 : if (avail) {
582 : // we used to limit avail to 16K - we were afraid some ODA handlers
583 : // might assume they wouldn't get more than 16K at once
584 : // we're removing that limit since it speeds up local file access.
585 : // Now there's an implicit 64K limit of 4 16K segments
586 : // NOTE: ok, so the story is as follows. OnDataAvailable impls
587 : // are by contract supposed to consume exactly |avail| bytes.
588 : // however, many do not... mailnews... stream converters...
589 : // cough, cough. the input stream pump is fairly tolerant
590 : // in this regard; however, if an ODA does not consume any
591 : // data from the stream, then we could potentially end up in
592 : // an infinite loop. we do our best here to try to catch
593 : // such an error. (see bug 189672)
594 :
595 : // in most cases this QI will succeed (mAsyncStream is almost always
596 : // a nsPipeInputStream, which implements nsISeekableStream::Tell).
597 : int64_t offsetBefore;
598 136 : nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mBufferedStream);
599 68 : if (seekable && NS_FAILED(seekable->Tell(&offsetBefore))) {
600 0 : NS_NOTREACHED("Tell failed on readable stream");
601 0 : offsetBefore = 0;
602 : }
603 :
604 : uint32_t odaAvail =
605 68 : avail > UINT32_MAX ?
606 68 : UINT32_MAX : uint32_t(avail);
607 :
608 68 : LOG((" calling OnDataAvailable [offset=%" PRIu64 " count=%" PRIu64 "(%u)]\n",
609 : mStreamOffset, avail, odaAvail));
610 :
611 : {
612 : // Note: Must exit monitor for call to OnStartRequest to avoid
613 : // deadlocks when calls to RetargetDeliveryTo for multiple
614 : // nsInputStreamPumps are needed (e.g. nsHttpChannel).
615 68 : mMonitor.Exit();
616 136 : rv = mListener->OnDataAvailable(this, mListenerContext,
617 : mBufferedStream, mStreamOffset,
618 136 : odaAvail);
619 68 : mMonitor.Enter();
620 : }
621 :
622 : // don't enter this code if ODA failed or called Cancel
623 68 : if (NS_SUCCEEDED(rv) && NS_SUCCEEDED(mStatus)) {
624 : // test to see if this ODA failed to consume data
625 67 : if (seekable) {
626 : // NOTE: if Tell fails, which can happen if the stream is
627 : // now closed, then we assume that everything was read.
628 : int64_t offsetAfter;
629 67 : if (NS_FAILED(seekable->Tell(&offsetAfter)))
630 65 : offsetAfter = offsetBefore + odaAvail;
631 67 : if (offsetAfter > offsetBefore)
632 67 : mStreamOffset += (offsetAfter - offsetBefore);
633 0 : else if (mSuspendCount == 0) {
634 : //
635 : // possible infinite loop if we continue pumping data!
636 : //
637 : // NOTE: although not allowed by nsIStreamListener, we
638 : // will allow the ODA impl to Suspend the pump. IMAP
639 : // does this :-(
640 : //
641 0 : NS_ERROR("OnDataAvailable implementation consumed no data");
642 0 : mStatus = NS_ERROR_UNEXPECTED;
643 : }
644 : }
645 : else
646 0 : mStreamOffset += odaAvail; // assume ODA behaved well
647 : }
648 : }
649 : }
650 :
651 : // an error returned from Available or OnDataAvailable should cause us to
652 : // abort; however, we must not stomp on mStatus if already canceled.
653 :
654 69 : if (NS_SUCCEEDED(mStatus)) {
655 69 : if (NS_FAILED(rv))
656 1 : mStatus = rv;
657 68 : else if (avail) {
658 : // if stream is now closed, advance to STATE_STOP right away.
659 : // Available may return 0 bytes available at the moment; that
660 : // would not mean that we are done.
661 : // XXX async streams should have a GetStatus method!
662 67 : rv = mBufferedStream->Available(&avail);
663 67 : if (NS_SUCCEEDED(rv))
664 0 : return STATE_TRANSFER;
665 67 : if (rv != NS_BASE_STREAM_CLOSED)
666 0 : mStatus = rv;
667 : }
668 : }
669 69 : return STATE_STOP;
670 : }
671 :
672 : nsresult
673 0 : nsInputStreamPump::CallOnStateStop()
674 : {
675 0 : ReentrantMonitorAutoEnter mon(mMonitor);
676 :
677 0 : MOZ_ASSERT(NS_IsMainThread(),
678 : "CallOnStateStop should only be called on the main thread.");
679 :
680 0 : mState = OnStateStop();
681 0 : return NS_OK;
682 : }
683 :
684 : uint32_t
685 73 : nsInputStreamPump::OnStateStop()
686 : {
687 73 : mMonitor.AssertCurrentThreadIn();
688 :
689 73 : if (!NS_IsMainThread()) {
690 : // Hopefully temporary hack: OnStateStop should only run on the main
691 : // thread, but we're seeing some rare off-main-thread calls. For now
692 : // just redispatch to the main thread in release builds, and crash in
693 : // debug builds.
694 0 : MOZ_ASSERT(NS_IsMainThread(),
695 : "OnStateStop should only be called on the main thread.");
696 0 : nsresult rv = NS_DispatchToMainThread(
697 0 : NewRunnableMethod("nsInputStreamPump::CallOnStateStop",
698 : this,
699 0 : &nsInputStreamPump::CallOnStateStop));
700 0 : NS_ENSURE_SUCCESS(rv, STATE_IDLE);
701 0 : return STATE_IDLE;
702 : }
703 :
704 146 : AUTO_PROFILER_LABEL("nsInputStreamPump::OnStateStop", NETWORK);
705 :
706 73 : LOG((" OnStateStop [this=%p status=%" PRIx32 "]\n", this, static_cast<uint32_t>(mStatus)));
707 :
708 : // if an error occurred, we must be sure to pass the error onto the async
709 : // stream. in some cases, this is redundant, but since close is idempotent,
710 : // this is OK. otherwise, be sure to honor the "close-when-done" option.
711 :
712 73 : if (!mAsyncStream || !mListener) {
713 0 : MOZ_ASSERT(mAsyncStream, "null mAsyncStream: OnStateStop called twice?");
714 0 : MOZ_ASSERT(mListener, "null mListener: OnStateStop called twice?");
715 0 : return STATE_IDLE;
716 : }
717 :
718 73 : if (NS_FAILED(mStatus))
719 5 : mAsyncStream->CloseWithStatus(mStatus);
720 68 : else if (mCloseWhenDone)
721 65 : mAsyncStream->Close();
722 :
723 73 : mAsyncStream = nullptr;
724 73 : mBufferedStream = nullptr;
725 73 : mTargetThread = nullptr;
726 73 : mIsPending = false;
727 : {
728 : // Note: Must exit monitor for call to OnStartRequest to avoid
729 : // deadlocks when calls to RetargetDeliveryTo for multiple
730 : // nsInputStreamPumps are needed (e.g. nsHttpChannel).
731 73 : mMonitor.Exit();
732 73 : mListener->OnStopRequest(this, mListenerContext, mStatus);
733 73 : mMonitor.Enter();
734 : }
735 73 : mListener = nullptr;
736 73 : mListenerContext = nullptr;
737 :
738 73 : if (mLoadGroup)
739 0 : mLoadGroup->RemoveRequest(this, nullptr, mStatus);
740 :
741 73 : return STATE_IDLE;
742 : }
743 :
744 : nsresult
745 75 : nsInputStreamPump::CreateBufferedStreamIfNeeded()
746 : {
747 75 : if (mBufferedStream) {
748 3 : return NS_OK;
749 : }
750 :
751 : // ReadSegments is not available for any nsIAsyncInputStream. In order to use
752 : // it, we wrap a nsIBufferedInputStream around it, if needed.
753 :
754 72 : if (NS_InputStreamIsBuffered(mAsyncStream)) {
755 72 : mBufferedStream = mAsyncStream;
756 72 : return NS_OK;
757 : }
758 :
759 0 : nsresult rv = NS_NewBufferedInputStream(getter_AddRefs(mBufferedStream),
760 0 : mAsyncStream, 4096);
761 0 : NS_ENSURE_SUCCESS(rv, rv);
762 :
763 0 : return NS_OK;
764 : }
765 :
766 : //-----------------------------------------------------------------------------
767 : // nsIThreadRetargetableRequest
768 : //-----------------------------------------------------------------------------
769 :
770 : NS_IMETHODIMP
771 4 : nsInputStreamPump::RetargetDeliveryTo(nsIEventTarget* aNewTarget)
772 : {
773 8 : ReentrantMonitorAutoEnter mon(mMonitor);
774 :
775 4 : NS_ENSURE_ARG(aNewTarget);
776 4 : NS_ENSURE_TRUE(mState == STATE_START || mState == STATE_TRANSFER,
777 : NS_ERROR_UNEXPECTED);
778 :
779 : // If canceled, do not retarget. Return with canceled status.
780 4 : if (NS_FAILED(mStatus)) {
781 0 : return mStatus;
782 : }
783 :
784 4 : if (aNewTarget == mTargetThread) {
785 0 : NS_WARNING("Retargeting delivery to same thread");
786 0 : return NS_OK;
787 : }
788 :
789 : // Ensure that |mListener| and any subsequent listeners can be retargeted
790 : // to another thread.
791 4 : nsresult rv = NS_OK;
792 : nsCOMPtr<nsIThreadRetargetableStreamListener> retargetableListener =
793 8 : do_QueryInterface(mListener, &rv);
794 4 : if (NS_SUCCEEDED(rv) && retargetableListener) {
795 4 : rv = retargetableListener->CheckListenerChain();
796 4 : if (NS_SUCCEEDED(rv)) {
797 4 : mTargetThread = aNewTarget;
798 4 : mRetargeting = true;
799 : }
800 : }
801 4 : LOG(("nsInputStreamPump::RetargetDeliveryTo [this=%p aNewTarget=%p] "
802 : "%s listener [%p] rv[%" PRIx32 "]",
803 : this, aNewTarget, (mTargetThread == aNewTarget ? "success" : "failure"),
804 : (nsIStreamListener*)mListener, static_cast<uint32_t>(rv)));
805 4 : return rv;
806 : }
|