Java EE, JCA и jNode 2.X announce

от автора

Доброго времени суток, %username%.
Скажу сразу, на 99% данный пост про Java EE Connector Architecture, с примерами кода. Откуда взялся 1% про Fidonet вы поймете в самом конце.

Резюме для ленивыхJMS и JCA — родственники, входящие принимает MessageDrivenBean, исходящие отправляются через ConnectionFactory.
Минимальный пакет для входящего соединения — 4 класса, для исходящего — 8 классов и настройка адаптера на стороне сервера приложений.
Дальше — только подробности и боль

Для начала — история вопроса и решение бизнес-задачи.

Постановка задачи
Мне поставили задачу об интеграции существующей бизнес-системы («система А» ) с другой системой, которая была разработана много лет назад и понимает только собственный протокол передачи данных («система B»). Модифицировать чужие системы нельзя, соответственно задача свелась к написанию некой шины/прокси. Интеграция состоит в передаче туда-сюда сообщений с конвертацией их в процессе из одного формата в другой.

Система «А» имела много современных механизмов интеграции, самым простым для использования были признаны веб-сервисы. Под это дело был оперативно запилен стандартный интеграционный скелет для JEE — JAX-WS+EJB+JMS для гарантированной доставки сообщения.
А вот для работы с системой «B» стандартных средств не было. Робкие попытки поработать с сетью из контекста EJB успехом не увенчались, гугл подсказал два варианта решения проблемы: костылить сервлеты для работы с non-http или написать JCA-адаптер. Понятно, что был выбран второй путь — с JCA я до этого не работал, а узнать что-то новое всегда интересно.

Исследование
Начав копать гугл, я достаточно сильно обломался. Везде писали, ЧТО именно нужно сделать ( коннектор, менеджер, адаптер итд ), но почти нигде не писали, КАК это сделать. Стандартный способ «посмотреть чужой код и понять процесс» дал сбой — чужого кода был такой мизер, что понять что-то мне не представлялось возможным.

Спасли меня две вещи: JSR 322 и единственный нагугленный адаптер на google code. Собственно, это и стало отправной точкой — задеплоив примеры из jca-sockets и открыв pdf, я начал разбираться и путем научного тыка понимать, как оно собственно работает.

Потратив около 16 часов на исследование и эксперименты, я выяснил следующее:

JCA-модуль имеет внутри себя две независимых части: «Входящие» и «Исходящие». Эти части могут быть как вместе, так и по-отдельности. Более того, их может быть несколько. Сам модуль регистритуется классом, реализующим javax.resource.spi.ResourceAdapter и указанным в META-INF/ra.xml, при этом ResourceAdapter нужен в первую очередь для работы с Входящими; Для Исходящих адаптер ничего не делает и его скелет можно даже не заполнять.

Входящие
Входящий канал биндится к MessageEndpoint’у ( обычно это @MessageDrivenBean; да-да, JCA это кишки JMS ) и активируется ActivationSpec’ом.
META-INF/ra.xml — описание ResourceAdapter’а и inbound потоков
ra.xml<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd" version="1.7" metadata-complete="true"> <vendor-name>kreon-services</vendor-name> <eis-type>FidoNet</eis-type> <resourceadapter-version>2.5</resourceadapter-version> <resourceadapter> <!— Класс, который реализует javax.resource.spi.ResourceAdapter; config-property — поля, должны быть доступны через геттеры/сеттеры —> <resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class> <config-property> <config-property-name>version</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value> </config-property> <!— Описание входящего потока —> <inbound-resourceadapter> <messageadapter> <messagelistener> <!— Интерфейс, который должен реализовать @MessageDrivenBean для того, чтоб получать сообщения с этого адаптера —> <messagelistener-type>in.fidonode.binkp.ra.BinkpMessageListener</messagelistener-type> <activationspec> <!— Класс-холдер параметров, которая передается через @ActivationConfigProperty, должны быть доступны геттеры и сеттеры параметров —> <activationspec-class>in.fidonode.binkp.ra.BinkpActivationSpec</activationspec-class> <!— Список обязательных параметров —> <required-config-property> <config-property-name>listenPort</config-property-name> </required-config-property> <!— Описание параметров параметров —> <config-property> <config-property-name>listenPort</config-property-name> <config-property-type>java.lang.Integer</config-property-type> <config-property-value>24554</config-property-value> </config-property> </activationspec> </messagelistener> </messageadapter> </inbound-resourceadapter> </resourceadapter> </connector>

Интерфейс BinkpMessageListener — для клиентов и должен быть в classpath;

