- PVSM.RU - https://www.pvsm.ru -

Изучаем Storm Framework. Часть III

Во второй части [1] статьи рассказывалось о механизмах обнаружения ошибок в процессе обработки.

Обработка завершилась с ошибкой, что делать дальше? Вполне возможно, что потеряна связь с одним из узлов кластера или временно недоступна база данных. В этом случае, нельзя с уверенностью сказать, какие операции выполнились успешно, а какие — нет. Если все операции в цепочке повторно применимы (идемпотентны [2]), например установка флага, то можно просто перезапустить обработку. Если нет, то на помощь приходят механизмы транзакций Storm.

Когда говорят о характеристиках транзакций, тут же всплывает термин ACID [3]:

  • Atomicity (атомарность). Все изменения произведенные в системе на протяжении транзакции, либо применяются полностью, либо не применяются совсем.
  • Consistency (cогласованность). Транзакция переводит систему из одного непртиворечивого состояния в другое.
  • Isolation (изолированность). Параллельно выполняемые транзакции не оказывают влияние на результат работы друг друга.
  • Durability (надежность). Зафиксированные транзакцией изменения гарантированно остаются в системе.

Consistency и Durability в большей степерни относятся к базам данных. Нас будут интересовать Atomicity и Isolation.

В версии 0.8.0 в Storm появилась подсистема Trident [4] — аналог Apache Pig [5]. В нее же перекочевал функционал Transactional topology [6].

Транзакции в Storm

Atomicity

В Topology создается объект реализующий интерфейс State [7], инкапсулирующий работу с БД. Входные данные, поступающие в Spout, разбиваются на Tuple и собираются в пакеты (batch). Batch ассоцируется с уникальным transaction id. Tuple образующие batch могут обрабатываться параллельно.
В конце цепочки обработки, набор Tuple, относящихся к одной транзакции, передается в метод updateState [8] класса, реализующего интерфейс StateUpdater [9], который и призводит модификацию State. В случае успешного завершения, Spout получает уведомление об успехе обработки batch'a. В случае ошибки, Spout должен передать на обработку весь batch повторно.
Таким образом Storm гарантирует, что Batch будет зафиксирован в БД полностью и только один раз.

Isolation

Storm гарантирует, что Batch'и передаются в StateUpdater строго последовательно, в порядке возрастания transaction id. То есть Batch #2 будет зафиксирован только после успешной фиксации Batch'а #1.

Реализация

Spout с поддержкой транзакций должен реализовывать интерфейс ICommitterTridentSpout<TransactionMetadata> [10]. TransactionMetadata — любой класс, содержит данные для генерации Batch'ей и генерации следующей транзакции: TxMeta [11].

Скрытый текст

public class TxMeta {
    private int start;
    private int count;

    public TxMeta(int start, int count) {
        this.start = start;
        this.count = count;
    }
// Skipped getters 
}

Класс реализующий интерфейс ITridentSpout.BatchCoordinator<TransactionMetadata> [12] инициализирует TransactionMetadata при создании транзакции и отвечает на запрос готовы ли данные для следующей транзакции: TridentTxSpout [13]. Создается в единственном экземпляре для каждой Topology.

Скрытый текст

    static class BCoordinator implements BatchCoordinator<TxMeta> {
        private static final int TRANSACTION_COUNT = 5;
        private static final int TRANSACTION_ELEMENT_COUNT = 5;

//TxMeta - метаданные предыдущей транзакции
        @Override
        public TxMeta initializeTransaction(long l, TxMeta txMeta) { 
            if(txMeta != null) {

            System.out.println(String.format("Initializing transaction id: %08d, "
            + "start: %04d, count: %04d", l, txMeta.getStart() + 
               txMeta.getCount(), txMeta.getCount()));

            return new TxMeta(txMeta.getStart() + txMeta.getCount(),
                    TRANSACTION_ELEMENT_COUNT);
            } else {
                return new TxMeta(0, TRANSACTION_ELEMENT_COUNT);
            }
        }

// Готовы ли данные для следующей транзакции
        @Override
        public boolean isReady(long l) {
            if(l <= TRANSACTION_COUNT) {
                System.out.println("ISREADY " + l);
                return true;
            }
            return false;
        }
    }

