Распределённые транзакции Kafka + PostgreSQL средствами Spring

в 7:04, , рубрики: BPM, postgresql, spring, Блог компании ГК ЛАНИТ, Ланит

Как известно, во многих IT-проектах есть типичная задача -  транзакционная обработка данных в интеграционных сценариях, когда необходимо согласованно отправить или принять данные из внешней системы и при этом обновить собственное состояние приложения.

Особенно интересной эта задача становится, когда для интеграции используется Kafka, так как она имеет свои ограничения, касающиеся реализации транзакционности. Вообще, сейчас Kafka достаточно широко применяется именно  в качестве платформы для асинхронной интеграции, это справедливо и для проектов, которые мы в ЛАНИТ — Би Пи Эм реализуем, например, в Альфа-Банке и ВТБ. Поэтому, надеемся, данная тема будет интересна многим.

В этой статье рассмотрим подход к реализации распределённых транзакций (в рамках одного Java-приложения), которые охватывают Kafka и реляционную СУБД. Для этого воспользуемся средствами управления транзакциями, имеющимися в Spring.

Варианты с организацией eventual consistency с помощью типовых паттернов (Saga, Transactional Outbox и др.) и/или использования дополнительных платформ (Debezium, Kafka Connect и пр.) - тема для отдельной статьи (так что ждите продолжения). В этой статье тему затрагивать не будем. 

Распределённые транзакции Kafka + PostgreSQL средствами Spring - 1

Транзакционные гарантии в Kafka

Пятиминутка теории. Kafka поддерживает распределённые транзакции с полноценными ACID-гарантиями только в пределах самой себя, т.е. транзакция может охватывать несколько producer'ов и consumer'ов для разных топиков. Однако Kafka не реализует XA-протокол, поэтому не поддерживает распределённые транзакции с участием ресурсов других типов (например, СУБД или JMS). 

Поэтому любые (в том числе описываемые в статье!) прикладные решения по организации распределённых транзакций с участием Kafka и сторонних ресурсов будут иметь следующие ограничения:

• полноценные ACID-гарантии обеспечить не получится;

• приложение должно быть устойчиво к возникновению неконсистентного состояния и должно самостоятельно принимать необходимые меры к выходу из него;

• потребуется использование сторонних (по отношению к самой Kafka) инструментов.

С технической точки зрения, распределённая транзакция с участием Kafka будет представлять собой набор отдельных локальных транзакций (в каждом из участвующих ресурсов - Kafka, СУБД), и эти транзакции будут координироваться внешними средствами.

Способ координации, с которым мы будем работать (потому что его реализуют используемые нами средства Spring) называется 1 phase commit best effort, при котором каждый участвующий в общей распределённой транзакции менеджер делает коммит своей локальной транзакции независимо от других менеджеров (отсюда - 1 phase...), при этом все менеджеры делают свои коммиты строго по очереди друг за другом, и в эту очередь они должны быть выстроены по возрастанию «надежности», т.е. по убыванию вероятности ошибки при коммите (отсюда - ...best effort). Таким образом, ошибка при коммите общей транзакции будет выброшена максимально рано (в идеале - при коммите у первого менеджера), и остальные менеджеры не сделают свои коммиты.

Очевидно, 1 phase commit best effort - это вероятностный, а не гарантированный метод. Следовательно, в результате система может остаться в неконсистентном состоянии, когда в цепочке менеджеров транзакций первый менеджер («менее надежный») успешно сделал коммит, а у второго («более надежного») возникла ошибка при коммите. В результате потребуются дополнительные меры по выводу системы из неконсистентного состояния (компенсация транзакций либо повторение всей цепочки транзакций с наложением требований идемпотентности на все операции).

Чтобы описанная здесь схема 1 phase commit best effort была устойчива к ошибкам в прикладном коде, он целиком должен выполняться в контексте самой первой локальной транзакции в очереди. В этом случае любые исключения, выбрасываемые в прикладном коде, будут откатывать первую локальную транзакцию и, соответственно, всю распределенную транзакцию тоже. 

Описание функционального сценария

Итак, нашей задачей является организация распределенных транзакций, охватывающих Kafka и реляционную СУБД в рамках одного приложения (например, микросервиса). При этом для полноты картины рассмотрим как задачу транзакционной отправки, так и задачу транзакционного получения сообщений.

