Line data Source code
1 : /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 : /* This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5 :
6 : #include "primpl.h"
7 : #include "pprmwait.h"
8 :
/*
 * Number of slots the hash insert and lookup routines in this file probe
 * (the initial slot plus rehashes) before giving up on a key.
 */
9 : #define _MW_REHASH_MAX 11
10 :
/* Created in _PR_InitMW; MW_Init2 holds it while installing the default group. */
11 : static PRLock *mw_lock = NULL;
/*
 * Module-wide state allocated (zeroed) in _PR_InitMW. It carries the
 * default ("null") wait group and the list head for all groups.
 */
12 : static _PRGlobalState *mw_state = NULL;
13 :
/*
 * MAX_POLLING_INTERVAL converted from milliseconds to interval ticks in
 * _PR_InitMW; the poller uses it as the starting (largest) PR_Poll timeout.
 */
14 : static PRIntervalTime max_polling_interval;
15 :
16 : #ifdef WINNT
17 :
/*
 * One pending timeout handled by the timer manager thread (WINNT only).
 */
18 : typedef struct TimerEvent {
/* Expiry instant: PR_IntervalNow() + timeout, computed in CreateTimer. */
19 : PRIntervalTime absolute;
/* Callback run by TimerManager, with tm_vars.ml released, once expired. */
20 : void (*func)(void *);
/* Opaque argument handed to func. */
21 : void *arg;
/*
 * Starts at 2 in CreateTimer. TimerManager drops one reference after the
 * callback returns and CancelTimer drops the other.
 */
22 : LONG ref_count;
/* Linkage on tm_vars.timer_queue; self-linked once dequeued by the manager. */
23 : PRCList links;
24 : } TimerEvent;
25 :
/* Recover the enclosing TimerEvent from a pointer to its links member. */
26 : #define TIMER_EVENT_PTR(_qp) \
27 : ((TimerEvent *) ((char *) (_qp) - offsetof(TimerEvent, links)))
28 :
/*
 * Shared state of the timer facility, set up by TimerInit.
 */
