Line data Source code
1 : /*
2 : * Copyright (c) 2000-2007 Niels Provos <provos@citi.umich.edu>
3 : * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4 : *
5 : * Redistribution and use in source and binary forms, with or without
6 : * modification, are permitted provided that the following conditions
7 : * are met:
8 : * 1. Redistributions of source code must retain the above copyright
9 : * notice, this list of conditions and the following disclaimer.
10 : * 2. Redistributions in binary form must reproduce the above copyright
11 : * notice, this list of conditions and the following disclaimer in the
12 : * documentation and/or other materials provided with the distribution.
13 : * 3. The name of the author may not be used to endorse or promote products
14 : * derived from this software without specific prior written permission.
15 : *
16 : * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 : * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 : * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 : * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 : * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 : * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 : * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 : */
27 : #include "event2/event-config.h"
28 : #include "evconfig-private.h"
29 :
30 : #ifdef _WIN32
31 : #define WIN32_LEAN_AND_MEAN
32 : #include <winsock2.h>
33 : #include <windows.h>
34 : #undef WIN32_LEAN_AND_MEAN
35 : #endif
36 :
37 : #include <sys/types.h>
38 : #ifndef _WIN32
39 : #include <sys/socket.h>
40 : #endif
41 : #ifdef EVENT__HAVE_SYS_TIME_H
42 : #include <sys/time.h>
43 : #endif
44 : #include <sys/queue.h>
45 : #include <stdio.h>
46 : #include <stdlib.h>
47 : #ifndef _WIN32
48 : #include <unistd.h>
49 : #endif
50 : #include <errno.h>
51 : #include <signal.h>
52 : #include <string.h>
53 :
54 : #include <sys/queue.h>
55 :
56 : #include "event2/event.h"
57 : #include "event2/event_struct.h"
58 : #include "event2/rpc.h"
59 : #include "event2/rpc_struct.h"
60 : #include "evrpc-internal.h"
61 : #include "event2/http.h"
62 : #include "event2/buffer.h"
63 : #include "event2/tag.h"
64 : #include "event2/http_struct.h"
65 : #include "event2/http_compat.h"
66 : #include "event2/util.h"
67 : #include "util-internal.h"
68 : #include "log-internal.h"
69 : #include "mm-internal.h"
70 :
71 : struct evrpc_base *
72 0 : evrpc_init(struct evhttp *http_server)
73 : {
74 0 : struct evrpc_base* base = mm_calloc(1, sizeof(struct evrpc_base));
75 0 : if (base == NULL)
76 0 : return (NULL);
77 :
78 : /* we rely on the tagging sub system */
79 0 : evtag_init();
80 :
81 0 : TAILQ_INIT(&base->registered_rpcs);
82 0 : TAILQ_INIT(&base->input_hooks);
83 0 : TAILQ_INIT(&base->output_hooks);
84 :
85 0 : TAILQ_INIT(&base->paused_requests);
86 :
87 0 : base->http_server = http_server;
88 :
89 0 : return (base);
90 : }
91 :
92 : void
93 0 : evrpc_free(struct evrpc_base *base)
94 : {
95 : struct evrpc *rpc;
96 : struct evrpc_hook *hook;
97 : struct evrpc_hook_ctx *pause;
98 : int r;
99 :
100 0 : while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
101 0 : r = evrpc_unregister_rpc(base, rpc->uri);
102 0 : EVUTIL_ASSERT(r == 0);
103 : }
104 0 : while ((pause = TAILQ_FIRST(&base->paused_requests)) != NULL) {
105 0 : TAILQ_REMOVE(&base->paused_requests, pause, next);
106 0 : mm_free(pause);
107 : }
108 0 : while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
109 0 : r = evrpc_remove_hook(base, EVRPC_INPUT, hook);
110 0 : EVUTIL_ASSERT(r);
111 : }
112 0 : while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
113 0 : r = evrpc_remove_hook(base, EVRPC_OUTPUT, hook);
114 0 : EVUTIL_ASSERT(r);
115 : }
116 0 : mm_free(base);
117 0 : }
118 :
119 : void *
120 0 : evrpc_add_hook(void *vbase,
121 : enum EVRPC_HOOK_TYPE hook_type,
122 : int (*cb)(void *, struct evhttp_request *, struct evbuffer *, void *),
123 : void *cb_arg)
124 : {
125 0 : struct evrpc_hooks_ *base = vbase;
126 0 : struct evrpc_hook_list *head = NULL;
127 0 : struct evrpc_hook *hook = NULL;
128 0 : switch (hook_type) {
129 : case EVRPC_INPUT:
130 0 : head = &base->in_hooks;
131 0 : break;
132 : case EVRPC_OUTPUT:
133 0 : head = &base->out_hooks;
134 0 : break;
135 : default:
136 0 : EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
137 : }
138 :
139 0 : hook = mm_calloc(1, sizeof(struct evrpc_hook));
140 0 : EVUTIL_ASSERT(hook != NULL);
141 :
142 0 : hook->process = cb;
143 0 : hook->process_arg = cb_arg;
144 0 : TAILQ_INSERT_TAIL(head, hook, next);
145 :
146 0 : return (hook);
147 : }
148 :
149 : static int
150 0 : evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
151 : {
152 0 : struct evrpc_hook *hook = NULL;
153 0 : TAILQ_FOREACH(hook, head, next) {
154 0 : if (hook == handle) {
155 0 : TAILQ_REMOVE(head, hook, next);
156 0 : mm_free(hook);
157 0 : return (1);
158 : }
159 : }
160 :
161 0 : return (0);
162 : }
163 :
164 : /*
165 : * remove the hook specified by the handle
166 : */
167 :
168 : int
169 0 : evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
170 : {
171 0 : struct evrpc_hooks_ *base = vbase;
172 0 : struct evrpc_hook_list *head = NULL;
173 0 : switch (hook_type) {
174 : case EVRPC_INPUT:
175 0 : head = &base->in_hooks;
176 0 : break;
177 : case EVRPC_OUTPUT:
178 0 : head = &base->out_hooks;
179 0 : break;
180 : default:
181 0 : EVUTIL_ASSERT(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
182 : }
183 :
184 0 : return (evrpc_remove_hook_internal(head, handle));
185 : }
186 :
187 : static int
188 0 : evrpc_process_hooks(struct evrpc_hook_list *head, void *ctx,
189 : struct evhttp_request *req, struct evbuffer *evbuf)
190 : {
191 : struct evrpc_hook *hook;
192 0 : TAILQ_FOREACH(hook, head, next) {
193 0 : int res = hook->process(ctx, req, evbuf, hook->process_arg);
194 0 : if (res != EVRPC_CONTINUE)
195 0 : return (res);
196 : }
197 :
198 0 : return (EVRPC_CONTINUE);
199 : }
200 :
201 : static void evrpc_pool_schedule(struct evrpc_pool *pool);
202 : static void evrpc_request_cb(struct evhttp_request *, void *);
203 :
204 : /*
205 : * Registers a new RPC with the HTTP server. The evrpc object is expected
206 : * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
207 : * calls this function.
208 : */
209 :
210 : static char *
211 0 : evrpc_construct_uri(const char *uri)
212 : {
213 : char *constructed_uri;
214 : size_t constructed_uri_len;
215 :
216 0 : constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
217 0 : if ((constructed_uri = mm_malloc(constructed_uri_len)) == NULL)
218 0 : event_err(1, "%s: failed to register rpc at %s",
219 : __func__, uri);
220 0 : memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
221 0 : memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
222 0 : constructed_uri[constructed_uri_len - 1] = '\0';
223 :
224 0 : return (constructed_uri);
225 : }
226 :
227 : int
228 0 : evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
229 : void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
230 : {
231 0 : char *constructed_uri = evrpc_construct_uri(rpc->uri);
232 :
233 0 : rpc->base = base;
234 0 : rpc->cb = cb;
235 0 : rpc->cb_arg = cb_arg;
236 :
237 0 : TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
238 :
239 0 : evhttp_set_cb(base->http_server,
240 : constructed_uri,
241 : evrpc_request_cb,
242 : rpc);
243 :
244 0 : mm_free(constructed_uri);
245 :
246 0 : return (0);
247 : }
248 :
249 : int
250 0 : evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
251 : {
252 0 : char *registered_uri = NULL;
253 : struct evrpc *rpc;
254 : int r;
255 :
256 : /* find the right rpc; linear search might be slow */
257 0 : TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
258 0 : if (strcmp(rpc->uri, name) == 0)
259 0 : break;
260 : }
261 0 : if (rpc == NULL) {
262 : /* We did not find an RPC with this name */
263 0 : return (-1);
264 : }
265 0 : TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
266 :
267 0 : registered_uri = evrpc_construct_uri(name);
268 :
269 : /* remove the http server callback */
270 0 : r = evhttp_del_cb(base->http_server, registered_uri);
271 0 : EVUTIL_ASSERT(r == 0);
272 :
273 0 : mm_free(registered_uri);
274 :
275 0 : mm_free((char *)rpc->uri);
276 0 : mm_free(rpc);
277 0 : return (0);
278 : }
279 :
280 : static int evrpc_pause_request(void *vbase, void *ctx,
281 : void (*cb)(void *, enum EVRPC_HOOK_RESULT));
282 : static void evrpc_request_cb_closure(void *, enum EVRPC_HOOK_RESULT);
283 :
284 : static void
285 0 : evrpc_request_cb(struct evhttp_request *req, void *arg)
286 : {
287 0 : struct evrpc *rpc = arg;
288 0 : struct evrpc_req_generic *rpc_state = NULL;
289 :
290 : /* let's verify the outside parameters */
291 0 : if (req->type != EVHTTP_REQ_POST ||
292 0 : evbuffer_get_length(req->input_buffer) <= 0)
293 : goto error;
294 :
295 0 : rpc_state = mm_calloc(1, sizeof(struct evrpc_req_generic));
296 0 : if (rpc_state == NULL)
297 0 : goto error;
298 0 : rpc_state->rpc = rpc;
299 0 : rpc_state->http_req = req;
300 0 : rpc_state->rpc_data = NULL;
301 :
302 0 : if (TAILQ_FIRST(&rpc->base->input_hooks) != NULL) {
303 : int hook_res;
304 :
305 0 : evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
306 :
307 : /*
308 : * allow hooks to modify the outgoing request
309 : */
310 0 : hook_res = evrpc_process_hooks(&rpc->base->input_hooks,
311 : rpc_state, req, req->input_buffer);
312 0 : switch (hook_res) {
313 : case EVRPC_TERMINATE:
314 0 : goto error;
315 : case EVRPC_PAUSE:
316 0 : evrpc_pause_request(rpc->base, rpc_state,
317 : evrpc_request_cb_closure);
318 0 : return;
319 : case EVRPC_CONTINUE:
320 0 : break;
321 : default:
322 0 : EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
323 : hook_res == EVRPC_CONTINUE ||
324 : hook_res == EVRPC_PAUSE);
325 : }
326 : }
327 :
328 0 : evrpc_request_cb_closure(rpc_state, EVRPC_CONTINUE);
329 0 : return;
330 :
331 : error:
332 0 : evrpc_reqstate_free_(rpc_state);
333 0 : evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
334 0 : return;
335 : }
336 :
337 : static void
338 0 : evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
339 : {
340 0 : struct evrpc_req_generic *rpc_state = arg;
341 : struct evrpc *rpc;
342 : struct evhttp_request *req;
343 :
344 0 : EVUTIL_ASSERT(rpc_state);
345 0 : rpc = rpc_state->rpc;
346 0 : req = rpc_state->http_req;
347 :
348 0 : if (hook_res == EVRPC_TERMINATE)
349 0 : goto error;
350 :
351 : /* let's check that we can parse the request */
352 0 : rpc_state->request = rpc->request_new(rpc->request_new_arg);
353 0 : if (rpc_state->request == NULL)
354 0 : goto error;
355 :
356 0 : if (rpc->request_unmarshal(
357 : rpc_state->request, req->input_buffer) == -1) {
358 : /* we failed to parse the request; that's a bummer */
359 0 : goto error;
360 : }
361 :
362 : /* at this point, we have a well formed request, prepare the reply */
363 :
364 0 : rpc_state->reply = rpc->reply_new(rpc->reply_new_arg);
365 0 : if (rpc_state->reply == NULL)
366 0 : goto error;
367 :
368 : /* give the rpc to the user; they can deal with it */
369 0 : rpc->cb(rpc_state, rpc->cb_arg);
370 :
371 0 : return;
372 :
373 : error:
374 0 : evrpc_reqstate_free_(rpc_state);
375 0 : evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
376 0 : return;
377 : }
378 :
379 :
380 : void
381 0 : evrpc_reqstate_free_(struct evrpc_req_generic* rpc_state)
382 : {
383 : struct evrpc *rpc;
384 0 : EVUTIL_ASSERT(rpc_state != NULL);
385 0 : rpc = rpc_state->rpc;
386 :
387 : /* clean up all memory */
388 0 : if (rpc_state->hook_meta != NULL)
389 0 : evrpc_hook_context_free_(rpc_state->hook_meta);
390 0 : if (rpc_state->request != NULL)
391 0 : rpc->request_free(rpc_state->request);
392 0 : if (rpc_state->reply != NULL)
393 0 : rpc->reply_free(rpc_state->reply);
394 0 : if (rpc_state->rpc_data != NULL)
395 0 : evbuffer_free(rpc_state->rpc_data);
396 0 : mm_free(rpc_state);
397 0 : }
398 :
399 : static void
400 : evrpc_request_done_closure(void *, enum EVRPC_HOOK_RESULT);
401 :
402 : void
403 0 : evrpc_request_done(struct evrpc_req_generic *rpc_state)
404 : {
405 : struct evhttp_request *req;
406 : struct evrpc *rpc;
407 :
408 0 : EVUTIL_ASSERT(rpc_state);
409 :
410 0 : req = rpc_state->http_req;
411 0 : rpc = rpc_state->rpc;
412 :
413 0 : if (rpc->reply_complete(rpc_state->reply) == -1) {
414 : /* the reply was not completely filled in. error out */
415 0 : goto error;
416 : }
417 :
418 0 : if ((rpc_state->rpc_data = evbuffer_new()) == NULL) {
419 : /* out of memory */
420 0 : goto error;
421 : }
422 :
423 : /* serialize the reply */
424 0 : rpc->reply_marshal(rpc_state->rpc_data, rpc_state->reply);
425 :
426 0 : if (TAILQ_FIRST(&rpc->base->output_hooks) != NULL) {
427 : int hook_res;
428 :
429 0 : evrpc_hook_associate_meta_(&rpc_state->hook_meta, req->evcon);
430 :
431 : /* do hook based tweaks to the request */
432 0 : hook_res = evrpc_process_hooks(&rpc->base->output_hooks,
433 : rpc_state, req, rpc_state->rpc_data);
434 0 : switch (hook_res) {
435 : case EVRPC_TERMINATE:
436 0 : goto error;
437 : case EVRPC_PAUSE:
438 0 : if (evrpc_pause_request(rpc->base, rpc_state,
439 : evrpc_request_done_closure) == -1)
440 0 : goto error;
441 0 : return;
442 : case EVRPC_CONTINUE:
443 0 : break;
444 : default:
445 0 : EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
446 : hook_res == EVRPC_CONTINUE ||
447 : hook_res == EVRPC_PAUSE);
448 : }
449 : }
450 :
451 0 : evrpc_request_done_closure(rpc_state, EVRPC_CONTINUE);
452 0 : return;
453 :
454 : error:
455 0 : evrpc_reqstate_free_(rpc_state);
456 0 : evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
457 0 : return;
458 : }
459 :
460 : void *
461 0 : evrpc_get_request(struct evrpc_req_generic *req)
462 : {
463 0 : return req->request;
464 : }
465 :
466 : void *
467 0 : evrpc_get_reply(struct evrpc_req_generic *req)
468 : {
469 0 : return req->reply;
470 : }
471 :
472 : static void
473 0 : evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
474 : {
475 0 : struct evrpc_req_generic *rpc_state = arg;
476 : struct evhttp_request *req;
477 0 : EVUTIL_ASSERT(rpc_state);
478 0 : req = rpc_state->http_req;
479 :
480 0 : if (hook_res == EVRPC_TERMINATE)
481 0 : goto error;
482 :
483 : /* on success, we are going to transmit marshaled binary data */
484 0 : if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
485 0 : evhttp_add_header(req->output_headers,
486 : "Content-Type", "application/octet-stream");
487 : }
488 0 : evhttp_send_reply(req, HTTP_OK, "OK", rpc_state->rpc_data);
489 :
490 0 : evrpc_reqstate_free_(rpc_state);
491 :
492 0 : return;
493 :
494 : error:
495 0 : evrpc_reqstate_free_(rpc_state);
496 0 : evhttp_send_error(req, HTTP_SERVUNAVAIL, NULL);
497 0 : return;
498 : }
499 :
500 :
501 : /* Client implementation of RPC site */
502 :
503 : static int evrpc_schedule_request(struct evhttp_connection *connection,
504 : struct evrpc_request_wrapper *ctx);
505 :
506 : struct evrpc_pool *
507 0 : evrpc_pool_new(struct event_base *base)
508 : {
509 0 : struct evrpc_pool *pool = mm_calloc(1, sizeof(struct evrpc_pool));
510 0 : if (pool == NULL)
511 0 : return (NULL);
512 :
513 0 : TAILQ_INIT(&pool->connections);
514 0 : TAILQ_INIT(&pool->requests);
515 :
516 0 : TAILQ_INIT(&pool->paused_requests);
517 :
518 0 : TAILQ_INIT(&pool->input_hooks);
519 0 : TAILQ_INIT(&pool->output_hooks);
520 :
521 0 : pool->base = base;
522 0 : pool->timeout = -1;
523 :
524 0 : return (pool);
525 : }
526 :
527 : static void
528 0 : evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
529 : {
530 0 : if (request->hook_meta != NULL)
531 0 : evrpc_hook_context_free_(request->hook_meta);
532 0 : mm_free(request->name);
533 0 : mm_free(request);
534 0 : }
535 :
536 : void
537 0 : evrpc_pool_free(struct evrpc_pool *pool)
538 : {
539 : struct evhttp_connection *connection;
540 : struct evrpc_request_wrapper *request;
541 : struct evrpc_hook_ctx *pause;
542 : struct evrpc_hook *hook;
543 : int r;
544 :
545 0 : while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
546 0 : TAILQ_REMOVE(&pool->requests, request, next);
547 0 : evrpc_request_wrapper_free(request);
548 : }
549 :
550 0 : while ((pause = TAILQ_FIRST(&pool->paused_requests)) != NULL) {
551 0 : TAILQ_REMOVE(&pool->paused_requests, pause, next);
552 0 : mm_free(pause);
553 : }
554 :
555 0 : while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
556 0 : TAILQ_REMOVE(&pool->connections, connection, next);
557 0 : evhttp_connection_free(connection);
558 : }
559 :
560 0 : while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
561 0 : r = evrpc_remove_hook(pool, EVRPC_INPUT, hook);
562 0 : EVUTIL_ASSERT(r);
563 : }
564 :
565 0 : while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
566 0 : r = evrpc_remove_hook(pool, EVRPC_OUTPUT, hook);
567 0 : EVUTIL_ASSERT(r);
568 : }
569 :
570 0 : mm_free(pool);
571 0 : }
572 :
573 : /*
574 : * Add a connection to the RPC pool. A request scheduled on the pool
575 : * may use any available connection.
576 : */
577 :
578 : void
579 0 : evrpc_pool_add_connection(struct evrpc_pool *pool,
580 : struct evhttp_connection *connection)
581 : {
582 0 : EVUTIL_ASSERT(connection->http_server == NULL);
583 0 : TAILQ_INSERT_TAIL(&pool->connections, connection, next);
584 :
585 : /*
586 : * associate an event base with this connection
587 : */
588 0 : if (pool->base != NULL)
589 0 : evhttp_connection_set_base(connection, pool->base);
590 :
591 : /*
592 : * unless a timeout was specifically set for a connection,
593 : * the connection inherits the timeout from the pool.
594 : */
595 0 : if (!evutil_timerisset(&connection->timeout))
596 0 : evhttp_connection_set_timeout(connection, pool->timeout);
597 :
598 : /*
599 : * if we have any requests pending, schedule them with the new
600 : * connections.
601 : */
602 :
603 0 : if (TAILQ_FIRST(&pool->requests) != NULL) {
604 0 : struct evrpc_request_wrapper *request =
605 : TAILQ_FIRST(&pool->requests);
606 0 : TAILQ_REMOVE(&pool->requests, request, next);
607 0 : evrpc_schedule_request(connection, request);
608 : }
609 0 : }
610 :
611 : void
612 0 : evrpc_pool_remove_connection(struct evrpc_pool *pool,
613 : struct evhttp_connection *connection)
614 : {
615 0 : TAILQ_REMOVE(&pool->connections, connection, next);
616 0 : }
617 :
618 : void
619 0 : evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
620 : {
621 : struct evhttp_connection *evcon;
622 0 : TAILQ_FOREACH(evcon, &pool->connections, next) {
623 0 : evhttp_connection_set_timeout(evcon, timeout_in_secs);
624 : }
625 0 : pool->timeout = timeout_in_secs;
626 0 : }
627 :
628 :
629 : static void evrpc_reply_done(struct evhttp_request *, void *);
630 : static void evrpc_request_timeout(evutil_socket_t, short, void *);
631 :
632 : /*
633 : * Finds a connection object associated with the pool that is currently
634 : * idle and can be used to make a request.
635 : */
636 : static struct evhttp_connection *
637 0 : evrpc_pool_find_connection(struct evrpc_pool *pool)
638 : {
639 : struct evhttp_connection *connection;
640 0 : TAILQ_FOREACH(connection, &pool->connections, next) {
641 0 : if (TAILQ_FIRST(&connection->requests) == NULL)
642 0 : return (connection);
643 : }
644 :
645 0 : return (NULL);
646 : }
647 :
648 : /*
649 : * Prototypes responsible for evrpc scheduling and hooking
650 : */
651 :
652 : static void evrpc_schedule_request_closure(void *ctx, enum EVRPC_HOOK_RESULT);
653 :
654 : /*
655 : * We assume that the ctx is no longer queued on the pool.
656 : */
657 : static int
658 0 : evrpc_schedule_request(struct evhttp_connection *connection,
659 : struct evrpc_request_wrapper *ctx)
660 : {
661 0 : struct evhttp_request *req = NULL;
662 0 : struct evrpc_pool *pool = ctx->pool;
663 : struct evrpc_status status;
664 :
665 0 : if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
666 0 : goto error;
667 :
668 : /* serialize the request data into the output buffer */
669 0 : ctx->request_marshal(req->output_buffer, ctx->request);
670 :
671 : /* we need to know the connection that we might have to abort */
672 0 : ctx->evcon = connection;
673 :
674 : /* if we get paused we also need to know the request */
675 0 : ctx->req = req;
676 :
677 0 : if (TAILQ_FIRST(&pool->output_hooks) != NULL) {
678 : int hook_res;
679 :
680 0 : evrpc_hook_associate_meta_(&ctx->hook_meta, connection);
681 :
682 : /* apply hooks to the outgoing request */
683 0 : hook_res = evrpc_process_hooks(&pool->output_hooks,
684 : ctx, req, req->output_buffer);
685 :
686 0 : switch (hook_res) {
687 : case EVRPC_TERMINATE:
688 0 : goto error;
689 : case EVRPC_PAUSE:
690 : /* we need to be explicitly resumed */
691 0 : if (evrpc_pause_request(pool, ctx,
692 : evrpc_schedule_request_closure) == -1)
693 0 : goto error;
694 0 : return (0);
695 : case EVRPC_CONTINUE:
696 : /* we can just continue */
697 0 : break;
698 : default:
699 0 : EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
700 : hook_res == EVRPC_CONTINUE ||
701 : hook_res == EVRPC_PAUSE);
702 : }
703 : }
704 :
705 0 : evrpc_schedule_request_closure(ctx, EVRPC_CONTINUE);
706 0 : return (0);
707 :
708 : error:
709 0 : memset(&status, 0, sizeof(status));
710 0 : status.error = EVRPC_STATUS_ERR_UNSTARTED;
711 0 : (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
712 0 : evrpc_request_wrapper_free(ctx);
713 0 : return (-1);
714 : }
715 :
716 : static void
717 0 : evrpc_schedule_request_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
718 : {
719 0 : struct evrpc_request_wrapper *ctx = arg;
720 0 : struct evhttp_connection *connection = ctx->evcon;
721 0 : struct evhttp_request *req = ctx->req;
722 0 : struct evrpc_pool *pool = ctx->pool;
723 : struct evrpc_status status;
724 0 : char *uri = NULL;
725 0 : int res = 0;
726 :
727 0 : if (hook_res == EVRPC_TERMINATE)
728 0 : goto error;
729 :
730 0 : uri = evrpc_construct_uri(ctx->name);
731 0 : if (uri == NULL)
732 0 : goto error;
733 :
734 0 : if (pool->timeout > 0) {
735 : /*
736 : * a timeout after which the whole rpc is going to be aborted.
737 : */
738 : struct timeval tv;
739 0 : evutil_timerclear(&tv);
740 0 : tv.tv_sec = pool->timeout;
741 0 : evtimer_add(&ctx->ev_timeout, &tv);
742 : }
743 :
744 : /* start the request over the connection */
745 0 : res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
746 0 : mm_free(uri);
747 :
748 0 : if (res == -1)
749 0 : goto error;
750 :
751 0 : return;
752 :
753 : error:
754 0 : memset(&status, 0, sizeof(status));
755 0 : status.error = EVRPC_STATUS_ERR_UNSTARTED;
756 0 : (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
757 0 : evrpc_request_wrapper_free(ctx);
758 : }
759 :
760 : /* we just queue the paused request on the pool under the req object */
761 : static int
762 0 : evrpc_pause_request(void *vbase, void *ctx,
763 : void (*cb)(void *, enum EVRPC_HOOK_RESULT))
764 : {
765 0 : struct evrpc_hooks_ *base = vbase;
766 0 : struct evrpc_hook_ctx *pause = mm_malloc(sizeof(*pause));
767 0 : if (pause == NULL)
768 0 : return (-1);
769 :
770 0 : pause->ctx = ctx;
771 0 : pause->cb = cb;
772 :
773 0 : TAILQ_INSERT_TAIL(&base->pause_requests, pause, next);
774 0 : return (0);
775 : }
776 :
777 : int
778 0 : evrpc_resume_request(void *vbase, void *ctx, enum EVRPC_HOOK_RESULT res)
779 : {
780 0 : struct evrpc_hooks_ *base = vbase;
781 0 : struct evrpc_pause_list *head = &base->pause_requests;
782 : struct evrpc_hook_ctx *pause;
783 :
784 0 : TAILQ_FOREACH(pause, head, next) {
785 0 : if (pause->ctx == ctx)
786 0 : break;
787 : }
788 :
789 0 : if (pause == NULL)
790 0 : return (-1);
791 :
792 0 : (*pause->cb)(pause->ctx, res);
793 0 : TAILQ_REMOVE(head, pause, next);
794 0 : mm_free(pause);
795 0 : return (0);
796 : }
797 :
798 : int
799 0 : evrpc_make_request(struct evrpc_request_wrapper *ctx)
800 : {
801 0 : struct evrpc_pool *pool = ctx->pool;
802 :
803 : /* initialize the event structure for this rpc */
804 0 : evtimer_assign(&ctx->ev_timeout, pool->base, evrpc_request_timeout, ctx);
805 :
806 : /* we better have some available connections on the pool */
807 0 : EVUTIL_ASSERT(TAILQ_FIRST(&pool->connections) != NULL);
808 :
809 : /*
810 : * if no connection is available, we queue the request on the pool,
811 : * the next time a connection is empty, the rpc will be send on that.
812 : */
813 0 : TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
814 :
815 0 : evrpc_pool_schedule(pool);
816 :
817 0 : return (0);
818 : }
819 :
820 :
821 : struct evrpc_request_wrapper *
822 0 : evrpc_make_request_ctx(
823 : struct evrpc_pool *pool, void *request, void *reply,
824 : const char *rpcname,
825 : void (*req_marshal)(struct evbuffer*, void *),
826 : void (*rpl_clear)(void *),
827 : int (*rpl_unmarshal)(void *, struct evbuffer *),
828 : void (*cb)(struct evrpc_status *, void *, void *, void *),
829 : void *cbarg)
830 : {
831 0 : struct evrpc_request_wrapper *ctx = (struct evrpc_request_wrapper *)
832 : mm_malloc(sizeof(struct evrpc_request_wrapper));
833 0 : if (ctx == NULL)
834 0 : return (NULL);
835 :
836 0 : ctx->pool = pool;
837 0 : ctx->hook_meta = NULL;
838 0 : ctx->evcon = NULL;
839 0 : ctx->name = mm_strdup(rpcname);
840 0 : if (ctx->name == NULL) {
841 0 : mm_free(ctx);
842 0 : return (NULL);
843 : }
844 0 : ctx->cb = cb;
845 0 : ctx->cb_arg = cbarg;
846 0 : ctx->request = request;
847 0 : ctx->reply = reply;
848 0 : ctx->request_marshal = req_marshal;
849 0 : ctx->reply_clear = rpl_clear;
850 0 : ctx->reply_unmarshal = rpl_unmarshal;
851 :
852 0 : return (ctx);
853 : }
854 :
855 : static void
856 : evrpc_reply_done_closure(void *, enum EVRPC_HOOK_RESULT);
857 :
858 : static void
859 0 : evrpc_reply_done(struct evhttp_request *req, void *arg)
860 : {
861 0 : struct evrpc_request_wrapper *ctx = arg;
862 0 : struct evrpc_pool *pool = ctx->pool;
863 0 : int hook_res = EVRPC_CONTINUE;
864 :
865 : /* cancel any timeout we might have scheduled */
866 0 : event_del(&ctx->ev_timeout);
867 :
868 0 : ctx->req = req;
869 :
870 : /* we need to get the reply now */
871 0 : if (req == NULL) {
872 0 : evrpc_reply_done_closure(ctx, EVRPC_CONTINUE);
873 0 : return;
874 : }
875 :
876 0 : if (TAILQ_FIRST(&pool->input_hooks) != NULL) {
877 0 : evrpc_hook_associate_meta_(&ctx->hook_meta, ctx->evcon);
878 :
879 : /* apply hooks to the incoming request */
880 0 : hook_res = evrpc_process_hooks(&pool->input_hooks,
881 : ctx, req, req->input_buffer);
882 :
883 0 : switch (hook_res) {
884 : case EVRPC_TERMINATE:
885 : case EVRPC_CONTINUE:
886 0 : break;
887 : case EVRPC_PAUSE:
888 : /*
889 : * if we get paused we also need to know the
890 : * request. unfortunately, the underlying
891 : * layer is going to free it. we need to
892 : * request ownership explicitly
893 : */
894 0 : if (req != NULL)
895 0 : evhttp_request_own(req);
896 :
897 0 : evrpc_pause_request(pool, ctx,
898 : evrpc_reply_done_closure);
899 0 : return;
900 : default:
901 0 : EVUTIL_ASSERT(hook_res == EVRPC_TERMINATE ||
902 : hook_res == EVRPC_CONTINUE ||
903 : hook_res == EVRPC_PAUSE);
904 : }
905 : }
906 :
907 0 : evrpc_reply_done_closure(ctx, hook_res);
908 :
909 : /* http request is being freed by underlying layer */
910 : }
911 :
912 : static void
913 0 : evrpc_reply_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
914 : {
915 0 : struct evrpc_request_wrapper *ctx = arg;
916 0 : struct evhttp_request *req = ctx->req;
917 0 : struct evrpc_pool *pool = ctx->pool;
918 : struct evrpc_status status;
919 0 : int res = -1;
920 :
921 0 : memset(&status, 0, sizeof(status));
922 0 : status.http_req = req;
923 :
924 : /* we need to get the reply now */
925 0 : if (req == NULL) {
926 0 : status.error = EVRPC_STATUS_ERR_TIMEOUT;
927 0 : } else if (hook_res == EVRPC_TERMINATE) {
928 0 : status.error = EVRPC_STATUS_ERR_HOOKABORTED;
929 : } else {
930 0 : res = ctx->reply_unmarshal(ctx->reply, req->input_buffer);
931 0 : if (res == -1)
932 0 : status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
933 : }
934 :
935 0 : if (res == -1) {
936 : /* clear everything that we might have written previously */
937 0 : ctx->reply_clear(ctx->reply);
938 : }
939 :
940 0 : (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
941 :
942 0 : evrpc_request_wrapper_free(ctx);
943 :
944 : /* the http layer owned the original request structure, but if we
945 : * got paused, we asked for ownership and need to free it here. */
946 0 : if (req != NULL && evhttp_request_is_owned(req))
947 0 : evhttp_request_free(req);
948 :
949 : /* see if we can schedule another request */
950 0 : evrpc_pool_schedule(pool);
951 0 : }
952 :
953 : static void
954 0 : evrpc_pool_schedule(struct evrpc_pool *pool)
955 : {
956 0 : struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
957 : struct evhttp_connection *evcon;
958 :
959 : /* if no requests are pending, we have no work */
960 0 : if (ctx == NULL)
961 0 : return;
962 :
963 0 : if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
964 0 : TAILQ_REMOVE(&pool->requests, ctx, next);
965 0 : evrpc_schedule_request(evcon, ctx);
966 : }
967 : }
968 :
969 : static void
970 0 : evrpc_request_timeout(evutil_socket_t fd, short what, void *arg)
971 : {
972 0 : struct evrpc_request_wrapper *ctx = arg;
973 0 : struct evhttp_connection *evcon = ctx->evcon;
974 0 : EVUTIL_ASSERT(evcon != NULL);
975 :
976 0 : evhttp_connection_fail_(evcon, EVREQ_HTTP_TIMEOUT);
977 0 : }
978 :
979 : /*
980 : * frees potential meta data associated with a request.
981 : */
982 :
983 : static void
984 0 : evrpc_meta_data_free(struct evrpc_meta_list *meta_data)
985 : {
986 : struct evrpc_meta *entry;
987 0 : EVUTIL_ASSERT(meta_data != NULL);
988 :
989 0 : while ((entry = TAILQ_FIRST(meta_data)) != NULL) {
990 0 : TAILQ_REMOVE(meta_data, entry, next);
991 0 : mm_free(entry->key);
992 0 : mm_free(entry->data);
993 0 : mm_free(entry);
994 : }
995 0 : }
996 :
997 : static struct evrpc_hook_meta *
998 0 : evrpc_hook_meta_new_(void)
999 : {
1000 : struct evrpc_hook_meta *ctx;
1001 0 : ctx = mm_malloc(sizeof(struct evrpc_hook_meta));
1002 0 : EVUTIL_ASSERT(ctx != NULL);
1003 :
1004 0 : TAILQ_INIT(&ctx->meta_data);
1005 0 : ctx->evcon = NULL;
1006 :
1007 0 : return (ctx);
1008 : }
1009 :
1010 : static void
1011 0 : evrpc_hook_associate_meta_(struct evrpc_hook_meta **pctx,
1012 : struct evhttp_connection *evcon)
1013 : {
1014 0 : struct evrpc_hook_meta *ctx = *pctx;
1015 0 : if (ctx == NULL)
1016 0 : *pctx = ctx = evrpc_hook_meta_new_();
1017 0 : ctx->evcon = evcon;
1018 0 : }
1019 :
1020 : static void
1021 0 : evrpc_hook_context_free_(struct evrpc_hook_meta *ctx)
1022 : {
1023 0 : evrpc_meta_data_free(&ctx->meta_data);
1024 0 : mm_free(ctx);
1025 0 : }
1026 :
1027 : /* Adds meta data */
1028 : void
1029 0 : evrpc_hook_add_meta(void *ctx, const char *key,
1030 : const void *data, size_t data_size)
1031 : {
1032 0 : struct evrpc_request_wrapper *req = ctx;
1033 0 : struct evrpc_hook_meta *store = NULL;
1034 0 : struct evrpc_meta *meta = NULL;
1035 :
1036 0 : if ((store = req->hook_meta) == NULL)
1037 0 : store = req->hook_meta = evrpc_hook_meta_new_();
1038 :
1039 0 : meta = mm_malloc(sizeof(struct evrpc_meta));
1040 0 : EVUTIL_ASSERT(meta != NULL);
1041 0 : meta->key = mm_strdup(key);
1042 0 : EVUTIL_ASSERT(meta->key != NULL);
1043 0 : meta->data_size = data_size;
1044 0 : meta->data = mm_malloc(data_size);
1045 0 : EVUTIL_ASSERT(meta->data != NULL);
1046 0 : memcpy(meta->data, data, data_size);
1047 :
1048 0 : TAILQ_INSERT_TAIL(&store->meta_data, meta, next);
1049 0 : }
1050 :
1051 : int
1052 0 : evrpc_hook_find_meta(void *ctx, const char *key, void **data, size_t *data_size)
1053 : {
1054 0 : struct evrpc_request_wrapper *req = ctx;
1055 0 : struct evrpc_meta *meta = NULL;
1056 :
1057 0 : if (req->hook_meta == NULL)
1058 0 : return (-1);
1059 :
1060 0 : TAILQ_FOREACH(meta, &req->hook_meta->meta_data, next) {
1061 0 : if (strcmp(meta->key, key) == 0) {
1062 0 : *data = meta->data;
1063 0 : *data_size = meta->data_size;
1064 0 : return (0);
1065 : }
1066 : }
1067 :
1068 0 : return (-1);
1069 : }
1070 :
1071 : struct evhttp_connection *
1072 0 : evrpc_hook_get_connection(void *ctx)
1073 : {
1074 0 : struct evrpc_request_wrapper *req = ctx;
1075 0 : return (req->hook_meta != NULL ? req->hook_meta->evcon : NULL);
1076 : }
1077 :
1078 : int
1079 0 : evrpc_send_request_generic(struct evrpc_pool *pool,
1080 : void *request, void *reply,
1081 : void (*cb)(struct evrpc_status *, void *, void *, void *),
1082 : void *cb_arg,
1083 : const char *rpcname,
1084 : void (*req_marshal)(struct evbuffer *, void *),
1085 : void (*rpl_clear)(void *),
1086 : int (*rpl_unmarshal)(void *, struct evbuffer *))
1087 : {
1088 : struct evrpc_status status;
1089 : struct evrpc_request_wrapper *ctx;
1090 0 : ctx = evrpc_make_request_ctx(pool, request, reply,
1091 : rpcname, req_marshal, rpl_clear, rpl_unmarshal, cb, cb_arg);
1092 0 : if (ctx == NULL)
1093 0 : goto error;
1094 0 : return (evrpc_make_request(ctx));
1095 : error:
1096 0 : memset(&status, 0, sizeof(status));
1097 0 : status.error = EVRPC_STATUS_ERR_UNSTARTED;
1098 0 : (*(cb))(&status, request, reply, cb_arg);
1099 0 : return (-1);
1100 : }
1101 :
1102 : /** Takes a request object and fills it in with the right magic */
1103 : static struct evrpc *
1104 0 : evrpc_register_object(const char *name,
1105 : void *(*req_new)(void*), void *req_new_arg, void (*req_free)(void *),
1106 : int (*req_unmarshal)(void *, struct evbuffer *),
1107 : void *(*rpl_new)(void*), void *rpl_new_arg, void (*rpl_free)(void *),
1108 : int (*rpl_complete)(void *),
1109 : void (*rpl_marshal)(struct evbuffer *, void *))
1110 : {
1111 0 : struct evrpc* rpc = (struct evrpc *)mm_calloc(1, sizeof(struct evrpc));
1112 0 : if (rpc == NULL)
1113 0 : return (NULL);
1114 0 : rpc->uri = mm_strdup(name);
1115 0 : if (rpc->uri == NULL) {
1116 0 : mm_free(rpc);
1117 0 : return (NULL);
1118 : }
1119 0 : rpc->request_new = req_new;
1120 0 : rpc->request_new_arg = req_new_arg;
1121 0 : rpc->request_free = req_free;
1122 0 : rpc->request_unmarshal = req_unmarshal;
1123 0 : rpc->reply_new = rpl_new;
1124 0 : rpc->reply_new_arg = rpl_new_arg;
1125 0 : rpc->reply_free = rpl_free;
1126 0 : rpc->reply_complete = rpl_complete;
1127 0 : rpc->reply_marshal = rpl_marshal;
1128 0 : return (rpc);
1129 : }
1130 :
/*
 * Creates an RPC object named name from the given request/reply handlers
 * via evrpc_register_object() and registers it on base with
 * evrpc_register_rpc(), using callback and cbarg.
 *
 * Returns 0 once the registration call has been made, or -1 if the RPC
 * object could not be created.  The result of evrpc_register_rpc() itself
 * is not inspected.
 */
int
evrpc_register_generic(struct evrpc_base *base, const char *name,
    void (*callback)(struct evrpc_req_generic *, void *), void *cbarg,
    void *(*req_new)(void *), void *req_new_arg, void (*req_free)(void *),
    int (*req_unmarshal)(void *, struct evbuffer *),
    void *(*rpl_new)(void *), void *rpl_new_arg, void (*rpl_free)(void *),
    int (*rpl_complete)(void *),
    void (*rpl_marshal)(struct evbuffer *, void *))
{
	struct evrpc *rpc;

	rpc = evrpc_register_object(name,
	    req_new, req_new_arg, req_free, req_unmarshal,
	    rpl_new, rpl_new_arg, rpl_free, rpl_complete, rpl_marshal);
	if (rpc == NULL)
		return (-1);

	evrpc_register_rpc(base, rpc, callback, cbarg);

	return (0);
}
1149 :
1150 : /** accessors for obscure and undocumented functionality */
1151 : struct evrpc_pool *
1152 0 : evrpc_request_get_pool(struct evrpc_request_wrapper *ctx)
1153 : {
1154 0 : return (ctx->pool);
1155 : }
1156 :
1157 : void
1158 0 : evrpc_request_set_pool(struct evrpc_request_wrapper *ctx,
1159 : struct evrpc_pool *pool)
1160 : {
1161 0 : ctx->pool = pool;
1162 0 : }
1163 :
1164 : void
1165 0 : evrpc_request_set_cb(struct evrpc_request_wrapper *ctx,
1166 : void (*cb)(struct evrpc_status*, void *request, void *reply, void *arg),
1167 : void *cb_arg)
1168 : {
1169 0 : ctx->cb = cb;
1170 0 : ctx->cb_arg = cb_arg;
1171 0 : }
|