Класс реализующий интерфейс ICommitterTridentSpout.Emitter [14] формирует Batch. В случае ошибки в обработке Batch'a, формирует Batch повторно.
Важно — повторно сформированный Batch должен содержать точно такой же набор Tuple, что и оригинальный.

Скрытый текст

static class BEmitter implements Emitter {
// Формирует Batch по информации из TransactionMetadata
        @Override
        public void emitBatch(TransactionAttempt transactionAttempt, 
                              Object coordinatorMeta,
                              TridentCollector tridentCollector) {

            TxMeta txMeta = (TxMeta) coordinatorMeta;

            System.out.println("Emitting transaction id: " +
                    transactionAttempt.getTransactionId() + " attempt:" +
                    transactionAttempt.getAttemptId()
            );
            for(int i = 0; i < txMeta.getCount(); ++i) {
                tridentCollector.emit(new Values("TRANS [" +
                        transactionAttempt.getAttemptId() + 
                        "] [" + (txMeta.getStart() + i) + "]")
                );
            }
        }

// Транзакция  успешно закоммичена в State
       @Override
        public void success(TransactionAttempt transactionAttempt) {
            System.out.println("BEmitter:Transaction success id:" + 
                                         transactionAttempt.getTransactionId());
        }

// Попытка коммита транзакции в State
        @Override
        public void commit(TransactionAttempt transactionAttempt) {
            System.out.println("BEmitter:Transaction commit id:" + 
                                        transactionAttempt.getTransactionId());
        }
    }

Класс реализующий интерфейс State [7] в нашем случае драйвер БД: TxDatabase [15].

Скрытый текст

public class TxDatabase implements State {
// Вызывается при начале транзакции в БД
    @Override
    public void beginCommit(Long txId) {
        System.out.println("beginCommit [" + Thread.currentThread().getId() + "] " + txId);
    }

// Вызывается для коммита транзакции в БД
    @Override
    public void commit(Long txId) {
        System.out.println("commit [" + Thread.currentThread().getId() + "] " + txId);
    }
}

Класс наследующий BaseStateUpdater<S extends State> [16], вносит изменения в State (БД): TxDatabaseUpdater [17]

Скрытый текст

public class TxDatabaseUpdater extends BaseStateUpdater<TxDatabase> {
    int count;

    // Вносит изменения в БД
    @Override
    public void updateState(TxDatabase txDatabase, 
                            List<TridentTuple> tridentTuples,
                            TridentCollector tridentCollector) {

        // Эмуляция сбоя транзакции
        if(++count == 2) throw new FailedException("YYYY"); 

        for(TridentTuple t: tridentTuples) {
            System.out.println("Updating: " + t.getString(0));
        }
    }
}

Класс реализующий интерфейс StateFactory [18], создает экземпляры State: TxDatabaseFactory [19].

Собираем все вместе TridentTransactionApp [20]:

public class TridentTransactionApp
{
    public static void main( String[] args ) throws Throwable
    {
        Logger.getRootLogger().setLevel(Level.ERROR);

// Создаем топологию
        TridentTopology tridentTopology = new TridentTopology();
// Добавляем наш Spout
        tridentTopology.newStream("TridentTxSpout", new TridentTxSpout()).
// Обработка Tuple пойдет параллельно - OpPrintout просто печатает записи
                shuffle().each(new Fields("msg"), new OpPrintout()).
                parallelismHint(2).
// Сливаем результаты параллельной обработки в один поток
                global().
// Записываем изменения в State (БД)
                partitionPersist(new TxDatabaseFactory(),
                        new Fields("msg"), new TxDatabaseUpdater());
// Skipped
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("T2", config, tridentTopology.build());
        Thread.sleep(1000*100);
        cluster.shutdown();
    }
}

