Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 :
6 : #include "nspr.h"
7 :
8 : /*
9 : * Thread pools
10 : * Thread pools create and manage threads to provide support for
11 : * scheduling jobs onto one or more threads.
12 : *
13 : */
14 : #ifdef OPT_WINNT
15 : #include <windows.h>
16 : #endif
17 :
18 : /*
19 : * worker thread
20 : */
21 : typedef struct wthread {
22 : PRCList links;
23 : PRThread *thread;
24 : } wthread;
25 :
26 : /*
27 : * queue of timer jobs
28 : */
29 : typedef struct timer_jobq {
30 : PRCList list;
31 : PRLock *lock;
32 : PRCondVar *cv;
33 : PRInt32 cnt;
34 : PRCList wthreads;
35 : } timer_jobq;
36 :
37 : /*
38 : * queue of jobs
39 : */
40 : typedef struct tp_jobq {
41 : PRCList list;
42 : PRInt32 cnt;
43 : PRLock *lock;
44 : PRCondVar *cv;
45 : PRCList wthreads;
46 : #ifdef OPT_WINNT
47 : HANDLE nt_completion_port;
48 : #endif
49 : } tp_jobq;
50 :
51 : /*
52 : * queue of IO jobs
53 : */
54 : typedef struct io_jobq {
55 : PRCList list;
56 : PRPollDesc *pollfds;
57 : PRInt32 npollfds;
58 : PRJob **polljobs;
59 : PRLock *lock;
60 : PRInt32 cnt;
61 : PRFileDesc *notify_fd;
62 : PRCList wthreads;
63 : } io_jobq;
64 :
65 : /*
66 : * Threadpool
67 : */
68 : struct PRThreadPool {
69 : PRInt32 init_threads;
70 : PRInt32 max_threads;
71 : PRInt32 current_threads;
72 : PRInt32 idle_threads;
73 : PRUint32 stacksize;
74 : tp_jobq jobq;
75 : io_jobq ioq;
76 : timer_jobq timerq;
77 : PRLock *join_lock; /* used with jobp->join_cv */
78 : PRCondVar *shutdown_cv;
79 : PRBool shutdown;
80 : };
81 :
/* Kind of socket operation an io job is waiting for. */
typedef enum io_op_type {
    JOB_IO_READ,
    JOB_IO_WRITE,
    JOB_IO_CONNECT,
    JOB_IO_ACCEPT
} io_op_type;
84 :
#ifdef OPT_WINNT
/*
 * NT_notifier -- packet posted to the completion port for a runnable job.
 * The worker casts the LPOVERLAPPED it receives back to NT_notifier, which
 * is why `overlapped` has to stay the first member.
 */
typedef struct NT_notifier NT_notifier;

struct NT_notifier {
    OVERLAPPED overlapped;  /* must be first */
    PRJob     *jobp;        /* job this packet stands for */
};
#endif
91 :
92 : struct PRJob {
93 : PRCList links; /* for linking jobs */
94 : PRBool on_ioq; /* job on ioq */
95 : PRBool on_timerq; /* job on timerq */
96 : PRJobFn job_func;
97 : void *job_arg;
98 : PRCondVar *join_cv;
99 : PRBool join_wait; /* == PR_TRUE, when waiting to join */
100 : PRCondVar *cancel_cv; /* for cancelling IO jobs */
101 : PRBool cancel_io; /* for cancelling IO jobs */
102 : PRThreadPool *tpool; /* back pointer to thread pool */
103 : PRJobIoDesc *iod;
104 : io_op_type io_op;
105 : PRInt16 io_poll_flags;
106 : PRNetAddr *netaddr;
107 : PRIntervalTime timeout; /* relative value */
108 : PRIntervalTime absolute;
109 : #ifdef OPT_WINNT
110 : NT_notifier nt_notifier;
111 : #endif
112 : };
113 :
/* Recover the PRJob that embeds the given `links` member. */
#define JOB_LINKS_PTR(_qp) \
    ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))

/* Recover the wthread that embeds the given `links` member. */
#define WTHREAD_LINKS_PTR(_qp) \
    ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))

/* A job is joinable exactly when it owns a join condition variable. */
#define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)

/*
 * Tell a thread blocked in PR_JoinJob that the job is finished.
 * The argument is parenthesized so that any pointer expression may be
 * passed; note that it is evaluated more than once.
 */
#define JOIN_NOTIFY(_jobp)                          \
    PR_BEGIN_MACRO                                  \
    PR_Lock((_jobp)->tpool->join_lock);             \
    (_jobp)->join_wait = PR_FALSE;                  \
    PR_NotifyCondVar((_jobp)->join_cv);             \
    PR_Unlock((_jobp)->tpool->join_lock);           \
    PR_END_MACRO

/*
 * Take a cancelled io job off the ioq and wake the thread waiting in
 * PR_CancelJob.  Requirements on the expansion site:
 *   - a variable named `tp` pointing at the job's PRThreadPool is in scope;
 *   - tp->ioq.lock is held.
 * The argument is evaluated more than once.
 */
#define CANCEL_IO_JOB(jobp)                         \
    PR_BEGIN_MACRO                                  \
    (jobp)->cancel_io = PR_FALSE;                   \
    (jobp)->on_ioq = PR_FALSE;                      \
    PR_REMOVE_AND_INIT_LINK(&(jobp)->links);        \
    tp->ioq.cnt--;                                  \
    PR_NotifyCondVar((jobp)->cancel_cv);            \
    PR_END_MACRO
