Изучаем Storm Framework. Часть III

от автора

Во второй части статьи рассказывалось о механизмах обнаружения ошибок в процессе обработки.

Обработка завершилась с ошибкой, что делать дальше? Вполне возможно, что потеряна связь с одним из узлов кластера или временно недоступна база данных. В этом случае, нельзя с уверенностью сказать, какие операции выполнились успешно, а какие — нет. Если все операции в цепочке повторно применимы (идемпотентны), например установка флага, то можно просто перезапустить обработку. Если нет, то на помощь приходят механизмы транзакций Storm.

Когда говорят о характеристиках транзакций, тут же всплывает термин ACID:

  • Atomicity (атомарность). Все изменения произведенные в системе на протяжении транзакции, либо применяются полностью, либо не применяются совсем.
  • Consistency (cогласованность). Транзакция переводит систему из одного непртиворечивого состояния в другое.
  • Isolation (изолированность). Параллельно выполняемые транзакции не оказывают влияние на результат работы друг друга.
  • Durability (надежность). Зафиксированные транзакцией изменения гарантированно остаются в системе.

Consistency и Durability в большей степерни относятся к базам данных. Нас будут интересовать Atomicity и Isolation.

В версии 0.8.0 в Storm появилась подсистема Trident — аналог Apache Pig. В нее же перекочевал функционал Transactional topology.

Транзакции в Storm

Atomicity

В Topology создается объект реализующий интерфейс State, инкапсулирующий работу с БД. Входные данные, поступающие в Spout, разбиваются на Tuple и собираются в пакеты (batch). Batch ассоцируется с уникальным transaction id. Tuple образующие batch могут обрабатываться параллельно.
В конце цепочки обработки, набор Tuple, относящихся к одной транзакции, передается в метод updateState класса, реализующего интерфейс StateUpdater, который и призводит модификацию State. В случае успешного завершения, Spout получает уведомление об успехе обработки batch’a. В случае ошибки, Spout должен передать на обработку весь batch повторно.
Таким образом Storm гарантирует, что Batch будет зафиксирован в БД полностью и только один раз.

Isolation

Storm гарантирует, что Batch’и передаются в StateUpdater строго последовательно, в порядке возрастания transaction id. То есть Batch #2 будет зафиксирован только после успешной фиксации Batch’а #1.

Реализация

Spout с поддержкой транзакций должен реализовывать интерфейс ICommitterTridentSpout<TransactionMetadata>. TransactionMetadata — любой класс, содержит данные для генерации Batch’ей и генерации следующей транзакции: TxMeta.

Скрытый текст

public class TxMeta {     private int start;     private int count;      public TxMeta(int start, int count) {         this.start = start;         this.count = count;     } // Skipped getters  } 

Класс реализующий интерфейс ITridentSpout.BatchCoordinator<TransactionMetadata> инициализирует TransactionMetadata при создании транзакции и отвечает на запрос готовы ли данные для следующей транзакции: TridentTxSpout. Создается в единственном экземпляре для каждой Topology.

Скрытый текст

    static class BCoordinator implements BatchCoordinator<TxMeta> {         private static final int TRANSACTION_COUNT = 5;         private static final int TRANSACTION_ELEMENT_COUNT = 5;  //TxMeta - метаданные предыдущей транзакции         @Override         public TxMeta initializeTransaction(long l, TxMeta txMeta) {              if(txMeta != null) {              System.out.println(String.format("Initializing transaction id: %08d, "             + "start: %04d, count: %04d", l, txMeta.getStart() +                 txMeta.getCount(), txMeta.getCount()));              return new TxMeta(txMeta.getStart() + txMeta.getCount(),                     TRANSACTION_ELEMENT_COUNT);             } else {                 return new TxMeta(0, TRANSACTION_ELEMENT_COUNT);             }         }  // Готовы ли данные для следующей транзакции         @Override         public boolean isReady(long l) {             if(l <= TRANSACTION_COUNT) {                 System.out.println("ISREADY " + l);                 return true;             }             return false;         }     } 

Класс реализующий интерфейс ICommitterTridentSpout.Emitter формирует Batch. В случае ошибки в обработке Batch’a, формирует Batch повторно.
Важно — повторно сформированный Batch должен содержать точно такой же набор Tuple, что и оригинальный.

Скрытый текст

static class BEmitter implements Emitter { // Формирует Batch по информации из TransactionMetadata         @Override         public void emitBatch(TransactionAttempt transactionAttempt,                                Object coordinatorMeta,                               TridentCollector tridentCollector) {              TxMeta txMeta = (TxMeta) coordinatorMeta;              System.out.println("Emitting transaction id: " +                     transactionAttempt.getTransactionId() + " attempt:" +                     transactionAttempt.getAttemptId()             );             for(int i = 0; i < txMeta.getCount(); ++i) {                 tridentCollector.emit(new Values("TRANS [" +                         transactionAttempt.getAttemptId() +                          "] [" + (txMeta.getStart() + i) + "]")                 );             }         }  // Транзакция  успешно закоммичена в State        @Override         public void success(TransactionAttempt transactionAttempt) {             System.out.println("BEmitter:Transaction success id:" +                                           transactionAttempt.getTransactionId());         }  // Попытка коммита транзакции в State         @Override         public void commit(TransactionAttempt transactionAttempt) {             System.out.println("BEmitter:Transaction commit id:" +                                          transactionAttempt.getTransactionId());         }     } 

Класс реализующий интерфейс State в нашем случае драйвер БД: TxDatabase.

Скрытый текст

public class TxDatabase implements State { // Вызывается при начале транзакции в БД     @Override     public void beginCommit(Long txId) {         System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId);     }  // Вызывается для коммита транзакции в БД     @Override     public void commit(Long txId) {         System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId);     } } 

Класс наследующий BaseStateUpdater<S extends State>, вносит изменения в State (БД): TxDatabaseUpdater

Скрытый текст

public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> {     int count;      // Вносит изменения в БД     @Override     public void updateState(TxDatabase txDatabase,                              List<TridentTuple> tridentTuples,                             TridentCollector tridentCollector) {          // Эмуляция сбоя транзакции         if(++count == 2) throw new FailedException("YYYY");           for(TridentTuple t: tridentTuples) {             System.out.println("Updating: " + t.getString(0));         }     } } 

Класс реализующий интерфейс StateFactory, создает экземпляры State: TxDatabaseFactory.

Собираем все вместе TridentTransactionApp:

public class TridentTransactionApp {     public static void main( String[] args ) throws Throwable     {         Logger.getRootLogger().setLevel(Level.ERROR);  // Создаем топологию         TridentTopology tridentTopology = new TridentTopology(); // Добавляем наш Spout         tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()). // Обработка Tuple пойдет параллельно - OpPrintout просто печатает записи                 shuffle().each(new Fields("msg"), new OpPrintout()).                 parallelismHint(2). // Сливаем результаты параллельной обработки в один поток                 global(). // Записываем изменения в State (БД)                 partitionPersist(new TxDatabaseFactory(),                         new Fields("msg"), new TxDatabaseUpdater()); // Skipped         LocalCluster cluster = new LocalCluster();         cluster.submitTopology("T2", config, tridentTopology.build());         Thread.sleep(1000*100);         cluster.shutdown();     } } 

Транзакционные возможности Storm очень удобно использовать для передачи данных из одной системы в другую, когда требуется нетривиальная обработка. Например одна система генерирует файлы, Storm их разделяет на записи, обрабатывает в параллельном режиме и складывает в БД. В случае ошибки обработки есть гарантия, что файл не будет удален и не будет обработан дважды.

PS. Раскрыть все возможности Storm в рамках статей невозможно, материала хватит на целую книгу. Надеюсь мне удалось показать ключевые возможности фреймворка и возможности его применения в реальных проектах.
По поводу развертывания кластера — недавно наткнулся на отличную статью. Не вижу смысла повторяться. Развернуть Storm в production действительно несложно.

PPS. В Hadoop существует аналог on-line обработки Storm — Hadoop Streaming, но в отличии от Storm, транзакции он не поддерживает.

ссылка на оригинал статьи http://habrahabr.ru/post/186634/