- PVSM.RU - https://www.pvsm.ru -
Привет!
В статье я опишу способ разработки REST сервиса, позволяющего принимать файлы и сохранять их в систему обмена сообщениями в потоковом режиме без необходимости хранения всего файла на стороне сервиса. Также будет описан обратный сценарий, при котором клиент будет получать в качестве ответа файл, размещенный в системе обмена сообщениями.
Для наглядности я приведу примеры кода разработанного сервиса на JEE7 под сервер приложений IBM WebSphere Liberty Server, а в качестве системы обмена сообщениями будет выступать IBM MQ.
Тем не менее, описанный метод подходит и для других аналогичных платформ, т.е. в качестве системы обмена сообщений может выступать любой поставщик JMS API, а в качестве сервера приложений любой JEE сервер (например, Apache Tomcat).
Возникла потребность в реализации решения, которое бы позволяло как получать от клиента файлы большого размера (> 100 Mb) и передавать их в другую территориально удаленную систему, так и в обратную сторону – передавать клиенту в качестве ответа файлы из этой системы. В виду ненадежного сетевого канала между сетью клиента и сетью приложения используется система обмена сообщениями, обеспечивающая гарантированную доставку между ними.
Верхнеуровневое решение включает в себя три компонента:
В этой статье я описываю способ реализации REST сервиса, в задачи которого входит:
В виду большого размера передаваемого файла отсутствует возможность размещения его полностью в оперативной памяти, более того, со стороны MQ также накладывается ограничение – максимальный размер одного сообщения в MQ не может превышать 100 Mb. Таким образом мое решение будет основываться на следующих принципах:
Графически размещение файла на стороне клиента, REST сервиса и MQ показано ниже:
На стороне клиента файл полностью размещается на файловой системе, в REST-сервисе в оперативной памяти хранится лишь порция файла, а на стороне MQ – каждая порция файла размещается в виде отдельного сообщения.
Для наглядности предлагаемого метода решения будет разработан демонстрационный REST сервис, содержащий два метода:
В задачу метода входит получение потока входящего файла и последующая запись его в MQ очередь.
Для получения входящего файла от клиента, метод ожидает в качестве входящего параметра объект с интерфейсом com.ibm.websphere.jaxrs20.multipart.IMultipartBody, который предоставляет возможность получить ссылку на поток входящего файла
@PUT
@Path("upload")
public Response upload(IMultipartBody body) {
...
IAttachment attachment = body.getAttachment("file");
InputStream inputStream = attachment.getDataHandler().getInputStream();
...
}
Данный интерфейс (IMultipartBody) находится в JAR-архиве com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, входит в поставку к IBM Liberty Server и размещается в папке: <WLP_INSTALLATION_PATH>/dev/api/ibm.
Примечание:
Метод получает на вход поток входящего файла, название MQ очереди, куда следует записать файл, и идентификатор группы сообщений, который будут использоваться для связывания сообщений. Идентификатор группы генерируется на стороне сервиса, например, утилитой org.apache.commons.lang3.RandomStringUtils:
String groupId = RandomStringUtils.randomAscii(24);
Алгоритм сохранения входящего файла в MQ состоит из следующих этапов:
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException {
try (
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
MessageProducer producer = session.createProducer(session.createQueue(queueName));
) {
byte[] buffer = new byte[SEGMENT_SIZE];
BytesMessage message = null;
for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) {
readBytesSize = inputStream.read(buffer);
if (message != null) {
if (readBytesSize < 1) {
message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
} producer.send(message);
}
if (readBytesSize > 0) {
message = session.createBytesMessage();
message.setStringProperty("JMSXGroupID", groupId);
message.setIntProperty("JMSXGroupSeq", sequenceNumber);
if (readBytesSize == SEGMENT_SIZE) {
message.writeBytes(buffer);
} else {
message.writeBytes(Arrays.copyOf(buffer, readBytesSize));
}
}
}
}
}
Метод получает идентификатор группы сообщений в формате base64, по которому считывает сообщения из MQ очереди и отправляет в качестве ответа в потоковом режиме.
В качестве входящего параметра метод получает идентификатор группы сообщений.
@PUT
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
...
}
Для передачи клиенту файла, хранящемуся в виде набора отдельных сообщений в MQ, в потоковом режиме следует создать класс с интерфейсом javax.ws.rs.core.StreamingOutput:
public class MQStreamingOutput implements StreamingOutput {
private String groupId;
private String queueName;
public MQStreamingOutput(String groupId, String queueName) {
super();
this.groupId = groupId;
this.queueName = queueName;
}
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
try {
MQWorker().read(outputStream, queueName, groupId);
} catch(NamingException | JMSException e) {
e.printStackTrace();
new IOException(e);
} finally {
outputStream.flush();
outputStream.close();
}
}
}
В классе реализуем метод write, который получает на вход ссылку на исходящий поток, в который будут записываться сообщения из MQ. Я добавил в класс еще название очереди и идентификатор группы, сообщения которой будут считываться.
Объект этого класса будет передан в качестве параметра для создания ответа клиенту:
@GET
@Path("download")
public Response download(@QueryParam("groupId") String groupId) {
ResponseBuilder responseBuilder = null;
try {
MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME);
responseBuilder = Response.ok(streamingOutput);
} catch(Exception e) {
e.printStackTrace();
responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage());
}
return responseBuilder.build();
}
Алгоритм считывания сообщений из MQ в исходящий поток состоит из следующих этапов:
public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException {
try(
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession();
) {
connection.start();
Queue queue = session.createQueue(queueName);
int sequenceNumber = 1;
for(boolean isMessageExist = true; isMessageExist == true; ) {
String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++;
try(
MessageConsumer consumer = session.createConsumer(queue, messageSelector);
) {
BytesMessage message = (BytesMessage) consumer.receiveNoWait();
if (message == null) {
isMessageExist = false;
} else {
byte[] buffer = new byte[(int) message.getBodyLength()];
message.readBytes(buffer);
outputStream.write(buffer);
if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) {
isMessageExist = false;
}
}
}
}
}
}
Для проверки работы сервиса я воспользуюсь инструментом curl.
curl -X PUT -F file=@<путь_к_файлу> http://localhost:9080/Demo/rest/service/upload
В ответ будет получена base64 строка, содержащая идентификатор группы сообщений, которую мы укажем в следующем методе для получения файла.
curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64_строка_идентификатор_группы_сообщений> -o <путь_к_файлу_куда_запишется_ответ>
В статье был рассмотрен подход к разработке REST сервиса, позволяющему в потоковом режиме как получать и сохранять большие данные в очередь системы обмена сообщениями, так и считывать их из очереди для возвращения в виде ответа. Такой способ позволяет сократить использование ресурсов, и тем самым увеличить пропускную способность решения.
Подробнее об интерфейсе IMultipartBody, используемый для получения входящего потока файла — ссылка [1].
Альтернативная библиотека для получения файлов в потоковом режиме в REST сервисах – Apache CXF [2].
Интерфейс StreamingOutput для потокового возвращения REST ответа клиенту — ссылка [3].
Автор: Nickmetal
Источник [4]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/294419
Ссылки в тексте:
[1] ссылка: https://www.ibm.com/support/knowledgecenter/en/was_beta_liberty/com.ibm.websphere.javadoc.liberty.doc/com.ibm.websphere.appserver.api.jaxrs20_1.0-javadoc/com/ibm/websphere/jaxrs20/multipart/IMultipartBody.html
[2] Apache CXF: http://cxf.apache.org/docs/jax-rs-multiparts.html
[3] ссылка: https://docs.oracle.com/javaee/7/api/javax/ws/rs/core/StreamingOutput.html
[4] Источник: https://habr.com/post/424941/?utm_campaign=424941
Нажмите здесь для печати.