Work crew | Each thread performs an operation on its own data. Threads in a work crew may all perform the same operation, or each a separate operation, but they always proceed independently. |
Client/server | A client 'contracts' with an independent server for each job. Often the 'contract' is anonymous: a request is made through some interface that queues the work item. |
TABLE 4.1
All of these models can be combined in arbitrary ways and modified beyond all recognition to fit individual situations. A step in a pipeline could involve requesting a service from a server thread, and the server might use a work crew, and one or more workers in the crew might use a pipeline. Or a parallel search 'engine' might initiate several threads, each trying a different search algorithm.
4.1 Pipeline
In pipelining, a stream of 'data items' is processed serially by an ordered set of threads (Figure 4.1). Each thread performs a specific operation on each item in sequence, passing the data on to the next thread in the pipeline.
For example, the data might be a scanned image, and thread A might process an image array, thread B might search the processed data for a specific set of features, and thread C might collect the serial stream of search results from thread B into a report. Or each thread might perform a single step in some sequence of modifications on the data.
The following program, called pipe.c, shows the pieces of a simple pipeline program. Each thread in the pipeline increments its input value by 1 and passes it to the next thread. The main program reads a series of 'command lines' from stdin. A command line is either a number, which is fed into the beginning of the pipeline, or the character '=', which causes the program to read the next result from the end of the pipeline and print it to stdout.
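The command-line rule described above can be sketched as a small classifier. This is only an illustration, not the parsing code used by pipe.c: the names demo_cmd_t, demo_parse_command, and skip_space are hypothetical, and the treatment of surrounding whitespace and of malformed input is an assumption.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/* Hypothetical classification of one input line (not part of pipe.c). */
typedef enum { CMD_INVALID, CMD_VALUE, CMD_RESULT } demo_cmd_t;

/* Skip blanks and line terminators; returns the first other character. */
static const char *skip_space (const char *p)
{
    while (*p == ' ' || *p == '\t' || *p == '\n' || *p == '\r')
        p++;
    return p;
}

/*
 * Classify a line as a number to feed into the pipeline (CMD_VALUE,
 * with the number stored in *value), a request for the next result
 * (CMD_RESULT, a lone '='), or neither (CMD_INVALID). *value is
 * written only for CMD_VALUE.
 */
demo_cmd_t demo_parse_command (const char *line, long *value)
{
    char *end;
    long parsed;

    assert (line != NULL && value != NULL);

    line = skip_space (line);
    if (*line == '=')
        return *skip_space (line + 1) == '\0' ? CMD_RESULT : CMD_INVALID;

    errno = 0;
    parsed = strtol (line, &end, 10);
    if (end == line || errno == ERANGE)
        return CMD_INVALID;             /* No digits, or out of range */
    if (*skip_space (end) != '\0')
        return CMD_INVALID;             /* Trailing junk after the number */

    *value = parsed;
    return CMD_VALUE;
}
```

A caller would read each line with fgets, start the value down the pipeline on CMD_VALUE, and fetch and print the next result on CMD_RESULT.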
FIGURE 4.1
1  #include <pthread.h>
2  #include "errors.h"
3
4  /*
5   * Internal structure describing a 'stage' in the
6   * pipeline. One for each thread, plus a 'result
7   * stage' where the final thread can stash the value.
8   */
9  typedef struct stage_tag {
10     pthread_mutex_t mutex;      /* Protect data */
11     pthread_cond_t avail;       /* Data available */
12     pthread_cond_t ready;       /* Ready for data */
13     int data_ready;             /* Data present */
14     long data;                  /* Data to process */
15     pthread_t thread;           /* Thread for stage */
16     struct stage_tag *next;     /* Next stage */
17 } stage_t;
18
19 /*
20  * External structure representing the entire
21  * pipeline.
22  */
23 typedef struct pipe_tag {
24     pthread_mutex_t mutex;      /* Mutex to protect pipe */
25     stage_t *head;              /* First stage */
26     stage_t *tail;              /* Final stage */
27     int stages;                 /* Number of stages */
28     int active;                 /* Active data elements */
29 } pipe_t;
9-17 Each stage of a pipeline is represented by a variable of type stage_t. stage_t contains a mutex to synchronize access to the stage. The avail condition variable is used to signal a stage that data is ready for it to process, and each stage signals its own ready condition variable when it is ready for new data. The data member is the data passed from the previous stage, thread is the thread operating this stage, and next is a pointer to the following stage.
23-29 The pipe_t structure describes a pipeline. It provides pointers to the first and last stage of a pipeline. The first stage, head, represents the first thread in the pipeline. The last stage, tail, is a special stage_t that has no thread—it is a place to store the final result of the pipeline.
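To make the roles of avail, ready, and data_ready concrete, here is a minimal sketch of the receiving side of one stage, under stated assumptions. The type demo_stage_t copies the stage_t layout but is renamed to mark it as illustrative, and the helpers demo_stage_init and demo_stage_take are hypothetical names, not functions from pipe.c.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Same fields as stage_t; renamed because this is only a sketch. */
typedef struct demo_stage_tag {
    pthread_mutex_t mutex;          /* Protect data */
    pthread_cond_t avail;           /* Data available */
    pthread_cond_t ready;           /* Ready for data */
    int data_ready;                 /* Data present */
    long data;                      /* Data to process */
    pthread_t thread;               /* Thread for stage (unused here) */
    struct demo_stage_tag *next;    /* Next stage */
} demo_stage_t;

/* Hypothetical helper: set up an empty stage. Returns 0 or an error number. */
int demo_stage_init (demo_stage_t *stage)
{
    int status;

    assert (stage != NULL);
    status = pthread_mutex_init (&stage->mutex, NULL);
    if (status != 0)
        return status;
    status = pthread_cond_init (&stage->avail, NULL);
    if (status != 0) {
        pthread_mutex_destroy (&stage->mutex);
        return status;
    }
    status = pthread_cond_init (&stage->ready, NULL);
    if (status != 0) {
        pthread_cond_destroy (&stage->avail);
        pthread_mutex_destroy (&stage->mutex);
        return status;
    }
    stage->data_ready = 0;          /* Empty: nothing to process yet */
    stage->data = 0;
    stage->next = NULL;
    return 0;
}

/*
 * Hypothetical helper: wait on avail until data is present, take the
 * value, mark the stage empty, and signal ready so that a sender
 * waiting for room can proceed.
 */
int demo_stage_take (demo_stage_t *stage, long *value)
{
    int status, unlock_status;

    assert (stage != NULL && value != NULL);
    status = pthread_mutex_lock (&stage->mutex);
    if (status != 0)
        return status;
    /* Recheck the predicate in a loop: wakeups may be spurious. */
    while (!stage->data_ready) {
        status = pthread_cond_wait (&stage->avail, &stage->mutex);
        if (status != 0) {
            pthread_mutex_unlock (&stage->mutex);
            return status;
        }
    }
    *value = stage->data;
    stage->data_ready = 0;
    status = pthread_cond_signal (&stage->ready);
    unlock_status = pthread_mutex_unlock (&stage->mutex);
    return status != 0 ? status : unlock_status;
}
```

The data_ready flag is the predicate for both condition variables: a receiver waits on avail while the flag is clear, and a sender would wait on ready while it is set.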
Part 2 shows pipe_send, a utility function used to start data along a pipeline, and also called by each stage to pass data to the next stage.
17-23 It begins by waiting on the specified pipeline stage's ready condition variable until it can accept new data.
28-30 Store the new data value, and then tell the stage that data is available.
¦ pipe.c part 2 pipe_send
1  /*
2   * Internal function to send a 'message' to the
3   * specified pipe stage. Threads use this to pass
4   * along the modified data item.
5   */
6  int pipe_send (stage_t *stage, long data)
7  {
8      int status;
9
10     status = pthread_mutex_lock (&stage->mutex);
11     if (status != 0)
12         return status;
13     /*
14      * If there's data in the pipe stage, wait for it
15      * to be consumed.