member is a magic number that's set when the work queue is initialized, as we've seen before in barriers and read/write locks. (In this case, the magic number is the month and year of my daughter's birthday.) The quit member is a flag that allows the 'work queue manager' to tell engine threads to terminate as soon as the queue is empty. The parallelism member records how many threads the creator chose to allow the work queue to utilize, counter records the number of threads created, and idle records the current number of threads that are waiting for work. The engine member is the user's 'engine function,' supplied when the work queue was created. As you can see, the engine function takes an 'untyped' (void *) argument, and has no return value.
¦ workq.h part 1 workq_t
1 #include <pthread.h>
2
3 /*
4 * Structure to keep track of work queue requests.
5 */
6 typedef struct workq_ele_tag {
7 struct workq_ele_tag *next;
8 void *data;
9 } workq_ele_t;
10
11 /*
12 * Structure describing a work queue.
13 */
14 typedef struct workq_tag {
15 pthread_mutex_t mutex; /* control access to queue */
16 pthread_cond_t cv; /* wait for work */
17 pthread_attr_t attr; /* create detached threads */
18 workq_ele_t *first, *last; /* work queue */
19 int valid; /* valid */
20 int quit; /* workq should quit */
21 int parallelism; /* maximum threads */
22 int counter; /* current threads */
23 int idle; /* number of idle threads * /
24 void (*engine)(void *arg); /* user engine */
25 } workq_t; 26
27 #define WORKQ_VALID 0xdec1992
Part 2 shows the interfaces we'll create for our work queue. We need to create and destroy work queue managers, so we'll define workq_init
and workq_destroy
. Both take a pointer to a workq_t
structure. In addition, the initializer needs the maximum number of threads the manager is allowed to create to service the queue, and the engine function. Finally, the program needs to be able to queue work items for processing — we'll call the interface for this workq_add
. It takes a pointer to the workq_t
and the argument that should be passed to the engine function.
¦ workq.h part 2 interfaces
1 /*
2 * Define work queue functions.
3 */
4 extern int workq_init (
5 workq_t *wq,
6 int threads, /* maximum threads */
7 void (*engine)(void *)); /* engine routine */
8 extern int workq_destroy (workq_t *wq);
9 extern int workq_add (workq_t *wq, void *data);
The file workq.c contains the implementation of our work queue. The following examples break down each of the functions used to implement the workq.h interfaces.
Part 1 shows the workq_init
function, which initializes a work queue. We create the Pthreads synchronization objects that we need, and fill in the remaining members.
14-22 Initialize the thread attributes object attr
so that the engine threads we create will run detached. That means we do not need to keep track of their thread identifier values, or worry about joining with them.
34-40 We're not ready to quit yet (we've hardly started!), so clear the quit flag. The parallelism member records the maximum number of threads we are allowed to create, which is the workq_init
parameter threads. The counter member will record the current number of active engine threads, initially 0, and idle will record the number of active threads waiting for more work. And of course, finally, we set the valid member.
¦ workq.c part 1 workq_init
1 #include <pthread.h>
2 #include <stdlib.h>
3 #include <time.h>
4 #include 'errors.h'
5 #include 'workq.h'
6
7 /*
8 * Initialize a work queue.
9 */
10 int workq_init (workq_t *wq, int threads, void (*engine)(void *arg))
11 {
12 int status;
13
14 status = pthread_attr_init (&wq->attr);
15 if (status != 0)
16 return status;
17 status = pthread_attr_setdetachstate (
18 &wq->attr, PTHREAD_CREATE_DETACHED);
19 if (status != 0) {
20 pthread_attr_destroy (&wq->attr);
21 return status;
22 }
23 status = pthread_mutex_init (&wq->mutex, NULL);
24 if (status != 0) {
25 pthread_attr_destroy (&wq->attr);
26 return status;
27 }
28 status = pthread_cond_init (&wq->cv, NULL);
29 if (status != 0) {
30 pthread_mutex_destroy (&wq->mutex);
31 pthread_attr_destroy (&wq->attr);
32 return status;
33 }
34 wq->quit = 0; /* not time to quit */
35 wq->first = wq->last = NULL; /* no queue entries */
36 wq->parallelism = threads; /* max servers */