Чтобы получить более-менее компактный модельный Java-проект, совместим обе эти задачи в рамках одного общего сквозного сценария, который реализует гарантированную доставку сущностей из одной БД в другую посредством транспорта Kafka. Также для простоты запустим данные через Kafka в один поток: по одному экземпляру producer'а и consumer'а, один topic с одной партицией.

Последовательность шагов сценария будет следующей.

  • Изначально сущности создаются в виде записей в таблице-источнике.

  • На стороне источника запускается обработчик, который в цикле выполняет следующий набор действий (каждая такая итерация заключена в одну транзакцию):

    • выбирает из БД-источника очередную необработанную сущность,

    • помечает её как обработанную и сохраняет в БД-источнике,

    • отправляет её в виде сообщения в Kafka.

  • На стороне потребителя каждое сообщение из Kafka также обрабатывается следующим образом (обработка происходит в рамках отдельной транзакции):

    • сохраняет сущность из полученного сообщения в БД-приёмнике.

Ожидаемый результат:

  • все сущности, изначально созданные в БД-источнике, в результате оказываются в БД-приёмнике,

    • при этом нет повторов и пропусков сущностей;

  • порядок вставки сущностей в БД-приёмнике соответствует порядку их отправки источником,

  • указанные выше результаты достигаются всегда, независимо от исключений, выбрасываемых в прикладной логике источника и приёмника.

Схема интеграционного сценария
Схема интеграционного сценария

Как указывалось ранее, любой из возможных вариантов несёт риск неконсистентного состояния, поэтому отдельно данный минус не указывается.

Spring for Apache Kafka

