37 wq->counter = 0; /* no server threads yet */
38 wq->idle = 0; /* no idle servers */
39 wq->engine = engine;
40 wq->valid = WORKQ_VALID;
41 return 0;
42 }
¦ workq.c part 1 workq_init
Part 2 shows the workq_destroy
function. The procedure for shutting down a work queue is a little different than the others we've seen. Remember that the Pthreads mutex and condition variable destroy function fail, returning EBUSY, when you try to destroy an object that is in use. We used the same model for barriers and read/write locks. But we cannot do the same for work queues — the calling program cannot know whether the work queue is in use, because the caller only queues requests that are processed asynchronously.
The work queue manager will accept a request to shut down at any time, but it will wait for all existing engine threads to complete their work and terminate. Only when the last work queue element has been processed and the last engine thread has exited will workq_destroy
return successfully.
24 If the work queue has no threads, either it was never used or all threads have timed out and shut down since it was last used. That makes things easy, and we can skip all the shutdown complication.
25-33 If there are engine threads, they are asked to shut down by setting the quit flag in the workq_t
structure and broadcasting the condition variable to awaken any waiting (idle) engine threads. Each engine thread will eventually run and see this flag. When they see it and find no more work, they'll shut themselves down.
44-50 The last thread to shut down will wake up the thread that's waiting in workq_destroy
, and the shutdown will complete. Instead of creating a condition variable that's used only to wake up workq_destroy, the last thread will signal the same condition variable used to inform idle engine threads of new work. At this point, all waiters have already been awakened by a broadcast, and they won't wait again because the quit flag is set. Shutdown occurs only once during the life of the work queue manager, so there's little point to creating a separate condition variable for this purpose.
¦ workq.c part 2 workq_destroy
1 /*
2 * Destroy a work queue.
3 */
4 int workq_destroy (workq_t *wq)
5 {
6 int status, status1, status2;
7
8 if (wq->valid != WORKQ_VALID)
9 return EINVAL;
10 status = pthread_mutex_lock (&wq->mutex);
11 if (status != 0)
12 return status;
13 wq->valid = 0; /* prevent any other operations */
14
15 /*
16 * Check whether any threads are active, and run them down:
17 *
18 * 1. set the quit flag
19 * 2. broadcast to wake any servers that may be asleep
20 * 4. wait for all threads to quit (counter goes to 0)
21 * Because we don't use join, we don't need to worry
22 * about tracking thread IDs.
23 */
24 if (wq->counter > 0) {
25 wq->quit = 1;
26 /* if any threads are idling, wake them. */
27 if (wq->idle > 0) {
28 status = pthread_cond_broadcast (&wq->cv);
29 if (status != 0) {
30 pthread_mutex_unlock (&wq->mutex);
31 return status;
32 }
33 }
34
35 /*
36 * Just to prove that every rule has an exception, I'm
37 * using the 'cv' condition for two separate predicates
38 * here. That's OK, since the case used here applies
39 * only once during the life of a work queue — during
40 * rundown. The overhead is minimal and it's not worth
41 * creating a separate condition variable that would
42 * wait and be signaled exactly once!
43 */
44 while (wq->counter > 0) {
45 status = pthread_cond_wait (&wq->cv, &wq->mutex);
46 if (status != 0) {
47 pthread_mutex_unlock (&wq->mutex);
48 r eturn status;
49 }
50 }
51 }
52 status = pthread_mutex_unlock (&wq->mutex);
53 if (status != 0)
54 return status;
55 status = pthread_mutex_destroy (&wq->mutex);
56 status1 = pthread_cond_destroy (&wq->cv);
57 status2 = pthread_attr_destroy (&wq->attr);
58 return (status ? status : (status1 ? status1 : status2));
59 }
Part 3 shows workq_add
, which accepts work for the queue manager system.
16-35 It allocates a new work queue element and initializes it from the parameters. It
queues the element, updating the first and last pointers as necessary.
40-45 If there are idle engine threads, which were created but ran out of work, signal
the condition variable to wake one.
46-59 If there are no idle engine threads, and the value of parallelism allows for
more, create a new engine thread. If there are no idle threads and it can't create a
new engine thread, workq_add
returns, leaving the new element for the next
thread that finishes its current assignment.
¦ workq.c part 3 workq_add
1 /*
2 * Add an item to a work queue.