attr.context_alloc = alloc;
attr.block_func = block;
attr.handler_func = handler;
// фактическое создание пула потоков:
void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);
if (tpp == NULL) errx('create pool');
// начало функционирования пула потоков:
thread_pool_start(tpp);
// ... выполнение никогда не дойдет до этой точки!
exit(EXIT_SUCCESS);
}
В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:
• errx()
— реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;
• retrans()
— прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.
Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.
Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:
• Первоначально (при запуске пула потоков в работу вызовом thread_pool_start()
) создается attr.lo_water
потоков («нижняя ватерлиния» числа блокированных потоков).
• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция attr.соntext_alloc()
(в контексте созданного потока).
• По завершении функция вызывает блокирующую функцию потока attr.block_func()
, на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из accept()
).
• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика attr.handler_func()
.
• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже attr.lo_water
, механизм пула создаст дополнительно attr.increment
потоков и «доведет» их до блокирующей функции.
• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…
• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков attr.hi_water
(«верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.
• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение attr.maximum
.
Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:
1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так (<sys/dispatch.h>
):
typedef struct _thread_pool_attr {
THREAD_POOL_HANDLE_T* handle;
THREAD_POOL_PARAM_T*
(*block_func)(THREAD_POOL_PARAM_T* ctp);
void (*unblock_func)(THREAD_POOL_PARAM_T* ctp);
int (*handler_func)(THREAD_POOL_PARAM_T* ctp);
THREAD_POOL_PARAM_T*
(*context_alloc)(THREAD_POOL_HANDLE_T* handle);
void (*context_free)(THREAD_POOL_PARAM_T* ctp);
pthread_attr_t* attr;
unsigned short lo_water;
unsigned short increment;
unsigned short hi_water;
unsigned short maximum;
unsigned reserved[8];
} thread_pool_attr_t;
Дескриптор создаваемого пула потоков handle
, посредством которого мы будем ссылаться на пул, является просто синонимом типа dispatch_t
:
#ifndef THREAD_POOL_HANDLE_T
#define THREAD_POOL_HANDLE_T dispatch_t
#endif
Атрибуты потоков, которые будут работать в составе пула, определяются полем attr
типа pthread_attr_t
(эту структуру мы детально рассматривали ранее при обсуждении создания единичных потоков).
Численные параметры пула определяют:
lo_water
— «нижняя ватерлиния», минимальное число потоков пула, находящихся в блокированном состоянии (в ожидании активизации). Если в результате некоторого события один из ожидающих потоков переходит в состояние активной обработки и число оставшихся блокированных потоков становится меньше lo_water
, создается дополнительно increment потоков, которые переводятся в блокированное состояние.
hi_water
— максимальное число потоков, которые допустимо иметь в блокированном состоянии. Если после завершения обработки некоторым потоком число заблокированных потоков становится больше hi_water
, то этот поток уничтожается.
maximum
— общая верхняя граница числа потоков пула (активизированных и заблокированных). Даже если число заблокированных потоков (в пике активности) станет ниже lo_water
, но общее число потоков уже достигнет maximum, то новые потоки для пула создаваться не будут.
Функциональные параметры пула определяют:
context_alloc()
и context_free()
— функции создания и уничтожения контекста потока, которые вызываются при создании и уничтожении каждого потока пула. Функция создания контекста потока ответственна за индивидуальные настройки создаваемого потока. Она возвращает «указатель на контекст» типа THREAD_POOL_PARAM_T
. Однако системе такой тип неизвестен:
#ifndef THREAD_POOL_PARAM_T
#define THREAD_POOL_PARAM_T void
#endif
В качестве контекста может использоваться любой пользовательский тип, и он будет передаваться последовательно в качестве параметра (ctp
) во все последующие функции обслуживания потока.
block_func()
— функция блокирования, которая вызывается в потоке сразу же после