29 : struct {
/* Protects timer_queue and the ref_count of every TimerEvent. */
30 : PRLock *ml;
/* Notified by CreateTimer; TimerManager waits on it between expirations. */
31 : PRCondVar *new_timer;
/* Notified by TimerManager when a timer's ref_count reaches 0; CancelTimer waits on it. */
32 : PRCondVar *cancel_timer;
/* Thread running TimerManager. */
33 : PRThread *manager_thread;
/* Pending timers; CreateTimer inserts so that the head expires first. */
34 : PRCList timer_queue;
35 : } tm_vars;
36 :
/* Forward declarations for the timer helpers defined below. */
37 : static PRStatus TimerInit(void);
38 : static void TimerManager(void *arg);
39 : static TimerEvent *CreateTimer(PRIntervalTime timeout,
40 : void (*func)(void *), void *arg);
41 : static PRBool CancelTimer(TimerEvent *timer);
42 :
43 : static void TimerManager(void *arg)
44 : {
45 : PRIntervalTime now;
46 : PRIntervalTime timeout;
47 : PRCList *head;
48 : TimerEvent *timer;
49 :
50 : PR_Lock(tm_vars.ml);
51 : while (1)
52 : {
53 : if (PR_CLIST_IS_EMPTY(&tm_vars.timer_queue))
54 : {
55 : PR_WaitCondVar(tm_vars.new_timer, PR_INTERVAL_NO_TIMEOUT);
56 : }
57 : else
58 : {
59 : now = PR_IntervalNow();
60 : head = PR_LIST_HEAD(&tm_vars.timer_queue);
61 : timer = TIMER_EVENT_PTR(head);
62 : if ((PRInt32) (now - timer->absolute) >= 0)
63 : {
64 : PR_REMOVE_LINK(head);
65 : /*
66 : * make its prev and next point to itself so that
67 : * it's obvious that it's not on the timer_queue.
68 : */
69 : PR_INIT_CLIST(head);
70 : PR_ASSERT(2 == timer->ref_count);
71 : PR_Unlock(tm_vars.ml);
72 : timer->func(timer->arg);
73 : PR_Lock(tm_vars.ml);
74 : timer->ref_count -= 1;
75 : if (0 == timer->ref_count)
76 : {
77 : PR_NotifyAllCondVar(tm_vars.cancel_timer);
78 : }
79 : }
80 : else
81 : {
82 : timeout = (PRIntervalTime)(timer->absolute - now);
83 : PR_WaitCondVar(tm_vars.new_timer, timeout);
84 : }
85 : }
86 : }
87 : PR_Unlock(tm_vars.ml);
88 : }
89 :
90 : static TimerEvent *CreateTimer(
91 : PRIntervalTime timeout,
92 : void (*func)(void *),
93 : void *arg)
94 : {
95 : TimerEvent *timer;
96 : PRCList *links, *tail;
97 : TimerEvent *elem;
98 :
99 : timer = PR_NEW(TimerEvent);
100 : if (NULL == timer)
101 : {
102 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
103 : return timer;
104 : }
105 : timer->absolute = PR_IntervalNow() + timeout;
106 : timer->func = func;
107 : timer->arg = arg;
108 : timer->ref_count = 2;
109 : PR_Lock(tm_vars.ml);
110 : tail = links = PR_LIST_TAIL(&tm_vars.timer_queue);
111 : while (links->prev != tail)
112 : {
113 : elem = TIMER_EVENT_PTR(links);
114 : if ((PRInt32)(timer->absolute - elem->absolute) >= 0)
115 : {
116 : break;
117 : }
118 : links = links->prev;
119 : }
120 : PR_INSERT_AFTER(&timer->links, links);
121 : PR_NotifyCondVar(tm_vars.new_timer);
122 : PR_Unlock(tm_vars.ml);
123 : return timer;
124 : }
125 :
126 : static PRBool CancelTimer(TimerEvent *timer)
127 : {
128 : PRBool canceled = PR_FALSE;
129 :
130 : PR_Lock(tm_vars.ml);
131 : timer->ref_count -= 1;
132 : if (timer->links.prev == &timer->links)
133 : {
134 : while (timer->ref_count == 1)
135 : {
136 : PR_WaitCondVar(tm_vars.cancel_timer, PR_INTERVAL_NO_TIMEOUT);
137 : }
138 : }
139 : else
140 : {
141 : PR_REMOVE_LINK(&timer->links);
142 : canceled = PR_TRUE;
143 : }
144 : PR_Unlock(tm_vars.ml);
145 : PR_DELETE(timer);
146 : return canceled;
147 : }
148 :
149 : static PRStatus TimerInit(void)
150 : {
151 : tm_vars.ml = PR_NewLock();
152 : if (NULL == tm_vars.ml)
153 : {
154 : goto failed;
155 : }
156 : tm_vars.new_timer = PR_NewCondVar(tm_vars.ml);
157 : if (NULL == tm_vars.new_timer)
158 : {
159 : goto failed;
160 : }
161 : tm_vars.cancel_timer = PR_NewCondVar(tm_vars.ml);
162 : if (NULL == tm_vars.cancel_timer)
163 : {
164 : goto failed;
165 : }
166 : PR_INIT_CLIST(&tm_vars.timer_queue);
167 : tm_vars.manager_thread = PR_CreateThread(
168 : PR_SYSTEM_THREAD, TimerManager, NULL, PR_PRIORITY_NORMAL,
169 : PR_LOCAL_THREAD, PR_UNJOINABLE_THREAD, 0);
170 : if (NULL == tm_vars.manager_thread)
171 : {
172 : goto failed;
173 : }
174 : return PR_SUCCESS;
175 :
176 : failed:
177 : if (NULL != tm_vars.cancel_timer)
178 : {
179 : PR_DestroyCondVar(tm_vars.cancel_timer);
180 : }
181 : if (NULL != tm_vars.new_timer)
182 : {
183 : PR_DestroyCondVar(tm_vars.new_timer);
184 : }
185 : if (NULL != tm_vars.ml)
186 : {
187 : PR_DestroyLock(tm_vars.ml);
188 : }
189 : return PR_FAILURE;
190 : }
191 :
192 : #endif /* WINNT */
193 :
194 : /******************************************************************/
195 : /******************************************************************/
196 : /************************ The private portion *********************/
197 : /******************************************************************/
198 : /******************************************************************/
199 3 : void _PR_InitMW(void)
200 : {
201 : #ifdef WINNT
202 : /*
203 : * We use NT 4's InterlockedCompareExchange() to operate
204 : * on PRMWStatus variables.
205 : */
206 : PR_ASSERT(sizeof(LONG) == sizeof(PRMWStatus));
207 : TimerInit();
208 : #endif
209 3 : mw_lock = PR_NewLock();
210 3 : PR_ASSERT(NULL != mw_lock);
211 3 : mw_state = PR_NEWZAP(_PRGlobalState);
212 3 : PR_ASSERT(NULL != mw_state);
213 3 : PR_INIT_CLIST(&mw_state->group_list);
214 3 : max_polling_interval = PR_MillisecondsToInterval(MAX_POLLING_INTERVAL);
215 3 : } /* _PR_InitMW */
216 :
217 0 : void _PR_CleanupMW(void)
218 : {
219 0 : PR_DestroyLock(mw_lock);
220 0 : mw_lock = NULL;
221 0 : if (mw_state->group) {
222 0 : PR_DestroyWaitGroup(mw_state->group);
223 : /* mw_state->group is set to NULL as a side effect. */
224 : }
225 0 : PR_DELETE(mw_state);
226 0 : } /* _PR_CleanupMW */
227 :
228 0 : static PRWaitGroup *MW_Init2(void)
229 : {
230 0 : PRWaitGroup *group = mw_state->group; /* it's the null group */
231 0 : if (NULL == group) /* there is this special case */
232 : {
233 0 : group = PR_CreateWaitGroup(_PR_DEFAULT_HASH_LENGTH);
234 0 : if (NULL == group) goto failed_alloc;
235 0 : PR_Lock(mw_lock);
236 0 : if (NULL == mw_state->group)
237 : {
238 0 : mw_state->group = group;
239 0 : group = NULL;
240 : }
241 0 : PR_Unlock(mw_lock);
242 0 : if (group != NULL) (void)PR_DestroyWaitGroup(group);
243 0 : group = mw_state->group; /* somebody beat us to it */
244 : }
245 : failed_alloc:
246 0 : return group; /* whatever */
247 : } /* MW_Init2 */
248 :
249 0 : static _PR_HashStory MW_AddHashInternal(PRRecvWait *desc, _PRWaiterHash *hash)
250 : {
251 : /*
252 : ** The entries are put in the table using the fd (PRFileDesc*) of
253 : ** the receive descriptor as the key. This allows us to locate
254 : ** the appropriate entry aqain when the poll operation finishes.
255 : **
256 : ** The pointer to the file descriptor object is first divided by
257 : ** the natural alignment of a pointer in the belief that object
258 : ** will have at least that many zeros in the low order bits.
259 : ** This may not be a good assuption.
260 : **
261 : ** We try to put the entry in by rehashing _MW_REHASH_MAX times. After
262 : ** that we declare defeat and force the table to be reconstructed.
263 : ** Since some fds might be added more than once, won't that cause
264 : ** collisions even in an empty table?
265 : */
266 0 : PRIntn rehash = _MW_REHASH_MAX;
267 : PRRecvWait **waiter;
268 0 : PRUintn hidx = _MW_HASH(desc->fd, hash->length);
269 0 : PRUintn hoffset = 0;
270 :
271 0 : while (rehash-- > 0)
272 : {
273 0 : waiter = &hash->recv_wait;
274 0 : if (NULL == waiter[hidx])
275 : {
276 0 : waiter[hidx] = desc;
277 0 : hash->count += 1;
278 : #if 0
279 : printf("Adding 0x%x->0x%x ", desc, desc->fd);
280 : printf(
281 : "table[%u:%u:*%u]: 0x%x->0x%x\n",
282 : hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
283 : #endif
284 0 : return _prmw_success;
285 : }
286 0 : if (desc == waiter[hidx])
287 : {
288 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc already in table */
289 0 : return _prmw_error;
290 : }
291 : #if 0
292 : printf("Failing 0x%x->0x%x ", desc, desc->fd);
293 : printf(
294 : "table[*%u:%u:%u]: 0x%x->0x%x\n",
295 : hidx, hash->count, hash->length, waiter[hidx], waiter[hidx]->fd);
296 : #endif
297 0 : if (0 == hoffset)
298 : {
299 0 : hoffset = _MW_HASH2(desc->fd, hash->length);
300 0 : PR_ASSERT(0 != hoffset);
301 : }
302 0 : hidx = (hidx + hoffset) % (hash->length);
303 : }
304 0 : return _prmw_rehash;
305 : } /* MW_AddHashInternal */
306 :
307 0 : static _PR_HashStory MW_ExpandHashInternal(PRWaitGroup *group)
308 : {
309 : PRRecvWait **desc;
310 : PRUint32 pidx, length;
311 0 : _PRWaiterHash *newHash, *oldHash = group->waiter;
312 : PRBool retry;
313 : _PR_HashStory hrv;
314 :
315 : static const PRInt32 prime_number[] = {
316 : _PR_DEFAULT_HASH_LENGTH, 179, 521, 907, 1427,
317 : 2711, 3917, 5021, 8219, 11549, 18911, 26711, 33749, 44771};
318 0 : PRUintn primes = (sizeof(prime_number) / sizeof(PRInt32));
319 :
320 : /* look up the next size we'd like to use for the hash table */
321 0 : for (pidx = 0; pidx < primes; ++pidx)
322 : {
323 0 : if (prime_number[pidx] == oldHash->length)
324 : {
325 0 : break;
326 : }
327 : }
328 : /* table size must be one of the prime numbers */
329 0 : PR_ASSERT(pidx < primes);
330 :
331 : /* if pidx == primes - 1, we can't expand the table any more */
332 0 : while (pidx < primes - 1)
333 : {
334 : /* next size */
335 0 : ++pidx;
336 0 : length = prime_number[pidx];
337 :
338 : /* allocate the new hash table and fill it in with the old */
339 0 : newHash = (_PRWaiterHash*)PR_CALLOC(
340 : sizeof(_PRWaiterHash) + (length * sizeof(PRRecvWait*)));
341 0 : if (NULL == newHash)
342 : {
343 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
344 0 : return _prmw_error;
345 : }
346 :
347 0 : newHash->length = length;
348 0 : retry = PR_FALSE;
349 0 : for (desc = &oldHash->recv_wait;
350 0 : newHash->count < oldHash->count; ++desc)
351 : {
352 0 : PR_ASSERT(desc < &oldHash->recv_wait + oldHash->length);
353 0 : if (NULL != *desc)
354 : {
355 0 : hrv = MW_AddHashInternal(*desc, newHash);
356 0 : PR_ASSERT(_prmw_error != hrv);
357 0 : if (_prmw_success != hrv)
358 : {
359 0 : PR_DELETE(newHash);
360 0 : retry = PR_TRUE;
361 0 : break;
362 : }
363 : }
364 : }
365 0 : if (retry) continue;
366 :
367 0 : PR_DELETE(group->waiter);
368 0 : group->waiter = newHash;
369 0 : group->p_timestamp += 1;
370 0 : return _prmw_success;
371 : }
372 :
373 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
374 0 : return _prmw_error; /* we're hosed */
375 : } /* MW_ExpandHashInternal */
376 :
377 : #ifndef WINNT
378 0 : static void _MW_DoneInternal(
379 : PRWaitGroup *group, PRRecvWait **waiter, PRMWStatus outcome)
380 : {
381 : /*
382 : ** Add this receive wait object to the list of finished I/O
383 : ** operations for this particular group. If there are other
384 : ** threads waiting on the group, notify one. If not, arrange
385 : ** for this thread to return.
386 : */
387 :
388 : #if 0
389 : printf("Removing 0x%x->0x%x\n", *waiter, (*waiter)->fd);
390 : #endif
391 0 : (*waiter)->outcome = outcome;
392 0 : PR_APPEND_LINK(&((*waiter)->internal), &group->io_ready);
393 0 : PR_NotifyCondVar(group->io_complete);
394 0 : PR_ASSERT(0 != group->waiter->count);
395 0 : group->waiter->count -= 1;
396 0 : *waiter = NULL;
397 0 : } /* _MW_DoneInternal */
398 : #endif /* WINNT */
399 :
400 0 : static PRRecvWait **_MW_LookupInternal(PRWaitGroup *group, PRFileDesc *fd)
401 : {
402 : /*
403 : ** Find the receive wait object corresponding to the file descriptor.
404 : ** Only search the wait group specified.
405 : */
406 : PRRecvWait **desc;
407 0 : PRIntn rehash = _MW_REHASH_MAX;
408 0 : _PRWaiterHash *hash = group->waiter;
409 0 : PRUintn hidx = _MW_HASH(fd, hash->length);
410 0 : PRUintn hoffset = 0;
411 :
412 0 : while (rehash-- > 0)
413 : {
414 0 : desc = (&hash->recv_wait) + hidx;
415 0 : if ((*desc != NULL) && ((*desc)->fd == fd)) return desc;
416 0 : if (0 == hoffset)
417 : {
418 0 : hoffset = _MW_HASH2(fd, hash->length);
419 0 : PR_ASSERT(0 != hoffset);
420 : }
421 0 : hidx = (hidx + hoffset) % (hash->length);
422 : }
423 0 : return NULL;
424 : } /* _MW_LookupInternal */
425 :
426 : #ifndef WINNT
427 0 : static PRStatus _MW_PollInternal(PRWaitGroup *group)
428 : {
429 : PRRecvWait **waiter;
430 0 : PRStatus rv = PR_FAILURE;
431 : PRInt32 count, count_ready;
432 : PRIntervalTime polling_interval;
433 :
434 0 : group->poller = PR_GetCurrentThread();
435 :
436 : while (PR_TRUE)
437 0 : {
438 : PRIntervalTime now, since_last_poll;
439 : PRPollDesc *poll_list;
440 :
441 0 : while (0 == group->waiter->count)
442 : {
443 : PRStatus st;
444 0 : st = PR_WaitCondVar(group->new_business, PR_INTERVAL_NO_TIMEOUT);
445 0 : if (_prmw_running != group->state)
446 : {
447 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
448 0 : goto aborted;
449 : }
450 0 : if (_MW_ABORTED(st)) goto aborted;
451 : }
452 :
453 : /*
454 : ** There's something to do. See if our existing polling list
455 : ** is large enough for what we have to do?
456 : */
457 :
458 0 : while (group->polling_count < group->waiter->count)
459 : {
460 0 : PRUint32 old_count = group->waiter->count;
461 0 : PRUint32 new_count = PR_ROUNDUP(old_count, _PR_POLL_COUNT_FUDGE);
462 0 : PRSize new_size = sizeof(PRPollDesc) * new_count;
463 0 : PRPollDesc *old_polling_list = group->polling_list;
464 :
465 0 : PR_Unlock(group->ml);
466 0 : poll_list = (PRPollDesc*)PR_CALLOC(new_size);
467 0 : if (NULL == poll_list)
468 : {
469 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
470 0 : PR_Lock(group->ml);
471 0 : goto failed_alloc;
472 : }
473 0 : if (NULL != old_polling_list)
474 0 : PR_DELETE(old_polling_list);
475 0 : PR_Lock(group->ml);
476 0 : if (_prmw_running != group->state)
477 : {
478 0 : PR_DELETE(poll_list);
479 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
480 0 : goto aborted;
481 : }
482 0 : group->polling_list = poll_list;
483 0 : group->polling_count = new_count;
484 : }
485 :
486 0 : now = PR_IntervalNow();
487 0 : polling_interval = max_polling_interval;
488 0 : since_last_poll = now - group->last_poll;
489 :
490 0 : waiter = &group->waiter->recv_wait;
491 0 : poll_list = group->polling_list;
492 0 : for (count = 0; count < group->waiter->count; ++waiter)
493 : {
494 0 : PR_ASSERT(waiter < &group->waiter->recv_wait
495 : + group->waiter->length);
496 0 : if (NULL != *waiter) /* a live one! */
497 : {
498 0 : if ((PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
499 0 : && (since_last_poll >= (*waiter)->timeout))
500 0 : _MW_DoneInternal(group, waiter, PR_MW_TIMEOUT);
501 : else
502 : {
503 0 : if (PR_INTERVAL_NO_TIMEOUT != (*waiter)->timeout)
504 : {
505 0 : (*waiter)->timeout -= since_last_poll;
506 0 : if ((*waiter)->timeout < polling_interval)
507 0 : polling_interval = (*waiter)->timeout;
508 : }
509 0 : PR_ASSERT(poll_list < group->polling_list
510 : + group->polling_count);
511 0 : poll_list->fd = (*waiter)->fd;
512 0 : poll_list->in_flags = PR_POLL_READ;
513 0 : poll_list->out_flags = 0;
514 : #if 0
515 : printf(
516 : "Polling 0x%x[%d]: [fd: 0x%x, tmo: %u]\n",
517 : poll_list, count, poll_list->fd, (*waiter)->timeout);
518 : #endif
519 0 : poll_list += 1;
520 0 : count += 1;
521 : }
522 : }
523 : }
524 :
525 0 : PR_ASSERT(count == group->waiter->count);
526 :
527 : /*
528 : ** If there are no more threads waiting for completion,
529 : ** we need to return.
530 : */
531 0 : if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
532 0 : && (1 == group->waiting_threads)) break;
533 :
534 0 : if (0 == count) continue; /* wait for new business */
535 :
536 0 : group->last_poll = now;
537 :
538 0 : PR_Unlock(group->ml);
539 :
540 0 : count_ready = PR_Poll(group->polling_list, count, polling_interval);
541 :
542 0 : PR_Lock(group->ml);
543 :
544 0 : if (_prmw_running != group->state)
545 : {
546 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
547 0 : goto aborted;
548 : }
549 0 : if (-1 == count_ready)
550 : {
551 0 : goto failed_poll; /* that's a shame */
552 : }
553 0 : else if (0 < count_ready)
554 : {
555 0 : for (poll_list = group->polling_list; count > 0;
556 0 : poll_list++, count--)
557 : {
558 0 : PR_ASSERT(
559 : poll_list < group->polling_list + group->polling_count);
560 0 : if (poll_list->out_flags != 0)
561 : {
562 0 : waiter = _MW_LookupInternal(group, poll_list->fd);
563 : /*
564 : ** If 'waiter' is NULL, that means the wait receive
565 : ** descriptor has been canceled.
566 : */
567 0 : if (NULL != waiter)
568 0 : _MW_DoneInternal(group, waiter, PR_MW_SUCCESS);
569 : }
570 : }
571 : }
572 : /*
573 : ** If there are no more threads waiting for completion,
574 : ** we need to return.
575 : ** This thread was "borrowed" to do the polling, but it really
576 : ** belongs to the client.
577 : */
578 0 : if ((!PR_CLIST_IS_EMPTY(&group->io_ready))
579 0 : && (1 == group->waiting_threads)) break;
580 : }
581 :
582 0 : rv = PR_SUCCESS;
583 :
584 : aborted:
585 : failed_poll:
586 : failed_alloc:
587 0 : group->poller = NULL; /* we were that, not we ain't */
588 0 : if ((_prmw_running == group->state) && (group->waiting_threads > 1))
589 : {
590 : /* Wake up one thread to become the new poller. */
591 0 : PR_NotifyCondVar(group->io_complete);
592 : }
593 0 : return rv; /* we return with the lock held */
594 : } /* _MW_PollInternal */
595 : #endif /* !WINNT */
596 :
597 0 : static PRMWGroupState MW_TestForShutdownInternal(PRWaitGroup *group)
598 : {
599 0 : PRMWGroupState rv = group->state;
600 : /*
601 : ** Looking at the group's fields is safe because
602 : ** once the group's state is no longer running, it
603 : ** cannot revert and there is a safe check on entry
604 : ** to make sure no more threads are made to wait.
605 : */
606 0 : if ((_prmw_stopping == rv)
607 0 : && (0 == group->waiting_threads))
608 : {
609 0 : rv = group->state = _prmw_stopped;
610 0 : PR_NotifyCondVar(group->mw_manage);
611 : }
612 0 : return rv;
613 : } /* MW_TestForShutdownInternal */
614 :
615 : #ifndef WINNT
616 0 : static void _MW_InitialRecv(PRCList *io_ready)
617 : {
618 0 : PRRecvWait *desc = (PRRecvWait*)io_ready;
619 0 : if ((NULL == desc->buffer.start)
620 0 : || (0 == desc->buffer.length))
621 0 : desc->bytesRecv = 0;
622 : else
623 : {
624 0 : desc->bytesRecv = (desc->fd->methods->recv)(
625 : desc->fd, desc->buffer.start,
626 0 : desc->buffer.length, 0, desc->timeout);
627 0 : if (desc->bytesRecv < 0) /* SetError should already be there */
628 0 : desc->outcome = PR_MW_FAILURE;
629 : }
630 0 : } /* _MW_InitialRecv */
631 : #endif
632 :
633 : #ifdef WINNT
634 : static void NT_TimeProc(void *arg)
635 : {
636 : _MDOverlapped *overlapped = (_MDOverlapped *)arg;
637 : PRRecvWait *desc = overlapped->data.mw.desc;
638 : PRFileDesc *bottom;
639 :
640 : if (InterlockedCompareExchange((LONG *)&desc->outcome,
641 : (LONG)PR_MW_TIMEOUT, (LONG)PR_MW_PENDING) != (LONG)PR_MW_PENDING)
642 : {
643 : /* This wait recv descriptor has already completed. */
644 : return;
645 : }
646 :
647 : /* close the osfd to abort the outstanding async io request */
648 : /* $$$$
649 : ** Little late to be checking if NSPR's on the bottom of stack,
650 : ** but if we don't check, we can't assert that the private data
651 : ** is what we think it is.
652 : ** $$$$
653 : */
654 : bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
655 : PR_ASSERT(NULL != bottom);
656 : if (NULL != bottom) /* now what!?!?! */
657 : {
658 : bottom->secret->state = _PR_FILEDESC_CLOSED;
659 : if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
660 : {
661 : fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
662 : PR_NOT_REACHED("What shall I do?");
663 : }
664 : }
665 : return;
666 : } /* NT_TimeProc */
667 :
668 : static PRStatus NT_HashRemove(PRWaitGroup *group, PRFileDesc *fd)
669 : {
670 : PRRecvWait **waiter;
671 :
672 : _PR_MD_LOCK(&group->mdlock);
673 : waiter = _MW_LookupInternal(group, fd);
674 : if (NULL != waiter)
675 : {
676 : group->waiter->count -= 1;
677 : *waiter = NULL;
678 : }
679 : _PR_MD_UNLOCK(&group->mdlock);
680 : return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
681 : }
682 :
683 : PRStatus NT_HashRemoveInternal(PRWaitGroup *group, PRFileDesc *fd)
684 : {
685 : PRRecvWait **waiter;
686 :
687 : waiter = _MW_LookupInternal(group, fd);
688 : if (NULL != waiter)
689 : {
690 : group->waiter->count -= 1;
691 : *waiter = NULL;
692 : }
693 : return (NULL != waiter) ? PR_SUCCESS : PR_FAILURE;
694 : }
695 : #endif /* WINNT */
696 :
697 : /******************************************************************/
698 : /******************************************************************/
699 : /********************** The public API portion ********************/
700 : /******************************************************************/
701 : /******************************************************************/
702 : PR_IMPLEMENT(PRStatus) PR_AddWaitFileDesc(
703 : PRWaitGroup *group, PRRecvWait *desc)
704 : {
705 : _PR_HashStory hrv;
706 0 : PRStatus rv = PR_FAILURE;
707 : #ifdef WINNT
708 : _MDOverlapped *overlapped;
709 : HANDLE hFile;
710 : BOOL bResult;
711 : DWORD dwError;
712 : PRFileDesc *bottom;
713 : #endif
714 :
715 0 : if (!_pr_initialized) _PR_ImplicitInitialization();
716 0 : if ((NULL == group) && (NULL == (group = MW_Init2())))
717 : {
718 0 : return rv;
719 : }
720 :
721 0 : PR_ASSERT(NULL != desc->fd);
722 :
723 0 : desc->outcome = PR_MW_PENDING; /* nice, well known value */
724 0 : desc->bytesRecv = 0; /* likewise, though this value is ambiguious */
725 :
726 0 : PR_Lock(group->ml);
727 :
728 0 : if (_prmw_running != group->state)
729 : {
730 : /* Not allowed to add after cancelling the group */
731 0 : desc->outcome = PR_MW_INTERRUPT;
732 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
733 0 : PR_Unlock(group->ml);
734 0 : return rv;
735 : }
736 :
737 : #ifdef WINNT
738 : _PR_MD_LOCK(&group->mdlock);
739 : #endif
740 :
741 : /*
742 : ** If the waiter count is zero at this point, there's no telling
743 : ** how long we've been idle. Therefore, initialize the beginning
744 : ** of the timing interval. As long as the list doesn't go empty,
745 : ** it will maintain itself.
746 : */
747 0 : if (0 == group->waiter->count)
748 0 : group->last_poll = PR_IntervalNow();
749 :
750 : do
751 : {
752 0 : hrv = MW_AddHashInternal(desc, group->waiter);
753 0 : if (_prmw_rehash != hrv) break;
754 0 : hrv = MW_ExpandHashInternal(group); /* gruesome */
755 0 : if (_prmw_success != hrv) break;
756 : } while (PR_TRUE);
757 :
758 : #ifdef WINNT
759 : _PR_MD_UNLOCK(&group->mdlock);
760 : #endif
761 :
762 0 : PR_NotifyCondVar(group->new_business); /* tell the world */
763 0 : rv = (_prmw_success == hrv) ? PR_SUCCESS : PR_FAILURE;
764 0 : PR_Unlock(group->ml);
765 :
766 : #ifdef WINNT
767 : overlapped = PR_NEWZAP(_MDOverlapped);
768 : if (NULL == overlapped)
769 : {
770 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
771 : NT_HashRemove(group, desc->fd);
772 : return rv;
773 : }
774 : overlapped->ioModel = _MD_MultiWaitIO;
775 : overlapped->data.mw.desc = desc;
776 : overlapped->data.mw.group = group;
777 : if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
778 : {
779 : overlapped->data.mw.timer = CreateTimer(
780 : desc->timeout,
781 : NT_TimeProc,
782 : overlapped);
783 : if (0 == overlapped->data.mw.timer)
784 : {
785 : NT_HashRemove(group, desc->fd);
786 : PR_DELETE(overlapped);
787 : /*
788 : * XXX It appears that a maximum of 16 timer events can
789 : * be outstanding. GetLastError() returns 0 when I try it.
790 : */
791 : PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR, GetLastError());
792 : return PR_FAILURE;
793 : }
794 : }
795 :
796 : /* Reach to the bottom layer to get the OS fd */
797 : bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
798 : PR_ASSERT(NULL != bottom);
799 : if (NULL == bottom)
800 : {
801 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
802 : return PR_FAILURE;
803 : }
804 : hFile = (HANDLE)bottom->secret->md.osfd;
805 : if (!bottom->secret->md.io_model_committed)
806 : {
807 : PRInt32 st;
808 : st = _md_Associate(hFile);
809 : PR_ASSERT(0 != st);
810 : bottom->secret->md.io_model_committed = PR_TRUE;
811 : }
812 : bResult = ReadFile(hFile,
813 : desc->buffer.start,
814 : (DWORD)desc->buffer.length,
815 : NULL,
816 : &overlapped->overlapped);
817 : if (FALSE == bResult && (dwError = GetLastError()) != ERROR_IO_PENDING)
818 : {
819 : if (desc->timeout != PR_INTERVAL_NO_TIMEOUT)
820 : {
821 : if (InterlockedCompareExchange((LONG *)&desc->outcome,
822 : (LONG)PR_MW_FAILURE, (LONG)PR_MW_PENDING)
823 : == (LONG)PR_MW_PENDING)
824 : {
825 : CancelTimer(overlapped->data.mw.timer);
826 : }
827 : NT_HashRemove(group, desc->fd);
828 : PR_DELETE(overlapped);
829 : }
830 : _PR_MD_MAP_READ_ERROR(dwError);
831 : rv = PR_FAILURE;
832 : }
833 : #endif
834 :
835 0 : return rv;
836 : } /* PR_AddWaitFileDesc */
837 :
838 : PR_IMPLEMENT(PRRecvWait*) PR_WaitRecvReady(PRWaitGroup *group)
839 : {
840 0 : PRCList *io_ready = NULL;
841 : #ifdef WINNT
842 : PRThread *me = _PR_MD_CURRENT_THREAD();
843 : _MDOverlapped *overlapped;
844 : #endif
845 :
846 0 : if (!_pr_initialized) _PR_ImplicitInitialization();
847 0 : if ((NULL == group) && (NULL == (group = MW_Init2()))) goto failed_init;
848 :
849 0 : PR_Lock(group->ml);
850 :
851 0 : if (_prmw_running != group->state)
852 : {
853 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
854 0 : goto invalid_state;
855 : }
856 :
857 0 : group->waiting_threads += 1; /* the polling thread is counted */
858 :
859 : #ifdef WINNT
860 : _PR_MD_LOCK(&group->mdlock);
861 : while (PR_CLIST_IS_EMPTY(&group->io_ready))
862 : {
863 : _PR_THREAD_LOCK(me);
864 : me->state = _PR_IO_WAIT;
865 : PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
866 : if (!_PR_IS_NATIVE_THREAD(me))
867 : {
868 : _PR_SLEEPQ_LOCK(me->cpu);
869 : _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
870 : _PR_SLEEPQ_UNLOCK(me->cpu);
871 : }
872 : _PR_THREAD_UNLOCK(me);
873 : _PR_MD_UNLOCK(&group->mdlock);
874 : PR_Unlock(group->ml);
875 : _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
876 : me->state = _PR_RUNNING;
877 : PR_Lock(group->ml);
878 : _PR_MD_LOCK(&group->mdlock);
879 : if (_PR_PENDING_INTERRUPT(me)) {
880 : PR_REMOVE_LINK(&me->waitQLinks);
881 : _PR_MD_UNLOCK(&group->mdlock);
882 : me->flags &= ~_PR_INTERRUPT;
883 : me->io_suspended = PR_FALSE;
884 : PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
885 : goto aborted;
886 : }
887 : }
888 : io_ready = PR_LIST_HEAD(&group->io_ready);
889 : PR_ASSERT(io_ready != NULL);
890 : PR_REMOVE_LINK(io_ready);
891 : _PR_MD_UNLOCK(&group->mdlock);
892 : overlapped = (_MDOverlapped *)
893 : ((char *)io_ready - offsetof(_MDOverlapped, data));
894 : io_ready = &overlapped->data.mw.desc->internal;
895 : #else
896 : do
897 : {
898 : /*
899 : ** If the I/O ready list isn't empty, have this thread
900 : ** return with the first receive wait object that's available.
901 : */
902 0 : if (PR_CLIST_IS_EMPTY(&group->io_ready))
903 : {
904 : /*
905 : ** Is there a polling thread yet? If not, grab this thread
906 : ** and use it.
907 : */
908 0 : if (NULL == group->poller)
909 : {
910 : /*
911 : ** This thread will stay do polling until it becomes the only one
912 : ** left to service a completion. Then it will return and there will
913 : ** be none left to actually poll or to run completions.
914 : **
915 : ** The polling function should only return w/ failure or
916 : ** with some I/O ready.
917 : */
918 0 : if (PR_FAILURE == _MW_PollInternal(group)) goto failed_poll;
919 : }
920 : else
921 : {
922 : /*
923 : ** There are four reasons a thread can be awakened from
924 : ** a wait on the io_complete condition variable.
925 : ** 1. Some I/O has completed, i.e., the io_ready list
926 : ** is nonempty.
927 : ** 2. The wait group is canceled.
928 : ** 3. The thread is interrupted.
929 : ** 4. The current polling thread has to leave and needs
930 : ** a replacement.
931 : ** The logic to find a new polling thread is made more
932 : ** complicated by all the other possible events.
933 : ** I tried my best to write the logic clearly, but
934 : ** it is still full of if's with continue and goto.
935 : */
936 : PRStatus st;
937 : do
938 : {
939 0 : st = PR_WaitCondVar(group->io_complete, PR_INTERVAL_NO_TIMEOUT);
940 0 : if (_prmw_running != group->state)
941 : {
942 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
943 0 : goto aborted;
944 : }
945 0 : if (_MW_ABORTED(st) || (NULL == group->poller)) break;
946 0 : } while (PR_CLIST_IS_EMPTY(&group->io_ready));
947 :
948 : /*
949 : ** The thread is interrupted and has to leave. It might
950 : ** have also been awakened to process ready i/o or be the
951 : ** new poller. To be safe, if either condition is true,
952 : ** we awaken another thread to take its place.
953 : */
954 0 : if (_MW_ABORTED(st))
955 : {
956 0 : if ((NULL == group->poller
957 0 : || !PR_CLIST_IS_EMPTY(&group->io_ready))
958 0 : && group->waiting_threads > 1)
959 0 : PR_NotifyCondVar(group->io_complete);
960 0 : goto aborted;
961 : }
962 :
963 : /*
964 : ** A new poller is needed, but can I be the new poller?
965 : ** If there is no i/o ready, sure. But if there is any
966 : ** i/o ready, it has a higher priority. I want to
967 : ** process the ready i/o first and wake up another
968 : ** thread to be the new poller.
969 : */
970 0 : if (NULL == group->poller)
971 : {
972 0 : if (PR_CLIST_IS_EMPTY(&group->io_ready))
973 0 : continue;
974 0 : if (group->waiting_threads > 1)
975 0 : PR_NotifyCondVar(group->io_complete);
976 : }
977 : }
978 0 : PR_ASSERT(!PR_CLIST_IS_EMPTY(&group->io_ready));
979 : }
980 0 : io_ready = PR_LIST_HEAD(&group->io_ready);
981 0 : PR_NotifyCondVar(group->io_taken);
982 0 : PR_ASSERT(io_ready != NULL);
983 0 : PR_REMOVE_LINK(io_ready);
984 0 : } while (NULL == io_ready);
985 :
986 : failed_poll:
987 :
988 : #endif
989 :
990 : aborted:
991 :
992 0 : group->waiting_threads -= 1;
993 : invalid_state:
994 0 : (void)MW_TestForShutdownInternal(group);
995 0 : PR_Unlock(group->ml);
996 :
997 : failed_init:
998 0 : if (NULL != io_ready)
999 : {
1000 : /* If the operation failed, record the reason why */
1001 0 : switch (((PRRecvWait*)io_ready)->outcome)
1002 : {
1003 : case PR_MW_PENDING:
1004 0 : PR_ASSERT(0);
1005 0 : break;
1006 : case PR_MW_SUCCESS:
1007 : #ifndef WINNT
1008 0 : _MW_InitialRecv(io_ready);
1009 : #endif
1010 0 : break;
1011 : #ifdef WINNT
1012 : case PR_MW_FAILURE:
1013 : _PR_MD_MAP_READ_ERROR(overlapped->data.mw.error);
1014 : break;
1015 : #endif
1016 : case PR_MW_TIMEOUT:
1017 0 : PR_SetError(PR_IO_TIMEOUT_ERROR, 0);
1018 0 : break;
1019 : case PR_MW_INTERRUPT:
1020 0 : PR_SetError(PR_PENDING_INTERRUPT_ERROR, 0);
1021 0 : break;
1022 0 : default: break;
1023 : }
1024 : #ifdef WINNT
1025 : if (NULL != overlapped->data.mw.timer)
1026 : {
1027 : PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1028 : != overlapped->data.mw.desc->timeout);
1029 : CancelTimer(overlapped->data.mw.timer);
1030 : }
1031 : else
1032 : {
1033 : PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1034 : == overlapped->data.mw.desc->timeout);
1035 : }
1036 : PR_DELETE(overlapped);
1037 : #endif
1038 : }
1039 0 : return (PRRecvWait*)io_ready;
1040 : } /* PR_WaitRecvReady */
1041 :
/*
 * PR_CancelWaitFileDesc -- interrupt the wait on one receive-wait object.
 *
 * group  wait group the descriptor belongs to; NULL selects
 *        mw_state->group (the default group).
 * desc   the receive-wait object whose wait is to be cancelled.
 *
 * Returns PR_SUCCESS or PR_FAILURE.  Failure paths visible in this body:
 *   - no group could be resolved          -> PR_INVALID_ARGUMENT_ERROR
 *   - group->state is not _prmw_running   -> PR_INVALID_STATE_ERROR
 *   - (non-WINNT) desc is neither in the wait table nor on the
 *     io_ready list                       -> PR_INVALID_ARGUMENT_ERROR
 *
 * group->ml is held from PR_Lock() below until the 'unlock' label.
 *
 * NOTE(review): mw_state is dereferenced without a NULL check; this
 * presumably relies on the multiwait state having been initialized
 * before any public entry point is called -- confirm.
 */
1042 : PR_IMPLEMENT(PRStatus) PR_CancelWaitFileDesc(PRWaitGroup *group, PRRecvWait *desc)
1043 : {
1044 : #if !defined(WINNT)
1045 : PRRecvWait **recv_wait;
1046 : #endif
1047 0 : PRStatus rv = PR_SUCCESS;
1048 0 : if (NULL == group) group = mw_state->group; /* fall back to the default group */
1049 0 : PR_ASSERT(NULL != group);
1050 0 : if (NULL == group)
1051 : {
1052 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1053 0 : return PR_FAILURE;
1054 : }
1055 :
1056 0 : PR_Lock(group->ml);
1057 :
1058 0 : if (_prmw_running != group->state) /* only a running group accepts cancellations */
1059 : {
1060 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
1061 0 : rv = PR_FAILURE;
1062 0 : goto unlock;
1063 : }
1064 :
1065 : #ifdef WINNT
/*
 * Atomically move desc->outcome from PR_MW_PENDING to PR_MW_INTERRUPT.
 * The body below runs only when this call performed that transition; if
 * the outcome was already something else, nothing is done and the
 * function returns PR_SUCCESS.
 */
1066 : if (InterlockedCompareExchange((LONG *)&desc->outcome,
1067 : (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING) == (LONG)PR_MW_PENDING)
1068 : {
1069 : PRFileDesc *bottom = PR_GetIdentitiesLayer(desc->fd, PR_NSPR_IO_LAYER);
1070 : PR_ASSERT(NULL != bottom);
1071 : if (NULL == bottom)
1072 : {
1073 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1074 : goto unlock; /* NOTE(review): rv is still PR_SUCCESS here although an error code was just set */
1075 : }
/*
 * Mark the NSPR-layer descriptor closed and close the OS socket.
 * Presumably this is what forces the outstanding receive to complete so
 * the interrupted outcome gets delivered -- confirm against the
 * completion path.
 */
1076 : bottom->secret->state = _PR_FILEDESC_CLOSED;
1077 : #if 0
1078 : fprintf(stderr, "cancel wait recv: closing socket\n");
1079 : #endif
1080 : if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1081 : {
1082 : fprintf(stderr, "closesocket failed: %d\n", WSAGetLastError());
1083 : exit(1); /* NOTE(review): terminates the whole process from library code, with group->ml still held */
1084 : }
1085 : }
1086 : #else
1087 0 : if (NULL != (recv_wait = _MW_LookupInternal(group, desc->fd)))
1088 : {
1089 : /* desc->fd is still in the wait table: complete it as interrupted */
1090 0 : _MW_DoneInternal(group, recv_wait, PR_MW_INTERRUPT);
1091 0 : goto unlock;
1092 : }
1093 0 : if (!PR_CLIST_IS_EMPTY(&group->io_ready))
1094 : {
1095 : /* not waiting any more -- has it already completed and been queued on io_ready? */
1096 0 : PRCList *head = PR_LIST_HEAD(&group->io_ready);
1097 : do
1098 : {
1099 0 : PRRecvWait *done = (PRRecvWait*)head; /* NOTE(review): treats the link address as the PRRecvWait address; presumably the link is its first member -- confirm */
1100 0 : if (done == desc) goto unlock; /* already complete: nothing to cancel, report success */
1101 0 : head = PR_NEXT_LINK(head);
1102 0 : } while (head != &group->io_ready);
1103 : }
1104 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0); /* desc is unknown to this group */
1105 0 : rv = PR_FAILURE;
1106 :
1107 : #endif
1108 : unlock:
1109 0 : PR_Unlock(group->ml);
1110 0 : return rv;
1111 : } /* PR_CancelWaitFileDesc */
1112 :
/*
 * PR_CancelWaitGroup -- stop a wait group and hand back one finished or
 * interrupted descriptor.
 *
 * group  the group to cancel; NULL selects mw_state->group.
 *
 * Returns the first PRRecvWait taken off group->io_ready, or NULL.
 * NULL is returned with PR_INVALID_ARGUMENT_ERROR when no group could be
 * resolved and with PR_GROUP_EMPTY_ERROR when io_ready is empty.  Only
 * one element is removed per call; the rest of io_ready is left in place.
 *
 * Sequence, all under group->ml (plus group->mdlock on WINNT):
 *   1. drive group->state to _prmw_stopped, waiting on mw_manage while
 *      other threads are still counted in waiting_threads;
 *   2. make every descriptor still in the waiter table look interrupted;
 *   3. pop the head of io_ready, if any.
 *
 * NOTE(review): mw_state is dereferenced without a NULL check -- confirm
 * that initialization is guaranteed before this can be called.
 */
