Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "IPCStreamSource.h"
8 : #include "nsIAsyncInputStream.h"
9 : #include "nsICancelableRunnable.h"
10 : #include "nsIRunnable.h"
11 : #include "nsISerialEventTarget.h"
12 : #include "nsStreamUtils.h"
13 : #include "nsThreadUtils.h"
14 :
15 : using mozilla::dom::workers::Canceling;
16 : using mozilla::dom::workers::GetCurrentThreadWorkerPrivate;
17 : using mozilla::dom::workers::Status;
18 : using mozilla::dom::workers::WorkerPrivate;
19 :
20 : namespace mozilla {
21 : namespace ipc {
22 :
23 : class IPCStreamSource::Callback final : public nsIInputStreamCallback
24 : , public nsIRunnable
25 : , public nsICancelableRunnable
26 : {
27 : public:
28 0 : explicit Callback(IPCStreamSource* aSource)
29 0 : : mSource(aSource)
30 0 : , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
31 : {
32 0 : MOZ_ASSERT(mSource);
33 0 : }
34 :
35 : NS_IMETHOD
36 0 : OnInputStreamReady(nsIAsyncInputStream* aStream) override
37 : {
38 : // any thread
39 0 : if (mOwningEventTarget->IsOnCurrentThread()) {
40 0 : return Run();
41 : }
42 :
43 : // If this fails, then it means the owning thread is a Worker that has
44 : // been shutdown. Its ok to lose the event in this case because the
45 : // IPCStreamChild listens for this event through the WorkerHolder.
46 0 : nsresult rv = mOwningEventTarget->Dispatch(this, nsIThread::DISPATCH_NORMAL);
47 0 : if (NS_FAILED(rv)) {
48 0 : NS_WARNING("Failed to dispatch stream readable event to owning thread");
49 : }
50 :
51 0 : return NS_OK;
52 : }
53 :
54 : NS_IMETHOD
55 0 : Run() override
56 : {
57 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
58 0 : if (mSource) {
59 0 : mSource->OnStreamReady(this);
60 : }
61 0 : return NS_OK;
62 : }
63 :
64 : nsresult
65 0 : Cancel() override
66 : {
67 : // Cancel() gets called when the Worker thread is being shutdown. We have
68 : // nothing to do here because IPCStreamChild handles this case via
69 : // the WorkerHolder.
70 0 : return NS_OK;
71 : }
72 :
73 : void
74 0 : ClearSource()
75 : {
76 0 : MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread());
77 0 : MOZ_ASSERT(mSource);
78 0 : mSource = nullptr;
79 0 : }
80 :
81 : private:
82 0 : ~Callback()
83 0 : {
84 : // called on any thread
85 :
86 : // ClearSource() should be called before the Callback is destroyed
87 0 : MOZ_ASSERT(!mSource);
88 0 : }
89 :
90 : // This is a raw pointer because the source keeps alive the callback and,
91 : // before beeing destroyed, it nullifies this pointer (this happens when
92 : // ActorDestroyed() is called).
93 : IPCStreamSource* mSource;
94 :
95 : nsCOMPtr<nsISerialEventTarget> mOwningEventTarget;
96 :
97 : NS_DECL_THREADSAFE_ISUPPORTS
98 : };
99 :
100 0 : NS_IMPL_ISUPPORTS(IPCStreamSource::Callback, nsIInputStreamCallback,
101 : nsIRunnable,
102 : nsICancelableRunnable);
103 :
104 0 : IPCStreamSource::IPCStreamSource(nsIAsyncInputStream* aInputStream)
105 : : mStream(aInputStream)
106 : , mWorkerPrivate(nullptr)
107 0 : , mState(ePending)
108 : {
109 0 : MOZ_ASSERT(aInputStream);
110 0 : }
111 :
112 0 : IPCStreamSource::~IPCStreamSource()
113 : {
114 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
115 0 : MOZ_ASSERT(mState == eClosed);
116 0 : MOZ_ASSERT(!mCallback);
117 0 : MOZ_ASSERT(!mWorkerPrivate);
118 0 : }
119 :
120 : bool
121 0 : IPCStreamSource::Initialize()
122 : {
123 0 : bool nonBlocking = false;
124 0 : MOZ_ALWAYS_TRUE(NS_SUCCEEDED(mStream->IsNonBlocking(&nonBlocking)));
125 : // IPCStreamChild reads in the current thread, so it is only supported on
126 : // non-blocking, async channels
127 0 : if (!nonBlocking) {
128 0 : return false;
129 : }
130 :
131 : // A source can be used on any thread, but we only support IPCStream on
132 : // main thread, Workers and PBackground thread right now. This is due
133 : // to the requirement that the thread be guaranteed to live long enough to
134 : // receive messages. We can enforce this guarantee with a WorkerHolder on
135 : // worker threads, but not other threads. Main-thread and PBackground thread
136 : // do not need anything special in order to be kept alive.
137 0 : WorkerPrivate* workerPrivate = nullptr;
138 0 : if (!NS_IsMainThread()) {
139 0 : workerPrivate = GetCurrentThreadWorkerPrivate();
140 0 : if (workerPrivate) {
141 0 : bool result = HoldWorker(workerPrivate, Canceling);
142 0 : if (!result) {
143 0 : return false;
144 : }
145 :
146 0 : mWorkerPrivate = workerPrivate;
147 : } else {
148 0 : AssertIsOnBackgroundThread();
149 : }
150 : }
151 :
152 0 : return true;
153 : }
154 :
155 : void
156 0 : IPCStreamSource::ActorConstructed()
157 : {
158 0 : MOZ_ASSERT(mState == ePending);
159 0 : mState = eActorConstructed;
160 0 : }
161 :
162 : bool
163 0 : IPCStreamSource::Notify(Status aStatus)
164 : {
165 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
166 :
167 : // Keep the worker thread alive until the stream is finished.
168 0 : return true;
169 : }
170 :
171 : void
172 0 : IPCStreamSource::ActorDestroyed()
173 : {
174 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
175 :
176 0 : mState = eClosed;
177 :
178 0 : if (mCallback) {
179 0 : mCallback->ClearSource();
180 0 : mCallback = nullptr;
181 : }
182 :
183 0 : if (mWorkerPrivate) {
184 0 : ReleaseWorker();
185 0 : mWorkerPrivate = nullptr;
186 : }
187 0 : }
188 :
189 : void
190 0 : IPCStreamSource::Start()
191 : {
192 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
193 0 : DoRead();
194 0 : }
195 :
196 : void
197 0 : IPCStreamSource::StartDestroy()
198 : {
199 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
200 0 : OnEnd(NS_ERROR_ABORT);
201 0 : }
202 :
203 : void
204 0 : IPCStreamSource::DoRead()
205 : {
206 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
207 0 : MOZ_ASSERT(mState == eActorConstructed);
208 0 : MOZ_ASSERT(!mCallback);
209 :
210 : // The input stream (likely a pipe) probably uses a segment size of
211 : // 4kb. If there is data already buffered it would be nice to aggregate
212 : // multiple segments into a single IPC call. Conversely, don't send too
213 : // too large of a buffer in a single call to avoid spiking memory.
214 : static const uint64_t kMaxBytesPerMessage = 32 * 1024;
215 : static_assert(kMaxBytesPerMessage <= static_cast<uint64_t>(UINT32_MAX),
216 : "kMaxBytesPerMessage must cleanly cast to uint32_t");
217 :
218 : while (true) {
219 : // It should not be possible to transition to closed state without
220 : // this loop terminating via a return.
221 0 : MOZ_ASSERT(mState == eActorConstructed);
222 :
223 : // Use non-auto here as we're unlikely to hit stack storage with the
224 : // sizes we are sending. Also, it would be nice to avoid another copy
225 : // to the IPC layer which we avoid if we use COW strings. Unfortunately
226 : // IPC does not seem to support passing dependent storage types.
227 0 : nsCString buffer;
228 :
229 0 : uint64_t available = 0;
230 0 : nsresult rv = mStream->Available(&available);
231 0 : if (NS_FAILED(rv)) {
232 0 : OnEnd(rv);
233 0 : return;
234 : }
235 :
236 0 : if (available == 0) {
237 0 : Wait();
238 0 : return;
239 : }
240 :
241 : uint32_t expectedBytes =
242 0 : static_cast<uint32_t>(std::min(available, kMaxBytesPerMessage));
243 :
244 0 : buffer.SetLength(expectedBytes);
245 :
246 0 : uint32_t bytesRead = 0;
247 0 : rv = mStream->Read(buffer.BeginWriting(), buffer.Length(), &bytesRead);
248 0 : MOZ_ASSERT_IF(NS_FAILED(rv), bytesRead == 0);
249 0 : buffer.SetLength(bytesRead);
250 :
251 : // If we read any data from the stream, send it across.
252 0 : if (!buffer.IsEmpty()) {
253 0 : SendData(buffer);
254 : }
255 :
256 0 : if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
257 0 : Wait();
258 0 : return;
259 : }
260 :
261 : // Any other error or zero-byte read indicates end-of-stream
262 0 : if (NS_FAILED(rv) || buffer.IsEmpty()) {
263 0 : OnEnd(rv);
264 0 : return;
265 : }
266 0 : }
267 : }
268 :
269 : void
270 0 : IPCStreamSource::Wait()
271 : {
272 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
273 0 : MOZ_ASSERT(mState == eActorConstructed);
274 0 : MOZ_ASSERT(!mCallback);
275 :
276 : // Set mCallback immediately instead of waiting for success. Its possible
277 : // AsyncWait() will callback synchronously.
278 0 : mCallback = new Callback(this);
279 0 : nsresult rv = mStream->AsyncWait(mCallback, 0, 0, nullptr);
280 0 : if (NS_FAILED(rv)) {
281 0 : OnEnd(rv);
282 0 : return;
283 : }
284 : }
285 :
286 : void
287 0 : IPCStreamSource::OnStreamReady(Callback* aCallback)
288 : {
289 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
290 0 : MOZ_ASSERT(mCallback);
291 0 : MOZ_ASSERT(aCallback == mCallback);
292 0 : mCallback->ClearSource();
293 0 : mCallback = nullptr;
294 0 : DoRead();
295 0 : }
296 :
297 : void
298 0 : IPCStreamSource::OnEnd(nsresult aRv)
299 : {
300 0 : NS_ASSERT_OWNINGTHREAD(IPCStreamSource);
301 0 : MOZ_ASSERT(aRv != NS_BASE_STREAM_WOULD_BLOCK);
302 :
303 0 : if (mState == eClosed) {
304 0 : return;
305 : }
306 :
307 0 : mState = eClosed;
308 :
309 0 : mStream->CloseWithStatus(aRv);
310 :
311 0 : if (aRv == NS_BASE_STREAM_CLOSED) {
312 0 : aRv = NS_OK;
313 : }
314 :
315 : // This will trigger an ActorDestroy() from the other side
316 0 : Close(aRv);
317 : }
318 :
319 : } // namespace ipc
320 9 : } // namespace mozilla
|