- PVSM.RU - https://www.pvsm.ru -

Cassandra Sink для Spark Structured Streaming

Пару месяцев назад я начала изучать Spark, и в какой-то момент столкнулась с проблемой сохранения вычислений Structured Streaming в базе данных Cassandra.

В данном посте я привожу простой пример создания и использования Cassandra Sink для Spark Structured Streaming. Я надеюсь, что пост будет полезен тем, кто недавно начал работать со Spark Structured Streaming и задается вопросом, как выгружать результаты вычислений в базу данных.

Идея приложения очень проста — получить и распарсить сообщения из кафки, выполнить простые трансформации в спарке и сохранить результаты в кассандре.

Плюсы Structured Streaming

О Structured Streaming можно подробно почитать в документации [1]. Если коротко, то Structured Streaming — это хорошо масштабируемый механизм обработки потоковой информации, который основан на движке Spark SQL. Он позволяет использовать Dataset / DataFrame для агрегирования данный, вычисления оконных функций, соединений и т. д. То есть Structured Streaming позволяет использовать старый добрый SQL для работы с потоками данных.

В чем проблема?

Стабильный релиз Spark Structured Streaming вышел в 2017 году. То есть, это довольно новый API, в котором реализован базовый функционал, но некоторые вещи придется делать самим. Например, в Structured Streaming есть стандартные функции для записи выходных данных в файл, кафку, консоль или память, но для того чтобы сохранить данные в базу придется использовать имеющийся в Structured Streaming приемник foreach и реализовать интерфейс ForeachWriter. Начиная с версии Spark 2.3.1, реализовать такой функционал можно только на Scala и Java.

Я предполагаю, что читатель уже знает, как Structured Streaming работает в общих чертах, знает, как реализовать нужные трансформации и теперь готов выгрузить полученные результаты в базу. Если некоторые из вышеперечисленных шагов неясны, официальная документация может послужить хорошей отправной точкой в изучении Structured Streaming. В данной статье, я бы хотела бы сфокусироваться на последнем шаге, когда вам нужно сохранить результаты в базе данных.

Ниже, я опишу пример реализации Cassandra sink для Structured Streaming и поясню как запустить его в кластере. Полный код доступен здесь [2].

Когда я впервые столкнулась с вышеуказанной проблемой, вот этот проект [3] оказался очень полезным. Однако он может показаться немного сложным, если читатель только начал работать со Structured Streaming и ищет простой пример того как выгрузить данные в кассандру. Кроме того, проект написан для работы в локальном режиме и требует некоторых изменений для запуска в кластере.

Также хочу привести примеры того, как сохранить данные в MongoDB [4] и любую другую базу, использующую JDBC [5].

Простое решение

Чтобы выгрузить данные во внешнюю систему, необходимо использовать приемник foreach. Подробнее об этом можно почитать здесь [6]. Если вкратце, то необходимо реализовать интерфейс ForeachWriter. То есть необходимо определить, как открыть соединение, как обработать каждую порцию данных и как закрыть соединение в конце обработки. Исходный код выглядит следующим образом:

class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
  // This class implements the interface ForeachWriter, which has methods that get called 
  // whenever there is a sequence of rows generated as output
  val cassandraDriver = new CassandraDriver();
  def open(partitionId: Long, version: Long): Boolean = {
    // open connection
    println(s"Open connection")
    true
  }
  def process(record: org.apache.spark.sql.Row) = {
    println(s"Process new $record")
    cassandraDriver.connector.withSessionDo(session =>
      session.execute(s"""
       insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
       values('${record(0)}', '${record(1)}', '${record(2)}')""")
    )
  }
  def close(errorOrNull: Throwable): Unit = {
    // close the connection
    println(s"Close connection")
  }
}

Определение CassandraDriver и структуру выходнрй таблицы я опишу позже, а пока давайте более подробно рассмотрим, как работает приведенный выше код. Чтобы подключиться к касандре из спарка, я создаю объект CassandraDriver, который обеспечивает доступ к CassandraConnector – коннектору разработанному DataStax [7]. CassandraConnector отвечает за открытие и закрытие соединения с базой, поэтому я просто вывожу отладочные сообщения в open и close методах класса CassandraSinkForeach.

Приведенный выше код вызывается из основного приложения следующим образом:

val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()

CassandraSinkForeach создается для каждой строки данных, таким образом каждая рабочая нода вставляет свою часть строк в базу данных. Т.е, каждая рабочая нода выполняет val cassandraDriver = new CassandraDriver (); Так выглядит CassandraDriver:

