Line data Source code
1 : /*
2 : * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3 : * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions
8 : * are met:
9 : * 1. Redistributions of source code must retain the above copyright
10 : * notice, this list of conditions and the following disclaimer.
11 : * 2. Redistributions in binary form must reproduce the above copyright
12 : * notice, this list of conditions and the following disclaimer in the
13 : * documentation and/or other materials provided with the distribution.
14 : * 3. The name of the author may not be used to endorse or promote products
15 : * derived from this software without specific prior written permission.
16 : *
17 : * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18 : * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19 : * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20 : * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21 : * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22 : * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26 : * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 : */
28 : #include "evconfig-private.h"
29 :
30 : #include <sys/types.h>
31 : #include <limits.h>
32 : #include <string.h>
33 : #include <stdlib.h>
34 :
35 : #include "event2/event.h"
36 : #include "event2/event_struct.h"
37 : #include "event2/util.h"
38 : #include "event2/bufferevent.h"
39 : #include "event2/bufferevent_struct.h"
40 : #include "event2/buffer.h"
41 :
42 : #include "ratelim-internal.h"
43 :
44 : #include "bufferevent-internal.h"
45 : #include "mm-internal.h"
46 : #include "util-internal.h"
47 : #include "event-internal.h"
48 :
49 : int
50 0 : ev_token_bucket_init_(struct ev_token_bucket *bucket,
51 : const struct ev_token_bucket_cfg *cfg,
52 : ev_uint32_t current_tick,
53 : int reinitialize)
54 : {
55 0 : if (reinitialize) {
56 : /* on reinitialization, we only clip downwards, since we've
57 : already used who-knows-how-much bandwidth this tick. We
58 : leave "last_updated" as it is; the next update will add the
59 : appropriate amount of bandwidth to the bucket.
60 : */
61 0 : if (bucket->read_limit > (ev_int64_t) cfg->read_maximum)
62 0 : bucket->read_limit = cfg->read_maximum;
63 0 : if (bucket->write_limit > (ev_int64_t) cfg->write_maximum)
64 0 : bucket->write_limit = cfg->write_maximum;
65 : } else {
66 0 : bucket->read_limit = cfg->read_rate;
67 0 : bucket->write_limit = cfg->write_rate;
68 0 : bucket->last_updated = current_tick;
69 : }
70 0 : return 0;
71 : }
72 :
int
ev_token_bucket_update_(struct ev_token_bucket *bucket,
    const struct ev_token_bucket_cfg *cfg,
    ev_uint32_t current_tick)
{
	/* Refill 'bucket' with the tokens accrued since its last update,
	 * clamped to the configured maxima.  Returns 1 if the bucket was
	 * refilled, 0 if no whole tick has elapsed (or the tick counter
	 * moved implausibly far, i.e. time appeared to roll back). */

	/* It's okay if the tick number overflows, since we'll just
	 * wrap around when we do the unsigned substraction. */
	unsigned n_ticks = current_tick - bucket->last_updated;

	/* Make sure some ticks actually happened, and that time didn't
	 * roll back. */
	if (n_ticks == 0 || n_ticks > INT_MAX)
		return 0;

	/* Naively, we would say
		bucket->limit += n_ticks * cfg->rate;

		if (bucket->limit > cfg->maximum)
			bucket->limit = cfg->maximum;

	   But we're worried about overflow, so we do it like this:
	*/

	/* (maximum - limit) is the headroom left in the bucket; if even
	 * spread over n_ticks it is smaller than the per-tick rate, the
	 * refill would overshoot the maximum, so clamp directly. */
	if ((cfg->read_maximum - bucket->read_limit) / n_ticks < cfg->read_rate)
		bucket->read_limit = cfg->read_maximum;
	else
		bucket->read_limit += n_ticks * cfg->read_rate;


	if ((cfg->write_maximum - bucket->write_limit) / n_ticks < cfg->write_rate)
		bucket->write_limit = cfg->write_maximum;
	else
		bucket->write_limit += n_ticks * cfg->write_rate;


	bucket->last_updated = current_tick;

	return 1;
}
112 :
113 : static inline void
114 0 : bufferevent_update_buckets(struct bufferevent_private *bev)
115 : {
116 : /* Must hold lock on bev. */
117 : struct timeval now;
118 : unsigned tick;
119 0 : event_base_gettimeofday_cached(bev->bev.ev_base, &now);
120 0 : tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg);
121 0 : if (tick != bev->rate_limiting->limit.last_updated)
122 0 : ev_token_bucket_update_(&bev->rate_limiting->limit,
123 0 : bev->rate_limiting->cfg, tick);
124 0 : }
125 :
126 : ev_uint32_t
127 0 : ev_token_bucket_get_tick_(const struct timeval *tv,
128 : const struct ev_token_bucket_cfg *cfg)
129 : {
130 : /* This computation uses two multiplies and a divide. We could do
131 : * fewer if we knew that the tick length was an integer number of
132 : * seconds, or if we knew it divided evenly into a second. We should
133 : * investigate that more.
134 : */
135 :
136 : /* We cast to an ev_uint64_t first, since we don't want to overflow
137 : * before we do the final divide. */
138 0 : ev_uint64_t msec = (ev_uint64_t)tv->tv_sec * 1000 + tv->tv_usec / 1000;
139 0 : return (unsigned)(msec / cfg->msec_per_tick);
140 : }
141 :
142 : struct ev_token_bucket_cfg *
143 0 : ev_token_bucket_cfg_new(size_t read_rate, size_t read_burst,
144 : size_t write_rate, size_t write_burst,
145 : const struct timeval *tick_len)
146 : {
147 : struct ev_token_bucket_cfg *r;
148 : struct timeval g;
149 0 : if (! tick_len) {
150 0 : g.tv_sec = 1;
151 0 : g.tv_usec = 0;
152 0 : tick_len = &g;
153 : }
154 0 : if (read_rate > read_burst || write_rate > write_burst ||
155 0 : read_rate < 1 || write_rate < 1)
156 0 : return NULL;
157 0 : if (read_rate > EV_RATE_LIMIT_MAX ||
158 0 : write_rate > EV_RATE_LIMIT_MAX ||
159 0 : read_burst > EV_RATE_LIMIT_MAX ||
160 0 : write_burst > EV_RATE_LIMIT_MAX)
161 0 : return NULL;
162 0 : r = mm_calloc(1, sizeof(struct ev_token_bucket_cfg));
163 0 : if (!r)
164 0 : return NULL;
165 0 : r->read_rate = read_rate;
166 0 : r->write_rate = write_rate;
167 0 : r->read_maximum = read_burst;
168 0 : r->write_maximum = write_burst;
169 0 : memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
170 0 : r->msec_per_tick = (tick_len->tv_sec * 1000) +
171 0 : (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
172 0 : return r;
173 : }
174 :
void
ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg)
{
	/* Release a configuration allocated by ev_token_bucket_cfg_new(). */
	mm_free(cfg);
}
180 :
181 : /* Default values for max_single_read & max_single_write variables. */
182 : #define MAX_SINGLE_READ_DEFAULT 16384
183 : #define MAX_SINGLE_WRITE_DEFAULT 16384
184 :
185 : #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0)
186 : #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0)
187 :
188 : static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g);
189 : static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g);
190 : static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g);
191 : static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g);
192 :
193 : /** Helper: figure out the maximum amount we should write if is_write, or
194 : the maximum amount we should read if is_read. Return that maximum, or
195 : 0 if our bucket is wholly exhausted.
196 : */
static inline ev_ssize_t
bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write)
{
	/* needs lock on bev. */
	/* Start from the per-call cap; rate limiting can only lower it. */
	ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read;

	/* Select the read or write field of a token bucket, depending on
	 * the is_write argument of the enclosing function. */
#define LIM(x)						\
	(is_write ? (x).write_limit : (x).read_limit)

#define GROUP_SUSPENDED(g)			\
	(is_write ? (g)->write_suspended : (g)->read_suspended)

	/* Sets max_so_far to MIN(x, max_so_far) */
#define CLAMPTO(x)				\
	do {					\
		if (max_so_far > (x))		\
			max_so_far = (x);	\
	} while (0);

	if (!bev->rate_limiting)
		return max_so_far;

	/* If rate-limiting is enabled at all, update the appropriate
	   bucket, and take the smaller of our rate limit and the group
	   rate limit.
	 */

	if (bev->rate_limiting->cfg) {
		bufferevent_update_buckets(bev);
		max_so_far = LIM(bev->rate_limiting->limit);
	}
	if (bev->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bev->rate_limiting->group;
		ev_ssize_t share;
		LOCK_GROUP(g);
		if (GROUP_SUSPENDED(g)) {
			/* We can get here if we failed to lock this
			 * particular bufferevent while suspending the whole
			 * group. */
			if (is_write)
				bufferevent_suspend_write_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			else
				bufferevent_suspend_read_(&bev->bev,
				    BEV_SUSPEND_BW_GROUP);
			share = 0;
		} else {
			/* XXXX probably we should divide among the active
			 * members, not the total members. */
			share = LIM(g->rate_limit) / g->n_members;
			if (share < g->min_share)
				share = g->min_share;
		}
		UNLOCK_GROUP(g);
		CLAMPTO(share);
	}

	/* A bucket can go negative after a large burst; never report a
	 * negative allowance to the caller. */
	if (max_so_far < 0)
		max_so_far = 0;
	return max_so_far;
}
259 :
ev_ssize_t
bufferevent_get_read_max_(struct bufferevent_private *bev)
{
	/* Maximum number of bytes we should read right now; 0 if the
	 * read bucket is exhausted.  Needs lock on bev. */
	return bufferevent_get_rlim_max_(bev, 0);
}
265 :
ev_ssize_t
bufferevent_get_write_max_(struct bufferevent_private *bev)
{
	/* Maximum number of bytes we should write right now; 0 if the
	 * write bucket is exhausted.  Needs lock on bev. */
	return bufferevent_get_rlim_max_(bev, 1);
}
271 :
int
bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* Charge 'bytes' of reading against bev's own bucket and its
	 * group's bucket, suspending or unsuspending reads as the
	 * buckets cross zero.  'bytes' may be negative to refund
	 * tokens.  Returns 0 on success, -1 if the refill timer could
	 * not be scheduled. */
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock on bev */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.read_limit -= bytes;
		if (bev->rate_limiting->limit.read_limit <= 0) {
			/* Bucket empty: stop reading and arrange a refill
			 * one tick from now. */
			bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->read_suspended & BEV_SUSPEND_BW) {
			/* Tokens came back (bytes < 0): cancel the refill
			 * timer unless the write side still needs it. */
			if (!(bev->write_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.read_limit -= bytes;
		bev->rate_limiting->group->total_read += bytes;
		if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
			bev_group_suspend_reading_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->read_suspended) {
			bev_group_unsuspend_reading_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
309 :
int
bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes)
{
	/* Charge 'bytes' of writing against bev's own bucket and its
	 * group's bucket, suspending or unsuspending writes as the
	 * buckets cross zero.  'bytes' may be negative to refund
	 * tokens.  Returns 0 on success, -1 if the refill timer could
	 * not be scheduled.  Mirror image of
	 * bufferevent_decrement_read_buckets_(). */
	/* XXXXX Make sure all users of this function check its return value */
	int r = 0;
	/* need to hold lock */
	if (!bev->rate_limiting)
		return 0;

	if (bev->rate_limiting->cfg) {
		bev->rate_limiting->limit.write_limit -= bytes;
		if (bev->rate_limiting->limit.write_limit <= 0) {
			/* Bucket empty: stop writing and arrange a refill
			 * one tick from now. */
			bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW);
			if (event_add(&bev->rate_limiting->refill_bucket_event,
				&bev->rate_limiting->cfg->tick_timeout) < 0)
				r = -1;
		} else if (bev->write_suspended & BEV_SUSPEND_BW) {
			/* Tokens came back (bytes < 0): cancel the refill
			 * timer unless the read side still needs it. */
			if (!(bev->read_suspended & BEV_SUSPEND_BW))
				event_del(&bev->rate_limiting->refill_bucket_event);
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		}
	}

	if (bev->rate_limiting->group) {
		LOCK_GROUP(bev->rate_limiting->group);
		bev->rate_limiting->group->rate_limit.write_limit -= bytes;
		bev->rate_limiting->group->total_written += bytes;
		if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
			bev_group_suspend_writing_(bev->rate_limiting->group);
		} else if (bev->rate_limiting->group->write_suspended) {
			bev_group_unsuspend_writing_(bev->rate_limiting->group);
		}
		UNLOCK_GROUP(bev->rate_limiting->group);
	}

	return r;
}
347 :
348 : /** Stop reading on every bufferevent in <b>g</b> */
static int
bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	/* Mark the group suspended first, so members we fail to lock
	 * below will notice on their next limit check. */
	g->read_suspended = 1;
	g->pending_unsuspend_read = 0;

	/* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK,
	   to prevent a deadlock.  (Ordinarily, the group lock nests inside
	   the bufferevent locks.  If we are unable to lock any individual
	   bufferevent, it will find out later when it looks at its limit
	   and sees that its group is suspended.)
	*/
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
372 :
373 : /** Stop writing on every bufferevent in <b>g</b> */
static int
bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Needs group lock */
	struct bufferevent_private *bev;
	/* Mark the group suspended first; see the deadlock-avoidance
	 * note in bev_group_suspend_reading_() for why we only try-lock
	 * each member below. */
	g->write_suspended = 1;
	g->pending_unsuspend_write = 0;
	LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) {
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_suspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		}
	}
	return 0;
}
390 :
391 : /** Timer callback invoked on a single bufferevent with one or more exhausted
392 : buckets when they are ready to refill. */
static void
bev_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	/* 'arg' is the bufferevent_private whose refill_bucket_event
	 * fired; fd/what are unused for this pure-timer event. */
	unsigned tick;
	struct timeval now;
	struct bufferevent_private *bev = arg;
	int again = 0;
	BEV_LOCK(&bev->bev);
	/* Rate limiting may have been torn down between the event_add
	 * and this callback running. */
	if (!bev->rate_limiting || !bev->rate_limiting->cfg) {
		BEV_UNLOCK(&bev->bev);
		return;
	}

	/* First, update the bucket */
	event_base_gettimeofday_cached(bev->bev.ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now,
	    bev->rate_limiting->cfg);
	ev_token_bucket_update_(&bev->rate_limiting->limit,
	    bev->rate_limiting->cfg,
	    tick);

	/* Now unsuspend any read/write operations as appropriate. */
	if ((bev->read_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.read_limit > 0)
			bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if ((bev->write_suspended & BEV_SUSPEND_BW)) {
		if (bev->rate_limiting->limit.write_limit > 0)
			bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW);
		else
			again = 1;
	}
	if (again) {
		/* One or more of the buckets may need another refill if they
		   started negative.

		   XXXX if we need to be quiet for more ticks, we should
		   maybe figure out what timeout we really want.
		*/
		/* XXXX Handle event_add failure somehow */
		event_add(&bev->rate_limiting->refill_bucket_event,
		    &bev->rate_limiting->cfg->tick_timeout);
	}
	BEV_UNLOCK(&bev->bev);
}
440 :
441 : /** Helper: grab a random element from a bufferevent group.
442 : *
443 : * Requires that we hold the lock on the group.
444 : */
445 : static struct bufferevent_private *
446 0 : bev_group_random_element_(struct bufferevent_rate_limit_group *group)
447 : {
448 : int which;
449 : struct bufferevent_private *bev;
450 :
451 : /* requires group lock */
452 :
453 0 : if (!group->n_members)
454 0 : return NULL;
455 :
456 0 : EVUTIL_ASSERT(! LIST_EMPTY(&group->members));
457 :
458 0 : which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members);
459 :
460 0 : bev = LIST_FIRST(&group->members);
461 0 : while (which--)
462 0 : bev = LIST_NEXT(bev, rate_limiting->next_in_group);
463 :
464 0 : return bev;
465 : }
466 :
467 : /** Iterate over the elements of a rate-limiting group 'g' with a random
468 : starting point, assigning each to the variable 'bev', and executing the
469 : block 'block'.
470 :
471 : We do this in a half-baked effort to get fairness among group members.
472 : XXX Round-robin or some kind of priority queue would be even more fair.
473 : */
474 : #define FOREACH_RANDOM_ORDER(block) \
475 : do { \
476 : first = bev_group_random_element_(g); \
477 : for (bev = first; bev != LIST_END(&g->members); \
478 : bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
479 : block ; \
480 : } \
481 : for (bev = LIST_FIRST(&g->members); bev && bev != first; \
482 : bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \
483 : block ; \
484 : } \
485 : } while (0)
486 :
static void
bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g)
{
	/* Re-enable reading on every member of 'g'.  Requires the group
	 * lock.  Members we cannot try-lock right now are left for the
	 * next tick via pending_unsuspend_read.  'first' and 'bev' are
	 * the names FOREACH_RANDOM_ORDER expects. */
	int again = 0;
	struct bufferevent_private *bev, *first;

	g->read_suspended = 0;
	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_read_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_read = again;
}
505 :
static void
bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g)
{
	/* Re-enable writing on every member of 'g'.  Requires the group
	 * lock.  Members we cannot try-lock right now are left for the
	 * next tick via pending_unsuspend_write. */
	int again = 0;
	struct bufferevent_private *bev, *first;
	g->write_suspended = 0;

	FOREACH_RANDOM_ORDER({
		if (EVLOCK_TRY_LOCK_(bev->lock)) {
			bufferevent_unsuspend_write_(&bev->bev,
			    BEV_SUSPEND_BW_GROUP);
			EVLOCK_UNLOCK(bev->lock, 0);
		} else {
			again = 1;
		}
	});
	g->pending_unsuspend_write = again;
}
524 :
525 : /** Callback invoked every tick to add more elements to the group bucket
526 : and unsuspend group members as needed.
527 : */
static void
bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg)
{
	/* 'arg' is the rate-limit group whose master_refill_event fired;
	 * runs once per tick (EV_PERSIST). */
	struct bufferevent_rate_limit_group *g = arg;
	unsigned tick;
	struct timeval now;

	event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);

	LOCK_GROUP(g);

	tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg);
	ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick);

	/* Wake the group only once it has at least min_share tokens, so
	 * that at least one member can make progress; also retry any
	 * members we failed to lock on the previous unsuspend pass. */
	if (g->pending_unsuspend_read ||
	    (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) {
		bev_group_unsuspend_reading_(g);
	}
	if (g->pending_unsuspend_write ||
	    (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){
		bev_group_unsuspend_writing_(g);
	}

	/* XXXX Rather than waiting to the next tick to unsuspend stuff
	 * with pending_unsuspend_write/read, we should do it on the
	 * next iteration of the mainloop.
	 */

	UNLOCK_GROUP(g);
}
558 :
int
bufferevent_set_rate_limit(struct bufferevent *bev,
    struct ev_token_bucket_cfg *cfg)
{
	/* Attach, replace, or (cfg == NULL) remove a per-bufferevent
	 * rate limit.  Returns 0 on success, -1 on allocation failure.
	 * NOTE(review): cfg is stored by pointer, not copied; the caller
	 * must keep it alive while it is attached. */
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	int r = -1;
	struct bufferevent_rate_limit *rlim;
	struct timeval now;
	ev_uint32_t tick;
	int reinit = 0, suspended = 0;
	/* XXX reference-count cfg */

	BEV_LOCK(bev);

	if (cfg == NULL) {
		/* Removing the limit: clear any bandwidth suspension and
		 * stop the refill timer, but keep the rlim struct (the
		 * bufferevent may still belong to a group). */
		if (bevp->rate_limiting) {
			rlim = bevp->rate_limiting;
			rlim->cfg = NULL;
			bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
			bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
			if (event_initialized(&rlim->refill_bucket_event))
				event_del(&rlim->refill_bucket_event);
		}
		r = 0;
		goto done;
	}

	event_base_gettimeofday_cached(bev->ev_base, &now);
	tick = ev_token_bucket_get_tick_(&now, cfg);

	if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) {
		/* no-op */
		r = 0;
		goto done;
	}
	if (bevp->rate_limiting == NULL) {
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim)
			goto done;
		bevp->rate_limiting = rlim;
	} else {
		rlim = bevp->rate_limiting;
	}
	/* reinit: we are replacing an existing config, so the bucket is
	 * clipped rather than reset (see ev_token_bucket_init_). */
	reinit = rlim->cfg != NULL;

	rlim->cfg = cfg;
	ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit);

	if (reinit) {
		EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event));
		event_del(&rlim->refill_bucket_event);
	}
	event_assign(&rlim->refill_bucket_event, bev->ev_base,
	    -1, EV_FINALIZE, bev_refill_callback_, bevp);

	/* Bring the suspension state in line with the (possibly clipped)
	 * bucket contents. */
	if (rlim->limit.read_limit > 0) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		suspended=1;
	}
	if (rlim->limit.write_limit > 0) {
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	} else {
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		suspended = 1;
	}

	if (suspended)
		event_add(&rlim->refill_bucket_event, &cfg->tick_timeout);

	r = 0;

