Line data Source code
1 : // Protocol Buffers - Google's data interchange format
2 : // Copyright 2008 Google Inc. All rights reserved.
3 : // https://developers.google.com/protocol-buffers/
4 : //
5 : // Redistribution and use in source and binary forms, with or without
6 : // modification, are permitted provided that the following conditions are
7 : // met:
8 : //
9 : // * Redistributions of source code must retain the above copyright
10 : // notice, this list of conditions and the following disclaimer.
11 : // * Redistributions in binary form must reproduce the above
12 : // copyright notice, this list of conditions and the following disclaimer
13 : // in the documentation and/or other materials provided with the
14 : // distribution.
15 : // * Neither the name of Google Inc. nor the names of its
16 : // contributors may be used to endorse or promote products derived from
17 : // this software without specific prior written permission.
18 : //
19 : // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 : // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 : // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 : // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 : // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 : // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 : // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 : // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 : // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 : // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 : // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 :
31 : // Author: kenton@google.com (Kenton Varda)
32 : // Based on original Protocol Buffers design by
33 : // Sanjay Ghemawat, Jeff Dean, and others.
34 :
35 : #ifdef _MSC_VER
36 : #include <io.h>
37 : #else
38 : #include <unistd.h>
39 : #include <sys/types.h>
40 : #include <sys/stat.h>
41 : #include <fcntl.h>
42 : #endif
43 : #include <errno.h>
44 : #include <iostream>
45 : #include <algorithm>
46 :
47 : #include <google/protobuf/io/zero_copy_stream_impl.h>
48 : #include <google/protobuf/stubs/common.h>
49 : #include <google/protobuf/stubs/stl_util.h>
50 :
51 :
52 : namespace google {
53 : namespace protobuf {
54 : namespace io {
55 :
56 : #ifdef _WIN32
57 : // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
58 : // return value is undefined. We re-define it to always produce an error.
59 : #define lseek(fd, offset, origin) ((off_t)-1)
60 : #endif
61 :
62 : namespace {
63 :
64 : // EINTR sucks.
// Closes |fd|, re-issuing close() for as long as the call fails with EINTR.
// Returns the value of the last close() call; on failure errno is left as
// close() set it.
// NOTE(review): some platforms release the descriptor even when close()
// reports EINTR -- confirm that retrying is the intended behaviour there.
int close_no_eintr(int fd) {
  for (;;) {
    const int status = close(fd);
    if (status >= 0 || errno != EINTR) {
      return status;
    }
  }
}
72 :
73 : } // namespace
74 :
75 :
76 : // ===================================================================
77 :
78 0 : FileInputStream::FileInputStream(int file_descriptor, int block_size)
79 : : copying_input_(file_descriptor),
80 0 : impl_(©ing_input_, block_size) {
81 0 : }
82 :
83 0 : FileInputStream::~FileInputStream() {}
84 :
85 0 : bool FileInputStream::Close() {
86 0 : return copying_input_.Close();
87 : }
88 :
89 0 : bool FileInputStream::Next(const void** data, int* size) {
90 0 : return impl_.Next(data, size);
91 : }
92 :
93 0 : void FileInputStream::BackUp(int count) {
94 0 : impl_.BackUp(count);
95 0 : }
96 :
97 0 : bool FileInputStream::Skip(int count) {
98 0 : return impl_.Skip(count);
99 : }
100 :
101 0 : int64 FileInputStream::ByteCount() const {
102 0 : return impl_.ByteCount();
103 : }
104 :
105 0 : FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
106 0 : int file_descriptor)
107 : : file_(file_descriptor),
108 : close_on_delete_(false),
109 : is_closed_(false),
110 : errno_(0),
111 0 : previous_seek_failed_(false) {
112 0 : }
113 :
114 0 : FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
115 0 : if (close_on_delete_) {
116 0 : if (!Close()) {
117 0 : GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
118 : }
119 : }
120 0 : }
121 :
122 0 : bool FileInputStream::CopyingFileInputStream::Close() {
123 0 : GOOGLE_CHECK(!is_closed_);
124 :
125 0 : is_closed_ = true;
126 0 : if (close_no_eintr(file_) != 0) {
127 : // The docs on close() do not specify whether a file descriptor is still
128 : // open after close() fails with EIO. However, the glibc source code
129 : // seems to indicate that it is not.
130 0 : errno_ = errno;
131 0 : return false;
132 : }
133 :
134 0 : return true;
135 : }
136 :
137 0 : int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
138 0 : GOOGLE_CHECK(!is_closed_);
139 :
140 : int result;
141 0 : do {
142 0 : result = read(file_, buffer, size);
143 0 : } while (result < 0 && errno == EINTR);
144 :
145 0 : if (result < 0) {
146 : // Read error (not EOF).
147 0 : errno_ = errno;
148 : }
149 :
150 0 : return result;
151 : }
152 :
153 0 : int FileInputStream::CopyingFileInputStream::Skip(int count) {
154 0 : GOOGLE_CHECK(!is_closed_);
155 :
156 0 : if (!previous_seek_failed_ &&
157 0 : lseek(file_, count, SEEK_CUR) != (off_t)-1) {
158 : // Seek succeeded.
159 0 : return count;
160 : } else {
161 : // Failed to seek.
162 :
163 : // Note to self: Don't seek again. This file descriptor doesn't
164 : // support it.
165 0 : previous_seek_failed_ = true;
166 :
167 : // Use the default implementation.
168 0 : return CopyingInputStream::Skip(count);
169 : }
170 : }
171 :
172 : // ===================================================================
173 :
174 0 : FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
175 : : copying_output_(file_descriptor),
176 0 : impl_(©ing_output_, block_size) {
177 0 : }
178 :
179 0 : FileOutputStream::~FileOutputStream() {
180 0 : impl_.Flush();
181 0 : }
182 :
183 0 : bool FileOutputStream::Close() {
184 0 : bool flush_succeeded = impl_.Flush();
185 0 : return copying_output_.Close() && flush_succeeded;
186 : }
187 :
188 0 : bool FileOutputStream::Flush() {
189 0 : return impl_.Flush();
190 : }
191 :
192 0 : bool FileOutputStream::Next(void** data, int* size) {
193 0 : return impl_.Next(data, size);
194 : }
195 :
196 0 : void FileOutputStream::BackUp(int count) {
197 0 : impl_.BackUp(count);
198 0 : }
199 :
200 0 : int64 FileOutputStream::ByteCount() const {
201 0 : return impl_.ByteCount();
202 : }
203 :
204 0 : FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
205 0 : int file_descriptor)
206 : : file_(file_descriptor),
207 : close_on_delete_(false),
208 : is_closed_(false),
209 0 : errno_(0) {
210 0 : }
211 :
212 0 : FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
213 0 : if (close_on_delete_) {
214 0 : if (!Close()) {
215 0 : GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
216 : }
217 : }
218 0 : }
219 :
220 0 : bool FileOutputStream::CopyingFileOutputStream::Close() {
221 0 : GOOGLE_CHECK(!is_closed_);
222 :
223 0 : is_closed_ = true;
224 0 : if (close_no_eintr(file_) != 0) {
225 : // The docs on close() do not specify whether a file descriptor is still
226 : // open after close() fails with EIO. However, the glibc source code
227 : // seems to indicate that it is not.
228 0 : errno_ = errno;
229 0 : return false;
230 : }
231 :
232 0 : return true;
233 : }
234 :
235 0 : bool FileOutputStream::CopyingFileOutputStream::Write(
236 : const void* buffer, int size) {
237 0 : GOOGLE_CHECK(!is_closed_);
238 0 : int total_written = 0;
239 :
240 0 : const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
241 :
242 0 : while (total_written < size) {
243 : int bytes;
244 0 : do {
245 0 : bytes = write(file_, buffer_base + total_written, size - total_written);
246 0 : } while (bytes < 0 && errno == EINTR);
247 :
248 0 : if (bytes <= 0) {
249 : // Write error.
250 :
251 : // FIXME(kenton): According to the man page, if write() returns zero,
252 : // there was no error; write() simply did not write anything. It's
253 : // unclear under what circumstances this might happen, but presumably
254 : // errno won't be set in this case. I am confused as to how such an
255 : // event should be handled. For now I'm treating it as an error, since
256 : // retrying seems like it could lead to an infinite loop. I suspect
257 : // this never actually happens anyway.
258 :
259 0 : if (bytes < 0) {
260 0 : errno_ = errno;
261 : }
262 0 : return false;
263 : }
264 0 : total_written += bytes;
265 : }
266 :
267 0 : return true;
268 : }
269 :
270 : // ===================================================================
271 :
272 0 : IstreamInputStream::IstreamInputStream(istream* input, int block_size)
273 : : copying_input_(input),
274 0 : impl_(©ing_input_, block_size) {
275 0 : }
276 :
277 0 : IstreamInputStream::~IstreamInputStream() {}
278 :
279 0 : bool IstreamInputStream::Next(const void** data, int* size) {
280 0 : return impl_.Next(data, size);
281 : }
282 :
283 0 : void IstreamInputStream::BackUp(int count) {
284 0 : impl_.BackUp(count);
285 0 : }
286 :
287 0 : bool IstreamInputStream::Skip(int count) {
288 0 : return impl_.Skip(count);
289 : }
290 :
291 0 : int64 IstreamInputStream::ByteCount() const {
292 0 : return impl_.ByteCount();
293 : }
294 :
295 0 : IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
296 0 : istream* input)
297 0 : : input_(input) {
298 0 : }
299 :
300 0 : IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
301 :
302 0 : int IstreamInputStream::CopyingIstreamInputStream::Read(
303 : void* buffer, int size) {
304 0 : input_->read(reinterpret_cast<char*>(buffer), size);
305 0 : int result = input_->gcount();
306 0 : if (result == 0 && input_->fail() && !input_->eof()) {
307 0 : return -1;
308 : }
309 0 : return result;
310 : }
311 :
312 : // ===================================================================
313 :
314 0 : OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
315 : : copying_output_(output),
316 0 : impl_(©ing_output_, block_size) {
317 0 : }
318 :
319 0 : OstreamOutputStream::~OstreamOutputStream() {
320 0 : impl_.Flush();
321 0 : }
322 :
323 0 : bool OstreamOutputStream::Next(void** data, int* size) {
324 0 : return impl_.Next(data, size);
325 : }
326 :
327 0 : void OstreamOutputStream::BackUp(int count) {
328 0 : impl_.BackUp(count);
329 0 : }
330 :
331 0 : int64 OstreamOutputStream::ByteCount() const {
332 0 : return impl_.ByteCount();
333 : }
334 :
335 0 : OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
336 0 : ostream* output)
337 0 : : output_(output) {
338 0 : }
339 :
340 0 : OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
341 0 : }
342 :
343 0 : bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
344 : const void* buffer, int size) {
345 0 : output_->write(reinterpret_cast<const char*>(buffer), size);
346 0 : return output_->good();
347 : }
348 :
349 : // ===================================================================
350 :
351 0 : ConcatenatingInputStream::ConcatenatingInputStream(
352 0 : ZeroCopyInputStream* const streams[], int count)
353 0 : : streams_(streams), stream_count_(count), bytes_retired_(0) {
354 0 : }
355 :
356 0 : ConcatenatingInputStream::~ConcatenatingInputStream() {
357 0 : }
358 :
359 0 : bool ConcatenatingInputStream::Next(const void** data, int* size) {
360 0 : while (stream_count_ > 0) {
361 0 : if (streams_[0]->Next(data, size)) return true;
362 :
363 : // That stream is done. Advance to the next one.
364 0 : bytes_retired_ += streams_[0]->ByteCount();
365 0 : ++streams_;
366 0 : --stream_count_;
367 : }
368 :
369 : // No more streams.
370 0 : return false;
371 : }
372 :
373 0 : void ConcatenatingInputStream::BackUp(int count) {
374 0 : if (stream_count_ > 0) {
375 0 : streams_[0]->BackUp(count);
376 : } else {
377 0 : GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
378 : }
379 0 : }
380 :
381 0 : bool ConcatenatingInputStream::Skip(int count) {
382 0 : while (stream_count_ > 0) {
383 : // Assume that ByteCount() can be used to find out how much we actually
384 : // skipped when Skip() fails.
385 0 : int64 target_byte_count = streams_[0]->ByteCount() + count;
386 0 : if (streams_[0]->Skip(count)) return true;
387 :
388 : // Hit the end of the stream. Figure out how many more bytes we still have
389 : // to skip.
390 0 : int64 final_byte_count = streams_[0]->ByteCount();
391 0 : GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
392 0 : count = target_byte_count - final_byte_count;
393 :
394 : // That stream is done. Advance to the next one.
395 0 : bytes_retired_ += final_byte_count;
396 0 : ++streams_;
397 0 : --stream_count_;
398 : }
399 :
400 0 : return false;
401 : }
402 :
403 0 : int64 ConcatenatingInputStream::ByteCount() const {
404 0 : if (stream_count_ == 0) {
405 0 : return bytes_retired_;
406 : } else {
407 0 : return bytes_retired_ + streams_[0]->ByteCount();
408 : }
409 : }
410 :
411 :
412 : // ===================================================================
413 :
414 0 : LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
415 0 : int64 limit)
416 0 : : input_(input), limit_(limit) {
417 0 : prior_bytes_read_ = input_->ByteCount();
418 0 : }
419 :
420 0 : LimitingInputStream::~LimitingInputStream() {
421 : // If we overshot the limit, back up.
422 0 : if (limit_ < 0) input_->BackUp(-limit_);
423 0 : }
424 :
425 0 : bool LimitingInputStream::Next(const void** data, int* size) {
426 0 : if (limit_ <= 0) return false;
427 0 : if (!input_->Next(data, size)) return false;
428 :
429 0 : limit_ -= *size;
430 0 : if (limit_ < 0) {
431 : // We overshot the limit. Reduce *size to hide the rest of the buffer.
432 0 : *size += limit_;
433 : }
434 0 : return true;
435 : }
436 :
437 0 : void LimitingInputStream::BackUp(int count) {
438 0 : if (limit_ < 0) {
439 0 : input_->BackUp(count - limit_);
440 0 : limit_ = count;
441 : } else {
442 0 : input_->BackUp(count);
443 0 : limit_ += count;
444 : }
445 0 : }
446 :
447 0 : bool LimitingInputStream::Skip(int count) {
448 0 : if (count > limit_) {
449 0 : if (limit_ < 0) return false;
450 0 : input_->Skip(limit_);
451 0 : limit_ = 0;
452 0 : return false;
453 : } else {
454 0 : if (!input_->Skip(count)) return false;
455 0 : limit_ -= count;
456 0 : return true;
457 : }
458 : }
459 :
460 0 : int64 LimitingInputStream::ByteCount() const {
461 0 : if (limit_ < 0) {
462 0 : return input_->ByteCount() + limit_ - prior_bytes_read_;
463 : } else {
464 0 : return input_->ByteCount() - prior_bytes_read_;
465 : }
466 : }
467 :
468 :
469 : // ===================================================================
470 :
471 : } // namespace io
472 : } // namespace protobuf
473 : } // namespace google
|