1113 : PR_IMPLEMENT(PRRecvWait*) PR_CancelWaitGroup(PRWaitGroup *group)
1114 : {
1115 : PRRecvWait **desc;
1116 0 : PRRecvWait *recv_wait = NULL;
1117 : #ifdef WINNT
1118 : _MDOverlapped *overlapped;
1119 : PRRecvWait **end;
1120 : PRThread *me = _PR_MD_CURRENT_THREAD();
1121 : #endif
1122 :
1123 0 : if (NULL == group) group = mw_state->group; /* fall back to the default group */
1124 0 : PR_ASSERT(NULL != group);
1125 0 : if (NULL == group)
1126 : {
1127 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1128 0 : return NULL;
1129 : }
1130 :
1131 0 : PR_Lock(group->ml);
1132 0 : if (_prmw_stopped != group->state)
1133 : {
1134 0 : if (_prmw_running == group->state)
1135 0 : group->state = _prmw_stopping; /* so nothing new comes in */
1136 0 : if (0 == group->waiting_threads) /* is there anybody else? */
1137 0 : group->state = _prmw_stopped; /* we can stop right now */
1138 : else
1139 : {
/* wake every thread blocked on either condition so it can observe the state change */
1140 0 : PR_NotifyAllCondVar(group->new_business);
1141 0 : PR_NotifyAllCondVar(group->io_complete);
1142 : }
/* some other thread must set _prmw_stopped and notify mw_manage; that code is outside this function */
1143 0 : while (_prmw_stopped != group->state)
1144 0 : (void)PR_WaitCondVar(group->mw_manage, PR_INTERVAL_NO_TIMEOUT);
1145 : }
1146 :
1147 : #ifdef WINNT
1148 : _PR_MD_LOCK(&group->mdlock);
1149 : #endif
1150 : /* make all the existing descriptors look done/interrupted */
1151 : #ifdef WINNT
/*
 * The slot array starts at the address of waiter->recv_wait and holds
 * waiter->length entries.  Each occupied slot whose outcome is still
 * PR_MW_PENDING is atomically switched to PR_MW_INTERRUPT and its
 * socket is closed.
 */
1152 : end = &group->waiter->recv_wait + group->waiter->length;
1153 : for (desc = &group->waiter->recv_wait; desc < end; ++desc)
1154 : {
1155 : if (NULL != *desc)
1156 : {
1157 : if (InterlockedCompareExchange((LONG *)&(*desc)->outcome,
1158 : (LONG)PR_MW_INTERRUPT, (LONG)PR_MW_PENDING)
1159 : == (LONG)PR_MW_PENDING)
1160 : {
1161 : PRFileDesc *bottom = PR_GetIdentitiesLayer(
1162 : (*desc)->fd, PR_NSPR_IO_LAYER);
1163 : PR_ASSERT(NULL != bottom);
1164 : if (NULL == bottom)
1165 : {
1166 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1167 : goto invalid_arg; /* releases both locks and returns NULL; remaining slots are not visited */
1168 : }
1169 : bottom->secret->state = _PR_FILEDESC_CLOSED;
1170 : #if 0
1171 : fprintf(stderr, "cancel wait group: closing socket\n");
1172 : #endif
1173 : if (closesocket(bottom->secret->md.osfd) == SOCKET_ERROR)
1174 : {
1175 : fprintf(stderr, "closesocket failed: %d\n",
1176 : WSAGetLastError());
1177 : exit(1); /* NOTE(review): terminates the whole process from library code, with both locks held */
1178 : }
1179 : }
1180 : }
1181 : }
/*
 * Block until the waiter table is empty.  On each pass the calling
 * thread is queued on group->wait_list in _PR_IO_WAIT state, both locks
 * are dropped for the duration of _PR_MD_WAIT, and they are re-taken in
 * the original order (ml, then mdlock) before count is tested again.
 * Presumably the I/O completion path decrements waiter->count and wakes
 * threads on wait_list -- confirm.
 */
1182 : while (group->waiter->count > 0)
1183 : {
1184 : _PR_THREAD_LOCK(me);
1185 : me->state = _PR_IO_WAIT;
1186 : PR_APPEND_LINK(&me->waitQLinks, &group->wait_list);
1187 : if (!_PR_IS_NATIVE_THREAD(me))
1188 : {
1189 : _PR_SLEEPQ_LOCK(me->cpu);
1190 : _PR_ADD_SLEEPQ(me, PR_INTERVAL_NO_TIMEOUT);
1191 : _PR_SLEEPQ_UNLOCK(me->cpu);
1192 : }
1193 : _PR_THREAD_UNLOCK(me);
1194 : _PR_MD_UNLOCK(&group->mdlock);
1195 : PR_Unlock(group->ml);
1196 : _PR_MD_WAIT(me, PR_INTERVAL_NO_TIMEOUT);
1197 : me->state = _PR_RUNNING;
1198 : PR_Lock(group->ml);
1199 : _PR_MD_LOCK(&group->mdlock);
1200 : }
1201 : #else
/*
 * Walk the slot array until waiter->count reaches zero, completing each
 * occupied slot as interrupted.  The loop has no bound of its own other
 * than the assertion; it presumably relies on _MW_DoneInternal
 * decrementing waiter->count for every occupied slot -- confirm.
 */
1202 0 : for (desc = &group->waiter->recv_wait; group->waiter->count > 0; ++desc)
1203 : {
1204 0 : PR_ASSERT(desc < &group->waiter->recv_wait + group->waiter->length);
1205 0 : if (NULL != *desc)
1206 0 : _MW_DoneInternal(group, desc, PR_MW_INTERRUPT);
1207 : }
1208 : #endif
1209 :
1210 : /* take first element of finished list and return it or NULL */
1211 0 : if (PR_CLIST_IS_EMPTY(&group->io_ready))
1212 0 : PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1213 : else
1214 : {
1215 0 : PRCList *head = PR_LIST_HEAD(&group->io_ready);
1216 0 : PR_REMOVE_AND_INIT_LINK(head);
1217 : #ifdef WINNT
/*
 * On WINNT the queued link lives inside an _MDOverlapped (at member
 * 'data'); recover that container, redirect 'head' to the descriptor's
 * own 'internal' link, cancel any timer attached to the operation, and
 * free the container.
 */
1218 : overlapped = (_MDOverlapped *)
1219 : ((char *)head - offsetof(_MDOverlapped, data));
1220 : head = &overlapped->data.mw.desc->internal;
1221 : if (NULL != overlapped->data.mw.timer)
1222 : {
1223 : PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1224 : != overlapped->data.mw.desc->timeout);
1225 : CancelTimer(overlapped->data.mw.timer);
1226 : }
1227 : else
1228 : {
1229 : PR_ASSERT(PR_INTERVAL_NO_TIMEOUT
1230 : == overlapped->data.mw.desc->timeout);
1231 : }
1232 : PR_DELETE(overlapped);
1233 : #endif
1234 0 : recv_wait = (PRRecvWait*)head; /* NOTE(review): assumes the link is at the start of PRRecvWait -- confirm */
1235 : }
1236 : #ifdef WINNT
1237 : invalid_arg:
1238 : _PR_MD_UNLOCK(&group->mdlock);
1239 : #endif
1240 0 : PR_Unlock(group->ml);
1241 :
1242 0 : return recv_wait;
1243 : } /* PR_CancelWaitGroup */
1244 :
/*
 * PR_CreateWaitGroup -- allocate and register a new wait group.
 *
 * size  ignored (see the annotation in the signature).
 *
 * Returns the new group, or NULL if any allocation fails.  On success
 * the group has its lock, four condition variables (all bound to
 * wg->ml), two initialized lists, a waiter table with
 * _PR_DEFAULT_HASH_LENGTH slots, and has been appended to
 * mw_state->group_list under mw_lock.
 *
 * PR_OUT_OF_MEMORY_ERROR is set here only for the two raw allocations
 * (the group and the waiter table).  For lock / condition-variable
 * failures no error is set in this function; presumably the failing
 * constructor has already set one -- confirm.
 *
 * Fields not assigned below (e.g. state, waiting_threads) keep whatever
 * PR_NEWZAP leaves in them -- presumably zero; confirm.
 *
 * The failure labels form an unwind ladder: each label releases the
 * resource acquired just before the failing step and falls through to
 * the labels for everything acquired earlier.
 */
