16 */

17  while (stage->data_ready) {

18  status = pthread_cond_wait (&stage->ready, &stage- >mutex);

19  if (status != 0) {

20  pthread_mutex_unlock (&stage->mutex);

21  return status;

22  }

23  }

24

25 /*

26 * Send the new data

27 */

28 stage->data = data;

29 stage->data_ready = 1;

30 status = pthread_cond_signal (&stage->avail);

31 if (status != 0) {

32  pthread_mutex_unlock (&stage->mutex);

33  return status;

34 }

35 status = pthread_mutex_unlock (&stage->mutex);

36 return status;

37 }

Part 3 shows pipe_stage, the start function for each thread in the pipeline. The thread's argument is a pointer to its stage_t structure.

16-27 The thread loops forever, processing data. Because the mutex is locked outside the loop, the thread appears to have the pipeline stage's mutex locked all the

time. However, it spends most of its time waiting for new data, on the avail condition variable. Remember that a thread automatically unlocks the mutex associated with a condition variable, while waiting on that condition variable. In reality, therefore, the thread spends most of its time with mutex unlocked.

22-26 When given data, the thread increases its own data value by one, and passes the result to the next stage. The thread then records that the stage no longer has data by clearing the data_ready flag, and signals the ready condition variable to wake any thread that might be waiting for this pipeline stage.

¦ pipe.c part 3 pipe_stage

1 /*

2 * The thread start routine for pipe stage threads.

3 * Each will wait for a data item passed from the

4 * caller or the previous stage, modify the data

5 * and pass it along to the next (or final) stage.

6 */

7 void *pipe_stage (void *arg)

8 {

9  stage_t *stage = (stage_t*)arg;

10  stage_t *next_stage = stage->next;

11  int status;

12

13  status = pthread_mutex_lock (&stage->mutex);

14  if (status != 0)

15  err_abort (status, 'Lock pipe stage');

16  while (1) {

17  while (stage->data_ready != 1) {

18  status = pthread_cond_wait (&stage->avail, &stage- >mutex);

19  if (status != 0)

20  err_abort (status, 'Wait for previous stage');

21  }

22  pipe_send (next_stage, stage->data + 1);

23  stage->data_ready = 0;

24  status = pthread_cond_signal (&stage->ready);

25  if (status != 0)

26  err_abort (status, 'Wake next stage');

27  }

28 /*

29 * Notice that the routine never unlocks the stage->mutex.

30 * The call to pthread_cond_wait implicitly unlocks the

31 * mutex while the thread is waiting, allowing other threads

32 * to make progress. Because the loop never terminates, this

33 * function has no need to unlock the mutex explicitly.

34 */

Part 4 shows pipe_create, the function that creates a pipeline. It can create a pipeline of any number of stages, linking them together in a list.

18-34 For each stage, it allocates a new stage_t structure and initializes the members. Notice that one additional 'stage' is allocated and initialized to hold the final result of the pipeline.

36-37 The link member of the final stage is set to NULL to terminate the list, and the pipeline's tail is set to point at the final stage. The tail pointer allows pipe_ result to easily find the final product of the pipeline, which is stored into the final stage.

52-59 After all the stage data is initialized, pipe_create creates a thread for each stage. The extra 'final stage' does not get a thread—the termination condition of the for loop is that the current stage's next link is not NULL, which means that it will not process the final stage.

¦ pipe.c part 4 pipe_create

1 /*

2 * External interface to create a pipeline. All the

3 * data is initialized and the threads created. They'll

4 * wait for data.

5 */

6 int pipe_create (pipe_t *pipe, int stages)

7 {

8  int pipe_index;

9  stage_t **link = &pipe->head, *new_stage, *stage; 10 int status;

11

12  status = pthread_mutex_init(&pipe->mutex, NULL);

13  if (status != 0)

14  err_abort (status, 'Init pipe mutex');

15  pipe->stages = stages;

16  pipe->active = 0;

17

18  for (pipe_index = 0; pipe_index <= stages; pipe_index++) {

19  new_stage = (stage_t*)malloc (sizeof (stage_t));

20  if (new_stage == NULL)

Добавить отзыв
ВСЕ ОТЗЫВЫ О КНИГЕ В ИЗБРАННОЕ

0

Вы можете отметить интересные вам фрагменты текста, которые будут доступны по уникальной ссылке в адресной строке браузера.

Отметить Добавить цитату