Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : /* This Source Code Form is subject to the terms of the Mozilla Public
4 : * License, v. 2.0. If a copy of the MPL was not distributed with this
5 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 :
7 : #include <stdlib.h>
8 : #include "mozilla/Logging.h"
9 :
10 : #include "mozilla/Mutex.h"
11 : #include "mozilla/Attributes.h"
12 : #include "nsIInputStreamTee.h"
13 : #include "nsIInputStream.h"
14 : #include "nsIOutputStream.h"
15 : #include "nsCOMPtr.h"
16 : #include "nsAutoPtr.h"
17 : #include "nsIEventTarget.h"
18 : #include "nsThreadUtils.h"
19 :
20 : using namespace mozilla;
21 :
22 : #ifdef LOG
23 : #undef LOG
24 : #endif
25 :
26 : static LazyLogModule sTeeLog("nsInputStreamTee");
27 : #define LOG(args) MOZ_LOG(sTeeLog, mozilla::LogLevel::Debug, args)
28 :
29 : class nsInputStreamTee final : public nsIInputStreamTee
30 : {
31 : public:
32 : NS_DECL_THREADSAFE_ISUPPORTS
33 : NS_DECL_NSIINPUTSTREAM
34 : NS_DECL_NSIINPUTSTREAMTEE
35 :
36 : nsInputStreamTee();
37 : bool SinkIsValid();
38 : void InvalidateSink();
39 :
40 : private:
41 2 : ~nsInputStreamTee()
42 2 : {
43 2 : }
44 :
45 : nsresult TeeSegment(const char* aBuf, uint32_t aCount);
46 :
47 : static nsresult WriteSegmentFun(nsIInputStream*, void*, const char*,
48 : uint32_t, uint32_t, uint32_t*);
49 :
50 : private:
51 : nsCOMPtr<nsIInputStream> mSource;
52 : nsCOMPtr<nsIOutputStream> mSink;
53 : nsCOMPtr<nsIEventTarget> mEventTarget;
54 : nsWriteSegmentFun mWriter; // for implementing ReadSegments
55 : void* mClosure; // for implementing ReadSegments
56 : nsAutoPtr<Mutex> mLock; // synchronize access to mSinkIsValid
57 : bool mSinkIsValid; // False if TeeWriteEvent fails
58 : };
59 :
60 : class nsInputStreamTeeWriteEvent : public Runnable
61 : {
62 : public:
63 : // aTee's lock is held across construction of this object
64 0 : nsInputStreamTeeWriteEvent(const char* aBuf,
65 : uint32_t aCount,
66 : nsIOutputStream* aSink,
67 : nsInputStreamTee* aTee)
68 0 : : mozilla::Runnable("nsInputStreamTeeWriteEvent")
69 : {
70 : // copy the buffer - will be free'd by dtor
71 0 : mBuf = (char*)malloc(aCount);
72 0 : if (mBuf) {
73 0 : memcpy(mBuf, (char*)aBuf, aCount);
74 : }
75 0 : mCount = aCount;
76 0 : mSink = aSink;
77 : bool isNonBlocking;
78 0 : mSink->IsNonBlocking(&isNonBlocking);
79 0 : NS_ASSERTION(isNonBlocking == false, "mSink is nonblocking");
80 0 : mTee = aTee;
81 0 : }
82 :
83 0 : NS_IMETHOD Run() override
84 : {
85 0 : if (!mBuf) {
86 : NS_WARNING("nsInputStreamTeeWriteEvent::Run() "
87 0 : "memory not allocated\n");
88 0 : return NS_OK;
89 : }
90 0 : MOZ_ASSERT(mSink, "mSink is null!");
91 :
92 : // The output stream could have been invalidated between when
93 : // this event was dispatched and now, so check before writing.
94 0 : if (!mTee->SinkIsValid()) {
95 0 : return NS_OK;
96 : }
97 :
98 0 : LOG(("nsInputStreamTeeWriteEvent::Run() [%p]"
99 : "will write %u bytes to %p\n",
100 : this, mCount, mSink.get()));
101 :
102 0 : uint32_t totalBytesWritten = 0;
103 0 : while (mCount) {
104 : nsresult rv;
105 0 : uint32_t bytesWritten = 0;
106 0 : rv = mSink->Write(mBuf + totalBytesWritten, mCount, &bytesWritten);
107 0 : if (NS_FAILED(rv)) {
108 0 : LOG(("nsInputStreamTeeWriteEvent::Run[%p] error %" PRIx32 " in writing",
109 : this, static_cast<uint32_t>(rv)));
110 0 : mTee->InvalidateSink();
111 0 : break;
112 : }
113 0 : totalBytesWritten += bytesWritten;
114 0 : NS_ASSERTION(bytesWritten <= mCount, "wrote too much");
115 0 : mCount -= bytesWritten;
116 : }
117 0 : return NS_OK;
118 : }
119 :
120 : protected:
121 0 : virtual ~nsInputStreamTeeWriteEvent()
122 0 : {
123 0 : if (mBuf) {
124 0 : free(mBuf);
125 : }
126 0 : mBuf = nullptr;
127 0 : }
128 :
129 : private:
130 : char* mBuf;
131 : uint32_t mCount;
132 : nsCOMPtr<nsIOutputStream> mSink;
133 : // back pointer to the tee that created this runnable
134 : RefPtr<nsInputStreamTee> mTee;
135 : };
136 :
137 2 : nsInputStreamTee::nsInputStreamTee(): mLock(nullptr)
138 2 : , mSinkIsValid(true)
139 : {
140 2 : }
141 :
142 : bool
143 0 : nsInputStreamTee::SinkIsValid()
144 : {
145 0 : MutexAutoLock lock(*mLock);
146 0 : return mSinkIsValid;
147 : }
148 :
149 : void
150 0 : nsInputStreamTee::InvalidateSink()
151 : {
152 0 : MutexAutoLock lock(*mLock);
153 0 : mSinkIsValid = false;
154 0 : }
155 :
156 : nsresult
157 2 : nsInputStreamTee::TeeSegment(const char* aBuf, uint32_t aCount)
158 : {
159 2 : if (!mSink) {
160 0 : return NS_OK; // nothing to do
161 : }
162 2 : if (mLock) { // asynchronous case
163 0 : NS_ASSERTION(mEventTarget, "mEventTarget is null, mLock is not null.");
164 0 : if (!SinkIsValid()) {
165 0 : return NS_OK; // nothing to do
166 : }
167 : nsCOMPtr<nsIRunnable> event =
168 0 : new nsInputStreamTeeWriteEvent(aBuf, aCount, mSink, this);
169 0 : LOG(("nsInputStreamTee::TeeSegment [%p] dispatching write %u bytes\n",
170 : this, aCount));
171 0 : return mEventTarget->Dispatch(event, NS_DISPATCH_NORMAL);
172 : } else { // synchronous case
173 2 : NS_ASSERTION(!mEventTarget, "mEventTarget is not null, mLock is null.");
174 : nsresult rv;
175 2 : uint32_t totalBytesWritten = 0;
176 6 : while (aCount) {
177 2 : uint32_t bytesWritten = 0;
178 2 : rv = mSink->Write(aBuf + totalBytesWritten, aCount, &bytesWritten);
179 2 : if (NS_FAILED(rv)) {
180 : // ok, this is not a fatal error... just drop our reference to mSink
181 : // and continue on as if nothing happened.
182 0 : NS_WARNING("Write failed (non-fatal)");
183 : // catch possible misuse of the input stream tee
184 0 : NS_ASSERTION(rv != NS_BASE_STREAM_WOULD_BLOCK, "sink must be a blocking stream");
185 0 : mSink = nullptr;
186 0 : break;
187 : }
188 2 : totalBytesWritten += bytesWritten;
189 2 : NS_ASSERTION(bytesWritten <= aCount, "wrote too much");
190 2 : aCount -= bytesWritten;
191 : }
192 2 : return NS_OK;
193 : }
194 : }
195 :
196 : nsresult
197 0 : nsInputStreamTee::WriteSegmentFun(nsIInputStream* aIn, void* aClosure,
198 : const char* aFromSegment, uint32_t aOffset,
199 : uint32_t aCount, uint32_t* aWriteCount)
200 : {
201 0 : nsInputStreamTee* tee = reinterpret_cast<nsInputStreamTee*>(aClosure);
202 0 : nsresult rv = tee->mWriter(aIn, tee->mClosure, aFromSegment, aOffset,
203 0 : aCount, aWriteCount);
204 0 : if (NS_FAILED(rv) || (*aWriteCount == 0)) {
205 0 : NS_ASSERTION((NS_FAILED(rv) ? (*aWriteCount == 0) : true),
206 : "writer returned an error with non-zero writeCount");
207 0 : return rv;
208 : }
209 :
210 0 : return tee->TeeSegment(aFromSegment, *aWriteCount);
211 : }
212 :
213 24 : NS_IMPL_ISUPPORTS(nsInputStreamTee,
214 : nsIInputStreamTee,
215 : nsIInputStream)
216 : NS_IMETHODIMP
217 0 : nsInputStreamTee::Close()
218 : {
219 0 : if (NS_WARN_IF(!mSource)) {
220 0 : return NS_ERROR_NOT_INITIALIZED;
221 : }
222 0 : nsresult rv = mSource->Close();
223 0 : mSource = nullptr;
224 0 : mSink = nullptr;
225 0 : return rv;
226 : }
227 :
228 : NS_IMETHODIMP
229 1 : nsInputStreamTee::Available(uint64_t* aAvail)
230 : {
231 1 : if (NS_WARN_IF(!mSource)) {
232 0 : return NS_ERROR_NOT_INITIALIZED;
233 : }
234 1 : return mSource->Available(aAvail);
235 : }
236 :
237 : NS_IMETHODIMP
238 2 : nsInputStreamTee::Read(char* aBuf, uint32_t aCount, uint32_t* aBytesRead)
239 : {
240 2 : if (NS_WARN_IF(!mSource)) {
241 0 : return NS_ERROR_NOT_INITIALIZED;
242 : }
243 :
244 2 : nsresult rv = mSource->Read(aBuf, aCount, aBytesRead);
245 2 : if (NS_FAILED(rv) || (*aBytesRead == 0)) {
246 0 : return rv;
247 : }
248 :
249 2 : return TeeSegment(aBuf, *aBytesRead);
250 : }
251 :
252 : NS_IMETHODIMP
253 0 : nsInputStreamTee::ReadSegments(nsWriteSegmentFun aWriter,
254 : void* aClosure,
255 : uint32_t aCount,
256 : uint32_t* aBytesRead)
257 : {
258 0 : if (NS_WARN_IF(!mSource)) {
259 0 : return NS_ERROR_NOT_INITIALIZED;
260 : }
261 :
262 0 : mWriter = aWriter;
263 0 : mClosure = aClosure;
264 :
265 0 : return mSource->ReadSegments(WriteSegmentFun, this, aCount, aBytesRead);
266 : }
267 :
268 : NS_IMETHODIMP
269 0 : nsInputStreamTee::IsNonBlocking(bool* aResult)
270 : {
271 0 : if (NS_WARN_IF(!mSource)) {
272 0 : return NS_ERROR_NOT_INITIALIZED;
273 : }
274 0 : return mSource->IsNonBlocking(aResult);
275 : }
276 :
277 : NS_IMETHODIMP
278 2 : nsInputStreamTee::SetSource(nsIInputStream* aSource)
279 : {
280 2 : mSource = aSource;
281 2 : return NS_OK;
282 : }
283 :
284 : NS_IMETHODIMP
285 0 : nsInputStreamTee::GetSource(nsIInputStream** aSource)
286 : {
287 0 : NS_IF_ADDREF(*aSource = mSource);
288 0 : return NS_OK;
289 : }
290 :
291 : NS_IMETHODIMP
292 4 : nsInputStreamTee::SetSink(nsIOutputStream* aSink)
293 : {
294 : #ifdef DEBUG
295 4 : if (aSink) {
296 : bool nonBlocking;
297 2 : nsresult rv = aSink->IsNonBlocking(&nonBlocking);
298 2 : if (NS_FAILED(rv) || nonBlocking) {
299 0 : NS_ERROR("aSink should be a blocking stream");
300 : }
301 : }
302 : #endif
303 4 : mSink = aSink;
304 4 : return NS_OK;
305 : }
306 :
307 : NS_IMETHODIMP
308 0 : nsInputStreamTee::GetSink(nsIOutputStream** aSink)
309 : {
310 0 : NS_IF_ADDREF(*aSink = mSink);
311 0 : return NS_OK;
312 : }
313 :
314 : NS_IMETHODIMP
315 2 : nsInputStreamTee::SetEventTarget(nsIEventTarget* aEventTarget)
316 : {
317 2 : mEventTarget = aEventTarget;
318 2 : if (mEventTarget) {
319 : // Only need synchronization if this is an async tee
320 0 : mLock = new Mutex("nsInputStreamTee.mLock");
321 : }
322 2 : return NS_OK;
323 : }
324 :
325 : NS_IMETHODIMP
326 0 : nsInputStreamTee::GetEventTarget(nsIEventTarget** aEventTarget)
327 : {
328 0 : NS_IF_ADDREF(*aEventTarget = mEventTarget);
329 0 : return NS_OK;
330 : }
331 :
332 :
333 : nsresult
334 2 : NS_NewInputStreamTeeAsync(nsIInputStream** aResult,
335 : nsIInputStream* aSource,
336 : nsIOutputStream* aSink,
337 : nsIEventTarget* aEventTarget)
338 : {
339 : nsresult rv;
340 :
341 4 : nsCOMPtr<nsIInputStreamTee> tee = new nsInputStreamTee();
342 2 : rv = tee->SetSource(aSource);
343 2 : if (NS_FAILED(rv)) {
344 0 : return rv;
345 : }
346 :
347 2 : rv = tee->SetSink(aSink);
348 2 : if (NS_FAILED(rv)) {
349 0 : return rv;
350 : }
351 :
352 2 : rv = tee->SetEventTarget(aEventTarget);
353 2 : if (NS_FAILED(rv)) {
354 0 : return rv;
355 : }
356 :
357 2 : tee.forget(aResult);
358 2 : return rv;
359 : }
360 :
361 : nsresult
362 2 : NS_NewInputStreamTee(nsIInputStream** aResult,
363 : nsIInputStream* aSource,
364 : nsIOutputStream* aSink)
365 : {
366 2 : return NS_NewInputStreamTeeAsync(aResult, aSource, aSink, nullptr);
367 : }
368 :
369 : #undef LOG
|