aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers;

alD : integer);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceManyConsumeSync;

aBuffers : TQueuedBuffers;

alD : integer);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

FID := alD;

end;

procedure TConsumer.Execute;

var

Head : PBuffer;

begin

{передать сигнал о готовности к началу потребления данных}

FSyncObj.StartConsuming(FID);

{выполнить считывание начального буфера очереди}

Head := FBuffers.Head[FID];

{до тех пор, пока начальный буфер не пуст...}

while (Head^.bCount <> 0) do

begin

{выполнить запись блока из начального буфера очереди в поток}

FStream.Write(Head^.bBlock, Head^.bCount);

{переместить указатель начала очереди}

FBuffers.AdvanceHead(FID);

{обработка этого буфера завершена}

FSyncObj.StopConsuming(FID);

{передать сигнал о повторной готовности к началу потребления данных}

FSyncObj.StartConsuming(FID);

{выполнить считывание начального буфера очереди}

Head := FBuffers.Head[FID];

end;

{обработка последнего буфера завершена}

FSyncObj.StopConsuming(FID);

end;

И, наконец, рассмотрим подпрограмму копирования потоков, код которой показан в листинге 12.21.

Листинг 12.21. Копирование потоков с применением модели 'производитель-потребитель'

procedure ThreadedMultiCopyStream(aSrcStream : TStream;

aDestCount : integer;

aDestStreams : PStreamArray);

var

i : integer;

SyncObj : TtdProduceManyConsumeSync;

Buffers : TQueuedBuffers;

Producer : TProducer;

Consumer : array [0..pred(MaxConsumers) ] of TConsumer;

WaitArray : array [0..MaxConsumers] of THandle;

begin

SyncObj nil;

Buffers nil;

Producer :=nil;

for i := 0 to pred(MaxConsumers) do

Consumer[i] := nil;

for i := 0 to MaxConsumers do

WaitArray[i] := 0;

try

{создать объект синхронизации}

SyncObj : * TtdProduceManyConsumeSync.Create(20, aDestCount);

{создать объект буфера с очередью}

Buffers := TQueuedBuffers.Create(20, aDestCount);

{создать поток производителя и сохранить его дескриптор}

Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);

WaitArray[0] := Producer.Handle;

{создать потоки потребителей и сохранить их дескрипторы}

for i := 0 to pred(aDestCount) do

begin

Consumer [ i ] := TConsumer.Create(

aDestStreams^[i], SyncObj, Buffers, i);

WaitArray[i+1] := Consumer[i].Handle;

end;

{запустить потоки}

for i := 0 to pred(aDestCount) do

Consumer[i].Resume;

Producer.Resume;

{ожидать завершения потоков}

WaitForMultipleObjects(l+aDestCount, @WaitArray, true, INFINITE);

finally Producer.Free;

for i := 0 to pred(aDestCount) do

Consumer[i].Free;

Buffers.Free;

SyncObj.Free;

end;

end;

Большая часть кода предназначена для выполнения тех же рутинных задач, что и в модели с одним потребителем, представленной в листинге 12.14, за исключением того, что на этот раз необходимо заботиться о нескольких потребителях. Полный код подпрограммы находится в файлах TstNCpy.dpr и TstNCpyu.pas на Web-сайте издательства, в разделе материалов.

Добавить отзыв
ВСЕ ОТЗЫВЫ О КНИГЕ В ИЗБРАННОЕ

0

Вы можете отметить интересные вам фрагменты текста, которые будут доступны по уникальной ссылке в адресной строке браузера.

Отметить Добавить цитату