1245 : PR_IMPLEMENT(PRWaitGroup*) PR_CreateWaitGroup(PRInt32 size /* ignored */)
1246 : {
1247 : PRWaitGroup *wg;
1248 :
1249 0 : if (NULL == (wg = PR_NEWZAP(PRWaitGroup)))
1250 : {
1251 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1252 0 : goto failed;
1253 : }
1254 : /* the wait group itself */
1255 0 : wg->ml = PR_NewLock();
1256 0 : if (NULL == wg->ml) goto failed_lock;
1257 0 : wg->io_taken = PR_NewCondVar(wg->ml);
1258 0 : if (NULL == wg->io_taken) goto failed_cvar0;
1259 0 : wg->io_complete = PR_NewCondVar(wg->ml);
1260 0 : if (NULL == wg->io_complete) goto failed_cvar1;
1261 0 : wg->new_business = PR_NewCondVar(wg->ml);
1262 0 : if (NULL == wg->new_business) goto failed_cvar2;
1263 0 : wg->mw_manage = PR_NewCondVar(wg->ml);
1264 0 : if (NULL == wg->mw_manage) goto failed_cvar3;
1265 :
1266 0 : PR_INIT_CLIST(&wg->group_link);
1267 0 : PR_INIT_CLIST(&wg->io_ready);
1268 :
1269 : /* the waiters sequence: one header followed by the slot array, in a single allocation */
1270 0 : wg->waiter = (_PRWaiterHash*)PR_CALLOC(
1271 : sizeof(_PRWaiterHash) +
1272 : (_PR_DEFAULT_HASH_LENGTH * sizeof(PRRecvWait*)));
1273 0 : if (NULL == wg->waiter)
1274 : {
1275 0 : PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1276 0 : goto failed_waiter;
1277 : }
1278 0 : wg->waiter->count = 0;
1279 0 : wg->waiter->length = _PR_DEFAULT_HASH_LENGTH;
1280 :
1281 : #ifdef WINNT
1282 : _PR_MD_NEW_LOCK(&wg->mdlock);
1283 : PR_INIT_CLIST(&wg->wait_list);
1284 : #endif /* WINNT */
1285 :
1286 0 : PR_Lock(mw_lock); /* NOTE(review): mw_lock / mw_state are used without a NULL check -- confirm initialization is guaranteed */
1287 0 : PR_APPEND_LINK(&wg->group_link, &mw_state->group_list);
1288 0 : PR_Unlock(mw_lock);
1289 0 : return wg;
1290 :
1291 : failed_waiter:
1292 0 : PR_DestroyCondVar(wg->mw_manage);
1293 : failed_cvar3:
1294 0 : PR_DestroyCondVar(wg->new_business);
1295 : failed_cvar2:
1296 0 : PR_DestroyCondVar(wg->io_complete);
1297 : failed_cvar1:
1298 0 : PR_DestroyCondVar(wg->io_taken);
1299 : failed_cvar0:
1300 0 : PR_DestroyLock(wg->ml);
1301 : failed_lock:
1302 0 : PR_DELETE(wg);
1303 0 : wg = NULL;
1304 :
1305 : failed:
1306 0 : return wg;
1307 : } /* PR_CreateWaitGroup */
1308 :
/*
 * PR_DestroyWaitGroup -- tear down an idle wait group and free it.
 *
 * group  the group to destroy; NULL selects mw_state->group.
 *
 * Returns PR_SUCCESS after the group has been unlinked and freed.
 * Returns PR_FAILURE with
 *   - PR_INVALID_ARGUMENT_ERROR when no group could be resolved, or
 *   - PR_INVALID_STATE_ERROR when the group is not idle, i.e. it still
 *     has waiting threads, entries in the waiter table, or entries on
 *     io_ready.  In that case nothing is freed.
 *
 * NOTE(review): the idle check is made under group->ml, but the lock is
 * released before the group is unlinked and its members are destroyed,
 * so the check and the teardown are not one atomic step.  Callers
 * presumably must guarantee no concurrent use -- confirm.
 */
