Слежение за обновлениями из MongoDB Oplog используя Scala и Akka Stream

в 8:14, , рубрики: akka, mongodb, scala, streaming

Представляю вашему вниманию перевод статьи Tailing the MongoDB Replica Set Oplog with Scala and Akka Stream.

Введение

В этой статье я попробую объяснить, как следить за обновлениями в MongoDB Oplog при помощи Scala драйвера MongoDB и Akka Stream.

Примеры, приведенные в данной статье не следует рассматривать и использовать в продакшн среде.

Каждый из нас знает Unix команду tail -f, Tailable Cursor имеет тот же концепт. MongoDB предоставляет возможность использовать эту функцию по умолчанию и не требует дополнительных библиотек и инструментов. Что касается Oplog — это такая же коллекция, как и все остальные и ничего нового не требуется.

Если вы хотите узнать больше об Oplog и Tailable Cursor, то вы можете найти больше информации в документации MongoDB:

Проект созданный в данной статье удобно расположился на Github.

Библиотеки и инструменты

Пример файла build.sbt:

name := "MongoDB Replica Set oplog tailing with Akks Streams"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "ch.qos.logback" % "logback-classic" % "1.1.5",
  "org.mongodb.scala" %% "mongo-scala-driver" % "1.1.0",
  "com.typesafe.akka" %% "akka-slf4j" % "2.4.2",
  "com.typesafe.akka" %% "akka-stream" % "2.4.2"
)

Также вам понадобится MongoDB Replica Set, могу порекомендовать официальный образ mongo docker.

Запрос для слежения за MongoDB Oplog

Предполагая, что у вас уже есть установленное соединения, давай те определим запрос. Ниже приведен пример:

val collection: MongoCollection[Document] = _
val observable = collection.find(in("op", "i", "d", "u"))
                           .cursorType(CursorType.TailableAwait)
                           .noCursorTimeout(true)

Как видно из запроса, мы определяем курсор типа tailable без timeout, так же поле op, определяющее тип операции в Oplog, должно быть CRUD операцией i/d/u.

Немного о терминологии Akka Stream

Полня документация на английском языке доступна тут.

В Akka Stream схема обработки потока данных определяется следующими абстракциями:

Source — Этап обработки данных только с одной точкой выхода, предоставляющий элементы данных, как только нижележащие элементы обработки данных готовы к приему.

Sink — Этап обработки данных только с одной точкой входа, запрашивающий и принимающий элементы данных с возможностью замедления поступления данных с вышележащего элемента.

Flow — Этап обработки данных только с одной точкой входа и выхода, которая соединяет поток данных и трансформирует элементы проходящие через него.

RunnableGraph — Это Flow который соединен с Source и Sink и готов к выполнению команды run().

В нашем случае мы будем использовать только Source и Sink, так как мы будем только следить за обновлениями без изменения поступающих данных.

MongoDB Scala драйвер и Akka Stream

К сожалению нету возможности по умолчанию для интеграции Akka Stream и MongoDB драйвера, но у Akka Stream есть возможность интеграции с Reactive Streams, так же как и у недавно опубликованного, нового, официального, асинхронного MongoDB Scala драйвера. Новый драйвер использует модель Observable, которая может быть преобразована в Reactive Streams Publisher всего лишь в несколько строк кода, также команда MongoDB уже привела пример преобразования на базе implicit, который мы будем использовать, как точку соприкосновения между этими двумя библиотеками.

Объявление потока Source

Определение Source очень легкое. Из Oplog мы будем получать объекты Document, это и будет типом потока Source.

val source: Source[Document, NotUsed] = _

На данный момент у нас имеется объект FindObservable[Document] из MongoDB Oplog запроса и тип ресурса Source[Document, NotUsed], так как же нам преобразовать одно в другое?

В этом нам поможет магия неявного преобразования. Тип Source содержит метод:

def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

который преобразовывает тип Reactive Streams Publisher в Source, также у нас есть неявное преобразование из MongoDB Observable в Publisher. Теперь мы можем связать все части:

import rxStreams.Implicits._

val collection: MongoCollection[Document] = _
val observable = collection.find(in("op", "i", "d", "u"))
                           .cursorType(CursorType.TailableAwait)
                           .noCursorTimeout(true)

val source: Source[Document, NotUsed] = Source.fromPublisher(observable)

Это было давольно легко, не так ли?

Что делать дальше?

Все что вы можете себе представить, я пока просто буду выводить все данные в STDOUT:

source.runWith(Sink.foreach(println))

или же можно использовать шорткат:

source.runForeach(println)

это выведет все CRUD операции из MongoDB Replica Set с начала Oplog коллекции до конца и будет следить за новыми поступлениями.

Вы можете задать более специфический запрос, определить базы данных и коллекции из которых вы хотите получать обновления, так же вы можете определить временные рамки для поступающих документов. Это я оставлю на вас.

Зачем нам это?

Возможно вы задумались, зачем нам нужно преобразовывать Observable в Publisher, а потом еще и в Source, в то время как мы могли бы просто использовать Reactive Streams Publisher или же Observable.

Дело в том что модель Observables и Reactive Streams API предоставляют общие механизмы переноса данных в асинхронных рамках без потерь, когда как Akka Stream API фокусируется на трансформации потока данных.

Так что если вы заинтересованы только в переносе данных из Oplog куда либо, вы можете придерживаться модели Observables предоставляемой MongoDB драйвером, но если вам нужно трансформировать поток данных, то выбор падает на Akka Stream.

Заключение

Как видно из статьи, слежение за MongoDB Oplog очень простая задача, особенно в Replica Set. Могут возникнуть другие подводные камни, если это будет MongoDB Sharded Cluster, это будет покрыто в следующей статье.

Конечно, этот пост не охватывает все аспекты этой темы, на пример, обработка ошибок, гарантии доставки и т.д. Это может быть реализовано различными способами и не является темой данной статьи.

Автор: Timur_Kh

Источник

Поделиться новостью

* - обязательные к заполнению поля