- PVSM.RU - https://www.pvsm.ru -
Пару месяцев назад я начала изучать Spark, и в какой-то момент столкнулась с проблемой сохранения вычислений Structured Streaming в базе данных Cassandra.
В данном посте я привожу простой пример создания и использования Cassandra Sink для Spark Structured Streaming. Я надеюсь, что пост будет полезен тем, кто недавно начал работать со Spark Structured Streaming и задается вопросом, как выгружать результаты вычислений в базу данных.
Идея приложения очень проста — получить и распарсить сообщения из кафки, выполнить простые трансформации в спарке и сохранить результаты в кассандре.
О Structured Streaming можно подробно почитать в документации [1]. Если коротко, то Structured Streaming — это хорошо масштабируемый механизм обработки потоковой информации, который основан на движке Spark SQL. Он позволяет использовать Dataset / DataFrame для агрегирования данный, вычисления оконных функций, соединений и т. д. То есть Structured Streaming позволяет использовать старый добрый SQL для работы с потоками данных.
Стабильный релиз Spark Structured Streaming вышел в 2017 году. То есть, это довольно новый API, в котором реализован базовый функционал, но некоторые вещи придется делать самим. Например, в Structured Streaming есть стандартные функции для записи выходных данных в файл, кафку, консоль или память, но для того чтобы сохранить данные в базу придется использовать имеющийся в Structured Streaming приемник foreach и реализовать интерфейс ForeachWriter. Начиная с версии Spark 2.3.1, реализовать такой функционал можно только на Scala и Java.
Я предполагаю, что читатель уже знает, как Structured Streaming работает в общих чертах, знает, как реализовать нужные трансформации и теперь готов выгрузить полученные результаты в базу. Если некоторые из вышеперечисленных шагов неясны, официальная документация может послужить хорошей отправной точкой в изучении Structured Streaming. В данной статье, я бы хотела бы сфокусироваться на последнем шаге, когда вам нужно сохранить результаты в базе данных.
Ниже, я опишу пример реализации Cassandra sink для Structured Streaming и поясню как запустить его в кластере. Полный код доступен здесь [2].
Когда я впервые столкнулась с вышеуказанной проблемой, вот этот проект [3] оказался очень полезным. Однако он может показаться немного сложным, если читатель только начал работать со Structured Streaming и ищет простой пример того как выгрузить данные в кассандру. Кроме того, проект написан для работы в локальном режиме и требует некоторых изменений для запуска в кластере.
Также хочу привести примеры того, как сохранить данные в MongoDB [4] и любую другую базу, использующую JDBC [5].
Чтобы выгрузить данные во внешнюю систему, необходимо использовать приемник foreach. Подробнее об этом можно почитать здесь [6]. Если вкратце, то необходимо реализовать интерфейс ForeachWriter. То есть необходимо определить, как открыть соединение, как обработать каждую порцию данных и как закрыть соединение в конце обработки. Исходный код выглядит следующим образом:
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
// This class implements the interface ForeachWriter, which has methods that get called
// whenever there is a sequence of rows generated as output
val cassandraDriver = new CassandraDriver();
def open(partitionId: Long, version: Long): Boolean = {
// open connection
println(s"Open connection")
true
}
def process(record: org.apache.spark.sql.Row) = {
println(s"Process new $record")
cassandraDriver.connector.withSessionDo(session =>
session.execute(s"""
insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
values('${record(0)}', '${record(1)}', '${record(2)}')""")
)
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
println(s"Close connection")
}
}
Определение CassandraDriver и структуру выходнрй таблицы я опишу позже, а пока давайте более подробно рассмотрим, как работает приведенный выше код. Чтобы подключиться к касандре из спарка, я создаю объект CassandraDriver, который обеспечивает доступ к CassandraConnector – коннектору разработанному DataStax [7]. CassandraConnector отвечает за открытие и закрытие соединения с базой, поэтому я просто вывожу отладочные сообщения в open и close методах класса CassandraSinkForeach.
Приведенный выше код вызывается из основного приложения следующим образом:
val sink = parsed
.writeStream
.queryName("KafkaToCassandraForeach")
.outputMode("update")
.foreach(new CassandraSinkForeach())
.start()
CassandraSinkForeach создается для каждой строки данных, таким образом каждая рабочая нода вставляет свою часть строк в базу данных. Т.е, каждая рабочая нода выполняет val cassandraDriver = new CassandraDriver (); Так выглядит CassandraDriver:
class CassandraDriver extends SparkSessionBuilder {
// This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
// It extends SparkSessionBuilder so to use the same SparkSession on each node.
val spark = buildSparkSession
import spark.implicits._
val connector = CassandraConnector(spark.sparkContext.getConf)
// Define Cassandra's table which will be used as a sink
/* For this app I used the following table:
CREATE TABLE fx.spark_struct_stream_sink (
fx_marker text,
timestamp_ms timestamp,
timestamp_dt date,
primary key (fx_marker));
*/
val namespace = "fx"
val foreachTableSink = "spark_struct_stream_sink"
}
Давайте посмотрим более подробно на объект spark . Код для SparkSessionBuilder выглядит следующим образом:
class SparkSessionBuilder extends Serializable {
// Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors.
// Note here the usage of @transient lazy val
def buildSparkSession: SparkSession = {
@transient lazy val conf: SparkConf = new SparkConf()
.setAppName("Structured Streaming from Kafka to Cassandra")
.set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
.set("spark.sql.streaming.checkpointLocation", "checkpoint")
@transient lazy val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
spark
}
}
На каждой рабочей ноде SparkSessionBuilder предоставляет доступ к SparkSession, который был создан на драйвере. Чтобы сделать такой доступ возможным, необходимо сериализовать SparkSessionBuilder и использовать transient [8] lazy val, который позволяет системе сериализации игнорировать объекты conf и spark при инициализации программы и до момента обращения к объектам. Таким образом, при запуске программы buildSparkSession сериализуется и отправляется каждой рабочей ноде, но объекты conf и spark разрешаются только в момент когда к ним обращается рабочая нода.
Теперь давайте посмотрим на основной код приложения:
object KafkaToCassandra extends SparkSessionBuilder {
// Main body of the app. It also extends SparkSessionBuilder.
def main(args: Array[String]) {
val spark = buildSparkSession
import spark.implicits._
// Define location of Kafka brokers:
val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
/*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by n
{"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
{"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
{"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
{"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
*/
// Read incoming stream
val dfraw = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", "currency_exchange")
.load()
val schema = StructType(
Seq(
StructField("fx_marker", StringType, false),
StructField("timestamp_ms", StringType, false)
)
)
val df = dfraw
.selectExpr("CAST(value AS STRING)").as[String]
.flatMap(_.split("n"))
val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
// Process data. Create a new date column
val parsed = jsons
.withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
.filter("fx_marker != ''")
// Output results into a database
val sink = parsed
.writeStream
.queryName("KafkaToCassandraForeach")
.outputMode("update")
.foreach(new CassandraSinkForeach())
.start()
sink.awaitTermination()
}
}
Когда приложение отправляется на исполнение, buildSparkSession сериализуется и отправляется рабочим нодам, однако conf и spark объекты остаются неразрешенными. Затем драйвер создает spark объект внутри KafkaToCassandra и распределяет работу между рабочими нодами. Каждая рабочая нода читает данные из кафки, делает простые преобразования над полученной порцией записей, и когда рабочая нода готова записать результаты в базу, она разрешает conf и spark объекты, тем самым получая доступ к SparkSession, созданной на драйвере.
Когда я перешла из PySpark в Scala, мне потребовалось некоторое время, чтобы понять, как собрать приложение. Поэтому, я включила Maven pom.xml в свой проект. Читатель может собрать приложение с помощью Maven, выполнив команду mvn package. После приложение можно отправить на исполнение используя
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
Для того чтобы собрать и запустить приложение, необходимо заменить имена моих AWS-машин своими (т.е. заменить все, что похоже на ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).
Spark и Structured Streaming в частности – новая для меня тема, поэтому буду очень признательна читателям за комментарии, обсуждение и исправления.
Автор: Nyrka
Источник [9]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/apache-2/294910
Ссылки в тексте:
[1] документации: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
[2] здесь: https://github.com/epishova/Structured-Streaming-Cassandra-Sink
[3] вот этот проект: https://github.com/polomarcus/Spark-Structured-Streaming-Examples
[4] MongoDB: https://jira.mongodb.org/browse/SPARK-134
[5] JDBC : https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html
[6] здесь: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
[7] DataStax: https://github.com/datastax/spark-cassandra-connector
[8] transient: https://habr.com/users/transient/
[9] Источник: https://habr.com/post/425503/?utm_campaign=425503
Нажмите здесь для печати.