Транзакционные возможности Storm очень удобно использовать для передачи данных из одной системы в другую, когда требуется нетривиальная обработка. Например одна система генерирует файлы, Storm их разделяет на записи, обрабатывает в параллельном режиме и складывает в БД. В случае ошибки обработки есть гарантия, что файл не будет удален и не будет обработан дважды.

PS. Раскрыть все возможности Storm в рамках статей невозможно, материала хватит на целую книгу. Надеюсь мне удалось показать ключевые возможности фреймворка и возможности его применения в реальных проектах.
По поводу развертывания кластера — недавно наткнулся на отличную статью [21]. Не вижу смысла повторяться. Развернуть Storm в production действительно несложно.

PPS. В Hadoop [22] существует аналог on-line обработки Storm — Hadoop Streaming [23], но в отличии от Storm, транзакции он не поддерживает.

Автор: xlix123

Источник [24]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/java/38727

Ссылки в тексте:

[1] второй части: http://habrahabr.ru/post/186436/

[2] идемпотентны: http://ru.wikipedia.org/wiki/%D0%98%D0%B4%D0%B5%D0%BC%D0%BF%D0%BE%D1%82%D0%B5%D0%BD%D1%82%D0%BD%D0%BE%D1%81%D1%82%D1%8C

[3] ACID: http://ru.wikipedia.org/wiki/ACID

[4] Trident: https://github.com/nathanmarz/storm/wiki/Trident-tutorial

[5] Apache Pig: http://pig.apache.org/

[6] Transactional topology: https://github.com/nathanmarz/storm/wiki/Transactional-topologies

[7] State: http://nathanmarz.github.io/storm/doc/storm/trident/state/State.html

[8] updateState: http://nathanmarz.github.io/storm/doc/storm/trident/state/StateUpdater.html#updateState(S, java.util.List, storm.trident.operation.TridentCollector)

[9] StateUpdater: http://nathanmarz.github.io/storm/doc/storm/trident/state/StateUpdater.html

[10] ICommitterTridentSpout<TransactionMetadata>: http://nathanmarz.github.io/storm/doc/storm/trident/spout/ICommitterTridentSpout.html

[11] TxMeta: https://github.com/scanban/stormex/blob/master/src/main/java/examples/tx/TxMeta.java

[12] ITridentSpout.BatchCoordinator<TransactionMetadata>: http://nathanmarz.github.io/storm/doc/storm/trident/spout/ITridentSpout.BatchCoordinator.html

[13] TridentTxSpout: https://github.com/scanban/stormex/blob/master/src/main/java/examples/tx/TridentTxSpout.java

[14] ICommitterTridentSpout.Emitter: http://nathanmarz.github.io/storm/doc/storm/trident/spout/ICommitterTridentSpout.Emitter.html

[15] TxDatabase: https://github.com/scanban/stormex/blob/master/src/main/java/examples/tx/TxDatabase.java

[16] BaseStateUpdater<S extends State>: http://nathanmarz.github.io/storm/doc/storm/trident/state/BaseStateUpdater.html

[17] TxDatabaseUpdater: https://github.com/scanban/stormex/blob/master/src/main/java/examples/tx/TxDatabaseUpdater.java

[18] StateFactory: http://nathanmarz.github.io/storm/doc/storm/trident/state/StateFactory.html

[19] TxDatabaseFactory: https://github.com/scanban/stormex/blob/master/src/main/java/examples/tx/TxDatabaseFactory.java

[20] TridentTransactionApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/TridentTransactionApp.java

[21] статью: http://www.michael-noll.com/tutorials/running-multi-node-storm-cluster/

[22] Hadoop: http://hadoop.apache.org/

[23] Hadoop Streaming: http://hadoop.apache.org/docs/stable/streaming.html

[24] Источник: http://habrahabr.ru/post/186634/