- PVSM.RU - https://www.pvsm.ru -
В своей работе я часто сталкиваюсь с новыми техническими решениями/программными продуктами, информации о которых в русскоязычном интернете довольно мало. Этой статьей постараюсь восполнить один такой пробел примером из своей недавней практики, когда потребовалось настроить отправку CDC-событий из двух популярных СУБД (PostgreSQL и MongoDB) в кластер Kafka при помощи Debezium. Надеюсь, эта обзорная статья, появившаяся по итогам проделанной работы, окажется полезной и другим.
Debezium [1] — представитель категории программного обеспечения CDC (Capture Data Change [2]), а если точнее — это набор коннекторов для различных СУБД, совместимых с фреймворком Apache Kafka Connect.
Это Open Source-проект, [3] использующий лицензию Apache License v2.0 и спонсируемый компанией Red Hat. Разработка ведётся с 2016 года и на данный момент в нем представлена официальная поддержка следующих СУБД: MySQL, PostgreSQL, MongoDB, SQL Server. Также существуют коннекторы для Cassandra и Oracle, но на данный момент они находятся в статусе «раннего доступа», а новые релизы не гарантируют обратной совместимости.
Если сравнивать CDC с традиционным подходом (когда приложение читает данные из СУБД напрямую), то к его главным преимуществам относят реализацию стриминга изменения данных на уровне строк с низкой задержкой, высокой надежностью и доступностью. Последние два пункта достигаются благодаря использованию кластера Kafka в качестве хранилища CDC-событий.
Также к достоинствам можно отнести тот факт, что для хранения событий используется единая модель, поэтому конечному приложению не придётся беспокоиться о нюансах эксплуатации различных СУБД.
Наконец, благодаря использованию брокера сообщений открывается простор для горизонтального масштабирования приложений, отслеживающих изменения в данных. При этом влияние на источник данных сводится к минимуму, поскольку получение данных происходит не напрямую из СУБД, а из кластера Kafka.
Использование Debezium сводится к такой простой схеме:
СУБД (как источник данных) → коннектор в Kafka Connect → Apache Kafka → консьюмер
В качестве иллюстрации приведу схему с сайта проекта:
Однако эта схема мне не очень нравится, поскольку складывается впечатление, что возможно только использование sink-коннектора.
В действительности же ситуация отличается: наполнение вашего Data Lake (последнее звено на схеме выше) — это не единственный способ применения Debezium. События, отправленные в Apache Kafka, могут быть использоваться вашими приложениями для решения различных ситуаций. Например:
В случае, если у вас приложение на Java и нет необходимости/возможности использовать кластер Kafka, существует также возможность работы через embedded-коннектор [4]. Очевидный плюс в том, что с ним можно отказаться от дополнительной инфраструктуры (в виде коннектора и Kafka). Однако это решение объявлено устаревшим (deprecated) с версии 1.1 и больше не рекомендуется к использованию (в будущих релизах его поддержку могут убрать).
В данной статье будет рассматриваться рекомендуемая разработчиками архитектура, которая обеспечивает отказоустойчивость и возможность масштабирования.
Для того, чтобы начать отслеживать изменения самой главной ценности — данных, — нам потребуются:
Работы по первым двум пунктам, т.е. процесс инсталляции СУБД и Apache Kafka, выходят за рамки статьи. Однако для тех, кто хочет развернуть всё в песочнице, в официальном репозитории с примерами есть готовый docker-compose.yaml [6].
Мы же остановимся подробнее на двух последних пунктах.
Здесь и далее в статье все примеры конфигурации рассматриваются в контексте Docker-образа, распространяемого разработчиками Debezium. Он содержит все необходимые файлы плагинов (коннекторы) и предусматривает конфигурацию Kafka Connect при помощи переменных окружения.
В случае, если предполагается использование Kafka Connect от Confluent, потребуется самостоятельно добавить плагины необходимых коннекторов в директорию, указанную в plugin.path
или задаваемую через переменную окружения CLASSPATH
. Настройки воркера Kafka Connect и коннекторов определяются через конфигурационные файлы, которые передаются аргументами к команде запуска воркера. Подробнее см. в документации [7].
Весь процесс по настройке Debeizum в варианте с коннектором осуществляется в два этапа. Рассмотрим каждый из них:
Для стриминга данных в кластер Apache Kafka во фреймворке Kafka Connect задаются специфичные параметры, такие как:
Официальный Docker-образ проекта поддерживает конфигурацию при помощи переменных окружения — этим и воспользуемся. Итак, скачиваем образ:
docker pull debezium/connect
Минимальный набор переменных окружения, необходимый для запуска коннектора, выглядит следующим образом:
BOOTSTRAP_SERVERS=kafka-1:9092,kafka-2:9092,kafka-3:9092
— начальный список серверов кластера Kafka для получения полного списка членов кластера;OFFSET_STORAGE_TOPIC=connector-offsets
— топик для хранения позиций, на которых на данный момент находится коннектор;CONNECT_STATUS_STORAGE_TOPIC=connector-status
— топик для хранения статуса коннектора и его заданий;CONFIG_STORAGE_TOPIC=connector-config
— топик для хранения данных конфигурации коннектора и его заданий;GROUP_ID=1
— идентификатор группы воркеров, на которых может выполняться задание коннектора; необходим при использовании распределённого (distributed) режима.Запускаем контейнер с этими переменными:
docker run
-e BOOTSTRAP_SERVERS='kafka-1:9092,kafka-2:9092,kafka-3:9092'
-e GROUP_ID=1
-e CONFIG_STORAGE_TOPIC=my_connect_configs
-e OFFSET_STORAGE_TOPIC=my_connect_offsets
-e STATUS_STORAGE_TOPIC=my_connect_statuses debezium/connect:1.2
По умолчанию Debezium пишет данные в формате JSON, что приемлемо для песочниц и небольших объёмов данных, но может стать проблемой в высоконагруженных базах. Альтернативой JSON-конвертеру является сериализация сообщений при помощи Avro [8] в бинарный формат, что позволяет снизить нагрузку на подсистему I/O в Apache Kafka.
Для использования Avro требуется развернуть отдельный schema-registry [9] (для хранения схем). Переменные для конвертера будут выглядеть следующим образом:
name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
value: http://kafka-registry-01:8081/
name: VALUE_CONVERTER
value: io.confluent.connect.avro.AvroConverter
Детали по использованию Avro и настройке registry за него выходят за рамки статьи — далее для наглядности мы будет использовать JSON.
Теперь можно перейти непосредственно к конфигурации самого коннектора, который будет читать данные из источника.
Рассмотрим на примере коннекторов для двух СУБД: PostgreSQL и MongoDB, — по которым у меня есть опыт и по которым имеются отличия (пусть и небольшие, но в некоторых случаях — существенные!).
Конфигурация описывается в нотации JSON и загружается в Kafka Connect при помощи POST-запроса.
Пример конфигурации коннектора для PostgreSQL:
{
"name": "pg-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "127.0.0.1",
"database.port": "5432",
"database.user": "debezium",
"database.password": "definitelynotpassword",
"database.dbname" : "dbname",
"database.server.name": "pg-dev",
"table.include.list": "public.(.*)",
"heartbeat.interval.ms": "5000",
"slot.name": "dbname_debezium",
"publication.name": "dbname_publication",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "pg-dev.public.(.*)",
"transforms.AddPrefix.replacement": "data.cdc.dbname"
}
}
Принцип работы коннектора после такой настройки довольно прост:
SELECT * FROM table_name
.Об используемых опциях:
name
— имя коннектора, для которого используется конфигурация, описанная ниже; в дальнейшем это имя используется для работы с коннектором (т.е. просмотра статуса/перезапуска/обновления конфигурации) через REST API Kafka Connect;connector.class
— класс коннектора СУБД, который будет использоваться конфигурируемым коннектором;plugin.name
— название плагина для логического декодирования данных из WAL-файлов. На выбор доступны wal2json
, decoderbuffs
и pgoutput
. Первые два требуют установки соответствующих расширений в СУБД, а pgoutput
для PostgreSQL версии 10 и выше не требует дополнительных манипуляций;database.*
— опции для подключения к БД, где database.server.name
— имя инстанса PostgreSQL, используемое для формирования имени топика в кластере Kafka;table.include.list
— список таблиц, в которых мы хотим отслеживать изменения; задаётся в формате schema.table_name
; нельзя использовать вместе с table.exclude.list
;heartbeat.interval.ms
— интервал (в миллисекундах), с которым коннектор отправляет heartbeat-сообщения в специальный топик;heartbeat.action.query
— запрос, который будет выполняться при отправке каждого heartbeat-сообщения (опция появилась с версии 1.1);slot.name
— имя слота репликации, который будет использоваться коннектором;publication.name
— имя публикации [10] в PostgreSQL, которую использует коннектор. В случае, если её не существует, Debezium попытается её создать. В случае, если у пользователя, под которым происходит подключение, недостаточно прав для этого действия — коннектор завершит работу с ошибкой;transforms
определяет, как именно изменять название целевого топика:
transforms.AddPrefix.type
указывает, что будем использовать регулярные выражения;transforms.AddPrefix.regex
— маска, по которой переопределяется название целевого топика;transforms.AddPrefix.replacement
— непосредственно то, на что переопределяем.
По умолчанию коннектор отправляет данные в Kafka по каждой коммитнутой транзакции, а её LSN (Log Sequence Number) записывает в служебный топик offset
. Но что произойдет, если коннектор настроен на чтение не всей базы целиком, а только части её таблиц (в которых обновление данных происходит не часто)?
И тут на помощь приходят опции heartbeat.interval.ms
и heartbeat.action.query
. Использование этих опций в паре даёт возможность каждый раз при отправке heartbeat-сообщения выполнять запрос на изменение данных в отдельной таблице. Тем самым постоянно актуализируется LSN, на котором сейчас находится коннектор (в слоте репликации). Это позволяет СУБД удалить WAL-файлы, которые более не нужны. Подробнее узнать о работе опций можно в документации [11].
Другая опция, достойная более пристального внимания, — это transforms
. Хотя она скорее про удобство и красоту…
По умолчанию Debezium создаёт топики, руководствуясь следующей политикой именования: serverName.schemaName.tableName
. Это не всегда может быть удобно. Опциями transforms
можно с помощью регулярных выражений определять список таблиц, эвенты из которых нужно маршрутизировать в топик с конкретным названием.
В нашей конфигурации благодаря transforms
происходит следующее: все CDC-события из отслеживаемой БД попадут в топик с именем data.cdc.dbname
. В противном случае (без этих настроек) Debezium по умолчанию бы создавал по топику на каждую таблицу вида: pg-dev.public.<table_name>
.
В завершении описания конфигурации коннектора для PostgreSQL стоит рассказать о следующих особенностях/ограничениях его работы:
Итак, загрузим нашу конфигурацию в коннектор:
curl -i -X POST -H "Accept:application/json"
-H "Content-Type:application/json" http://localhost:8083/connectors/
-d @pg-con.json
Проверяем, что загрузка прошла успешно и коннектор запустился:
$ curl -i http://localhost:8083/connectors/pg-connector/status
HTTP/1.1 200 OK
Date: Thu, 17 Sep 2020 20:19:40 GMT
Content-Type: application/json
Content-Length: 175
Server: Jetty(9.4.20.v20190813)
{"name":"pg-connector","connector":{"state":"RUNNING","worker_id":"172.24.0.5:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.24.0.5:8083"}],"type":"source"}
Отлично: он настроен и готов к работе. Теперь прикинемся консьюмером и подключимся к Kafka, после чего добавим и изменим запись в таблице:
$ kafka/bin/kafka-console-consumer.sh
--bootstrap-server kafka:9092
--from-beginning
--property print.key=true
--topic data.cdc.dbname
postgres=# insert into customers (id, first_name, last_name, email) values (1005, 'foo', 'bar', 'foo@bar.com');
INSERT 0 1
postgres=# update customers set first_name = 'egg' where id = 1005;
UPDATE 1
В нашем топике это отобразится следующим образом:
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":null,
"after":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600374991648,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":602,
"lsn":34088472,
"xmin":null
},
"op":"c",
"ts_ms":1600374991762,
"transaction":null
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
}
],
"optional":false,
"name":"data.cdc.dbname.Key"
},
"payload":{
"id":1005
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"before"
},
{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"first_name"
},
{
"type":"string",
"optional":false,
"field":"last_name"
},
{
"type":"string",
"optional":false,
"field":"email"
}
],
"optional":true,
"name":"data.cdc.dbname.Value",
"field":"after"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"version"
},
{
"type":"string",
"optional":false,
"field":"connector"
},
{
"type":"string",
"optional":false,
"field":"name"
},
{
"type":"int64",
"optional":false,
"field":"ts_ms"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.data.Enum",
"version":1,
"parameters":{
"allowed":"true,last,false"
},
"default":"false",
"field":"snapshot"
},
{
"type":"string",
"optional":false,
"field":"db"
},
{
"type":"string",
"optional":false,
"field":"schema"
},
{
"type":"string",
"optional":false,
"field":"table"
},
{
"type":"int64",
"optional":true,
"field":"txId"
},
{
"type":"int64",
"optional":true,
"field":"lsn"
},
{
"type":"int64",
"optional":true,
"field":"xmin"
}
],
"optional":false,
"name":"io.debezium.connector.postgresql.Source",
"field":"source"
},
{
"type":"string",
"optional":false,
"field":"op"
},
{
"type":"int64",
"optional":true,
"field":"ts_ms"
},
{
"type":"struct",
"fields":[
{
"type":"string",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":false,
"field":"total_order"
},
{
"type":"int64",
"optional":false,
"field":"data_collection_order"
}
],
"optional":true,
"field":"transaction"
}
],
"optional":false,
"name":"data.cdc.dbname.Envelope"
},
"payload":{
"before":{
"id":1005,
"first_name":"foo",
"last_name":"bar",
"email":"foo@bar.com"
},
"after":{
"id":1005,
"first_name":"egg",
"last_name":"bar",
"email":"foo@bar.com"
},
"source":{
"version":"1.2.3.Final",
"connector":"postgresql",
"name":"dbserver1",
"ts_ms":1600375609365,
"snapshot":"false",
"db":"postgres",
"schema":"public",
"table":"customers",
"txId":603,
"lsn":34089688,
"xmin":null
},
"op":"u",
"ts_ms":1600375609778,
"transaction":null
}
}
В обоих случаях записи состоят из ключа (PK) записи, которая была изменена, и непосредственно самой сути изменений: какой была запись до и какой стала после.
INSERT
: значение до (before
) равно null
, а после — строка, которая была вставлена. UPDATE
: в payload.before
отображается предыдущее состояние строки, а в payload.after
— новое с сутью изменений.Этот коннектор использует стандартный механизм репликации MongoDB, считывая информацию из oplog'а primary-узла СУБД.
Аналогично уже описанному коннектору для PgSQL, здесь тоже при первом запуске снимается первичный снапшот данных, после чего коннектор переключается на режим чтения oplog’а.
Пример конфигурации:
{
"name": "mp-k8s-mongo-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "MainRepSet/mongo:27017",
"mongodb.name": "mongo",
"mongodb.user": "debezium",
"mongodb.password": "dbname",
"database.whitelist": "db_1,db_2",
"transforms": "AddPrefix",
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddPrefix.regex": "mongo.([a-zA-Z_0-9]*).([a-zA-Z_0-9]*)",
"transforms.AddPrefix.replacement": "data.cdc.mongo_$1"
}
}
Как можно заметить, здесь нет новых опций по сравнению с прошлым примером, но сократилось лишь количество опций, отвечающих за подключение к БД и их префиксы.
Настройки transforms
в этот раз делают следующее: превращают имя целевого топика из схемы <server_name>.<db_name>.<collection_name>
в data.cdc.mongo_<db_name>
.
Вопрос отказоустойчивости и высокой доступности в наше время стоит как никогда остро — особенно когда мы говорим про данные и транзакции, и отслеживание изменений данных не стоит в этом вопросе в стороне. Рассмотрим, что в принципе может пойти не так и что будет происходить с Debezium в каждом из случаев.
Есть три варианта отказа:
Однако бывают исключения. Если коннектор продолжительное время находился в отключенном состоянии (или не мог достучаться до экземпляра MongoDB), а oplog за это время прошёл ротацию, то при восстановлении подключения коннектор невозмутимо продолжит читать данные с первой доступной позиции, из-за чего часть данных в Kafka не попадёт.
Debezium — мой первый опыт работы с CDC-системами и в целом весьма положительный. Проект подкупил поддержкой основных СУБД, простотой конфигурации, поддержкой кластеризации и активным сообществом. Заинтересовавшимся практикой рекомендую ознакомиться с гайдами для Kafka Connect [13] и Debezium [14].
По сравнению с JDBC-коннектором для Kafka Connect основным преимуществом Debezium является то, что изменения считываются из журналов СУБД, что позволяет получать данные с минимальной задержкой. JDBC Connector (из поставки Kafka Connect) делает запросы к отслеживаемой таблице с фиксированным интервалом и (по этой же причине) не генерирует сообщения при удалении данных (как можно запросить данные, которых нет?).
Для решения схожих задач можно обратить внимание на следующие решения (помимо Debezium):
Читайте также в нашем блоге:
Автор: norc
Источник [23]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/postgresql/357935
Ссылки в тексте:
[1] Debezium: https://debezium.io/
[2] Capture Data Change: https://ru.wikipedia.org/wiki/%D0%97%D0%B0%D1%85%D0%B2%D0%B0%D1%82_%D0%B8%D0%B7%D0%BC%D0%B5%D0%BD%D0%B5%D0%BD%D0%B8%D1%8F_%D0%B4%D0%B0%D0%BD%D0%BD%D1%8B%D1%85
[3] Open Source-проект,: https://github.com/debezium/debezium
[4] embedded-коннектор: https://debezium.io/documentation/reference/1.2/operations/embedded.html
[5] полный список: https://debezium.io/releases/1.2/
[6] docker-compose.yaml: https://github.com/debezium/debezium-examples/tree/master/tutorial
[7] документации: https://docs.confluent.io/current/connect/userguide.html
[8] Avro: https://debezium.io/documentation/reference/1.2/configuration/avro.html
[9] schema-registry: https://docs.confluent.io/current/schema-registry/index.html
[10] публикации: https://www.postgresql.org/docs/12/logical-replication-publication.html
[11] документации: https://debezium.io/documentation/reference/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
[12] exponential backoff: https://en.wikipedia.org/wiki/Exponential_backoff
[13] Kafka Connect: https://docs.confluent.io/3.1.1/connect/intro.html
[14] Debezium: https://debezium.io/documentation/reference/tutorial.html
[15] JDBC Connector Kafka Connect;: https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#jdbc-connector-source-and-sink-for-cp
[16] MySQL Streamer;: https://github.com/Yelp/mysql_streamer
[17] Maxwell;: https://github.com/zendesk/maxwell
[18] SpinalTap;: https://github.com/airbnb/SpinalTap
[19] Oracle GoldenGate: https://www.oracle.com/uk/middleware/technologies/goldengate.html
[20] Определяем подходящий размер для кластера Kafka в Kubernetes: https://habr.com/ru/company/flant/blog/488920/
[21] Практические истории из наших SRE-будней. Часть 2: https://habr.com/ru/company/flant/blog/510486/
[22] Краткий обзор операторов PostgreSQL для Kubernetes, наш выбор и опыт: https://habr.com/ru/company/flant/blog/520616/
[23] Источник: https://habr.com/ru/post/523510/?utm_source=habrahabr&utm_medium=rss&utm_campaign=523510
Нажмите здесь для печати.