
В предыдущей статье мы использовали приложение MiniFIX для подключения и отправки сообщений на тестовую биржу с помощью протокола FIX. В этой статье напишем собственную реализацию клиента для получения рыночных данных в виде небольшого SpringBoot-приложения. Код доступен в репозитории.
Для реализации приложения нам понадобится:
- Java 8
- Maven
- Spring boot 2.2.5
- Lombok
- QuickFix/J
Содержание для упрощения навигации по статье:
- FIX-Engine и запуск тестового сервера
- Структура проекта
- Настройка параметров подключения
- Создание FIX-приложения
- Создание сервиса для подключения к серверу
- Отправка запроса на получение рыночных данных
- Обработка ответа и сохранение рыночных данных
- Запуск приложения
FIX-Engine и запуск тестового сервера
FIX-Engine, или FIX-движок, обеспечивает связь со сторонними системами по протоколу FIX. Он отвечает за преобразование данных в FIX-сообщения, а также за создание сессии и обеспечение ее работы: проверку валидности сообщений, генерацию контрольных сумм, восстановление работы после потери связи и т.д (здесь можно почитать более подробно).
В нашем случае в роли такого движка выступает QuickFix/J. В предыдущей части я использовала пример Executor из модуля examples, но в нем обрабатываются только сообщения на создание торговых заявок. В этом же модуле есть более подходящий пример — OrderMatch (quickfixj-examples-ordermatch), в нем помимо поддержки торговых заявок присутствует обработка сообщений на получение рыночных данных (MarketDataRequest).
Когда вы первый раз клонируете репозиторий, обязательно нужно выполнить сборку проекта, чтобы сгенерировались FIX-сообщения для различных версий протокола. В Readme проекта есть описание команд для различных видов сборки (с тестами и без), самый быстрый:
mvn clean package -Dmaven.javadoc.skip=true -DskipTests -PskipBundlePlugin
Процесс сборки длился у меня где-то минут 6-7, так что в это время можно заварить себе чашечку чая изучить настройки сервера и приступить к написанию клиента.
Как я уже описывала ранее, открываем файл resources/quickfix.examples.ordermatch/ordermatch.cfg, проверяем SocketAcceptPort и заполняем поле TargetCompID нужным значением для нашего клиента (можно оставить BANZAI, которое указано по умолчанию, можно написать любое другое на ваше усмотрение):
[default] FileStorePath=target/data/ordermatch DataDictionary=FIX42.xml SocketAcceptPort=9876 // порт для подключения BeginString=FIX.4.2 // версия FIX 4.2 [session] SenderCompID=EXEC // идентификатор сервера TargetCompID=FIX_CLIENT // идентификатор клиента ConnectionType=acceptor StartTime=00:00:00 EndTime=00:00:00
Если хотите поменять значение идентификатора клиента, то лучше, конечно, сделать это перед сборкой, чтобы не пришлось собирать еще раз.
Когда сборка завершится, заходим в quickfixj\quickfixj-examples\ordermatch\target, проверяем, что там появились *.jar файлы:

Запускаем файл quickfixj-examples-ordermatch-2.2.0-SNAPSHOT-standalone.jar, так как он содержит все необходимые для запуска зависимости:
java -jar quickfixj-examples-ordermatch-2.2.0-SNAPSHOT-standalone.jar

Если появилась запись "Started QFJ Message Processor" – значит, сервер запустился. Проверьте, что в строке "Listening for connections at … [FIX4.2:EXEC->FIX_CLIENT]" указано нужное значение идентификатора клиента.
Структура проекта
Вот так выглядит готовый проект (стандартная структура веб-приложений: сервисы, контроллеры, модельки и т.д):

Создаем maven-проект со стандартными зависимостями и добавляем библиотеку QuickFix/J для работы с протоколом FIX:
<properties> <quickfixj.version>2.0.0</quickfixj.version> </properties>
<dependency> <groupId>org.quickfixj</groupId> <artifactId>quickfixj-core</artifactId> <version>${quickfixj.version}</version> </dependency> <dependency> <groupId>org.quickfixj</groupId> <artifactId>quickfixj-messages-fix42</artifactId> <version>${quickfixj.version}</version> </dependency>
Я подключила 2 модуля: quickfixj-core и quickfixj-messages-fix42 для работы с сообщениями только версии FIX 4.2.
Если в вашем приложении предполагаются сообщения различных версий протокола, можно подключить quickfixj-core + quickfixj-messages-all или просто quickfixj-all.
Полная версия pom.xml доступна в репозитории.
Настройка параметров подключения
По аналогии с файлом настроек на сервере, создадим файл resources/config/client.cfg с настройками нашего приложения.
В файле может быть один блок [default], в котором находятся параметры, общие для всех сессий, и несколько блоков [session] для описания параметров конкретной сессии (если сервер поддерживает сообщения различных версий протокола FIX, то для каждой версии создается отдельный блок [session]).
[default] SenderCompID=FIX_CLIENT // идентификатор отправителя TargetCompID=EXEC // идентификатор получателя ConnectionType=initiator // приложение является клиентом NonStopSession=Y SocketConnectHost=localhost ReconnectInterval=5 HeartBtInt=30 FileStorePath=target/data/banzai UseDataDictionary=Y DataDictionary=dictionary/fix4_2.xml ValidateUserDefinedFields=N AllowUnknownMsgFields=Y ValidateUserDefinedFields=N AllowUnknownMsgFields=Y [session] BeginString=FIX.4.2 ResetOnLogon=Y
Начнем с блока [default]:
- параметры сессии
— SenderCompID, TargetCompID – идентификатор отправителя и получателя сообщений соответственно (sender – наше приложение, target – сервер). Убедитесь, что эти значения совпадают со значениями параметров на сервере.
— ConnectionType (initiator/acceptor) – указывает, является наше приложение клиентом или сервером.
— С помощью параметров StartTime и EndTime можно указать время начала и соответственно завершения работы сессии (например, биржа работает с 9.00 до 18.00, поэтому нет смысла запускать сессию вне этого времени). Если же сессия будет работать весь день, то можно указать NonStopSession=Y, что будет равносильно варианту StartTime=00:00:00 и EndTime=00:00:00.
-параметры валидации сообщений
— UseDataDictionary=Y – можно использовать словарь сообщений, если вы работаете с биржей, спецификация сообщений которой отличается от стандартной (например, в словаре можно указать дополнительные теги или типы сообщений). При этом использование словаря обязательно, если есть сообщения с повторяющимися группами.
— DataDictionary – путь к словарю. - параметры клиента
— ReconnectInterval – интервал переподключения к серверу (в секундах).
— HeartBtInt – интервал проверочных сообщений типа HeartBeat (в секундах).
— LogonTimeout, LogoutTimeout – время ожидания Logon и Logout сообщений перед отключением сессии (в секундах).
— SocketConnectHost, SocketConnectPort – хост и порт подключения к acceptor-у. - параметры хранения сообщений и логов
Сообщения и логи можно хранить в файлах или в базе данных (сообщения можно нигде не хранить, если выставить параметр PersistMessages=N).
Я указала FileStorePath=target/data/banzai для хранения сообщений и номеров последовательностей в файле. Можно указать параметры базы данных (JdbcURL, JdbcUser, JdbcPassword и т.д), тогда сообщения будут храниться в базе данных.
В настройках конкретной сессии (в блоке [session]) главное – заполнить параметр BeginString, в котором указывается версия протокола FIX, использующегося в сообщениях.
Любые настройки можно указывать непосредственно при создании подключения в коде с помощью класса SessionSettings.
Подробнее о конфигурации клиента можно почитать в официальной документации.
Создание FIX-приложения
Теперь перейдем непосредственно к коду клиента. Чтобы создать FIX-приложение, нам нужно просто реализовать интерфейс Application:
public interface Application { void onCreate(SessionID sessionId); void onLogon(SessionID sessionId); void onLogout(SessionID sessionId); void toAdmin(Message message, SessionID sessionId); void toApp(Message message, SessionID sessionId) throws DoNotSend; void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon; void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType; }
Эти методы вызываются в результате событий, происходящих в приложении (подробнее).
Метод fromApp срабатывает при получении сообщений с сервера, то есть в нем происходит основная логика. Остальные методы в основном служебные. Для удобства я создала абстрактный базовый класс BaseFixService, который реализует служебные методы интерфейса Application, и его наследника FixClientService, который занимается обработкой сообщений с сервера и соответственно реализует метод fromApp.
В приложении может быть установлено несколько сессий, поэтому в базовом классе будем хранить все сессии:
Map<SessionID, Session> sessions = new HashMap<>();
Так как метод onCreate срабатывает при создании новой сессии, в нем будем сохранять сессию по полученному ID с помощью метода lookupSession:
@Override public void onCreate(SessionID sessionId) { log.info(">> onCreate for session: {}", sessionId); Session session = Session.lookupSession(sessionId); if (session != null) { sessions.put(sessionId, session); } else { log.warn("Requested session is not found."); } }
Когда сессия отключается от сервера (мы завершили сеанс сообщением Logout или произошли какие-то технические проблемы и связь оборвалась), мы удаляем её из нашего хранилища.
@Override public void onLogout(SessionID sessionId) { log.info(">> onLogout for session: {}", sessionId); sessions.remove(sessionId); }
В FixClientService у нас находится главный обработчик сообщений – метод fromApp:
@Override public void fromApp(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType { try { String type = MessageUtils.getMessageType(message.toString()); // получение типа сообщения switch (type) { case MARKET_DATA_SNAPSHOT_FULL_REFRESH: log.info("MarketData message: {}", message); break; case SECURITY_DEFINITION: log.info("SecurityDefinition message: {}", message); break; default: log.info("Unhandled message {} of type: {}", message, type); } } catch (Exception ex) { log.debug("Unexpected exception while processing message.", ex); } }
С помощью класса MessageUtils библиотеки QuickFix/J можно получить тип входящего сообщения и далее обработать каждый случай (здесь для примера я указала несколько типов сообщений и вывела их в лог). В этой статье реализуем получение рыночных данных и их сохранение в кэш, остальные типы сообщений и их обработку более подробно разберем в следующих статьях и дополним логику нашего клиента.
Создание сервиса для подключения к серверу
Когда мы создали реализацию FIX-приложения, можно приступить к сервису для подключения к серверу – ConnectorService. При запуске приложения он будет создавать и запускать сокет для обмена сообщениями.
Для обмена сообщениями нужно создать SocketInitiator (на сервере аналогично создается SocketAcceptor). При создании передаются следующие параметры:
Application– FIX-приложение (т.е. класс, реализующий интерфейс Application, FixClientService в нашем случае)MessageStoreFactory– способ хранения сообщений, это может быть, например,JdbcStoreFactory(хранение в базе данных),MemoryStoreFactory(хранение в памяти),FileStoreFactory(хранение в файле).SessionSettings– настройки сессии, для их создания нужно передать файл с настройками (либо его название, либоInputStream).LogFactory– хранение логов (аналогично сообщениям это может бытьFileLogFactory,JdbcLogFactory), я использовалаSLF4JLogFactory.MessageFactory– используется для создания сообщений (можно использоватьDefaultMessageFactoryилиMessageFactoryдля конкретной версии протокола FIX).
Путь к файлу настроек и дополнительные параметры (хост и порт подключения) для удобства я вынесла в конфигурацию приложения (application.yaml):
fix: cfg: 'classpath:config/client.cfg' socketConnectHost: localhost socketConnectPort: 9876
Соответственно при создании настроек сессии я использую этот файл и с помощью метода sessionSettings.set(String key, String value) добавляю параметры SocketConnectHost, SocketConnectPort:
try (InputStream inputStream = config.getCfg().getInputStream()) { SessionSettings sessionSettings = new SessionSettings(inputStream); sessionSettings.setString("SocketConnectHost", config.getSocketConnectHost()); sessionSettings.setString("SocketConnectPort", config.getSocketConnectPort()); MessageStoreFactory storeFactory = new FileStoreFactory(sessionSettings); SLF4JLogFactory logFactory = new SLF4JLogFactory(sessionSettings); MessageFactory messageFactory = new DefaultMessageFactory(); socketInitiator = new SocketInitiator(fixClientService, storeFactory, sessionSettings, logFactory, messageFactory); socketInitiator.start(); } catch (Exception ex) { log.error("Exception while establishing connection to FIX server.", ex); throw new FixClientException("Exception while establishing connection to FIX server.", ex); }
После создания настроек сессии объявляем LogFactory, MessageFactory, MessageStoreFactory и передаем их в конструктор SocketInitiator. Вызвав метод start() запустим подключение и сможем получать сообщения.
Не забудьте закрыть сокет при завершении работы с помощью метода
stop().
Отправка запроса на получение рыночных данных
Когда приложение запустится и установится соединение с сервером, мы сможем отправлять и получать сообщения. Так как взаимодействие у нас построено на сокетах, отправка сообщения и получение ответа на него происходят асинхронно. Поэтому у нас будет два контроллера:
- для инициации отправки сообщений
- для получения данных, сохраненных в результате обработки ответов на отправленные ранее сообщения.
Чтобы получить рыночные данные (например, цену покупки, цену продажи инструмента), нам нужно отправить сообщение-запрос на данные и соответственно обработать ответное сообщение в методе fromApp.

Напишем метод для создания сообщения типа MarketDataRequest (о тегах сообщения можно почитать в спецификации).
public static Message createMarketDataRequest(String symbol) { }
В библиотеке QuickFix/J все сообщения представляют собой классы, поля в которых соответствуют тегам. Можно создать экземпляр класса нужного нам сообщения и с помощью метода set() заполнить теги. Теги также представляют собой классы с обязательным полем FIELD, в котором хранится соответствующее числовое значение.
Например, тег symbol=55:
public class Symbol extends StringField { public static final int FIELD = 55; // constructors }
Стандартные теги (соответствующие спецификации конкретной версии FIX) обычно можно заполнить напрямую. Например, для сообщения MarketDataRequest (далее буду сокращенно писать MDR) определены методы
set(SubscriptionRequestType value) set(MarketDepth value) set(Symbol value) // ...
Если же при работе с конкретной биржей в сообщении присутствуют дополнительные теги, их можно задать с помощью общего метода setField(int key, Field<?> field): например, setField(5020, new IntField(10)) — добавим в сообщение тег <5020> со значением 10: 5020=10.
Создадим объект класса MarketDataRequest:
private static int mdReqID = 1; MarketDataRequest marketDataRequest = new MarketDataRequest( new MDReqID(format("FixClient-%s", mdReqID++)), new SubscriptionRequestType(SNAPSHOT), //263 new MarketDepth(1) //264, 1 = top of book );
В конструкторе передается три параметра:
- MDReqID – уникальный в рамках данной сессии идентификатор сообщения.
- SubscriptionRequestType:
— SNAPSHOT = ‘0’ (текущие данные);
— SNAPSHOT_PLUS_UPDATES = ‘1’ (текущие данные + подписка на обновление данных; при выборе этого типа каждый раз при изменении рыночных данных для инструмента, сервер будет отправлять сообщение типа MarketDataSnapshotFullRefresh с новыми данными);
— DISABLE_PREVIOUS_SNAPSHOT_PLUS_UPDATE_REQUEST = ‘2’ (отписка от получения данных + получение обновленных данных); - MarketDepth – глубина рынка для типа SNAPSHOT (цены формируются исходя из размещенных и ожидающих размещения заявок на покупку и продажу инструмента, эти заявки записываются в “книгу” заявок. Если указываем параметр равным 0 – будут учитываться все значения “книги”, если 1 – только “верхние” значения).
То же самое в виде сообщения: 262=FixClient-1 263=0 264=1.
Далее нужно указать параметры, которые мы хотим получить в результате запроса рыночных данных. Некоторые параметры в FIX-сообщениях задаются группами. При этом начинается такая часть сообщения с тега, в котором указывается количество последующих групп. В нашем случае параметр <267> NoMdEntryTypes хранит количество групп, а сами группы формируются из тегов <269> MdEntryType. Например, 269=0 означает, что мы хотим получить цену, по которой можно продать инструмент (Bid), а 269=1 – цену, по которой мы сможем купить инструмент (Ask, или Offer). Полный список стандартных значений этого тега можно посмотреть в спецификации. QuickFix/J автоматически заполняет в теге <267> количество параметров, мы можем только заполнить нужные нам поля и добавить каждую группу в сообщение:
MarketDataRequest.NoMDEntryTypes group = new MarketDataRequest.NoMDEntryTypes(); //267 group.set(new MDEntryType(MDEntryType.BID)); marketDataRequest.addGroup(group); group.set(new MDEntryType(MDEntryType.OFFER)); marketDataRequest.addGroup(group);
В сообщении будет выглядеть так: 267=2 269=0 269=1.
Можно делать MDR сразу для нескольких инструментов, в поле <146> NoRelatedSum передается их количество и далее заполняются группы тегов для каждого инструмента. Для простого запроса достаточно передать идентификатор инструмента в теге <55> (для более сложных запросов на фьючерсы или опционы нужно указывать дополнительные параметры, но для нашего базового случая это не нужно).
MarketDataRequest.NoRelatedSym instrument = new MarketDataRequest.NoRelatedSym(); instrument.set(new Symbol(symbol)); marketDataRequest.addGroup(instrument);
В сообщении: 146=1 55=AAPL.
Наш полученный MDR теперь можно отправить на сервер с помощью метода session.send():
@Override public void sendMarkedDataRequest(String symbol) { sessions.forEach((sessionID, session) -> session.send(MsgUtils.createMarketDataRequest(symbol))); }
Аналогично можно реализовать методы отправки любого другого сообщения (на создание заявки, на получение детальной информации об инструменте и т.д).
Для полученного метода отправки запроса на получение рыночных данных создадим REST endpoint, чтобы мы могли инициировать его отправку:
@PostMapping(value = "/market-data-request") public void sendMarketDataRequest(@RequestParam("symbol") String symbol) { fixClientService.sendMarkedDataRequest(symbol); }
Так будет выглядеть запрос, чтобы создать и отправить сообщение для получения данных об акциях Apple:
POST localhost:9090/fix-client/v1/market-data-request?symbol=APPL.
В результате будет отправлено сообщение:
8=FIX.4.2 9=117 35=V 34=3 49=FIX_CLIENT 52=20200601-17:10:34.103 56=EXEC 262=FixClient-1 263=0 264=1 146=1 55=AAPL 267=2 269=0 269=1 10=018
Обработка ответа и сохранение рыночных данных
Создадим отдельный сервис (MarketDataService), который будет обрабатывать рыночные данные, полученные в результате отправки запроса. Он будет сохранять полученные данные в объект, записывать их в память и отдавать при запросе по идентификатору инструмента.
Класс для хранения рыночных данных:
@Data @Accessors(chain = true) public class MarketDataModel { private String symbol; private BigDecimal bid; private BigDecimal ask; }
Теперь нужно разобраться, как правильно обработать сообщение с данными и сохранить его.
Вот так выглядит сообщение, отправленное нам в ответ на запрос по символу AAPL:
8=FIX.4.2 9=104 35=W 34=3 49=EXEC 52=20200601-17:10:34.119 56=FIX_CLIENT 55=AAPL 262=FixClient-1 268=1 269=0 270=123.45 10=236.
Так как при запросе мы указывали группы параметров (Bid, Ask и т.д), разбирать сообщение тоже будем по группам:
message.getGroups(NoMDEntries.FIELD).forEach(group -> { int type = MsgUtils.getIntField(group, MDEntryType.FIELD).orElse(-1); BigDecimal value = MsgUtils.getDecimalField(group, MDEntryPx.FIELD).orElse(BigDecimal.ZERO); switch (type) { case 0: dataModel.setBid(value); break; case 1: dataModel.setAsk(value); break; default: log.warn("Invalid entry type: {}", type); break; } });
В теге <269> хранится название параметра, а в теге <270> его значение. Соответственно, если тип параметра = 0 (т.е. Bid), то мы сохраняем значение соответствующего ему тега <270> в поле bid нашего объекта.
Далее проверяем тег <55> – идентификатор инструмента, и сохраняем по нему наши данные:
MsgUtils.getStrField(message, Symbol.FIELD).ifPresent(s -> { dataModel.setSymbol(s); marketData.put(s, dataModel); });
Осталось только добавить сохранение данных в метод fromApp в случай обработки сообщения типа MarketDataSnapshotFullRefresh:
case MARKET_DATA_SNAPSHOT_FULL_REFRESH: marketDataService.saveMarketData(message); break;
Теперь при получении нашим приложением сообщения типа MarketDataSnapshotFullRefresh будет происходить обработка и сохранение данных в память приложения.
Соответственно в отдельный Rest-Controller добавляем метод получения данных по идентификатору:
@GetMapping public ResponseEntity<MarketDataModel> getMarketData(@RequestParam("symbol") String symbol) { return new ResponseEntity<>(marketDataService.getMarketData(symbol), HttpStatus.OK); }
Вызвав метод GET localhost:9090/fix-client/v1/market-data?symbol=AAPL
получим ответ:
{ "symbol": "AAPL", "bid": 123.45, "ask": null }
Запуск приложения
Наконец, можем запустить наше приложение, убедиться, что подключение к серверу осуществляется успешно, и попробовать отправить запрос на получение рыночных данных.
Если при запуске приложения в логах отображаются ошибки подключения (ConnectException), как на скриншоте ниже, проверьте, что сервер запущен и что вы указали правильные идентификаторы клиента и сервера и хост и порт для подключения:

В случае успешного запуска клиент и сервер должны обменяться Logon-сообщениями:

Отправим запрос POST localhost:9090/fix-client/v1/market-data-request?symbol=APPL, чтобы вызвать отправку сообщения MDR и убедимся, что сообщение действительно отправлено и ответ на него получен:

Кстати, сообщения можно удобно парсить с помощью сайта – просто вставляете текст сообщения и получаете разбор по тегам и значениям:

Теперь вызвав метод GET localhost:9090/fix-client/v1/market-data?symbol=AAPL
мы должны получить ответ:
{ "symbol": "AAPL", "bid": 123.45, "ask": null }
Работает!
Конечно, на таком “игрушечном” примере далеко не уедешь, но для начала он хорошо подходит. Для более сложных примеров и для работы с условиями, приближенными к реальной бирже, можно получить доступ к тестовому контуру Московской биржи (MOEX) — для этого нужно оставить заявку на сайте. Я не нашла аналогичных тестовых контуров у других крупных бирж (именно для подключения напрямую через FIX-протокол), кроме симуляторов биржевой торговли, где выдаются виртуальные деньги и с помощью терминалов осуществляется торговля. Если знаете, где найти хороший тестовый сервер для работы по протоколу FIX, — поделитесь в комментариях, буду благодарна.
В следующей статье я планирую рассмотреть основные виды FIX-сообщений (соответственно дополнить приложение методами для их создания) и далее перейти к подробному рассмотрению процесса создания торговых заявок и их обработки биржей. Все примеры сообщений по-прежнему можно создавать с помощью приложения MiniFIX, если не хотите писать реализацию своего клиента.
ссылка на оригинал статьи https://habr.com/ru/post/505022/
Добавить комментарий