Приведу его тут:
public interface BinkpMessageListener { public void onMessage(FidoMessage message); }

Теперь рассмотрим простейшую реализацию ResourceAdapter
BinkpServerResourceAdapter.javapublic class BinkpServerResourceAdapter implements ResourceAdapter, Serializable { private static final long serialVersionUID = 1L; private static Logger log = Logger.getLogger(BinkpServerResourceAdapter.class .getName()); private ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint> activationMap = new ConcurrentHashMap<BinkpActivationSpec, BinkpEndpoint>(); private BootstrapContext ctx; private String version; @Override public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) throws ResourceException { BinkpEndpoint activation = new BinkpEndpoint(ctx.getWorkManager(), (BinkpActivationSpec) spec, endpointFactory); activationMap.put((BinkpActivationSpec) spec, activation); activation.start(); log.info("endpointActivation(" + activation + ")"); } @Override public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec spec) { BinkpEndpoint activation = activationMap.remove(spec); if (activation != null) activation.stop(); log.info("endpointDeactivation(" + activation + ")"); } @Override public void start(BootstrapContext ctx) throws ResourceAdapterInternalException { this.ctx = ctx; log.info("start()"); } @Override public void stop() { for (BinkpEndpoint act : activationMap.values()) { act.stop(); } activationMap.clear(); log.info("stop()"); } @Override public XAResource[] getXAResources(ActivationSpec[] arg0) throws ResourceException { return null; } public String getVersion() { return version; } public void setVersion(String version) { this.version = version; } }

Что тут происходит? При загрузке JCA-модуля создается экземпляр класса BinkpServerResourceAdapter, у него заполняются параметры ( в данном случае — поле version) и вызывается метод start().
На самом деле, внутри метода start() можно делать много всего, но в данном примере мы просто сохраняем контекст для получения из него в дальнейшем WorkManager’а.

Когда сервер приложений находит @MessageDrivenBean, он пытается найти адаптер, который отправляет сообщения на тот интерфейс, который реализует бин. Для JMS это MessageListener, у нас это BinkpMessageListener. Создается ActivationSpec ( у нас это BinkpActivationSpec, реализующий javax.resource.spi.ActivationSpec), поля в котором заполняются согласно данным в activationConfig, создается MessageEndpointFactory и вызывается ResourceAdapter.endpointActivation(). В этой функции необходимо создать тот «сервер», который будет принимать входящие соединения, будь то tcp/ip сервер или поток для работы с unix-socket, создать на основе того конфига который был в MDB. Класс BinkpEndpoint — это и есть тот самый «сервер».
BinkpEndpoint.javapublic class BinkpEndpoint implements Work, FidoMessageListener { private static final Logger logger = Logger.getLogger(BinkpEndpoint.class .getName()); private BinkpServer server; private final WorkManager workManager; private final MessageEndpointFactory messageEndpointFactory; public BinkpEndpoint(WorkManager workManager, BinkpActivationSpec activationSpec, MessageEndpointFactory messageEndpointFactory) { this.workManager = workManager; this.messageEndpointFactory = messageEndpointFactory; server = new BinkpServer(activationSpec.getListenPort(), this); } public void start() throws ResourceException { workManager.scheduleWork(this); } public void stop() { if (server != null) { server.stop(); } } /** из FidoMessageListener **/ @Override public Message incomingMessage(FidoMessage message) { String message = msg.encode(); BinkpMessageListener listener = (BinkpMessageListener) messageEndpointFactory .createEndpoint(null); listener.onMessage(message); } /** из Work **/ @Override public void run() { server.start(); } /** из Work **/ @Override public void release() { stop(); } }

Можно заметить, что везде фигурируют некие endpoint’ы. У меня был с этим некоторый затык, поэтому расшифрую:
Endpoint — это то, что слушает «входящий» поток. Именно к нему относятся функции endpointActication
MessageEndpoint — экземпляр MDB, который обрабатывает то или иное сообщение. Получается вызовом MessageEndpointFactory.createEndpoint() ( Эту функцию нельзя вызывать из основного треда ). Он легко кастится к интерфейсу MDB.

Собственно, все. Реализацию BinkpServer за ненадобностью опущу, но принцип должен быть понятен, минимальный «Входящий» JCA делается из четырех классов ( ResourceAdapter, MessageListener, ActivationSpec, Endpoint )

