Разные классы задач предъявляют различные требования к надежности. Одно дело пропустить пару записей при подсчете статистики посещений, где счет идет на сотни тысяч и особая точность не нужна. И совсем другое — потерять, например, информацию о платеже клиента.
Далее рассмотрим о механизмы защиты от потери данных, которые реализованы в Storm.
Базовый пример
Spout
Если нам не важно были ли ошибки при обработке Tuple, то Spout отправляет Tuple в SpoutOutputCollector посредством вызова метода emit(new Values(…)).
Eсли мы хотим узнать успешно ли обработался Tuple, то вызов будет выглядеть как emit(new Values(…), msgId), где msgId это объект произвольного класса. В этом случае интерфейс ISpout предоставляет методы:
- ack(Object msgId) — будет вызван если Tuple обработан
- fail(Object msgId) — будет вызван если Tuple не обработан
где msgId — это msgId с которым был вызван SpoutOutputCollector.emit.
Пример FailAwareSpout:
public class FailAwareSpout extends BaseRichSpout { private Message[] messages; // Skipped ... private static class Message implements Serializable { private String message; private int failCount; private Message(String message) { this.message = message; } } // Skipped ... @Override public void nextTuple() { // Skipped ... // Отправляем Tuple c msgId outputCollector.emit(new Values(messages[messageId].message), messageId); } // Tuple обработан нормально @Override public void ack(Object msgId) { Message m = messages[(Integer) msgId]; System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processed successfully"); } // Tuple не обработан @Override public void fail(Object msgId) { Message m = messages[(Integer) msgId]; if(++m.failCount > MAX_RETRY_COUNT) { throw new IllegalStateException("Too many message processing errors"); } System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " + m.message + " processing failed " + "[" + m.failCount + "]"); // Вставляем в очередь на повторную обработку sendQueue.addLast((Integer) msgId); } }
Методы nextTuple, ack и fail, вызываются в одном потоке и не требуют дополнительной синхронизации при обращении к полям Spout.
Bolt
Для того что бы Bolt мог информировать Storm о результатах обработки, он должен реализовывать интерфейс IRichBolt. Проще всего это сделать унаследовав класс BaseRichBolt.
Bolt информирует Storm o результатах своей работы посредством вызова следующих методов класса OutputCollector в методе execute(Tuple):
- ack(Tuple) — обработка прошла успешно
- fail(Tuple) — обработка завершилась с ошибкой
Пример FailingBolt:
public class FailingBolt extends BaseRichBolt { OutputCollector outputCollector; // Skipped ... @Override public void execute(Tuple tuple) { // Skipped ... outputCollector.ack(tuple); // Данные успешно обработаны } else { // Skipped ... outputCollector.fail(tuple); // Обработка завершилась с ошибкой } } // Skipped ... }
Пример использования: BasicFailApp, Spout FailAwareSpout и Bolt FailingBolt случайным образом генерирующий ошибки обработки.
В Bolt’ах унаследованных от класса BaseBasicBolt, ack(Tuple) вызывается после выхода из метода execute автоматически.
Anchoring
При обработке входного Tuple, Bolt может генерировать более одного выходного Tuple. Если Bolt вызвал emit(sourceTuple, resultTuple) то образуется DAG с вершиной в виде исходного Tuple и потомками в виде порожденных Tuple. Storm отслеживает ошибки процессинга всех узлов графа. В случае возникновения ошибки на любом уровне иерархии, Spout, породивший исходный Tuple, будет уведомлен вызовом fail. Пример MultiplierBolt:
public class MultiplierBolt extends BaseRichBolt { // Skipped ... @Override public void execute(Tuple tuple) { // Генерируем несколько исходящих Tuple из одного входящего for(int i = 0; i < MULTI_COUNT; ++i) { // Anchoring, привязываем исходящие Tuple к входящему outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i)); } outputCollector.ack(tuple); } // Skipped ... }
Пример использования Anchoring: TreeFailApp
В Bolt’ах унаследованных от класса BaseBasicBolt метод execute(Tuple, BasicOutputCollector) вызывается с коллектором BasicOutputCollector. Особенность BasicOutputCollector в том, что он автоматически делает Anchor на входной Tuple при emit.
Поскольку Storm является распределенной системой, Tuple могут передаваться с одного узла кластера на другой. В связи с этим Storm обеспечивает отслеживание таймаутов обработки. По умолчанию, весь граф должен быть обработан за 30 секунд, или Storm вызовет метод fail у породившего граф Spout’а. Таймаут можно изменить.
Код доступен на GitHub.
Следующая часть будет посвящена Transactional Topologies, использующихся в связке с транзакционными источниками данных.
ссылка на оригинал статьи http://habrahabr.ru/post/186436/
Добавить комментарий