1309 : PR_IMPLEMENT(PRStatus) PR_DestroyWaitGroup(PRWaitGroup *group)
1310 : {
1311 0 : PRStatus rv = PR_SUCCESS;
1312 0 : if (NULL == group) group = mw_state->group; /* fall back to the default group */
1313 0 : PR_ASSERT(NULL != group);
1314 0 : if (NULL != group)
1315 : {
1316 0 : PR_Lock(group->ml);
1317 0 : if ((group->waiting_threads == 0)
1318 0 : && (group->waiter->count == 0)
1319 0 : && PR_CLIST_IS_EMPTY(&group->io_ready))
1320 : {
1321 0 : group->state = _prmw_stopped;
1322 : }
1323 : else
1324 : {
1325 0 : PR_SetError(PR_INVALID_STATE_ERROR, 0);
1326 0 : rv = PR_FAILURE;
1327 : }
1328 0 : PR_Unlock(group->ml);
1329 0 : if (PR_FAILURE == rv) return rv; /* group is busy: leave it fully intact */
1330 :
1331 0 : PR_Lock(mw_lock); /* the global list of groups is guarded by mw_lock, not by group->ml */
1332 0 : PR_REMOVE_LINK(&group->group_link);
1333 0 : PR_Unlock(mw_lock);
1334 :
1335 : #ifdef WINNT
1336 : /*
1337 : * XXX make sure wait_list is empty and waiter is empty.
1338 : * These must be checked while holding mdlock.
1339 : */
1340 : _PR_MD_FREE_LOCK(&group->mdlock);
1341 : #endif
1342 :
/*
 * Release members, then the group itself.  polling_list is not allocated
 * by PR_CreateWaitGroup, so it may never have been set; presumably
 * PR_DELETE tolerates a NULL pointer -- confirm.
 */
1343 0 : PR_DELETE(group->waiter);
1344 0 : PR_DELETE(group->polling_list);
1345 0 : PR_DestroyCondVar(group->mw_manage);
1346 0 : PR_DestroyCondVar(group->new_business);
1347 0 : PR_DestroyCondVar(group->io_complete);
1348 0 : PR_DestroyCondVar(group->io_taken);
1349 0 : PR_DestroyLock(group->ml);
1350 0 : if (group == mw_state->group) mw_state->group = NULL; /* forget the default group if that is what was destroyed */
1351 0 : PR_DELETE(group);
1352 : }
1353 : else
1354 : {
1355 : /* The default wait group is not created yet. */
1356 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1357 0 : rv = PR_FAILURE;
1358 : }
1359 0 : return rv;
1360 : } /* PR_DestroyWaitGroup */
1361 :
1362 : /**********************************************************************
1363 : ***********************************************************************
1364 : ******************** Wait group enumerations **************************
1365 : ***********************************************************************
1366 : **********************************************************************/
1367 :
/*
 * PR_CreateMWaitEnumerator -- allocate an enumerator bound to a group.
 *
 * group  the wait group to enumerate.  It is stored as given, without
 *        validation; a NULL group is resolved to mw_state->group later,
 *        when PR_EnumerateWaitGroup starts an enumeration.
 *
 * Returns the new enumerator, marked with _PR_ENUM_SEALED, or NULL with
 * PR_OUT_OF_MEMORY_ERROR set if the allocation fails.  No lock is taken.
 */
