LCOV - code coverage report
Current view: top level - nsprpub/pr/src/misc - prtpool.c (source / functions) Hit Total Coverage
Test: output.info Lines: 0 510 0.0 %
Date: 2017-07-14 16:53:18 Functions: 0 11 0.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
       2             : /* This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0. If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
       5             : 
       6             : #include "nspr.h"
       7             : 
       8             : /*
       9             :  * Thread pools
      10             :  *      Thread pools create and manage threads to provide support for
      11             :  *      scheduling jobs onto one or more threads.
      12             :  *
      13             :  */
      14             : #ifdef OPT_WINNT
      15             : #include <windows.h>
      16             : #endif
      17             : 
      18             : /*
      19             :  * worker thread
      20             :  */
      21             : typedef struct wthread {
      22             :         PRCList         links;
      23             :         PRThread        *thread;
      24             : } wthread;
      25             : 
      26             : /*
      27             :  * queue of timer jobs
      28             :  */
      29             : typedef struct timer_jobq {
      30             :         PRCList         list;
      31             :         PRLock          *lock;
      32             :         PRCondVar       *cv;
      33             :         PRInt32         cnt;
      34             :         PRCList         wthreads;
      35             : } timer_jobq;
      36             : 
      37             : /*
      38             :  * queue of jobs
      39             :  */
      40             : typedef struct tp_jobq {
      41             :         PRCList         list;
      42             :         PRInt32         cnt;
      43             :         PRLock          *lock;
      44             :         PRCondVar       *cv;
      45             :         PRCList         wthreads;
      46             : #ifdef OPT_WINNT
      47             :         HANDLE          nt_completion_port;
      48             : #endif
      49             : } tp_jobq;
      50             : 
      51             : /*
      52             :  * queue of IO jobs
      53             :  */
      54             : typedef struct io_jobq {
      55             :         PRCList         list;
      56             :         PRPollDesc  *pollfds;
      57             :         PRInt32         npollfds;
      58             :         PRJob           **polljobs;
      59             :         PRLock          *lock;
      60             :         PRInt32         cnt;
      61             :         PRFileDesc      *notify_fd;
      62             :         PRCList         wthreads;
      63             : } io_jobq;
      64             : 
      65             : /*
      66             :  * Threadpool
      67             :  */
      68             : struct PRThreadPool {
      69             :         PRInt32         init_threads;
      70             :         PRInt32         max_threads;
      71             :         PRInt32         current_threads;
      72             :         PRInt32         idle_threads;
      73             :         PRUint32        stacksize;
      74             :         tp_jobq         jobq;
      75             :         io_jobq         ioq;
      76             :         timer_jobq      timerq;
      77             :         PRLock          *join_lock;             /* used with jobp->join_cv */
      78             :         PRCondVar       *shutdown_cv;
      79             :         PRBool          shutdown;
      80             : };
      81             : 
      82             : typedef enum io_op_type
      83             :         { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
      84             : 
      85             : #ifdef OPT_WINNT
      86             : typedef struct NT_notifier {
      87             :         OVERLAPPED overlapped;          /* must be first */
      88             :         PRJob   *jobp;
      89             : } NT_notifier;
      90             : #endif
      91             : 
      92             : struct PRJob {
      93             :         PRCList                 links;          /*      for linking jobs */
      94             :         PRBool                  on_ioq;         /* job on ioq */
      95             :         PRBool                  on_timerq;      /* job on timerq */
      96             :         PRJobFn                 job_func;
      97             :         void                    *job_arg;
      98             :         PRCondVar               *join_cv;
      99             :         PRBool                  join_wait;      /* == PR_TRUE, when waiting to join */
     100             :         PRCondVar               *cancel_cv;     /* for cancelling IO jobs */
     101             :         PRBool                  cancel_io;      /* for cancelling IO jobs */
     102             :         PRThreadPool    *tpool;         /* back pointer to thread pool */
     103             :         PRJobIoDesc             *iod;
     104             :         io_op_type              io_op;
     105             :         PRInt16                 io_poll_flags;
     106             :         PRNetAddr               *netaddr;
     107             :         PRIntervalTime  timeout;        /* relative value */
     108             :         PRIntervalTime  absolute;
     109             : #ifdef OPT_WINNT
     110             :         NT_notifier             nt_notifier;    
     111             : #endif
     112             : };
     113             : 
     114             : #define JOB_LINKS_PTR(_qp) \
     115             :     ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)))
     116             : 
     117             : #define WTHREAD_LINKS_PTR(_qp) \
     118             :     ((wthread *) ((char *) (_qp) - offsetof(wthread, links)))
     119             : 
     120             : #define JOINABLE_JOB(_jobp) (NULL != (_jobp)->join_cv)
     121             : 
     122             : #define JOIN_NOTIFY(_jobp)                                                              \
     123             :                                 PR_BEGIN_MACRO                                                  \
     124             :                                 PR_Lock(_jobp->tpool->join_lock);         \
     125             :                                 _jobp->join_wait = PR_FALSE;                 \
     126             :                                 PR_NotifyCondVar(_jobp->join_cv);            \
     127             :                                 PR_Unlock(_jobp->tpool->join_lock);               \
     128             :                                 PR_END_MACRO
     129             : 
     130             : #define CANCEL_IO_JOB(jobp)                                                             \
     131             :                                 PR_BEGIN_MACRO                                                  \
     132             :                                 jobp->cancel_io = PR_FALSE;                          \
     133             :                                 jobp->on_ioq = PR_FALSE;                             \
     134             :                                 PR_REMOVE_AND_INIT_LINK(&jobp->links);   \
     135             :                                 tp->ioq.cnt--;                                                       \
     136             :                                 PR_NotifyCondVar(jobp->cancel_cv);           \
     137             :                                 PR_END_MACRO
     138             : 
     139             : static void delete_job(PRJob *jobp);
     140             : static PRThreadPool * alloc_threadpool(void);
     141             : static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
     142             : static void notify_ioq(PRThreadPool *tp);
     143             : static void notify_timerq(PRThreadPool *tp);
     144             : 
     145             : /*
     146             :  * locks are acquired in the following order
     147             :  *
     148             :  *      tp->ioq.lock,tp->timerq.lock
     149             :  *                      |
     150             :  *                      V
     151             :  *              tp->jobq->lock            
     152             :  */
     153             : 
     154             : /*
     155             :  * worker thread function
     156             :  */
     157           0 : static void wstart(void *arg)
     158             : {
     159           0 : PRThreadPool *tp = (PRThreadPool *) arg;
     160             : PRCList *head;
     161             : 
     162             :         /*
     163             :          * execute jobs until shutdown
     164             :          */
     165           0 :         while (!tp->shutdown) {
     166             :                 PRJob *jobp;
     167             : #ifdef OPT_WINNT
     168             :                 BOOL rv;
     169             :                 DWORD unused, shutdown;
     170             :                 LPOVERLAPPED olp;
     171             : 
     172             :                 PR_Lock(tp->jobq.lock);
     173             :                 tp->idle_threads++;
     174             :                 PR_Unlock(tp->jobq.lock);
     175             :                 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
     176             :                                         &unused, &shutdown, &olp, INFINITE);
     177             :                 
     178             :                 PR_ASSERT(rv);
     179             :                 if (shutdown)
     180             :                         break;
     181             :                 jobp = ((NT_notifier *) olp)->jobp;
     182             :                 PR_Lock(tp->jobq.lock);
     183             :                 tp->idle_threads--;
     184             :                 tp->jobq.cnt--;
     185             :                 PR_Unlock(tp->jobq.lock);
     186             : #else
     187             : 
     188           0 :                 PR_Lock(tp->jobq.lock);
     189           0 :                 while (PR_CLIST_IS_EMPTY(&tp->jobq.list) && (!tp->shutdown)) {
     190           0 :                         tp->idle_threads++;
     191           0 :                         PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT);
     192           0 :                         tp->idle_threads--;
     193             :                 }       
     194           0 :                 if (tp->shutdown) {
     195           0 :                         PR_Unlock(tp->jobq.lock);
     196           0 :                         break;
     197             :                 }
     198           0 :                 head = PR_LIST_HEAD(&tp->jobq.list);
     199             :                 /*
     200             :                  * remove job from queue
     201             :                  */
     202           0 :                 PR_REMOVE_AND_INIT_LINK(head);
     203           0 :                 tp->jobq.cnt--;
     204           0 :                 jobp = JOB_LINKS_PTR(head);
     205           0 :                 PR_Unlock(tp->jobq.lock);
     206             : #endif
     207             : 
     208           0 :                 jobp->job_func(jobp->job_arg);
     209           0 :                 if (!JOINABLE_JOB(jobp)) {
     210           0 :                         delete_job(jobp);
     211             :                 } else {
     212           0 :                         JOIN_NOTIFY(jobp);
     213             :                 }
     214             :         }
     215           0 :         PR_Lock(tp->jobq.lock);
     216           0 :         tp->current_threads--;
     217           0 :         PR_Unlock(tp->jobq.lock);
     218           0 : }
     219             : 
     220             : /*
     221             :  * add a job to the work queue
     222             :  */
     223             : static void
     224           0 : add_to_jobq(PRThreadPool *tp, PRJob *jobp)
     225             : {
     226             :         /*
     227             :          * add to jobq
     228             :          */
     229             : #ifdef OPT_WINNT
     230             :         PR_Lock(tp->jobq.lock);
     231             :         tp->jobq.cnt++;
     232             :         PR_Unlock(tp->jobq.lock);
     233             :         /*
     234             :          * notify worker thread(s)
     235             :          */
     236             :         PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
     237             :             FALSE, &jobp->nt_notifier.overlapped);
     238             : #else
     239           0 :         PR_Lock(tp->jobq.lock);
     240           0 :         PR_APPEND_LINK(&jobp->links,&tp->jobq.list);
     241           0 :         tp->jobq.cnt++;
     242           0 :         if ((tp->idle_threads < tp->jobq.cnt) &&
     243           0 :                                         (tp->current_threads < tp->max_threads)) {
     244             :                 wthread *wthrp;
     245             :                 /*
     246             :                  * increment thread count and unlock the jobq lock
     247             :                  */
     248           0 :                 tp->current_threads++;
     249           0 :                 PR_Unlock(tp->jobq.lock);
     250             :                 /* create new worker thread */
     251           0 :                 wthrp = PR_NEWZAP(wthread);
     252           0 :                 if (wthrp) {
     253           0 :                         wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
     254             :                                                 tp, PR_PRIORITY_NORMAL,
     255             :                                                 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
     256           0 :                         if (NULL == wthrp->thread) {
     257           0 :                                 PR_DELETE(wthrp);  /* this sets wthrp to NULL */
     258             :                         }
     259             :                 }
     260           0 :                 PR_Lock(tp->jobq.lock);
     261           0 :                 if (NULL == wthrp) {
     262           0 :                         tp->current_threads--;
     263             :                 } else {
     264           0 :                         PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
     265             :                 }
     266             :         }
     267             :         /*
     268             :          * wakeup a worker thread
     269             :          */
     270           0 :         PR_NotifyCondVar(tp->jobq.cv);
     271           0 :         PR_Unlock(tp->jobq.lock);
     272             : #endif
     273           0 : }
     274             : 
     275             : /*
     276             :  * io worker thread function
     277             :  */
     278           0 : static void io_wstart(void *arg)
     279             : {
     280           0 : PRThreadPool *tp = (PRThreadPool *) arg;
     281             : int pollfd_cnt, pollfds_used;
     282             : int rv;
     283             : PRCList *qp, *nextqp;
     284           0 : PRPollDesc *pollfds = NULL;
     285           0 : PRJob **polljobs = NULL;
     286             : int poll_timeout;
     287             : PRIntervalTime now;
     288             : 
     289             :         /*
     290             :          * scan io_jobq
     291             :          * construct poll list
     292             :          * call PR_Poll
     293             :          * for all fds, for which poll returns true, move the job to
     294             :          * jobq and wakeup worker thread.
     295             :          */
     296           0 :         while (!tp->shutdown) {
     297             :                 PRJob *jobp;
     298             : 
     299           0 :                 pollfd_cnt = tp->ioq.cnt + 10;
     300           0 :                 if (pollfd_cnt > tp->ioq.npollfds) {
     301             : 
     302             :                         /*
     303             :                          * re-allocate pollfd array if the current one is not large
     304             :                          * enough
     305             :                          */
     306           0 :                         if (NULL != tp->ioq.pollfds)
     307           0 :                                 PR_Free(tp->ioq.pollfds);
     308           0 :                         tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
     309             :                                                 (sizeof(PRPollDesc) + sizeof(PRJob *)));
     310           0 :                         PR_ASSERT(NULL != tp->ioq.pollfds);
     311             :                         /*
     312             :                          * array of pollfds
     313             :                          */
     314           0 :                         pollfds = tp->ioq.pollfds;
     315           0 :                         tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
     316             :                         /*
     317             :                          * parallel array of jobs
     318             :                          */
     319           0 :                         polljobs = tp->ioq.polljobs;
     320           0 :                         tp->ioq.npollfds = pollfd_cnt;
     321             :                 }
     322             : 
     323           0 :                 pollfds_used = 0;
     324             :                 /*
     325             :                  * add the notify fd; used for unblocking io thread(s)
     326             :                  */
     327           0 :                 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
     328           0 :                 pollfds[pollfds_used].in_flags = PR_POLL_READ;
     329           0 :                 pollfds[pollfds_used].out_flags = 0;
     330           0 :                 polljobs[pollfds_used] = NULL;
     331           0 :                 pollfds_used++;
     332             :                 /*
     333             :                  * fill in the pollfd array
     334             :                  */
     335           0 :                 PR_Lock(tp->ioq.lock);
     336           0 :                 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
     337           0 :                         nextqp = qp->next;
     338           0 :                         jobp = JOB_LINKS_PTR(qp);
     339           0 :                         if (jobp->cancel_io) {
     340           0 :                                 CANCEL_IO_JOB(jobp);
     341           0 :                                 continue;
     342             :                         }
     343           0 :                         if (pollfds_used == (pollfd_cnt))
     344           0 :                                 break;
     345           0 :                         pollfds[pollfds_used].fd = jobp->iod->socket;
     346           0 :                         pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
     347           0 :                         pollfds[pollfds_used].out_flags = 0;
     348           0 :                         polljobs[pollfds_used] = jobp;
     349             : 
     350           0 :                         pollfds_used++;
     351             :                 }
     352           0 :                 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)) {
     353           0 :                         qp = tp->ioq.list.next;
     354           0 :                         jobp = JOB_LINKS_PTR(qp);
     355           0 :                         if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
     356           0 :                                 poll_timeout = PR_INTERVAL_NO_TIMEOUT;
     357           0 :                         else if (PR_INTERVAL_NO_WAIT == jobp->timeout)
     358           0 :                                 poll_timeout = PR_INTERVAL_NO_WAIT;
     359             :                         else {
     360           0 :                                 poll_timeout = jobp->absolute - PR_IntervalNow();
     361           0 :                                 if (poll_timeout <= 0) /* already timed out */
     362           0 :                                         poll_timeout = PR_INTERVAL_NO_WAIT;
     363             :                         }
     364             :                 } else {
     365           0 :                         poll_timeout = PR_INTERVAL_NO_TIMEOUT;
     366             :                 }
     367           0 :                 PR_Unlock(tp->ioq.lock);
     368             : 
     369             :                 /*
     370             :                  * XXXX
     371             :                  * should retry if more jobs have been added to the queue?
     372             :                  *
     373             :                  */
     374           0 :                 PR_ASSERT(pollfds_used <= pollfd_cnt);
     375           0 :                 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
     376             : 
     377           0 :                 if (tp->shutdown) {
     378           0 :                         break;
     379             :                 }
     380             : 
     381           0 :                 if (rv > 0) {
     382             :                         /*
     383             :                          * at least one io event is set
     384             :                          */
     385             :                         PRStatus rval_status;
     386             :                         PRInt32 index;
     387             : 
     388           0 :                         PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd);
     389             :                         /*
     390             :                          * reset the pollable event, if notified
     391             :                          */
     392           0 :                         if (pollfds[0].out_flags & PR_POLL_READ) {
     393           0 :                                 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
     394           0 :                                 PR_ASSERT(PR_SUCCESS == rval_status);
     395             :                         }
     396             : 
     397           0 :                         for(index = 1; index < (pollfds_used); index++) {
     398           0 :                 PRInt16 events = pollfds[index].in_flags;
     399           0 :                 PRInt16 revents = pollfds[index].out_flags;     
     400           0 :                                 jobp = polljobs[index]; 
     401             : 
     402           0 :                 if ((revents & PR_POLL_NVAL) ||  /* busted in all cases */
     403           0 :                         (revents & PR_POLL_ERR) ||
     404           0 :                                         ((events & PR_POLL_WRITE) &&
     405           0 :                                                         (revents & PR_POLL_HUP))) { /* write op & hup */
     406           0 :                                         PR_Lock(tp->ioq.lock);
     407           0 :                                         if (jobp->cancel_io) {
     408           0 :                                                 CANCEL_IO_JOB(jobp);
     409           0 :                                                 PR_Unlock(tp->ioq.lock);
     410           0 :                                                 continue;
     411             :                                         }
     412           0 :                                         PR_REMOVE_AND_INIT_LINK(&jobp->links);
     413           0 :                                         tp->ioq.cnt--;
     414           0 :                                         jobp->on_ioq = PR_FALSE;
     415           0 :                                         PR_Unlock(tp->ioq.lock);
     416             : 
     417             :                                         /* set error */
     418           0 :                     if (PR_POLL_NVAL & revents)
     419           0 :                                                 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR;
     420           0 :                     else if (PR_POLL_HUP & revents)
     421           0 :                                                 jobp->iod->error = PR_CONNECT_RESET_ERROR;
     422             :                     else 
     423           0 :                                                 jobp->iod->error = PR_IO_ERROR;
     424             : 
     425             :                                         /*
     426             :                                          * add to jobq
     427             :                                          */
     428           0 :                                         add_to_jobq(tp, jobp);
     429           0 :                                 } else if (revents) {
     430             :                                         /*
     431             :                                          * add to jobq
     432             :                                          */
     433           0 :                                         PR_Lock(tp->ioq.lock);
     434           0 :                                         if (jobp->cancel_io) {
     435           0 :                                                 CANCEL_IO_JOB(jobp);
     436           0 :                                                 PR_Unlock(tp->ioq.lock);
     437           0 :                                                 continue;
     438             :                                         }
     439           0 :                                         PR_REMOVE_AND_INIT_LINK(&jobp->links);
     440           0 :                                         tp->ioq.cnt--;
     441           0 :                                         jobp->on_ioq = PR_FALSE;
     442           0 :                                         PR_Unlock(tp->ioq.lock);
     443             : 
     444           0 :                                         if (jobp->io_op == JOB_IO_CONNECT) {
     445           0 :                                                 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS)
     446           0 :                                                         jobp->iod->error = 0;
     447             :                                                 else
     448           0 :                                                         jobp->iod->error = PR_GetError();
     449             :                                         } else
     450           0 :                                                 jobp->iod->error = 0;
     451             : 
     452           0 :                                         add_to_jobq(tp, jobp);
     453             :                                 }
     454             :                         }
     455             :                 }
     456             :                 /*
     457             :                  * timeout processing
     458             :                  */
     459           0 :                 now = PR_IntervalNow();
     460           0 :                 PR_Lock(tp->ioq.lock);
     461           0 :                 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
     462           0 :                         nextqp = qp->next;
     463           0 :                         jobp = JOB_LINKS_PTR(qp);
     464           0 :                         if (jobp->cancel_io) {
     465           0 :                                 CANCEL_IO_JOB(jobp);
     466           0 :                                 continue;
     467             :                         }
     468           0 :                         if (PR_INTERVAL_NO_TIMEOUT == jobp->timeout)
     469           0 :                                 break;
     470           0 :                         if ((PR_INTERVAL_NO_WAIT != jobp->timeout) &&
     471           0 :                                                                 ((PRInt32)(jobp->absolute - now) > 0))
     472           0 :                                 break;
     473           0 :                         PR_REMOVE_AND_INIT_LINK(&jobp->links);
     474           0 :                         tp->ioq.cnt--;
     475           0 :                         jobp->on_ioq = PR_FALSE;
     476           0 :                         jobp->iod->error = PR_IO_TIMEOUT_ERROR;
     477           0 :                         add_to_jobq(tp, jobp);
     478             :                 }
     479           0 :                 PR_Unlock(tp->ioq.lock);
     480             :         }
     481           0 : }
     482             : 
     483             : /*
     484             :  * timer worker thread function
     485             :  */
     486           0 : static void timer_wstart(void *arg)
     487             : {
     488           0 : PRThreadPool *tp = (PRThreadPool *) arg;
     489             : PRCList *qp;
     490             : PRIntervalTime timeout;
     491             : PRIntervalTime now;
     492             : 
     493             :         /*
     494             :          * call PR_WaitCondVar with minimum value of all timeouts
     495             :          */
     496           0 :         while (!tp->shutdown) {
     497             :                 PRJob *jobp;
     498             : 
     499           0 :                 PR_Lock(tp->timerq.lock);
     500           0 :                 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
     501           0 :                         timeout = PR_INTERVAL_NO_TIMEOUT;
     502             :                 } else {
     503             :                         PRCList *qp;
     504             : 
     505           0 :                         qp = tp->timerq.list.next;
     506           0 :                         jobp = JOB_LINKS_PTR(qp);
     507             : 
     508           0 :                         timeout = jobp->absolute - PR_IntervalNow();
     509           0 :             if (timeout <= 0)
     510           0 :                                 timeout = PR_INTERVAL_NO_WAIT;  /* already timed out */
     511             :                 }
     512           0 :                 if (PR_INTERVAL_NO_WAIT != timeout)
     513           0 :                         PR_WaitCondVar(tp->timerq.cv, timeout);
     514           0 :                 if (tp->shutdown) {
     515           0 :                         PR_Unlock(tp->timerq.lock);
     516           0 :                         break;
     517             :                 }
     518             :                 /*
     519             :                  * move expired-timer jobs to jobq
     520             :                  */
     521           0 :                 now = PR_IntervalNow(); 
     522           0 :                 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)) {
     523           0 :                         qp = tp->timerq.list.next;
     524           0 :                         jobp = JOB_LINKS_PTR(qp);
     525             : 
     526           0 :                         if ((PRInt32)(jobp->absolute - now) > 0) {
     527           0 :                                 break;
     528             :                         }
     529             :                         /*
     530             :                          * job timed out
     531             :                          */
     532           0 :                         PR_REMOVE_AND_INIT_LINK(&jobp->links);
     533           0 :                         tp->timerq.cnt--;
     534           0 :                         jobp->on_timerq = PR_FALSE;
     535           0 :                         add_to_jobq(tp, jobp);
     536             :                 }
     537           0 :                 PR_Unlock(tp->timerq.lock);
     538             :         }
     539           0 : }
     540             : 
     541             : static void
     542           0 : delete_threadpool(PRThreadPool *tp)
     543             : {
     544           0 :         if (NULL != tp) {
     545           0 :                 if (NULL != tp->shutdown_cv)
     546           0 :                         PR_DestroyCondVar(tp->shutdown_cv);
     547           0 :                 if (NULL != tp->jobq.cv)
     548           0 :                         PR_DestroyCondVar(tp->jobq.cv);
     549           0 :                 if (NULL != tp->jobq.lock)
     550           0 :                         PR_DestroyLock(tp->jobq.lock);
     551           0 :                 if (NULL != tp->join_lock)
     552           0 :                         PR_DestroyLock(tp->join_lock);
     553             : #ifdef OPT_WINNT
     554             :                 if (NULL != tp->jobq.nt_completion_port)
     555             :                         CloseHandle(tp->jobq.nt_completion_port);
     556             : #endif
     557             :                 /* Timer queue */
     558           0 :                 if (NULL != tp->timerq.cv)
     559           0 :                         PR_DestroyCondVar(tp->timerq.cv);
     560           0 :                 if (NULL != tp->timerq.lock)
     561           0 :                         PR_DestroyLock(tp->timerq.lock);
     562             : 
     563           0 :                 if (NULL != tp->ioq.lock)
     564           0 :                         PR_DestroyLock(tp->ioq.lock);
     565           0 :                 if (NULL != tp->ioq.pollfds)
     566           0 :                         PR_Free(tp->ioq.pollfds);
     567           0 :                 if (NULL != tp->ioq.notify_fd)
     568           0 :                         PR_DestroyPollableEvent(tp->ioq.notify_fd);
     569           0 :                 PR_Free(tp);
     570             :         }
     571           0 :         return;
     572             : }
     573             : 
     574             : static PRThreadPool *
     575           0 : alloc_threadpool(void)
     576             : {
     577             : PRThreadPool *tp;
     578             : 
     579           0 :         tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp));
     580           0 :         if (NULL == tp)
     581           0 :                 goto failed;
     582           0 :         tp->jobq.lock = PR_NewLock();
     583           0 :         if (NULL == tp->jobq.lock)
     584           0 :                 goto failed;
     585           0 :         tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
     586           0 :         if (NULL == tp->jobq.cv)
     587           0 :                 goto failed;
     588           0 :         tp->join_lock = PR_NewLock();
     589           0 :         if (NULL == tp->join_lock)
     590           0 :                 goto failed;
     591             : #ifdef OPT_WINNT
     592             :         tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
     593             :                                                                         NULL, 0, 0);
     594             :         if (NULL == tp->jobq.nt_completion_port)
     595             :                 goto failed;
     596             : #endif
     597             : 
     598           0 :         tp->ioq.lock = PR_NewLock();
     599           0 :         if (NULL == tp->ioq.lock)
     600           0 :                 goto failed;
     601             : 
     602             :         /* Timer queue */
     603             : 
     604           0 :         tp->timerq.lock = PR_NewLock();
     605           0 :         if (NULL == tp->timerq.lock)
     606           0 :                 goto failed;
     607           0 :         tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
     608           0 :         if (NULL == tp->timerq.cv)
     609           0 :                 goto failed;
     610             : 
     611           0 :         tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
     612           0 :         if (NULL == tp->shutdown_cv)
     613           0 :                 goto failed;
     614           0 :         tp->ioq.notify_fd = PR_NewPollableEvent();
     615           0 :         if (NULL == tp->ioq.notify_fd)
     616           0 :                 goto failed;
     617           0 :         return tp;
     618             : failed:
     619           0 :         delete_threadpool(tp);
     620           0 :         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
     621           0 :         return NULL;
     622             : }
     623             : 
     624             : /* Create thread pool */
     625             : PR_IMPLEMENT(PRThreadPool *)
     626             : PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
     627             :                                 PRUint32 stacksize)
     628             : {
     629             : PRThreadPool *tp;
     630             : PRThread *thr;
     631             : int i;
     632             : wthread *wthrp;
     633             : 
     634           0 :         tp = alloc_threadpool();
     635           0 :         if (NULL == tp)
     636           0 :                 return NULL;
     637             : 
     638           0 :         tp->init_threads = initial_threads;
     639           0 :         tp->max_threads = max_threads;
     640           0 :         tp->stacksize = stacksize;
     641           0 :         PR_INIT_CLIST(&tp->jobq.list);
     642           0 :         PR_INIT_CLIST(&tp->ioq.list);
     643           0 :         PR_INIT_CLIST(&tp->timerq.list);
     644           0 :         PR_INIT_CLIST(&tp->jobq.wthreads);
     645           0 :         PR_INIT_CLIST(&tp->ioq.wthreads);
     646           0 :         PR_INIT_CLIST(&tp->timerq.wthreads);
     647           0 :         tp->shutdown = PR_FALSE;
     648             : 
     649           0 :         PR_Lock(tp->jobq.lock);
     650           0 :         for(i=0; i < initial_threads; ++i) {
     651             : 
     652           0 :                 thr = PR_CreateThread(PR_USER_THREAD, wstart,
     653             :                                                 tp, PR_PRIORITY_NORMAL,
     654             :                                                 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
     655           0 :                 PR_ASSERT(thr);
     656           0 :                 wthrp = PR_NEWZAP(wthread);
     657           0 :                 PR_ASSERT(wthrp);
     658           0 :                 wthrp->thread = thr;
     659           0 :                 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads);
     660             :         }
     661           0 :         tp->current_threads = initial_threads;
     662             : 
     663           0 :         thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
     664             :                                         tp, PR_PRIORITY_NORMAL,
     665             :                                         PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
     666           0 :         PR_ASSERT(thr);
     667           0 :         wthrp = PR_NEWZAP(wthread);
     668           0 :         PR_ASSERT(wthrp);
     669           0 :         wthrp->thread = thr;
     670           0 :         PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads);
     671             : 
     672           0 :         thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
     673             :                                         tp, PR_PRIORITY_NORMAL,
     674             :                                         PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
     675           0 :         PR_ASSERT(thr);
     676           0 :         wthrp = PR_NEWZAP(wthread);
     677           0 :         PR_ASSERT(wthrp);
     678           0 :         wthrp->thread = thr;
     679           0 :         PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads);
     680             : 
     681           0 :         PR_Unlock(tp->jobq.lock);
     682           0 :         return tp;
     683             : }
     684             : 
     685             : static void
     686           0 : delete_job(PRJob *jobp)
     687             : {
     688           0 :         if (NULL != jobp) {
     689           0 :                 if (NULL != jobp->join_cv) {
     690           0 :                         PR_DestroyCondVar(jobp->join_cv);
     691           0 :                         jobp->join_cv = NULL;
     692             :                 }
     693           0 :                 if (NULL != jobp->cancel_cv) {
     694           0 :                         PR_DestroyCondVar(jobp->cancel_cv);
     695           0 :                         jobp->cancel_cv = NULL;
     696             :                 }
     697           0 :                 PR_DELETE(jobp);
     698             :         }
     699           0 : }
     700             : 
     701             : static PRJob *
     702           0 : alloc_job(PRBool joinable, PRThreadPool *tp)
     703             : {
     704             :         PRJob *jobp;
     705             : 
     706           0 :         jobp = PR_NEWZAP(PRJob);
     707           0 :         if (NULL == jobp) 
     708           0 :                 goto failed;
     709           0 :         if (joinable) {
     710           0 :                 jobp->join_cv = PR_NewCondVar(tp->join_lock);
     711           0 :                 jobp->join_wait = PR_TRUE;
     712           0 :                 if (NULL == jobp->join_cv)
     713           0 :                         goto failed;
     714             :         } else {
     715           0 :                 jobp->join_cv = NULL;
     716             :         }
     717             : #ifdef OPT_WINNT
     718             :         jobp->nt_notifier.jobp = jobp;
     719             : #endif
     720           0 :         return jobp;
     721             : failed:
     722           0 :         delete_job(jobp);
     723           0 :         PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
     724           0 :         return NULL;
     725             : }
     726             : 
     727             : /* queue a job */
     728             : PR_IMPLEMENT(PRJob *)
     729             : PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
     730             : {
     731             :         PRJob *jobp;
     732             : 
     733           0 :         jobp = alloc_job(joinable, tpool);
     734           0 :         if (NULL == jobp)
     735           0 :                 return NULL;
     736             : 
     737           0 :         jobp->job_func = fn;
     738           0 :         jobp->job_arg = arg;
     739           0 :         jobp->tpool = tpool;
     740             : 
     741           0 :         add_to_jobq(tpool, jobp);
     742           0 :         return jobp;
     743             : }
     744             : 
     745             : /* queue a job, when a socket is readable or writeable */
     746             : static PRJob *
     747           0 : queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
     748             :                                 PRBool joinable, io_op_type op)
     749             : {
     750             :         PRJob *jobp;
     751             :         PRIntervalTime now;
     752             : 
     753           0 :         jobp = alloc_job(joinable, tpool);
     754           0 :         if (NULL == jobp) {
     755           0 :                 return NULL;
     756             :         }
     757             : 
     758             :         /*
     759             :          * Add a new job to io_jobq
     760             :          * wakeup io worker thread
     761             :          */
     762             : 
     763           0 :         jobp->job_func = fn;
     764           0 :         jobp->job_arg = arg;
     765           0 :         jobp->tpool = tpool;
     766           0 :         jobp->iod = iod;
     767           0 :         if (JOB_IO_READ == op) {
     768           0 :                 jobp->io_op = JOB_IO_READ;
     769           0 :                 jobp->io_poll_flags = PR_POLL_READ;
     770           0 :         } else if (JOB_IO_WRITE == op) {
     771           0 :                 jobp->io_op = JOB_IO_WRITE;
     772           0 :                 jobp->io_poll_flags = PR_POLL_WRITE;
     773           0 :         } else if (JOB_IO_ACCEPT == op) {
     774           0 :                 jobp->io_op = JOB_IO_ACCEPT;
     775           0 :                 jobp->io_poll_flags = PR_POLL_READ;
     776           0 :         } else if (JOB_IO_CONNECT == op) {
     777           0 :                 jobp->io_op = JOB_IO_CONNECT;
     778           0 :                 jobp->io_poll_flags = PR_POLL_WRITE|PR_POLL_EXCEPT;
     779             :         } else {
     780           0 :                 delete_job(jobp);
     781           0 :                 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
     782           0 :                 return NULL;
     783             :         }
     784             : 
     785           0 :         jobp->timeout = iod->timeout;
     786           0 :         if ((PR_INTERVAL_NO_TIMEOUT == iod->timeout) ||
     787           0 :                         (PR_INTERVAL_NO_WAIT == iod->timeout)) {
     788           0 :                 jobp->absolute = iod->timeout;
     789             :         } else {
     790           0 :                 now = PR_IntervalNow();
     791           0 :                 jobp->absolute = now + iod->timeout;
     792             :         }
     793             : 
     794             : 
     795           0 :         PR_Lock(tpool->ioq.lock);
     796             : 
     797           0 :         if (PR_CLIST_IS_EMPTY(&tpool->ioq.list) ||
     798           0 :                         (PR_INTERVAL_NO_TIMEOUT == iod->timeout)) {
     799           0 :                 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list);
     800           0 :         } else if (PR_INTERVAL_NO_WAIT == iod->timeout) {
     801           0 :                 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list);
     802             :         } else {
     803             :                 PRCList *qp;
     804             :                 PRJob *tmp_jobp;
     805             :                 /*
     806             :                  * insert into the timeout-sorted ioq
     807             :                  */
     808           0 :                 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
     809           0 :                                                         qp = qp->prev) {
     810           0 :                         tmp_jobp = JOB_LINKS_PTR(qp);
     811           0 :                         if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
     812           0 :                                 break;
     813             :                         }
     814             :                 }
     815           0 :                 PR_INSERT_AFTER(&jobp->links,qp);
     816             :         }
     817             : 
     818           0 :         jobp->on_ioq = PR_TRUE;
     819           0 :         tpool->ioq.cnt++;
     820             :         /*
     821             :          * notify io worker thread(s)
     822             :          */
     823           0 :         PR_Unlock(tpool->ioq.lock);
     824           0 :         notify_ioq(tpool);
     825           0 :         return jobp;
     826             : }
     827             : 
     828             : /* queue a job, when a socket is readable */
     829             : PR_IMPLEMENT(PRJob *)
     830             : PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
     831             :                                                                                         PRBool joinable)
     832             : {
     833           0 :         return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
     834             : }
     835             : 
     836             : /* queue a job, when a socket is writeable */
     837             : PR_IMPLEMENT(PRJob *)
     838             : PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
     839             :                                                                                 PRBool joinable)
     840             : {
     841           0 :         return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
     842             : }
     843             : 
     844             : 
     845             : /* queue a job, when a socket has a pending connection */
     846             : PR_IMPLEMENT(PRJob *)
     847             : PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
     848             :                                                                 void * arg, PRBool joinable)
     849             : {
     850           0 :         return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
     851             : }
     852             : 
     853             : /* queue a job, when a socket can be connected */
     854             : PR_IMPLEMENT(PRJob *)
     855             : PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
     856             :                         const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
     857             : {
     858             :         PRStatus rv;
     859             :         PRErrorCode err;
     860             : 
     861           0 :         rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT);
     862           0 :         if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR)){
     863             :                 /* connection pending */
     864           0 :                 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
     865             :         }
     866             :     /*
     867             :      * connection succeeded or failed; add to jobq right away
     868             :      */
     869           0 :     if (rv == PR_FAILURE)
     870           0 :       iod->error = err;
     871             :     else
     872           0 :       iod->error = 0;
     873           0 :     return(PR_QueueJob(tpool, fn, arg, joinable));
     874             : 
     875             : }
     876             : 
     877             : /* queue a job, when a timer expires */
     878             : PR_IMPLEMENT(PRJob *)
     879             : PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
     880             :                                                         PRJobFn fn, void * arg, PRBool joinable)
     881             : {
     882             :         PRIntervalTime now;
     883             :         PRJob *jobp;
     884             : 
     885           0 :         if (PR_INTERVAL_NO_TIMEOUT == timeout) {
     886           0 :                 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
     887           0 :                 return NULL;
     888             :         }
     889           0 :         if (PR_INTERVAL_NO_WAIT == timeout) {
     890             :                 /*
     891             :                  * no waiting; add to jobq right away
     892             :                  */
     893           0 :                 return(PR_QueueJob(tpool, fn, arg, joinable));
     894             :         }
     895           0 :         jobp = alloc_job(joinable, tpool);
     896           0 :         if (NULL == jobp) {
     897           0 :                 return NULL;
     898             :         }
     899             : 
     900             :         /*
     901             :          * Add a new job to timer_jobq
     902             :          * wakeup timer worker thread
     903             :          */
     904             : 
     905           0 :         jobp->job_func = fn;
     906           0 :         jobp->job_arg = arg;
     907           0 :         jobp->tpool = tpool;
     908           0 :         jobp->timeout = timeout;
     909             : 
     910           0 :         now = PR_IntervalNow();
     911           0 :         jobp->absolute = now + timeout;
     912             : 
     913             : 
     914           0 :         PR_Lock(tpool->timerq.lock);
     915           0 :         jobp->on_timerq = PR_TRUE;
     916           0 :         if (PR_CLIST_IS_EMPTY(&tpool->timerq.list))
     917           0 :                 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list);
     918             :         else {
     919             :                 PRCList *qp;
     920             :                 PRJob *tmp_jobp;
     921             :                 /*
     922             :                  * insert into the sorted timer jobq
     923             :                  */
     924           0 :                 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
     925           0 :                                                         qp = qp->prev) {
     926           0 :                         tmp_jobp = JOB_LINKS_PTR(qp);
     927           0 :                         if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
     928           0 :                                 break;
     929             :                         }
     930             :                 }
     931           0 :                 PR_INSERT_AFTER(&jobp->links,qp);
     932             :         }
     933           0 :         tpool->timerq.cnt++;
     934             :         /*
     935             :          * notify timer worker thread(s)
     936             :          */
     937           0 :         notify_timerq(tpool);
     938           0 :         PR_Unlock(tpool->timerq.lock);
     939           0 :         return jobp;
     940             : }
     941             : 
     942             : static void
     943           0 : notify_timerq(PRThreadPool *tp)
     944             : {
     945             :         /*
     946             :          * wakeup the timer thread(s)
     947             :          */
     948           0 :         PR_NotifyCondVar(tp->timerq.cv);
     949           0 : }
     950             : 
     951             : static void
     952           0 : notify_ioq(PRThreadPool *tp)
     953             : {
     954             : PRStatus rval_status;
     955             : 
     956             :         /*
     957             :          * wakeup the io thread(s)
     958             :          */
     959           0 :         rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
     960           0 :         PR_ASSERT(PR_SUCCESS == rval_status);
     961           0 : }
     962             : 
     963             : /*
     964             :  * cancel a job
     965             :  *
     966             :  *      XXXX: is this needed? likely to be removed
     967             :  */
     968             : PR_IMPLEMENT(PRStatus)
     969             : PR_CancelJob(PRJob *jobp) {
     970             : 
     971           0 :         PRStatus rval = PR_FAILURE;
     972             :         PRThreadPool *tp;
     973             : 
     974           0 :         if (jobp->on_timerq) {
     975             :                 /*
     976             :                  * now, check again while holding the timerq lock
     977             :                  */
     978           0 :                 tp = jobp->tpool;
     979           0 :                 PR_Lock(tp->timerq.lock);
     980           0 :                 if (jobp->on_timerq) {
     981           0 :                         jobp->on_timerq = PR_FALSE;
     982           0 :                         PR_REMOVE_AND_INIT_LINK(&jobp->links);
     983           0 :                         tp->timerq.cnt--;
     984           0 :                         PR_Unlock(tp->timerq.lock);
     985           0 :                         if (!JOINABLE_JOB(jobp)) {
     986           0 :                                 delete_job(jobp);
     987             :                         } else {
     988           0 :                                 JOIN_NOTIFY(jobp);
     989             :                         }
     990           0 :                         rval = PR_SUCCESS;
     991             :                 } else
     992           0 :                         PR_Unlock(tp->timerq.lock);
     993           0 :         } else if (jobp->on_ioq) {
     994             :                 /*
     995             :                  * now, check again while holding the ioq lock
     996             :                  */
     997           0 :                 tp = jobp->tpool;
     998           0 :                 PR_Lock(tp->ioq.lock);
     999           0 :                 if (jobp->on_ioq) {
    1000           0 :                         jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
    1001           0 :                         if (NULL == jobp->cancel_cv) {
    1002           0 :                                 PR_Unlock(tp->ioq.lock);
    1003           0 :                                 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, 0);
    1004           0 :                                 return PR_FAILURE;
    1005             :                         }
    1006             :                         /*
    1007             :                          * mark job 'cancelled' and notify io thread(s)
    1008             :                          * XXXX:
    1009             :                          *              this assumes there is only one io thread; when there
    1010             :                          *              are multiple threads, the io thread processing this job
    1011             :                          *              must be notified.
    1012             :                          */
    1013           0 :                         jobp->cancel_io = PR_TRUE;
    1014           0 :                         PR_Unlock(tp->ioq.lock);     /* release, reacquire ioq lock */
    1015           0 :                         notify_ioq(tp);
    1016           0 :                         PR_Lock(tp->ioq.lock);
    1017           0 :                         while (jobp->cancel_io)
    1018           0 :                                 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT);
    1019           0 :                         PR_Unlock(tp->ioq.lock);
    1020           0 :                         PR_ASSERT(!jobp->on_ioq);
    1021           0 :                         if (!JOINABLE_JOB(jobp)) {
    1022           0 :                                 delete_job(jobp);
    1023             :                         } else {
    1024           0 :                                 JOIN_NOTIFY(jobp);
    1025             :                         }
    1026           0 :                         rval = PR_SUCCESS;
    1027             :                 } else
    1028           0 :                         PR_Unlock(tp->ioq.lock);
    1029             :         }
    1030           0 :         if (PR_FAILURE == rval)
    1031           0 :                 PR_SetError(PR_INVALID_STATE_ERROR, 0);
    1032           0 :         return rval;
    1033             : }
    1034             : 
    1035             : /* join a job, wait until completion */
    1036             : PR_IMPLEMENT(PRStatus)
    1037             : PR_JoinJob(PRJob *jobp)
    1038             : {
    1039           0 :         if (!JOINABLE_JOB(jobp)) {
    1040           0 :                 PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
    1041           0 :                 return PR_FAILURE;
    1042             :         }
    1043           0 :         PR_Lock(jobp->tpool->join_lock);
    1044           0 :         while(jobp->join_wait)
    1045           0 :                 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT);
    1046           0 :         PR_Unlock(jobp->tpool->join_lock);
    1047           0 :         delete_job(jobp);
    1048           0 :         return PR_SUCCESS;
    1049             : }
    1050             : 
    1051             : /* shutdown threadpool */
    1052             : PR_IMPLEMENT(PRStatus)
    1053             : PR_ShutdownThreadPool(PRThreadPool *tpool)
    1054             : {
    1055           0 : PRStatus rval = PR_SUCCESS;
    1056             : 
    1057           0 :         PR_Lock(tpool->jobq.lock);
    1058           0 :         tpool->shutdown = PR_TRUE;
    1059           0 :         PR_NotifyAllCondVar(tpool->shutdown_cv);
    1060           0 :         PR_Unlock(tpool->jobq.lock);
    1061             : 
    1062           0 :         return rval;
    1063             : }
    1064             : 
    1065             : /*
    1066             :  * join thread pool
    1067             :  *      wait for termination of worker threads
    1068             :  *      reclaim threadpool resources
    1069             :  */
    1070             : PR_IMPLEMENT(PRStatus)
    1071             : PR_JoinThreadPool(PRThreadPool *tpool)
    1072             : {
    1073           0 : PRStatus rval = PR_SUCCESS;
    1074             : PRCList *head;
    1075             : PRStatus rval_status;
    1076             : 
    1077           0 :         PR_Lock(tpool->jobq.lock);
    1078           0 :         while (!tpool->shutdown)
    1079           0 :                 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT);
    1080             : 
    1081             :         /*
    1082             :          * wakeup worker threads
    1083             :          */
    1084             : #ifdef OPT_WINNT
    1085             :         /*
    1086             :          * post shutdown notification for all threads
    1087             :          */
    1088             :         {
    1089             :                 int i;
    1090             :                 for(i=0; i < tpool->current_threads; i++) {
    1091             :                         PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
    1092             :                                                                                                 TRUE, NULL);
    1093             :                 }
    1094             :         }
    1095             : #else
    1096           0 :         PR_NotifyAllCondVar(tpool->jobq.cv);
    1097             : #endif
    1098             : 
    1099             :         /*
    1100             :          * wakeup io thread(s)
    1101             :          */
    1102           0 :         notify_ioq(tpool);
    1103             : 
    1104             :         /*
    1105             :          * wakeup timer thread(s)
    1106             :          */
    1107           0 :         PR_Lock(tpool->timerq.lock);
    1108           0 :         notify_timerq(tpool);
    1109           0 :         PR_Unlock(tpool->timerq.lock);
    1110             : 
    1111           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)) {
    1112             :                 wthread *wthrp;
    1113             : 
    1114           0 :                 head = PR_LIST_HEAD(&tpool->jobq.wthreads);
    1115           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1116           0 :                 PR_Unlock(tpool->jobq.lock);
    1117           0 :                 wthrp = WTHREAD_LINKS_PTR(head);
    1118           0 :                 rval_status = PR_JoinThread(wthrp->thread);
    1119           0 :                 PR_ASSERT(PR_SUCCESS == rval_status);
    1120           0 :                 PR_DELETE(wthrp);
    1121           0 :                 PR_Lock(tpool->jobq.lock);
    1122             :         }
    1123           0 :         PR_Unlock(tpool->jobq.lock);
    1124           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)) {
    1125             :                 wthread *wthrp;
    1126             : 
    1127           0 :                 head = PR_LIST_HEAD(&tpool->ioq.wthreads);
    1128           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1129           0 :                 wthrp = WTHREAD_LINKS_PTR(head);
    1130           0 :                 rval_status = PR_JoinThread(wthrp->thread);
    1131           0 :                 PR_ASSERT(PR_SUCCESS == rval_status);
    1132           0 :                 PR_DELETE(wthrp);
    1133             :         }
    1134             : 
    1135           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)) {
    1136             :                 wthread *wthrp;
    1137             : 
    1138           0 :                 head = PR_LIST_HEAD(&tpool->timerq.wthreads);
    1139           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1140           0 :                 wthrp = WTHREAD_LINKS_PTR(head);
    1141           0 :                 rval_status = PR_JoinThread(wthrp->thread);
    1142           0 :                 PR_ASSERT(PR_SUCCESS == rval_status);
    1143           0 :                 PR_DELETE(wthrp);
    1144             :         }
    1145             : 
    1146             :         /*
    1147             :          * Delete queued jobs
    1148             :          */
    1149           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)) {
    1150             :                 PRJob *jobp;
    1151             : 
    1152           0 :                 head = PR_LIST_HEAD(&tpool->jobq.list);
    1153           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1154           0 :                 jobp = JOB_LINKS_PTR(head);
    1155           0 :                 tpool->jobq.cnt--;
    1156           0 :                 delete_job(jobp);
    1157             :         }
    1158             : 
    1159             :         /* delete io jobs */
    1160           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)) {
    1161             :                 PRJob *jobp;
    1162             : 
    1163           0 :                 head = PR_LIST_HEAD(&tpool->ioq.list);
    1164           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1165           0 :                 tpool->ioq.cnt--;
    1166           0 :                 jobp = JOB_LINKS_PTR(head);
    1167           0 :                 delete_job(jobp);
    1168             :         }
    1169             : 
    1170             :         /* delete timer jobs */
    1171           0 :         while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)) {
    1172             :                 PRJob *jobp;
    1173             : 
    1174           0 :                 head = PR_LIST_HEAD(&tpool->timerq.list);
    1175           0 :                 PR_REMOVE_AND_INIT_LINK(head);
    1176           0 :                 tpool->timerq.cnt--;
    1177           0 :                 jobp = JOB_LINKS_PTR(head);
    1178           0 :                 delete_job(jobp);
    1179             :         }
    1180             : 
    1181           0 :         PR_ASSERT(0 == tpool->jobq.cnt);
    1182           0 :         PR_ASSERT(0 == tpool->ioq.cnt);
    1183           0 :         PR_ASSERT(0 == tpool->timerq.cnt);
    1184             : 
    1185           0 :         delete_threadpool(tpool);
    1186           0 :         return rval;
    1187             : }

Generated by: LCOV version 1.13