Облачно-ориентированный обмен сообщениями на платформе Red Hat OpenShift с использованием Quarkus и AMQ Online

от автора

Всем привет! Вот и он – наш заключительный пост из серии про Quarkus! (Кстати, смотрите наш вебинар «Это Quarkus – Kubernetes native Java фреймворк». Покажем, как начать «с нуля» или перенести готовые решения)

В предыдущем посте мы рассмотрели соответствующие инструменты, с помощью которых можно количественно оценить улучшения, полученные в результате модернизации Java-приложений.

Начиная с версии 0.17.0, Quarkus поддерживает использование Advanced Message Queuing Protocol (AMQP), который является открытым стандартом передачи бизнес-сообщений между приложениями или организациями.

Red Hat AMQ Online – это сервис, построенный на основе открытого проекта EnMasse и реализующий механизм обмена сообщений на базе платформы Red Hat OpenShift. Подробнее о том, как он устроен, см. здесь (EN). Сегодня мы покажем, как объединить AMQ Online и Quarkus, чтобы построить современную систему обмена сообщениями на базе OpenShift с использованием двух новых технологий, связанных с обработкой сообщений.

Предполагается, что вы уже развернули AMQ Online на платформе OpenShift (если нет, то см. руководство по установке).

Для начала мы создадим приложения Quarkus, которое будет представлять собой простую систему обработки заказов с использованием реактивного обмена сообщениями. Это приложение будет включать в себя генератор заказов, отправляющий заказы в очередь сообщений с фиксированным интервалом, а также обработчик заказов, который будет обрабатывать сообщения из очереди и формировать подтверждения, доступные для просмотра в браузере.

После создания приложения мы покажем, как внедрять в него конфигурацию системы обмена сообщениями и задействуем AMQ Online, чтобы инициализировать на этой системе нужные нам ресурсы.

Приложение Quarkus

Наше Quarkus-приложение выполняется на OpenShift и представляет собой модифицированную версию программы amqp-quickstart. Полный пример клиентской части можно найти здесь.

Генератор заказов

Генератор каждые 5 секунд просто монотонно отправляет растущие идентификаторы заказов на адрес «orders».

@ApplicationScoped public class OrderGenerator {       private int orderId = 1;       @Outgoing("orders")     public Flowable<Integer> generate() {         return Flowable.interval(5, TimeUnit.SECONDS)         .map(tick -> orderId++);     } } 

Обработчик заказов

Обработчик заказов еще проще, он всего лишь возвращает идентификатор подтверждения на адрес «confirmations».

@ApplicationScoped public class OrderProcessor {     @Incoming("orders")     @Outgoing("confirmations")     public Integer process(Integer order) {         // Идентификатор подтверждения равен удвоенному идентификатору заказа <img draggable="false" class="emoji" alt=":-)" src="https://s.w.org/images/core/emoji/11.2.0/svg/1f642.svg">         return order * 2;     } } 

Ресурсы подтверждения

Ресурс подтверждения – это конечная точка HTTP для листинга сформированных в результате работы нашего приложения подтверждений.

@Path("/confirmations") public class ConfirmationResource {       @Inject     @Stream("confirmations") Publisher<Integer> orders;       @GET     @Produces(MediaType.TEXT_PLAIN)     public String hello() {         return "hello";     }         @GET     @Path("/stream")     @Produces(MediaType.SERVER_SENT_EVENTS)     public Publisher<Integer> stream() {         return orders;     } } 

Настройка

Для подключения к AMQ Online нашему приложению понадобится некоторые конфигурационные данные, а именно: конфигурация Quarkus-коннектора, сведения о конечной точке AMQP и клиентские учетные данные. Лучше, конечно, держать все конфигурационные данные в одном месте, но мы специально разделим их, чтобы показать возможные варианты настройки приложения Quarkus.

Коннекторы

Конфигурацию коннектора можно предоставить на этапе компиляции с помощью файла свойств приложения:

mp.messaging.outgoing.orders.connector=smallrye-amqp mp.messaging.incoming.orders.connector=smallrye-amqp 

Чтобы не усложнять, мы будем использовать очередь сообщений только для адреса «orders». А адрес «confirmations» в нашем приложении будет использовать очередь в памяти.

Конечная точка AMQP

На этапе компиляции имя хоста и номер порта для конечной точки AMQP неизвестны, поэтому их нужно внедрить. Конечную точку можно задать в configmap, который создается AMQ Online, поэтому мы определим их через переменные среды в манифесте приложения:

spec:   template:     spec:       containers:       - env:         - name: AMQP_HOST           valueFrom:             configMapKeyRef:               name: quarkus-config               key: service.host         - name: AMQP_PORT           valueFrom:             configMapKeyRef:               name: quarkus-config               key: service.port.amqp 

Учетные данные

Маркер учетной записи сервиса можно использовать для аутентификации нашего приложения в OpenShift. Для этого надо сначала создать пользовательский ConfigSource, который будет читать маркер аутентификации из файловой системы pod’а:

public class MessagingCredentialsConfigSource implements ConfigSource {     private static final Set<String> propertyNames;       static {         propertyNames = new HashSet<>();         propertyNames.add("amqp-username");         propertyNames.add("amqp-password");     }       @Override     public Set<String> getPropertyNames() {         return propertyNames;     }       @Override     public Map<String, String> getProperties() {         try {             Map<String, String> properties = new HashMap<>();             properties.put("amqp-username", "@@serviceaccount@@");             properties.put("amqp-password", readTokenFromFile());             return properties;         } catch (IOException e) {             throw new UncheckedIOException(e);         }     }       @Override     public String getValue(String key) {         if ("amqp-username".equals(key)) {             return "@@serviceaccount@@";         }         if ("amqp-password".equals(key)) {             try {                 return readTokenFromFile();             } catch (IOException e) {                 throw new UncheckedIOException(e);             }         }         return null;     }       @Override     public String getName() {         return "messaging-credentials-config";     }       private static String readTokenFromFile() throws IOException {         return new String(Files.readAllBytes(Paths.get("/var/run/secrets/kubernetes.io/serviceaccount/token")), StandardCharsets.UTF_8);     } } 

Сборка и развертывание приложения

Поскольку приложение надо скомпилировать в исполняемый файл, потребуется виртуальная машина GraalVM. Подробнее о том, как настроить для этого окружение, см. соответствующие инструкции в Quarkus Guide.

Затем, следуя приведенным там инструкциям, надо скачать исходник, провести сборку и выполнить развертывание нашего приложения:

git clone https://github.com/EnMasseProject/enmasse-example-clients cd enmasse-example-clients/quarkus-example-client oc new-project myapp mvn -Pnative -Dfabric8.mode=openshift -Dfabric8.build.strategy=docker package fabric8:build fabric8:resource fabric8:apply 

После этих команд приложение будет развернуто, но не запустится до тех пор, пока мы не настроим в AMQ Online необходимые нам ресурсы обмена сообщениями.

Настройка системы обмена сообщениями

Теперь осталось задать в системе обмена сообщениями ресурсы, которые нужны нашему приложению. Для этого надо создать: 1) адресное пространство, чтобы инициализировать конечную точку системы обмена сообщениями; 2) адрес, чтобы настроить адреса, которые мы используем в приложении; 3) пользователя системы обмена сообщениями, чтобы задать учетные данные клиента.