done:
	BEV_UNLOCK(bev);
	return r;
}
637 :
638 : struct bufferevent_rate_limit_group *
639 0 : bufferevent_rate_limit_group_new(struct event_base *base,
640 : const struct ev_token_bucket_cfg *cfg)
641 : {
642 : struct bufferevent_rate_limit_group *g;
643 : struct timeval now;
644 : ev_uint32_t tick;
645 :
646 0 : event_base_gettimeofday_cached(base, &now);
647 0 : tick = ev_token_bucket_get_tick_(&now, cfg);
648 :
649 0 : g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group));
650 0 : if (!g)
651 0 : return NULL;
652 0 : memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));
653 0 : LIST_INIT(&g->members);
654 :
655 0 : ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0);
656 :
657 0 : event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE,
658 : bev_group_refill_callback_, g);
659 : /*XXXX handle event_add failure */
660 0 : event_add(&g->master_refill_event, &cfg->tick_timeout);
661 :
662 0 : EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
663 :
664 0 : bufferevent_rate_limit_group_set_min_share(g, 64);
665 :
666 0 : evutil_weakrand_seed_(&g->weakrand_seed,
667 0 : (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g));
668 :
669 0 : return g;
670 : }
671 :
int
bufferevent_rate_limit_group_set_cfg(
    struct bufferevent_rate_limit_group *g,
    const struct ev_token_bucket_cfg *cfg)
{
	/* Replace the group's limit configuration (copied in), clipping
	 * the current buckets down to the new maxima.  Returns 0 on
	 * success, -1 on NULL arguments. */
	int same_tick;
	if (!g || !cfg)
		return -1;

	LOCK_GROUP(g);
	/* Remember whether the tick length changed before we overwrite
	 * the stored cfg. */
	same_tick = evutil_timercmp(
		&g->rate_limit_cfg.tick_timeout, &cfg->tick_timeout, ==);
	memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg));

	if (g->rate_limit.read_limit > (ev_ssize_t)cfg->read_maximum)
		g->rate_limit.read_limit = cfg->read_maximum;
	if (g->rate_limit.write_limit > (ev_ssize_t)cfg->write_maximum)
		g->rate_limit.write_limit = cfg->write_maximum;

	if (!same_tick) {
		/* This can cause a hiccup in the schedule */
		event_add(&g->master_refill_event, &cfg->tick_timeout);
	}

	/* The new limits might force us to adjust min_share differently. */
	bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);

	UNLOCK_GROUP(g);
	return 0;
}
702 :
703 : int
704 0 : bufferevent_rate_limit_group_set_min_share(
705 : struct bufferevent_rate_limit_group *g,
706 : size_t share)
707 : {
708 0 : if (share > EV_SSIZE_MAX)
709 0 : return -1;
710 :
711 0 : g->configured_min_share = share;
712 :
713 : /* Can't set share to less than the one-tick maximum. IOW, at steady
714 : * state, at least one connection can go per tick. */
715 0 : if (share > g->rate_limit_cfg.read_rate)
716 0 : share = g->rate_limit_cfg.read_rate;
717 0 : if (share > g->rate_limit_cfg.write_rate)
718 0 : share = g->rate_limit_cfg.write_rate;
719 :
720 0 : g->min_share = share;
721 0 : return 0;
722 : }
723 :
void
bufferevent_rate_limit_group_free(struct bufferevent_rate_limit_group *g)
{
	/* Destroy an empty group: all members must already have been
	 * removed (asserted below). */
	LOCK_GROUP(g);
	EVUTIL_ASSERT(0 == g->n_members);
	event_del(&g->master_refill_event);
	UNLOCK_GROUP(g);
	EVTHREAD_FREE_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
	mm_free(g);
}
734 :
int
bufferevent_add_to_rate_limit_group(struct bufferevent *bev,
    struct bufferevent_rate_limit_group *g)
{
	/* Add 'bev' to group 'g', moving it out of any previous group.
	 * Allocates the per-bufferevent rate_limiting struct on demand.
	 * Returns 0 on success, -1 on allocation failure. */
	int wsuspend, rsuspend;
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);

	if (!bevp->rate_limiting) {
		struct bufferevent_rate_limit *rlim;
		rlim = mm_calloc(1, sizeof(struct bufferevent_rate_limit));
		if (!rlim) {
			BEV_UNLOCK(bev);
			return -1;
		}
		event_assign(&rlim->refill_bucket_event, bev->ev_base,
		    -1, EV_FINALIZE, bev_refill_callback_, bevp);
		bevp->rate_limiting = rlim;
	}

	if (bevp->rate_limiting->group == g) {
		/* Already a member of this group: nothing to do. */
		BEV_UNLOCK(bev);
		return 0;
	}
	if (bevp->rate_limiting->group)
		bufferevent_remove_from_rate_limit_group(bev);

	LOCK_GROUP(g);
	bevp->rate_limiting->group = g;
	++g->n_members;
	LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group);

	/* Snapshot the group's suspension state while holding its lock;
	 * apply it to bev after releasing, since we already hold bev's
	 * lock (group lock nests inside bufferevent locks). */
	rsuspend = g->read_suspended;
	wsuspend = g->write_suspended;

	UNLOCK_GROUP(g);

	if (rsuspend)
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP);
	if (wsuspend)
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP);

	BEV_UNLOCK(bev);
	return 0;
}
781 :
int
bufferevent_remove_from_rate_limit_group(struct bufferevent *bev)
{
	/* Public wrapper: remove from the group and lift any
	 * group-imposed suspension. */
	return bufferevent_remove_from_rate_limit_group_internal_(bev, 1);
}
787 :
int
bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev,
    int unsuspend)
{
	/* Detach 'bev' from its group, if any.  When 'unsuspend' is
	 * nonzero, also clear any BEV_SUSPEND_BW_GROUP suspension.
	 * Always returns 0; a no-op if bev is not in a group. */
	struct bufferevent_private *bevp =
	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
	BEV_LOCK(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->group) {
		struct bufferevent_rate_limit_group *g =
		    bevp->rate_limiting->group;
		LOCK_GROUP(g);
		bevp->rate_limiting->group = NULL;
		--g->n_members;
		LIST_REMOVE(bevp, rate_limiting->next_in_group);
		UNLOCK_GROUP(g);
	}
	if (unsuspend) {
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP);
	}
	BEV_UNLOCK(bev);
	return 0;
}
811 :
812 : /* ===
813 : * API functions to expose rate limits.
814 : *
815 : * Don't use these from inside Libevent; they're meant to be for use by
816 : * the program.
817 : * === */
818 :
819 : /* Mostly you don't want to use this function from inside libevent;
820 : * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_read_limit(struct bufferevent *bev)
{
	/* Current read-bucket contents (refreshed first), or EV_SSIZE_MAX
	 * when no per-bufferevent limit is configured.  May be negative
	 * after an overdraft. */
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.read_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}
837 :
838 : /* Mostly you don't want to use this function from inside libevent;
839 : * bufferevent_get_write_max_() is more likely what you want*/
ev_ssize_t
bufferevent_get_write_limit(struct bufferevent *bev)
{
	/* Current write-bucket contents (refreshed first), or
	 * EV_SSIZE_MAX when no per-bufferevent limit is configured.
	 * May be negative after an overdraft. */
	ev_ssize_t r;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	if (bevp->rate_limiting && bevp->rate_limiting->cfg) {
		bufferevent_update_buckets(bevp);
		r = bevp->rate_limiting->limit.write_limit;
	} else {
		r = EV_SSIZE_MAX;
	}
	BEV_UNLOCK(bev);
	return r;
}
856 :
857 : int
858 0 : bufferevent_set_max_single_read(struct bufferevent *bev, size_t size)
859 : {
860 : struct bufferevent_private *bevp;
861 0 : BEV_LOCK(bev);
862 0 : bevp = BEV_UPCAST(bev);
863 0 : if (size == 0 || size > EV_SSIZE_MAX)
864 0 : bevp->max_single_read = MAX_SINGLE_READ_DEFAULT;
865 : else
866 0 : bevp->max_single_read = size;
867 0 : BEV_UNLOCK(bev);
868 0 : return 0;
869 : }
870 :
871 : int
872 0 : bufferevent_set_max_single_write(struct bufferevent *bev, size_t size)
873 : {
874 : struct bufferevent_private *bevp;
875 0 : BEV_LOCK(bev);
876 0 : bevp = BEV_UPCAST(bev);
877 0 : if (size == 0 || size > EV_SSIZE_MAX)
878 0 : bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
879 : else
880 0 : bevp->max_single_write = size;
881 0 : BEV_UNLOCK(bev);
882 0 : return 0;
883 : }
884 :
ev_ssize_t
bufferevent_get_max_single_read(struct bufferevent *bev)
{
	/* Return the per-operation read cap, under the bufferevent lock. */
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_read;
	BEV_UNLOCK(bev);
	return r;
}
895 :
ev_ssize_t
bufferevent_get_max_single_write(struct bufferevent *bev)
{
	/* Return the per-operation write cap, under the bufferevent lock. */
	ev_ssize_t r;

	BEV_LOCK(bev);
	r = BEV_UPCAST(bev)->max_single_write;
	BEV_UNLOCK(bev);
	return r;
}
906 :
ev_ssize_t
bufferevent_get_max_to_read(struct bufferevent *bev)
{
	/* Public, locking wrapper around bufferevent_get_read_max_(). */
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_read_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
916 :
ev_ssize_t
bufferevent_get_max_to_write(struct bufferevent *bev)
{
	/* Public, locking wrapper around bufferevent_get_write_max_(). */
	ev_ssize_t r;
	BEV_LOCK(bev);
	r = bufferevent_get_write_max_(BEV_UPCAST(bev));
	BEV_UNLOCK(bev);
	return r;
}
926 :
927 : const struct ev_token_bucket_cfg *
928 0 : bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) {
929 0 : struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
930 : struct ev_token_bucket_cfg *cfg;
931 :
932 0 : BEV_LOCK(bev);
933 :
934 0 : if (bufev_private->rate_limiting) {
935 0 : cfg = bufev_private->rate_limiting->cfg;
936 : } else {
937 0 : cfg = NULL;
938 : }
939 :
940 0 : BEV_UNLOCK(bev);
941 :
942 0 : return cfg;
943 : }
944 :
945 : /* Mostly you don't want to use this function from inside libevent;
946 : * bufferevent_get_read_max_() is more likely what you want*/
ev_ssize_t
bufferevent_rate_limit_group_get_read_limit(
    struct bufferevent_rate_limit_group *grp)
{
	/* Current contents of the group's read bucket (no refresh;
	 * the group's periodic refill callback keeps it updated). */
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.read_limit;
	UNLOCK_GROUP(grp);
	return r;
}
957 :
958 : /* Mostly you don't want to use this function from inside libevent;
959 : * bufferevent_get_write_max_() is more likely what you want. */
ev_ssize_t
bufferevent_rate_limit_group_get_write_limit(
    struct bufferevent_rate_limit_group *grp)
{
	/* Current contents of the group's write bucket (no refresh;
	 * the group's periodic refill callback keeps it updated). */
	ev_ssize_t r;
	LOCK_GROUP(grp);
	r = grp->rate_limit.write_limit;
	UNLOCK_GROUP(grp);
	return r;
}
970 :
/* Manually withdraw "decr" bytes from the bufferevent's read token
 * bucket, as if that many bytes had been read.  A negative "decr" adds
 * tokens back.  Requires that a per-bufferevent rate limit is already
 * configured (asserted below).  Returns 0 on success, or -1 if the
 * refill timer could not be scheduled. */
int
bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.read_limit;

	new_limit = (bevp->rate_limiting->limit.read_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* The bucket just ran dry: stop reading, and make sure the
		 * refill timer is pending so tokens come back next tick. */
		bufferevent_suspend_read_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The bucket just refilled above zero.  The refill event is
		 * shared with the write direction, so only cancel it if
		 * writing is not also suspended waiting for bandwidth. */
		if (!(bevp->write_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
997 :
/* Manually withdraw "decr" bytes from the bufferevent's write token
 * bucket, as if that many bytes had been written.  A negative "decr"
 * adds tokens back.  Requires that a per-bufferevent rate limit is
 * already configured (asserted below).  Returns 0 on success, or -1 if
 * the refill timer could not be scheduled. */
int
bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr)
{
	/* XXXX this is mostly copy-and-paste from
	 * bufferevent_decrement_read_limit */
	int r = 0;
	ev_ssize_t old_limit, new_limit;
	struct bufferevent_private *bevp;
	BEV_LOCK(bev);
	bevp = BEV_UPCAST(bev);
	EVUTIL_ASSERT(bevp->rate_limiting && bevp->rate_limiting->cfg);
	old_limit = bevp->rate_limiting->limit.write_limit;

	new_limit = (bevp->rate_limiting->limit.write_limit -= decr);
	if (old_limit > 0 && new_limit <= 0) {
		/* The bucket just ran dry: stop writing, and make sure the
		 * refill timer is pending so tokens come back next tick. */
		bufferevent_suspend_write_(bev, BEV_SUSPEND_BW);
		if (event_add(&bevp->rate_limiting->refill_bucket_event,
			&bevp->rate_limiting->cfg->tick_timeout) < 0)
			r = -1;
	} else if (old_limit <= 0 && new_limit > 0) {
		/* The bucket just refilled above zero.  The refill event is
		 * shared with the read direction, so only cancel it if
		 * reading is not also suspended waiting for bandwidth. */
		if (!(bevp->read_suspended & BEV_SUSPEND_BW))
			event_del(&bevp->rate_limiting->refill_bucket_event);
		bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW);
	}

	BEV_UNLOCK(bev);
	return r;
}
1026 :
1027 : int
1028 0 : bufferevent_rate_limit_group_decrement_read(
1029 : struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1030 : {
1031 0 : int r = 0;
1032 : ev_ssize_t old_limit, new_limit;
1033 0 : LOCK_GROUP(grp);
1034 0 : old_limit = grp->rate_limit.read_limit;
1035 0 : new_limit = (grp->rate_limit.read_limit -= decr);
1036 :
1037 0 : if (old_limit > 0 && new_limit <= 0) {
1038 0 : bev_group_suspend_reading_(grp);
1039 0 : } else if (old_limit <= 0 && new_limit > 0) {
1040 0 : bev_group_unsuspend_reading_(grp);
1041 : }
1042 :
1043 0 : UNLOCK_GROUP(grp);
1044 0 : return r;
1045 : }
1046 :
1047 : int
1048 0 : bufferevent_rate_limit_group_decrement_write(
1049 : struct bufferevent_rate_limit_group *grp, ev_ssize_t decr)
1050 : {
1051 0 : int r = 0;
1052 : ev_ssize_t old_limit, new_limit;
1053 0 : LOCK_GROUP(grp);
1054 0 : old_limit = grp->rate_limit.write_limit;
1055 0 : new_limit = (grp->rate_limit.write_limit -= decr);
1056 :
1057 0 : if (old_limit > 0 && new_limit <= 0) {
1058 0 : bev_group_suspend_writing_(grp);
1059 0 : } else if (old_limit <= 0 && new_limit > 0) {
1060 0 : bev_group_unsuspend_writing_(grp);
1061 : }
1062 :
1063 0 : UNLOCK_GROUP(grp);
1064 0 : return r;
1065 : }
1066 :
1067 : void
1068 0 : bufferevent_rate_limit_group_get_totals(struct bufferevent_rate_limit_group *grp,
1069 : ev_uint64_t *total_read_out, ev_uint64_t *total_written_out)
1070 : {
1071 0 : EVUTIL_ASSERT(grp != NULL);
1072 0 : if (total_read_out)
1073 0 : *total_read_out = grp->total_read;
1074 0 : if (total_written_out)
1075 0 : *total_written_out = grp->total_written;
1076 0 : }
1077 :
1078 : void
1079 0 : bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *grp)
1080 : {
1081 0 : grp->total_read = grp->total_written = 0;
1082 0 : }
1083 :
1084 : int
1085 0 : bufferevent_ratelim_init_(struct bufferevent_private *bev)
1086 : {
1087 0 : bev->rate_limiting = NULL;
1088 0 : bev->max_single_read = MAX_SINGLE_READ_DEFAULT;
1089 0 : bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT;
1090 :
1091 0 : return 0;
1092 : }
|