Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 : /* This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 :
6 : #include "SlicedInputStream.h"
7 : #include "mozilla/ipc/InputStreamUtils.h"
8 : #include "nsISeekableStream.h"
9 : #include "nsStreamUtils.h"
10 :
11 : using namespace mozilla::ipc;
12 :
13 0 : NS_IMPL_ADDREF(SlicedInputStream);
14 0 : NS_IMPL_RELEASE(SlicedInputStream);
15 :
16 0 : NS_INTERFACE_MAP_BEGIN(SlicedInputStream)
17 0 : NS_INTERFACE_MAP_ENTRY(nsIInputStream)
18 0 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsICloneableInputStream,
19 : mWeakCloneableInputStream || !mInputStream)
20 0 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIIPCSerializableInputStream,
21 : mWeakIPCSerializableInputStream || !mInputStream)
22 0 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsISeekableStream,
23 : mWeakSeekableInputStream || !mInputStream)
24 0 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIAsyncInputStream,
25 : mWeakAsyncInputStream || !mInputStream)
26 0 : NS_INTERFACE_MAP_ENTRY_CONDITIONAL(nsIInputStreamCallback,
27 : mWeakAsyncInputStream || !mInputStream)
28 0 : NS_INTERFACE_MAP_ENTRY_AMBIGUOUS(nsISupports, nsIInputStream)
29 0 : NS_INTERFACE_MAP_END
30 :
31 0 : SlicedInputStream::SlicedInputStream(nsIInputStream* aInputStream,
32 0 : uint64_t aStart, uint64_t aLength)
33 : : mWeakCloneableInputStream(nullptr)
34 : , mWeakIPCSerializableInputStream(nullptr)
35 : , mWeakSeekableInputStream(nullptr)
36 : , mWeakAsyncInputStream(nullptr)
37 : , mStart(aStart)
38 : , mLength(aLength)
39 : , mCurPos(0)
40 : , mClosed(false)
41 : , mAsyncWaitFlags(0)
42 0 : , mAsyncWaitRequestedCount(0)
43 : {
44 0 : MOZ_ASSERT(aInputStream);
45 0 : SetSourceStream(aInputStream);
46 0 : }
47 :
48 0 : SlicedInputStream::SlicedInputStream()
49 : : mWeakCloneableInputStream(nullptr)
50 : , mWeakIPCSerializableInputStream(nullptr)
51 : , mWeakSeekableInputStream(nullptr)
52 : , mWeakAsyncInputStream(nullptr)
53 : , mStart(0)
54 : , mLength(0)
55 : , mCurPos(0)
56 : , mClosed(false)
57 : , mAsyncWaitFlags(0)
58 0 : , mAsyncWaitRequestedCount(0)
59 0 : {}
60 :
61 0 : SlicedInputStream::~SlicedInputStream()
62 0 : {}
63 :
64 : void
65 0 : SlicedInputStream::SetSourceStream(nsIInputStream* aInputStream)
66 : {
67 0 : MOZ_ASSERT(!mInputStream);
68 0 : MOZ_ASSERT(aInputStream);
69 :
70 0 : mInputStream = aInputStream;
71 :
72 : nsCOMPtr<nsICloneableInputStream> cloneableStream =
73 0 : do_QueryInterface(aInputStream);
74 0 : if (cloneableStream && SameCOMIdentity(aInputStream, cloneableStream)) {
75 0 : mWeakCloneableInputStream = cloneableStream;
76 : }
77 :
78 : nsCOMPtr<nsIIPCSerializableInputStream> serializableStream =
79 0 : do_QueryInterface(aInputStream);
80 0 : if (serializableStream &&
81 0 : SameCOMIdentity(aInputStream, serializableStream)) {
82 0 : mWeakIPCSerializableInputStream = serializableStream;
83 : }
84 :
85 : nsCOMPtr<nsISeekableStream> seekableStream =
86 0 : do_QueryInterface(aInputStream);
87 0 : if (seekableStream && SameCOMIdentity(aInputStream, seekableStream)) {
88 0 : mWeakSeekableInputStream = seekableStream;
89 : }
90 :
91 : nsCOMPtr<nsIAsyncInputStream> asyncInputStream =
92 0 : do_QueryInterface(aInputStream);
93 0 : if (asyncInputStream && SameCOMIdentity(aInputStream, asyncInputStream)) {
94 0 : mWeakAsyncInputStream = asyncInputStream;
95 : }
96 0 : }
97 :
98 : NS_IMETHODIMP
99 0 : SlicedInputStream::Close()
100 : {
101 0 : NS_ENSURE_STATE(mInputStream);
102 :
103 0 : mClosed = true;
104 0 : return NS_OK;
105 : }
106 :
107 : // nsIInputStream interface
108 :
109 : NS_IMETHODIMP
110 0 : SlicedInputStream::Available(uint64_t* aLength)
111 : {
112 0 : NS_ENSURE_STATE(mInputStream);
113 :
114 0 : if (mClosed) {
115 0 : return NS_BASE_STREAM_CLOSED;
116 : }
117 :
118 0 : nsresult rv = mInputStream->Available(aLength);
119 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
120 0 : return rv;
121 : }
122 :
123 : // Let's remove extra length from the end.
124 0 : if (*aLength + mCurPos > mStart + mLength) {
125 0 : *aLength -= XPCOM_MIN(*aLength, (*aLength + mCurPos) - (mStart + mLength));
126 : }
127 :
128 : // Let's remove extra length from the begin.
129 0 : if (mCurPos < mStart) {
130 0 : *aLength -= XPCOM_MIN(*aLength, mStart - mCurPos);
131 : }
132 :
133 0 : return NS_OK;
134 : }
135 :
136 : NS_IMETHODIMP
137 0 : SlicedInputStream::Read(char* aBuffer, uint32_t aCount, uint32_t* aReadCount)
138 : {
139 0 : *aReadCount = 0;
140 :
141 0 : if (mClosed) {
142 0 : return NS_OK;
143 : }
144 :
145 0 : if (mCurPos < mStart) {
146 : nsCOMPtr<nsISeekableStream> seekableStream =
147 0 : do_QueryInterface(mInputStream);
148 0 : if (seekableStream) {
149 0 : nsresult rv = seekableStream->Seek(nsISeekableStream::NS_SEEK_SET,
150 0 : mStart);
151 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
152 0 : return rv;
153 : }
154 :
155 0 : mCurPos = mStart;
156 : } else {
157 : char buf[4096];
158 0 : while (mCurPos < mStart) {
159 : uint32_t bytesRead;
160 0 : uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
161 0 : nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead);
162 0 : if (NS_WARN_IF(NS_FAILED(rv)) || bytesRead == 0) {
163 0 : return rv;
164 : }
165 :
166 0 : mCurPos += bytesRead;
167 : }
168 : }
169 : }
170 :
171 : // Let's reduce aCount in case it's too big.
172 0 : if (mCurPos + aCount > mStart + mLength) {
173 0 : aCount = mStart + mLength - mCurPos;
174 : }
175 :
176 0 : nsresult rv = mInputStream->Read(aBuffer, aCount, aReadCount);
177 0 : if (NS_WARN_IF(NS_FAILED(rv)) || *aReadCount == 0) {
178 0 : return rv;
179 : }
180 :
181 0 : mCurPos += *aReadCount;
182 0 : return NS_OK;
183 : }
184 :
185 : NS_IMETHODIMP
186 0 : SlicedInputStream::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure,
187 : uint32_t aCount, uint32_t *aResult)
188 : {
189 0 : return NS_ERROR_NOT_IMPLEMENTED;
190 : }
191 :
192 : NS_IMETHODIMP
193 0 : SlicedInputStream::IsNonBlocking(bool* aNonBlocking)
194 : {
195 0 : NS_ENSURE_STATE(mInputStream);
196 0 : return mInputStream->IsNonBlocking(aNonBlocking);
197 : }
198 :
199 : // nsICloneableInputStream interface
200 :
201 : NS_IMETHODIMP
202 0 : SlicedInputStream::GetCloneable(bool* aCloneable)
203 : {
204 0 : NS_ENSURE_STATE(mInputStream);
205 0 : NS_ENSURE_STATE(mWeakCloneableInputStream);
206 :
207 0 : *aCloneable = true;
208 0 : return NS_OK;
209 : }
210 :
211 : NS_IMETHODIMP
212 0 : SlicedInputStream::Clone(nsIInputStream** aResult)
213 : {
214 0 : NS_ENSURE_STATE(mInputStream);
215 0 : NS_ENSURE_STATE(mWeakCloneableInputStream);
216 :
217 0 : nsCOMPtr<nsIInputStream> clonedStream;
218 0 : nsresult rv = mWeakCloneableInputStream->Clone(getter_AddRefs(clonedStream));
219 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
220 0 : return rv;
221 : }
222 :
223 : nsCOMPtr<nsIInputStream> sis =
224 0 : new SlicedInputStream(clonedStream, mStart, mLength);
225 :
226 0 : sis.forget(aResult);
227 0 : return NS_OK;
228 : }
229 :
230 : // nsIAsyncInputStream interface
231 :
232 : NS_IMETHODIMP
233 0 : SlicedInputStream::CloseWithStatus(nsresult aStatus)
234 : {
235 0 : NS_ENSURE_STATE(mInputStream);
236 0 : NS_ENSURE_STATE(mWeakAsyncInputStream);
237 :
238 0 : return mWeakAsyncInputStream->CloseWithStatus(aStatus);
239 : }
240 :
241 : NS_IMETHODIMP
242 0 : SlicedInputStream::AsyncWait(nsIInputStreamCallback* aCallback,
243 : uint32_t aFlags,
244 : uint32_t aRequestedCount,
245 : nsIEventTarget* aEventTarget)
246 : {
247 0 : NS_ENSURE_STATE(mInputStream);
248 0 : NS_ENSURE_STATE(mWeakAsyncInputStream);
249 :
250 0 : if (mAsyncWaitCallback && aCallback) {
251 0 : return NS_ERROR_FAILURE;
252 : }
253 :
254 0 : mAsyncWaitCallback = aCallback;
255 :
256 0 : if (!mAsyncWaitCallback) {
257 0 : return NS_OK;
258 : }
259 :
260 : // If we haven't started retrieving data, let's see if we can seek.
261 : // If we cannot seek, we will do consecutive reads.
262 0 : if (mCurPos < mStart && mWeakSeekableInputStream) {
263 : nsresult rv =
264 0 : mWeakSeekableInputStream->Seek(nsISeekableStream::NS_SEEK_SET, mStart);
265 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
266 0 : return rv;
267 : }
268 :
269 0 : mCurPos = mStart;
270 : }
271 :
272 0 : mAsyncWaitFlags = aFlags;
273 0 : mAsyncWaitRequestedCount = aRequestedCount;
274 0 : mAsyncWaitEventTarget = aEventTarget;
275 :
276 : // If we are not at the right position, let's do an asyncWait just internal.
277 0 : if (mCurPos < mStart) {
278 0 : return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos,
279 0 : aEventTarget);
280 : }
281 :
282 0 : return mWeakAsyncInputStream->AsyncWait(this, aFlags, aRequestedCount,
283 0 : aEventTarget);
284 : }
285 :
286 : // nsIInputStreamCallback
287 :
288 : NS_IMETHODIMP
289 0 : SlicedInputStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
290 : {
291 0 : MOZ_ASSERT(mInputStream);
292 0 : MOZ_ASSERT(mWeakAsyncInputStream);
293 0 : MOZ_ASSERT(mWeakAsyncInputStream == aStream);
294 :
295 : // We have been canceled in the meanwhile.
296 0 : if (!mAsyncWaitCallback) {
297 0 : return NS_OK;
298 : }
299 :
300 0 : if (mCurPos < mStart) {
301 : char buf[4096];
302 0 : while (mCurPos < mStart) {
303 : uint32_t bytesRead;
304 0 : uint64_t bufCount = XPCOM_MIN(mStart - mCurPos, (uint64_t)sizeof(buf));
305 0 : nsresult rv = mInputStream->Read(buf, bufCount, &bytesRead);
306 0 : if (rv == NS_BASE_STREAM_WOULD_BLOCK) {
307 0 : return mWeakAsyncInputStream->AsyncWait(this, 0, mStart - mCurPos,
308 0 : mAsyncWaitEventTarget);
309 : }
310 :
311 0 : if (NS_WARN_IF(NS_FAILED(rv)) || bytesRead == 0) {
312 0 : return RunAsyncWaitCallback();
313 : }
314 :
315 0 : mCurPos += bytesRead;
316 : }
317 :
318 : // Now we are ready to do the 'real' asyncWait.
319 0 : return mWeakAsyncInputStream->AsyncWait(this, mAsyncWaitFlags,
320 : mAsyncWaitRequestedCount,
321 0 : mAsyncWaitEventTarget);
322 : }
323 :
324 0 : return RunAsyncWaitCallback();
325 : }
326 :
327 : nsresult
328 0 : SlicedInputStream::RunAsyncWaitCallback()
329 : {
330 0 : nsCOMPtr<nsIInputStreamCallback> callback = mAsyncWaitCallback;
331 :
332 0 : mAsyncWaitCallback = nullptr;
333 0 : mAsyncWaitEventTarget = nullptr;
334 :
335 0 : return callback->OnInputStreamReady(this);
336 : }
337 :
338 : // nsIIPCSerializableInputStream
339 :
340 : void
341 0 : SlicedInputStream::Serialize(mozilla::ipc::InputStreamParams& aParams,
342 : FileDescriptorArray& aFileDescriptors)
343 : {
344 0 : MOZ_ASSERT(mInputStream);
345 0 : MOZ_ASSERT(mWeakIPCSerializableInputStream);
346 :
347 0 : SlicedInputStreamParams params;
348 0 : InputStreamHelper::SerializeInputStream(mInputStream, params.stream(),
349 0 : aFileDescriptors);
350 0 : params.start() = mStart;
351 0 : params.length() = mLength;
352 0 : params.curPos() = mCurPos;
353 0 : params.closed() = mClosed;
354 :
355 0 : aParams = params;
356 0 : }
357 :
358 : bool
359 0 : SlicedInputStream::Deserialize(const mozilla::ipc::InputStreamParams& aParams,
360 : const FileDescriptorArray& aFileDescriptors)
361 : {
362 0 : MOZ_ASSERT(!mInputStream);
363 0 : MOZ_ASSERT(!mWeakIPCSerializableInputStream);
364 :
365 0 : if (aParams.type() !=
366 : InputStreamParams::TSlicedInputStreamParams) {
367 0 : NS_ERROR("Received unknown parameters from the other process!");
368 0 : return false;
369 : }
370 :
371 : const SlicedInputStreamParams& params =
372 0 : aParams.get_SlicedInputStreamParams();
373 :
374 : nsCOMPtr<nsIInputStream> stream =
375 0 : InputStreamHelper::DeserializeInputStream(params.stream(),
376 0 : aFileDescriptors);
377 0 : if (!stream) {
378 0 : NS_WARNING("Deserialize failed!");
379 0 : return false;
380 : }
381 :
382 0 : SetSourceStream(stream);
383 :
384 0 : mStart = params.start();
385 0 : mLength = params.length();
386 0 : mCurPos = params.curPos();
387 0 : mClosed = params.closed();
388 :
389 0 : return true;
390 : }
391 :
392 : mozilla::Maybe<uint64_t>
393 0 : SlicedInputStream::ExpectedSerializedLength()
394 : {
395 0 : if (!mInputStream || !mWeakIPCSerializableInputStream) {
396 0 : return mozilla::Nothing();
397 : }
398 :
399 0 : return mWeakIPCSerializableInputStream->ExpectedSerializedLength();
400 : }
401 :
402 : // nsISeekableStream
403 :
404 : NS_IMETHODIMP
405 0 : SlicedInputStream::Seek(int32_t aWhence, int64_t aOffset)
406 : {
407 0 : NS_ENSURE_STATE(mInputStream);
408 0 : NS_ENSURE_STATE(mWeakSeekableInputStream);
409 :
410 : int64_t offset;
411 : nsresult rv;
412 :
413 0 : switch (aWhence) {
414 : case NS_SEEK_SET:
415 0 : offset = mStart + aOffset;
416 0 : break;
417 : case NS_SEEK_CUR:
418 : // mCurPos could be lower than mStart if the reading has not started yet.
419 0 : offset = XPCOM_MAX(mStart, mCurPos) + aOffset;
420 0 : break;
421 : case NS_SEEK_END: {
422 : uint64_t available;
423 0 : rv = mInputStream->Available(&available);
424 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
425 0 : return rv;
426 : }
427 :
428 0 : offset = XPCOM_MIN(mStart + mLength, available) + aOffset;
429 0 : break;
430 : }
431 : default:
432 0 : return NS_ERROR_ILLEGAL_VALUE;
433 : }
434 :
435 0 : if (offset < (int64_t)mStart || offset > (int64_t)(mStart + mLength)) {
436 0 : return NS_ERROR_INVALID_ARG;
437 : }
438 :
439 0 : rv = mWeakSeekableInputStream->Seek(NS_SEEK_SET, offset);
440 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
441 0 : return rv;
442 : }
443 :
444 0 : mCurPos = offset;
445 0 : return NS_OK;
446 : }
447 :
448 : NS_IMETHODIMP
449 0 : SlicedInputStream::Tell(int64_t *aResult)
450 : {
451 0 : NS_ENSURE_STATE(mInputStream);
452 0 : NS_ENSURE_STATE(mWeakSeekableInputStream);
453 :
454 0 : int64_t tell = 0;
455 :
456 0 : nsresult rv = mWeakSeekableInputStream->Tell(&tell);
457 0 : if (NS_WARN_IF(NS_FAILED(rv))) {
458 0 : return rv;
459 : }
460 :
461 0 : if (tell < (int64_t)mStart) {
462 0 : *aResult = 0;
463 0 : return NS_OK;
464 : }
465 :
466 0 : *aResult = tell - mStart;
467 0 : if (*aResult > (int64_t)mLength) {
468 0 : *aResult = mLength;
469 : }
470 :
471 0 : return NS_OK;
472 : }
473 :
474 : NS_IMETHODIMP
475 0 : SlicedInputStream::SetEOF()
476 : {
477 0 : NS_ENSURE_STATE(mInputStream);
478 0 : NS_ENSURE_STATE(mWeakSeekableInputStream);
479 :
480 0 : mClosed = true;
481 0 : return mWeakSeekableInputStream->SetEOF();
482 : }
|