Line data Source code
1 : /* This Source Code Form is subject to the terms of the Mozilla Public
2 : * License, v. 2.0. If a copy of the MPL was not distributed with this
3 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 :
5 : #include "nsStreamTransportService.h"
6 : #include "nsXPCOMCIDInternal.h"
7 : #include "nsNetSegmentUtils.h"
8 : #include "nsTransportUtils.h"
9 : #include "nsStreamUtils.h"
10 : #include "nsError.h"
11 : #include "nsNetCID.h"
12 :
13 : #include "nsIAsyncInputStream.h"
14 : #include "nsIAsyncOutputStream.h"
15 : #include "nsISeekableStream.h"
16 : #include "nsIPipe.h"
17 : #include "nsITransport.h"
18 : #include "nsIObserverService.h"
19 : #include "nsIThreadPool.h"
20 : #include "mozilla/Services.h"
21 :
22 : namespace mozilla {
23 : namespace net {
24 :
25 : //-----------------------------------------------------------------------------
26 : // nsInputStreamTransport
27 : //
28 : // Implements nsIInputStream as a wrapper around the real input stream. This
29 : // allows the transport to support seeking, range-limiting, progress reporting,
30 : // and close-when-done semantics while utilizing NS_AsyncCopy.
31 : //-----------------------------------------------------------------------------
32 :
33 : class nsInputStreamTransport : public nsITransport
34 : , public nsIInputStream
35 : {
36 : public:
37 : NS_DECL_THREADSAFE_ISUPPORTS
38 : NS_DECL_NSITRANSPORT
39 : NS_DECL_NSIINPUTSTREAM
40 :
41 66 : nsInputStreamTransport(nsIInputStream *source,
42 : uint64_t offset,
43 : uint64_t limit,
44 : bool closeWhenDone)
45 66 : : mSource(source)
46 : , mOffset(offset)
47 : , mLimit(limit)
48 : , mCloseWhenDone(closeWhenDone)
49 : , mFirstTime(true)
50 66 : , mInProgress(false)
51 : {
52 66 : }
53 :
54 : private:
55 132 : virtual ~nsInputStreamTransport()
56 66 : {
57 198 : }
58 :
59 : nsCOMPtr<nsIAsyncInputStream> mPipeIn;
60 :
61 : // while the copy is active, these members may only be accessed from the
62 : // nsIInputStream implementation.
63 : nsCOMPtr<nsITransportEventSink> mEventSink;
64 : nsCOMPtr<nsIInputStream> mSource;
65 : int64_t mOffset;
66 : int64_t mLimit;
67 : bool mCloseWhenDone;
68 : bool mFirstTime;
69 :
70 : // this variable serves as a lock to prevent the state of the transport
71 : // from being modified once the copy is in progress.
72 : bool mInProgress;
73 : };
74 :
75 726 : NS_IMPL_ISUPPORTS(nsInputStreamTransport,
76 : nsITransport,
77 : nsIInputStream)
78 :
79 : /** nsITransport **/
80 :
81 : NS_IMETHODIMP
82 66 : nsInputStreamTransport::OpenInputStream(uint32_t flags,
83 : uint32_t segsize,
84 : uint32_t segcount,
85 : nsIInputStream **result)
86 : {
87 66 : NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
88 :
89 : nsresult rv;
90 : nsCOMPtr<nsIEventTarget> target =
91 132 : do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
92 66 : if (NS_FAILED(rv)) return rv;
93 :
94 : // XXX if the caller requests an unbuffered stream, then perhaps
95 : // we'd want to simply return mSource; however, then we would
96 : // not be reading mSource on a background thread. is this ok?
97 :
98 66 : bool nonblocking = !(flags & OPEN_BLOCKING);
99 :
100 66 : net_ResolveSegmentParams(segsize, segcount);
101 :
102 132 : nsCOMPtr<nsIAsyncOutputStream> pipeOut;
103 132 : rv = NS_NewPipe2(getter_AddRefs(mPipeIn),
104 132 : getter_AddRefs(pipeOut),
105 : nonblocking, true,
106 : segsize, segcount);
107 66 : if (NS_FAILED(rv)) return rv;
108 :
109 66 : mInProgress = true;
110 :
111 : // startup async copy process...
112 132 : rv = NS_AsyncCopy(this, pipeOut, target,
113 66 : NS_ASYNCCOPY_VIA_WRITESEGMENTS, segsize);
114 66 : if (NS_SUCCEEDED(rv))
115 66 : NS_ADDREF(*result = mPipeIn);
116 :
117 66 : return rv;
118 : }
119 :
120 : NS_IMETHODIMP
121 0 : nsInputStreamTransport::OpenOutputStream(uint32_t flags,
122 : uint32_t segsize,
123 : uint32_t segcount,
124 : nsIOutputStream **result)
125 : {
126 : // this transport only supports reading!
127 0 : NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
128 0 : return NS_ERROR_UNEXPECTED;
129 : }
130 :
131 : NS_IMETHODIMP
132 0 : nsInputStreamTransport::Close(nsresult reason)
133 : {
134 0 : if (NS_SUCCEEDED(reason))
135 0 : reason = NS_BASE_STREAM_CLOSED;
136 :
137 0 : return mPipeIn->CloseWithStatus(reason);
138 : }
139 :
140 : NS_IMETHODIMP
141 0 : nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink,
142 : nsIEventTarget *target)
143 : {
144 0 : NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
145 :
146 0 : if (target)
147 0 : return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
148 0 : sink, target);
149 :
150 0 : mEventSink = sink;
151 0 : return NS_OK;
152 : }
153 :
154 : /** nsIInputStream **/
155 :
156 : NS_IMETHODIMP
157 66 : nsInputStreamTransport::Close()
158 : {
159 66 : if (mCloseWhenDone)
160 66 : mSource->Close();
161 :
162 : // make additional reads return early...
163 66 : mOffset = mLimit = 0;
164 66 : return NS_OK;
165 : }
166 :
167 : NS_IMETHODIMP
168 0 : nsInputStreamTransport::Available(uint64_t *result)
169 : {
170 0 : return NS_ERROR_NOT_IMPLEMENTED;
171 : }
172 :
173 : NS_IMETHODIMP
174 144 : nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result)
175 : {
176 144 : if (mFirstTime) {
177 66 : mFirstTime = false;
178 66 : if (mOffset != 0) {
179 : // read from current position if offset equal to max
180 66 : if (mOffset != -1) {
181 0 : nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSource);
182 0 : if (seekable)
183 0 : seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
184 : }
185 : // reset offset to zero so we can use it to enforce limit
186 66 : mOffset = 0;
187 : }
188 : }
189 :
190 : // limit amount read
191 144 : uint64_t max = count;
192 144 : if (mLimit != -1) {
193 0 : max = mLimit - mOffset;
194 0 : if (max == 0) {
195 0 : *result = 0;
196 0 : return NS_OK;
197 : }
198 : }
199 :
200 144 : if (count > max)
201 0 : count = static_cast<uint32_t>(max);
202 :
203 144 : nsresult rv = mSource->Read(buf, count, result);
204 :
205 144 : if (NS_SUCCEEDED(rv)) {
206 144 : mOffset += *result;
207 144 : if (mEventSink)
208 0 : mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset,
209 0 : mLimit);
210 : }
211 144 : return rv;
212 : }
213 :
214 : NS_IMETHODIMP
215 0 : nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure,
216 : uint32_t count, uint32_t *result)
217 : {
218 0 : return NS_ERROR_NOT_IMPLEMENTED;
219 : }
220 :
221 : NS_IMETHODIMP
222 0 : nsInputStreamTransport::IsNonBlocking(bool *result)
223 : {
224 0 : *result = false;
225 0 : return NS_OK;
226 : }
227 :
228 : //-----------------------------------------------------------------------------
229 : // nsOutputStreamTransport
230 : //
231 : // Implements nsIOutputStream as a wrapper around the real input stream. This
232 : // allows the transport to support seeking, range-limiting, progress reporting,
233 : // and close-when-done semantics while utilizing NS_AsyncCopy.
234 : //-----------------------------------------------------------------------------
235 :
236 : class nsOutputStreamTransport : public nsITransport
237 : , public nsIOutputStream
238 : {
239 : public:
240 : NS_DECL_THREADSAFE_ISUPPORTS
241 : NS_DECL_NSITRANSPORT
242 : NS_DECL_NSIOUTPUTSTREAM
243 :
244 0 : nsOutputStreamTransport(nsIOutputStream *sink,
245 : int64_t offset,
246 : int64_t limit,
247 : bool closeWhenDone)
248 0 : : mSink(sink)
249 : , mOffset(offset)
250 : , mLimit(limit)
251 : , mCloseWhenDone(closeWhenDone)
252 : , mFirstTime(true)
253 0 : , mInProgress(false)
254 : {
255 0 : }
256 :
257 : private:
258 0 : virtual ~nsOutputStreamTransport()
259 0 : {
260 0 : }
261 :
262 : nsCOMPtr<nsIAsyncOutputStream> mPipeOut;
263 :
264 : // while the copy is active, these members may only be accessed from the
265 : // nsIOutputStream implementation.
266 : nsCOMPtr<nsITransportEventSink> mEventSink;
267 : nsCOMPtr<nsIOutputStream> mSink;
268 : int64_t mOffset;
269 : int64_t mLimit;
270 : bool mCloseWhenDone;
271 : bool mFirstTime;
272 :
273 : // this variable serves as a lock to prevent the state of the transport
274 : // from being modified once the copy is in progress.
275 : bool mInProgress;
276 : };
277 :
278 0 : NS_IMPL_ISUPPORTS(nsOutputStreamTransport,
279 : nsITransport,
280 : nsIOutputStream)
281 :
282 : /** nsITransport **/
283 :
284 : NS_IMETHODIMP
285 0 : nsOutputStreamTransport::OpenInputStream(uint32_t flags,
286 : uint32_t segsize,
287 : uint32_t segcount,
288 : nsIInputStream **result)
289 : {
290 : // this transport only supports writing!
291 0 : NS_NOTREACHED("nsOutputStreamTransport::OpenInputStream");
292 0 : return NS_ERROR_UNEXPECTED;
293 : }
294 :
295 : NS_IMETHODIMP
296 0 : nsOutputStreamTransport::OpenOutputStream(uint32_t flags,
297 : uint32_t segsize,
298 : uint32_t segcount,
299 : nsIOutputStream **result)
300 : {
301 0 : NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
302 :
303 : nsresult rv;
304 : nsCOMPtr<nsIEventTarget> target =
305 0 : do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
306 0 : if (NS_FAILED(rv)) return rv;
307 :
308 : // XXX if the caller requests an unbuffered stream, then perhaps
309 : // we'd want to simply return mSink; however, then we would
310 : // not be writing to mSink on a background thread. is this ok?
311 :
312 0 : bool nonblocking = !(flags & OPEN_BLOCKING);
313 :
314 0 : net_ResolveSegmentParams(segsize, segcount);
315 :
316 0 : nsCOMPtr<nsIAsyncInputStream> pipeIn;
317 0 : rv = NS_NewPipe2(getter_AddRefs(pipeIn),
318 0 : getter_AddRefs(mPipeOut),
319 : true, nonblocking,
320 : segsize, segcount);
321 0 : if (NS_FAILED(rv)) return rv;
322 :
323 0 : mInProgress = true;
324 :
325 : // startup async copy process...
326 0 : rv = NS_AsyncCopy(pipeIn, this, target,
327 0 : NS_ASYNCCOPY_VIA_READSEGMENTS, segsize);
328 0 : if (NS_SUCCEEDED(rv))
329 0 : NS_ADDREF(*result = mPipeOut);
330 :
331 0 : return rv;
332 : }
333 :
334 : NS_IMETHODIMP
335 0 : nsOutputStreamTransport::Close(nsresult reason)
336 : {
337 0 : if (NS_SUCCEEDED(reason))
338 0 : reason = NS_BASE_STREAM_CLOSED;
339 :
340 0 : return mPipeOut->CloseWithStatus(reason);
341 : }
342 :
343 : NS_IMETHODIMP
344 0 : nsOutputStreamTransport::SetEventSink(nsITransportEventSink *sink,
345 : nsIEventTarget *target)
346 : {
347 0 : NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
348 :
349 0 : if (target)
350 0 : return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink),
351 0 : sink, target);
352 :
353 0 : mEventSink = sink;
354 0 : return NS_OK;
355 : }
356 :
357 : /** nsIOutputStream **/
358 :
359 : NS_IMETHODIMP
360 0 : nsOutputStreamTransport::Close()
361 : {
362 0 : if (mCloseWhenDone)
363 0 : mSink->Close();
364 :
365 : // make additional writes return early...
366 0 : mOffset = mLimit = 0;
367 0 : return NS_OK;
368 : }
369 :
370 : NS_IMETHODIMP
371 0 : nsOutputStreamTransport::Flush()
372 : {
373 0 : return NS_OK;
374 : }
375 :
376 : NS_IMETHODIMP
377 0 : nsOutputStreamTransport::Write(const char *buf, uint32_t count, uint32_t *result)
378 : {
379 0 : if (mFirstTime) {
380 0 : mFirstTime = false;
381 0 : if (mOffset != 0) {
382 : // write to current position if offset equal to max
383 0 : if (mOffset != -1) {
384 0 : nsCOMPtr<nsISeekableStream> seekable = do_QueryInterface(mSink);
385 0 : if (seekable)
386 0 : seekable->Seek(nsISeekableStream::NS_SEEK_SET, mOffset);
387 : }
388 : // reset offset to zero so we can use it to enforce limit
389 0 : mOffset = 0;
390 : }
391 : }
392 :
393 : // limit amount written
394 0 : uint64_t max = count;
395 0 : if (mLimit != -1) {
396 0 : max = mLimit - mOffset;
397 0 : if (max == 0) {
398 0 : *result = 0;
399 0 : return NS_OK;
400 : }
401 : }
402 :
403 0 : if (count > max)
404 0 : count = static_cast<uint32_t>(max);
405 :
406 0 : nsresult rv = mSink->Write(buf, count, result);
407 :
408 0 : if (NS_SUCCEEDED(rv)) {
409 0 : mOffset += *result;
410 0 : if (mEventSink)
411 0 : mEventSink->OnTransportStatus(this, NS_NET_STATUS_WRITING, mOffset,
412 0 : mLimit);
413 : }
414 0 : return rv;
415 : }
416 :
417 : NS_IMETHODIMP
418 0 : nsOutputStreamTransport::WriteSegments(nsReadSegmentFun reader, void *closure,
419 : uint32_t count, uint32_t *result)
420 : {
421 0 : return NS_ERROR_NOT_IMPLEMENTED;
422 : }
423 :
424 : NS_IMETHODIMP
425 0 : nsOutputStreamTransport::WriteFrom(nsIInputStream *in, uint32_t count, uint32_t *result)
426 : {
427 0 : return NS_ERROR_NOT_IMPLEMENTED;
428 : }
429 :
430 : NS_IMETHODIMP
431 0 : nsOutputStreamTransport::IsNonBlocking(bool *result)
432 : {
433 0 : *result = false;
434 0 : return NS_OK;
435 : }
436 :
437 : //-----------------------------------------------------------------------------
438 : // nsStreamTransportService
439 : //-----------------------------------------------------------------------------
440 :
441 0 : nsStreamTransportService::~nsStreamTransportService()
442 : {
443 0 : NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
444 0 : }
445 :
446 : nsresult
447 3 : nsStreamTransportService::Init()
448 : {
449 3 : mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID);
450 3 : NS_ENSURE_STATE(mPool);
451 :
452 : // Configure the pool
453 3 : mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
454 3 : mPool->SetThreadLimit(25);
455 3 : mPool->SetIdleThreadLimit(1);
456 3 : mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
457 :
458 : nsCOMPtr<nsIObserverService> obsSvc =
459 6 : mozilla::services::GetObserverService();
460 3 : if (obsSvc)
461 3 : obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
462 3 : return NS_OK;
463 : }
464 :
465 847 : NS_IMPL_ISUPPORTS(nsStreamTransportService,
466 : nsIStreamTransportService,
467 : nsIEventTarget,
468 : nsIObserver)
469 :
470 : NS_IMETHODIMP
471 0 : nsStreamTransportService::DispatchFromScript(nsIRunnable *task, uint32_t flags)
472 : {
473 0 : nsCOMPtr<nsIRunnable> event(task);
474 0 : return Dispatch(event.forget(), flags);
475 : }
476 :
477 : NS_IMETHODIMP
478 73 : nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task, uint32_t flags)
479 : {
480 146 : nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths
481 146 : nsCOMPtr<nsIThreadPool> pool;
482 : {
483 146 : mozilla::MutexAutoLock lock(mShutdownLock);
484 73 : if (mIsShutdown) {
485 0 : return NS_ERROR_NOT_INITIALIZED;
486 : }
487 73 : pool = mPool;
488 : }
489 73 : NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
490 73 : return pool->Dispatch(event.forget(), flags);
491 : }
492 :
493 : NS_IMETHODIMP
494 0 : nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>, uint32_t)
495 : {
496 0 : return NS_ERROR_NOT_IMPLEMENTED;
497 : }
498 :
499 : NS_IMETHODIMP_(bool)
500 0 : nsStreamTransportService::IsOnCurrentThreadInfallible()
501 : {
502 0 : nsCOMPtr<nsIThreadPool> pool;
503 : {
504 0 : mozilla::MutexAutoLock lock(mShutdownLock);
505 0 : pool = mPool;
506 : }
507 0 : if (!pool) {
508 0 : return false;
509 : }
510 0 : return pool->IsOnCurrentThread();
511 : }
512 :
513 : NS_IMETHODIMP
514 0 : nsStreamTransportService::IsOnCurrentThread(bool *result)
515 : {
516 0 : nsCOMPtr<nsIThreadPool> pool;
517 : {
518 0 : mozilla::MutexAutoLock lock(mShutdownLock);
519 0 : if (mIsShutdown) {
520 0 : return NS_ERROR_NOT_INITIALIZED;
521 : }
522 0 : pool = mPool;
523 : }
524 0 : NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
525 0 : return pool->IsOnCurrentThread(result);
526 : }
527 :
528 : NS_IMETHODIMP
529 66 : nsStreamTransportService::CreateInputTransport(nsIInputStream *stream,
530 : int64_t offset,
531 : int64_t limit,
532 : bool closeWhenDone,
533 : nsITransport **result)
534 : {
535 : nsInputStreamTransport *trans =
536 66 : new nsInputStreamTransport(stream, offset, limit, closeWhenDone);
537 66 : if (!trans)
538 0 : return NS_ERROR_OUT_OF_MEMORY;
539 66 : NS_ADDREF(*result = trans);
540 66 : return NS_OK;
541 : }
542 :
543 : NS_IMETHODIMP
544 0 : nsStreamTransportService::CreateOutputTransport(nsIOutputStream *stream,
545 : int64_t offset,
546 : int64_t limit,
547 : bool closeWhenDone,
548 : nsITransport **result)
549 : {
550 : nsOutputStreamTransport *trans =
551 0 : new nsOutputStreamTransport(stream, offset, limit, closeWhenDone);
552 0 : if (!trans)
553 0 : return NS_ERROR_OUT_OF_MEMORY;
554 0 : NS_ADDREF(*result = trans);
555 0 : return NS_OK;
556 : }
557 :
558 : NS_IMETHODIMP
559 0 : nsStreamTransportService::Observe(nsISupports *subject, const char *topic,
560 : const char16_t *data)
561 : {
562 0 : NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
563 :
564 : {
565 0 : mozilla::MutexAutoLock lock(mShutdownLock);
566 0 : mIsShutdown = true;
567 : }
568 :
569 0 : if (mPool) {
570 0 : mPool->Shutdown();
571 0 : mPool = nullptr;
572 : }
573 0 : return NS_OK;
574 : }
575 :
576 : class AvailableEvent final : public Runnable
577 : {
578 : public:
579 0 : AvailableEvent(nsIInputStream* stream,
580 : nsIInputAvailableCallback* callback)
581 0 : : Runnable("net::AvailableEvent")
582 : , mStream(stream)
583 : , mCallback(callback)
584 : , mDoingCallback(false)
585 : , mSize(0)
586 0 : , mResultForCallback(NS_OK)
587 : {
588 0 : mCallbackTarget = GetCurrentThreadEventTarget();
589 0 : }
590 :
591 0 : NS_IMETHOD Run() override
592 : {
593 0 : if (mDoingCallback) {
594 : // pong
595 0 : mCallback->OnInputAvailableComplete(mSize, mResultForCallback);
596 0 : mCallback = nullptr;
597 : } else {
598 : // ping
599 0 : mResultForCallback = mStream->Available(&mSize);
600 0 : mStream = nullptr;
601 0 : mDoingCallback = true;
602 :
603 0 : nsCOMPtr<nsIRunnable> event(this); // overly cute
604 0 : mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
605 0 : mCallbackTarget = nullptr;
606 : }
607 0 : return NS_OK;
608 : }
609 :
610 : private:
611 0 : virtual ~AvailableEvent() { }
612 :
613 : nsCOMPtr<nsIInputStream> mStream;
614 : nsCOMPtr<nsIInputAvailableCallback> mCallback;
615 : nsCOMPtr<nsIEventTarget> mCallbackTarget;
616 : bool mDoingCallback;
617 : uint64_t mSize;
618 : nsresult mResultForCallback;
619 : };
620 :
621 : NS_IMETHODIMP
622 0 : nsStreamTransportService::InputAvailable(nsIInputStream *stream,
623 : nsIInputAvailableCallback *callback)
624 : {
625 0 : nsCOMPtr<nsIThreadPool> pool;
626 : {
627 0 : mozilla::MutexAutoLock lock(mShutdownLock);
628 0 : if (mIsShutdown) {
629 0 : return NS_ERROR_NOT_INITIALIZED;
630 : }
631 0 : pool = mPool;
632 : }
633 0 : nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback);
634 0 : return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
635 : }
636 :
637 : } // namespace net
638 : } // namespace mozilla
|