138 :
139 : static void delete_job(PRJob *jobp);
140 : static PRThreadPool * alloc_threadpool(void);
141 : static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142 : static void notify_ioq(PRThreadPool *tp);
143 : static void notify_timerq(PRThreadPool *tp);
144 :
/*
 * Lock ordering: when two of these locks are held at the same time,
 * they are acquired in the following order
 *
 *      tp->ioq.lock, tp->timerq.lock
 *                  |
 *                  V
 *            tp->jobq.lock
 */
153 :
154 : /*
155 : * worker thread function
156 : */
157 0 : static void wstart(void *arg)
158 : {
159 0 : PRThreadPool *tp = (PRThreadPool *) arg;
160 : PRCList *head;
161 :
162 : /*
163 : * execute jobs until shutdown
164 : */
165 0 : while (!tp->shutdown) {
166 : PRJob *jobp;
167 : #ifdef OPT_WINNT
168 : BOOL rv;
169 : DWORD unused, shutdown;
170 : LPOVERLAPPED olp;
171 :
172 : PR_Lock(tp->jobq.lock);
173 : tp->idle_threads++;
174 : PR_Unlock(tp->jobq.lock);
175 : rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176 : &unused, &shutdown, &olp, INFINITE);
177 :
178 : PR_ASSERT(rv);
179 : if (shutdown)
180 : break;
181 : jobp = ((NT_notifier *) olp)->jobp;
182 : PR_Lock(tp->jobq.lock);
183 : tp->idle_threads--;
184 : tp->jobq.cnt--;
185 : PR_Unlock(tp->jobq.lock);
186 : #else
187 :
188 0 : PR_Lock(tp->jobq.lock);
189 0 : while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
190 0 : tp->idle_threads++;
191 0 : PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
192 0 : tp->idle_threads--;
193 : }
194 0 : if (tp->shutdown) {
195 0 : PR_Unlock(tp->jobq.lock);
196 0 : break;
197 : }
198 0 : head = PR_LIST_HEAD(&tp->jobq.list);
199 : /*
200 : * remove job from queue
201 : */
202 0 : PR_REMOVE_AND_INIT_LINK(head);
203 0 : tp->jobq.cnt--;
204 0 : jobp = JOB_LINKS_PTR(head);
205 0 : PR_Unlock(tp->jobq.lock);
206 : #endif
207 :
208 0 : jobp->job_func(jobp->job_arg);
209 0 : if (!JOINABLE_JOB(jobp)) {
210 0 : delete_job(jobp);
211 : } else {
212 0 : JOIN_NOTIFY(jobp);
213 : }
214 : }
215 0 : PR_Lock(tp->jobq.lock);
216 0 : tp->current_threads--;
217 0 : PR_Unlock(tp->jobq.lock);
218 0 : }
219 :
220 : /*
221 : * add a job to the work queue
222 : */
223 : static void
224 0 : add_to_jobq(PRThreadPool *tp, PRJob *jobp)
225 : {
226 : /*
227 : * add to jobq
228 : */
229 : #ifdef OPT_WINNT
230 : PR_Lock(tp->jobq.lock);
231 : tp->jobq.cnt++;
232 : PR_Unlock(tp->jobq.lock);
233 : /*
234 : * notify worker thread(s)
235 : */
236 : PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
237 : FALSE, &jobp->nt_notifier.overlapped);
238 : #else
239 0 : PR_Lock(tp->jobq.lock);
240 0 : PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
241 0 : tp->jobq.cnt++;
242 0 : if ((tp->idle_threads < tp->jobq.cnt) &&
243 0 : (tp->current_threads < tp->max_threads)) {
244 : wthread *wthrp;
245 : /*
246 : * increment thread count and unlock the jobq lock
247 : */
248 0 : tp->current_threads++;
249 0 : PR_Unlock(tp->jobq.lock);
250 : /* create new worker thread */
251 0 : wthrp = PR_NEWZAP(wthread);
252 0 : if (wthrp) {
253 0 : wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
254 : tp, PR_PRIORITY_NORMAL,
255 : PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
256 0 : if (NULL == wthrp->thread) {
257 0 : PR_DELETE(wthrp); /* this sets wthrp to NULL */
258 : }
259 : }
260 0 : PR_Lock(tp->jobq.lock);
261 0 : if (NULL == wthrp) {
262 0 : tp->current_threads--;
263 : } else {
264 0 : PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
265 : }
266 : }
267 : /*
268 : * wakeup a worker thread
269 : */
270 0 : PR_NotifyCondVar(tp->jobq.cv);
271 0 : PR_Unlock(tp->jobq.lock);
272 : #endif
273 0 : }
274 :
275 : /*
276 : * io worker thread function
277 : */
278 0 : static void io_wstart(void *arg)
279 : {
280 0 : PRThreadPool *tp = (PRThreadPool *) arg;
281 : int pollfd_cnt, pollfds_used;
282 : int rv;
283 : PRCList *qp, *nextqp;
284 0 : PRPollDesc *pollfds = NULL;
285 0 : PRJob **polljobs = NULL;
286 : int poll_timeout;
287 : PRIntervalTime now;
288 :
289 : /*
290 : * scan io_jobq
291 : * construct poll list
292 : * call PR_Poll
293 : * for all fds, for which poll returns true, move the job to
294 : * jobq and wakeup worker thread.
295 : */
296 0 : while (!tp->shutdown) {
297 : PRJob *jobp;
298 :
299 0 : pollfd_cnt = tp->ioq.cnt + 10;
300 0 : if (pollfd_cnt > tp->ioq.npollfds) {
301 :
302 : /*
303 : * re-allocate pollfd array if the current one is not large
304 : * enough
305 : */
306 0 : if (NULL != tp->ioq.pollfds)
307 0 : PR_Free(tp->ioq.pollfds);
308 0 : tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
309 : (sizeof(PRPollDesc) + sizeof(PRJob *)));
310 0 : PR_ASSERT(NULL != tp->ioq.pollfds);
311 : /*
312 : * array of pollfds
313 : */
314 0 : pollfds = tp->ioq.pollfds;
315 0 : tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
316 : /*
317 : * parallel array of jobs
318 : */
319 0 : polljobs = tp->ioq.polljobs;
320 0 : tp->ioq.npollfds = pollfd_cnt;
321 : }
322 :
323 0 : pollfds_used = 0;
324 : /*
325 : * add the notify fd; used for unblocking io thread(s)
326 : */
327 0 : pollfds[pollfds_used].fd = tp->ioq.notify_fd;
328 0 : pollfds[pollfds_used].in_flags = PR_POLL_READ;
329 0 : pollfds[pollfds_used].out_flags = 0;
330 0 : polljobs[pollfds_used] = NULL;
331 0 : pollfds_used++;
332 : /*
333 : * fill in the pollfd array
334 : */
335 0 : PR_Lock(tp->ioq.lock);
336 0 : for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
337 0 : nextqp = qp->next;
338 0 : jobp = JOB_LINKS_PTR(qp);
339 0 : if (jobp->cancel_io) {
340 0 : CANCEL_IO_JOB(jobp);
341 0 : continue;
342 : }
343 0 : if (pollfds_used == (pollfd_cnt))
344 0 : break;
345 0 : pollfds[pollfds_used].fd = jobp->iod->socket;
346 0 : pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
347 0 : pollfds[pollfds_used].out_flags = 0;
348 0 : polljobs[pollfds_used] = jobp;
349 :
350 0 : pollfds_used++;
351 : }
352 0 : if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
353 0 : qp = tp->ioq.list.next;
354 0 : jobp = JOB_LINKS_PTR(qp);
355 0 : if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
356 0 : poll_timeout = PR_INTERVAL_NO_TIMEOUT;
357 0 : else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
358 0 : poll_timeout = PR_INTERVAL_NO_WAIT;
359 : else {
360 0 : poll_timeout = jobp->absolute - PR_IntervalNow();
361 0 : if (poll_timeout <= 0) /* already timed out */
362 0 : poll_timeout = PR_INTERVAL_NO_WAIT;
363 : }
364 : } else {
365 0 : poll_timeout = PR_INTERVAL_NO_TIMEOUT;
366 : }
367 0 : PR_Unlock(tp->ioq.lock);
368 :
369 : /*
370 : * XXXX
371 : * should retry if more jobs have been added to the queue?
372 : *
373 : */
374 0 : PR_ASSERT(pollfds_used <= pollfd_cnt);
375 0 : rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
376 :
377 0 : if (tp->shutdown) {
378 0 : break;
379 : }
380 :
381 0 : if (rv > 0) {
382 : /*
383 : * at least one io event is set
384 : */
385 : PRStatus rval_status;
386 : PRInt32 index;
387 :
388 0 : PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
389 : /*
390 : * reset the pollable event, if notified
391 : */
392 0 : if (pollfds[0].out_flags & PR_POLL_READ) {
393 0 : rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
394 0 : PR_ASSERT(PR_SUCCESS == rval_status);
395 : }
396 :
397 0 : for(index = 1; index < (pollfds_used); index++) {
398 0 : PRInt16 events = pollfds[index].in_flags;
399 0 : PRInt16 revents = pollfds[index].out_flags;
400 0 : jobp = polljobs[index];
401 :
402 0 : if ((revents & PR_POLL_NVAL) || /* busted in all cases */
403 0 : (revents & PR_POLL_ERR) ||
404 0 : ((events & PR_POLL_WRITE) &&
405 0 : (revents & PR_POLL_HUP))) { /* write op & hup */
406 0 : PR_Lock(tp->ioq.lock);
407 0 : if (jobp->cancel_io) {
408 0 : CANCEL_IO_JOB(jobp);
409 0 : PR_Unlock(tp->ioq.lock);
410 0 : continue;
411 : }
412 0 : PR_REMOVE_AND_INIT_LINK(&jobp->links);
413 0 : tp->ioq.cnt--;
414 0 : jobp->on_ioq = PR_FALSE;
415 0 : PR_Unlock(tp->ioq.lock);
416 :
417 : /* set error */
418 0 : if (PR_POLL_NVAL & revents)
419 0 : jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
420 0 : else if (PR_POLL_HUP & revents)
421 0 : jobp->iod->error = PR_CONNECT_RESET_ERROR;
422 : else
423 0 : jobp->iod->error = PR_IO_ERROR;
424 :
425 : /*
426 : * add to jobq
427 : */
428 0 : add_to_jobq(tp, jobp);
429 0 : } else if (revents) {
430 : /*
431 : * add to jobq
432 : */
433 0 : PR_Lock(tp->ioq.lock);
434 0 : if (jobp->cancel_io) {
435 0 : CANCEL_IO_JOB(jobp);
436 0 : PR_Unlock(tp->ioq.lock);
437 0 : continue;
438 : }
439 0 : PR_REMOVE_AND_INIT_LINK(&jobp->links);
440 0 : tp->ioq.cnt--;
441 0 : jobp->on_ioq = PR_FALSE;
442 0 : PR_Unlock(tp->ioq.lock);
443 :
444 0 : if (jobp->io_op == JOB_IO_CONNECT) {
445 0 : if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
446 0 : jobp->iod->error = 0;
447 : else
448 0 : jobp->iod->error = PR_GetError();
449 : } else
450 0 : jobp->iod->error = 0;
451 :
452 0 : add_to_jobq(tp, jobp);
453 : }
454 : }
455 : }
456 : /*
457 : * timeout processing
458 : */
459 0 : now = PR_IntervalNow();
460 0 : PR_Lock(tp->ioq.lock);
461 0 : for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
462 0 : nextqp = qp->next;
463 0 : jobp = JOB_LINKS_PTR(qp);
464 0 : if (jobp->cancel_io) {
465 0 : CANCEL_IO_JOB(jobp);
466 0 : continue;
467 : }
468 0 : if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
469 0 : break;
470 0 : if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
471 0 : ((PRInt32)(jobp->absolute - now) > 0))
472 0 : break;
473 0 : PR_REMOVE_AND_INIT_LINK(&jobp->links);
474 0 : tp->ioq.cnt--;
475 0 : jobp->on_ioq = PR_FALSE;
476 0 : jobp->iod->error = PR_IO_TIMEOUT_ERROR;
477 0 : add_to_jobq(tp, jobp);
478 : }
479 0 : PR_Unlock(tp->ioq.lock);
480 : }
481 0 : }
482 :
483 : /*
484 : * timer worker thread function
485 : */
486 0 : static void timer_wstart(void *arg)
487 : {
488 0 : PRThreadPool *tp = (PRThreadPool *) arg;
489 : PRCList *qp;
490 : PRIntervalTime timeout;
491 : PRIntervalTime now;
492 :
493 : /*
494 : * call PR_WaitCondVar with minimum value of all timeouts
495 : */
496 0 : while (!tp->shutdown) {
497 : PRJob *jobp;
498 :
499 0 : PR_Lock(tp->timerq.lock);
500 0 : if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
501 0 : timeout = PR_INTERVAL_NO_TIMEOUT;
502 : } else {
503 : PRCList *qp;
504 :
505 0 : qp = tp->timerq.list.next;
506 0 : jobp = JOB_LINKS_PTR(qp);
507 :
508 0 : timeout = jobp->absolute - PR_IntervalNow();
509 0 : if (timeout <= 0)
510 0 : timeout = PR_INTERVAL_NO_WAIT; /* already timed out */
511 : }
512 0 : if (PR_INTERVAL_NO_WAIT != timeout)
513 0 : PR_WaitCondVar(tp->timerq.cv, timeout);
514 0 : if (tp->shutdown) {
515 0 : PR_Unlock(tp->timerq.lock);
516 0 : break;
517 : }
518 : /*
519 : * move expired-timer jobs to jobq
520 : */
521 0 : now = PR_IntervalNow();
522 0 : while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
523 0 : qp = tp->timerq.list.next;
524 0 : jobp = JOB_LINKS_PTR(qp);
525 :
526 0 : if ((PRInt32)(jobp->absolute - now) > 0) {
527 0 : break;
528 : }
529 : /*
530 : * job timed out
531 : */
532 0 : PR_REMOVE_AND_INIT_LINK(&jobp->links);
533 0 : tp->timerq.cnt--;
534 0 : jobp->on_timerq = PR_FALSE;
535 0 : add_to_jobq(tp, jobp);
536 : }
537 0 : PR_Unlock(tp->timerq.lock);
538 : }
539 0 : }
540 :
541 : static void
542 0 : delete_threadpool(PRThreadPool *tp)
543 : {
544 0 : if (NULL != tp) {
545 0 : if (NULL != tp->shutdown_cv)
546 0 : PR_DestroyCondVar(tp->shutdown_cv);
547 0 : if (NULL != tp->jobq.cv)
548 0 : PR_DestroyCondVar(tp->jobq.cv);
549 0 : if (NULL != tp->jobq.lock)
550 0 : PR_DestroyLock(tp->jobq.lock);
551 0 : if (NULL != tp->join_lock)
552 0 : PR_DestroyLock(tp->join_lock);
553 : #ifdef OPT_WINNT
554 : if (NULL != tp->jobq.nt_completion_port)
555 : CloseHandle(tp->jobq.nt_completion_port);
556 : #endif
557 : /* Timer queue */
558 0 : if (NULL != tp->timerq.cv)
559 0 : PR_DestroyCondVar(tp->timerq.cv);
560 0 : if (NULL != tp->timerq.lock)
561 0 : PR_DestroyLock(tp->timerq.lock);
562 :
563 0 : if (NULL != tp->ioq.lock)
564 0 : PR_DestroyLock(tp->ioq.lock);
565 0 : if (NULL != tp->ioq.pollfds)
566 0 : PR_Free(tp->ioq.pollfds);
567 0 : if (NULL != tp->ioq.notify_fd)
568 0 : PR_DestroyPollableEvent(tp->ioq.notify_fd);
569 0 : PR_Free(tp);
570 : }
571 0 : return;
572 : }
573 :
574 : static PRThreadPool *
575 0 : alloc_threadpool(void)
576 : {
577 : PRThreadPool *tp;
578 :
579 0 : tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
580 0 : if (NULL == tp)
581 0 : goto failed;
582 0 : tp->jobq.lock = PR_NewLock();
583 0 : if (NULL == tp->jobq.lock)
584 0 : goto failed;
585 0 : tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
586 0 : if (NULL == tp->jobq.cv)
587 0 : goto failed;
588 0 : tp->join_lock = PR_NewLock();
589 0 : if (NULL == tp->join_lock)
590 0 : goto failed;
591 : #ifdef OPT_WINNT
592 : tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
593 : NULL, 0, 0);
594 : if (NULL == tp->jobq.nt_completion_port)
595 : goto failed;
596 : #endif
597 :
598 0 : tp->ioq.lock = PR_NewLock();
599 0 : if (NULL == tp->ioq.lock)
600 0 : goto failed;
601 :
602 : /* Timer queue */
603 :
604 0 : tp->timerq.lock = PR_NewLock();
605 0 : if (NULL == tp->timerq.lock)
606 0 : goto failed;
607 0 : tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
608 0 : if (NULL == tp->timerq.cv)
609 0 : goto failed;
610 :
611 0 : tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
612 0 : if (NULL == tp->shutdown_cv)
613 0 : goto failed;
614 0 : tp->ioq.notify_fd = PR_NewPollableEvent();
615 0 : if (NULL == tp->ioq.notify_fd)
616 0 : goto failed;
617 0 : return tp;
618 : failed:
619 0 : delete_threadpool(tp);
620 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
621 0 : return NULL;
622 : }
623 :
624 : /* Create thread pool */
625 : PR_IMPLEMENT(PRThreadPool *)
626 : PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
627 : PRUint32 stacksize)
628 : {
629 : PRThreadPool *tp;
630 : PRThread *thr;
631 : int i;
632 : wthread *wthrp;
633 :
634 0 : tp = alloc_threadpool();
635 0 : if (NULL == tp)
636 0 : return NULL;
637 :
638 0 : tp->init_threads = initial_threads;
639 0 : tp->max_threads = max_threads;
640 0 : tp->stacksize = stacksize;
641 0 : PR_INIT_CLIST(&tp->jobq.list);
642 0 : PR_INIT_CLIST(&tp->ioq.list);
643 0 : PR_INIT_CLIST(&tp->timerq.list);
644 0 : PR_INIT_CLIST(&tp->jobq.wthreads);
645 0 : PR_INIT_CLIST(&tp->ioq.wthreads);
646 0 : PR_INIT_CLIST(&tp->timerq.wthreads);
647 0 : tp->shutdown = PR_FALSE;
648 :
649 0 : PR_Lock(tp->jobq.lock);
650 0 : for(i=0; i < initial_threads; ++i) {
651 :
652 0 : thr = PR_CreateThread(PR_USER_THREAD, wstart,
653 : tp, PR_PRIORITY_NORMAL,
654 : PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
655 0 : PR_ASSERT(thr);
656 0 : wthrp = PR_NEWZAP(wthread);
657 0 : PR_ASSERT(wthrp);
658 0 : wthrp->thread = thr;
659 0 : PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
660 : }
661 0 : tp->current_threads = initial_threads;
662 :
663 0 : thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
664 : tp, PR_PRIORITY_NORMAL,
665 : PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
666 0 : PR_ASSERT(thr);
667 0 : wthrp = PR_NEWZAP(wthread);
668 0 : PR_ASSERT(wthrp);
669 0 : wthrp->thread = thr;
670 0 : PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
671 :
672 0 : thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
673 : tp, PR_PRIORITY_NORMAL,
674 : PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
675 0 : PR_ASSERT(thr);
676 0 : wthrp = PR_NEWZAP(wthread);
677 0 : PR_ASSERT(wthrp);
678 0 : wthrp->thread = thr;
679 0 : PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
680 :
681 0 : PR_Unlock(tp->jobq.lock);
682 0 : return tp;
683 : }
684 :
685 : static void
686 0 : delete_job(PRJob *jobp)
687 : {
688 0 : if (NULL != jobp) {
689 0 : if (NULL != jobp->join_cv) {
690 0 : PR_DestroyCondVar(jobp->join_cv);
691 0 : jobp->join_cv = NULL;
692 : }
693 0 : if (NULL != jobp->cancel_cv) {
694 0 : PR_DestroyCondVar(jobp->cancel_cv);
695 0 : jobp->cancel_cv = NULL;
696 : }
697 0 : PR_DELETE(jobp);
698 : }
699 0 : }
700 :
701 : static PRJob *
702 0 : alloc_job(PRBool joinable, PRThreadPool *tp)
703 : {
704 : PRJob *jobp;
705 :
706 0 : jobp = PR_NEWZAP(PRJob);
707 0 : if (NULL == jobp)
708 0 : goto failed;
709 0 : if (joinable) {
710 0 : jobp->join_cv = PR_NewCondVar(tp->join_lock);
711 0 : jobp->join_wait = PR_TRUE;
712 0 : if (NULL == jobp->join_cv)
713 0 : goto failed;
714 : } else {
715 0 : jobp->join_cv = NULL;
716 : }
717 : #ifdef OPT_WINNT
718 : jobp->nt_notifier.jobp = jobp;
719 : #endif
720 0 : return jobp;
721 : failed:
722 0 : delete_job(jobp);
723 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
724 0 : return NULL;
725 : }
726 :
727 : /* queue a job */
728 : PR_IMPLEMENT(PRJob *)
729 : PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
730 : {
731 : PRJob *jobp;
732 :
733 0 : jobp = alloc_job(joinable, tpool);
734 0 : if (NULL == jobp)
735 0 : return NULL;
736 :
737 0 : jobp->job_func = fn;
738 0 : jobp->job_arg = arg;
739 0 : jobp->tpool = tpool;
740 :
741 0 : add_to_jobq(tpool, jobp);
742 0 : return jobp;
743 : }
744 :
745 : /* queue a job, when a socket is readable or writeable */
746 : static PRJob *
747 0 : queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
748 : PRBool joinable, io_op_type op)
749 : {
750 : PRJob *jobp;
751 : PRIntervalTime now;
752 :
753 0 : jobp = alloc_job(joinable, tpool);
754 0 : if (NULL == jobp) {
755 0 : return NULL;
756 : }
757 :
758 : /*
759 : * Add a new job to io_jobq
760 : * wakeup io worker thread
761 : */
762 :
763 0 : jobp->job_func = fn;
764 0 : jobp->job_arg = arg;
765 0 : jobp->tpool = tpool;
766 0 : jobp->iod = iod;
767 0 : if (JOB_IO_READ == op) {
768 0 : jobp->io_op = JOB_IO_READ;
769 0 : jobp->io_poll_flags = PR_POLL_READ;
770 0 : } else if (JOB_IO_WRITE == op) {
771 0 : jobp->io_op = JOB_IO_WRITE;
772 0 : jobp->io_poll_flags = PR_POLL_WRITE;
773 0 : } else if (JOB_IO_ACCEPT == op) {
774 0 : jobp->io_op = JOB_IO_ACCEPT;
775 0 : jobp->io_poll_flags = PR_POLL_READ;
776 0 : } else if (JOB_IO_CONNECT == op) {
777 0 : jobp->io_op = JOB_IO_CONNECT;
778 0 : jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
779 : } else {
780 0 : delete_job(jobp);
781 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
782 0 : return NULL;
783 : }
784 :
785 0 : jobp->timeout = iod->timeout;
786 0 : if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
787 0 : (PR_INTERVAL_NO_WAIT == iod->timeout)) {
788 0 : jobp->absolute = iod->timeout;
789 : } else {
790 0 : now = PR_IntervalNow();
791 0 : jobp->absolute = now + iod->timeout;
792 : }
793 :
794 :
795 0 : PR_Lock(tpool->ioq.lock);
796 :
797 0 : if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
798 0 : (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
799 0 : PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
800 0 : } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
801 0 : PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
802 : } else {
803 : PRCList *qp;
804 : PRJob *tmp_jobp;
805 : /*
806 : * insert into the timeout-sorted ioq
807 : */
808 0 : for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
809 0 : qp = qp->prev) {
810 0 : tmp_jobp = JOB_LINKS_PTR(qp);
811 0 : if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
812 0 : break;
813 : }
814 : }
815 0 : PR_INSERT_AFTER(&jobp->links,qp);
816 : }
817 :
818 0 : jobp->on_ioq = PR_TRUE;
819 0 : tpool->ioq.cnt++;
820 : /*
821 : * notify io worker thread(s)
822 : */
823 0 : PR_Unlock(tpool->ioq.lock);
824 0 : notify_ioq(tpool);
825 0 : return jobp;
826 : }
827 :
828 : /* queue a job, when a socket is readable */
829 : PR_IMPLEMENT(PRJob *)
830 : PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
831 : PRBool joinable)
832 : {
833 0 : return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
834 : }
835 :
836 : /* queue a job, when a socket is writeable */
837 : PR_IMPLEMENT(PRJob *)
838 : PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
839 : PRBool joinable)
840 : {
841 0 : return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
842 : }
843 :
844 :
845 : /* queue a job, when a socket has a pending connection */
846 : PR_IMPLEMENT(PRJob *)
847 : PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
848 : void * arg, PRBool joinable)
849 : {
850 0 : return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
851 : }
852 :
853 : /* queue a job, when a socket can be connected */
854 : PR_IMPLEMENT(PRJob *)
855 : PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
856 : const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
857 : {
858 : PRStatus rv;
859 : PRErrorCode err;
860 :
861 0 : rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
862 0 : if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
863 : /* connection pending */
864 0 : return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
865 : }
866 : /*
867 : * connection succeeded or failed; add to jobq right away
868 : */
869 0 : if (rv == PR_FAILURE)
870 0 : iod->error = err;
871 : else
872 0 : iod->error = 0;
873 0 : return(PR_QueueJob(tpool, fn, arg, joinable));
874 :
875 : }
876 :
877 : /* queue a job, when a timer expires */
878 : PR_IMPLEMENT(PRJob *)
879 : PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
880 : PRJobFn fn, void * arg, PRBool joinable)
881 : {
882 : PRIntervalTime now;
883 : PRJob *jobp;
884 :
885 0 : if (PR_INTERVAL_NO_TIMEOUT == timeout) {
886 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
887 0 : return NULL;
888 : }
889 0 : if (PR_INTERVAL_NO_WAIT == timeout) {
890 : /*
891 : * no waiting; add to jobq right away
892 : */
893 0 : return(PR_QueueJob(tpool, fn, arg, joinable));
894 : }
895 0 : jobp = alloc_job(joinable, tpool);
896 0 : if (NULL == jobp) {
897 0 : return NULL;
898 : }
899 :
900 : /*
901 : * Add a new job to timer_jobq
902 : * wakeup timer worker thread
903 : */
904 :
905 0 : jobp->job_func = fn;
906 0 : jobp->job_arg = arg;
907 0 : jobp->tpool = tpool;
908 0 : jobp->timeout = timeout;
909 :
910 0 : now = PR_IntervalNow();
911 0 : jobp->absolute = now + timeout;
912 :
913 :
914 0 : PR_Lock(tpool->timerq.lock);
915 0 : jobp->on_timerq = PR_TRUE;
916 0 : if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
917 0 : PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
918 : else {
919 : PRCList *qp;
920 : PRJob *tmp_jobp;
921 : /*
922 : * insert into the sorted timer jobq
923 : */
924 0 : for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
925 0 : qp = qp->prev) {
926 0 : tmp_jobp = JOB_LINKS_PTR(qp);
927 0 : if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
928 0 : break;
929 : }
930 : }
931 0 : PR_INSERT_AFTER(&jobp->links,qp);
932 : }
933 0 : tpool->timerq.cnt++;
934 : /*
935 : * notify timer worker thread(s)
936 : */
937 0 : notify_timerq(tpool);
938 0 : PR_Unlock(tpool->timerq.lock);
939 0 : return jobp;
940 : }
941 :
942 : static void
943 0 : notify_timerq(PRThreadPool *tp)
944 : {
945 : /*
946 : * wakeup the timer thread(s)
947 : */
948 0 : PR_NotifyCondVar(tp->timerq.cv);
949 0 : }
950 :
951 : static void
952 0 : notify_ioq(PRThreadPool *tp)
953 : {
954 : PRStatus rval_status;
955 :
956 : /*
957 : * wakeup the io thread(s)
958 : */
959 0 : rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
960 0 : PR_ASSERT(PR_SUCCESS == rval_status);
961 0 : }
962 :
963 : /*
964 : * cancel a job
965 : *
966 : * XXXX: is this needed? likely to be removed
967 : */
968 : PR_IMPLEMENT(PRStatus)
969 : PR_CancelJob(PRJob *jobp) {
970 :
971 0 : PRStatus rval = PR_FAILURE;
972 : PRThreadPool *tp;
973 :
974 0 : if (jobp->on_timerq) {
975 : /*
976 : * now, check again while holding the timerq lock
977 : */
978 0 : tp = jobp->tpool;
979 0 : PR_Lock(tp->timerq.lock);
980 0 : if (jobp->on_timerq) {
981 0 : jobp->on_timerq = PR_FALSE;
982 0 : PR_REMOVE_AND_INIT_LINK(&jobp->links);
983 0 : tp->timerq.cnt--;
984 0 : PR_Unlock(tp->timerq.lock);
985 0 : if (!JOINABLE_JOB(jobp)) {
986 0 : delete_job(jobp);
987 : } else {
988 0 : JOIN_NOTIFY(jobp);
989 : }
990 0 : rval = PR_SUCCESS;
991 : } else
992 0 : PR_Unlock(tp->timerq.lock);
993 0 : } else if (jobp->on_ioq) {
994 : /*
995 : * now, check again while holding the ioq lock
996 : */
997 0 : tp = jobp->tpool;
998 0 : PR_Lock(tp->ioq.lock);
999 0 : if (jobp->on_ioq) {
1000 0 : jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1001 0 : if (NULL == jobp->cancel_cv) {
1002 0 : PR_Unlock(tp->ioq.lock);
1003 0 : PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
1004 0 : return PR_FAILURE;
1005 : }
1006 : /*
1007 : * mark job 'cancelled' and notify io thread(s)
1008 : * XXXX:
1009 : * this assumes there is only one io thread; when there
1010 : * are multiple threads, the io thread processing this job
1011 : * must be notified.
1012 : */
1013 0 : jobp->cancel_io = PR_TRUE;
1014 0 : PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
1015 0 : notify_ioq(tp);
1016 0 : PR_Lock(tp->ioq.lock);
1017 0 : while (jobp->cancel_io)
1018 0 : PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
1019 0 : PR_Unlock(tp->ioq.lock);
1020 0 : PR_ASSERT(!jobp->on_ioq);
1021 0 : if (!JOINABLE_JOB(jobp)) {
1022 0 : delete_job(jobp);
1023 : } else {
1024 0 : JOIN_NOTIFY(jobp);
1025 : }
1026 0 : rval = PR_SUCCESS;
1027 : } else
1028 0 : PR_Unlock(tp->ioq.lock);
1029 : }
1030 0 : if (PR_FAILURE == rval)
1031 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
1032 0 : return rval;
1033 : }
1034 :
1035 : /* join a job, wait until completion */
1036 : PR_IMPLEMENT(PRStatus)
1037 : PR_JoinJob(PRJob *jobp)
1038 : {
1039 0 : if (!JOINABLE_JOB(jobp)) {
1040 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1041 0 : return PR_FAILURE;
1042 : }
1043 0 : PR_Lock(jobp->tpool->join_lock);
1044 0 : while(jobp->join_wait)
1045 0 : PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
1046 0 : PR_Unlock(jobp->tpool->join_lock);
1047 0 : delete_job(jobp);
1048 0 : return PR_SUCCESS;
1049 : }
1050 :
1051 : /* shutdown threadpool */
1052 : PR_IMPLEMENT(PRStatus)
1053 : PR_ShutdownThreadPool(PRThreadPool *tpool)
1054 : {
1055 0 : PRStatus rval = PR_SUCCESS;
1056 :
1057 0 : PR_Lock(tpool->jobq.lock);
1058 0 : tpool->shutdown = PR_TRUE;
1059 0 : PR_NotifyAllCondVar(tpool->shutdown_cv);
1060 0 : PR_Unlock(tpool->jobq.lock);
1061 :
1062 0 : return rval;
1063 : }
1064 :
1065 : /*
1066 : * join thread pool
1067 : * wait for termination of worker threads
1068 : * reclaim threadpool resources
1069 : */
1070 : PR_IMPLEMENT(PRStatus)
1071 : PR_JoinThreadPool(PRThreadPool *tpool)
1072 : {
1073 0 : PRStatus rval = PR_SUCCESS;
1074 : PRCList *head;
1075 : PRStatus rval_status;
1076 :
1077 0 : PR_Lock(tpool->jobq.lock);
1078 0 : while (!tpool->shutdown)
1079 0 : PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
1080 :
1081 : /*
1082 : * wakeup worker threads
1083 : */
1084 : #ifdef OPT_WINNT
1085 : /*
1086 : * post shutdown notification for all threads
1087 : */
1088 : {
1089 : int i;
1090 : for(i=0; i < tpool->current_threads; i++) {
1091 : PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1092 : TRUE, NULL);
1093 : }
1094 : }
1095 : #else
1096 0 : PR_NotifyAllCondVar(tpool->jobq.cv);
1097 : #endif
1098 :
1099 : /*
1100 : * wakeup io thread(s)
1101 : */
1102 0 : notify_ioq(tpool);
1103 :
1104 : /*
1105 : * wakeup timer thread(s)
1106 : */
1107 0 : PR_Lock(tpool->timerq.lock);
1108 0 : notify_timerq(tpool);
1109 0 : PR_Unlock(tpool->timerq.lock);
1110 :
1111 0 : while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
1112 : wthread *wthrp;
1113 :
1114 0 : head = PR_LIST_HEAD(&tpool->jobq.wthreads);
1115 0 : PR_REMOVE_AND_INIT_LINK(head);
1116 0 : PR_Unlock(tpool->jobq.lock);
1117 0 : wthrp = WTHREAD_LINKS_PTR(head);
1118 0 : rval_status = PR_JoinThread(wthrp->thread);
1119 0 : PR_ASSERT(PR_SUCCESS == rval_status);
1120 0 : PR_DELETE(wthrp);
1121 0 : PR_Lock(tpool->jobq.lock);
1122 : }
1123 0 : PR_Unlock(tpool->jobq.lock);
1124 0 : while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
1125 : wthread *wthrp;
1126 :
1127 0 : head = PR_LIST_HEAD(&tpool->ioq.wthreads);
1128 0 : PR_REMOVE_AND_INIT_LINK(head);
1129 0 : wthrp = WTHREAD_LINKS_PTR(head);
1130 0 : rval_status = PR_JoinThread(wthrp->thread);
1131 0 : PR_ASSERT(PR_SUCCESS == rval_status);
1132 0 : PR_DELETE(wthrp);
1133 : }
1134 :
1135 0 : while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
1136 : wthread *wthrp;
1137 :
1138 0 : head = PR_LIST_HEAD(&tpool->timerq.wthreads);
1139 0 : PR_REMOVE_AND_INIT_LINK(head);
1140 0 : wthrp = WTHREAD_LINKS_PTR(head);
1141 0 : rval_status = PR_JoinThread(wthrp->thread);
1142 0 : PR_ASSERT(PR_SUCCESS == rval_status);
1143 0 : PR_DELETE(wthrp);
1144 : }
1145 :
1146 : /*
1147 : * Delete queued jobs
1148 : */
1149 0 : while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
1150 : PRJob *jobp;
1151 :
1152 0 : head = PR_LIST_HEAD(&tpool->jobq.list);
1153 0 : PR_REMOVE_AND_INIT_LINK(head);
1154 0 : jobp = JOB_LINKS_PTR(head);
1155 0 : tpool->jobq.cnt--;
1156 0 : delete_job(jobp);
1157 : }
1158 :
1159 : /* delete io jobs */
1160 0 : while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
1161 : PRJob *jobp;
1162 :
1163 0 : head = PR_LIST_HEAD(&tpool->ioq.list);
1164 0 : PR_REMOVE_AND_INIT_LINK(head);
1165 0 : tpool->ioq.cnt--;
1166 0 : jobp = JOB_LINKS_PTR(head);
1167 0 : delete_job(jobp);
1168 : }
1169 :
1170 : /* delete timer jobs */
1171 0 : while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
1172 : PRJob *jobp;
1173 :
1174 0 : head = PR_LIST_HEAD(&tpool->timerq.list);
1175 0 : PR_REMOVE_AND_INIT_LINK(head);
1176 0 : tpool->timerq.cnt--;
1177 0 : jobp = JOB_LINKS_PTR(head);
1178 0 : delete_job(jobp);
1179 : }
1180 :
1181 0 : PR_ASSERT(0 == tpool->jobq.cnt);
1182 0 : PR_ASSERT(0 == tpool->ioq.cnt);
1183 0 : PR_ASSERT(0 == tpool->timerq.cnt);
1184 :
1185 0 : delete_threadpool(tpool);
1186 0 : return rval;
1187 : }
|