1368 : PR_IMPLEMENT(PRMWaitEnumerator*) PR_CreateMWaitEnumerator(PRWaitGroup *group)
1369 : {
1370 0 : PRMWaitEnumerator *enumerator = PR_NEWZAP(PRMWaitEnumerator);
1371 0 : if (NULL == enumerator) PR_SetError(PR_OUT_OF_MEMORY_ERROR, 0);
1372 : else
1373 : {
1374 0 : enumerator->group = group;
1375 0 : enumerator->seal = _PR_ENUM_SEALED; /* validity marker checked by the other enumerator entry points */
1376 : }
1377 0 : return enumerator;
1378 : } /* PR_CreateMWaitEnumerator */
1379 :
/*
 * PR_DestroyMWaitEnumerator -- invalidate and free an enumerator.
 *
 * enumerator  an object obtained from PR_CreateMWaitEnumerator.
 *
 * Returns PR_SUCCESS after freeing the object, or PR_FAILURE with
 * PR_INVALID_ARGUMENT_ERROR when the pointer is NULL or its seal is not
 * _PR_ENUM_SEALED.  The assertions repeat the same two checks ahead of
 * the runtime test.
 */
1380 : PR_IMPLEMENT(PRStatus) PR_DestroyMWaitEnumerator(PRMWaitEnumerator* enumerator)
1381 : {
1382 0 : PR_ASSERT(NULL != enumerator);
1383 0 : PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1384 0 : if ((NULL == enumerator) || (_PR_ENUM_SEALED != enumerator->seal))
1385 : {
1386 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1387 0 : return PR_FAILURE;
1388 : }
1389 0 : enumerator->seal = _PR_ENUM_UNSEALED; /* break the seal before freeing so a stale pointer fails the seal check, as long as the memory is not reused */
1390 0 : PR_Free(enumerator);
1391 0 : return PR_SUCCESS;
1392 : } /* PR_DestroyMWaitEnumerator */
1393 :
/*
 * PR_EnumerateWaitGroup -- return the next occupied slot of a group's
 * waiter table.
 *
 * enumerator  a sealed enumerator; it carries the cursor (waiter, index),
 *             the owning thread and the group timestamp seen at start.
 * previous    NULL to start (or restart) an enumeration; non-NULL to
 *             continue one.  Only its NULL-ness is examined -- the
 *             position comes from the enumerator, not from this pointer.
 *
 * Returns the next non-NULL PRRecvWait in the table, or NULL when
 *   - every slot has been visited (no error code is set),
 *   - the enumerator is NULL/unsealed, or a continuation is attempted
 *     from a thread other than the one that started it
 *     (PR_INVALID_ARGUMENT_ERROR), or
 *   - no group is given and no default group exists
 *     (PR_GROUP_EMPTY_ERROR).
 * As the original note at the end says, a NULL return is therefore
 * ambiguous unless the caller inspects the error code.
 *
 * When continuing, if the group's p_timestamp differs from the value
 * captured at the start, the enumeration is restarted from the first
 * slot.
 *
 * NOTE(review): the cursor setup and the timestamp comparison are done
 * before the lock is taken; only the slot scan itself is locked
 * (mdlock on WINNT, ml elsewhere) -- confirm this is intended.
 */
