Line data Source code
1 : /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* vim: set ts=8 sts=2 et sw=2 tw=80: */
3 : // Copyright (c) 2008 The Chromium Authors. All rights reserved.
4 : // Use of this source code is governed by a BSD-style license that can be
5 : // found in the LICENSE file.
6 :
7 : #include "base/message_pump_libevent.h"
8 :
9 : #include <errno.h>
10 : #include <fcntl.h>
11 : #if defined(ANDROID) || defined(OS_POSIX)
12 : #include <unistd.h>
13 : #endif
14 :
15 : #include "eintr_wrapper.h"
16 : #include "base/logging.h"
17 : #include "base/scoped_nsautorelease_pool.h"
18 : #include "base/time.h"
19 : #include "nsDependentSubstring.h"
20 : #include "event.h"
21 : #include "mozilla/UniquePtr.h"
22 :
23 : // This macro checks that the _EVENT_SIZEOF_* constants defined in
24 : // ipc/chromiume/src/third_party/<platform>/event2/event-config.h are correct.
25 : #if defined(_EVENT_SIZEOF_SHORT)
26 : #define CHECK_EVENT_SIZEOF(TYPE, type) \
27 : static_assert(_EVENT_SIZEOF_##TYPE == sizeof(type), \
28 : "bad _EVENT_SIZEOF_"#TYPE);
29 : #elif defined(EVENT__SIZEOF_SHORT)
30 : #define CHECK_EVENT_SIZEOF(TYPE, type) \
31 : static_assert(EVENT__SIZEOF_##TYPE == sizeof(type), \
32 : "bad EVENT__SIZEOF_"#TYPE);
33 : #else
34 : #error Cannot find libevent type sizes
35 : #endif
36 :
37 : CHECK_EVENT_SIZEOF(LONG, long);
38 : CHECK_EVENT_SIZEOF(LONG_LONG, long long);
39 : CHECK_EVENT_SIZEOF(OFF_T, off_t);
40 : CHECK_EVENT_SIZEOF(PTHREAD_T, pthread_t);
41 : CHECK_EVENT_SIZEOF(SHORT, short);
42 : CHECK_EVENT_SIZEOF(SIZE_T, size_t);
43 : CHECK_EVENT_SIZEOF(VOID_P, void*);
44 :
45 : // Lifecycle of struct event
46 : // Libevent uses two main data structures:
47 : // struct event_base (of which there is one per message pump), and
48 : // struct event (of which there is roughly one per socket).
49 : // The socket's struct event is created in
50 : // MessagePumpLibevent::WatchFileDescriptor(),
51 : // is owned by the FileDescriptorWatcher, and is destroyed in
52 : // StopWatchingFileDescriptor().
53 : // It is moved into and out of lists in struct event_base by
54 : // the libevent functions event_add() and event_del().
55 : //
56 : // TODO(dkegel):
57 : // At the moment bad things happen if a FileDescriptorWatcher
58 : // is active after its MessagePumpLibevent has been destroyed.
59 : // See MessageLoopTest.FileDescriptorWatcherOutlivesMessageLoop
60 : // Not clear yet whether that situation occurs in practice,
61 : // but if it does, we need to fix it.
62 :
63 : namespace base {
64 :
65 : // Return 0 on success
66 : // Too small a function to bother putting in a library?
67 6 : static int SetNonBlocking(int fd) {
68 6 : int flags = fcntl(fd, F_GETFL, 0);
69 6 : if (flags == -1)
70 0 : flags = 0;
71 6 : return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
72 : }
73 :
74 126 : MessagePumpLibevent::FileDescriptorWatcher::FileDescriptorWatcher()
75 : : is_persistent_(false),
76 126 : event_(NULL) {
77 126 : }
78 :
79 78 : MessagePumpLibevent::FileDescriptorWatcher::~FileDescriptorWatcher() {
80 39 : if (event_) {
81 0 : StopWatchingFileDescriptor();
82 : }
83 39 : }
84 :
85 37 : void MessagePumpLibevent::FileDescriptorWatcher::Init(event *e,
86 : bool is_persistent) {
87 37 : DCHECK(e);
88 37 : DCHECK(event_ == NULL);
89 :
90 37 : is_persistent_ = is_persistent;
91 37 : event_ = e;
92 37 : }
93 :
94 76 : event *MessagePumpLibevent::FileDescriptorWatcher::ReleaseEvent() {
95 76 : struct event *e = event_;
96 76 : event_ = NULL;
97 76 : return e;
98 : }
99 :
100 39 : bool MessagePumpLibevent::FileDescriptorWatcher::StopWatchingFileDescriptor() {
101 39 : event* e = ReleaseEvent();
102 39 : if (e == NULL)
103 39 : return true;
104 :
105 : // event_del() is a no-op if the event isn't active.
106 0 : int rv = event_del(e);
107 : delete e;
108 0 : return (rv == 0);
109 : }
110 :
111 : // Called if a byte is received on the wakeup pipe.
112 397 : void MessagePumpLibevent::OnWakeup(int socket, short flags, void* context) {
113 : base::MessagePumpLibevent* that =
114 397 : static_cast<base::MessagePumpLibevent*>(context);
115 397 : DCHECK(that->wakeup_pipe_out_ == socket);
116 :
117 : // Remove and discard the wakeup byte.
118 : char buf;
119 397 : int nread = HANDLE_EINTR(read(socket, &buf, 1));
120 397 : DCHECK_EQ(nread, 1);
121 : // Tell libevent to break out of inner loop.
122 397 : event_base_loopbreak(that->event_base_);
123 397 : }
124 :
125 3 : MessagePumpLibevent::MessagePumpLibevent()
126 : : keep_running_(true),
127 : in_run_(false),
128 3 : event_base_(event_base_new()),
129 : wakeup_pipe_in_(-1),
130 6 : wakeup_pipe_out_(-1) {
131 3 : if (!Init())
132 0 : NOTREACHED();
133 3 : }
134 :
135 3 : bool MessagePumpLibevent::Init() {
136 : int fds[2];
137 3 : if (pipe(fds)) {
138 0 : DLOG(ERROR) << "pipe() failed, errno: " << errno;
139 0 : return false;
140 : }
141 3 : if (SetNonBlocking(fds[0])) {
142 0 : DLOG(ERROR) << "SetNonBlocking for pipe fd[0] failed, errno: " << errno;
143 0 : return false;
144 : }
145 3 : if (SetNonBlocking(fds[1])) {
146 0 : DLOG(ERROR) << "SetNonBlocking for pipe fd[1] failed, errno: " << errno;
147 0 : return false;
148 : }
149 3 : wakeup_pipe_out_ = fds[0];
150 3 : wakeup_pipe_in_ = fds[1];
151 :
152 3 : wakeup_event_ = new event;
153 3 : event_set(wakeup_event_, wakeup_pipe_out_, EV_READ | EV_PERSIST,
154 3 : OnWakeup, this);
155 3 : event_base_set(event_base_, wakeup_event_);
156 :
157 3 : if (event_add(wakeup_event_, 0))
158 0 : return false;
159 3 : return true;
160 : }
161 :
162 0 : MessagePumpLibevent::~MessagePumpLibevent() {
163 0 : DCHECK(wakeup_event_);
164 0 : DCHECK(event_base_);
165 0 : event_del(wakeup_event_);
166 0 : delete wakeup_event_;
167 0 : if (wakeup_pipe_in_ >= 0)
168 0 : close(wakeup_pipe_in_);
169 0 : if (wakeup_pipe_out_ >= 0)
170 0 : close(wakeup_pipe_out_);
171 0 : event_base_free(event_base_);
172 0 : }
173 :
174 37 : bool MessagePumpLibevent::WatchFileDescriptor(int fd,
175 : bool persistent,
176 : Mode mode,
177 : FileDescriptorWatcher *controller,
178 : Watcher *delegate) {
179 37 : DCHECK(fd > 0);
180 37 : DCHECK(controller);
181 37 : DCHECK(delegate);
182 37 : DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
183 :
184 37 : int event_mask = persistent ? EV_PERSIST : 0;
185 37 : if ((mode & WATCH_READ) != 0) {
186 31 : event_mask |= EV_READ;
187 : }
188 37 : if ((mode & WATCH_WRITE) != 0) {
189 6 : event_mask |= EV_WRITE;
190 : }
191 :
192 : // |should_delete_event| is true if we're modifying an event that's currently
193 : // active in |controller|.
194 : // If we're modifying an existing event and there's an error then we need to
195 : // tell libevent to clean it up via event_delete() before returning.
196 37 : bool should_delete_event = true;
197 74 : mozilla::UniquePtr<event> evt(controller->ReleaseEvent());
198 37 : if (evt.get() == NULL) {
199 34 : should_delete_event = false;
200 : // Ownership is transferred to the controller.
201 34 : evt = mozilla::MakeUnique<event>();
202 : } else {
203 : // It's illegal to use this function to listen on 2 separate fds with the
204 : // same |controller|.
205 3 : if (EVENT_FD(evt.get()) != fd) {
206 0 : NOTREACHED() << "FDs don't match" << EVENT_FD(evt.get()) << "!=" << fd;
207 0 : return false;
208 : }
209 :
210 : // Make sure we don't pick up any funky internal libevent masks.
211 3 : int old_interest_mask = evt.get()->ev_events &
212 3 : (EV_READ | EV_WRITE | EV_PERSIST);
213 :
214 : // Combine old/new event masks.
215 3 : event_mask |= old_interest_mask;
216 :
217 : // Must disarm the event before we can reuse it.
218 3 : event_del(evt.get());
219 : }
220 :
221 : // Set current interest mask and message pump for this event.
222 37 : event_set(evt.get(), fd, event_mask, OnLibeventNotification,
223 37 : delegate);
224 :
225 : // Tell libevent which message pump this socket will belong to when we add it.
226 37 : if (event_base_set(event_base_, evt.get()) != 0) {
227 0 : if (should_delete_event) {
228 0 : event_del(evt.get());
229 : }
230 0 : return false;
231 : }
232 :
233 : // Add this socket to the list of monitored sockets.
234 37 : if (event_add(evt.get(), NULL) != 0) {
235 0 : if (should_delete_event) {
236 0 : event_del(evt.get());
237 : }
238 0 : return false;
239 : }
240 :
241 : // Transfer ownership of evt to controller.
242 37 : controller->Init(evt.release(), persistent);
243 37 : return true;
244 : }
245 :
246 :
247 258 : void MessagePumpLibevent::OnLibeventNotification(int fd, short flags,
248 : void* context) {
249 258 : Watcher* watcher = static_cast<Watcher*>(context);
250 :
251 258 : if (flags & EV_WRITE) {
252 6 : watcher->OnFileCanWriteWithoutBlocking(fd);
253 : }
254 258 : if (flags & EV_READ) {
255 252 : watcher->OnFileCanReadWithoutBlocking(fd);
256 : }
257 258 : }
258 :
259 :
260 0 : MessagePumpLibevent::SignalEvent::SignalEvent() :
261 0 : event_(NULL)
262 : {
263 0 : }
264 :
265 0 : MessagePumpLibevent::SignalEvent::~SignalEvent()
266 : {
267 0 : if (event_) {
268 0 : StopCatching();
269 : }
270 0 : }
271 :
272 : void
273 0 : MessagePumpLibevent::SignalEvent::Init(event *e)
274 : {
275 0 : DCHECK(e);
276 0 : DCHECK(event_ == NULL);
277 0 : event_ = e;
278 0 : }
279 :
280 : bool
281 0 : MessagePumpLibevent::SignalEvent::StopCatching()
282 : {
283 : // XXX/cjones: this code could be shared with
284 : // FileDescriptorWatcher. ironic that libevent is "more"
285 : // object-oriented than this C++
286 0 : event* e = ReleaseEvent();
287 0 : if (e == NULL)
288 0 : return true;
289 :
290 : // event_del() is a no-op if the event isn't active.
291 0 : int rv = event_del(e);
292 : delete e;
293 0 : return (rv == 0);
294 : }
295 :
296 : event *
297 0 : MessagePumpLibevent::SignalEvent::ReleaseEvent()
298 : {
299 0 : event *e = event_;
300 0 : event_ = NULL;
301 0 : return e;
302 : }
303 :
304 : bool
305 0 : MessagePumpLibevent::CatchSignal(int sig,
306 : SignalEvent* sigevent,
307 : SignalWatcher* delegate)
308 : {
309 0 : DCHECK(sig > 0);
310 0 : DCHECK(sigevent);
311 0 : DCHECK(delegate);
312 : // TODO if we want to support re-using SignalEvents, this code needs
313 : // to jump through the same hoops as WatchFileDescriptor(). Not
314 : // needed at present
315 0 : DCHECK(NULL == sigevent->event_);
316 :
317 0 : mozilla::UniquePtr<event> evt = mozilla::MakeUnique<event>();
318 0 : signal_set(evt.get(), sig, OnLibeventSignalNotification, delegate);
319 :
320 0 : if (event_base_set(event_base_, evt.get()))
321 0 : return false;
322 :
323 0 : if (signal_add(evt.get(), NULL))
324 0 : return false;
325 :
326 : // Transfer ownership of evt to controller.
327 0 : sigevent->Init(evt.release());
328 0 : return true;
329 : }
330 :
331 : void
332 0 : MessagePumpLibevent::OnLibeventSignalNotification(int sig, short flags,
333 : void* context)
334 : {
335 0 : DCHECK(sig > 0);
336 0 : DCHECK(EV_SIGNAL == flags);
337 0 : DCHECK(context);
338 0 : reinterpret_cast<SignalWatcher*>(context)->OnSignal(sig);
339 0 : }
340 :
341 :
342 : // Reentrant!
343 3 : void MessagePumpLibevent::Run(Delegate* delegate) {
344 3 : DCHECK(keep_running_) << "Quit must have been called outside of Run!";
345 :
346 3 : bool old_in_run = in_run_;
347 3 : in_run_ = true;
348 :
349 : for (;;) {
350 1051 : ScopedNSAutoreleasePool autorelease_pool;
351 :
352 1051 : bool did_work = delegate->DoWork();
353 1051 : if (!keep_running_)
354 0 : break;
355 :
356 1051 : did_work |= delegate->DoDelayedWork(&delayed_work_time_);
357 1051 : if (!keep_running_)
358 0 : break;
359 :
360 1051 : if (did_work)
361 794 : continue;
362 :
363 654 : did_work = delegate->DoIdleWork();
364 654 : if (!keep_running_)
365 0 : break;
366 :
367 654 : if (did_work)
368 0 : continue;
369 :
370 : // EVLOOP_ONCE tells libevent to only block once,
371 : // but to service all pending events when it wakes up.
372 654 : if (delayed_work_time_.is_null()) {
373 654 : event_base_loop(event_base_, EVLOOP_ONCE);
374 : } else {
375 0 : TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
376 0 : if (delay > TimeDelta()) {
377 : struct timeval poll_tv;
378 0 : poll_tv.tv_sec = delay.InSeconds();
379 0 : poll_tv.tv_usec = delay.InMicroseconds() % Time::kMicrosecondsPerSecond;
380 0 : event_base_loopexit(event_base_, &poll_tv);
381 0 : event_base_loop(event_base_, EVLOOP_ONCE);
382 : } else {
383 : // It looks like delayed_work_time_ indicates a time in the past, so we
384 : // need to call DoDelayedWork now.
385 0 : delayed_work_time_ = TimeTicks();
386 : }
387 : }
388 1048 : }
389 :
390 0 : keep_running_ = true;
391 0 : in_run_ = old_in_run;
392 0 : }
393 :
394 0 : void MessagePumpLibevent::Quit() {
395 0 : DCHECK(in_run_);
396 : // Tell both libevent and Run that they should break out of their loops.
397 0 : keep_running_ = false;
398 0 : ScheduleWork();
399 0 : }
400 :
401 393 : void MessagePumpLibevent::ScheduleWork() {
402 : // Tell libevent (in a threadsafe way) that it should break out of its loop.
403 393 : char buf = 0;
404 393 : int nwrite = HANDLE_EINTR(write(wakeup_pipe_in_, &buf, 1));
405 393 : DCHECK(nwrite == 1 || errno == EAGAIN)
406 0 : << "[nwrite:" << nwrite << "] [errno:" << errno << "]";
407 393 : }
408 :
409 0 : void MessagePumpLibevent::ScheduleDelayedWork(
410 : const TimeTicks& delayed_work_time) {
411 : // We know that we can't be blocked on Wait right now since this method can
412 : // only be called on the same thread as Run, so we only need to update our
413 : // record of how long to sleep when we do sleep.
414 0 : delayed_work_time_ = delayed_work_time;
415 0 : }
416 :
417 0 : void LineWatcher::OnFileCanReadWithoutBlocking(int aFd)
418 : {
419 0 : ssize_t length = 0;
420 :
421 : while (true) {
422 0 : length = read(aFd, mReceiveBuffer.get(), mBufferSize - mReceivedIndex);
423 0 : DCHECK(length <= ssize_t(mBufferSize - mReceivedIndex));
424 0 : if (length <= 0) {
425 0 : if (length < 0) {
426 0 : if (errno == EINTR) {
427 0 : continue; // retry system call when interrupted
428 : }
429 0 : if (errno == EAGAIN || errno == EWOULDBLOCK) {
430 0 : return; // no data available: return and re-poll
431 : }
432 0 : DLOG(ERROR) << "Can't read from fd, error " << errno;
433 : } else {
434 0 : DLOG(ERROR) << "End of file";
435 : }
436 : // At this point, assume that we can't actually access
437 : // the socket anymore, and indicate an error.
438 0 : OnError();
439 0 : mReceivedIndex = 0;
440 0 : return;
441 : }
442 :
443 0 : while (length-- > 0) {
444 0 : DCHECK(mReceivedIndex < mBufferSize);
445 0 : if (mReceiveBuffer[mReceivedIndex] == mTerminator) {
446 0 : nsDependentCSubstring message(mReceiveBuffer.get(), mReceivedIndex);
447 0 : OnLineRead(aFd, message);
448 0 : if (length > 0) {
449 0 : DCHECK(mReceivedIndex < (mBufferSize - 1));
450 0 : memmove(&mReceiveBuffer[0], &mReceiveBuffer[mReceivedIndex + 1], length);
451 : }
452 0 : mReceivedIndex = 0;
453 : } else {
454 0 : mReceivedIndex++;
455 : }
456 : }
457 0 : }
458 : }
459 : } // namespace base
|