- PVSM.RU - https://www.pvsm.ru -
В первой части [1] рассматривались базовые понятия Storm.
Разные классы задач предъявляют различные требования к надежности. Одно дело пропустить пару записей при подсчете статистики посещений, где счет идет на сотни тысяч и особая точность не нужна. И совсем другое — потерять, например, информацию о платеже клиента.
Далее рассмотрим о механизмы защиты от потери данных, которые реализованы в Storm.
Если нам не важно были ли ошибки при обработке Tuple, то Spout отправляет Tuple в SpoutOutputCollector посредством вызова метода emit(new Values(...)) [2].
Eсли мы хотим узнать успешно ли обработался Tuple, то вызов будет выглядеть как emit(new Values(...), msgId), где msgId это объект произвольного класса. В этом случае интерфейс ISpout [3] предоставляет методы:
где msgId — это msgId с которым был вызван SpoutOutputCollector.emit [6].
Пример FailAwareSpout [7]:
public class FailAwareSpout extends BaseRichSpout {
private Message[] messages;
// Skipped ...
private static class Message implements Serializable {
private String message;
private int failCount;
private Message(String message) {
this.message = message;
}
}
// Skipped ...
@Override
public void nextTuple() {
// Skipped ...
// Отправляем Tuple c msgId
outputCollector.emit(new Values(messages[messageId].message), messageId);
}
// Tuple обработан нормально
@Override
public void ack(Object msgId) {
Message m = messages[(Integer) msgId];
System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
m.message + " processed successfully");
}
// Tuple не обработан
@Override
public void fail(Object msgId) {
Message m = messages[(Integer) msgId];
if(++m.failCount > MAX_RETRY_COUNT) {
throw new IllegalStateException("Too many message processing errors");
}
System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
m.message + " processing failed " + "[" + m.failCount + "]");
// Вставляем в очередь на повторную обработку
sendQueue.addLast((Integer) msgId);
}
}
Методы nextTuple, ack и fail, вызываются в одном потоке и не требуют дополнительной синхронизации при обращении к полям Spout.
Для того что бы Bolt мог информировать Storm о результатах обработки, он должен реализовывать интерфейс IRichBolt [8]. Проще всего это сделать унаследовав класс BaseRichBolt [9].
Bolt информирует Storm o результатах своей работы посредством вызова следующих методов класса OutputCollector в методе execute(Tuple) [10]:
Пример FailingBolt [13]:
public class FailingBolt extends BaseRichBolt {
OutputCollector outputCollector;
// Skipped ...
@Override
public void execute(Tuple tuple) {
// Skipped ...
outputCollector.ack(tuple); // Данные успешно обработаны
}
else {
// Skipped ...
outputCollector.fail(tuple); // Обработка завершилась с ошибкой
}
}
// Skipped ...
}
Пример использования: BasicFailApp [14], Spout FailAwareSpout [7] и Bolt FailingBolt [13] случайным образом генерирующий ошибки обработки.
В Bolt'ах унаследованных от класса BaseBasicBolt [15], ack(Tuple) [11] вызывается после выхода из метода execute [16] автоматически.
При обработке входного Tuple, Bolt может генерировать более одного выходного Tuple. Если Bolt вызвал emit(sourceTuple, resultTuple) [17] то образуется DAG [18] с вершиной в виде исходного Tuple и потомками в виде порожденных Tuple. Storm отслеживает ошибки процессинга всех узлов графа. В случае возникновения ошибки на любом уровне иерархии, Spout, породивший исходный Tuple, будет уведомлен вызовом fail. Пример MultiplierBolt [19]:
public class MultiplierBolt extends BaseRichBolt {
// Skipped ...
@Override
public void execute(Tuple tuple) {
// Генерируем несколько исходящих Tuple из одного входящего
for(int i = 0; i < MULTI_COUNT; ++i) {
// Anchoring, привязываем исходящие Tuple к входящему
outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i));
}
outputCollector.ack(tuple);
}
// Skipped ...
}
Пример использования Anchoring: TreeFailApp [20]
В Bolt'ах унаследованных от класса BaseBasicBolt [15] метод execute(Tuple, BasicOutputCollector) [16] вызывается с коллектором BasicOutputCollector [21]. Особенность BasicOutputCollector в том, что он автоматически делает Anchor на входной Tuple при emit [22].
Поскольку Storm является распределенной системой, Tuple могут передаваться с одного узла кластера на другой. В связи с этим Storm обеспечивает отслеживание таймаутов обработки. По умолчанию, весь граф должен быть обработан за 30 секунд, или Storm вызовет метод fail у породившего граф Spout'а. Таймаут можно изменить [23].
Код доступен на GitHub [24].
Следующая часть будет посвящена Transactional Topologies, использующихся в связке с транзакционными источниками данных.
Автор: xlix123
Источник [25]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/38596
Ссылки в тексте:
[1] первой части: http://habrahabr.ru/post/186208/
[2] emit(new Values(...)): https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/CdrSpout.java#L45
[3] ISpout: http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html
[4] ack(Object msgId): http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html#ack(java.lang.Object)
[5] fail(Object msgId): http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html#fail(java.lang.Object)
[6] SpoutOutputCollector.emit: http://nathanmarz.github.io/storm/doc/backtype/storm/spout/SpoutOutputCollector.html#emit(java.util.List, java.lang.Object)
[7] FailAwareSpout: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/FailAwareSpout.java
[8] IRichBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/IRichBolt.html
[9] BaseRichBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/base/BaseRichBolt.html
[10] execute(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/IBolt.html#execute(backtype.storm.tuple.Tuple)
[11] ack(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#ack(backtype.storm.tuple.Tuple)
[12] fail(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#fail(backtype.storm.tuple.Tuple)
[13] FailingBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/FailingBolt.java
[14] BasicFailApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/BasicFailApp.java
[15] BaseBasicBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/base/BaseBasicBolt.html
[16] execute: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/IBasicBolt.html#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
[17] emit(sourceTuple, resultTuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#emit(java.util.Collection, java.util.List)
[18] DAG: http://ru.wikipedia.org/wiki/%D0%9D%D0%B0%D0%BF%D1%80%D0%B0%D0%B2%D0%BB%D0%B5%D0%BD%D0%BD%D1%8B%D0%B9_%D0%B0%D1%86%D0%B8%D0%BA%D0%BB%D0%B8%D1%87%D0%B5%D1%81%D0%BA%D0%B8%D0%B9_%D0%B3%D1%80%D0%B0%D1%84
[19] MultiplierBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/MultiplierBolt.java
[20] TreeFailApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/TreeFailApp.java
[21] BasicOutputCollector: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/BasicOutputCollector.html
[22] emit: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/BasicOutputCollector.html#emit(java.util.List)
[23] изменить: http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_MESSAGE_TIMEOUT_SECS
[24] GitHub: https://github.com/scanban/stormex
[25] Источник: http://habrahabr.ru/post/186436/
Нажмите здесь для печати.