Пространство адресов

Объект AddressSpace в AMQ Online – это группа адресов, которые совместно используют конечные точки подключения, а также политики аутентификации и авторизации. При создании пространства адресов можно задать, как именно будут предоставляться конечные точки системы обмена сообщениями:

apiVersion: enmasse.io/v1beta1 kind: AddressSpace metadata:   name: quarkus-example spec:   type: brokered   plan: brokered-single-broker   endpoints:   - name: messaging     service: messaging     exports:     - name: quarkus-config       kind: configmap 

Адреса

Адреса используются для отправки и приема сообщений. У каждого адреса есть тип, который определяет его семантику, а также план, который задает количество резервируемых ресурсов. Адрес можно определить, например, вот так:

apiVersion: enmasse.io/v1beta1 kind: Address metadata:   name: quarkus-example.orders spec:   address: orders   type: queue   plan: brokered-queue 

Пользователь системы обмена сообщениями

Чтобы отправлять и получать сообщения на ваши адреса могли только доверенные приложения, в системе обмена сообщениями необходимо создать пользователя. Для приложений, работающих на кластере, клиентов можно аутентифицировать с помощью учетной записи сервиса OpenShift. Пользователя «serviceaccount» можно определить, например, так:

apiVersion: user.enmasse.io/v1beta1 kind: MessagingUser metadata:   name: quarkus-example.app spec:   username: system:serviceaccount:myapp:default   authentication:     type: serviceaccount   authorization:   - operations: ["send", "recv"]     addresses: ["orders"] 

Разрешения для настройки приложения

Чтобы AMQ Online мог создать configmap, который мы использовали для внедрения сведений о конечной точке AMQP, необходимо задать роль и привязку роли (Role и RoleBinding):

--- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata:   name: quarkus-config spec:   rules:   - apiGroups: [ "" ]     resources: [ "configmaps" ]     verbs: [ "create" ]   - apiGroups: [ "" ]     resources: [ "configmaps" ]     resourceNames: [ "quarkus-config" ]     verbs: [ "get", "update", "patch" ] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata:   name: quarkus-config roleRef:   apiGroup: rbac.authorization.k8s.io   kind: Role   name: quarkus-config subjects: - kind: ServiceAccount   name: address-space-controller   namespace: amq-online-infra 

Как применить конфигурации

Применить конфигурацию системы обмена сообщениями можно вот так:

cd enmasse-example-clients/quarkus-example-client oc project myapp oc apply -f src/main/resources/k8s/addressspace oc apply -f src/main/resources/k8s/address 

Верификация приложения

Чтобы убедиться, что приложение запустилось, первым делом проверим, создались ли и активны ли соответствующие адреса:

until [[ `oc get address quarkus-example.prices -o jsonpath='{.status.phase}'` == "Active" ]]; do echo "Not yet ready"; sleep 5; done 

Затем проверим URL маршрута приложения (просто откроем это адрес в браузере):

echo "http://$(oc get route quarkus-example-client -o jsonpath='{.spec.host}')/prices.html" 

В браузере должно быть видно, что тикеты периодически обновляются по мере того, как сообщения отправляются и принимаются AMQ Online.

Подводим итоги

Итак, мы написали приложение Quarkus, использующее AMQP для обмена сообщениями, настроили это приложение для работы на платформе Red Hat OpenShift, а также внедрили его конфигурацию на основе конфигурации AMQ Online. Затем мы создали манифесты, необходимые для инициализации системы обмена сообщениями под наше приложение.

На этом мы завершаем серию про Quarkus, но впереди много нового и интересного, оставайтесь с нами!

ссылка на оригинал статьи https://habr.com/ru/company/redhatrussia/blog/503940/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *