- PVSM.RU - https://www.pvsm.ru -
Привет! Сегодня мы построим систему, которая будет при помощи Spark Streaming обрабатывать потоки сообщений Apache Kafka и записывать результат обработки в облачную базу данных AWS RDS.
Представим, что некая кредитная организация ставит перед нами задачу обработки входящих транзакций «на лету» по всем своим филиалам. Это может быть сделано с целью оперативного расчета открытой валютой позиции для казначейства, лимитов или финансового результата по сделкам и т.д.
Как реализовать этот кейс без применения магии и волшебных заклинаний — читаем под катом! Поехали!
Безусловно, обработка большого массива данных в реальном времени предоставляет широкие возможности для использования в современных системах. Одной из популярнейших комбинаций для этого является тандем Apache Kafka и Spark Streaming, где Kafka — создает поток пакетов входящих сообщений, а Spark Streaming обрабатывает эти пакеты через заданный интервал времени.
Для повышения отказоустойчивости приложения, будем использовать контрольные точки — чекпоинты (checkpoints). При помощи этого механизма, когда модулю Spark Streaming потребуется восстановить утраченные данные, ему нужно будет только вернуться к последней контрольной точке и возобновить вычисления от нее.
Используемые компоненты:
Перед непосредственным использованием Kafka, необходимо убедиться в наличии Java, т.к. для работы используется JVM:
sudo apt-get update
sudo apt-get install default-jre
java -version
Создадим нового пользователя для работы с Kafka:
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Далее скачиваем дистрибутив с официального сайта Apache Kafka:
wget -P /YOUR_PATH "http://apache-mirror.rbc.ru/pub/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz"
Распаковываем скаченный архив:
tar -xvzf /YOUR_PATH/kafka_2.12-2.2.0.tgz
ln -s /YOUR_PATH/kafka_2.12-2.2.0 kafka
Следующий шаг — опциональный. Дело в том, что настройки по умолчанию не позволяют полноценно использовать все возможности Apache Kafka. Например, удалять тему, категорию, группу, на которые могут быть опубликованы сообщения. Чтобы изменить это, отредактируем файл конфигурации:
vim ~/kafka/config/server.properties
Добавьте в конец файла следующее:
delete.topic.enable = true
Перед запуском сервера Kafka, необходимо стартовать сервер ZooKeeper, будем использовать вспомогательный скрипт, который поставляется вместе с дистрибутивом Kafka:
Cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
После того, как ZooKeeper успешно стартовал, в отдельном терминале запускаем сервер Kafka:
bin/kafka-server-start.sh config/server.properties
Создадим новый топик под названием Transaction:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic transaction
Убедимся, что топик с нужным количеством партиций и репликацией был создан:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Упустим моменты тестирования продюсера и консьюмера для вновь созданного топика. Более подробно о том, как можно протестировать отправку и прием сообщений, написано в официальной документации — Send some messages [6]. Ну а мы переходим к написанию продюсера на Python с использованием KafkaProducer API.
Продюсер будет генерить случайные данные — по 100 сообщений каждую секунду. Под случайными данными будем понимать словарь, состоящий из трех полей:
Код для продюсера выглядит следующим образом:
from numpy.random import choice, randint
def get_random_value():
new_dict = {}
branch_list = ['Kazan', 'SPB', 'Novosibirsk', 'Surgut']
currency_list = ['RUB', 'USD', 'EUR', 'GBP']
new_dict['branch'] = choice(branch_list)
new_dict['currency'] = choice(currency_list)
new_dict['amount'] = randint(-100, 100)
return new_dict
Далее, используя метод send, отправляем сообщение на сервер, в нужный нам топик, в формате JSON:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x:dumps(x).encode('utf-8'),
compression_type='gzip')
my_topic = 'transaction'
data = get_random_value()
try:
future = producer.send(topic = my_topic, value = data)
record_metadata = future.get(timeout=10)
print('--> The message has been sent to a topic:
{}, partition: {}, offset: {}'
.format(record_metadata.topic,
record_metadata.partition,
record_metadata.offset ))
except Exception as e:
print('--> It seems an Error occurred: {}'.format(e))
finally:
producer.flush()
При запуске скрипта получаем в терминале следующие сообщения:
Это означает, что все работает как мы хотели — продюсер генерит и отправляет сообщения в нужный нам топик.
Следующим шагом будет установка Spark и обработка этого потока сообщений.
Apache Spark — это универсальная и высокопроизводительная кластерная вычислительная платформа.
По производительности Spark превосходит популярные реализации модели MapReduce, попутно обеспечивая поддержку более широкого диапазона типов вычислений, включая интерактивные запросы и потоковую обработку. Скорость играет важную роль при обработке больших объемов данных, так как именно скорость позволяет работать в интерактивном режиме, не тратя минуты или часы на ожидание. Одно из важнейших достоинств Spark, обеспечивающих столь высокую скорость, — способность выполнять вычисления в памяти.
Данный фреймворк написан на Scala, поэтому необходимо установить ее в первую очередь:
sudo apt-get install scala
Скачиваем с официального сайта дистрибутив Spark:
wget "http://mirror.linux-ia64.org/apache/spark/spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz"
Распаковываем архив:
sudo tar xvf spark-2.4.2/spark-2.4.2-bin-hadoop2.7.tgz -C /usr/local/spark
Добавляем путь к Spark в bash-файл:
vim ~/.bashrc
Добавляем через редактор следующие строчки:
SPARK_HOME=/usr/local/spark
export PATH=$SPARK_HOME/bin:$PATH
Выполняем команду ниже после внесения правок в bashrc:
source ~/.bashrc
Осталось развернуть базу данных, куда будем заливать обработанную информацию из потоков. Для этого будем использовать сервис AWS RDS.
Заходим в консоль AWS --> AWS RDS --> Databases --> Create database:
Выбираем PostgreSQL и нажимаем кнопку Next:
Т.к. данный пример разбирается исключительно в образовательных целях, будем использовать бесплатный сервер «на минималках» (Free Tier):
Далее, ставим галочку в блоке Free Tier, и после этого нам автоматом будет предложен инстанс класса t2.micro — хоть и слабенький, но бесплатный и вполне подойдет для нашей задачи:
Следом идут очень важные вещи: наименование инстанса БД, имя мастер-пользователя и его пароль. Назовем инстанст: myHabrTest, мастер-пользователь: habr, пароль: habr12345 и нажимаем на кнопку Next:
На следующей странице находятся параметры, отвечающие за доступность нашего сервера БД извне (Public accessibility) и доступность портов:
Давайте создадим новую настройку для VPC security group, которая позволит извне обращаться к нашему серверу БД через порт 5432 (PostgreSQL).
Перейдем в отдельном окне браузера к консоли AWS в раздел VPC Dashboard --> Security Groups --> Create security group:
Задаем имя для Security group — PostgreSQL, описание, указываем к какой VPC данная группа должна быть ассоциирована и нажимаем кнопку Create:
Заполняем для свежесозданной группы Inbound rules для порта 5432, как показано на картинке ниже. Вручную порт можно не указывать, а выбрать PostgreSQL из раскрывающегося списка Type.
Строго говоря, значение ::/0 означает доступность входящего траффика для сервера со всего мира, что канонически не совсем верно, но для разбора примера позволим себе использовать такой подход:
Возвращаемся к странице браузера, где у нас открыто «Configure advanced settings» и выбираем в разделе VPC security groups --> Choose existing VPC security groups --> PostgreSQL:
Далее, в разделе Database options --> Database name --> задаем имя — habrDB.
Остальные параметры, за исключением разве что отключения бэкапирования (backup retention period — 0 days), мониторинга и Performance Insights, можем оставить по умолчанию. Нажимаем на кнопку Create database:
Завершающим этапом будет разработка Spark-джобы, которая будет каждые две секунды обрабатывать новые данные, пришедшие от Kafka и заносить результат в базу данных.
Как было отмечено выше, контрольные точки (сheckpoints) — это основной механизм в SparkStreaming, который должен быть настроен для обеспечения отказоустойчивости. Будем использовать контрольные точки и, в случае падения процедуры, модулю Spark Streaming для восстановления утраченных данных нужно будет только вернуться к последней контрольной точке и возобновить вычисления от нее.
Контрольную точку можно включить, установив каталог в отказоустойчивой, надежной файловой системе (например, HDFS, S3 и т. Д.), в которой будет сохранена информация контрольной точки. Это делается с помощью, например:
streamingContext.checkpoint(checkpointDirectory)
В нашем примере будем использовать следующий подход, а именно, если checkpointDirectory существует, то контекст будет воссоздан из данных контрольной точки. Если каталог не существует (т.е. выполняется в первый раз), то вызывается функция functionToCreateContext для создания нового контекста и настройки DStreams:
from pyspark.streaming import StreamingContext
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
Создаем объект DirectStream с целью подключения к топику «transaction» при помощи метода createDirectStream библиотеки KafkaUtils:
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 2)
broker_list = 'localhost:9092'
topic = 'transaction'
directKafkaStream = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": broker_list})
Парсим входящие данные в формате JSON:
rowRdd = rdd.map(lambda w: Row(branch=w['branch'],
currency=w['currency'],
amount=w['amount']))
testDataFrame = spark.createDataFrame(rowRdd)
testDataFrame.createOrReplaceTempView("treasury_stream")
Используя Spark SQL, делаем несложную группировку и выводим результат в консоль:
select
from_unixtime(unix_timestamp()) as curr_time,
t.branch as branch_name,
t.currency as currency_code,
sum(amount) as batch_value
from treasury_stream t
group by
t.branch,
t.currency
Получение текста запроса и запуск его через Spark SQL:
sql_query = get_sql_query()
testResultDataFrame = spark.sql(sql_query)
testResultDataFrame.show(n=5)
А затем сохраняем полученные агрегированные данные в таблицу в AWS RDS. Чтобы сохранить результаты агрегации в таблицу базы данных, будем использовать метод write объекта DataFrame:
testResultDataFrame.write
.format("jdbc")
.mode("append")
.option("driver", 'org.postgresql.Driver')
.option("url","jdbc:postgresql://myhabrtest.ciny8bykwxeg.us-east-1.rds.amazonaws.com:5432/habrDB")
.option("dbtable", "transaction_flow")
.option("user", "habr")
.option("password", "habr12345")
.save()
Несколько слов о настройке подключения к AWS RDS. Пользователя и пароль к нему мы создавали на шаге «Развертывание AWS PostgreSQL». В качестве url сервера баз данных следует использовать Endpoint, который отображается в разделе Connectivity & security:
В целях корректной связки Spark и Kafka, следует запускать джобу через smark-submit с использованием артефакта spark-streaming-kafka-0-8_2.11. Дополнительно применим также артефакт для взаимодействия с базой данных PostgreSQL, их будем передавать через --packages.
Для гибкости скрипта, вынесем в качестве входных параметров также наименование сервера сообщений и топик, из которого хотим получать данные.
Итак, пришло время запустить и проверить работоспособность системы:
spark-submit
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,
org.postgresql:postgresql:9.4.1207
spark_job.py localhost:9092 transaction
Все получилось! Как видно на картинке ниже — в процессе работы приложения новые результаты агрегации выводятся каждые 2 секунды, потому что мы установили интервал пакетирования равным 2 секундам, когда создавали объект StreamingContext:
Далее, делаем нехитрый запрос к базе данных, чтобы проверить наличие записей в таблице transaction_flow:
В данной статье был рассмотрен пример поточной обработки информации с использованием Spark Streaming в связке с Apache Kafka и PostgreSQL. С ростом объемов данных из различных источников, сложно переоценить практическую ценность Spark Streaming для создания потоковых приложений и приложений, действующих в масштабе реального времени.
Полный исходный код вы можете найти в моем репозитории на GitHub [7].
С удовольствием готов обсудить данную статью, жду Ваших комментариев, а также, надеюсь на конструктивную критику всех неравнодушных читателей.
Желаю успехов!
P.S. Первоначально планировалось использовать локальную БД PostgreSQL, но учитывая мою любовь к AWS, я решил вынести базу данных в облако. В следующей статье по этой теме я покажу, как реализовать целиком вышеописанную систему в AWS при помощи AWS Kinesis и AWS EMR. Следите за новостями!
Автор: Igor Gorbenko
Источник [8]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/python/317231
Ссылки в тексте:
[1] (Источник картинки): https://www.megapixl.com/valerybrozhinsky-stock-images-videos-portfolio
[2] Apache Kafka: https://kafka.apache.org/intro
[3] Apache Spark Streaming: https://spark.apache.org/streaming/
[4] Apache Spark SQL : https://spark.apache.org/sql/
[5] AWS RDS: https://docs.aws.amazon.com/en_us/AmazonRDS/latest/UserGuide/Welcome.html
[6] Send some messages: https://kafka.apache.org/documentation/#quickstart_send
[7] GitHub: https://github.com/igorgorbenko/kafka_project_habr
[8] Источник: https://habr.com/ru/post/451160/?utm_source=habrahabr&utm_medium=rss&utm_campaign=451160
Нажмите здесь для печати.