Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include "IPCBlobInputStreamChild.h"
8 : #include "IPCBlobInputStreamThread.h"
9 :
10 : #include "mozilla/ipc/IPCStreamUtils.h"
11 : #include "WorkerHolder.h"
12 : #include "WorkerPrivate.h"
13 : #include "WorkerRunnable.h"
14 :
15 : namespace mozilla {
16 : namespace dom {
17 :
18 : using namespace workers;
19 :
20 : namespace {
21 :
22 : // This runnable is used in case the last stream is forgotten on the 'wrong'
23 : // thread.
24 0 : class ShutdownRunnable final : public CancelableRunnable
25 : {
26 : public:
27 0 : explicit ShutdownRunnable(IPCBlobInputStreamChild* aActor)
28 0 : : CancelableRunnable("dom::ShutdownRunnable")
29 0 : , mActor(aActor)
30 0 : {}
31 :
32 : NS_IMETHOD
33 0 : Run() override
34 : {
35 0 : mActor->Shutdown();
36 0 : return NS_OK;
37 : }
38 :
39 : private:
40 : RefPtr<IPCBlobInputStreamChild> mActor;
41 : };
42 :
43 : // This runnable is used in case StreamNeeded() has been called on a non-owning
44 : // thread.
45 0 : class StreamNeededRunnable final : public CancelableRunnable
46 : {
47 : public:
48 0 : explicit StreamNeededRunnable(IPCBlobInputStreamChild* aActor)
49 0 : : CancelableRunnable("dom::StreamNeededRunnable")
50 0 : , mActor(aActor)
51 0 : {}
52 :
53 : NS_IMETHOD
54 0 : Run() override
55 : {
56 0 : MOZ_ASSERT(mActor->State() != IPCBlobInputStreamChild::eActiveMigrating &&
57 : mActor->State() != IPCBlobInputStreamChild::eInactiveMigrating);
58 0 : if (mActor->State() == IPCBlobInputStreamChild::eActive) {
59 0 : mActor->SendStreamNeeded();
60 : }
61 0 : return NS_OK;
62 : }
63 :
64 : private:
65 : RefPtr<IPCBlobInputStreamChild> mActor;
66 : };
67 :
68 : // When the stream has been received from the parent, we inform the
69 : // IPCBlobInputStream.
70 0 : class StreamReadyRunnable final : public CancelableRunnable
71 : {
72 : public:
73 0 : StreamReadyRunnable(IPCBlobInputStream* aDestinationStream,
74 : nsIInputStream* aCreatedStream)
75 0 : : CancelableRunnable("dom::StreamReadyRunnable")
76 : , mDestinationStream(aDestinationStream)
77 0 : , mCreatedStream(aCreatedStream)
78 : {
79 0 : MOZ_ASSERT(mDestinationStream);
80 : // mCreatedStream can be null.
81 0 : }
82 :
83 : NS_IMETHOD
84 0 : Run() override
85 : {
86 0 : mDestinationStream->StreamReady(mCreatedStream);
87 0 : return NS_OK;
88 : }
89 :
90 : private:
91 : RefPtr<IPCBlobInputStream> mDestinationStream;
92 : nsCOMPtr<nsIInputStream> mCreatedStream;
93 : };
94 :
95 0 : class IPCBlobInputStreamWorkerHolder final : public WorkerHolder
96 : {
97 : public:
98 0 : bool Notify(Status aStatus) override
99 : {
100 : // We must keep the worker alive until the migration is completed.
101 0 : return true;
102 : }
103 : };
104 :
105 0 : class ReleaseWorkerHolderRunnable final : public CancelableRunnable
106 : {
107 : public:
108 0 : explicit ReleaseWorkerHolderRunnable(
109 : UniquePtr<workers::WorkerHolder>&& aWorkerHolder)
110 0 : : CancelableRunnable("dom::ReleaseWorkerHolderRunnable")
111 0 : , mWorkerHolder(Move(aWorkerHolder))
112 0 : {}
113 :
114 : NS_IMETHOD
115 0 : Run() override
116 : {
117 0 : mWorkerHolder = nullptr;
118 0 : return NS_OK;
119 : }
120 :
121 : nsresult
122 0 : Cancel() override
123 : {
124 0 : return Run();
125 : }
126 :
127 : private:
128 : UniquePtr<workers::WorkerHolder> mWorkerHolder;
129 : };
130 :
131 : } // anonymous
132 :
133 0 : IPCBlobInputStreamChild::IPCBlobInputStreamChild(const nsID& aID,
134 0 : uint64_t aSize)
135 : : mMutex("IPCBlobInputStreamChild::mMutex")
136 : , mID(aID)
137 : , mSize(aSize)
138 : , mState(eActive)
139 0 : , mOwningEventTarget(GetCurrentThreadSerialEventTarget())
140 : {
141 : // If we are running in a worker, we need to send a Close() to the parent side
142 : // before the thread is released.
143 0 : if (!NS_IsMainThread()) {
144 0 : WorkerPrivate* workerPrivate = GetCurrentThreadWorkerPrivate();
145 0 : if (workerPrivate) {
146 : UniquePtr<WorkerHolder> workerHolder(
147 0 : new IPCBlobInputStreamWorkerHolder());
148 0 : if (workerHolder->HoldWorker(workerPrivate, Canceling)) {
149 0 : mWorkerHolder.swap(workerHolder);
150 : }
151 : }
152 : }
153 0 : }
154 :
155 0 : IPCBlobInputStreamChild::~IPCBlobInputStreamChild()
156 0 : {}
157 :
158 : void
159 0 : IPCBlobInputStreamChild::Shutdown()
160 : {
161 0 : MutexAutoLock lock(mMutex);
162 :
163 0 : RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
164 :
165 0 : mWorkerHolder = nullptr;
166 0 : mPendingOperations.Clear();
167 :
168 0 : if (mState == eActive) {
169 0 : SendClose();
170 0 : mState = eInactive;
171 : }
172 0 : }
173 :
174 : void
175 0 : IPCBlobInputStreamChild::ActorDestroy(IProtocol::ActorDestroyReason aReason)
176 : {
177 0 : bool migrating = false;
178 :
179 : {
180 0 : MutexAutoLock lock(mMutex);
181 0 : migrating = mState == eActiveMigrating;
182 0 : mState = migrating ? eInactiveMigrating : eInactive;
183 : }
184 :
185 0 : if (migrating) {
186 : // We were waiting for this! Now we can migrate the actor in the correct
187 : // thread.
188 : RefPtr<IPCBlobInputStreamThread> thread =
189 0 : IPCBlobInputStreamThread::GetOrCreate();
190 0 : ResetManager();
191 0 : thread->MigrateActor(this);
192 0 : return;
193 : }
194 :
195 : // Let's cleanup the workerHolder and the pending operation queue.
196 0 : Shutdown();
197 : }
198 :
199 : IPCBlobInputStreamChild::ActorState
200 0 : IPCBlobInputStreamChild::State()
201 : {
202 0 : MutexAutoLock lock(mMutex);
203 0 : return mState;
204 : }
205 :
206 : already_AddRefed<nsIInputStream>
207 0 : IPCBlobInputStreamChild::CreateStream()
208 : {
209 0 : bool shouldMigrate = false;
210 :
211 0 : RefPtr<IPCBlobInputStream> stream = new IPCBlobInputStream(this);
212 :
213 : {
214 0 : MutexAutoLock lock(mMutex);
215 :
216 0 : if (mState == eInactive) {
217 0 : return nullptr;
218 : }
219 :
220 : // The stream is active but maybe it is not running in the DOM-File thread.
221 : // We should migrate it there.
222 0 : if (mState == eActive &&
223 0 : !IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget)) {
224 0 : MOZ_ASSERT(mStreams.IsEmpty());
225 0 : shouldMigrate = true;
226 0 : mState = eActiveMigrating;
227 : }
228 :
229 0 : mStreams.AppendElement(stream);
230 : }
231 :
232 : // Send__delete__ will call ActorDestroy(). mMutex cannot be locked at this
233 : // time.
234 0 : if (shouldMigrate) {
235 0 : Send__delete__(this);
236 : }
237 :
238 0 : return stream.forget();
239 : }
240 :
241 : void
242 0 : IPCBlobInputStreamChild::ForgetStream(IPCBlobInputStream* aStream)
243 : {
244 0 : MOZ_ASSERT(aStream);
245 :
246 0 : RefPtr<IPCBlobInputStreamChild> kungFuDeathGrip = this;
247 :
248 : {
249 0 : MutexAutoLock lock(mMutex);
250 0 : mStreams.RemoveElement(aStream);
251 :
252 0 : if (!mStreams.IsEmpty() || mState != eActive) {
253 0 : return;
254 : }
255 : }
256 :
257 0 : if (mOwningEventTarget->IsOnCurrentThread()) {
258 0 : Shutdown();
259 0 : return;
260 : }
261 :
262 0 : RefPtr<ShutdownRunnable> runnable = new ShutdownRunnable(this);
263 0 : mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
264 : }
265 :
266 : void
267 0 : IPCBlobInputStreamChild::StreamNeeded(IPCBlobInputStream* aStream,
268 : nsIEventTarget* aEventTarget)
269 : {
270 0 : MutexAutoLock lock(mMutex);
271 :
272 0 : if (mState == eInactive) {
273 0 : return;
274 : }
275 :
276 0 : MOZ_ASSERT(mStreams.Contains(aStream));
277 :
278 0 : PendingOperation* opt = mPendingOperations.AppendElement();
279 0 : opt->mStream = aStream;
280 0 : opt->mEventTarget = aEventTarget ? aEventTarget : NS_GetCurrentThread();
281 :
282 0 : if (mState == eActiveMigrating || mState == eInactiveMigrating) {
283 : // This operation will be continued when the migration is completed.
284 0 : return;
285 : }
286 :
287 0 : MOZ_ASSERT(mState == eActive);
288 :
289 0 : if (mOwningEventTarget->IsOnCurrentThread()) {
290 0 : SendStreamNeeded();
291 0 : return;
292 : }
293 :
294 0 : RefPtr<StreamNeededRunnable> runnable = new StreamNeededRunnable(this);
295 0 : mOwningEventTarget->Dispatch(runnable.forget(), NS_DISPATCH_NORMAL);
296 : }
297 :
298 : mozilla::ipc::IPCResult
299 0 : IPCBlobInputStreamChild::RecvStreamReady(const OptionalIPCStream& aStream)
300 : {
301 0 : nsCOMPtr<nsIInputStream> stream = mozilla::ipc::DeserializeIPCStream(aStream);
302 :
303 0 : RefPtr<IPCBlobInputStream> pendingStream;
304 0 : nsCOMPtr<nsIEventTarget> eventTarget;
305 :
306 : {
307 0 : MutexAutoLock lock(mMutex);
308 0 : MOZ_ASSERT(!mPendingOperations.IsEmpty());
309 0 : MOZ_ASSERT(mState == eActive);
310 :
311 0 : pendingStream = mPendingOperations[0].mStream;
312 0 : eventTarget = mPendingOperations[0].mEventTarget;
313 :
314 0 : mPendingOperations.RemoveElementAt(0);
315 : }
316 :
317 : RefPtr<StreamReadyRunnable> runnable =
318 0 : new StreamReadyRunnable(pendingStream, stream);
319 0 : eventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
320 :
321 0 : return IPC_OK();
322 : }
323 :
324 : void
325 0 : IPCBlobInputStreamChild::Migrated()
326 : {
327 0 : MutexAutoLock lock(mMutex);
328 0 : MOZ_ASSERT(mState == eInactiveMigrating);
329 :
330 0 : if (mWorkerHolder) {
331 : RefPtr<ReleaseWorkerHolderRunnable> runnable =
332 0 : new ReleaseWorkerHolderRunnable(Move(mWorkerHolder));
333 0 : mOwningEventTarget->Dispatch(runnable, NS_DISPATCH_NORMAL);
334 : }
335 :
336 0 : mOwningEventTarget = GetCurrentThreadSerialEventTarget();
337 0 : MOZ_ASSERT(IPCBlobInputStreamThread::IsOnFileEventTarget(mOwningEventTarget));
338 :
339 : // Maybe we have no reasons to keep this actor alive.
340 0 : if (mStreams.IsEmpty()) {
341 0 : mState = eInactive;
342 0 : SendClose();
343 0 : return;
344 : }
345 :
346 0 : mState = eActive;
347 :
348 : // Let's processing the pending operations. We need a stream for each pending
349 : // operation.
350 0 : for (uint32_t i = 0; i < mPendingOperations.Length(); ++i) {
351 0 : SendStreamNeeded();
352 : }
353 : }
354 :
355 : } // namespace dom
356 : } // namespace mozilla
|