Для чего это нужно
Конвертация сообщений между брокером и REST упрощает прием и отправку сообщений без использования нативных протоколов или клиентский приложений брокеров сообщений:
Возможные варианты использования:
-
Асинхронное взаимодействие между сервисами. Конвертация REST запросов в сообщения брокера способствует ослаблению связи между сервисами, способствует увеличению производительности и устойчивости к ошибкам
-
Сбор логов. Мобильные приложения могут отправлять логи своей работы через REST в брокер сообщений.
-
Согласование протоколов. Не все приложения имеют возможность взаимодействия через брокеры сообщений, для их интеграции используется конвертация REST-брокер сообщений.
-
Пересечение сегментов. Сегменты предприятия, как правило, разделены и взаимодействуют между собой, используя брокер сообщений.
В статье мы настроим шлюз с открытым исходным кодом OpenIG для конвертации сообщений брокера в REST и обратно.
Подготовка к работе
Предположим, у вас уже установлен и настроен OpenIG. Если же нет, то как быстро это сделать, описано в статье How To Protect Web Services with OpenIG.
Вы можете так же использовать демонстрационный проект https://github.com/maximthomas/openig-mb-example как стартовую точку
Варианты использования
Отправка HTTP запросов в Apache Kafka
Настройка позволяет получать сообщения по HTTP протоколу и отправлять их в Apache Kafka.
Добавьте в файл конфигурации OpenIG config.json
в обработчик Kafka producer:
{ "heap": [ ... { "name": "kafka-producer", "type": "MQ_Kafka", "config": { "bootstrap.servers": "kafka:9092", "topic.produce": "incoming-messages" } }, ... ] }
Важные настройки обработчика:
Настройка |
Описание |
---|---|
boostrap.server |
Список хотсто и портов Apache Kafka, указанные через запятую |
topic.produce |
Топик, в который OpenIG отправляет сообщения |
topic.consume |
Топик, из которого OpenIG читает сообщения |
uri |
Конечная точка маршрута OpenIG |
method |
Метод HTTP, который использует OpenIG для отправки запросов по HTTP |
Добавьте маршрут OpenIG, который получать HTTP запросы и отправлять сообщения в Apache Kafka:
routes/10-http2kafka.json
:
{ "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}", "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2kafka$')}", "monitor": true, "timer": true, "handler": { "type": "Chain", "config": { "filters": [], "handler": { "type": "DispatchHandler", "config": { "bindings": [ { "handler": "kafka-consumer" } ] } } } }
Примеры файлов конфигурации находятся в проекте в директории openig/config
Запустите Docker контейнеры командой
docker compose -f docker-compose.yml up
Создайте топик для Apache Kafka. Пример команды для Docker контейнера:
docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092
Отправьте HTTP запрос в OpenIG и проверьте сообщения в созданном топике:
curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2kafka>' * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > PUT /http2kafka HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > Content-Type: application/json > Content-Length: 16 > * upload completely sent off: 16 out of 16 bytes * Mark bundle as not supporting multiuse < HTTP/1.1 202 Accepted < Server: Apache-Coyote/1.1 < Content-Length: 0 < Date: Wed, 13 Apr 2022 12:34:03 GMT < * Connection #0 to host localhost left intact
docker exec openig-mb-example-kafka-1 kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092 {"data": "test"}
Отправка сообщений Kafka в HTTP
В следующей конфигурации OpenIG будет получать сообщения и топика topic2
Apache Kafka и отправлять их на конечную точку HTTP.
Потушите Docker контейнеры командой
docker compose -f docker-compose.yml down
Добавьте в файл конфигурации обработчик Kafka producer.
config.json
{ "heap": [ ... { "name": "kafka-consumer", "type": "MQ_Kafka", "config": { "bootstrap.servers": "kafka:9092", "topic.consume": "topic2", "method": "POST" } }, ... ] }
Добавьте в OpenIG маршрут, который будет слушать сообщения из Apache Kafka и перенаправлять их на конечную точку HTTP.
routes/10-kafka2http.json
{ "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}", "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}", "monitor": true, "timer": true, "handler": { "type": "Chain", "config": { "filters": [], "handler": { "type": "DispatchHandler", "config": { "bindings": [{ "handler": "ClientHandler", "capture": "all", "baseURI": "${system['endpoint.api']}" }] } } } } }
Обратите внимание на свойство baseURI
. В нем указан URI конечной точки HTTP. Значение берется из системного свойства. указанного в файле docker-compose.yaml -Dendpoint.api=http://sample-service:8080
для сервиса OpenIG
Добавьте в Apache Kafka топик topic2
, из которого OpenIG будет читать сообщения и перенаправлять их на конечную точку HTTP.
docker exec openig-mb-example-kafka-1 kafka-topics.sh --create --topic topic2 --bootstrap-server localhost:9092
Отправим тестовые данные в созданный топик:
docker exec -it openig-mb-example-kafka-1 kafka-console-producer.sh --topic topic2 --bootstrap-server localhost:9092 >{"data": "test"}
В логе сервиса sample-service
на конечную точку которого OpenIG перенаправляет сообщения появится запись:
2024-07-05T08:46:37.540Z DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /kafka2http, headers=[correlation-id:"8dd45456-433d-42cb-b992-27047ae75ed9", kafka-offset:"0", kafka-timestamp:"1720169196044", kafka-timestamp-date:"Fri Jul 05 08:46:36 UTC 2024", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/17.0.9)"], payload={"data": "test"}]
Настройка встроенного в OpenIG Apache Kafka
Если в инфраструктуре предприятия нет брокера сообщений, но есть потребность получать и перенаправлять сообщения брокера, то OpenIG предлагает встроенный брокер сообщений. Для использования встроенного Apache Kafka, добавьте в файл конфигурации OpenIG объект EmbeddedKafka
config.json
{ "heap": [ ... { "name": "EmbeddedKafka", "type": "EmbeddedKafka", "config": { "zookeper.port": "${system['zookeper.port']}", "security.inter.broker.protocol": "${empty system['keystore.location'] ?'PLAINTEXT':'SSL'}", "listeners": "${system['kafka.bootstrap']}", "advertised.listeners": "${system['kafka.bootstrap']}", "ssl.endpoint.identification.algorithm": "", "ssl.enabled.protocols":"TLSv1.2", "ssl.keystore.location":"${system['keystore.location']}", "ssl.keystore.password":"${empty system['keystore.password']?'changeit':system['keystore.password']}", "ssl.key.password":"${empty system['key.password']?'changeit':system['key.password']}", "ssl.truststore.location":"${system['truststore.location']}", "ssl.truststore.password":"${empty system['truststore.password']?'changeit':system['truststore.password']}" }, ... ] }
Важные настройки EmbeddedKafka
:
Настройка |
Описание |
---|---|
zookeper.port |
Порт Zookeper для встроенного Apache Kafka. Если не установлен, Kafra не запустится |
listeners |
Имена хостов и порты, которые будет слушать встроенный Apache Kafka. |
advertised.listeners |
Имена хостов и порты клиентов встроенного Apache Kafka. |
Добавьте Kafka listener в массив heap OpenIG и создайте маршрут, который будет слушать сообщения Kafka и перенаправлять их на конечную точку HTTP (вы можете так же перенаправлять сообщения на другой брокер).
config.json
{ "heap": [ ... { "name": "kafka-consumer", "type": "MQ_Kafka", "config": { "bootstrap.servers": "openig:9092", "topic.consume": "topic1", "method": "POST", "uri": "/kafka2http" } ... ] }
10-kafka2http.json
{ "name": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}", "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/kafka2http$')}", "monitor": true, "timer": true, "handler": { "type": "Chain", "config": { "filters": [], "handler": { "type": "DispatchHandler", "config": { "bindings": [{ "handler": "ClientHandler", "capture": "all", "baseURI": "${system['endpoint.api']}" }] } } } }
Запустите OpenIG. Теперь вы можете создать topic и отправлять сообщения в этот topic.
$ kafka-console-producer.sh --topic topic1 --bootstrap-server localhost:9092 >{"data": "test"}
В тестовом сервисе в логе появится сообщение, перенаправленное OpenIG из брокера на конечную точку HTTP.
2022-04-21 07:26:14.645 DEBUG 1 --- [nio-8080-exec-6] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /kafka2http, headers=[kafka-offset:"29", kafka-topic:"topic2", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
Интеграция с IBM MQ
Отправка HTTP запросов в IBM MQ
Следующая настройка позволяет получать сообщения по HTTP протоколу и отправлять их в topic IBM MQ:
Добавьте обработчик IBM MQ Consumer в heap в файл конфигурации OpenIG:
config.json
{ "heap": [ ... { "name": "mq-producer", "type": "MQ_IBM", "config": { "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)", "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN", "XMSC_WMQ_QUEUE_MANAGER":"QM1", "XMSC_USERID":"app", "XMSC_PASSWORD":"passw0rd", "topic.produce": "DEV.QUEUE.1" } }, ... ] }
Важные настройки IBM MQ:
Setting |
Name |
---|---|
XMSC_WMQ_CONNECTION_NAME_LIST |
Адреса брокеров IBM MQ в формате списка именов хостов и портов, указанные через запятую |
XMSC_WMQ_CHANNEL |
Имя канала IBM MQ, используется для соединения |
XMSC_USERID |
Имя пользователя IBM MQ |
XMSC_PASSWORD |
Пароль пользователя IBM MQ |
topic.produce |
Топик, в который OpenIG должен слать сообщения |
topic.consume |
Топик, из кторого OpenIG читает сообщения |
uri |
Конечная точка OpenIG |
method |
Метод HTTP, который OpenIG использует для отправки запросов на конечную точку HTTP |
Добавьте маршрут OpenIG в папку routes
для обработки HTTP запросов.
10-http2mq.json
{ "name": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}", "condition": "${(request.method == 'PUT') and matches(request.uri.path, '^/http2mq$')}", "monitor": true, "timer": true, "handler": { "type": "Chain", "config": { "filters": [], "handler": { "type": "DispatchHandler", "config": { "bindings": [ { "handler": "mq-producer" } ] } } } } }
Отправьте HTTP запрос в OpenIG и проверьте полученное сообщение в топике DEV.QUEUE.1
IBM MQ:
$ curl -v -X PUT --data '{"data": "test"}' -H 'Content-Type: application/json' '<http://localhost:8080/http2mq>' * Trying 127.0.0.1:8080... * TCP_NODELAY set * Connected to localhost (127.0.0.1) port 8080 (#0) > PUT /http2mq HTTP/1.1 > Host: localhost:8080 > User-Agent: curl/7.68.0 > Accept: */* > Content-Type: application/json > Content-Length: 16 > * upload completely sent off: 16 out of 16 bytes * Mark bundle as not supporting multiuse < HTTP/1.1 202 Accepted < Server: Apache-Coyote/1.1 < Content-Length: 0 < Date: Wed, 13 Apr 2022 12:34:03 GMT < * Connection #0 to host localhost left intact
Откройте консоль IBM MQ по адресу https://localhost:9443/ibmmq/console/. В топике DEV.QUEUE.1
вы увидите полученное сообщение:
Отправка сообщений IBM MQ на конечную точку HTTP
Добавьте IBM MQ cosumer в heap в файл конфигурации OpenIG config.json
.
{ "heap": [ ... { "name": "mq-consumer", "type": "MQ_IBM", "config": { "XMSC_WMQ_CONNECTION_NAME_LIST":"mq(1414)", "XMSC_WMQ_CHANNEL":"DEV.APP.SVRCONN", "XMSC_WMQ_QUEUE_MANAGER":"QM1", "XMSC_USERID":"app", "XMSC_PASSWORD":"passw0rd", "topic.consume": "DEV.QUEUE.2", "uri": "/mq2http", "method": "POST" } } ... ] }
Добавьте маршрут OpenIG в папку routes
для обработки сообщений IBM MQ:
10-mq2http.json
{ "name": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}", "condition": "${(request.method == 'POST') and matches(request.uri.path, '^/mq2http$')}", "monitor": true, "timer": true, "handler": { "type": "Chain", "config": { "filters": [], "handler": { "type": "DispatchHandler", "config": { "bindings": [ { "handler": "ClientHandler", "capture": "all", "baseURI": "${system['endpoint.api']}" } ] } } } } }
Зайдите в консоль IBM MQ и отправьте сообщение в топик DEV.QUEUE.2
В логе сервиса sample-servive
вы увидите следующее сообщение:
2022-04-21 08:32:35.007 DEBUG 1 --- [nio-8080-exec-1] o.s.w.f.CommonsRequestLoggingFilter : After request [POST /mq2http, headers=[jms_ibm_character_set:"UTF-8", jms_ibm_encoding:"273", jms_ibm_format:"MQSTR", jms_ibm_msgtype:"8", jms_ibm_putappltype:"6", jms_ibm_putdate:"20220421", jms_ibm_puttime:"08323434", jmsxappid:"com.ibm.mq.webconsole", jmsxdeliverycount:"1", jmsxuserid:"unknown", content-length:"16", host:"sample-service:8080", connection:"Keep-Alive", user-agent:"Apache-HttpAsyncClient/4.1.4 (Java/1.8.0_212)"], payload={"data": "test"}]
ссылка на оригинал статьи https://habr.com/ru/articles/828832/
Добавить комментарий