1394 : PR_IMPLEMENT(PRRecvWait*) PR_EnumerateWaitGroup(
1395 : PRMWaitEnumerator *enumerator, const PRRecvWait *previous)
1396 : {
1397 0 : PRRecvWait *result = NULL;
1398 :
1399 : /* entry point sanity checking */
1400 0 : PR_ASSERT(NULL != enumerator);
1401 0 : PR_ASSERT(_PR_ENUM_SEALED == enumerator->seal);
1402 0 : if ((NULL == enumerator)
1403 0 : || (_PR_ENUM_SEALED != enumerator->seal)) goto bad_argument;
1404 :
1405 : /* beginning of enumeration */
1406 0 : if (NULL == previous)
1407 : {
1408 0 : if (NULL == enumerator->group)
1409 : {
1410 0 : enumerator->group = mw_state->group; /* bind lazily to the default group */
1411 0 : if (NULL == enumerator->group)
1412 : {
1413 0 : PR_SetError(PR_GROUP_EMPTY_ERROR, 0);
1414 0 : return NULL;
1415 : }
1416 : }
1417 0 : enumerator->waiter = &enumerator->group->waiter->recv_wait; /* cursor at the first slot */
1418 0 : enumerator->p_timestamp = enumerator->group->p_timestamp; /* snapshot used to detect a change later */
1419 0 : enumerator->thread = PR_GetCurrentThread(); /* continuation is accepted from this thread only */
1420 0 : enumerator->index = 0;
1421 : }
1422 : /* continuing an enumeration */
1423 : else
1424 : {
1425 0 : PRThread *me = PR_GetCurrentThread();
1426 0 : PR_ASSERT(me == enumerator->thread);
1427 0 : if (me != enumerator->thread) goto bad_argument;
1428 :
1429 : /* need to restart the enumeration */
1430 0 : if (enumerator->p_timestamp != enumerator->group->p_timestamp)
1431 0 : return PR_EnumerateWaitGroup(enumerator, NULL);
1432 : }
1433 :
1434 : /* actually progress the enumeration */
1435 : #if defined(WINNT)
1436 : _PR_MD_LOCK(&enumerator->group->mdlock);
1437 : #else
1438 0 : PR_Lock(enumerator->group->ml);
1439 : #endif
/*
 * Advance through the slots: each pass bumps index, fetches the slot
 * under the cursor into result and moves the cursor on.  The scan stops
 * at the first non-NULL slot; if index runs past the table length,
 * result is NULL.
 */
1440 0 : while (enumerator->index++ < enumerator->group->waiter->length)
1441 : {
1442 0 : if (NULL != (result = *(enumerator->waiter)++)) break;
1443 : }
1444 : #if defined(WINNT)
1445 : _PR_MD_UNLOCK(&enumerator->group->mdlock);
1446 : #else
1447 0 : PR_Unlock(enumerator->group->ml);
1448 : #endif
1449 :
1450 0 : return result; /* what we live for */
1451 :
1452 : bad_argument:
1453 0 : PR_SetError(PR_INVALID_ARGUMENT_ERROR, 0);
1454 0 : return NULL; /* probably ambiguous */
1455 : } /* PR_EnumerateWaitGroup */
1456 :
1457 : /* prmwait.c */
|