Создание Endpoint’а и обработка входящих:
@MessageDriven(messageListenerInterface = BinkpMessageListener.class, activationConfig = { @ActivationConfigProperty(propertyName = "listenPort", propertyValue = "24554") }) public class ReceiveMessageBean implements BinkpMessageListener { @Override public void onMessage(FidoMessage msg) { // do smth with mesaage } }

Исходящие

А вот тут — все веселее, минимальный «Исходящий» JCA делается аж из 8 классов, что в 2 раза больше чем «Входящий». Но давайте по-порядку.

META-INF/ra.xml — описание ResourceAdapter’а и outbound потоков

ra.xml<connector xmlns="http://xmlns.jcp.org/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/connector_1_7.xsd" version="1.7" metadata-complete="true"> <vendor-name>kreon-services</vendor-name> <eis-type>FidoNet</eis-type> <resourceadapter-version>2.5</resourceadapter-version> <resourceadapter> <!— Класс, который реализует javax.resource.spi.ResourceAdapter; config-property — поля, должны быть доступны через геттеры/сеттеры —> <resourceadapter-class>in.fidonode.binkp.ra.BinkpServerResourceAdapter</resourceadapter-class> <config-property> <config-property-name>version</config-property-name> <config-property-type>java.lang.String</config-property-type> <config-property-value>jnode-jee 2.5 binkp/1.1</config-property-value> </config-property> <!— Описание исходящего потока. Всегда ПЕРЕД входящим —> <outbound-resourceadapter> <connection-definition> <!— Фабрика с которой будет работать JEE-сервер, она создает фабрику для создания соединений —> <managedconnectionfactory-class>in.fidonode.binkp.ra.ManagedConnectionFactory</managedconnectionfactory-class> <!— Фабика соединений, которая будет отдаваться по запросу. Интерфейс должен быть у клиента в classpath —> <connectionfactory-interface>in.fidonode.binkp.ra.ConnectionFactory</connectionfactory-interface> <connectionfactory-impl-class>in.fidonode.binkp.ra.ConnectionFactoryImpl</connectionfactory-impl-class> <!— Соединение, которое будет отдавать фабрика соединений. Интерфейс должен быть у клиента в classpath —> <connection-interface>in.fidonode.binkp.ra.Connection</connection-interface> <connection-impl-class>in.fidonode.binkp.ra.ConnectionImpl</connection-impl-class> </connection-definition> <!— Про транзакции и аутентификацию есть отдельный талмуд, я этот момент пропустил —> <transaction-support>NoTransaction</transaction-support> <reauthentication-support>false</reauthentication-support> </outbound-resourceadapter> <!— Описание входящего потока —> <inbound-resourceadapter> <!— … —> </inbound-resourceadapter> </resourceadapter> </connector>

Интерфейсы Connection и ConnectionFactory — для клиентов и должны быть в classpath. Сразу приведу их тут, там ничего интересного.

BinkpClient приводить не буду 🙂
public interface Connection { public BinkpClient connect(String hostname, int port); } public interface ConnectionFactory { public Connection createConnection(); }

Соединения бывают Managed и Unmanaged. Первые — со свистелками, listener’ами и другим, вторые — без.
Класс, реализующий ManagedConnectionFactory, должен уметь создавать оба типа соединений.
ManagedConnectionFactory.javapublic class ManagedConnectionFactory implements javax.resource.spi.ManagedConnectionFactory { private PrintWriter logwriter; private static final long serialVersionUID = 1L; /** * Создание фабрики для unmanaged-соединений */ @Override public Object createConnectionFactory() throws ResourceException { return new ConnectionFactoryImpl(); } /** * Создание managed-фабрики для managed-connection */ @Override public Object createConnectionFactory(ConnectionManager cxManager) throws ResourceException { return new ManagedConnectionFactoryImpl(this, cxManager); } /** * Создание managed-соединения */ @Override public ManagedConnection createManagedConnection(Subject subject, ConnectionRequestInfo cxRequestInfo) throws ResourceException { return new in.fidonode.binkp.ra.ManagedConnection(); } @Override public PrintWriter getLogWriter() throws ResourceException { return logwriter; } @SuppressWarnings("rawtypes") @Override public ManagedConnection matchManagedConnections(Set connectionSet, Subject subject, ConnectionRequestInfo cxRequestInfo) throws ResourceException { ManagedConnection result = null; Iterator it = connectionSet.iterator(); while (result == null && it.hasNext()) { ManagedConnection mc = (ManagedConnection) it.next(); if (mc instanceof in.fidonode.binkp.ra.ManagedConnection) { result = mc; } } return result; } @Override public void setLogWriter(PrintWriter out) throws ResourceException { logwriter = out; } }

Когда приложение запрашивает у JEE-сервера тот или иной коннектор, сервер приложений просит ManagedConnectionFactory создать ConnectionFactory и отдает его приложению.

Как можно заметить, ConnectionFactory тоже бывает Managed и Unmanaged. В принципе все это можно свести к одному классу, но это сильно зависит от того, что именно и как мы передаем, есть ли там транзакции итд.
ConnectionFactoryIml просто делает new ConnectionImpl(), а вот ManagedConnectionFactoryImpl чуть посложнее:

ManagedConnectionFactoryImpl.javapublic class ManagedConnectionFactoryImpl implements ConnectionFactory { private ManagedConnectionFactory factory; private ConnectionManager manager; public ManagedConnectionFactoryImpl(ManagedConnectionFactory factory, ConnectionManager manager) { super(); this.factory = factory; this.manager = manager; } /** создает managed-соединение через родителя-ManagedConnectionFactory **/ @Override public Connection createConnection() { try { return (Connection) manager.allocateConnection(factory, null); } catch (ResourceException e) { return null; } } }

ManagedConnection, реализующий javax.resource.spi.ManagedConnection — это обертка для интерфейса Connection, которая как-раз добавляет свистелок и listener’ов. Именно этот класс возвращает ManagedConnectionFactory.createManagedConnection(), которую мы вызываем при создании соединения из ManagedConnectionFactoryImpl.createConnection() через ConnectionManager.allocateConnection()

ManagedConnection.javapublic class ManagedConnection implements javax.resource.spi.ManagedConnection { private PrintWriter logWriter; private Connection connection; private List<ConnectionEventListener> listeners; public ManagedConnection() { listeners = Collections .synchronizedList(new ArrayList<ConnectionEventListener>()); } @Override public void associateConnection(Object connection) throws ResourceException { if (connection != null && connection instanceof Connection) { this.connection = (Connection) connection; } } @Override public Object getConnection(Subject subject, ConnectionRequestInfo cxRequestInfo) throws ResourceException { if (connection == null) { connection = new ManagedConnectionImpl(); } return connection; } @Override public void cleanup() throws ResourceException { } @Override public void destroy() throws ResourceException { } @Override public PrintWriter getLogWriter() throws ResourceException { return logWriter; } @Override public ManagedConnectionMetaData getMetaData() throws ResourceException { throw new NotSupportedException(); } @Override public XAResource getXAResource() throws ResourceException { throw new NotSupportedException(); } @Override public LocalTransaction getLocalTransaction() throws ResourceException { return null; } @Override public void setLogWriter(PrintWriter out) throws ResourceException { logWriter = out; } @Override public void addConnectionEventListener(ConnectionEventListener listener) { if (listener != null) { listeners.add(listener); } } @Override public void removeConnectionEventListener(ConnectionEventListener listener) { if (listener != null) { listeners.remove(listener); } } }

Ну вот, теперь мы подошли к самому простому — реализации соединения 🙂
public class ConnectionImpl implements Connection { @Override public BinkpClient connect(String hostname, int port) { return new BinkpClient(hostname, port); } }

Итоговая цепочка вызовов для установления исходящего соединения
ManagedConnectionFactory.createConnectionFactory()
->ManagedConnectionFactoryImpl.createConnection()
—>СonnectionManager.allocateConnection()
—>ManagedConnectionFactory.createManagedConnection()
—->ManagedConnection.getConnection()
——>ManagedConnectionImpl.connect()

Ну и не забудьте настроить сервер приложений для работы с этим адаптером, ну и jndi указать.

Код для вызова:
private BinkpClient createBinkpClient(String host, int port) { ConnectionFactory cf = ((ConnectionFactory) new InitialContext().lookup("java:eis/BinkpConnectionFactory")); Connection conn = cf.getConnection(); return conn.connect(host, port); }
А причем тут Фидо?

А почти непричем. Дело в том, что изначальная задача была вовсе не о binkp, но она была рабочей, а значит попадала под NDA. Поэтому, разобравшись с JCA и решив что нужно написать статью на Хабре ( кстати, оглядываясь назад я начинаю понимать, почему никто такую статью еще не написал. И это еще без транзакций! ), я разморозил старую идею — форк jnode для JEE-серверов, для запуска ноды в виде одного ear. В свое время именно знаний JCA мне не хватило для того, чтобы запустить проект 🙂

Под это все я написал вышеприведенные примеры, и они даже заработали. Так что если вы хотите потренироваться в java ee вообще и рефакторинге из java se в частности — пишите письма и коммитьте код. Да, пойнтов все еще беру.

Спасибо за внимание, оставайтесь с нами. Опечатки можно писать в комментариях, я даже не сомневаюсь что их тут десятки. http://habrahabr.ru/post/251131/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *