Условия
Это реализация однопоточного побайтового копировальщика из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(…)
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class CopyUtil { public static void copy(InputStream src, OutputStream dst)throws IOException{ try (InputStream src0 = src; OutputStream dst0 = dst) { int b; while ((b = src.read()) != -1) { dst.write(b); } } } }
Это реализация однопоточного копировальщика массивами из InputStream в OutputStream. Копирование происходит в потоке вызвавшем метод copy(…)
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class CopyUtil { public static void copy(InputStream src, OutputStream dst)throws IOException{ byte[] buff = new byte[128]; try (InputStream src0 = src; OutputStream dst0 = dst) { int count; while ((count = src.read(buff)) != -1) { dst.write(buff, 0, count); } } } }
Это реализация многопоточного копировальщика массивами из InputStream в OutputStream. Мы заводим на чтение и на запись по отдельному новому потоку и соединяем их блокирующей ограниченной очередью для передачи данных от читателя к писателю
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicReference; public class CopyUtil { public static void copy(final InputStream src, final OutputStream dst) throws IOException { // reader-to-writer byte[]-channel final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(64); // exception-channel from reader/writer threads? final AtomicReference<Throwable> ex = new AtomicReference<>(); final ThreadGroup group = new ThreadGroup("read-write") { public void uncaughtException(Thread t, Throwable e) {ex.set(e);} }; // reader from 'src' Thread reader = new Thread(group, () -> { try (InputStream src0 = src) { // 'src0' for auto-closing while (true) { byte[] data = new byte[128]; // new data buffer int count = src.read(data, 1, 127); // read up to 127 bytes data[0] = (byte) count; // 0-byte is length-field buffer.put(data); // send to writer if (count == -1) {break;} // src empty } } catch (Exception e) {group.interrupt();} // interrupt writer }); reader.start(); // writer to 'dst' Thread writer = new Thread(group, () -> { try (OutputStream dst0 = dst) { // 'dst0' for auto-closing while (true) { byte[] data = buffer.take(); // get new data from reader if (data[0] == -1) {break;} // its last data dst.write(data, 1, data[0]); // } } catch (Exception e) {group.interrupt();} // interrupt writer }); writer.start(); // wait to complete read/write operations try { reader.join(); // wait for reader writer.join(); // wait for writer } catch (InterruptedException e) {throw new IOException(e);} if (ex.get() != null) {throw new IOException(ex.get());} } }
Для проверки корректности копирования можно использовать следующий тест
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.Random; public class Test { public static void main(String[] args) throws IOException { Random rnd = new Random(0); byte[] testData = new byte[64 * 1024]; rnd.nextBytes(testData); ByteArrayOutputStream dst = new ByteArrayOutputStream(); CopyUtil.copy(new ByteArrayInputStream(testData), dst); if (!Arrays.equals(testData, dst.toByteArray())) { throw new AssertionError("Lab decision wrong!"); } else { System.out.println("OK!"); } } }
Задание #1
В последнем двупоточном решении мы стартуем два потока — для чтения и для записи. Перепишите код, что бы чтение осуществлялось в новом потоке, а запись производилась потоком, вызвавшим copy(…). Кстати, тогда можно будет избавиться от пары join-ов, так как поток на принимающем конце буфера знает, когда закончились данные.
Задание #2
В последнем двупоточном решении читатель постоянно создает новые byte[]-буфера, передает их писателю, а тот отправляет на съедение GC. Создайте отдельную обратную очередь пустых буферов от писателя к читателю.
Задание #3
Во всех трех примерах кода мы реализовывали передачу данных от одного читателя — одному писателю. Реализуйте многопоточное решение передачи данных от одного читателя — многим писателям. Все писатели получают идентичные данные. Читатель и писатели работают каждый в своем отдельном потоке. Не создавайте отдельные копии данных для каждого писателя — пусть писатели читают из одних на всех буферов, но храните эти буфера одновременно в разных очередях (от читателя к каждому писателю тянется отдельная очередь).
import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; public class CopyUtil { public static void copy(InputStream src, OutputStream ... dst) throws IOException { // some code } }
Задание #4
Сделайте предыдущее задание #3 но образуйте не топологию ‘звезда’, где в центре читатель и от него исходят лучи к писателям, а топологию ‘кольцо’. В которой читатель и писатели выстраиваются в круг и передают буфер по кругу. Читатель — первому писателю, первый писатель — второму,… последний писатель — читателю. И после чего читатель может использовать буфер повторно.
Контакты
Я занимаюсь разработкой курса программирования по Java Core (online-курс).
email: GolovachCourses@gmail.com
skype: GolovachCourses
ссылка на оригинал статьи http://habrahabr.ru/company/golovachcourses/blog/226559/
Добавить комментарий