Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "mozilla/dom/cache/ReadStream.h"
8 :
9 : #include "mozilla/Unused.h"
10 : #include "mozilla/dom/cache/CacheStreamControlChild.h"
11 : #include "mozilla/dom/cache/CacheStreamControlParent.h"
12 : #include "mozilla/dom/cache/CacheTypes.h"
13 : #include "mozilla/ipc/IPCStreamUtils.h"
14 : #include "mozilla/SnappyUncompressInputStream.h"
15 : #include "nsIAsyncInputStream.h"
16 : #include "nsTArray.h"
17 :
18 : namespace mozilla {
19 : namespace dom {
20 : namespace cache {
21 :
22 : using mozilla::Unused;
23 : using mozilla::ipc::AutoIPCStream;
24 : using mozilla::ipc::IPCStream;
25 :
26 : // ----------------------------------------------------------------------------
27 :
28 : // The inner stream class. This is where all of the real work is done. As
29 : // an invariant Inner::Close() must be called before ~Inner(). This is
30 : // guaranteed by our outer ReadStream class.
31 : class ReadStream::Inner final : public ReadStream::Controllable
32 : {
33 : public:
34 : Inner(StreamControl* aControl, const nsID& aId,
35 : nsIInputStream* aStream);
36 :
37 : void
38 : Serialize(CacheReadStreamOrVoid* aReadStreamOut,
39 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
40 : ErrorResult& aRv);
41 :
42 : void
43 : Serialize(CacheReadStream* aReadStreamOut,
44 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
45 : ErrorResult& aRv);
46 :
47 : // ReadStream::Controllable methods
48 : virtual void
49 : CloseStream() override;
50 :
51 : virtual void
52 : CloseStreamWithoutReporting() override;
53 :
54 : virtual bool
55 : MatchId(const nsID& aId) const override;
56 :
57 : virtual bool
58 : HasEverBeenRead() const override;
59 :
60 : // Simulate nsIInputStream methods, but we don't actually inherit from it
61 : nsresult
62 : Close();
63 :
64 : nsresult
65 : Available(uint64_t *aNumAvailableOut);
66 :
67 : nsresult
68 : Read(char *aBuf, uint32_t aCount, uint32_t *aNumReadOut);
69 :
70 : nsresult
71 : ReadSegments(nsWriteSegmentFun aWriter, void *aClosure, uint32_t aCount,
72 : uint32_t *aNumReadOut);
73 :
74 : nsresult
75 : IsNonBlocking(bool *aNonBlockingOut);
76 :
77 : private:
78 : class NoteClosedRunnable;
79 : class ForgetRunnable;
80 :
81 : ~Inner();
82 :
83 : void
84 : NoteClosed();
85 :
86 : void
87 : Forget();
88 :
89 : void
90 : NoteClosedOnOwningThread();
91 :
92 : void
93 : ForgetOnOwningThread();
94 :
95 : // Weak ref to the stream control actor. The actor will always call either
96 : // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The
97 : // weak ref is cleared in the resulting NoteClosedOnOwningThread() or
98 : // ForgetOnOwningThread() method call.
99 : StreamControl* mControl;
100 :
101 : const nsID mId;
102 : nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
103 :
104 : enum State
105 : {
106 : Open,
107 : Closed,
108 : NumStates
109 : };
110 : Atomic<State> mState;
111 : Atomic<bool> mHasEverBeenRead;
112 :
113 :
114 : // The wrapped stream objects may not be threadsafe. We need to be able
115 : // to close a stream on our owning thread while an IO thread is simultaneously
116 : // reading the same stream. Therefore, protect all access to these stream
117 : // objects with a mutex.
118 : Mutex mMutex;
119 : nsCOMPtr<nsIInputStream> mStream;
120 : nsCOMPtr<nsIInputStream> mSnappyStream;
121 :
122 0 : NS_INLINE_DECL_THREADSAFE_REFCOUNTING(cache::ReadStream::Inner, override)
123 : };
124 :
125 : // ----------------------------------------------------------------------------
126 :
127 : // Runnable to notify actors that the ReadStream has closed. This must
128 : // be done on the thread associated with the PBackground actor. Must be
129 : // cancelable to execute on Worker threads (which can occur when the
130 : // ReadStream is constructed on a child process Worker thread).
131 : class ReadStream::Inner::NoteClosedRunnable final : public CancelableRunnable
132 : {
133 : public:
134 0 : explicit NoteClosedRunnable(ReadStream::Inner* aStream)
135 0 : : CancelableRunnable("dom::cache::ReadStream::Inner::NoteClosedRunnable")
136 0 : , mStream(aStream)
137 0 : { }
138 :
139 0 : NS_IMETHOD Run() override
140 : {
141 0 : mStream->NoteClosedOnOwningThread();
142 0 : mStream = nullptr;
143 0 : return NS_OK;
144 : }
145 :
146 : // Note, we must proceed with the Run() method since our actor will not
147 : // clean itself up until we note that the stream is closed.
148 0 : nsresult Cancel() override
149 : {
150 0 : Run();
151 0 : return NS_OK;
152 : }
153 :
154 : private:
155 0 : ~NoteClosedRunnable() { }
156 :
157 : RefPtr<ReadStream::Inner> mStream;
158 : };
159 :
160 : // ----------------------------------------------------------------------------
161 :
162 : // Runnable to clear actors without reporting that the ReadStream has
163 : // closed. Since this can trigger actor destruction, we need to do
164 : // it on the thread associated with the PBackground actor. Must be
165 : // cancelable to execute on Worker threads (which can occur when the
166 : // ReadStream is constructed on a child process Worker thread).
167 : class ReadStream::Inner::ForgetRunnable final : public CancelableRunnable
168 : {
169 : public:
170 0 : explicit ForgetRunnable(ReadStream::Inner* aStream)
171 0 : : CancelableRunnable("dom::cache::ReadStream::Inner::ForgetRunnable")
172 0 : , mStream(aStream)
173 0 : { }
174 :
175 0 : NS_IMETHOD Run() override
176 : {
177 0 : mStream->ForgetOnOwningThread();
178 0 : mStream = nullptr;
179 0 : return NS_OK;
180 : }
181 :
182 : // Note, we must proceed with the Run() method so that we properly
183 : // call RemoveListener on the actor.
184 0 : nsresult Cancel() override
185 : {
186 0 : Run();
187 0 : return NS_OK;
188 : }
189 :
190 : private:
191 0 : ~ForgetRunnable() { }
192 :
193 : RefPtr<ReadStream::Inner> mStream;
194 : };
195 :
196 : // ----------------------------------------------------------------------------
197 :
198 0 : ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId,
199 0 : nsIInputStream* aStream)
200 : : mControl(aControl)
201 : , mId(aId)
202 : , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
203 : , mState(Open)
204 : , mHasEverBeenRead(false)
205 : , mMutex("dom::cache::ReadStream")
206 : , mStream(aStream)
207 0 : , mSnappyStream(new SnappyUncompressInputStream(aStream))
208 : {
209 0 : MOZ_DIAGNOSTIC_ASSERT(mStream);
210 0 : MOZ_DIAGNOSTIC_ASSERT(mControl);
211 0 : mControl->AddReadStream(this);
212 0 : }
213 :
214 : void
215 0 : ReadStream::Inner::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
216 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
217 : ErrorResult& aRv)
218 : {
219 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
220 0 : MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
221 0 : *aReadStreamOut = CacheReadStream();
222 0 : Serialize(&aReadStreamOut->get_CacheReadStream(), aStreamCleanupList, aRv);
223 0 : }
224 :
225 : void
226 0 : ReadStream::Inner::Serialize(CacheReadStream* aReadStreamOut,
227 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
228 : ErrorResult& aRv)
229 : {
230 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
231 0 : MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut);
232 :
233 0 : if (mState != Open) {
234 0 : aRv.ThrowTypeError<MSG_CACHE_STREAM_CLOSED>();
235 0 : return;
236 : }
237 :
238 0 : MOZ_DIAGNOSTIC_ASSERT(mControl);
239 :
240 0 : aReadStreamOut->id() = mId;
241 0 : mControl->SerializeControl(aReadStreamOut);
242 :
243 : {
244 0 : MutexAutoLock lock(mMutex);
245 0 : mControl->SerializeStream(aReadStreamOut, mStream, aStreamCleanupList);
246 : }
247 :
248 0 : MOZ_DIAGNOSTIC_ASSERT(aReadStreamOut->stream().type() ==
249 : IPCStream::TInputStreamParamsWithFds);
250 :
251 : // We're passing ownership across the IPC barrier with the control, so
252 : // do not signal that the stream is closed here.
253 0 : Forget();
254 : }
255 :
256 : void
257 0 : ReadStream::Inner::CloseStream()
258 : {
259 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
260 0 : Close();
261 0 : }
262 :
263 : void
264 0 : ReadStream::Inner::CloseStreamWithoutReporting()
265 : {
266 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
267 0 : Forget();
268 0 : }
269 :
270 : bool
271 0 : ReadStream::Inner::MatchId(const nsID& aId) const
272 : {
273 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
274 0 : return mId.Equals(aId);
275 : }
276 :
277 : bool
278 0 : ReadStream::Inner::HasEverBeenRead() const
279 : {
280 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
281 0 : return mHasEverBeenRead;
282 : }
283 :
284 : nsresult
285 0 : ReadStream::Inner::Close()
286 : {
287 : // stream ops can happen on any thread
288 0 : nsresult rv = NS_OK;
289 : {
290 0 : MutexAutoLock lock(mMutex);
291 0 : rv = mSnappyStream->Close();
292 : }
293 0 : NoteClosed();
294 0 : return rv;
295 : }
296 :
297 : nsresult
298 0 : ReadStream::Inner::Available(uint64_t* aNumAvailableOut)
299 : {
300 : // stream ops can happen on any thread
301 0 : nsresult rv = NS_OK;
302 : {
303 0 : MutexAutoLock lock(mMutex);
304 0 : rv = mSnappyStream->Available(aNumAvailableOut);
305 : }
306 :
307 0 : if (NS_FAILED(rv)) {
308 0 : Close();
309 : }
310 :
311 0 : return rv;
312 : }
313 :
314 : nsresult
315 0 : ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
316 : {
317 : // stream ops can happen on any thread
318 0 : MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
319 :
320 0 : nsresult rv = NS_OK;
321 : {
322 0 : MutexAutoLock lock(mMutex);
323 0 : rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut);
324 : }
325 :
326 0 : if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) ||
327 0 : *aNumReadOut == 0) {
328 0 : Close();
329 : }
330 :
331 0 : mHasEverBeenRead = true;
332 :
333 0 : return rv;
334 : }
335 :
336 : nsresult
337 0 : ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
338 : uint32_t aCount, uint32_t* aNumReadOut)
339 : {
340 : // stream ops can happen on any thread
341 0 : MOZ_DIAGNOSTIC_ASSERT(aNumReadOut);
342 :
343 0 : if (aCount) {
344 0 : mHasEverBeenRead = true;
345 : }
346 :
347 :
348 0 : nsresult rv = NS_OK;
349 : {
350 0 : MutexAutoLock lock(mMutex);
351 0 : rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
352 : }
353 :
354 0 : if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK &&
355 0 : rv != NS_ERROR_NOT_IMPLEMENTED) || *aNumReadOut == 0) {
356 0 : Close();
357 : }
358 :
359 : // Verify bytes were actually read before marking as being ever read. For
360 : // example, code can test if the stream supports ReadSegments() by calling
361 : // this method with a dummy callback which doesn't read anything. We don't
362 : // want to trigger on that.
363 0 : if (*aNumReadOut) {
364 0 : mHasEverBeenRead = true;
365 : }
366 :
367 0 : return rv;
368 : }
369 :
370 : nsresult
371 0 : ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut)
372 : {
373 : // stream ops can happen on any thread
374 0 : MutexAutoLock lock(mMutex);
375 0 : return mSnappyStream->IsNonBlocking(aNonBlockingOut);
376 : }
377 :
378 0 : ReadStream::Inner::~Inner()
379 : {
380 : // Any thread
381 0 : MOZ_DIAGNOSTIC_ASSERT(mState == Closed);
382 0 : MOZ_DIAGNOSTIC_ASSERT(!mControl);
383 0 : }
384 :
385 : void
386 0 : ReadStream::Inner::NoteClosed()
387 : {
388 : // Any thread
389 0 : if (mState == Closed) {
390 0 : return;
391 : }
392 :
393 0 : if (mOwningEventTarget->IsOnCurrentThread()) {
394 0 : NoteClosedOnOwningThread();
395 0 : return;
396 : }
397 :
398 0 : nsCOMPtr<nsIRunnable> runnable = new NoteClosedRunnable(this);
399 0 : MOZ_ALWAYS_SUCCEEDS(
400 : mOwningEventTarget->Dispatch(runnable.forget(), nsIThread::DISPATCH_NORMAL));
401 : }
402 :
403 : void
404 0 : ReadStream::Inner::Forget()
405 : {
406 : // Any thread
407 0 : if (mState == Closed) {
408 0 : return;
409 : }
410 :
411 0 : if (mOwningEventTarget->IsOnCurrentThread()) {
412 0 : ForgetOnOwningThread();
413 0 : return;
414 : }
415 :
416 0 : nsCOMPtr<nsIRunnable> runnable = new ForgetRunnable(this);
417 0 : MOZ_ALWAYS_SUCCEEDS(
418 : mOwningEventTarget->Dispatch(runnable.forget(), nsIThread::DISPATCH_NORMAL));
419 : }
420 :
421 : void
422 0 : ReadStream::Inner::NoteClosedOnOwningThread()
423 : {
424 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
425 :
426 : // Mark closed and do nothing if we were already closed
427 0 : if (!mState.compareExchange(Open, Closed)) {
428 0 : return;
429 : }
430 :
431 0 : MOZ_DIAGNOSTIC_ASSERT(mControl);
432 0 : mControl->NoteClosed(this, mId);
433 0 : mControl = nullptr;
434 : }
435 :
436 : void
437 0 : ReadStream::Inner::ForgetOnOwningThread()
438 : {
439 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
440 :
441 : // Mark closed and do nothing if we were already closed
442 0 : if (!mState.compareExchange(Open, Closed)) {
443 0 : return;
444 : }
445 :
446 0 : MOZ_DIAGNOSTIC_ASSERT(mControl);
447 0 : mControl->ForgetReadStream(this);
448 0 : mControl = nullptr;
449 : }
450 :
451 : // ----------------------------------------------------------------------------
452 :
// XPCOM nsISupports boilerplate for the outer ReadStream class, listing
// nsIInputStream and ReadStream as the interfaces it answers to.
453 0 : NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream);
454 :
455 : // static
456 : already_AddRefed<ReadStream>
457 0 : ReadStream::Create(const CacheReadStreamOrVoid& aReadStreamOrVoid)
458 : {
459 0 : if (aReadStreamOrVoid.type() == CacheReadStreamOrVoid::Tvoid_t) {
460 0 : return nullptr;
461 : }
462 :
463 0 : return Create(aReadStreamOrVoid.get_CacheReadStream());
464 : }
465 :
466 : // static
467 : already_AddRefed<ReadStream>
468 0 : ReadStream::Create(const CacheReadStream& aReadStream)
469 : {
470 : // The parameter may or may not be for a Cache created stream. The way we
471 : // tell is by looking at the stream control actor. If the actor exists,
472 : // then we know the Cache created it.
473 0 : if (!aReadStream.controlChild() && !aReadStream.controlParent()) {
474 0 : return nullptr;
475 : }
476 :
477 0 : MOZ_DIAGNOSTIC_ASSERT(aReadStream.stream().type() ==
478 : IPCStream::TInputStreamParamsWithFds);
479 :
480 : // Control is guaranteed to survive this method as ActorDestroy() cannot
481 : // run on this thread until we complete.
482 : StreamControl* control;
483 0 : if (aReadStream.controlChild()) {
484 0 : auto actor = static_cast<CacheStreamControlChild*>(aReadStream.controlChild());
485 0 : control = actor;
486 : } else {
487 0 : auto actor = static_cast<CacheStreamControlParent*>(aReadStream.controlParent());
488 0 : control = actor;
489 : }
490 0 : MOZ_DIAGNOSTIC_ASSERT(control);
491 :
492 0 : nsCOMPtr<nsIInputStream> stream = DeserializeIPCStream(aReadStream.stream());
493 0 : MOZ_DIAGNOSTIC_ASSERT(stream);
494 :
495 : // Currently we expect all cache read streams to be blocking file streams.
496 : #if !defined(RELEASE_OR_BETA)
497 0 : nsCOMPtr<nsIAsyncInputStream> asyncStream = do_QueryInterface(stream);
498 0 : MOZ_DIAGNOSTIC_ASSERT(!asyncStream);
499 : #endif
500 :
501 0 : RefPtr<Inner> inner = new Inner(control, aReadStream.id(), stream);
502 0 : RefPtr<ReadStream> ref = new ReadStream(inner);
503 0 : return ref.forget();
504 : }
505 :
506 : // static
507 : already_AddRefed<ReadStream>
508 0 : ReadStream::Create(PCacheStreamControlParent* aControl, const nsID& aId,
509 : nsIInputStream* aStream)
510 : {
511 0 : MOZ_DIAGNOSTIC_ASSERT(aControl);
512 0 : auto actor = static_cast<CacheStreamControlParent*>(aControl);
513 0 : RefPtr<Inner> inner = new Inner(actor, aId, aStream);
514 0 : RefPtr<ReadStream> ref = new ReadStream(inner);
515 0 : return ref.forget();
516 : }
517 :
518 : void
519 0 : ReadStream::Serialize(CacheReadStreamOrVoid* aReadStreamOut,
520 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
521 : ErrorResult& aRv)
522 : {
523 0 : mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
524 0 : }
525 :
526 : void
527 0 : ReadStream::Serialize(CacheReadStream* aReadStreamOut,
528 : nsTArray<UniquePtr<AutoIPCStream>>& aStreamCleanupList,
529 : ErrorResult& aRv)
530 : {
531 0 : mInner->Serialize(aReadStreamOut, aStreamCleanupList, aRv);
532 0 : }
533 :
534 0 : ReadStream::ReadStream(ReadStream::Inner* aInner)
535 0 : : mInner(aInner)
536 : {
537 0 : MOZ_DIAGNOSTIC_ASSERT(mInner);
538 0 : }
539 :
540 0 : ReadStream::~ReadStream()
541 : {
542 : // Explicitly close the inner stream so that it does not have to
543 : // deal with implicitly closing at destruction time.
544 0 : mInner->Close();
545 0 : }
546 :
547 : NS_IMETHODIMP
548 0 : ReadStream::Close()
549 : {
550 0 : return mInner->Close();
551 : }
552 :
553 : NS_IMETHODIMP
554 0 : ReadStream::Available(uint64_t* aNumAvailableOut)
555 : {
556 0 : return mInner->Available(aNumAvailableOut);
557 : }
558 :
559 : NS_IMETHODIMP
560 0 : ReadStream::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut)
561 : {
562 0 : return mInner->Read(aBuf, aCount, aNumReadOut);
563 : }
564 :
565 : NS_IMETHODIMP
566 0 : ReadStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
567 : uint32_t aCount, uint32_t* aNumReadOut)
568 : {
569 0 : return mInner->ReadSegments(aWriter, aClosure, aCount, aNumReadOut);
570 : }
571 :
572 : NS_IMETHODIMP
573 0 : ReadStream::IsNonBlocking(bool* aNonBlockingOut)
574 : {
575 0 : return mInner->IsNonBlocking(aNonBlockingOut);
576 : }
577 :
578 : } // namespace cache
579 : } // namespace dom
580 : } // namespace mozilla
|