FBuffers : TQueuedBuffers;

FStream : TStream;

FSyncObj : TtdProduceConsumeSync;

protected

procedure Execute; override;

public

constructor Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

end;

constructor TConsumer.Create(aStream : TStream;

aSyncObj : TtdProduceConsumeSync;

aBuffers : TQueuedBuffers);

begin

inherited Create (true);

FStream := aStream;

FSyncObj := aSyncObj;

FBuffers := aBuffers;

end;

procedure TConsumer.Execute;

var

Head : PBuffer;

begin

{сигнализировать о готовности к началу потребления данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

{до тех пор, пока начальный буфер не опустошен...}

while (Head^.bCount <> 0) do

begin

{выполнить запись блока из начального буфера в поток}

FStream.Write(Head^.bBlock, Head^.bCount);

{переместить указатель начала очереди}

FBuffers.AdvanceHead;

{поскольку было выполнено считывание и обработка буфера, необходимо сообщить о том, что данные были использованы}

FSyncObj.StopConsuming;

{сигнализировать о готовности снова приступить к потреблению данных}

FSyncObj.StartConsuming;

{извлечь начальный буфер}

Head := FBuffers.Head;

end;

end;

И, наконец, мы можем рассмотреть подпрограмму копирования потока, приведенную в листинге 12.14. Она принимает два параметра: входной поток и выходной поток. Подпрограмма создает специальный объект типа TQueuedBuffers. Этот объект содержит все ресурсы и методы, необходимые для реализации организованного в виде очереди набора буферов. Он создает также экземпляр класса TtdProducerConsumerSync, который будет действовать в качестве объекта синхронизации, обеспечивающего согласованную работу производителя и потребителя.

Листинг 12.14. Многопоточное копирование

procedure ThreadedCopyStream(aSrcStream, aDestStream : TStream);

var

SyncObj : TtdProduceConsumeSync;

Buffers : TQueuedBuffers;

Producer : TProducer;

Consumer : TConsumer;

WaitArray : array [ 0..1] of THandle;

begin

SyncObj := nil;

Buffers := nil;

Producer :=nil;

Consumer :=nil;

try

{создать объект синхронизации, объект организованных в виде очереди буферов (с 20 буферами) и два потока}

SyncObj := TtdProduceConsumeSync.Create(20);

Buffers := TQueuedBuffers.Create(20);

Producer := TProducer.Create(aSrcStream, SyncObj, Buffers);

Consumer := TConsumer.Create(aDestStream, SyncObj, Buffers);

{сохранить дескрипторы потоков, что обеспечивает возможность ожидания их передачи}

WaitArray[0] := Producer.Handle;

WaitArray[1] := Consumer.Handle;

{запустить потоки}

Consumer.Resume;

Producer.Resume;

{ожидать окончания потоков}

WaitForMultipleObjects(2, @WaitArray, true, INFINITE);

finally

Producer.Free;

Consumer.Free;

Buffers.Free;

SyncObj.Free;

end;

end;

Затем подпрограмма копирования создает два потока, между которыми будет выполняться копирование, и возобновляет их выполнение (потоки создаются в приостановленном состоянии). Далее подпрограмма дожидается завершения обоих потоков и выполняет очистку. Полный код подпрограммы можно найти в файлах TstCopy.dpr и TstCopyu.pas на web-сайте издательства, в разделе материалов.

Модель с одним производителем и несколькими потребителями

Реализовать рассмотренное приложение, в котором используется модель 'производитель-потребитель',

Добавить отзыв
ВСЕ ОТЗЫВЫ О КНИГЕ В ОБРАНЕ

0

Вы можете отметить интересные вам фрагменты текста, которые будут доступны по уникальной ссылке в адресной строке браузера.

Отметить Добавить цитату