Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "IPCStreamDestination.h"
8 : #include "mozilla/Mutex.h"
9 : #include "nsIAsyncInputStream.h"
10 : #include "nsIAsyncOutputStream.h"
11 : #include "nsIBufferedStreams.h"
12 : #include "nsICloneableInputStream.h"
13 : #include "nsIPipe.h"
14 :
15 : namespace mozilla {
16 : namespace ipc {
17 :
18 : // ----------------------------------------------------------------------------
19 : // IPCStreamDestination::DelayedStartInputStream
20 : //
21 : // When AutoIPCStream is used with delayedStart, we need to ask for data at the
22 : // first real use of the nsIInputStream. In order to do so, we wrap the
23 : // nsIInputStream, created by the nsIPipe, with this wrapper.
24 :
25 : class IPCStreamDestination::DelayedStartInputStream final
26 : : public nsIAsyncInputStream
27 : , public nsISearchableInputStream
28 : , public nsICloneableInputStream
29 : , public nsIBufferedInputStream
30 : {
31 : public:
32 : NS_DECL_THREADSAFE_ISUPPORTS
33 :
34 0 : DelayedStartInputStream(IPCStreamDestination* aDestination,
35 : already_AddRefed<nsIAsyncInputStream>&& aStream)
36 0 : : mDestination(aDestination)
37 : , mStream(aStream)
38 0 : , mMutex("IPCStreamDestination::DelayedStartInputStream::mMutex")
39 : {
40 0 : MOZ_ASSERT(mDestination);
41 0 : MOZ_ASSERT(mStream);
42 0 : }
43 :
44 : void
45 0 : DestinationShutdown()
46 : {
47 0 : MutexAutoLock lock(mMutex);
48 0 : mDestination = nullptr;
49 0 : }
50 :
51 : // nsIInputStream interface
52 :
53 : NS_IMETHOD
54 0 : Close() override
55 : {
56 0 : MaybeCloseDestination();
57 0 : return mStream->Close();
58 : }
59 :
60 : NS_IMETHOD
61 0 : Available(uint64_t* aLength) override
62 : {
63 0 : MaybeStartReading();
64 0 : return mStream->Available(aLength);
65 : }
66 :
67 : NS_IMETHOD
68 0 : Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount) override
69 : {
70 0 : MaybeStartReading();
71 0 : return mStream->Read(aBuffer, aCount, aReadCount);
72 : }
73 :
74 : NS_IMETHOD
75 0 : ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, uint32_t aCount,
76 : uint32_t *aResult) override
77 : {
78 0 : MaybeStartReading();
79 0 : return mStream->ReadSegments(aWriter, aClosure, aCount, aResult);
80 : }
81 :
82 : NS_IMETHOD
83 0 : IsNonBlocking(bool* aNonBlocking) override
84 : {
85 0 : MaybeStartReading();
86 0 : return mStream->IsNonBlocking(aNonBlocking);
87 : }
88 :
89 : // nsIAsyncInputStream interface
90 :
91 : NS_IMETHOD
92 0 : CloseWithStatus(nsresult aReason) override
93 : {
94 0 : MaybeCloseDestination();
95 0 : return mStream->CloseWithStatus(aReason);
96 : }
97 :
98 : NS_IMETHOD
99 0 : AsyncWait(nsIInputStreamCallback* aCallback, uint32_t aFlags,
100 : uint32_t aRequestedCount, nsIEventTarget* aTarget) override
101 : {
102 0 : MaybeStartReading();
103 0 : return mStream->AsyncWait(aCallback, aFlags, aRequestedCount, aTarget);
104 : }
105 :
106 : NS_IMETHOD
107 0 : Search(const char* aForString, bool aIgnoreCase, bool* aFound,
108 : uint32_t* aOffsetSearchedTo) override
109 : {
110 0 : MaybeStartReading();
111 0 : nsCOMPtr<nsISearchableInputStream> searchable = do_QueryInterface(mStream);
112 0 : MOZ_ASSERT(searchable);
113 0 : return searchable->Search(aForString, aIgnoreCase, aFound, aOffsetSearchedTo);
114 : }
115 :
116 : // nsICloneableInputStream interface
117 :
118 : NS_IMETHOD
119 0 : GetCloneable(bool* aCloneable) override
120 : {
121 0 : MaybeStartReading();
122 0 : nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
123 0 : MOZ_ASSERT(cloneable);
124 0 : return cloneable->GetCloneable(aCloneable);
125 : }
126 :
127 : NS_IMETHOD
128 0 : Clone(nsIInputStream** aResult) override
129 : {
130 0 : MaybeStartReading();
131 0 : nsCOMPtr<nsICloneableInputStream> cloneable = do_QueryInterface(mStream);
132 0 : MOZ_ASSERT(cloneable);
133 0 : return cloneable->Clone(aResult);
134 : }
135 :
136 : // nsIBufferedInputStream
137 :
138 : NS_IMETHOD
139 0 : Init(nsIInputStream* aStream, uint32_t aBufferSize) override
140 : {
141 0 : MaybeStartReading();
142 0 : nsCOMPtr<nsIBufferedInputStream> stream = do_QueryInterface(mStream);
143 0 : MOZ_ASSERT(stream);
144 0 : return stream->Init(aStream, aBufferSize);
145 : }
146 :
147 : NS_IMETHODIMP
148 0 : GetData(nsIInputStream **aResult) override
149 : {
150 0 : return NS_ERROR_NOT_IMPLEMENTED;
151 : }
152 :
153 : void
154 : MaybeStartReading();
155 :
156 : void
157 : MaybeCloseDestination();
158 :
159 : private:
160 0 : ~DelayedStartInputStream() = default;
161 :
162 : IPCStreamDestination* mDestination;
163 : nsCOMPtr<nsIAsyncInputStream> mStream;
164 :
165 : // This protects mDestination: any method can be called by any thread.
166 : Mutex mMutex;
167 :
168 : class HelperRunnable;
169 : };
170 :
171 0 : class IPCStreamDestination::DelayedStartInputStream::HelperRunnable final
172 : : public Runnable
173 : {
174 : public:
175 : enum Op {
176 : eStartReading,
177 : eCloseDestination,
178 : };
179 :
180 0 : HelperRunnable(
181 : IPCStreamDestination::DelayedStartInputStream* aDelayedStartInputStream,
182 : Op aOp)
183 0 : : Runnable(
184 : "ipc::IPCStreamDestination::DelayedStartInputStream::HelperRunnable")
185 : , mDelayedStartInputStream(aDelayedStartInputStream)
186 0 : , mOp(aOp)
187 : {
188 0 : MOZ_ASSERT(aDelayedStartInputStream);
189 0 : }
190 :
191 : NS_IMETHOD
192 0 : Run() override
193 : {
194 0 : switch (mOp) {
195 : case eStartReading:
196 0 : mDelayedStartInputStream->MaybeStartReading();
197 0 : break;
198 : case eCloseDestination:
199 0 : mDelayedStartInputStream->MaybeCloseDestination();
200 0 : break;
201 : }
202 :
203 0 : return NS_OK;
204 : }
205 :
206 : private:
207 : RefPtr<IPCStreamDestination::DelayedStartInputStream> mDelayedStartInputStream;
208 : Op mOp;
209 : };
210 :
211 : void
212 0 : IPCStreamDestination::DelayedStartInputStream::MaybeStartReading()
213 : {
214 0 : MutexAutoLock lock(mMutex);
215 0 : if (!mDestination) {
216 0 : return;
217 : }
218 :
219 0 : if (mDestination->IsOnOwningThread()) {
220 0 : mDestination->StartReading();
221 0 : mDestination = nullptr;
222 0 : return;
223 : }
224 :
225 : RefPtr<Runnable> runnable =
226 0 : new HelperRunnable(this, HelperRunnable::eStartReading);
227 0 : mDestination->DispatchRunnable(runnable.forget());
228 : }
229 :
230 : void
231 0 : IPCStreamDestination::DelayedStartInputStream::MaybeCloseDestination()
232 : {
233 0 : MutexAutoLock lock(mMutex);
234 0 : if (!mDestination) {
235 0 : return;
236 : }
237 :
238 0 : if (mDestination->IsOnOwningThread()) {
239 0 : mDestination->RequestClose(NS_ERROR_ABORT);
240 0 : mDestination = nullptr;
241 0 : return;
242 : }
243 :
244 : RefPtr<Runnable> runnable =
245 0 : new HelperRunnable(this, HelperRunnable::eCloseDestination);
246 0 : mDestination->DispatchRunnable(runnable.forget());
247 : }
248 :
// AddRef/Release and QueryInterface implementations for
// DelayedStartInputStream. The interface map lists each interface the class
// inherits; requests for nsIInputStream and nsISupports are resolved through
// the nsIAsyncInputStream base.
249 0 : NS_IMPL_ADDREF(IPCStreamDestination::DelayedStartInputStream);
250 0 : NS_IMPL_RELEASE(IPCStreamDestination::DelayedStartInputStream);
251 :
252 0 : NS_INTERFACE_MAP_BEGIN(IPCStreamDestination::DelayedStartInputStream)
253 0 : NS_INTERFACE_MAP_ENTRY(nsIAsyncInputStream)
254 0 : NS_INTERFACE_MAP_ENTRY(nsISearchableInputStream)
255 0 : NS_INTERFACE_MAP_ENTRY(nsICloneableInputStream)
256 0 : NS_INTERFACE_MAP_ENTRY(nsIBufferedInputStream)
257 0 : NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsIInputStream, nsIAsyncInputStream)
258 0 : NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIAsyncInputStream)
259 0 : NS_INTERFACE_MAP_END
260 :
261 : // ----------------------------------------------------------------------------
262 : // IPCStreamDestination
263 :
264 0 : IPCStreamDestination::IPCStreamDestination()
265 : : mOwningThread(NS_GetCurrentThread())
266 0 : , mDelayedStart(false)
267 : {
268 0 : }
269 :
270 0 : IPCStreamDestination::~IPCStreamDestination()
271 : {
272 0 : }
273 :
274 : nsresult
275 0 : IPCStreamDestination::Initialize()
276 : {
277 0 : MOZ_ASSERT(!mReader);
278 0 : MOZ_ASSERT(!mWriter);
279 :
280 : // use async versions for both reader and writer even though we are
281 : // opening the writer as an infinite stream. We want to be able to
282 : // use CloseWithStatus() to communicate errors through the pipe.
283 :
284 : // Use an "infinite" pipe because we cannot apply back-pressure through
285 : // the async IPC layer at the moment. Blocking the IPC worker thread
286 : // is not desirable, either.
287 0 : nsresult rv = NS_NewPipe2(getter_AddRefs(mReader),
288 0 : getter_AddRefs(mWriter),
289 : true, true, // non-blocking
290 : 0, // segment size
291 0 : UINT32_MAX); // "infinite" pipe
292 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
293 0 : return rv;
294 : }
295 :
296 0 : return NS_OK;
297 : }
298 :
299 : void
300 0 : IPCStreamDestination::SetDelayedStart(bool aDelayedStart)
301 : {
302 0 : mDelayedStart = aDelayedStart;
303 0 : }
304 :
305 : already_AddRefed<nsIInputStream>
306 0 : IPCStreamDestination::TakeReader()
307 : {
308 0 : MOZ_ASSERT(mReader);
309 0 : MOZ_ASSERT(!mDelayedStartInputStream);
310 :
311 0 : if (mDelayedStart) {
312 : mDelayedStartInputStream =
313 0 : new DelayedStartInputStream(this, mReader.forget());
314 0 : RefPtr<nsIAsyncInputStream> inputStream = mDelayedStartInputStream;
315 0 : return inputStream.forget();
316 : }
317 :
318 0 : return mReader.forget();
319 : }
320 :
321 : bool
322 0 : IPCStreamDestination::IsOnOwningThread() const
323 : {
324 0 : return mOwningThread == NS_GetCurrentThread();
325 : }
326 :
327 : void
328 0 : IPCStreamDestination::DispatchRunnable(already_AddRefed<nsIRunnable>&& aRunnable)
329 : {
330 0 : nsCOMPtr<nsIRunnable> runnable = aRunnable;
331 0 : mOwningThread->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
332 0 : }
333 :
334 : void
335 0 : IPCStreamDestination::ActorDestroyed()
336 : {
337 0 : MOZ_ASSERT(mWriter);
338 :
339 : // If we were gracefully closed we should have gotten RecvClose(). In
340 : // that case, the writer will already be closed and this will have no
341 : // effect. This just aborts the writer in the case where the child process
342 : // crashes.
343 0 : mWriter->CloseWithStatus(NS_ERROR_ABORT);
344 :
345 0 : if (mDelayedStartInputStream) {
346 0 : mDelayedStartInputStream->DestinationShutdown();
347 0 : mDelayedStartInputStream = nullptr;
348 : }
349 0 : }
350 :
351 : void
352 0 : IPCStreamDestination::BufferReceived(const nsCString& aBuffer)
353 : {
354 0 : MOZ_ASSERT(mWriter);
355 :
356 0 : uint32_t numWritten = 0;
357 :
358 : // This should only fail if we hit an OOM condition.
359 0 : nsresult rv = mWriter->Write(aBuffer.get(), aBuffer.Length(), &numWritten);
360 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
361 0 : RequestClose(rv);
362 : }
363 0 : }
364 :
365 : void
366 0 : IPCStreamDestination::CloseReceived(nsresult aRv)
367 : {
368 0 : MOZ_ASSERT(mWriter);
369 0 : mWriter->CloseWithStatus(aRv);
370 0 : TerminateDestination();
371 0 : }
372 :
373 : } // namespace ipc
374 : } // namespace mozilla
|