В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.
Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).
Постановка задачи
Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону – передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.
Верхнеуровневое решение включает в себя три компонента:
- REST сервис – задача которого предоставить клиенту возможность передать файл (или запросить).
- MQ – отвечает за передачу сообщений между различными сетями.
- Application – приложение, отвечающее за хранение файлов и выдачу их по запросу.
В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:
- Получение файла от клиента.
- Передача полученного файла в MQ.
- Передача файла из MQ клиенту в качестве ответа.
Метод решения
В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение – максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:
- Получение файла и сохранение его в MQ очереди должно выполняться в потоковом режиме, без помещения в память полностью файла.
- В очереди MQ файл будет размещаться в виде набора небольших сообщений.
Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:
На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ – каждая порция файла размещается в виде отдельного сообщения.
Разработка REST сервиса
Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:
- upload – получает от клиента файл и записывает его в MQ очередь, в качестве ответа возвращает идентификатор группы сообщений (в base64 формате).
- download – получает от клиента идентификатор группы сообщений (в base64 формате) и возвращает файл, хранящийся в MQ очереди.
Метод получения файла от клиента (upload)
В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.
Получение потока входящего файла
Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла
@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... }
Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: <WLP_INSTALLATION_PATH>/dev/api/ibm.
Примечание:
- WLP_INSTALLATION_PATH — путь к директории WebSphere Liberty Profile.
- Ожидается, что клиент будет передавать файл в параметре с именем «file».
- Если используется другой сервер приложений, то можно воспользоваться альтернативной библиотекой от Apache CXF.
Потоковое сохранение файла в MQ
Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:
String groupId = RandomStringUtils.randomAscii(24);
Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание порции входящего файла пока файл не будет полностью считан:
- Порция данных файла записывается в виде отдельного сообщения в MQ.
- Каждое сообщение файла имеет свой порядковый номер (свойство «JMSXGroupSeq»).
- Все сообщения файла имеет одинаковое значение группы (свойство «JMSXGroupID»).
- Последнее сообщение имеет признак, означающий, что это сообщение является завершающим (свойство «JMS_IBM_Last_Msg_In_Group»).
- Константа SEGMENT_SIZE содержит размер порции. Например, 1Mb.
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } }
Метод отправки файла клиенту (download)
Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.
Получение идентификатора группы сообщений
В качестве входящего параметра метод получает идентификатор группы сообщений.
@PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... }
Потоковая передача ответа клиенту
Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:
public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } }
В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.
Объект этого класса будет передан в качестве параметра для создания ответа клиенту:
@GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); }
Потоковое считывание файла из MQ
Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:
- Инициализация объектов подключения к MQ.
- Цикличное считывание сообщений из MQ пока не будет считано сообщение с признаком завершающего в группе (свойство «JMS_IBM_Last_Msg_In_Group»):
- Перед каждым считыванием сообщения из очереди устанавливается фильтр (messageSelector), в котором задается идентификатор группы сообщений и порядковый номер сообщения в группе.
- Содержимое считанного сообщения записывается в исходящий поток.
public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } }
Вызов REST сервиса
Для проверки работы сервиса я воспользуюсь инструментом curl.
Отправка файла
curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload
В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.
Получение файла
curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_строка_идентификатор_группы_сообщений> -o <путь_к_файлу_куда_запишется_ответ>
Заключение
В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.
Дополнительные материалы
Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка.
Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах – Apache CXF.
Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка.
ссылка на оригинал статьи https://habr.com/post/424941/
Добавить комментарий