- PVSM.RU - https://www.pvsm.ru -

Изучаем Storm Framework. Часть II

В первой части [1] рассматривались базовые понятия Storm.

Разные классы задач предъявляют различные требования к надежности. Одно дело пропустить пару записей при подсчете статистики посещений, где счет идет на сотни тысяч и особая точность не нужна. И совсем другое — потерять, например, информацию о платеже клиента.

Далее рассмотрим о механизмы защиты от потери данных, которые реализованы в Storm.

Базовый пример

Spout

Если нам не важно были ли ошибки при обработке Tuple, то Spout отправляет Tuple в SpoutOutputCollector посредством вызова метода emit(new Values(...)) [2].

Eсли мы хотим узнать успешно ли обработался Tuple, то вызов будет выглядеть как emit(new Values(...), msgId), где msgId это объект произвольного класса. В этом случае интерфейс ISpout [3] предоставляет методы:

  • ack(Object msgId) [4] — будет вызван если Tuple обработан
  • fail(Object msgId) [5] — будет вызван если Tuple не обработан

где msgId — это msgId с которым был вызван SpoutOutputCollector.emit [6].
Пример FailAwareSpout [7]:

public class FailAwareSpout extends BaseRichSpout {
private Message[] messages;
// Skipped ...
    private static class Message implements Serializable {
        private String message;
        private int failCount;

        private Message(String message) {
            this.message = message;
        }
    }
// Skipped ...
    @Override
    public void nextTuple() {
// Skipped ...
// Отправляем Tuple c msgId
        outputCollector.emit(new Values(messages[messageId].message), messageId);
    }

// Tuple обработан нормально
    @Override
    public void ack(Object msgId) {
        Message m = messages[(Integer) msgId];

        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
                m.message + " processed successfully");
    }

// Tuple не обработан
    @Override
    public void fail(Object msgId) {
        Message m = messages[(Integer) msgId];
        if(++m.failCount > MAX_RETRY_COUNT) {
            throw new IllegalStateException("Too many message processing errors");
        }
        System.out.println("IN>> [" + Thread.currentThread().getId() + "] message " +
                m.message + " processing failed " + "[" + m.failCount + "]");
// Вставляем в очередь на повторную обработку
        sendQueue.addLast((Integer) msgId);
    }
}

Методы nextTuple, ack и fail, вызываются в одном потоке и не требуют дополнительной синхронизации при обращении к полям Spout.

Bolt

Для того что бы Bolt мог информировать Storm о результатах обработки, он должен реализовывать интерфейс IRichBolt [8]. Проще всего это сделать унаследовав класс BaseRichBolt [9].
Bolt информирует Storm o результатах своей работы посредством вызова следующих методов класса OutputCollector в методе execute(Tuple) [10]:

  • ack(Tuple) [11] — обработка прошла успешно
  • fail(Tuple) [12] — обработка завершилась с ошибкой

Пример FailingBolt [13]:

public class FailingBolt extends BaseRichBolt {
    OutputCollector outputCollector;
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Skipped ...
            outputCollector.ack(tuple); // Данные успешно обработаны
        }
        else {
// Skipped ...
            outputCollector.fail(tuple); // Обработка завершилась с ошибкой
        }
    }
// Skipped ...
}

Пример использования: BasicFailApp [14], Spout FailAwareSpout [7] и Bolt FailingBolt [13] случайным образом генерирующий ошибки обработки.

В Bolt'ах унаследованных от класса BaseBasicBolt [15], ack(Tuple) [11] вызывается после выхода из метода execute [16] автоматически.

Anchoring

При обработке входного Tuple, Bolt может генерировать более одного выходного Tuple. Если Bolt вызвал emit(sourceTuple, resultTuple) [17] то образуется DAG [18] с вершиной в виде исходного Tuple и потомками в виде порожденных Tuple. Storm отслеживает ошибки процессинга всех узлов графа. В случае возникновения ошибки на любом уровне иерархии, Spout, породивший исходный Tuple, будет уведомлен вызовом fail. Пример MultiplierBolt [19]:

public class MultiplierBolt extends BaseRichBolt {
// Skipped ...
    @Override
    public void execute(Tuple tuple) {
// Генерируем несколько  исходящих Tuple из одного входящего
        for(int i = 0; i < MULTI_COUNT; ++i) {
// Anchoring, привязываем исходящие Tuple к входящему 
            outputCollector.emit(tuple, new Values(tuple.getString(0) + " - " + i));
        }
        outputCollector.ack(tuple);
    }
// Skipped ...
}

Пример использования Anchoring: TreeFailApp [20]

В Bolt'ах унаследованных от класса BaseBasicBolt [15] метод execute(Tuple, BasicOutputCollector) [16] вызывается с коллектором BasicOutputCollector [21]. Особенность BasicOutputCollector в том, что он автоматически делает Anchor на входной Tuple при emit [22].

Поскольку Storm является распределенной системой, Tuple могут передаваться с одного узла кластера на другой. В связи с этим Storm обеспечивает отслеживание таймаутов обработки. По умолчанию, весь граф должен быть обработан за 30 секунд, или Storm вызовет метод fail у породившего граф Spout'а. Таймаут можно изменить [23].

Код доступен на GitHub [24].

Следующая часть будет посвящена Transactional Topologies, использующихся в связке с транзакционными источниками данных.

Автор: xlix123

Источник [25]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/java/38596

Ссылки в тексте:

[1] первой части: http://habrahabr.ru/post/186208/

[2] emit(new Values(...)): https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/CdrSpout.java#L45

[3] ISpout: http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html

[4] ack(Object msgId): http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html#ack(java.lang.Object)

[5] fail(Object msgId): http://nathanmarz.github.io/storm/doc/backtype/storm/spout/ISpout.html#fail(java.lang.Object)

[6] SpoutOutputCollector.emit: http://nathanmarz.github.io/storm/doc/backtype/storm/spout/SpoutOutputCollector.html#emit(java.util.List, java.lang.Object)

[7] FailAwareSpout: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/FailAwareSpout.java

[8] IRichBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/IRichBolt.html

[9] BaseRichBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/base/BaseRichBolt.html

[10] execute(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/IBolt.html#execute(backtype.storm.tuple.Tuple)

[11] ack(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#ack(backtype.storm.tuple.Tuple)

[12] fail(Tuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#fail(backtype.storm.tuple.Tuple)

[13] FailingBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/FailingBolt.java

[14] BasicFailApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/BasicFailApp.java

[15] BaseBasicBolt: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/base/BaseBasicBolt.html

[16] execute: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/IBasicBolt.html#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)

[17] emit(sourceTuple, resultTuple): http://nathanmarz.github.io/storm/doc/backtype/storm/task/OutputCollector.html#emit(java.util.Collection, java.util.List)

[18] DAG: http://ru.wikipedia.org/wiki/%D0%9D%D0%B0%D0%BF%D1%80%D0%B0%D0%B2%D0%BB%D0%B5%D0%BD%D0%BD%D1%8B%D0%B9_%D0%B0%D1%86%D0%B8%D0%BA%D0%BB%D0%B8%D1%87%D0%B5%D1%81%D0%BA%D0%B8%D0%B9_%D0%B3%D1%80%D0%B0%D1%84

[19] MultiplierBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/faults/MultiplierBolt.java

[20] TreeFailApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/TreeFailApp.java

[21] BasicOutputCollector: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/BasicOutputCollector.html

[22] emit: http://nathanmarz.github.io/storm/doc/backtype/storm/topology/BasicOutputCollector.html#emit(java.util.List)

[23] изменить: http://nathanmarz.github.io/storm/doc/backtype/storm/Config.html#TOPOLOGY_MESSAGE_TIMEOUT_SECS

[24] GitHub: https://github.com/scanban/stormex

[25] Источник: http://habrahabr.ru/post/186436/