class CassandraDriver extends SparkSessionBuilder {
  // This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
  // It extends SparkSessionBuilder so to use the same SparkSession on each node.
  val spark = buildSparkSession
  import spark.implicits._
  val connector = CassandraConnector(spark.sparkContext.getConf)
  // Define Cassandra's table which will be used as a sink
  /* For this app I used the following table:
       CREATE TABLE fx.spark_struct_stream_sink (
       fx_marker text,
       timestamp_ms timestamp,
       timestamp_dt date,
       primary key (fx_marker));
  */
  val namespace = "fx"
  val foreachTableSink = "spark_struct_stream_sink"
}

Давайте посмотрим более подробно на объект spark . Код для SparkSessionBuilder выглядит следующим образом:

class SparkSessionBuilder extends Serializable {
  // Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors. 
  // Note here the usage of @transient lazy val 
  def buildSparkSession: SparkSession = {
    @transient lazy val conf: SparkConf = new SparkConf()
    .setAppName("Structured Streaming from Kafka to Cassandra")
    .set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
    .set("spark.sql.streaming.checkpointLocation", "checkpoint")
    @transient lazy val spark = SparkSession
    .builder()
    .config(conf)
    .getOrCreate()
    spark
  }
}

На каждой рабочей ноде SparkSessionBuilder предоставляет доступ к SparkSession, который был создан на драйвере. Чтобы сделать такой доступ возможным, необходимо сериализовать SparkSessionBuilder и использовать transient [8] lazy val, который позволяет системе сериализации игнорировать объекты conf и spark при инициализации программы и до момента обращения к объектам. Таким образом, при запуске программы buildSparkSession сериализуется и отправляется каждой рабочей ноде, но объекты conf и spark разрешаются только в момент когда к ним обращается рабочая нода.

Теперь давайте посмотрим на основной код приложения:

object KafkaToCassandra extends SparkSessionBuilder {
  // Main body of the app. It also extends SparkSessionBuilder.
  def main(args: Array[String]) {
    val spark = buildSparkSession
    import spark.implicits._
    // Define location of Kafka brokers:
    val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
    /*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by n 
    {"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
    {"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
    {"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
    {"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
    */
    // Read incoming stream
    val dfraw = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", broker)
    .option("subscribe", "currency_exchange")
    .load()
    val schema = StructType(
      Seq(
        StructField("fx_marker", StringType, false),
        StructField("timestamp_ms", StringType, false)
      )
    )
    val df = dfraw
    .selectExpr("CAST(value AS STRING)").as[String]
    .flatMap(_.split("n"))
    val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
    // Process data. Create a new date column
    val parsed = jsons
      .withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
      .filter("fx_marker != ''")
    // Output results into a database
    val sink = parsed
    .writeStream
    .queryName("KafkaToCassandraForeach")
    .outputMode("update")
    .foreach(new CassandraSinkForeach())
    .start()
    sink.awaitTermination()
  }
}

Когда приложение отправляется на исполнение, buildSparkSession сериализуется и отправляется рабочим нодам, однако conf и spark объекты остаются неразрешенными. Затем драйвер создает spark объект внутри KafkaToCassandra и распределяет работу между рабочими нодами. Каждая рабочая нода читает данные из кафки, делает простые преобразования над полученной порцией записей, и когда рабочая нода готова записать результаты в базу, она разрешает conf и spark объекты, тем самым получая доступ к SparkSession, созданной на драйвере.

Как собрать и запустить приложение?

Когда я перешла из PySpark в Scala, мне потребовалось некоторое время, чтобы понять, как собрать приложение. Поэтому, я включила Maven pom.xml в свой проект. Читатель может собрать приложение с помощью Maven, выполнив команду mvn package. После приложение можно отправить на исполнение используя

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar

Для того чтобы собрать и запустить приложение, необходимо заменить имена моих AWS-машин своими (т.е. заменить все, что похоже на ec2-xx-xxx-xx-xx.compute-1.amazonaws.com).

Spark и Structured Streaming в частности – новая для меня тема, поэтому буду очень признательна читателям за комментарии, обсуждение и исправления.

Автор: Nyrka

Источник [9]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/apache-2/294910

Ссылки в тексте:

[1] документации: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

[2] здесь: https://github.com/epishova/Structured-Streaming-Cassandra-Sink

[3] вот этот проект: https://github.com/polomarcus/Spark-Structured-Streaming-Examples

[4] MongoDB: https://jira.mongodb.org/browse/SPARK-134

[5] JDBC : https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html

[6] здесь: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

[7] DataStax: https://github.com/datastax/spark-cassandra-connector

[8] transient: https://habr.com/users/transient/

[9] Источник: https://habr.com/post/425503/?utm_campaign=425503