Multi connection IBM MQ с использованием Spring

Приведу пример как сконфигурировать несколько endpoints для подключения к IBM MQ.

Цель:

  • читать из нескольких очередей, именованных одинаково, но находящихся на разных хостах/администраторах очередей
  • писать ответ в рандомно определенную ноду

0. Будем считать, что вы на данный момент уже развернули MQ или пользуетесь чьей-то.

1. Подгружаем зависимости в проект:

maven

<dependency>     <groupId>com.ibm.mq</groupId>     <artifactId>mq-jms-spring-boot-starter</artifactId>     <version>2.3.3</version> </dependency>

gradle

compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'

2. Создаем конфиг, вводим параметры подключения ваших точек (вы же их создали уже?). Используем массив, поэтому подключений может быть сколь угодно много.

mq:   servers:     - queueManager: QM1       channel: DEV.ADMIN.SVRCONN       connName: ibmmq.ru(1414)       user: admin       password: passw0rd     - queueManager: QM2       channel: DEV.ADMIN.SVRCONN       connName: ibmmq.ru(1415)       user: admin       password: passw0rd   queue1: QUEUE1   queue2: QUEUE2 

3. Создаем классы для считывания этих пропертей:

import lombok.Data; import lombok.EqualsAndHashCode; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;  @Configuration @ConfigurationProperties(prefix = "mq") @EqualsAndHashCode(callSuper = false) @Data public class MqConfig {     private List<ConnectionConfiguration> servers;     private String queue1;     private String queue2;  } 

import lombok.Data; import lombok.EqualsAndHashCode;  @Data @EqualsAndHashCode(callSuper = false) public class ConnectionConfiguration {     String queueManager;     String channel;     String connName;     String user;     String password; } 

4. Создаем слушателя:

import javax.jms.MessageListener; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component;  @Component @Slf4j public class MqListener implements MessageListener {      @SneakyThrows     @Override     public void onMessage(@Payload javax.jms.Message message) {         log.info("Получено сообщение <" + message + ">");         //TODO: сюда добавим отправку ответа чуть позже     } 

5. Конфигурируем! Определяем коннекшионФактори для каждого элемента массива из yml-пропертей. Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты. Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories.

 import com.fasterxml.jackson.databind.ObjectMapper; import com.ibm.mq.jms.MQConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.*; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.QosSettings; import org.springframework.jms.support.converter.MappingJackson2MessageConverter; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageType; import org.springframework.jms.support.converter.SimpleMessageConverter;  import javax.jms.*; import java.util.*;  import static javax.jms.DeliveryMode.NON_PERSISTENT; import static javax.jms.Session.CLIENT_ACKNOWLEDGE;  @Configuration @EnableJms @Slf4j public class MqConfiguration {      @Autowired     MqConfig mqConfig;      @Autowired     private JmsListenerEndpointRegistry registry;  //Создаем фабрики слушателей, на вход которых также используем созданные connectionFactories     @Bean     public List<JmsListenerContainerFactory> myFactories(             @Qualifier("myConnFactories")              List<CachingConnectionFactory> connectionFactories,             MqListener mqListener) {         List<JmsListenerContainerFactory> factories = new ArrayList<>();         connectionFactories.forEach(connectionFactory -> {             DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();             factory.setConnectionFactory(connectionFactory);             factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);              QosSettings qosSettings = new QosSettings();             qosSettings.setDeliveryMode(NON_PERSISTENT);             factory.setReplyQosSettings(qosSettings);              SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();             endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());             endpoint.setDestination(mqConfig.getQueue1());             endpoint.setMessageListener(mqListener);             registry.registerListenerContainer(endpoint, factory);              factories.add(factory);         });         return factories;     } //Создаем лист темплейтов для отправки сообщений, на вход которому скармливаем созданные коннекты     @Bean     @Qualifier("myJmsTemplates")     public List<JmsTemplate> jmsTemplates(             @Qualifier("myConnFactories")              List<CachingConnectionFactory> connectionFactories) {         return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));     }      public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {         List<JmsTemplate> jmsTemplates = new ArrayList<>();         for (ConnectionFactory connectionFactory : connectionFactories) {             JmsTemplate jmsTemplate = new JmsTemplate();             jmsTemplate.setConnectionFactory(connectionFactory);             jmsTemplate.setMessageConverter(new SimpleMessageConverter());             jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());             jmsTemplate.setDeliveryMode(NON_PERSISTENT);             jmsTemplate.setDeliveryPersistent(false);             jmsTemplate.setExplicitQosEnabled(true);             jmsTemplates.add(jmsTemplate);         }         return jmsTemplates;     }  //Определяем коннекшионФактори для каждого элемента массива из yml-пропертей     @Bean     @Qualifier("myConnFactories")     public List<CachingConnectionFactory> connectionFactories() throws JMSException {         List<CachingConnectionFactory> factories = new ArrayList<>();          for (ConnectionConfiguration server : mqConfig.getServers()) {             CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();             MQConnectionFactory cf = new MQConnectionFactory();             cachingConnectionFactory.setTargetConnectionFactory(cf);             cf.setQueueManager(server.getQueueManager());             cf.setChannel(server.getChannel());             cf.setConnectionNameList(server.getConnName());             cf.setStringProperty(WMQConstants.USERID, server.getUser());             cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());             cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");              factories.add(cachingConnectionFactory);         }         return factories;     }  } 

endpoint.setMessageListener(mqListener);

Здесь указываем слушателя (которого создали в п.4), чтобы определить действия при приеме сообщения.

6. Создадим сервисный слой, где допустим будет какая-то логика и после отправка ответа.

import javax.jms.TextMessage;  public interface MqService {      void sendToMq(TextMessage msg);  } 

import javax.jms.TextMessage; import org.springframework.jms.JmsException; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service;  @Service @Slf4j public class MqServiceImpl implements MqService {      @Autowired     private MqConfig mqConfig;      @Autowired     @Qualifier("myJmsTemplates")     List<JmsTemplate> jmsTemplates;       @Override     public void sendToMq(TextMessage msg ) {         //какая-то логика         //рандомным образом определяем в какую ноду/темплейт отправлять сообщение.         int maxIndex = jmsTemplates.size()-1; // Конечное значение диапазона - "до"         int randomNumber = (int) Math.round(Math.random() * maxIndex);         jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);     } } 

7. Добавляем отправку ответа в слушатель:

    @Autowired     MqService mqService;      @SneakyThrows     @Override     public void onMessage(@Payload javax.jms.Message message) {         log.info("Получено сообщение <" + message + ">");         mqService.sentToMq((TextMessage) message);     }  

Вуаля, готово, можно проверять.

ссылка на оригинал статьи https://habr.com/ru/post/519466/

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *