Event-Driven Architecture в высоконагруженном ДБО: наш опыт

от автора

Привет, Хабр! Меня зовут Александр, я главный солюшн архитектор трайба в ОТП Банке мы с моей коллегой Екатериной, senior разработчиком трайба, в продолжение прошлой статьи расскажем вам про вызовы, с которыми мы столкнулись при реализации интеграций нашего ДБО с бэкофисом. 

введение

введение

Зачем нужна эта статья? Начну с небольшой истории из личного опыта.

В самом начале проекта по разработке нашего ДБО передо мной стояла задача выбрать подрядчика. Ключевым критерием было совпадение в архитектурных подходах. В ходе одного из интервью с архитектором от потенциального подрядчика (далее «Андрей П.», имя вымышленное) состоялся интересный диалог:

Я: начнем с вопроса интеграции нашего ДБО с АБС. Как вы предлагаете выстраивать обмен данными? Какие инструменты будут в основе?

Андрей П.: Без сомнений, прямое соединение с базой данных (DBLink). Это даст максимальную скорость и безупречную консистентность. Клиент всегда будет видеть актуальные данные.

Я: допустим, что безопасность банка категорически запрещает прямые соединения с БД АБС из периметра ДБО. Что тогда?

Андрей П.: В таком случае, очевидной альтернативой является REST API. Современно, стандартно, все умеют с этим работать. Быстро и надежно.

Я: А как быть с доступностью? Если АБС «легло» на плановое обслуживание или из-за инцидента, как мы будем предоставлять сервис клиентам? ДБО должно быть всегда онлайн.

Андрей П.: Это проблема уровня АБС. Её нужно предотвращать на их стороне — обеспечивать отказоустойчивый кластер и минимальные окна простоя. 

Последующие несколько минут я пытался осторожно навести его на мысль о событийной модели интеграции, но натолкнулся на контраргументы: такая модель потребует избыточной инфраструктуры для поддержания кэша, неизбежно породит проблемы с консистентностью данных и в целом снизит надежность системы по сравнению с «проверенными» методами.

Этот диалог, в той или иной форме, повторялся после не раз. Он всплывает в дискуссиях с новыми разработчиками, аналитиками и даже опытными архитекторами, которые присоединяются к нашей команде. Устоявшееся мышление, ориентированное на синхронные point-to-point взаимодействия, оказывается невероятно сильным.

Именно этот повторяющийся спор и стал одним из поводов для данной статьи. В ней я систематизирую весь наш практический опыт, полученный при построении высоконагруженного и отказоустойчивого ДБО. Мы разберем, почему классические подходы (DBLink, REST) ведут в тупик при масштабировании, и докажем на реальных кейсах и архитектурных паттернах, что событийная модель — не усложнение, а единственный путь к созданию по-настоящему надежных и масштабируемых систем в контексте современного банкинга.

Глава 1. Проблема синхронной интеграции с АБС

как организовать интеграцию

как организовать интеграцию

Первой и ключевой проблемой при проектировании высоконагруженного сервиса (дальше ДБО) стала интеграция с нашим бэкофисом и в частности с АБС системой. Из коробки поставщик АБС предлагал два варианта: прямые соединения с базой данных (DB-link) и REST API.

Прямой доступ к БД был категорически отвергнут по соображениям информационной безопасности, из-за риска создания сильной связности (tight coupling) и неподконтрольного влияния на производительность критической системы.

Оставался только REST API, который и был реализован в первой версии MVP. Тем не менее этот подход является антипаттерном и нам в долгосрочном решении не подходит. Рассмотрим, почему в нашем случае синхрон — это плохо:

1. Проблема нагрузки и масштабируемости

Главный недостаток синхронного подхода — лавинообразный рост нагрузки на АБС прямо пропорционально росту клиентской базы ДБО. Даже на первой тысяче мигрированных клиентов уже получали 100 RPS и АБС уходила в отказ от обслуживания (DoS). Проблема частично решалась использованием кэширования, но устранить ее полностью было невозможно.

2. Проблема задержек и пользовательского опыта (UX)

Синхронный вызов создает длинную цепочку зависимостей:

Клиент -> API-Gateway -> Микросервис ДБО (их тут обычно больше одного) -> API-Gateway -> адаптер АБС -> АБС

Задержки на каждом этапе (сеть, обработка) суммируются. Если АБС под нагрузкой отвечает за 2-3 секунды, а не за 200-300 мс, клиентское приложение будет ожидать ответа 5-10 секунд. При первом входе, когда данных в кэше нет или они неактуальны, необходимо подгрузить большой объем данных. Это время становится неприемлемым и ведет к потере клиентов.

Детали реализации с примерами кода

Для того, чтобы АБС справлялась с нагрузкой, сделали ограничение нагрузки. То есть, на нескольких инстансах запускается задача, которая берет только разрешенной количество соединений, при этом это ограничение должно действовать на все инстансы. Например, на 6 инстансах сервиса одновременно могут работать только 50 потоков для выкачивания данных по ресту.

Есть 2 таблички, в одной мы храним данные организаций, в другой данные счетов этих организаций. Каждая запись может быть в одном из 3 состояний WAITING_FOR_PROCESSING, PROCESSING, DONE

@Bean public ExecutorService threadPoolExecutor() {     // ограничиваем пул     final var executor = Executors.newFixedThreadPool(50);     return ContextExecutorService.wrap(executor,         // продолжаем трейсинг, если уходим в новый тред         () -> ContextSnapshotFactory.builder().build().captureAll()); }
@SchedulerLock(name = "processTransactionEnrichmentPool") @Scheduled(cron = "0 */2 * * * *") public void processTransactionEnrichmentPool() {         final List<TransactionEnrichmentAccount> accounts = new ArrayList<>();         // у каждой организации может быть несколько счетов. При этом из за ограничений нагрузки у каждой в обработке может находиться все или только часть счетов.         // Поэтому сначала пытаемся найти все организации, у которых уже началась обработка, но были забраны не все счета         accounts.addAll(getReadyToProcessAccounts(PROCESSING, 0));         // а затем берем новые организации         accounts.addAll(getReadyToProcessAccounts(WAITING_FOR_PROCESSING, accounts.size()));         // запускаем загрузку         accounts.forEach(this::processAccount); }    private List<TransactionEnrichmentAccount> getReadyToProcessAccounts(@Nonnull final TransactionEnrichmentStatus organizationStatus, final int readyToProcessAccountSize) {         final int freeCount = Math.max(accountMaxProcessingCount - IN_PROCESS_ACCOUNTS.size() - readyToProcessAccountSize, 0);         if (freeCount == 0) {             // если все слоты заняты, то не добавляем счета в обработку             return List.of();         }         // берем первые организации, которые были добавлены в очередь         final var organizations = organizationRepository.findOldestByStatus(organizationStatus, freeCount);         final var accountsByStatus = organizations             .stream()             .flatMap(organization -> organization.getTransactionEnrichmentAccounts().stream())             .collect(Collectors.groupingBy(TransactionEnrichmentAccount::getStatus));         // смотрим сколько свободных слотов, выкинув данные тез счетов, которые уже в обработке         final int freeCountWithoutOtherEnv = freeCount - accountsByStatus.getOrDefault(PROCESSING, List.of()).size();         if (freeCountWithoutOtherEnv <= 0) {             // если все слоты заняты, то не добавляем счета в обработку             return List.of();         }         // добираем данные для оставшихся свободных слотов         final var accounts = accountsByStatus.getOrDefault(WAITING_FOR_PROCESSING, List.of());         final List<TransactionEnrichmentAccount> availableAccounts = accounts.size() < freeCountWithoutOtherEnv ?             accounts : accounts.subList(0, freeCountWithoutOtherEnv);         availableAccounts.forEach(account -> {             // фиксируем, что начали обработку             account.startProcessing();         });         // если все аккаунты обработались, то ставим статус "окончено" для организации         organizations.forEach(TransactionEnrichmentOrganization::checkStatusDoneToSetStatusIsDone);         organizationRepository.saveAll(organizations);         return availableAccounts;     }          private void processAccount(final TransactionEnrichmentAccount account) {         CompletableFuture.runAsync(() -> {             Thread.currentThread().setName("Enrichment-" + account.getOrganizationId() + "-" + account.getAccountNumber());             // начинаем загружать данные из АБС             // когда загрузка закончится, надо обновить данные             account.setStatus(DONE);             accountRepository.save(account);         }, transactionEnrichmentExecutor);     }

 

3. Архитектурный антипаттерн: «Распределенный монолит» (Distributed Monolith)

Мы пришли к ключевой архитектурной проблеме. Внешне система выглядит как современная микросервисная архитектура (кубер кластер, куча микросервисов). Однако, все эти сервисы жестко и синхронно зависят от одной точки отказа — АБС. Это не микросервисы, распределенный монолит.

   Слабая изоляция отказов: Падение АБС немедленно и гарантированно «валит» всё ДБО.

   Нулевая независимость масштабирования: Мы не можем масштабировать сервисы ДБО независимо от АБС. Увеличивая количество инстансов микросервисов ДБО, мы лишь увеличиваем нагрузку на АБС, усугубляя проблему.

   Сниженная надежность: Общая надежность системы (ДБО + АБС) становится равной произведению их надежностей. 0.999  0.99 = 0.989 (98.9%), что ниже целевого показателя в 99.99% для ДБО.

Теоретическая основа: почему CAP-теорема важна.

CAP в системах финтеха

CAP в системах финтеха

Любое решение в распределенных системах — это компромисс. CAP-теорема (Consistency, Availability, Partition Tolerance) — фундаментальный принцип, который диктует нам условия этого компромисса. Она гласит, что в распределенной системе можно гарантировать выполнение не более двух из трех свойств одновременно. При интеграции ДБО и АБС мы имеем классический распределенный кластер с сетевым взаимодействием, где сеть ненадежна по определению (Partition Tolerance — P). Следовательно, выбор стоит между консистентностью (C) и доступностью (A).

Синхронный REST-подход выбирает консистентность (C) в ущерб доступности (A). Мы блокируем клиента и ждем ответа от АБС, чтобы немедленно дать ему актуальные данные. Если АБС недоступна, мы не можем дать клиенту никаких данных — мы жертвуем доступностью.

Для ДБО правильный выбор — это доступность и устойчивость к разделению (AP). Мы должны отвечать клиенту мгновенно, даже если данные могут быть не совсем актуальными на данный момент (Eventually Consistent — конечная согласованность). Клиент может работать с системой (просматривать кэшированные данные, инициировать новые операции), а система гарантированно синхронизирует состояния в фоне, когда АБС снова станет доступна.

Этот выбор определяет переход на событийную модель (Event-Driven Architecture), которая и будет рассмотрена дальше.

Глава 2: Выбор и настройка брокера сообщений (Kafka). Первые проблемы

Первым и очевидным кандидатом на роль брокера сообщений для нашей Event-Driven архитектуры стал Apache Kafka. Ключевым требованием была необходимость развертывания кластера Kafka в строгом соответствии с зональностью сети банка.

Архитектура и ключевые настройки

схематичная архитектура развертки кластера kafka

схематичная архитектура развертки кластера kafka

Зона CDE: В этой зоне с самыми строгими требованиями безопасности находится АБС — источник истинных данных.

  Зона CDE-connected-to (security impacted systems): В этой зоне располагаются системы, взаимодействующие с остальной сетью банка, в частности внутренняя сеть и наше ДБО. Здесь был развернут наш кластер кафки. Больше про работу с зоной CDE можно почитать тут: PCI-DSS. Для обеспечения отказоустойчивости был развернут кластер из 3-х брокеров Kafka в каждой зоне безопасности.

   Топики и партиции: Для изоляции потоков данных было создано 6 топиков в соответствии с бизнес процессами. Каждый топик был разделен на 6 партиций для обеспечения параллельной обработки. Количество партиций было выбрано исходя из планируемой пиковой нагрузки и является «верхним пределом» для количества консьюмеров в группе.

   Ретеншен (Retention): retention.ms = 1209600000 (2 недели). Этот период был взят с большим запасом, прежде всего на время инцидентов. Если консьюмерная группа отстает из-за сбоя, у нее есть 14 дней, чтобы восстановиться и дочитать данные без потери.

   Фактор репликации (Replication Factor): replication.factor = 3. Каждое сообщение хранилось на всех трех брокерах кластера, что гарантировало сохранность данных при падении любого одного-двух брокеров (в зависимости от min.insync.replicas).

   Подтверждение записи (Acks): acks = all. Это наиболее строгая настройка, гарантирующая, что лидер подтвердит запись только после того, как все синхронные реплики (in-sync replicas) получат сообщение. Это обеспечивает сохранность данных даже при падении лидера партиции.

   Мониторинг лага (Consumer Lag): Лаг консьюмера более 1 часа считался инцидентом, требующим немедленной реакции. Для мониторинга использовались Prometheus + Grafana с дашбордами, отображающими отставание каждой консьюмерной группы по всем партициям, а также log alerting, который присылает на почту детализированную информацию 

Детали реализации с примерами кода
final var recordDateTime = Instant.ofEpochMilli(kafkaRecord.timestamp());     final boolean oldRecord = recordDateTime.isBefore(Instant.now().minus(1, ChronoUnit.HOURS));     if (oldRecord) {             log.error("New kafka record queue is too big {}, lag time:{}, topic: {}, partition: {}, offset: {}",                 LocalDateTime.ofInstant(recordDateTime, ZoneId.systemDefault()), timeDurationFrom(recordDateTime),                 kafkaRecord.topic(), kafkaRecord.partition(), kafkaRecord.offset());     }

Так же были добавлены кастомные хелсчеки на кафку

@Service @RequiredArgsConstructor @Log4j2 @ConditionalOnBean(name = "ibSmeKafkaAdminClient") public class IbSmeKafkaHealthIndicator extends AbstractHealthIndicator {       private final AdminClient ibSmeKafkaAdminClient;       @Override     protected void doHealthCheck(final Health.Builder builder) {         final DescribeTopicsResult describeTopicsResult = ibSmeKafkaAdminClient.describeTopics(             List.of(topic1, topic2, topic3));         builder.up()             .withDetail("topics", describeTopicsResult.allTopicNames())             .build();     } }

 

Проблема «серых» исправлений и консистентности данных

Первоначальная выгрузка данных была проведена вручную, и система работала стабильно, пока не проявилась фундаментальная проблема: не все события по транзакциям отлавливались приложением на стороне АБС, некоторые шли в обход API.

Поскольку механизм генерации событий был заточен под изменения, инициированные через API АБС, эти прямые правки не фиксировались в логе и, следовательно, не публиковались в Kafka. Это привело к расхождению между состоянием АБС и данными в кэше ДБО. Так как выписки и прочие финансовые документы формируются на основе кэша, это создало критические риски.

Варианты устранения проблемы консистенции:

1.  Ручная синхронизация через REST (Reactive Approach):

       Суть: Оставить старый REST-эндпоинт и предоставить команде поддержки или продукт-менеджерам возможность вручную инициировать полную перевыгрузку данных для конкретного клиента через админку.

       Минусы: Реактивная, а не проактивная модель. Мы узнаем о проблеме только от разгневанного клиента. Процесс ручной, трудоемкий и не масштабируемый. Не решает проблему скрытых, необнаруженных расхождений.

2.  Полная периодическая инвалидация кэша (Scheduled Invalidation):

       Суть: Регулярно (раз в день/неделю) помечать все данные устаревшими и инициировать фоновую массовую пересинхронизацию через тот же REST API.

       Минусы: Создает чудовищную нагрузку на АБС, аналогичную изначальной проблеме REST-подхода. Фактически мы сами организуем себе DoS-атаку в нерабочее время. Кроме того, временные окна для таких операций могут быть крайне малы, а объем данных — слишком велик. Также возможны расхождения в данных, которые изменились во время длительной синхронизации.

3.  Инвалидация кэша и загрузка через ETL в нерабочее время (Compensating Transaction Pattern):

       Суть: Реализовать компенсирующую транзакцию — регулярный (ежедневный) ETL-процесс (Extract, Transform, Load), запускаемый в строго отведенное низконагрузочное окно (например, с 2:00 до 5:00 ночи). Этот процесс, автоматизированный с помощью Apache Airflow, выполняет следующие шаги:

        1.  Extract: Выполняет выгрузку «золотого снапшота» ключевых данных (например, текущих остатков на счетах, актуальных реквизитов договоров) напрямую из реплики БД АБС, минимизируя импакт на основную базу.

        2.  Transform: Преобразует данные в формат, пригодный для загрузки в кэш ДБО.

        3.  Load: Не инвалидирует весь кэш целиком, а производит точечное обновление записей в кэше ДБО на основе выгруженного снапшота.

       Преимущество перед п.2: Кардинально снижает нагрузку на АБС, так как:

        —   Используется реплика БД, а не рабочий инстанс.

        —   Выполняется один сложный SELECT для выгрузки снапшота, а не тысячи мелких REST-запросов.

        —   Процесс происходит в строго регламентированное время.

       Недостаток: Система не является полностью реактивной. Консистентность данных гарантируется только на момент выполнения ETL-задачи («на начало дня»). Расхождения, возникшие в течение дня из-за прямых правок в БД, будут устранены только следующей ночью. Это trade-off между сложностью реализации полноценного CDC и рисками расхождений.

Данный подход, несмотря на свою архаичность, часто является единственным быстрым и надежным способом побороть «серые» операции. Более элегантного решение мы пока не придумали. CDC тут к сожалению ложится плохо, так как триггеры именно в коде АБС, скидывать всю таблицу в кафку возможности нет как в части требований безопасности, так и из соображений экономии инфраструктуры.

Инцидент с «лавиной событий»

(Наша команда с командой АБС на разборе инцидента)

(Наша команда с командой АБС на разборе инцидента)

В завершение главы стоит рассказать о показательном инциденте, который наглядно демонстрирует важность правильной настройки потребителей Kafka. В один из рабочих дней операционист принял решение закрыть проводки, зависшие еще 10 лет назад. Казалось бы, рутинная операция. Однако она спровоцировала лавинообразный выброс событий в Kafka: закрытие одной проводки за 2015 год вызывало каскадный пересчет всей связанной ленты операций за десятилетний период.

Технически это выразилось в следующем:

— Размер отдельных (менее 1 процента) сообщений в топике достигал максимально разрешенного лимита в 1 МБ.

— В одно сообщение упаковывалось до 10 000 проводок.

Каждое такое сообщение инициировало более 5 различных внутренних интеграций:

  — Обмен данными между микросервисами ДБО.

  — Запись в Elasticsearch для полнотекстового поиска.

  — Обновление кэшей клиентских приложений.

Первые признаки проблемы проявились в виде участившихся ребалансов потребительской группы. Оффсет практически не двигался, хотя потребители были активны. Причина оказалась в том, что обработка одного сообщения занимала больше времени, чем разрешено параметром max.poll.interval.ms. Консьюмер исключался из группы, происходил ребаланс, и сообщение переходило к следующему потребителю — который также не успевал его обработать.

Наши попытки решить проблему включали:

1. Увеличение количества инстансов потребителей (запускали на двух ЦОДах до 6 под, на каждую поду по 2 консумера).

2. Эксперименты с настройками были разными, доводили цифры до абсурдных значений, но в итоге остановились на этом:

max.poll.interval.ms=900000    # 15 минут   session.timeout.ms=45000       # 45 секунд   heartbeat.interval.ms=15000    # 15 секунд   request.timeout.ms=30000 # 30 секунд
private static ConcurrentKafkaListenerContainerFactory<String, String> buildKafkaListenerContainerFactory(@NonNull final ConsumerFactory<String, String> kafkaConsumerFactory, @NonNull final KafkaProperties kafkaProperties) {         final var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();         factory.setConsumerFactory(kafkaConsumerFactory);         factory.setBatchListener(false);         factory.getContainerProperties().setDeliveryAttemptHeader(true);    // получаем данные о номере попытки обработки         factory.getContainerProperties().setObservationEnabled(true);       // включаем трейсинг (traceId), чтобы мы могли найти проблему при межсервисном взаимодействии по одному идентификатору         // То есть, если сервис отправляет в кафку сообщение, то потребитель получает traceId и мы можем видеть всю обработку         factory.setCommonErrorHandler(buildErrorHandler(kafkaProperties.getMaxOnErrorAttempts())); // конфигурируем обработку ошибок на кафке, разрешаем сервису сделать ограниченной количество ретраев перед тем, как потерять сообщение         return factory;     }       private static DefaultErrorHandler buildErrorHandler(int maxOnErrorAttempts) {         final var fixedBackOff = new FixedBackOff();         fixedBackOff.setMaxAttempts(maxOnErrorAttempts == 0 ? 5 : maxOnErrorAttempts);         final var errorHandler = new DefaultErrorHandler((consumerRecord, e) ->             // logic to execute when all the retry attempts are exhausted             log.error("Last process attempt for kafka message {}", consumerRecord, e), fixedBackOff);         // после исчерпания количества попыток отправляется ак на сообщение, чтобы не зависала очередь, при этом информация о потерянном сообщении будет отправлена на почту для детального разбора         errorHandler.setCommitRecovered(true);         errorHandler.setAckAfterHandle(true);         return errorHandler;     }

Однако все эти попытки давали лишь временное улучшение. Кардинальное решение пришло только после анализа бизнес-логики: было принято решение игнорировать события старше 6 месяцев через внедрение фильтрации на уровне потребителя. Это позволило:

— Снизить нагрузку на все компоненты системы.

— Избежать ребалансов потребительской группы.

— Удерживать обработку сообщений в рамках допустимых таймаутов.

 private void processUpdate(final KafkaMessage message) {         // дата операции         final var date = DateUtils.formatDateWithSlashSeparator(message.getDate());         // OldestHistoryDate = текущая дата - 6 месяцев         if (date.isBefore(getOldestHistoryDate())) {             log.info("Skip too old turnover with date {}", date);             return;         }         // обработка сообщения     }

Этот инцидент наглядно показал, что даже правильно спроектированная событийная архитектура требует глубокого понимания предметной области и тщательной настройки всех компонентов. Важно не только уметь обрабатывать сообщения, но и понимать, какие из них действительно несут бизнес-ценность в текущем контексте.

Глава 3: Событийная интеграция через MQ. Вызовы гарантированной доставки

Несмотря на успешное внедрение Kafka в ядре нашей архитектуры, оказалось, что этот инструмент применим не везде. Ограничения были не только техническими, но и, что часто встречается в крупных организациях, бюрократическими и историческими.

Ключевые системы, такие как CRM (управление взаимоотношениями с клиентами) и MDM (Master Data Management — управление мастер-данными), исторически использовали для интеграции REST API. Несмотря на наличие Kafka в стеке, политика информационной безопасности и устоявшиеся практики этих команд диктовали использование MQ либо REST в качестве канала для передачи данных. Все аргументы против синхронного REST-подхода, описанные ранее оставались в силе, но повлиять на выбор технологий партнерских команд не представилось возможным. Это привело к усложнению архитектуры.

Характер данных и требования к консистентности

Данные, поступающие из этих систем, являются критически важными для бизнеса и требуют максимального уровня консистентности:

   Из CRM: Наиболее чувствительные данные — полномочия и права доступа клиентов (например, кто может подписывать платежные документы, кто имеет доступ к тем или иным счетам). Расхождение в этих данных между источником и нашим кэшем может привести к серьезным операционным рискам и финансовым потерям.

   Из MDM: Актуальные персональные данные и реквизиты клиентов (как юридических, так и физических лиц). Неверные реквизиты при формировании платежных документов или неактуальный юридический адрес при выводе справок — прямые риски для репутации банка и качества обслуживания.

Таким образом, мы не могли позволить себе потерю или «проседание» даже одного сообщения из этих потоков.

Анализ возможных вариантов обеспечения консистентности:

1.  Гибридный подход с фоновой синхронизацией (Временное решение):

Гибридный подход с фоновой синхронизацией

Гибридный подход с фоновой синхронизацией 

Суть: Асинхронно читаем очередь MQ и обновляем кэш. При этом, при каждом входе клиента в ДБО, в фоновом режиме инициируется синхронный REST-вызов к системе-источнику для проверки актуальности ключевых данных (полномочий, реквизитов).

       Недостатки: Фактически, мы не избавляемся от синхронной зависимости. Мы лишь маскируем ее, перенося на «момент входа». Падение CRM/MDM в этот момент приведет к задержкам или невозможности входа для клиента. Данные старше суток считаются неактуальными, что создает постоянный фоновый стресс для системы и клиента.

2.  Использование встроенных механизмов MQ:

Использование встроенных механизмов MQ

Использование встроенных механизмов MQ

Суть: Использование механизма подтверждений (квитанций) о доставке сообщений на стороне MQ.

       Недостатки: системы-источники имеют множество подписчиков. Обработка квитанций от всех потребителей создает значительную нагрузку и сложность логики на стороне отправителя. Это может легко привести к исчерпанию ресурсов и эффекту DoS. Кроме того, этот механизм не защищает от потери сообщения до его попадания в очередь (сбой на стороне отправителя) или от логических ошибок обработки на стороне потребителя.

3.  Сквозная нумерация сообщений (Sequence ID):

Сквозная нумерация сообщений (Sequence ID)

Сквозная нумерация сообщений (Sequence ID)

Суть: Каждое сообщение, отправляемое источником, содержит порядковый номер (Sequence ID). Потребитель (наша система) ведет учет этих номеров и детектирует пропуски.

       Преимущества: Простота реализации на стороне потребителя. Позволяет четко идентифицировать факт потери сообщения.

       Недостатки: При обнаружении пропуска необходимо запросить повторную отправку именно этого сообщения. Это требует создания отдельного механизма рестарта потока (retransmission request) на стороне источника, что часто отсутствует в legacy-системах. Также не решает проблему пропуска сообщения на источнике.

4.  Ежедневная полная синхронизация через ETL:

Ежедневная полная синхронизация через ETL

Ежедневная полная синхронизация через ETL

   Суть: MQ используется для оперативных уведомлений об изменениях, но раз в сутки (ночью) запускается ETL-задача (например, на Apache Airflow), которая выгружает полный слепок актуальных данных из источника и полностью перезаписывает соответствующий сегмент кэша.

       Преимущества: Обеспечивает высокий уровень консистентности «на начало дня». Снимает нагрузку с MQ и источника, так как не требует сложных механизмов повтора.

       Недостатки: Данные в течение дня могут расходиться. Архитектурно это костыль. Требует поддержания двух параллельных механизмов обновления (MQ + ETL). Так как со стороны информационной безопасности вариант приемлем, он остается как запасной.

5.  Комбинированная стратегия «Точечный ретрай + Валидация»:

Комбинированная стратегия "Точечный ретрай + Валидация"

Комбинированная стратегия «Точечный ретрай + Валидация»

 Осознавая недостатки всех предыдущих вариантов, мы разработали гибридный подход, который лег в основу нашей интеграции:

    a) Idempotent Consumer с проверкой последовательностей:

    Наш консьюмер MQ был спроектирован как идемпотентный. Он хранит в своем локальном хранилище максимальный Sequence ID, успешно обработанный для каждого клиента. При получении нового сообщения проверяется его номер.

     —  Если номер N+1 — данные применяются к кэшу, номер сохраняется.

     —  Если номер <= N — сообщение игнорируется (дубликат).

     —  Если номер > N+1 — фиксируется пропуск.

    b) Точечный запрос при обнаружении пропуска:

    При детектировании пропуска (например, получили сообщение N+5 вместо N+1), система не инициирует массовую перевыгрузку. Вместо этого она отправляет точечный REST-запрос к API источника (CRM/MDM) для получения актуального состояния той сущности, по которой обнаружен пропуск. Это может быть запрос по ID клиента или договора.

    c) Фоновая валидация по расписанию:

    Для подстраховки раз в несколько часов запускается фоновая задача, которая выборочно (для 1% рандомных клиентов) сверяет Sequence ID в нашем кэше с эталонным логом на стороне источника.

    Почему этот подход оказался оптимальным:

    —   Минимальная нагрузка на источник: Точечные REST-запросы при пропуске одного сообщения создают на несколько порядков меньшую нагрузку, чем постоянные проверки или полные выгрузки.

    —   Высокая гарантия доставки: Мы не пропускаем пропуски и активно их исправляем.

    —   Отказ от синхронности: Основной трафик идет асинхронно через MQ. Синхронные вызовы происходят редко и только по необходимости, не блокируя работу системы в целом.

    —   Приемлемый уровень риска: Комбинация идемпотентности, детекции пропусков и фоновой валидации снижает риск расхождений до приемлемого минимума.

Это решение позволит нам соблюсти бюрократические требования к использованию MQ, обеспечив при этом тот уровень надежности и консистентности, который необходим для работы с критичными бизнес-данными.

Заключение

Было реализовано в нашем MVP, где скорость была важнее качества

было в mvp

было в mvp

К чему стремимся, где на первый план выдвигается доступность без проблем в безопасности

to be

to be

Наш путь от синхронной REST-интеграции к гибридной событийной модели позволил сформулировать несколько ключевых принципов, которые выходят далеко за рамки выбора между Kafka и MQ. Это скорее уроки о принятии сложности в обмен на надежность.

1. Сложность как плата за гибкость
Да, событийная модель сложнее прямых вызовов. Но эта сложность — осознанная плата за гибкость и масштабируемость. Мы получили систему, которую можно развивать, не опасаясь создать точку отказа.

2. Глубже топиков: консистентность требует усилий
Событийная модель — не просто брокер и топики. Для критичных потоков данных приходится вводить дополнительные механизмы и усложнять архитектуру для соблюдения консистетности.

3. Отказоустойчивость — главный приоритет
Все это служит одной цели — сделать систему по настоящему отказоустойчивой. Мы сознательно выбрали статус AP-системы, где доступность клиента важнее сиюминутной консистентности всех данных. Клиент всегда может работать с сервисом, пока фоном идет синхронизация.

Интеграция систем в большом банке похожа на реконструкцию самолета во время полета. Вы не можете просто остановить все процессы и переписать их с нуля. Событийная архитектура в этом смысле становится системой автоматического пилотирования — она позволяет постепенно, эволюционно отключать прямые синхронные связи, заменяя их гибкими, асинхронными потоками данных.

Это не просто смена технологий — это смена парадигмы мышления: от создания статичных, хрупких связей к построению живой, пульсирующей сети данных, которая способна адаптироваться к сбоям, выдерживать нагрузку и расти вместе с бизнесом. Начните с анализа ваших самых болезненных синхронных интеграций — и вы найдете точку входа для внедрения событийного подхода. Мы построили не просто новую систему — мы заложили основу для цифровой экосистемы банка будущего, где данные становятся стратегическим активом, а не побочным продуктом работы разрозненных систем.


ссылка на оригинал статьи https://habr.com/ru/articles/943980/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *