21 errno_abort ('Allocate stage');
22 status = pthread_mutex_init (&new_stage->mutex, NULL);
23 if (status != 0)
24 err_abort (status, 'Init stage mutex');
25 status = pthread_cond_init (&new_stage->avail, NULL);
26 if (status != 0)
27 err_abort (status, 'Init avail condition');
28 status = pthread_cond_init (&new_stage->ready, NULL);
29 if (status != 0)
30 err_abort (status, 'Init ready condition');
31 new_stage->data_ready = 0;
32 *link = new_stage;
33 link = &new_stage->next;
34 }
35
36 *link = (stage_t*)NULL; /* Terminate list */
37 pipe->tail = new_stage; /* Record the tail */ 38
39 /*
40 * Create the threads for the pipe stages only after all
41 * the data is initialized (including all links). Note
42 * that the last stage doesn't get a thread, it's just
43 * a receptacle for the final pipeline value.
44 *
45 * At this point, proper cleanup on an error would take up
46 * more space than worthwhile in a 'simple example,' so
47 * instead of cancelling and detaching all the threads
48 * already created, plus the synchronization object and
49 * memory cleanup done for earlier errors, it will simply
50 * abort.
51 */
52 for ( stage = pipe->head;
53 stage->next != NULL;
54 stage = stage->next) {
55 status = pthread_create (
56 &stage->thread, NULL, pipe_stage, (void*)stage);
57 if (status != 0)
58 err_abort (status, 'Create pipe stage');
59 }
60 return 0;
61 }
Part 5 shows pipe_start
and pipe_result
. The pipe_start function pushes an item of data into the beginning of the pipeline and then returns immediately without waiting for a result. The pipe_result function allows the caller to wait for the final result, whenever the result might be needed.
19-22 The pipe_start function sends data to the first stage of the pipeline. The function increments a count of 'active' items in the pipeline, which allows pipe_ result to detect that there are no more active items to collect, and to return immediately instead of blocking. You would not always want a pipeline to behave this way — it makes sense for this example because a single thread alternately 'feeds' and 'reads' the pipeline, and the application would hang forever if the user inadvertently reads one more item than had been fed.
23-47 The pipe_result function first checks whether there is an active item in the pipeline. If not, it returns with a status of 0, after unlocking the pipeline mutex.
48-55 If there is another item in the pipeline, pipe_result locks the tail (final) stage, and waits for it to receive data. It copies the data and then resets the stage so it can receive the next item of data. Remember that the final stage does not have a thread, and cannot reset itself.
¦ pipe.c part 5 pipe_start,pipe_result
1 /*
2 * External interface to start a pipeline by passing
3 * data to the first stage. The routine returns while
4 * the pipeline processes in parallel. Call the
5 * pipe_result return to collect the final stage values
6 * (note that the pipe will stall when each stage fills,
7 * until the result is collected).
8 */
9 int pipe_start (pipe_t *pipe, long value)
10 {
11 int status; 12
13 status = pthread_mutex_lock (&pipe->mutex);
14 if (status != 0)
15 err_abort (status, 'Lock pipe mutex');
16 pipe->active++;
17 status = pthread_mutex_unlock (&pipe->mutex);
18 if (status != 0)
19 err_abort (status, 'Unlock pipe mutex');
20 pipe_send (pipe->head, value);
21 return 0;
22 } 23
24 /*
25 * Collect the result of the pipeline. Wait for a
26 * result if the pipeline hasn't produced one.
27 */
28 int pipe_result (pipe_t *pipe, long *result)
29 {
30 stage_t *tail = pipe->tail;
31 long value;
32 int empty = 0;
33 int status;
34
35 status = pthread_mutex_lock (&pipe->mutex);
36 if (status != 0)
37 err_abort (status, 'Lock pipe mutex');
38 if (pipe->active <= 0)
39 empty = 1;
40 else
41 pipe->active--;
42
43 status = pthread_mutex_unlock (&pipe->mutex);
44 if (status != 0)
45 err_abort (status, 'Unlock pipe mutex');
46 if (empty)