115 }
116
117 pthread_mutex_unlock (&wq->mutex);
118 DPRINTF (('Worker exiting
'));
119 return NULL;
120 }
Finally, workq_main.c is a sample program that uses our work queue manager. Two threads queue work elements to the work queue in parallel. The engine function is designed to gather some statistics about engine usage. To accomplish this, it uses thread-specific data. When the sample run completes, main collects all of the thread-specific data and reports some statistics.
15-19 Each engine thread has an engine_t structure associated with the thread-specific data key engine_key. The engine function gets the calling thread's value of this key, and if the current value is NULL, creates a new engine_t structure and assigns it to the key. The calls member of the engine_t structure records the number of calls to the engine function within each thread.
29-37 The thread-specific data key's destructor function, destructor, adds the terminating thread's engine_t to a list (engine_list_head), where main can find it later to generate the final report.
43-68 The engine function's work is relatively boring. The argument is a pointer to a power_t structure, containing the members value and power. It uses a trivial loop to multiply value by itself power times. The result is discarded in this example, and the power_t structure is freed.
73-98 A thread is started, by main, running the thread_routine function. In addition, main calls thread_routine. The thread_routine function loops for some number of iterations, determined by the macro ITERATIONS, creating and queuing work queue elements. The value and power members of the power_t structure are determined semirandomly using rand_r. The function sleeps for a random period of time, from zero to four seconds, to occasionally allow engine threads to time out and terminate. Typically when you run this program you would expect to see summary messages reporting some small number of engine threads, each of which processed some number of calls — which total 50 calls (25 each from the two threads).
¦ workq_main.c
1 #include <pthread.h>
2 #include <stdlib.h>
3 #include <stdio.h>
4 #include <time.h>
5 #include "workq.h"
6 #include "errors.h"
7
8 #define ITERATIONS 25
9
/*
 * One unit of work handed to engine_routine: raise `value` to the
 * `power`-th power.
 */
typedef struct power_tag {
    int         value;      /* base */
    int         power;      /* exponent (number of multiplications) */
} power_t;
/*
 * Per-thread bookkeeping record. engine_routine creates one for each
 * thread that calls it and stores it under engine_key; destructor
 * later pushes it onto the list headed by engine_list_head.
 */
typedef struct engine_tag {
    struct engine_tag   *link;          /* next record on the engine list */
    pthread_t           thread_id;      /* thread that owns this record */
    int                 calls;          /* engine_routine calls made on that thread */
} engine_t;
21 pthread_key_t engine_key; /* Keep track of active engines */
22 pthread_mutex_t engine_list_mutex = PTHREAD_MUTEX_INITIALIZER;
23 engine_t *engine_list_head = NULL;
24 workq_t workq; 25
26 /*
27 * Thread-specific data destructor routine for engine_key.
28 */
29 void destructor (void *value_ptr)
30 {
31 engine_t *engine = (engine_t*)value_ptr; 32
33 pthread_mutex_lock (&engine_list_mutex);
34 engine->link = engine_list_head;
35 engine_list_head = engine;
36 pthread_mutex_unlock (&engine_list_mutex);
37 } 38
39 /*
40 * This is the routine called by the work queue servers to
41 * perform operations in parallel.
42 */
43 void engine_routine (void *arg)
44 {
45 engine_t *engine;
46 power_t *power = (power_t*)arg;
47 int result, count; 4 8 int status;
49
50 engine = pthread_getspecific (engine_key);
51 if (engine == NULL) {
52 engine = (engine_t*)malloc (sizeof (engine_t));
53 status = pthread_setspecific (
54 engine_key, (void*)engine);
55 if (status != 0)
56 err_abort (status, 'Set tsd');
57 engine->thread_id = pthread_self ();
58 engine->calls = 1;
59 } else
60 engine->calls++;
61 result = 1;
62 printf (
63 'Engine: computing %d^%d
',
64 power->value, power->power);
65 for (count = 1; count <= power->power; count++)
66 result *= power->value;
67 free (arg);
68 }
69
70 /*
71 * Thread start routine that issues work queue requests.
72 */
73 void *thread_routine (void *arg)
74 {
75 power_t *element;
76 int count;
77 unsigned int seed = (unsigned int)time (NULL);
78 int status;