- PVSM.RU - https://www.pvsm.ru -
В 2011 году Twitter открыл, под лицензией Eclipse Public License [1], проект распределенных вычислений Storm [2]. Storm был создан в компании BackType и перешел к Twitter после покупки.
Storm это система ориентированная на распределенную обработку больших потоков данных, аналогичная Apache Hadoop [3], но в реальном времени.
Ключевые особенности Storm:
В первой части рассматриваются базовые понятия и основы создания приложения c использованием Storm версии 0.8.2.
Tuple
Элемент представления данных. По умолчанию может содержать Long, Integer, Short, Byte, String, Double, Float, Boolean и byte[] поля. Пользовательские типы используемые в Tuple должны быть сериализуемыми.
Stream
Последовательность из Tuple. Содержит схему именования полей в Tuple.
Spout
Поставщик данных для Stream. Получает данные из внешних источников, формирует из них Tuple и отправляет в Stream. Может отправлять Tuple в несколько разных Stream. Есть готовые для популярных систем обмена сообщениями: RabbitMQ / AMQP [5], Kestrel [6], JMS [7], Kafka [8].
Bolt
Обработчик данных. На вход поступают Tuple. На выход отправляет 0 или более Tuple.
Topology
Совокупность элементов с описанием их взаимосвязи. Аналог MapReduce job в Hadoop. В отличии от MapReduce job — не останавливается после исчерпания входного потока данных. Осуществляет транспорт Tuple между элементами Spout и Bolt. Может запускаться локально или загружаться в Storm кластер.
Есть поток данных о телефонных вызовах Cdr [9]. На основании source номера определяется id клиента. На основании destination номера и id клиента определяется тариф и считается стоимость звонка. Каждый из этапов должен работать в несколько потоков.
Пример будет запускаться на локальной машине.
Для начала просто распечатаем входные данные BasicApp [10].
Создаем новую Topology:
TopologyBuilder builder = new TopologyBuilder();
Добавляем Spout CdrSpout [11] генерирующий входные данные:
builder.setSpout("CdrReader", new CdrSpout());
Добавляем Bolt с двумя потоками и указываем что на вход подается выходной поток CdrReader. shuffleGrouping означает что данные из CdrReader подаются на случайно выбранный PrintOutBolt.
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).shuffleGrouping("CdrReader");
Конфигурируем и запускам локальный Storm кластер:
Config config = new Config(); // Конфигурация кластера по умолчанию
config.setDebug(false);
LocalCluster cluster = new LocalCluster(); // Создаем локальный Storm кластер
cluster.submitTopology("T1", config, builder.createTopology()); // Стартуем Topology
Thread.sleep(1000*10);
cluster.shutdown(); // Останавливаем кластер
На выходе получаем примерно следующее:
OUT>> [80]Cdr{callSource='78119990005', callDestination='8313610698077174239', callTime=7631, clientId=0, price=0} OUT>> [78]Cdr{callSource='78119990006', callDestination='2238707710336895468', callTime=20738, clientId=0, price=0} OUT>> [78]Cdr{callSource='78119990007', callDestination='579372726495390920', callTime=31544, clientId=0, price=0} OUT>> [80]Cdr{callSource='78119990006', callDestination='2010724447342634423', callTime=10268, clientId=0, price=0}
Число в квадратных скобках — Thread Id, видно что обработка ведется параллельно.
Для дальнейших экспериментов нужно разобраться с распределением входных данных между несколькими обработчиками.
В примере выше был использован случайный подход. Но в реальном применении Bolt'ы наверняка будут использовать внешние справочные системы и базы данных. В этом случае желательно чтобы каждый Bolt обрабатывал свое подмножество входных данных. Тогда можно будет организовать эффективное кэширование данных из внешних систем.
Для этого в Storm предусмотрен интерфейс CustomStreamGrouping.
Добавим в проект CdrGrouper [12]. Его задача — отправлять Tuple с одинаковыми source номерами на один и тот же Bolt. Для этого в CustomStreamGrouping предусмотрено два вызова:
prepare — вызывается перед первым использованием:
@Override
public void prepare(WorkerTopologyContext workerTopologyContext, GlobalStreamId globalStreamId, List<Integer> integers) {
tasks = new ArrayList<>(integers); // Запоминаем номера Bolts
}
и chooseTasks — где на вход подается список из Tuple, а возвращается список состоящий из номеров Bolt'ов для каждой позиции в списке Tuple:
@Override
public List<Integer> chooseTasks(int i, List<Object> objects) {
List<Integer> rvalue = new ArrayList<>(objects.size());
for(Object o: objects) {
Cdr cdr = (Cdr) o;
rvalue.add(tasks.get(Math.abs(cdr.getCallSource().hashCode()) %
tasks.size()));
}
return rvalue;
}
Заменим shuffleGrouping на CdrGrouper BasicGroupApp [13]:
builder.setBolt("PrintOutBolt", new PrintOutBolt(), 2).
customGrouping("CdrReader", new CdrGrouper());
Запустим и убедимся что работает как задумано:
OUT>> [80]Cdr{callSource='78119990007', callDestination='3314931472251135073', callTime=17632, clientId=0, price=0} OUT>> [80]Cdr{callSource='78119990007', callDestination='4182885669941386786', callTime=31533, clientId=0, price=0}
Далее в проект добавляем:
ClientIdBolt [14] — определяет id клиента по source номеру.
ClientIdGrouper [15] — Группирует по id клиента.
RaterBolt [16] — занимается тарификацией.
CalcApp [17] — окончательный вариант программы.
Если тема будет интересна, то в следующей части надеюсь рассказать о механизмах защиты от потери данных и запуске на реальном кластере. Код доступен на GitHub [18].
PS. Из песни конечно слова не выкинешь, но название обработчика данных «Bolt» несколько смущает :)
Автор: xlix123
Источник [19]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/38419
Ссылки в тексте:
[1] Eclipse Public License: http://www.eclipse.org/legal/epl-v10.html
[2] Storm: http://storm-project.net/
[3] Apache Hadoop: http://hadoop.apache.org/
[4] Multilang protocol: https://github.com/nathanmarz/storm/wiki/Multilang-protocol
[5] RabbitMQ / AMQP: https://github.com/Xorlev/storm-amqp-spout
[6] Kestrel: https://github.com/nathanmarz/storm-kestrel
[7] JMS: https://github.com/ptgoetz/storm-jms
[8] Kafka: https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
[9] Cdr: https://github.com/scanban/stormex/blob/master/src/main/java/examples/data/Cdr.java
[10] BasicApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/BasicApp.java
[11] CdrSpout: https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/CdrSpout.java
[12] CdrGrouper: https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/CdrGrouper.java
[13] BasicGroupApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/BasicGroupApp.java
[14] ClientIdBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/ClientIdBolt.java
[15] ClientIdGrouper: https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/ClientIdGrouper.java
[16] RaterBolt: https://github.com/scanban/stormex/blob/master/src/main/java/examples/storm/RaterBolt.java
[17] CalcApp: https://github.com/scanban/stormex/blob/master/src/main/java/examples/CalcApp.java
[18] GitHub: https://github.com/scanban/stormex
[19] Источник: http://habrahabr.ru/post/186208/
Нажмите здесь для печати.