#ifdef sun
    /*
     * On Solaris 2.5, threads are not timesliced. To ensure
     * that our threads can run concurrently, we need to
     * increase the concurrency level to THREADS.
     */
    DPRINTF (("Setting concurrency level to %d\n", THREADS));
    thr_setconcurrency (THREADS);
#endif

    /*
     * Initialize the shared data.
     */
    for (data_count = 0; data_count < DATASIZE; data_count++) {
        data[data_count].data = 0;
        data[data_count].updates = 0;
        status = rwl_init (&data[data_count].lock);
        if (status != 0)
            err_abort (status, "Init rw lock");
    }

    /*
     * Create THREADS threads to access shared data.
     */
    for (count = 0; count < THREADS; count++) {
        threads[count].thread_num = count;
        threads[count].updates = 0;
        threads[count].reads = 0;
        threads[count].interval = rand_r (&seed) % 71;
        status = pthread_create (&threads[count].thread_id,
            NULL, thread_routine, (void*)&threads[count]);
        if (status != 0)
            err_abort (status, "Create thread");
    }

    /*
     * Wait for all threads to complete, and collect
     * statistics.
     */
    for (count = 0; count < THREADS; count++) {
        status = pthread_join (threads[count].thread_id, NULL);
        if (status != 0)
            err_abort (status, "Join thread");
        thread_updates += threads[count].updates;
        printf ("%02d: interval %d, updates %d, reads %d\n",
            count, threads[count].interval,
            threads[count].updates, threads[count].reads);
    }

    /*
     * Collect statistics for the data.
     */
    for (data_count = 0; data_count < DATASIZE; data_count++) {
        data_updates += data[data_count].updates;
        printf ("data %02d: value %d, %d updates\n",
            data_count, data[data_count].data,
            data[data_count].updates);
        rwl_destroy (&data[data_count].lock);
    }

    printf ("%d thread updates, %d data updates\n",
        thread_updates, data_updates);
    return 0;
7.2 Work queue manager
I've already briefly outlined the various models of thread cooperation. These include pipelines, work crews, client/servers, and so forth. In this section, I present the development of a 'work queue,' a set of threads that accepts work requests from a common queue, processing them (potentially) in parallel.
When you create the work queue, you can specify the maximum level of parallelism that you need. The work queue manager interprets that as the maximum number of 'engine' threads that it may create to process your requests. Threads will be started and stopped as required by the amount of work. A thread that finds nothing to do will wait a short time and then terminate. The optimal 'short time' depends on how expensive it is to create a new thread on your system, the cost in system resources to keep a thread going that's not doing anything, and how likely it is that you'll need the thread again soon. I've chosen two seconds, which is probably much too long.
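As a concrete illustration of that idle timeout, here is a minimal sketch of how an engine thread might wait "a short time" for work using pthread_cond_timedwait with an absolute deadline two seconds in the future. The names here (queue_lock, queue_cv, work_available, wait_briefly_for_work) are my own, chosen for the sketch; they are not taken from workq.c.

/*
 * Sketch of the "wait a short while for work, then give up" behavior.
 * A producer would set work_available under queue_lock and signal
 * queue_cv; this function returns 1 if work arrived within two
 * seconds, or 0 if the calling engine thread should terminate.
 */
#include <pthread.h>
#include <time.h>
#include <errno.h>

static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  queue_cv = PTHREAD_COND_INITIALIZER;
static int             work_available = 0;

static int wait_briefly_for_work (void)
{
    struct timespec timeout;
    int status, got_work;

    pthread_mutex_lock (&queue_lock);
    clock_gettime (CLOCK_REALTIME, &timeout);
    timeout.tv_sec += 2;                    /* the "short time" */
    while (!work_available) {
        status = pthread_cond_timedwait (
            &queue_cv, &queue_lock, &timeout);
        if (status == ETIMEDOUT)
            break;                          /* nothing showed up; quit */
    }
    got_work = work_available;
    pthread_mutex_unlock (&queue_lock);
    return got_work;
}

pthread_cond_timedwait returns ETIMEDOUT when the absolute deadline passes, which is the engine thread's cue to update the queue's bookkeeping and exit.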
The header file workq.h and the C source file workq.c demonstrate an implementation of a work queue manager. Part 1 shows the two structure types used by the work queue package. The workq_t type is the external representation of a work queue, and the workq_ele_t is an internal representation of work items that have been queued.
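The workq.h listing itself is not reproduced in this excerpt, so the following is only a rough sketch of what the two declarations might look like, reconstructed from the member-by-member commentary below. Members beyond those the text names explicitly (quit, parallelism, counter, idle, the stored engine function pointer, and the purpose given for valid) are assumptions, not the book's actual definitions.

#include <pthread.h>

typedef struct workq_ele_tag {
    struct workq_ele_tag    *next;      /* link to the next queued item */
    void                    *data;      /* caller's data, uninterpreted */
} workq_ele_t;

typedef struct workq_tag {
    pthread_mutex_t mutex;              /* serializes access to the queue */
    pthread_cond_t  cv;                 /* engine threads wait here for work */
    pthread_attr_t  attr;               /* attributes for new engine threads */
    workq_ele_t     *first;             /* first queued work item */
    workq_ele_t     *last;              /* last queued item (fast append) */
    int             valid;              /* validity check (assumed purpose) */
    int             quit;               /* set at shutdown (assumed) */
    int             parallelism;        /* maximum engine threads (assumed) */
    int             counter;            /* engine threads running (assumed) */
    int             idle;               /* engine threads waiting (assumed) */
    void            (*engine)(void *);  /* caller's engine function (assumed) */
} workq_t;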
6-9 The workq_ele_t structure is used to maintain a linked list of work items. It has a link element (called next) and a data value, which is stored when the work item is queued and passed to the caller's 'engine function' with no interpretation.
14-16 Of course, there's a mutex to serialize access to the workq_t, and a condition variable (cv) on which the engine threads wait for work to be queued.
17 The attr member is a thread attributes object, used when creating new engine threads. The attributes object could instead have been a static variable within workq.c, but I chose to add a little memory overhead to each work queue, rather than add the minor complexity of one-time initialization of a static data item.
18 The first member points to the first item on the work queue. As an optimization to make it easier to queue new items at the end of the queue, the last member points to the last item on the queue.
19-24 These members record assorted information about the work queue. The valid
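To show how the first and last pointers and the condition variable described above might fit together, here is a rough sketch of appending a work item and waking an idle engine thread. It reuses the types from the sketch earlier in this section; the function name enqueue_item and its error handling are illustrative, not taken from workq.c.

#include <stdlib.h>
#include <errno.h>

/*
 * Hypothetical append: link the new item at the tail (using last to
 * avoid walking the list), then signal the condition variable so an
 * idle engine thread can pick the item up.
 */
static int enqueue_item (workq_t *wq, void *data)
{
    workq_ele_t *item = malloc (sizeof (workq_ele_t));
    if (item == NULL)
        return ENOMEM;
    item->data = data;
    item->next = NULL;

    pthread_mutex_lock (&wq->mutex);
    if (wq->first == NULL)
        wq->first = item;               /* queue was empty */
    else
        wq->last->next = item;          /* append after current tail */
    wq->last = item;
    pthread_cond_signal (&wq->cv);      /* wake one waiting engine thread */
    pthread_mutex_unlock (&wq->mutex);
    return 0;
}

Keeping a last pointer makes the append constant time; without it, each new item would require walking the list from first.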