aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
end;
constructor TConsumer.Create(aStream : TStream;
aSyncObj : TtdProduceManyConsumeSync;
aBuffers : TQueuedBuffers;
alD : integer);
begin
inherited Create (true);
FStream := aStream;
FSyncObj := aSyncObj;
FBuffers := aBuffers;
FID := alD;
end;
procedure TConsumer.Execute;
var
Head : PBuffer;
begin
{передать сигнал о готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
{до тех пор, пока начальный буфер не пуст...}
while (Head^.bCount <> 0) do
begin
{выполнить запись блока из начального буфера очереди в поток}
FStream.Write(Head^.bBlock, Head^.bCount);
{переместить указатель начала очереди}
FBuffers.AdvanceHead(FID);
{обработка этого буфера завершена}
FSyncObj.StopConsuming(FID);
{передать сигнал о повторной готовности к началу потребления данных}
FSyncObj.StartConsuming(FID);
{выполнить считывание начального буфера очереди}
Head := FBuffers.Head[FID];
end;
{обработка последнего буфера завершена}
FSyncObj.StopConsuming(FID);
end;
И, наконец, рассмотрим подпрограмму копирования потоков, код которой показан в листинге 12.21.
Листинг 12.21. Копирование потоков с применением модели 'производитель-потребитель'
procedure ThreadedMultiCopyStream(aSrcStream : TStream;
aDestCount : integer;
aDestStreams : PStreamArray);
var
i : integer;
SyncObj : TtdProduceManyConsumeSync;
Buffers : TQueuedBuffers;
Producer : TProducer;
Consumer : array [0..pred(MaxConsumers) ] of TConsumer;
WaitArray : array [0..MaxConsumers] of THandle;
begin
SyncObj nil;
Buffers nil;
Producer :=nil;
for i := 0 to pred(MaxConsumers) do
Consumer[i] := nil;
for i := 0 to MaxConsumers do
WaitArray[i] := 0;
try
{создать объект синхронизации}
SyncObj : * TtdProduceManyConsumeSync.Create(20, aDestCount);
{создать объект буфера с очередью}
Buffers := TQueuedBuffers.Create(20, aDestCount);
{создать поток производителя и сохранить его дескриптор}
Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);
WaitArray[0] := Producer.Handle;
{создать потоки потребителей и сохранить их дескрипторы}
for i := 0 to pred(aDestCount) do
begin
Consumer [ i ] := TConsumer.Create(
aDestStreams^[i], SyncObj, Buffers, i);
WaitArray[i+1] := Consumer[i].Handle;
end;
{запустить потоки}
for i := 0 to pred(aDestCount) do
Consumer[i].Resume;
Producer.Resume;
{ожидать завершения потоков}
WaitForMultipleObjects(l+aDestCount, @WaitArray, true, INFINITE);
finally Producer.Free;
for i := 0 to pred(aDestCount) do
Consumer[i].Free;
Buffers.Free;
SyncObj.Free;
end;
end;
Большая часть кода предназначена для выполнения тех же рутинных задач, что и в модели с одним потребителем, представленной в листинге 12.14, за исключением того, что на этот раз необходимо заботиться о нескольких потребителях. Полный код подпрограммы находится в файлах TstNCpy.dpr и TstNCpyu.pas на Web-сайте издательства, в разделе материалов.