Лабораторные по многопоточности в Java: Parallel Copy

от автора

Хорошие лабораторные по многопоточности (простые, понятные, нетривиальные и полезные в народном хозяйстве) — большая редкость. Предлагаю Вам одно условие и четыре лабораторные работы по элементарной многопоточности на Java.

Условия

Это реализация однопоточного побайтового копировальщика из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(…)

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream;  public class CopyUtil {     public static void copy(InputStream src, OutputStream dst)throws IOException{         try (InputStream src0 = src; OutputStream dst0 = dst) {             int b;             while ((b = src.read()) != -1) {                 dst.write(b);             }         }     } } 


Это реализация однопоточного копировальщика массивами из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(…)

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream;  public class CopyUtil {     public static void copy(InputStream src, OutputStream dst)throws IOException{         byte[] buff = new byte[128];         try (InputStream src0 = src; OutputStream dst0 = dst) {             int count;             while ((count = src.read(buff)) != -1) {                 dst.write(buff, 0, count);             }         }     } } 

Это реализация многопоточного копировальщика массивами из InputStream в OutputStream. Мы заводим на чтение и на запись по отдельному новому потоку и соединяем их блокирующей ограниченной очередью для передачи данных от читателя к писателю

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference;  public class CopyUtil {     public static void copy(final InputStream src, final OutputStream dst) throws IOException {         // reader-to-writer byte[]-channel         final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(64);         // exception-channel from reader/writer threads?         final AtomicReference<Throwable> ex = new AtomicReference<>();         final ThreadGroup group = new ThreadGroup("read-write") {             public void uncaughtException(Thread t, Throwable e) {ex.set(e);}         };         // reader from 'src'         Thread reader = new Thread(group, () -> {             try (InputStream src0 = src) {              // 'src0' for auto-closing                 while (true) {                     byte[] data = new byte[128];        // new data buffer                     int count = src.read(data, 1, 127); // read up to 127 bytes                     data[0] = (byte) count;             // 0-byte is length-field                     buffer.put(data);                   // send to writer                     if (count == -1) {break;}           // src empty                 }             } catch (Exception e) {group.interrupt();}  // interrupt writer         });         reader.start();         // writer to 'dst'         Thread writer = new Thread(group, () -> {             try (OutputStream dst0 = dst) {      // 'dst0' for auto-closing                 while (true) {                     byte[] data = buffer.take(); // get new data from reader                     if (data[0] == -1) {break;}  // its last data                     dst.write(data, 1, data[0]); //                  }             } catch (Exception e) {group.interrupt();}  // interrupt writer         });         writer.start();         // wait to complete read/write operations         try {             reader.join(); // wait for reader             writer.join(); // wait for writer         } catch (InterruptedException e) {throw new IOException(e);}         if (ex.get() != null) {throw new IOException(ex.get());}     } } 

Для проверки корректности копирования можно использовать следующий тест

import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.Random;  public class Test {     public static void main(String[] args) throws IOException {         Random rnd = new Random(0);         byte[] testData = new byte[64 * 1024];         rnd.nextBytes(testData);         ByteArrayOutputStream dst = new ByteArrayOutputStream();         CopyUtil.copy(new ByteArrayInputStream(testData), dst);         if (!Arrays.equals(testData, dst.toByteArray())) {             throw new AssertionError("Lab decision wrong!");         } else {             System.out.println("OK!");         }     } } 

Задание #1

В последнем двупоточном решении мы стартуем два потока — для чтения и для записи. Перепишите код, что бы чтение осуществлялось в новом потоке, а запись производилась потоком, вызвавшим copy(…). Кстати, тогда можно будет избавиться от пары join-ов, так как поток на принимающем конце буфера знает, когда закончились данные.

Задание #2

В последнем двупоточном решении читатель постоянно создает новые byte[]-буфера, передает их писателю, а тот отправляет на съедение GC. Создайте отдельную обратную очередь пустых буферов от писателя к читателю.

Задание #3

Во всех трех примерах кода мы реализовывали передачу данных от одного читателя — одному писателю. Реализуйте многопоточное решение передачи данных от одного читателя — многим писателям. Все писатели получают идентичные данные. Читатель и писатели работают каждый в своем отдельном потоке. Не создавайте отдельные копии данных для каждого писателя — пусть писатели читают из одних на всех буферов, но храните эти буфера одновременно в разных очередях (от читателя к каждому писателю тянется отдельная очередь).

import java.io.IOException; import java.io.InputStream; import java.io.OutputStream;  public class CopyUtil {     public static void copy(InputStream src, OutputStream ... dst) throws IOException {         // some code     } } 

Задание #4

Сделайте предыдущее задание #3 но образуйте не топологию ‘звезда’, где в центре читатель и от него исходят лучи к писателям, а топологию ‘кольцо’. В которой читатель и писатели выстраиваются в круг и передают буфер по кругу. Читатель — первому писателю, первый писатель — второму,… последний писатель — читателю. И после чего читатель может использовать буфер повторно.

Контакты

Я занимаюсь разработкой курса программирования по Java Core (online-курс).
email: GolovachCourses@gmail.com
skype: GolovachCourses

ссылка на оригинал статьи http://habrahabr.ru/company/golovachcourses/blog/226559/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *