Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include <algorithm>
8 : #include "mozilla/Attributes.h"
9 : #include "mozilla/IntegerPrintfMacros.h"
10 : #include "mozilla/ReentrantMonitor.h"
11 : #include "nsIBufferedStreams.h"
12 : #include "nsICloneableInputStream.h"
13 : #include "nsIPipe.h"
14 : #include "nsIEventTarget.h"
15 : #include "nsISeekableStream.h"
16 : #include "mozilla/RefPtr.h"
17 : #include "nsSegmentedBuffer.h"
18 : #include "nsStreamUtils.h"
19 : #include "nsCOMPtr.h"
20 : #include "nsCRT.h"
21 : #include "mozilla/Logging.h"
22 : #include "nsIClassInfoImpl.h"
23 : #include "nsAlgorithm.h"
24 : #include "nsMemory.h"
25 : #include "nsIAsyncInputStream.h"
26 : #include "nsIAsyncOutputStream.h"
27 :
28 : using namespace mozilla;
29 :
30 : #ifdef LOG
31 : #undef LOG
32 : #endif
33 : //
34 : // set MOZ_LOG=nsPipe:5
35 : //
36 : static LazyLogModule sPipeLog("nsPipe");
37 : #define LOG(args) MOZ_LOG(sPipeLog, mozilla::LogLevel::Debug, args)
38 :
39 : #define DEFAULT_SEGMENT_SIZE 4096
40 : #define DEFAULT_SEGMENT_COUNT 16
41 :
42 : class nsPipe;
43 : class nsPipeEvents;
44 : class nsPipeInputStream;
45 : class nsPipeOutputStream;
46 : class AutoReadSegment;
47 :
48 : namespace {
49 :
50 : enum MonitorAction
51 : {
52 : DoNotNotifyMonitor,
53 : NotifyMonitor
54 : };
55 :
56 : enum SegmentChangeResult
57 : {
58 : SegmentNotChanged,
59 : SegmentAdvanceBufferRead
60 : };
61 :
62 : } // namespace
63 :
64 : //-----------------------------------------------------------------------------
65 :
66 : // this class is used to delay notifications until the end of a particular
67 : // scope. it helps avoid the complexity of issuing callbacks while inside
68 : // a critical section.
69 : class nsPipeEvents
70 : {
71 : public:
72 713 : nsPipeEvents() { }
73 : ~nsPipeEvents();
74 :
75 76 : inline void NotifyInputReady(nsIAsyncInputStream* aStream,
76 : nsIInputStreamCallback* aCallback)
77 : {
78 76 : mInputList.AppendElement(InputEntry(aStream, aCallback));
79 76 : }
80 :
81 0 : inline void NotifyOutputReady(nsIAsyncOutputStream* aStream,
82 : nsIOutputStreamCallback* aCallback)
83 : {
84 0 : NS_ASSERTION(!mOutputCallback, "already have an output event");
85 0 : mOutputStream = aStream;
86 0 : mOutputCallback = aCallback;
87 0 : }
88 :
89 : private:
90 228 : struct InputEntry
91 : {
92 76 : InputEntry(nsIAsyncInputStream* aStream, nsIInputStreamCallback* aCallback)
93 76 : : mStream(aStream)
94 76 : , mCallback(aCallback)
95 : {
96 76 : MOZ_ASSERT(mStream);
97 76 : MOZ_ASSERT(mCallback);
98 76 : }
99 :
100 : nsCOMPtr<nsIAsyncInputStream> mStream;
101 : nsCOMPtr<nsIInputStreamCallback> mCallback;
102 : };
103 :
104 : nsTArray<InputEntry> mInputList;
105 :
106 : nsCOMPtr<nsIAsyncOutputStream> mOutputStream;
107 : nsCOMPtr<nsIOutputStreamCallback> mOutputCallback;
108 : };
109 :
110 : //-----------------------------------------------------------------------------
111 :
112 : // This class is used to maintain input stream state. Its broken out from the
113 : // nsPipeInputStream class because generally the nsPipe should be modifying
114 : // this state and not the input stream itself.
115 : struct nsPipeReadState
116 : {
117 76 : nsPipeReadState()
118 76 : : mReadCursor(nullptr)
119 : , mReadLimit(nullptr)
120 : , mSegment(0)
121 : , mAvailable(0)
122 : , mActiveRead(false)
123 76 : , mNeedDrain(false)
124 76 : { }
125 :
126 : char* mReadCursor;
127 : char* mReadLimit;
128 : int32_t mSegment;
129 : uint32_t mAvailable;
130 :
131 : // This flag is managed using the AutoReadSegment RAII stack class.
132 : bool mActiveRead;
133 :
134 : // Set to indicate that the input stream has closed and should be drained,
135 : // but that drain has been delayed due to an active read. When the read
136 : // completes, this flag indicate the drain should then be performed.
137 : bool mNeedDrain;
138 : };
139 :
140 : //-----------------------------------------------------------------------------
141 :
142 : // an input end of a pipe (maintained as a list of refs within the pipe)
143 : class nsPipeInputStream final
144 : : public nsIAsyncInputStream
145 : , public nsISeekableStream
146 : , public nsISearchableInputStream
147 : , public nsICloneableInputStream
148 : , public nsIClassInfo
149 : , public nsIBufferedInputStream
150 : {
151 : public:
152 : NS_DECL_THREADSAFE_ISUPPORTS
153 : NS_DECL_NSIINPUTSTREAM
154 : NS_DECL_NSIASYNCINPUTSTREAM
155 : NS_DECL_NSISEEKABLESTREAM
156 : NS_DECL_NSISEARCHABLEINPUTSTREAM
157 : NS_DECL_NSICLONEABLEINPUTSTREAM
158 : NS_DECL_NSICLASSINFO
159 : NS_DECL_NSIBUFFEREDINPUTSTREAM
160 :
161 76 : explicit nsPipeInputStream(nsPipe* aPipe)
162 76 : : mPipe(aPipe)
163 : , mLogicalOffset(0)
164 : , mInputStatus(NS_OK)
165 : , mBlocking(true)
166 : , mBlocked(false)
167 76 : , mCallbackFlags(0)
168 76 : { }
169 :
170 0 : explicit nsPipeInputStream(const nsPipeInputStream& aOther)
171 0 : : mPipe(aOther.mPipe)
172 0 : , mLogicalOffset(aOther.mLogicalOffset)
173 0 : , mInputStatus(aOther.mInputStatus)
174 0 : , mBlocking(aOther.mBlocking)
175 : , mBlocked(false)
176 : , mCallbackFlags(0)
177 0 : , mReadState(aOther.mReadState)
178 0 : { }
179 :
180 : nsresult Fill();
181 76 : void SetNonBlocking(bool aNonBlocking)
182 : {
183 76 : mBlocking = !aNonBlocking;
184 76 : }
185 :
186 : uint32_t Available();
187 :
188 : // synchronously wait for the pipe to become readable.
189 : nsresult Wait();
190 :
191 : // These two don't acquire the monitor themselves. Instead they
192 : // expect their caller to have done so and to pass the monitor as
193 : // evidence.
194 : MonitorAction OnInputReadable(uint32_t aBytesWritten, nsPipeEvents&,
195 : const ReentrantMonitorAutoEnter& ev);
196 : MonitorAction OnInputException(nsresult, nsPipeEvents&,
197 : const ReentrantMonitorAutoEnter& ev);
198 :
199 466 : nsPipeReadState& ReadState()
200 : {
201 466 : return mReadState;
202 : }
203 :
204 : const nsPipeReadState& ReadState() const
205 : {
206 : return mReadState;
207 : }
208 :
209 : nsresult Status() const;
210 :
211 : // A version of Status() that doesn't acquire the monitor.
212 : nsresult Status(const ReentrantMonitorAutoEnter& ev) const;
213 :
214 : private:
215 : virtual ~nsPipeInputStream();
216 :
217 : RefPtr<nsPipe> mPipe;
218 :
219 : int64_t mLogicalOffset;
220 : // Individual input streams can be closed without effecting the rest of the
221 : // pipe. So track individual input stream status separately. |mInputStatus|
222 : // is protected by |mPipe->mReentrantMonitor|.
223 : nsresult mInputStatus;
224 : bool mBlocking;
225 :
226 : // these variables can only be accessed while inside the pipe's monitor
227 : bool mBlocked;
228 : nsCOMPtr<nsIInputStreamCallback> mCallback;
229 : uint32_t mCallbackFlags;
230 :
231 : // requires pipe's monitor; usually treat as an opaque token to pass to nsPipe
232 : nsPipeReadState mReadState;
233 : };
234 :
235 : //-----------------------------------------------------------------------------
236 :
237 : // the output end of a pipe (allocated as a member of the pipe).
238 75 : class nsPipeOutputStream
239 : : public nsIAsyncOutputStream
240 : , public nsIClassInfo
241 : {
242 : public:
243 : // since this class will be allocated as a member of the pipe, we do not
244 : // need our own ref count. instead, we share the lifetime (the ref count)
245 : // of the entire pipe. this macro is just convenience since it does not
246 : // declare a mRefCount variable; however, don't let the name fool you...
247 : // we are not inheriting from nsPipe ;-)
248 : NS_DECL_ISUPPORTS_INHERITED
249 :
250 : NS_DECL_NSIOUTPUTSTREAM
251 : NS_DECL_NSIASYNCOUTPUTSTREAM
252 : NS_DECL_NSICLASSINFO
253 :
254 76 : explicit nsPipeOutputStream(nsPipe* aPipe)
255 76 : : mPipe(aPipe)
256 : , mWriterRefCnt(0)
257 : , mLogicalOffset(0)
258 : , mBlocking(true)
259 : , mBlocked(false)
260 : , mWritable(true)
261 76 : , mCallbackFlags(0)
262 76 : { }
263 :
264 76 : void SetNonBlocking(bool aNonBlocking)
265 : {
266 76 : mBlocking = !aNonBlocking;
267 76 : }
268 15 : void SetWritable(bool aWritable)
269 : {
270 15 : mWritable = aWritable;
271 15 : }
272 :
273 : // synchronously wait for the pipe to become writable.
274 : nsresult Wait();
275 :
276 : MonitorAction OnOutputWritable(nsPipeEvents&);
277 : MonitorAction OnOutputException(nsresult, nsPipeEvents&);
278 :
279 : private:
280 : nsPipe* mPipe;
281 :
282 : // separate refcnt so that we know when to close the producer
283 : mozilla::ThreadSafeAutoRefCnt mWriterRefCnt;
284 : int64_t mLogicalOffset;
285 : bool mBlocking;
286 :
287 : // these variables can only be accessed while inside the pipe's monitor
288 : bool mBlocked;
289 : bool mWritable;
290 : nsCOMPtr<nsIOutputStreamCallback> mCallback;
291 : uint32_t mCallbackFlags;
292 : };
293 :
294 : //-----------------------------------------------------------------------------
295 :
296 : class nsPipe final : public nsIPipe
297 : {
298 : public:
299 : friend class nsPipeInputStream;
300 : friend class nsPipeOutputStream;
301 : friend class AutoReadSegment;
302 :
303 : NS_DECL_THREADSAFE_ISUPPORTS
304 : NS_DECL_NSIPIPE
305 :
306 : // nsPipe methods:
307 : nsPipe();
308 :
309 : private:
310 : ~nsPipe();
311 :
312 : //
313 : // Methods below may only be called while inside the pipe's monitor. Some
314 : // of these methods require passing a ReentrantMonitorAutoEnter to prove the
315 : // monitor is held.
316 : //
317 :
318 : void PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
319 : char*& aCursor, char*& aLimit);
320 : SegmentChangeResult AdvanceReadSegment(nsPipeReadState& aReadState,
321 : const ReentrantMonitorAutoEnter &ev);
322 : bool ReadSegmentBeingWritten(nsPipeReadState& aReadState);
323 : uint32_t CountSegmentReferences(int32_t aSegment);
324 : void SetAllNullReadCursors();
325 : bool AllReadCursorsMatchWriteCursor();
326 : void RollBackAllReadCursors(char* aWriteCursor);
327 : void UpdateAllReadCursors(char* aWriteCursor);
328 : void ValidateAllReadCursors();
329 : uint32_t GetBufferSegmentCount(const nsPipeReadState& aReadState,
330 : const ReentrantMonitorAutoEnter& ev) const;
331 : bool IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const;
332 :
333 : //
334 : // methods below may be called while outside the pipe's monitor
335 : //
336 :
337 : void DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents);
338 : nsresult GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen);
339 : void AdvanceWriteCursor(uint32_t aCount);
340 :
341 : void OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason);
342 : void OnPipeException(nsresult aReason, bool aOutputOnly = false);
343 :
344 : nsresult CloneInputStream(nsPipeInputStream* aOriginal,
345 : nsIInputStream** aCloneOut);
346 :
347 : // methods below should only be called by AutoReadSegment
348 : nsresult GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
349 : uint32_t& aLength);
350 : void ReleaseReadSegment(nsPipeReadState& aReadState,
351 : nsPipeEvents& aEvents);
352 : void AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aCount);
353 :
354 : // We can't inherit from both nsIInputStream and nsIOutputStream
355 : // because they collide on their Close method. Consequently we nest their
356 : // implementations to avoid the extra object allocation.
357 : nsPipeOutputStream mOutput;
358 :
359 : // Since the input stream can be cloned, we may have more than one. Use
360 : // a weak reference as the streams will clear their entry here in their
361 : // destructor. Using a strong reference would create a reference cycle.
362 : // Only usable while mReentrantMonitor is locked.
363 : nsTArray<nsPipeInputStream*> mInputList;
364 :
365 : // But hold a strong ref to our original input stream. For backward
366 : // compatibility we need to be able to consistently return this same
367 : // object from GetInputStream(). Note, mOriginalInput is also stored
368 : // in mInputList as a weak ref.
369 : RefPtr<nsPipeInputStream> mOriginalInput;
370 :
371 : ReentrantMonitor mReentrantMonitor;
372 : nsSegmentedBuffer mBuffer;
373 :
374 : // The maximum number of segments to allow to be buffered in advance
375 : // of the fastest reader. This is collection of segments is called
376 : // the "advance buffer".
377 : uint32_t mMaxAdvanceBufferSegmentCount;
378 :
379 : int32_t mWriteSegment;
380 : char* mWriteCursor;
381 : char* mWriteLimit;
382 :
383 : // |mStatus| is protected by |mReentrantMonitor|.
384 : nsresult mStatus;
385 : bool mInited;
386 : };
387 :
388 : //-----------------------------------------------------------------------------
389 :
390 : // RAII class representing an active read segment. When it goes out of scope
391 : // it automatically updates the read cursor and releases the read segment.
392 : class MOZ_STACK_CLASS AutoReadSegment final
393 : {
394 : public:
395 131 : AutoReadSegment(nsPipe* aPipe, nsPipeReadState& aReadState,
396 : uint32_t aMaxLength)
397 131 : : mPipe(aPipe)
398 : , mReadState(aReadState)
399 : , mStatus(NS_ERROR_FAILURE)
400 : , mSegment(nullptr)
401 : , mLength(0)
402 131 : , mOffset(0)
403 : {
404 131 : MOZ_ASSERT(mPipe);
405 131 : MOZ_ASSERT(!mReadState.mActiveRead);
406 131 : mStatus = mPipe->GetReadSegment(mReadState, mSegment, mLength);
407 131 : if (NS_SUCCEEDED(mStatus)) {
408 131 : MOZ_ASSERT(mReadState.mActiveRead);
409 131 : MOZ_ASSERT(mSegment);
410 131 : mLength = std::min(mLength, aMaxLength);
411 131 : MOZ_ASSERT(mLength);
412 : }
413 131 : }
414 :
415 131 : ~AutoReadSegment()
416 131 : {
417 131 : if (NS_SUCCEEDED(mStatus)) {
418 131 : if (mOffset) {
419 88 : mPipe->AdvanceReadCursor(mReadState, mOffset);
420 : } else {
421 86 : nsPipeEvents events;
422 43 : mPipe->ReleaseReadSegment(mReadState, events);
423 : }
424 : }
425 131 : MOZ_ASSERT(!mReadState.mActiveRead);
426 131 : }
427 :
428 131 : nsresult Status() const
429 : {
430 131 : return mStatus;
431 : }
432 :
433 131 : const char* Data() const
434 : {
435 131 : MOZ_ASSERT(NS_SUCCEEDED(mStatus));
436 131 : MOZ_ASSERT(mSegment);
437 131 : return mSegment + mOffset;
438 : }
439 :
440 438 : uint32_t Length() const
441 : {
442 438 : MOZ_ASSERT(NS_SUCCEEDED(mStatus));
443 438 : MOZ_ASSERT(mLength >= mOffset);
444 438 : return mLength - mOffset;
445 : }
446 :
447 : void
448 88 : Advance(uint32_t aCount)
449 : {
450 88 : MOZ_ASSERT(NS_SUCCEEDED(mStatus));
451 88 : MOZ_ASSERT(aCount <= (mLength - mOffset));
452 88 : mOffset += aCount;
453 88 : }
454 :
455 : nsPipeReadState&
456 : ReadState() const
457 : {
458 : return mReadState;
459 : }
460 :
461 : private:
462 : // guaranteed to remain alive due to limited stack lifetime of AutoReadSegment
463 : nsPipe* mPipe;
464 : nsPipeReadState& mReadState;
465 : nsresult mStatus;
466 : const char* mSegment;
467 : uint32_t mLength;
468 : uint32_t mOffset;
469 : };
470 :
471 : //
472 : // NOTES on buffer architecture:
473 : //
474 : // +-----------------+ - - mBuffer.GetSegment(0)
475 : // | |
476 : // + - - - - - - - - + - - nsPipeReadState.mReadCursor
477 : // |/////////////////|
478 : // |/////////////////|
479 : // |/////////////////|
480 : // |/////////////////|
481 : // +-----------------+ - - nsPipeReadState.mReadLimit
482 : // |
483 : // +-----------------+
484 : // |/////////////////|
485 : // |/////////////////|
486 : // |/////////////////|
487 : // |/////////////////|
488 : // |/////////////////|
489 : // |/////////////////|
490 : // +-----------------+
491 : // |
492 : // +-----------------+ - - mBuffer.GetSegment(mWriteSegment)
493 : // |/////////////////|
494 : // |/////////////////|
495 : // |/////////////////|
496 : // + - - - - - - - - + - - mWriteCursor
497 : // | |
498 : // | |
499 : // +-----------------+ - - mWriteLimit
500 : //
501 : // (shaded region contains data)
502 : //
503 : // NOTE: Each input stream produced by the nsPipe contains its own, separate
504 : // nsPipeReadState. This means there are multiple mReadCursor and
505 : // mReadLimit values in play. The pipe cannot discard old data until
506 : // all mReadCursors have moved beyond that point in the stream.
507 : //
508 : // Likewise, each input stream reader will have it's own amount of
509 : // buffered data. The pipe size threshold, however, is only applied
510 : // to the input stream that is being read fastest. We call this
511 : // the "advance buffer" in that its in advance of all readers. We
512 : // allow slower input streams to buffer more data so that we don't
513 : // stall processing of the faster input stream.
514 : //
515 : // NOTE: on some systems (notably OS/2), the heap allocator uses an arena for
516 : // small allocations (e.g., 64 byte allocations). this means that buffers may
517 : // be allocated back-to-back. in the diagram above, for example, mReadLimit
518 : // would actually be pointing at the beginning of the next segment. when
519 : // making changes to this file, please keep this fact in mind.
520 : //
521 :
522 : //-----------------------------------------------------------------------------
523 : // nsPipe methods:
524 : //-----------------------------------------------------------------------------
525 :
526 76 : nsPipe::nsPipe()
527 : : mOutput(this)
528 76 : , mOriginalInput(new nsPipeInputStream(this))
529 : , mReentrantMonitor("nsPipe.mReentrantMonitor")
530 : , mMaxAdvanceBufferSegmentCount(0)
531 : , mWriteSegment(-1)
532 : , mWriteCursor(nullptr)
533 : , mWriteLimit(nullptr)
534 : , mStatus(NS_OK)
535 152 : , mInited(false)
536 : {
537 76 : mInputList.AppendElement(mOriginalInput);
538 76 : }
539 :
540 75 : nsPipe::~nsPipe()
541 : {
542 75 : }
543 :
544 426 : NS_IMPL_ADDREF(nsPipe)
545 0 : NS_IMPL_QUERY_INTERFACE(nsPipe, nsIPipe)
546 :
547 : NS_IMETHODIMP_(MozExternalRefCountType)
548 425 : nsPipe::Release()
549 : {
550 425 : MOZ_ASSERT(int32_t(mRefCnt) > 0, "dup release");
551 425 : nsrefcnt count = --mRefCnt;
552 425 : NS_LOG_RELEASE(this, count, "nsPipe");
553 425 : if (count == 0) {
554 75 : delete (this);
555 75 : return 0;
556 : }
557 : // Avoid racing on |mOriginalInput| by only looking at it when
558 : // the refcount is 1, that is, we are the only pointer (hence only
559 : // thread) to access it.
560 350 : if (count == 1 && mOriginalInput) {
561 76 : mOriginalInput = nullptr;
562 76 : return 1;
563 : }
564 274 : return count;
565 : }
566 :
567 : NS_IMETHODIMP
568 76 : nsPipe::Init(bool aNonBlockingIn,
569 : bool aNonBlockingOut,
570 : uint32_t aSegmentSize,
571 : uint32_t aSegmentCount)
572 : {
573 76 : mInited = true;
574 :
575 76 : if (aSegmentSize == 0) {
576 0 : aSegmentSize = DEFAULT_SEGMENT_SIZE;
577 : }
578 76 : if (aSegmentCount == 0) {
579 0 : aSegmentCount = DEFAULT_SEGMENT_COUNT;
580 : }
581 :
582 : // protect against overflow
583 76 : uint32_t maxCount = uint32_t(-1) / aSegmentSize;
584 76 : if (aSegmentCount > maxCount) {
585 7 : aSegmentCount = maxCount;
586 : }
587 :
588 : // The internal buffer is always "infinite" so that we can allow
589 : // the size to expand when cloned streams are read at different
590 : // rates. We enforce a limit on how much data can be buffered
591 : // ahead of the fastest reader in GetWriteSegment().
592 76 : nsresult rv = mBuffer.Init(aSegmentSize, UINT32_MAX);
593 76 : if (NS_FAILED(rv)) {
594 0 : return rv;
595 : }
596 :
597 76 : mMaxAdvanceBufferSegmentCount = aSegmentCount;
598 :
599 76 : mOutput.SetNonBlocking(aNonBlockingOut);
600 76 : mOriginalInput->SetNonBlocking(aNonBlockingIn);
601 :
602 76 : return NS_OK;
603 : }
604 :
605 : NS_IMETHODIMP
606 76 : nsPipe::GetInputStream(nsIAsyncInputStream** aInputStream)
607 : {
608 76 : if (NS_WARN_IF(!mInited)) {
609 0 : return NS_ERROR_NOT_INITIALIZED;
610 : }
611 152 : RefPtr<nsPipeInputStream> ref = mOriginalInput;
612 76 : ref.forget(aInputStream);
613 76 : return NS_OK;
614 : }
615 :
616 : NS_IMETHODIMP
617 76 : nsPipe::GetOutputStream(nsIAsyncOutputStream** aOutputStream)
618 : {
619 76 : if (NS_WARN_IF(!mInited)) {
620 0 : return NS_ERROR_NOT_INITIALIZED;
621 : }
622 76 : NS_ADDREF(*aOutputStream = &mOutput);
623 76 : return NS_OK;
624 : }
625 :
626 : void
627 0 : nsPipe::PeekSegment(const nsPipeReadState& aReadState, uint32_t aIndex,
628 : char*& aCursor, char*& aLimit)
629 : {
630 0 : if (aIndex == 0) {
631 0 : NS_ASSERTION(!aReadState.mReadCursor || mBuffer.GetSegmentCount(),
632 : "unexpected state");
633 0 : aCursor = aReadState.mReadCursor;
634 0 : aLimit = aReadState.mReadLimit;
635 : } else {
636 0 : uint32_t absoluteIndex = aReadState.mSegment + aIndex;
637 0 : uint32_t numSegments = mBuffer.GetSegmentCount();
638 0 : if (absoluteIndex >= numSegments) {
639 0 : aCursor = aLimit = nullptr;
640 : } else {
641 0 : aCursor = mBuffer.GetSegment(absoluteIndex);
642 0 : if (mWriteSegment == (int32_t)absoluteIndex) {
643 0 : aLimit = mWriteCursor;
644 : } else {
645 0 : aLimit = aCursor + mBuffer.GetSegmentSize();
646 : }
647 : }
648 : }
649 0 : }
650 :
651 : nsresult
652 131 : nsPipe::GetReadSegment(nsPipeReadState& aReadState, const char*& aSegment,
653 : uint32_t& aLength)
654 : {
655 262 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
656 :
657 131 : if (aReadState.mReadCursor == aReadState.mReadLimit) {
658 0 : return NS_FAILED(mStatus) ? mStatus : NS_BASE_STREAM_WOULD_BLOCK;
659 : }
660 :
661 : // The input stream locks the pipe while getting the buffer to read from,
662 : // but then unlocks while actual data copying is taking place. In
663 : // order to avoid deleting the buffer out from under this lockless read
664 : // set a flag to indicate a read is active. This flag is only modified
665 : // while the lock is held.
666 131 : MOZ_ASSERT(!aReadState.mActiveRead);
667 131 : aReadState.mActiveRead = true;
668 :
669 131 : aSegment = aReadState.mReadCursor;
670 131 : aLength = aReadState.mReadLimit - aReadState.mReadCursor;
671 :
672 131 : return NS_OK;
673 : }
674 :
675 : void
676 131 : nsPipe::ReleaseReadSegment(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
677 : {
678 262 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
679 :
680 131 : MOZ_ASSERT(aReadState.mActiveRead);
681 131 : aReadState.mActiveRead = false;
682 :
683 : // When a read completes and releases the mActiveRead flag, we may have blocked
684 : // a drain from completing. This occurs when the input stream is closed during
685 : // the read. In these cases, we need to complete the drain as soon as the
686 : // active read completes.
687 131 : if (aReadState.mNeedDrain) {
688 0 : aReadState.mNeedDrain = false;
689 0 : DrainInputStream(aReadState, aEvents);
690 : }
691 131 : }
692 :
693 : void
694 88 : nsPipe::AdvanceReadCursor(nsPipeReadState& aReadState, uint32_t aBytesRead)
695 : {
696 88 : NS_ASSERTION(aBytesRead, "don't call if no bytes read");
697 :
698 176 : nsPipeEvents events;
699 : {
700 176 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
701 :
702 88 : LOG(("III advancing read cursor by %u\n", aBytesRead));
703 88 : NS_ASSERTION(aBytesRead <= mBuffer.GetSegmentSize(), "read too much");
704 :
705 88 : aReadState.mReadCursor += aBytesRead;
706 88 : NS_ASSERTION(aReadState.mReadCursor <= aReadState.mReadLimit,
707 : "read cursor exceeds limit");
708 :
709 88 : MOZ_ASSERT(aReadState.mAvailable >= aBytesRead);
710 88 : aReadState.mAvailable -= aBytesRead;
711 :
712 : // Check to see if we're at the end of the available read data. If we
713 : // are, and this segment is not still being written, then we can possibly
714 : // free up the segment.
715 169 : if (aReadState.mReadCursor == aReadState.mReadLimit &&
716 81 : !ReadSegmentBeingWritten(aReadState)) {
717 :
718 : // Advance the segment position. If we have read any segments from the
719 : // advance buffer then we can potentially notify blocked writers.
720 10 : if (AdvanceReadSegment(aReadState, mon) == SegmentAdvanceBufferRead &&
721 0 : mOutput.OnOutputWritable(events) == NotifyMonitor) {
722 0 : mon.NotifyAll();
723 : }
724 : }
725 :
726 88 : ReleaseReadSegment(aReadState, events);
727 : }
728 88 : }
729 :
730 : SegmentChangeResult
731 10 : nsPipe::AdvanceReadSegment(nsPipeReadState& aReadState,
732 : const ReentrantMonitorAutoEnter &ev)
733 : {
734 : // Calculate how many segments are buffered for this stream to start.
735 10 : uint32_t startBufferSegments = GetBufferSegmentCount(aReadState, ev);
736 :
737 10 : int32_t currentSegment = aReadState.mSegment;
738 :
739 : // Move to the next segment to read
740 10 : aReadState.mSegment += 1;
741 :
742 : // If this was the last reference to the first segment, then remove it.
743 10 : if (currentSegment == 0 && CountSegmentReferences(currentSegment) == 0) {
744 :
745 : // shift write and read segment index (-1 indicates an empty buffer).
746 10 : mWriteSegment -= 1;
747 :
748 : // Directly modify the current read state. If the associated input
749 : // stream is closed simultaneous with reading, then it may not be
750 : // in the mInputList any more.
751 10 : aReadState.mSegment -= 1;
752 :
753 20 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
754 : // Skip the current read state structure since we modify it manually
755 : // before entering this loop.
756 10 : if (&mInputList[i]->ReadState() == &aReadState) {
757 10 : continue;
758 : }
759 0 : mInputList[i]->ReadState().mSegment -= 1;
760 : }
761 :
762 : // done with this segment
763 10 : mBuffer.DeleteFirstSegment();
764 10 : LOG(("III deleting first segment\n"));
765 : }
766 :
767 10 : if (mWriteSegment < aReadState.mSegment) {
768 : // read cursor has hit the end of written data, so reset it
769 0 : MOZ_ASSERT(mWriteSegment == (aReadState.mSegment - 1));
770 0 : aReadState.mReadCursor = nullptr;
771 0 : aReadState.mReadLimit = nullptr;
772 : // also, the buffer is completely empty, so reset the write cursor
773 0 : if (mWriteSegment == -1) {
774 0 : mWriteCursor = nullptr;
775 0 : mWriteLimit = nullptr;
776 : }
777 : } else {
778 : // advance read cursor and limit to next buffer segment
779 10 : aReadState.mReadCursor = mBuffer.GetSegment(aReadState.mSegment);
780 10 : if (mWriteSegment == aReadState.mSegment) {
781 6 : aReadState.mReadLimit = mWriteCursor;
782 : } else {
783 4 : aReadState.mReadLimit = aReadState.mReadCursor + mBuffer.GetSegmentSize();
784 : }
785 : }
786 :
787 : // Calculate how many segments are buffered for the stream after
788 : // reading.
789 10 : uint32_t endBufferSegments = GetBufferSegmentCount(aReadState, ev);
790 :
791 : // If the stream has read a segment out of the set of advanced buffer
792 : // segments, then the writer may advance.
793 10 : if (startBufferSegments >= mMaxAdvanceBufferSegmentCount &&
794 0 : endBufferSegments < mMaxAdvanceBufferSegmentCount) {
795 0 : return SegmentAdvanceBufferRead;
796 : }
797 :
798 : // Otherwise there are no significant changes to the segment structure.
799 10 : return SegmentNotChanged;
800 : }
801 :
802 : void
803 3 : nsPipe::DrainInputStream(nsPipeReadState& aReadState, nsPipeEvents& aEvents)
804 : {
805 6 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
806 :
807 : // If a segment is actively being read in ReadSegments() for this input
808 : // stream, then we cannot drain the stream. This can happen because
809 : // ReadSegments() does not hold the lock while copying from the buffer.
810 : // If we detect this condition, simply note that we need a drain once
811 : // the read completes and return immediately.
812 3 : if (aReadState.mActiveRead) {
813 0 : MOZ_ASSERT(!aReadState.mNeedDrain);
814 0 : aReadState.mNeedDrain = true;
815 0 : return;
816 : }
817 :
818 3 : aReadState.mAvailable = 0;
819 :
820 3 : while(mWriteSegment >= aReadState.mSegment) {
821 :
822 : // If the last segment to free is still being written to, we're done
823 : // draining. We can't free any more.
824 3 : if (ReadSegmentBeingWritten(aReadState)) {
825 3 : break;
826 : }
827 :
828 : // Don't bother checking if this results in an advance buffer segment
829 : // read. Since we are draining the entire stream we will read an
830 : // advance buffer segment no matter what.
831 0 : AdvanceReadSegment(aReadState, mon);
832 : }
833 :
834 : // If we have read any segments from the advance buffer then we can
835 : // potentially notify blocked writers.
836 6 : if (!IsAdvanceBufferFull(mon) &&
837 3 : mOutput.OnOutputWritable(aEvents) == NotifyMonitor) {
838 0 : mon.NotifyAll();
839 : }
840 : }
841 :
842 : bool
843 84 : nsPipe::ReadSegmentBeingWritten(nsPipeReadState& aReadState)
844 : {
845 84 : mReentrantMonitor.AssertCurrentThreadIn();
846 158 : bool beingWritten = mWriteSegment == aReadState.mSegment &&
847 158 : mWriteLimit > mWriteCursor;
848 84 : NS_ASSERTION(!beingWritten || aReadState.mReadLimit == mWriteCursor,
849 : "unexpected state");
850 84 : return beingWritten;
851 : }
852 :
853 : nsresult
854 100 : nsPipe::GetWriteSegment(char*& aSegment, uint32_t& aSegmentLen)
855 : {
856 200 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
857 :
858 100 : if (NS_FAILED(mStatus)) {
859 0 : return mStatus;
860 : }
861 :
862 : // write cursor and limit may both be null indicating an empty buffer.
863 100 : if (mWriteCursor == mWriteLimit) {
864 : // The pipe is full if we have hit our limit on advance data buffering.
865 : // This means the fastest reader is still reading slower than data is
866 : // being written into the pipe.
867 91 : if (IsAdvanceBufferFull(mon)) {
868 0 : return NS_BASE_STREAM_WOULD_BLOCK;
869 : }
870 :
871 : // The nsSegmentedBuffer is configured to be "infinite", so this
872 : // should never return nullptr here.
873 91 : char* seg = mBuffer.AppendNewSegment();
874 91 : if (!seg) {
875 0 : return NS_ERROR_OUT_OF_MEMORY;
876 : }
877 :
878 91 : LOG(("OOO appended new segment\n"));
879 91 : mWriteCursor = seg;
880 91 : mWriteLimit = mWriteCursor + mBuffer.GetSegmentSize();
881 91 : ++mWriteSegment;
882 : }
883 :
884 : // make sure read cursor is initialized
885 100 : SetAllNullReadCursors();
886 :
887 : // check to see if we can roll-back our read and write cursors to the
888 : // beginning of the current/first segment. this is purely an optimization.
889 100 : if (mWriteSegment == 0 && AllReadCursorsMatchWriteCursor()) {
890 85 : char* head = mBuffer.GetSegment(0);
891 85 : LOG(("OOO rolling back write cursor %" PRId64 " bytes\n",
892 : static_cast<int64_t>(mWriteCursor - head)));
893 85 : RollBackAllReadCursors(head);
894 85 : mWriteCursor = head;
895 : }
896 :
897 100 : aSegment = mWriteCursor;
898 100 : aSegmentLen = mWriteLimit - mWriteCursor;
899 100 : return NS_OK;
900 : }
901 :
902 : void
903 88 : nsPipe::AdvanceWriteCursor(uint32_t aBytesWritten)
904 : {
905 88 : NS_ASSERTION(aBytesWritten, "don't call if no bytes written");
906 :
907 176 : nsPipeEvents events;
908 : {
909 176 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
910 :
911 88 : LOG(("OOO advancing write cursor by %u\n", aBytesWritten));
912 :
913 88 : char* newWriteCursor = mWriteCursor + aBytesWritten;
914 88 : NS_ASSERTION(newWriteCursor <= mWriteLimit, "write cursor exceeds limit");
915 :
916 : // update read limit if reading in the same segment
917 88 : UpdateAllReadCursors(newWriteCursor);
918 :
919 88 : mWriteCursor = newWriteCursor;
920 :
921 88 : ValidateAllReadCursors();
922 :
923 : // update the writable flag on the output stream
924 88 : if (mWriteCursor == mWriteLimit) {
925 15 : mOutput.SetWritable(!IsAdvanceBufferFull(mon));
926 : }
927 :
928 : // notify input stream that pipe now contains additional data
929 88 : bool needNotify = false;
930 176 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
931 88 : if (mInputList[i]->OnInputReadable(aBytesWritten, events, mon)
932 : == NotifyMonitor) {
933 0 : needNotify = true;
934 : }
935 : }
936 :
937 88 : if (needNotify) {
938 0 : mon.NotifyAll();
939 : }
940 : }
941 88 : }
942 :
943 : void
944 136 : nsPipe::OnInputStreamException(nsPipeInputStream* aStream, nsresult aReason)
945 : {
946 136 : MOZ_ASSERT(NS_FAILED(aReason));
947 :
948 136 : nsPipeEvents events;
949 : {
950 136 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
951 :
952 : // Its possible to re-enter this method when we call OnPipeException() or
953 : // OnInputExection() below. If there is a caller stuck in our synchronous
954 : // Wait() method, then they will get woken up with a failure code which
955 : // re-enters this method. Therefore, gracefully handle unknown streams
956 : // here.
957 :
958 : // If we only have one stream open and it is the given stream, then shut
959 : // down the entire pipe.
960 136 : if (mInputList.Length() == 1) {
961 136 : if (mInputList[0] == aStream) {
962 136 : OnPipeException(aReason);
963 : }
964 136 : return;
965 : }
966 :
967 : // Otherwise just close the particular stream that hit an exception.
968 0 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
969 0 : if (mInputList[i] != aStream) {
970 0 : continue;
971 : }
972 :
973 0 : MonitorAction action = mInputList[i]->OnInputException(aReason, events,
974 0 : mon);
975 0 : mInputList.RemoveElementAt(i);
976 :
977 : // Notify after element is removed in case we re-enter as a result.
978 0 : if (action == NotifyMonitor) {
979 0 : mon.NotifyAll();
980 : }
981 :
982 0 : return;
983 : }
984 : }
985 : }
986 :
987 : void
988 282 : nsPipe::OnPipeException(nsresult aReason, bool aOutputOnly)
989 : {
990 282 : LOG(("PPP nsPipe::OnPipeException [reason=%" PRIx32 " output-only=%d]\n",
991 : static_cast<uint32_t>(aReason), aOutputOnly));
992 :
993 358 : nsPipeEvents events;
994 : {
995 358 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
996 :
997 : // if we've already hit an exception, then ignore this one.
998 282 : if (NS_FAILED(mStatus)) {
999 206 : return;
1000 : }
1001 :
1002 76 : mStatus = aReason;
1003 :
1004 76 : bool needNotify = false;
1005 :
1006 152 : nsTArray<nsPipeInputStream*> tmpInputList;
1007 152 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1008 : // an output-only exception applies to the input end if the pipe has
1009 : // zero bytes available.
1010 76 : if (aOutputOnly && mInputList[i]->Available()) {
1011 73 : tmpInputList.AppendElement(mInputList[i]);
1012 73 : continue;
1013 : }
1014 :
1015 3 : if (mInputList[i]->OnInputException(aReason, events, mon)
1016 : == NotifyMonitor) {
1017 0 : needNotify = true;
1018 : }
1019 : }
1020 76 : mInputList = tmpInputList;
1021 :
1022 76 : if (mOutput.OnOutputException(aReason, events) == NotifyMonitor) {
1023 0 : needNotify = true;
1024 : }
1025 :
1026 : // Notify after we have removed any input streams from mInputList
1027 76 : if (needNotify) {
1028 0 : mon.NotifyAll();
1029 : }
1030 : }
1031 : }
1032 :
1033 : nsresult
1034 0 : nsPipe::CloneInputStream(nsPipeInputStream* aOriginal,
1035 : nsIInputStream** aCloneOut)
1036 : {
1037 0 : ReentrantMonitorAutoEnter mon(mReentrantMonitor);
1038 0 : RefPtr<nsPipeInputStream> ref = new nsPipeInputStream(*aOriginal);
1039 0 : mInputList.AppendElement(ref);
1040 0 : nsCOMPtr<nsIAsyncInputStream> downcast = ref.forget();
1041 0 : downcast.forget(aCloneOut);
1042 0 : return NS_OK;
1043 : }
1044 :
1045 : uint32_t
1046 10 : nsPipe::CountSegmentReferences(int32_t aSegment)
1047 : {
1048 10 : mReentrantMonitor.AssertCurrentThreadIn();
1049 10 : uint32_t count = 0;
1050 20 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1051 10 : if (aSegment >= mInputList[i]->ReadState().mSegment) {
1052 0 : count += 1;
1053 : }
1054 : }
1055 10 : return count;
1056 : }
1057 :
1058 : void
1059 100 : nsPipe::SetAllNullReadCursors()
1060 : {
1061 100 : mReentrantMonitor.AssertCurrentThreadIn();
1062 200 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1063 100 : nsPipeReadState& readState = mInputList[i]->ReadState();
1064 100 : if (!readState.mReadCursor) {
1065 76 : NS_ASSERTION(mWriteSegment == readState.mSegment,
1066 : "unexpected null read cursor");
1067 76 : readState.mReadCursor = readState.mReadLimit = mWriteCursor;
1068 : }
1069 : }
1070 100 : }
1071 :
1072 : bool
1073 85 : nsPipe::AllReadCursorsMatchWriteCursor()
1074 : {
1075 85 : mReentrantMonitor.AssertCurrentThreadIn();
1076 170 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1077 85 : const nsPipeReadState& readState = mInputList[i]->ReadState();
1078 170 : if (readState.mSegment != mWriteSegment ||
1079 85 : readState.mReadCursor != mWriteCursor) {
1080 0 : return false;
1081 : }
1082 : }
1083 85 : return true;
1084 : }
1085 :
1086 : void
1087 85 : nsPipe::RollBackAllReadCursors(char* aWriteCursor)
1088 : {
1089 85 : mReentrantMonitor.AssertCurrentThreadIn();
1090 170 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1091 85 : nsPipeReadState& readState = mInputList[i]->ReadState();
1092 85 : MOZ_ASSERT(mWriteSegment == readState.mSegment);
1093 85 : MOZ_ASSERT(mWriteCursor == readState.mReadCursor);
1094 85 : MOZ_ASSERT(mWriteCursor == readState.mReadLimit);
1095 85 : readState.mReadCursor = aWriteCursor;
1096 85 : readState.mReadLimit = aWriteCursor;
1097 : }
1098 85 : }
1099 :
1100 : void
1101 88 : nsPipe::UpdateAllReadCursors(char* aWriteCursor)
1102 : {
1103 88 : mReentrantMonitor.AssertCurrentThreadIn();
1104 176 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1105 88 : nsPipeReadState& readState = mInputList[i]->ReadState();
1106 161 : if (mWriteSegment == readState.mSegment &&
1107 73 : readState.mReadLimit == mWriteCursor) {
1108 73 : readState.mReadLimit = aWriteCursor;
1109 : }
1110 : }
1111 88 : }
1112 :
1113 : void
1114 88 : nsPipe::ValidateAllReadCursors()
1115 : {
1116 88 : mReentrantMonitor.AssertCurrentThreadIn();
1117 : // The only way mReadCursor == mWriteCursor is if:
1118 : //
1119 : // - mReadCursor is at the start of a segment (which, based on how
1120 : // nsSegmentedBuffer works, means that this segment is the "first"
1121 : // segment)
1122 : // - mWriteCursor points at the location past the end of the current
1123 : // write segment (so the current write filled the current write
1124 : // segment, so we've incremented mWriteCursor to point past the end
1125 : // of it)
1126 : // - the segment to which data has just been written is located
1127 : // exactly one segment's worth of bytes before the first segment
1128 : // where mReadCursor is located
1129 : //
1130 : // Consequently, the byte immediately after the end of the current
1131 : // write segment is the first byte of the first segment, so
1132 : // mReadCursor == mWriteCursor. (Another way to think about this is
1133 : // to consider the buffer architecture diagram above, but consider it
1134 : // with an arena allocator which allocates from the *end* of the
1135 : // arena to the *beginning* of the arena.)
1136 : #ifdef DEBUG
1137 176 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1138 88 : const nsPipeReadState& state = mInputList[i]->ReadState();
1139 88 : NS_ASSERTION(state.mReadCursor != mWriteCursor ||
1140 : (mBuffer.GetSegment(state.mSegment) == state.mReadCursor &&
1141 : mWriteCursor == mWriteLimit),
1142 : "read cursor is bad");
1143 : }
1144 : #endif
1145 88 : }
1146 :
1147 : uint32_t
1148 20 : nsPipe::GetBufferSegmentCount(const nsPipeReadState& aReadState,
1149 : const ReentrantMonitorAutoEnter& ev) const
1150 : {
1151 : // The write segment can be smaller than the current reader position
1152 : // in some cases. For example, when the first write segment has not
1153 : // been allocated yet mWriteSegment is negative. In these cases
1154 : // the stream is effectively using zero segments.
1155 20 : if (mWriteSegment < aReadState.mSegment) {
1156 0 : return 0;
1157 : }
1158 :
1159 20 : MOZ_ASSERT(mWriteSegment >= 0);
1160 20 : MOZ_ASSERT(aReadState.mSegment >= 0);
1161 :
1162 : // Otherwise at least one segment is being used. We add one here
1163 : // since a single segment is being used when the write and read
1164 : // segment indices are the same.
1165 20 : return 1 + mWriteSegment - aReadState.mSegment;
1166 : }
1167 :
1168 : bool
1169 109 : nsPipe::IsAdvanceBufferFull(const ReentrantMonitorAutoEnter& ev) const
1170 : {
1171 : // If we have fewer total segments than the limit we can immediately
1172 : // determine we are not full. Note, we must add one to mWriteSegment
1173 : // to convert from a index to a count.
1174 109 : MOZ_DIAGNOSTIC_ASSERT(mWriteSegment >= -1);
1175 109 : MOZ_DIAGNOSTIC_ASSERT(mWriteSegment < INT32_MAX);
1176 109 : uint32_t totalWriteSegments = mWriteSegment + 1;
1177 109 : if (totalWriteSegments < mMaxAdvanceBufferSegmentCount) {
1178 109 : return false;
1179 : }
1180 :
1181 : // Otherwise we must inspect all of our reader streams. We need
1182 : // to determine the buffer depth of the fastest reader.
1183 0 : uint32_t minBufferSegments = UINT32_MAX;
1184 0 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1185 : // Only count buffer segments from input streams that are open.
1186 0 : if (NS_FAILED(mInputList[i]->Status(ev))) {
1187 0 : continue;
1188 : }
1189 0 : const nsPipeReadState& state = mInputList[i]->ReadState();
1190 0 : uint32_t bufferSegments = GetBufferSegmentCount(state, ev);
1191 0 : minBufferSegments = std::min(minBufferSegments, bufferSegments);
1192 : // We only care if any reader has fewer segments buffered than
1193 : // our threshold. We can stop once we hit that threshold.
1194 0 : if (minBufferSegments < mMaxAdvanceBufferSegmentCount) {
1195 0 : return false;
1196 : }
1197 : }
1198 :
1199 : // Note, its possible for minBufferSegments to exceed our
1200 : // mMaxAdvanceBufferSegmentCount here. This happens when a cloned
1201 : // reader gets far behind, but then the fastest reader stream is
1202 : // closed. This leaves us with a single stream that is buffered
1203 : // beyond our max. Naturally we continue to indicate the pipe
1204 : // is full at this point.
1205 :
1206 0 : return true;
1207 : }
1208 :
1209 : //-----------------------------------------------------------------------------
1210 : // nsPipeEvents methods:
1211 : //-----------------------------------------------------------------------------
1212 :
1213 1426 : nsPipeEvents::~nsPipeEvents()
1214 : {
1215 : // dispatch any pending events
1216 :
1217 789 : for (uint32_t i = 0; i < mInputList.Length(); ++i) {
1218 76 : mInputList[i].mCallback->OnInputStreamReady(mInputList[i].mStream);
1219 : }
1220 713 : mInputList.Clear();
1221 :
1222 713 : if (mOutputCallback) {
1223 0 : mOutputCallback->OnOutputStreamReady(mOutputStream);
1224 0 : mOutputCallback = nullptr;
1225 0 : mOutputStream = nullptr;
1226 : }
1227 713 : }
1228 :
1229 : //-----------------------------------------------------------------------------
1230 : // nsPipeInputStream methods:
1231 : //-----------------------------------------------------------------------------
1232 :
1233 1133 : NS_IMPL_ADDREF(nsPipeInputStream);
1234 1132 : NS_IMPL_RELEASE(nsPipeInputStream);
1235 :
1236 683 : NS_INTERFACE_TABLE_HEAD(nsPipeInputStream)
1237 : NS_INTERFACE_TABLE_BEGIN
1238 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIAsyncInputStream)
1239 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISeekableStream)
1240 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsISearchableInputStream)
1241 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsICloneableInputStream)
1242 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIBufferedInputStream)
1243 : NS_INTERFACE_TABLE_ENTRY(nsPipeInputStream, nsIClassInfo)
1244 : NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsIInputStream,
1245 : nsIAsyncInputStream)
1246 : NS_INTERFACE_TABLE_ENTRY_AMBIGUOUS(nsPipeInputStream, nsISupports,
1247 : nsIAsyncInputStream)
1248 683 : NS_INTERFACE_TABLE_END
1249 683 : NS_INTERFACE_TABLE_TAIL
1250 :
1251 0 : NS_IMPL_CI_INTERFACE_GETTER(nsPipeInputStream,
1252 : nsIInputStream,
1253 : nsIAsyncInputStream,
1254 : nsISeekableStream,
1255 : nsISearchableInputStream,
1256 : nsICloneableInputStream,
1257 : nsIBufferedInputStream)
1258 :
1259 0 : NS_IMPL_THREADSAFE_CI(nsPipeInputStream)
1260 :
1261 : NS_IMETHODIMP
1262 0 : nsPipeInputStream::Init(nsIInputStream*, uint32_t)
1263 : {
1264 0 : MOZ_CRASH("nsPipeInputStream should never be initialized with "
1265 : "nsIBufferedInputStream::Init!\n");
1266 : }
1267 :
1268 : NS_IMETHODIMP
1269 0 : nsPipeInputStream::GetData(nsIInputStream **aResult)
1270 : {
1271 : // as this was not created with init() we are not
1272 : // wrapping anything
1273 0 : return NS_ERROR_NOT_IMPLEMENTED;
1274 : }
1275 :
1276 : uint32_t
1277 76 : nsPipeInputStream::Available()
1278 : {
1279 76 : mPipe->mReentrantMonitor.AssertCurrentThreadIn();
1280 76 : return mReadState.mAvailable;
1281 : }
1282 :
1283 : nsresult
1284 0 : nsPipeInputStream::Wait()
1285 : {
1286 0 : NS_ASSERTION(mBlocking, "wait on non-blocking pipe input stream");
1287 :
1288 0 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1289 :
1290 0 : while (NS_SUCCEEDED(Status(mon)) && (mReadState.mAvailable == 0)) {
1291 0 : LOG(("III pipe input: waiting for data\n"));
1292 :
1293 0 : mBlocked = true;
1294 0 : mon.Wait();
1295 0 : mBlocked = false;
1296 :
1297 0 : LOG(("III pipe input: woke up [status=%" PRIx32 " available=%u]\n",
1298 : static_cast<uint32_t>(Status(mon)), mReadState.mAvailable));
1299 : }
1300 :
1301 0 : return Status(mon) == NS_BASE_STREAM_CLOSED ? NS_OK : Status(mon);
1302 : }
1303 :
1304 : MonitorAction
1305 88 : nsPipeInputStream::OnInputReadable(uint32_t aBytesWritten,
1306 : nsPipeEvents& aEvents,
1307 : const ReentrantMonitorAutoEnter& ev)
1308 : {
1309 88 : MonitorAction result = DoNotNotifyMonitor;
1310 :
1311 88 : mPipe->mReentrantMonitor.AssertCurrentThreadIn();
1312 88 : mReadState.mAvailable += aBytesWritten;
1313 :
1314 88 : if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
1315 66 : aEvents.NotifyInputReady(this, mCallback);
1316 66 : mCallback = nullptr;
1317 66 : mCallbackFlags = 0;
1318 22 : } else if (mBlocked) {
1319 0 : result = NotifyMonitor;
1320 : }
1321 :
1322 88 : return result;
1323 : }
1324 :
1325 : MonitorAction
1326 3 : nsPipeInputStream::OnInputException(nsresult aReason, nsPipeEvents& aEvents,
1327 : const ReentrantMonitorAutoEnter& ev)
1328 : {
1329 3 : LOG(("nsPipeInputStream::OnInputException [this=%p reason=%" PRIx32 "]\n",
1330 : this, static_cast<uint32_t>(aReason)));
1331 :
1332 3 : MonitorAction result = DoNotNotifyMonitor;
1333 :
1334 3 : NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
1335 :
1336 3 : if (NS_SUCCEEDED(mInputStatus)) {
1337 3 : mInputStatus = aReason;
1338 : }
1339 :
1340 : // force count of available bytes to zero.
1341 3 : mPipe->DrainInputStream(mReadState, aEvents);
1342 :
1343 3 : if (mCallback) {
1344 3 : aEvents.NotifyInputReady(this, mCallback);
1345 3 : mCallback = nullptr;
1346 3 : mCallbackFlags = 0;
1347 0 : } else if (mBlocked) {
1348 0 : result = NotifyMonitor;
1349 : }
1350 :
1351 3 : return result;
1352 : }
1353 :
1354 : NS_IMETHODIMP
1355 144 : nsPipeInputStream::CloseWithStatus(nsresult aReason)
1356 : {
1357 144 : LOG(("III CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
1358 : this, static_cast<uint32_t>(aReason)));
1359 :
1360 288 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1361 :
1362 144 : if (NS_FAILED(mInputStatus)) {
1363 8 : return NS_OK;
1364 : }
1365 :
1366 136 : if (NS_SUCCEEDED(aReason)) {
1367 0 : aReason = NS_BASE_STREAM_CLOSED;
1368 : }
1369 :
1370 136 : mPipe->OnInputStreamException(this, aReason);
1371 136 : return NS_OK;
1372 : }
1373 :
1374 : NS_IMETHODIMP
1375 138 : nsPipeInputStream::Close()
1376 : {
1377 138 : return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1378 : }
1379 :
1380 : NS_IMETHODIMP
1381 212 : nsPipeInputStream::Available(uint64_t* aResult)
1382 : {
1383 : // nsPipeInputStream supports under 4GB stream only
1384 424 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1385 :
1386 : // return error if closed
1387 212 : if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
1388 70 : return Status(mon);
1389 : }
1390 :
1391 142 : *aResult = (uint64_t)mReadState.mAvailable;
1392 142 : return NS_OK;
1393 : }
1394 :
1395 : NS_IMETHODIMP
1396 121 : nsPipeInputStream::ReadSegments(nsWriteSegmentFun aWriter,
1397 : void* aClosure,
1398 : uint32_t aCount,
1399 : uint32_t* aReadCount)
1400 : {
1401 121 : LOG(("III ReadSegments [this=%p count=%u]\n", this, aCount));
1402 :
1403 121 : nsresult rv = NS_OK;
1404 :
1405 121 : *aReadCount = 0;
1406 383 : while (aCount) {
1407 262 : AutoReadSegment segment(mPipe, mReadState, aCount);
1408 131 : rv = segment.Status();
1409 131 : if (NS_FAILED(rv)) {
1410 : // ignore this error if we've already read something.
1411 0 : if (*aReadCount > 0) {
1412 0 : rv = NS_OK;
1413 0 : break;
1414 : }
1415 0 : if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1416 : // pipe is empty
1417 0 : if (!mBlocking) {
1418 0 : break;
1419 : }
1420 : // wait for some data to be written to the pipe
1421 0 : rv = Wait();
1422 0 : if (NS_SUCCEEDED(rv)) {
1423 0 : continue;
1424 : }
1425 : }
1426 : // ignore this error, just return.
1427 0 : if (rv == NS_BASE_STREAM_CLOSED) {
1428 0 : rv = NS_OK;
1429 0 : break;
1430 : }
1431 0 : mPipe->OnInputStreamException(this, rv);
1432 0 : break;
1433 : }
1434 :
1435 : uint32_t writeCount;
1436 307 : while (segment.Length()) {
1437 131 : writeCount = 0;
1438 :
1439 131 : rv = aWriter(static_cast<nsIAsyncInputStream*>(this), aClosure,
1440 131 : segment.Data(), *aReadCount, segment.Length(), &writeCount);
1441 :
1442 131 : if (NS_FAILED(rv) || writeCount == 0) {
1443 43 : aCount = 0;
1444 : // any errors returned from the writer end here: do not
1445 : // propagate to the caller of ReadSegments.
1446 43 : rv = NS_OK;
1447 43 : break;
1448 : }
1449 :
1450 88 : NS_ASSERTION(writeCount <= segment.Length(), "wrote more than expected");
1451 88 : segment.Advance(writeCount);
1452 88 : aCount -= writeCount;
1453 88 : *aReadCount += writeCount;
1454 88 : mLogicalOffset += writeCount;
1455 : }
1456 : }
1457 :
1458 121 : return rv;
1459 : }
1460 :
1461 : NS_IMETHODIMP
1462 16 : nsPipeInputStream::Read(char* aToBuf, uint32_t aBufLen, uint32_t* aReadCount)
1463 : {
1464 16 : return ReadSegments(NS_CopySegmentToBuffer, aToBuf, aBufLen, aReadCount);
1465 : }
1466 :
1467 : NS_IMETHODIMP
1468 4 : nsPipeInputStream::IsNonBlocking(bool* aNonBlocking)
1469 : {
1470 4 : *aNonBlocking = !mBlocking;
1471 4 : return NS_OK;
1472 : }
1473 :
1474 : NS_IMETHODIMP
1475 76 : nsPipeInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
1476 : uint32_t aFlags,
1477 : uint32_t aRequestedCount,
1478 : nsIEventTarget* aTarget)
1479 : {
1480 76 : LOG(("III AsyncWait [this=%p]\n", this));
1481 :
1482 152 : nsPipeEvents pipeEvents;
1483 : {
1484 152 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1485 :
1486 : // replace a pending callback
1487 76 : mCallback = nullptr;
1488 76 : mCallbackFlags = 0;
1489 :
1490 76 : if (!aCallback) {
1491 0 : return NS_OK;
1492 : }
1493 :
1494 152 : nsCOMPtr<nsIInputStreamCallback> proxy;
1495 76 : if (aTarget) {
1496 152 : proxy = NS_NewInputStreamReadyEvent("nsPipeInputStream::AsyncWait",
1497 76 : aCallback, aTarget);
1498 76 : aCallback = proxy;
1499 : }
1500 :
1501 156 : if (NS_FAILED(Status(mon)) ||
1502 77 : (mReadState.mAvailable && !(aFlags & WAIT_CLOSURE_ONLY))) {
1503 : // stream is already closed or readable; post event.
1504 7 : pipeEvents.NotifyInputReady(this, aCallback);
1505 : } else {
1506 : // queue up callback object to be notified when data becomes available
1507 69 : mCallback = aCallback;
1508 69 : mCallbackFlags = aFlags;
1509 : }
1510 : }
1511 76 : return NS_OK;
1512 : }
1513 :
1514 : NS_IMETHODIMP
1515 0 : nsPipeInputStream::Seek(int32_t aWhence, int64_t aOffset)
1516 : {
1517 0 : NS_NOTREACHED("nsPipeInputStream::Seek");
1518 0 : return NS_ERROR_NOT_IMPLEMENTED;
1519 : }
1520 :
1521 : NS_IMETHODIMP
1522 134 : nsPipeInputStream::Tell(int64_t* aOffset)
1523 : {
1524 268 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1525 :
1526 : // return error if closed
1527 134 : if (!mReadState.mAvailable && NS_FAILED(Status(mon))) {
1528 67 : return Status(mon);
1529 : }
1530 :
1531 67 : *aOffset = mLogicalOffset;
1532 67 : return NS_OK;
1533 : }
1534 :
1535 : NS_IMETHODIMP
1536 0 : nsPipeInputStream::SetEOF()
1537 : {
1538 0 : NS_NOTREACHED("nsPipeInputStream::SetEOF");
1539 0 : return NS_ERROR_NOT_IMPLEMENTED;
1540 : }
1541 :
1542 0 : static bool strings_equal(bool aIgnoreCase,
1543 : const char* aS1, const char* aS2, uint32_t aLen)
1544 : {
1545 : return aIgnoreCase
1546 0 : ? !nsCRT::strncasecmp(aS1, aS2, aLen) : !nsCRT::strncmp(aS1, aS2, aLen);
1547 : }
1548 :
1549 : NS_IMETHODIMP
1550 0 : nsPipeInputStream::Search(const char* aForString,
1551 : bool aIgnoreCase,
1552 : bool* aFound,
1553 : uint32_t* aOffsetSearchedTo)
1554 : {
1555 0 : LOG(("III Search [for=%s ic=%u]\n", aForString, aIgnoreCase));
1556 :
1557 0 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1558 :
1559 : char* cursor1;
1560 : char* limit1;
1561 0 : uint32_t index = 0, offset = 0;
1562 0 : uint32_t strLen = strlen(aForString);
1563 :
1564 0 : mPipe->PeekSegment(mReadState, 0, cursor1, limit1);
1565 0 : if (cursor1 == limit1) {
1566 0 : *aFound = false;
1567 0 : *aOffsetSearchedTo = 0;
1568 0 : LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1569 0 : return NS_OK;
1570 : }
1571 :
1572 : while (true) {
1573 0 : uint32_t i, len1 = limit1 - cursor1;
1574 :
1575 : // check if the string is in the buffer segment
1576 0 : for (i = 0; i < len1 - strLen + 1; i++) {
1577 0 : if (strings_equal(aIgnoreCase, &cursor1[i], aForString, strLen)) {
1578 0 : *aFound = true;
1579 0 : *aOffsetSearchedTo = offset + i;
1580 0 : LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1581 0 : return NS_OK;
1582 : }
1583 : }
1584 :
1585 : // get the next segment
1586 : char* cursor2;
1587 : char* limit2;
1588 : uint32_t len2;
1589 :
1590 0 : index++;
1591 0 : offset += len1;
1592 :
1593 0 : mPipe->PeekSegment(mReadState, index, cursor2, limit2);
1594 0 : if (cursor2 == limit2) {
1595 0 : *aFound = false;
1596 0 : *aOffsetSearchedTo = offset - strLen + 1;
1597 0 : LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1598 0 : return NS_OK;
1599 : }
1600 0 : len2 = limit2 - cursor2;
1601 :
1602 : // check if the string is straddling the next buffer segment
1603 0 : uint32_t lim = XPCOM_MIN(strLen, len2 + 1);
1604 0 : for (i = 0; i < lim; ++i) {
1605 0 : uint32_t strPart1Len = strLen - i - 1;
1606 0 : uint32_t strPart2Len = strLen - strPart1Len;
1607 0 : const char* strPart2 = &aForString[strLen - strPart2Len];
1608 0 : uint32_t bufSeg1Offset = len1 - strPart1Len;
1609 0 : if (strings_equal(aIgnoreCase, &cursor1[bufSeg1Offset], aForString, strPart1Len) &&
1610 0 : strings_equal(aIgnoreCase, cursor2, strPart2, strPart2Len)) {
1611 0 : *aFound = true;
1612 0 : *aOffsetSearchedTo = offset - strPart1Len;
1613 0 : LOG((" result [aFound=%u offset=%u]\n", *aFound, *aOffsetSearchedTo));
1614 0 : return NS_OK;
1615 : }
1616 : }
1617 :
1618 : // finally continue with the next buffer
1619 0 : cursor1 = cursor2;
1620 0 : limit1 = limit2;
1621 0 : }
1622 :
1623 : NS_NOTREACHED("can't get here");
1624 : return NS_ERROR_UNEXPECTED; // keep compiler happy
1625 : }
1626 :
1627 : NS_IMETHODIMP
1628 0 : nsPipeInputStream::GetCloneable(bool* aCloneableOut)
1629 : {
1630 0 : *aCloneableOut = true;
1631 0 : return NS_OK;
1632 : }
1633 :
1634 : NS_IMETHODIMP
1635 0 : nsPipeInputStream::Clone(nsIInputStream** aCloneOut)
1636 : {
1637 0 : return mPipe->CloneInputStream(this, aCloneOut);
1638 : }
1639 :
1640 : nsresult
1641 350 : nsPipeInputStream::Status(const ReentrantMonitorAutoEnter& ev) const
1642 : {
1643 350 : if (NS_FAILED(mInputStatus)) {
1644 10 : return mInputStatus;
1645 : }
1646 :
1647 340 : if (mReadState.mAvailable) {
1648 : // Still something to read and this input stream state is OK.
1649 4 : return NS_OK;
1650 : }
1651 :
1652 : // Nothing to read, just fall through to the pipe's state that
1653 : // may reflect state of its output stream side (already closed).
1654 336 : return mPipe->mStatus;
1655 : }
1656 :
1657 : nsresult
1658 0 : nsPipeInputStream::Status() const
1659 : {
1660 0 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1661 0 : return Status(mon);
1662 : }
1663 :
1664 225 : nsPipeInputStream::~nsPipeInputStream()
1665 : {
1666 75 : Close();
1667 225 : }
1668 :
1669 : //-----------------------------------------------------------------------------
1670 : // nsPipeOutputStream methods:
1671 : //-----------------------------------------------------------------------------
1672 :
1673 208 : NS_IMPL_QUERY_INTERFACE(nsPipeOutputStream,
1674 : nsIOutputStream,
1675 : nsIAsyncOutputStream,
1676 : nsIClassInfo)
1677 :
1678 0 : NS_IMPL_CI_INTERFACE_GETTER(nsPipeOutputStream,
1679 : nsIOutputStream,
1680 : nsIAsyncOutputStream)
1681 :
1682 0 : NS_IMPL_THREADSAFE_CI(nsPipeOutputStream)
1683 :
1684 : nsresult
1685 0 : nsPipeOutputStream::Wait()
1686 : {
1687 0 : NS_ASSERTION(mBlocking, "wait on non-blocking pipe output stream");
1688 :
1689 0 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1690 :
1691 0 : if (NS_SUCCEEDED(mPipe->mStatus) && !mWritable) {
1692 0 : LOG(("OOO pipe output: waiting for space\n"));
1693 0 : mBlocked = true;
1694 0 : mon.Wait();
1695 0 : mBlocked = false;
1696 0 : LOG(("OOO pipe output: woke up [pipe-status=%" PRIx32 " writable=%u]\n",
1697 : static_cast<uint32_t>(mPipe->mStatus), mWritable));
1698 : }
1699 :
1700 0 : return mPipe->mStatus == NS_BASE_STREAM_CLOSED ? NS_OK : mPipe->mStatus;
1701 : }
1702 :
1703 : MonitorAction
1704 3 : nsPipeOutputStream::OnOutputWritable(nsPipeEvents& aEvents)
1705 : {
1706 3 : MonitorAction result = DoNotNotifyMonitor;
1707 :
1708 3 : mWritable = true;
1709 :
1710 3 : if (mCallback && !(mCallbackFlags & WAIT_CLOSURE_ONLY)) {
1711 0 : aEvents.NotifyOutputReady(this, mCallback);
1712 0 : mCallback = nullptr;
1713 0 : mCallbackFlags = 0;
1714 3 : } else if (mBlocked) {
1715 0 : result = NotifyMonitor;
1716 : }
1717 :
1718 3 : return result;
1719 : }
1720 :
1721 : MonitorAction
1722 76 : nsPipeOutputStream::OnOutputException(nsresult aReason, nsPipeEvents& aEvents)
1723 : {
1724 76 : LOG(("nsPipeOutputStream::OnOutputException [this=%p reason=%" PRIx32 "]\n",
1725 : this, static_cast<uint32_t>(aReason)));
1726 :
1727 76 : MonitorAction result = DoNotNotifyMonitor;
1728 :
1729 76 : NS_ASSERTION(NS_FAILED(aReason), "huh? successful exception");
1730 76 : mWritable = false;
1731 :
1732 76 : if (mCallback) {
1733 0 : aEvents.NotifyOutputReady(this, mCallback);
1734 0 : mCallback = nullptr;
1735 0 : mCallbackFlags = 0;
1736 76 : } else if (mBlocked) {
1737 0 : result = NotifyMonitor;
1738 : }
1739 :
1740 76 : return result;
1741 : }
1742 :
1743 :
1744 : NS_IMETHODIMP_(MozExternalRefCountType)
1745 350 : nsPipeOutputStream::AddRef()
1746 : {
1747 350 : ++mWriterRefCnt;
1748 350 : return mPipe->AddRef();
1749 : }
1750 :
1751 : NS_IMETHODIMP_(MozExternalRefCountType)
1752 350 : nsPipeOutputStream::Release()
1753 : {
1754 350 : if (--mWriterRefCnt == 0) {
1755 76 : Close();
1756 : }
1757 350 : return mPipe->Release();
1758 : }
1759 :
1760 : NS_IMETHODIMP
1761 146 : nsPipeOutputStream::CloseWithStatus(nsresult aReason)
1762 : {
1763 146 : LOG(("OOO CloseWithStatus [this=%p reason=%" PRIx32 "]\n",
1764 : this, static_cast<uint32_t>(aReason)));
1765 :
1766 146 : if (NS_SUCCEEDED(aReason)) {
1767 3 : aReason = NS_BASE_STREAM_CLOSED;
1768 : }
1769 :
1770 : // input stream may remain open
1771 146 : mPipe->OnPipeException(aReason, true);
1772 146 : return NS_OK;
1773 : }
1774 :
1775 : NS_IMETHODIMP
1776 77 : nsPipeOutputStream::Close()
1777 : {
1778 77 : return CloseWithStatus(NS_BASE_STREAM_CLOSED);
1779 : }
1780 :
1781 : NS_IMETHODIMP
1782 100 : nsPipeOutputStream::WriteSegments(nsReadSegmentFun aReader,
1783 : void* aClosure,
1784 : uint32_t aCount,
1785 : uint32_t* aWriteCount)
1786 : {
1787 100 : LOG(("OOO WriteSegments [this=%p count=%u]\n", this, aCount));
1788 :
1789 100 : nsresult rv = NS_OK;
1790 :
1791 : char* segment;
1792 : uint32_t segmentLen;
1793 :
1794 100 : *aWriteCount = 0;
1795 300 : while (aCount) {
1796 100 : rv = mPipe->GetWriteSegment(segment, segmentLen);
1797 100 : if (NS_FAILED(rv)) {
1798 0 : if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
1799 : // pipe is full
1800 0 : if (!mBlocking) {
1801 : // ignore this error if we've already written something
1802 0 : if (*aWriteCount > 0) {
1803 0 : rv = NS_OK;
1804 : }
1805 0 : break;
1806 : }
1807 : // wait for the pipe to have an empty segment.
1808 0 : rv = Wait();
1809 0 : if (NS_SUCCEEDED(rv)) {
1810 0 : continue;
1811 : }
1812 : }
1813 0 : mPipe->OnPipeException(rv);
1814 0 : break;
1815 : }
1816 :
1817 : // write no more than aCount
1818 100 : if (segmentLen > aCount) {
1819 7 : segmentLen = aCount;
1820 : }
1821 :
1822 100 : uint32_t readCount, originalLen = segmentLen;
1823 276 : while (segmentLen) {
1824 166 : readCount = 0;
1825 :
1826 166 : rv = aReader(this, aClosure, segment, *aWriteCount, segmentLen, &readCount);
1827 :
1828 166 : if (NS_FAILED(rv) || readCount == 0) {
1829 78 : aCount = 0;
1830 : // any errors returned from the aReader end here: do not
1831 : // propagate to the caller of WriteSegments.
1832 78 : rv = NS_OK;
1833 78 : break;
1834 : }
1835 :
1836 88 : NS_ASSERTION(readCount <= segmentLen, "read more than expected");
1837 88 : segment += readCount;
1838 88 : segmentLen -= readCount;
1839 88 : aCount -= readCount;
1840 88 : *aWriteCount += readCount;
1841 88 : mLogicalOffset += readCount;
1842 : }
1843 :
1844 100 : if (segmentLen < originalLen) {
1845 88 : mPipe->AdvanceWriteCursor(originalLen - segmentLen);
1846 : }
1847 : }
1848 :
1849 100 : return rv;
1850 : }
1851 :
1852 : static nsresult
1853 7 : nsReadFromRawBuffer(nsIOutputStream* aOutStr,
1854 : void* aClosure,
1855 : char* aToRawSegment,
1856 : uint32_t aOffset,
1857 : uint32_t aCount,
1858 : uint32_t* aReadCount)
1859 : {
1860 7 : const char* fromBuf = (const char*)aClosure;
1861 7 : memcpy(aToRawSegment, &fromBuf[aOffset], aCount);
1862 7 : *aReadCount = aCount;
1863 7 : return NS_OK;
1864 : }
1865 :
1866 : NS_IMETHODIMP
1867 7 : nsPipeOutputStream::Write(const char* aFromBuf,
1868 : uint32_t aBufLen,
1869 : uint32_t* aWriteCount)
1870 : {
1871 7 : return WriteSegments(nsReadFromRawBuffer, (void*)aFromBuf, aBufLen, aWriteCount);
1872 : }
1873 :
1874 : NS_IMETHODIMP
1875 0 : nsPipeOutputStream::Flush(void)
1876 : {
1877 : // nothing to do
1878 0 : return NS_OK;
1879 : }
1880 :
1881 : static nsresult
1882 0 : nsReadFromInputStream(nsIOutputStream* aOutStr,
1883 : void* aClosure,
1884 : char* aToRawSegment,
1885 : uint32_t aOffset,
1886 : uint32_t aCount,
1887 : uint32_t* aReadCount)
1888 : {
1889 0 : nsIInputStream* fromStream = (nsIInputStream*)aClosure;
1890 0 : return fromStream->Read(aToRawSegment, aCount, aReadCount);
1891 : }
1892 :
1893 : NS_IMETHODIMP
1894 0 : nsPipeOutputStream::WriteFrom(nsIInputStream* aFromStream,
1895 : uint32_t aCount,
1896 : uint32_t* aWriteCount)
1897 : {
1898 0 : return WriteSegments(nsReadFromInputStream, aFromStream, aCount, aWriteCount);
1899 : }
1900 :
1901 : NS_IMETHODIMP
1902 0 : nsPipeOutputStream::IsNonBlocking(bool* aNonBlocking)
1903 : {
1904 0 : *aNonBlocking = !mBlocking;
1905 0 : return NS_OK;
1906 : }
1907 :
1908 : NS_IMETHODIMP
1909 0 : nsPipeOutputStream::AsyncWait(nsIOutputStreamCallback* aCallback,
1910 : uint32_t aFlags,
1911 : uint32_t aRequestedCount,
1912 : nsIEventTarget* aTarget)
1913 : {
1914 0 : LOG(("OOO AsyncWait [this=%p]\n", this));
1915 :
1916 0 : nsPipeEvents pipeEvents;
1917 : {
1918 0 : ReentrantMonitorAutoEnter mon(mPipe->mReentrantMonitor);
1919 :
1920 : // replace a pending callback
1921 0 : mCallback = nullptr;
1922 0 : mCallbackFlags = 0;
1923 :
1924 0 : if (!aCallback) {
1925 0 : return NS_OK;
1926 : }
1927 :
1928 0 : nsCOMPtr<nsIOutputStreamCallback> proxy;
1929 0 : if (aTarget) {
1930 0 : proxy = NS_NewOutputStreamReadyEvent(aCallback, aTarget);
1931 0 : aCallback = proxy;
1932 : }
1933 :
1934 0 : if (NS_FAILED(mPipe->mStatus) ||
1935 0 : (mWritable && !(aFlags & WAIT_CLOSURE_ONLY))) {
1936 : // stream is already closed or writable; post event.
1937 0 : pipeEvents.NotifyOutputReady(this, aCallback);
1938 : } else {
1939 : // queue up callback object to be notified when data becomes available
1940 0 : mCallback = aCallback;
1941 0 : mCallbackFlags = aFlags;
1942 : }
1943 : }
1944 0 : return NS_OK;
1945 : }
1946 :
1947 : ////////////////////////////////////////////////////////////////////////////////
1948 :
1949 : nsresult
1950 7 : NS_NewPipe(nsIInputStream** aPipeIn,
1951 : nsIOutputStream** aPipeOut,
1952 : uint32_t aSegmentSize,
1953 : uint32_t aMaxSize,
1954 : bool aNonBlockingInput,
1955 : bool aNonBlockingOutput)
1956 : {
1957 7 : if (aSegmentSize == 0) {
1958 1 : aSegmentSize = DEFAULT_SEGMENT_SIZE;
1959 : }
1960 :
1961 : // Handle aMaxSize of UINT32_MAX as a special case
1962 : uint32_t segmentCount;
1963 7 : if (aMaxSize == UINT32_MAX) {
1964 7 : segmentCount = UINT32_MAX;
1965 : } else {
1966 0 : segmentCount = aMaxSize / aSegmentSize;
1967 : }
1968 :
1969 : nsIAsyncInputStream* in;
1970 : nsIAsyncOutputStream* out;
1971 7 : nsresult rv = NS_NewPipe2(&in, &out, aNonBlockingInput, aNonBlockingOutput,
1972 7 : aSegmentSize, segmentCount);
1973 7 : if (NS_FAILED(rv)) {
1974 0 : return rv;
1975 : }
1976 :
1977 7 : *aPipeIn = in;
1978 7 : *aPipeOut = out;
1979 7 : return NS_OK;
1980 : }
1981 :
1982 : nsresult
1983 76 : NS_NewPipe2(nsIAsyncInputStream** aPipeIn,
1984 : nsIAsyncOutputStream** aPipeOut,
1985 : bool aNonBlockingInput,
1986 : bool aNonBlockingOutput,
1987 : uint32_t aSegmentSize,
1988 : uint32_t aSegmentCount)
1989 : {
1990 76 : nsPipe* pipe = new nsPipe();
1991 76 : nsresult rv = pipe->Init(aNonBlockingInput,
1992 : aNonBlockingOutput,
1993 : aSegmentSize,
1994 76 : aSegmentCount);
1995 76 : if (NS_FAILED(rv)) {
1996 0 : NS_ADDREF(pipe);
1997 0 : NS_RELEASE(pipe);
1998 0 : return rv;
1999 : }
2000 :
2001 : // These always succeed because the pipe is initialized above.
2002 76 : MOZ_ALWAYS_SUCCEEDS(pipe->GetInputStream(aPipeIn));
2003 76 : MOZ_ALWAYS_SUCCEEDS(pipe->GetOutputStream(aPipeOut));
2004 76 : return NS_OK;
2005 : }
2006 :
2007 : nsresult
2008 0 : nsPipeConstructor(nsISupports* aOuter, REFNSIID aIID, void** aResult)
2009 : {
2010 0 : if (aOuter) {
2011 0 : return NS_ERROR_NO_AGGREGATION;
2012 : }
2013 0 : nsPipe* pipe = new nsPipe();
2014 0 : NS_ADDREF(pipe);
2015 0 : nsresult rv = pipe->QueryInterface(aIID, aResult);
2016 0 : NS_RELEASE(pipe);
2017 0 : return rv;
2018 : }
2019 :
2020 : ////////////////////////////////////////////////////////////////////////////////
|