В целом, Spring for Apache Kafka имеет следующие механизмы для поддержки транзакций при работе с Kafka (https://docs.spring.io/spring-kafka/reference/html/#transactions):

  • KafkaTransactionManager,

  • KafkaMessageListenerContainer,

  • Локальные транзакции при использовании KafkaTemplate,

  • Синхронизация транзакций с другими менеджерами транзакций.

Для включения транзакционной работы с сообщениями необходимо задать свойства Kafka:

  • spring.kafka.producer.transaction-id-prefix = <строка>

  • spring.kafka.consumer.properties.isolation.level = read_committed

Для включения Kafka producer'а в транзакцию Spring нужно использовать бин KafkaTransactionManager и подать в его конструктор бин ProducerFactory как параметр. Для отправки сообщений нужно использовать бин KafkaTemplate, который настроен на эту же самую ProducerFactory, что и KafkaTransactionManager. Детали описаны в документации: https://docs.spring.io/spring-kafka/reference/html/#using-kafkatransactionmanager

Далее рассмотрим способы организации транзакционной работы с использованием указанных выше механизмов.

Первый вариант

Использовать бин ChainedTransactionManager или более специализированный вариант - ChainedKafkaTransactionManager - детали см. https://docs.spring.io/spring-kafka/reference/html/#container-transaction-manager

ChainedTransactionManager - это готовая реализация принципа 1 phase commit best effort.

В модельном проекте ChainedTransactionManager создаётся со следующим порядком transaction manager'ов: ChainedTransactionManager(myKafkaTransactionManager, jpaTransactionManager),

- т.е. сначала создаётся транзакция Kafka, далее - транзакция БД. Коммит происходит в обратном порядке - сначала транзакция БД, потом транзакция Kafka.

Не рекомендуем этот вариант - из-за использования deprecated-компонентов ChainedTransactionManager и ChainedKafkaTransactionManager.

Второй вариант

Вместо deprecated ChainedKafkaTransactionManager можно связывать транзакции Kafka и БД в прикладном коде, для чего потребуется реализация следующих мер:

В конфигурации задать:

Транзакции в приложении должны создаваться в следующем порядке.

  1. Сначала инициируется транзакция Kafka (для этого бин KafkaTransactionManager и сделан менеджером по умолчанию). 

    • При отправке сообщения она создаётся в головном компоненте, помеченном @Transactional.

    • При получении сообщения она создаётся автоматически контейнером.

  1. После этого инициируется транзакция БД.

    • При отправке сообщения она создаётся во вложенном компоненте (вызываемом из головного, см. выше), помеченном как @Transactional(transactionManager = "jpaTransactionManager").

    • При получении сообщения она создаётся в listener'е, помеченном аннотациями @KafkaListener + @Transactional("jpaTransactionManager").

В результате, точно как и в варианте с ChainedTransactionManager, сначала создаётся транзакция Kafka, далее - транзакция БД. Коммит происходит в обратном порядке - сначала транзакция БД, потом транзакция Kafka. Таким образом, этот вариант полностью функционально идентичен предыдущему варианту с ChainedKafkaTransactionManager, он также является реализацией метода 1 phase commit best effort.

Если ошибка происходит на этапе коммита транзакции Kafka, то закомитченная перед этим транзакция БД не откатывается. В сценариях получения сообщения это приведёт к тому, что сообщение будет доставлено повторно, поэтому операции работы с БД должны быть идемпотентными (https://docs.spring.io/spring-kafka/reference/html/#ex-jdbc-sync).

Рекомендуем этот вариант.

TransactionSynchronization

Описанные выше варианты являются неявной реализацией распределённых транзакций - основную логику координации берёт на себя фреймворк.

В качестве альтернативы можно выбрать вариант явной реализации логики транзакции - в прикладном коде. Для этого можно использовать компоненты TransactionSynchronizationManager и TransactionSynchronization, с помощью которых можно достаточно гибко управлять разными аспектами выполнения транзакции, в т.ч. порядком исполнения отдельных шагов и обработкой ошибок (сюда же можно включить логику выхода из неконсистентного состояния).

Однако сложность этого варианта может быть избыточной для многих прикладных сценариев.

Получается, что это нишевый вариант, потенциально более сложный по сравнению с вышеописанными.

Модельный проект

Общее описание

Указанный в начале документа функциональный сценарий реализуется в модельных проектах.

Оба проекта базируются на следующих компонентах:

  • Spring for Apache Kafka;

  • Test containers для Apache Kafka и PostgreSQL. Соответственно, для локального запуска проектов требуется наличие локального Docker (например, Docker Desktop).

Ожидаемым результатом выполнения каждого из проектов будет следующее.

  1. В логах присутствуют сообщения об exception'ах: Sender fault 1, Sender fault 2, Sender fault 3, Receiver fault 1, Receiver fault 2.

  2. После выброса всех exception'ов в логах присутствует информация о том, что все сущности получены, причем в том же порядке, в котором они были отправлены: 

List of received entities: [Text-1, Text-2, Text-3]

No entities left to send

Моделирование ошибок

В модельных проектах есть ряд настроек, позволяющих включать/выключать генерацию ошибок в разных участках кода:

myservice:
  send-transactions-faults-num: 0     # сколько имитировать сбоев при отправке сообщений
  receive-transactions-faults-num: 0  # сколько имитировать сбоев при получении сообщений
  business-faults: true               # имитировать ли сбои в бизнес-логике

send-transactions-faults-num

Генерация системных (инфраструктурных) исключений. Если задать его значение = N (где N > 0), то будут генерироваться исключения в kafka transaction manager'е в транзакциях отправки сообщений. Это приведет к тому, что при отправке сообщений транзакция БД (комиттится первой в цепочке 1PC best effort) будет закомитчена, и соответственно сущность в БД-источнике получит статус «обработана», а транзакция Kafka (комиттится второй в цепочке 1PC best effort) откатится, и соответственно сообщение не уйдет consumer'у. В результате нарушится консистентность: будет потеряно N первых сообщений, например для N = 1:

List of received entities: [Text-2, Text-3]

No entities left to send

receive-transactions-faults-num

Генерация системных (инфраструктурных) исключений. Если задать его значение = N (где N > 0), то будут генерироваться исключения в kafka transaction manager'е в транзакциях получения сообщений. Это приведет к тому, что при отправке сообщений транзакция БД (комиттится первой в цепочке 1PC best effort) будет закомитчена, и соответственно сущность в БД-приемнике сохранится, а транзакция Kafka (комиттится второй в цепочке 1PC best effort) откатится, и соответственно сообщение будет повторно доставлено consumer'у. В результате нарушится консистентность: первое сообщение будет повторено N раз, например для N = 2:

List of received entities: [Text-1, Text-1, Text-1, Text-2, Text-3]

No entities left to send

business-faults

Генерация исключений в прикладном коде источника и приемника сообщений. Эти исключения не влияют на консистентность обработки потока сообщений.

Для выхода из неконсистентного состояния в приложении требуется реализовать отдельную логику. В модельном проекте она не реализована.

Заключение

В целом, можно сказать, что Spring for Apache Kafka предоставляет разработчику привычные (“@Transactional - и всё работает !”) и при этом достаточно неплохие средства поддержки транзакций в интеграционных сценариях. Однако полноценных ACID-гарантий достичь не получится, поэтому могут потребоваться самостоятельные усилия по возврату приложения к консистентному состоянию в случае ошибок.

Автор: Ермаков Денис

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js