- PVSM.RU - https://www.pvsm.ru -

Kafka – стильная, модная, молодежная технология, которую разработала в 2011 году компания LinkedIn и значительно усовершенствовал Apache Software Foundation. Представляет собой надежный, масштабируемый и устойчивый инструмент для обработки и передачи данных в режиме реального времени — шину данных.
Но нужно ли внедрять технологию в угоду моде или амбициям вашего продуктового менеджера? Под катом расскажу про сильные стороны Kafka и задачи, в которых она раскрывается по максимуму. Также напишем быстрое приложение на базе Kafka-as-a-service [1], которую мы недавно релизнули в Selectel.
Используйте навигацию, чтобы облегчить изучение текста:
→ Введение [2]
→ Отличия от классических баз данных [3]
→ Отличие от классических брокеров сообщений [4]
→ Когда стоит использовать Kafka [5]
→ Когда не стоит использовать Kafka [6]
→ Пример простого приложения на Python для чтения и записи в Kafka [7]
→ Обслуживание кластеров Apache Kafka [8]
→ Сценарии использования Kafka как сервис [9]
→ Известные в мире примеры использования [10]
→ Заключение [11]
Считается, что одна из основных задач шины данных — передача данных из системы источника в целевую систему. Но когда у нас один консьюмер и один продюсер, все просто — кажется, шина не нужна. Теперь представим, что у нас 4 консьюмера и 6 продюсеров (а уже завтра может стать больше)?
Нам придется реализовать 24 интеграции! И каждая потребует протокола взаимодействия, формата данных и валидации по схеме. Также нам необходимо выполнить нефункциональные требования, такие как:
Задача уже не кажется простой, но Kafka может с ней справиться и сделает это лучше похожих инструментов. Рассмотрим, за счет каких отличий от классических БД у нее это получится.

У Apache Kafka несколько принципиальных отличий от традиционных баз данных типа MySQL или PostgreSQL. Это обусловлено задачей, для которой она проектировалась, а именно — шардированная, отказоустойчивая потоковая обработка большого количества данных в реальном времени.
Среди ключевых отличий:

Пример структуры данных в Kafka. Orders — название топика, в который будут публиковаться события. Partition — упорядоченная история сообщений. Разделение на партиции позволяет распараллеливать обработку данных и обеспечивает горизонтальную масштабируемость. Message — сообщение, содержащее данные о событии: дату, продукт, стоимость, информацию о клиенте.
Как правильно заметил автор в этой статье [12], с порядком сообщений все не так просто и возможны ситуации, приводящие к неожиданному порядку сообщений. Но их можно исправить на стороне потребителя путем правильной обработки времени события. Иными словами, Kafka не гарантирует порядок сообщений в отличие от того же, RabbitMQ.
Иногда можно услышать, что Kafka — это просто очередь или брокер сообщений, очередной аналог RabbitMQ. Да Apache Kafka можно использовать в качестве такой очереди, но прямым аналогом «кролику» называть ее некорректно. Рассмотрим разницу между двумя решениями.
Дисклеймер: сравнение выше не претендует на звание серьезного, инженерного бенчмарка под любой кейс и профиль нагрузки. Это скорее тезисный набросок основных отличий инструментов.
Бонус Kafka — в том, что она позволяет не только хранить, но и обрабатывать полученные данные в реальном времени, разделяя user data на разные топики. Для этого используются библиотеки Kafka streams [13], Apache flink [14].
Вывод: Apache Kafka отличается от всех остальных способов хранения данных и выделяется как платформа для обработки потоков данных в реальном времени. Простыми словами — шина данных.
Эта технология хорошо подходит для следующих технических задач.
Агрегация событий или логов — например, clickstream. Допустим, нужно отделить данные, полученные с ботов, от данных, полученных с людей. В дальнейшем такие очищенные данные используются в системах ML, аналитики, репортинга и визуализации.
IoT-приложение для сенсоров. Разработка приложения для мониторинга данных с IoT-устройств, таких как сенсоры в зданиях или устройства в производстве. Сенсоры могут передавать данные через Kafka-as-a-Service, а ваше приложение будет анализировать и реагировать на эти данные.
Доставка событий многим потребителям. Генерируется событие в одном месте, а обрабатывается сразу несколькими системами без реализации дополнительной логики обработки на уровне приложения.
Система уведомлений и событий. Система уведомлений, которая будет использовать Kafka для доставки сообщений о событиях, таких как новые заказы, обновления статусов и т.д. Клиенты могут подписаться на определенные темы событий и получать уведомления в реальном времени. Это бывает очень полезно в интеграции с CRM/ERP системами.
Обработка огромных данных. Задача вполне возможна благодаря архитектуре Kafka. Она использует партицирование и распределение данных между брокерами, что позволяет масштабировать систему горизонтально для обработки растущего потока данных, а библиотеки kafka streams, Apache flink могут помочь обрабатывать данные на лету.
Платформа для совместной работы и обсуждения. приложение, которое позволит пользователям обмениваться сообщениями и обсуждать проекты, идеи и задачи. Используйте Kafka для обеспечения мгновенной доставки сообщений и создания персонализированных потоков обсуждения.
Проектирование event-driven системы. Apache Kafka идеально подходит для написания приложения с применением event-driven архитектуры. Обеспечивает надежную и масштабируемую платформу для передачи, сохранения и обработки событий:
Stream-processing и CDC. Как промежуточное хранилище для перекладывание данных из одной системы в другую.

Схема микросервисного, асинхронного, событийно-ориентированного приложения на базе Kafka.
Вывод: Apache Kafka подходит для организаций, которые стремятся оптимизировать свои процессы обработки данных в условиях высокой нагрузки и требований к надежности. Решение может быть незаменимо для ситуаций, где оперативная обработка транзакций и мониторинг рисков критически важны. Также Kafka подойдет для предприятий в сфере интернета вещей (IoT), где потоки данных от большого количества устройств требуют непрерывной обработки и анализа.
Я не сторонник использовать технологии только потому, что они модные, стильные и молодежные. Убежден, что для абсолютного большинства задач в разработке достаточно старой доброй PostgreSQL, о чем недавно писал [15] мой коллега. Поэтому приведу примеры, когда точно не стоит использовать Kafka:
Вывод: для небольших и простых проектов без требований к обработке данных в реальном времени Apache Kafka будет избыточной. Не усложняйте и используйте классическую базу данных или очередь.
Давайте посмотрим на Kafka поближе и напишем небольшое приложение для быстрой демонстрации работы с шиной данных.
Так как мы хотим упростить знакомство, воспользуемся готовым сервисом Kafka в Selectel:

mkdir -p ~/.kafka/ wget https://storage.dbaas.selcloud.ru/CA.pem -O ~/.kafka/root.crt chmod 0600 ~/.kafka/root.crt


Инфраструктура готова, теперь переходим к написанию кода.
Задача первого сервиса будет тривиальной: постить в Kafka сообщение в формате JSON каждые 10 секунд. Я заказал Kafka-as-a-service c публичным адресом, поэтому писать программу буду прямо на своем ноутбуке с выходом в интернет. Вы можете использовать для этого любую другую архитектуру, главное обеспечить сетевую связность до кластера Kafka.
Писать будем на Python, так что нам понадобится скачать библиотеку kafka-python. Снова заходим в консоль ноутбука и выполняем команду (она может отличаться в зависимости от вашей ОС, за подробностями обращайтесь в документацию [18]):
pip install kafka-python
Код первого сервиса — producer:
import json
import time
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
# Настройки для подключения к Kafka
bootstrap_servers = '<IP-ADDR>:9093' # тут нужно указать адрес Kafka
topic = 'testtt' # тут нужно указать топик
username = 'natasha'
password = '<PASS>'# тут нужно указать пароль
ssl_cafile = '/Users/alex/.kafka/root.crt' # Путь к CA сертификату
ssl_certfile = '/Users/alex/.kafka/root.crt' # Путь к вашему клиентскому сертификату
# Пример данных о погоде (в реальности это может быть получено из API)
weather_data = {
"city": "SPb",
"temperature": 42,
"condition": "baby's on Fire!"
}
# Преобразование данных о погоде в JSON
weather_json = json.dumps(weather_data)
# Создание Kafka-продюсера с аутентификацией и SSL
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=username,
sasl_plain_password=password,
ssl_cafile=ssl_cafile,
ssl_certfile=ssl_certfile,
)
print(producer.config['api_version'])
# Отправка данных о погоде в тему Kafka
while True:
try:
future = producer.send(topic, value=weather_json.encode())
record_metadata = future.get(timeout=10)
except KafkaError as e:
print(f"Failed to send data: {e}")
time.sleep(10)
Перейдем к другому сервису. Его задача снова простая — читать эти сообщения. В примере мы используем параметр enable_auto_commit, для того чтобы сообщения можно было легко перечитать сначала.
Код второго сервиса — consumer:
import json
from kafka import KafkaProducer, KafkaConsumer
# Настройки для подключения к Kafka
bootstrap_servers = '<IP-ADDR>:9093' # тут нужно указать адрес Kafka
topic = 'testtt' # тут нужно указать топик
username = 'natasha'
password = '<PASS>'# тут нужно указать пароль
ssl_cafile = '/Users/alex/.kafka/root.crt' # Путь к CA сертификату
ssl_certfile = '/Users/alex/.kafka/root.crt' # Путь к вашему клиентскому сертификату
# Создание Kafka-консьюмера с аутентификацией и SSL
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
group_id='my-group',
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=username,
sasl_plain_password=password,
ssl_cafile=ssl_cafile,
ssl_certfile=ssl_certfile,
auto_offset_reset='none',
enable_auto_commit=False
)
# Чтение данных о погоде из темы Kafka
for message in consumer:
received_data = json.loads(message.value.decode())
print("Received weather data:")
print("City:", received_data["city"])
print("Temperature:", received_data["temperature"])
print("Condition:", received_data["condition"])
print("-----------------------")
# Закрываем консьюмера
consumer.close()
Запускаем оба сервиса и смотрим результат работы второго:
.../consumer.py
-----------------------
Received weather data:
City: SPb
Temperature: 42
Condition: baby's on Fire!
-----------------------
Received weather data:
City: SPb
Temperature: 42
Condition:baby's on Fire!
-----------------------
Received weather data:
City: SPb
Temperature: 42
Condition: baby's on Fire!
-----------------------
...
В рамках примера мы не рассмотрели несколько важных нюансов Kafka — партиции и оффсет.
Партиции — это фундаментальный механизм хранения данных в Apache Kafka. Каждый топик в Kafka разделен на одну или несколько партиций. Каждая партиция — это упорядоченная и неизменяемая последовательность сообщений, которая хранит данные.
При отправке сообщения в Kafka вы можете указать ключ и определить, в какую партицию отправить сообщение. Это позволяет управлять упорядоченностью данных с одним ключом и предотвращать изменение порядка обработки.
# Настройки для подключения к Kafka
bootstrap_servers = 'kafka_address:port'
topic = 'weather_data'
partition = 1 # Номер партиции, в которую отправляем сообщение
username = 'your_username'
password = 'your_password'
ssl_cafile = '/path/to/ca.crt'
ssl_certfile = '/path/to/client.crt'
ssl_keyfile = '/path/to/client.key'
# Содазание экземпляра продюсера не отличается от примеров выше
Для чтения можно реализовать функцию, в которой консьюмеры подпишутся на определенные партиции топика, чтобы обрабатывать данные параллельно. Вы можете регулировать количество консьюмеров и партиций для достижения оптимального баланса между скоростью и обработкой.
# Создание Kafka-консьюмера с аутентификацией и SSL
consumer = KafkaConsumer(
topic,
bootstrap_servers=bootstrap_servers,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=username,
sasl_plain_password=password,
ssl_cafile=ssl_cafile,
ssl_certfile=ssl_certfile,
auto_offset_reset='earliest',
group_id=None # Отключаем группу потребителей для параллельной обработки
)
# Функция для обработки сообщений
def process_messages():
for message in consumer:
print(f"Consumer in partition {message.partition} received message: {message.value.decode()}")
# Запуск потоков для параллельной обработки
threads = []
for _ in range(3): # Пример: запустить 3 потока
thread = threading.Thread(target=process_messages)
threads.append(thread)
thread.start()
# Ожидание завершения всех потоков
for thread in threads:
thread.join()
Каждая партиция может иметь несколько реплик, которые распределяют данные для обеспечения отказоустойчивости. Репликация позволяет сохранить данные, даже если один из брокеров выходит из строя.
Офсеты в Kafka — это позиции в партициях, указывающие на конкретные места в потоке сообщений. Оффсеты используются для отслеживания прогресса консьюмеров при чтении данных из топиков.
Оффсеты позволяют консьюмерам запоминать, до какого момента они уже прочитали данные. При перезапуске или сбое консьюмер сможет продолжить чтение с места, где он остановился, а не с начала.
Если консьюмер прочитал сообщение и успешно обработал его, он может сохранить оффсет после успешной обработки. Если впоследствии возникнут ошибки, консьюмер сможет использовать оффсет, чтобы перечитать это сообщение и обработать его повторно.
Оффсеты позволяют отслеживать прогресс консьюмеров в реальном времени. Это важно для мониторинга и контроля обработки данных.
Пример работы с офсетом:
# Функция для обработки сообщений
def process_messages():
for message in consumer:
print(f"Received message: {message.value.decode()}")
# Получение и сохранение оффсета
topic_partition = (message.topic, message.partition)
offset = message.offset
print(f"Offset for {topic_partition}: {offset}")
# Запуск функции для обработки сообщений
process_messages()
Для работы с Kafka в примерах я использовал библиотеку kafka-python [19], однако хотел бы отметить еще одну библиотеку — confluent-kafka [20]. Если вам интересны примеры ее работы, пишите в комментариях — я обязательно напишу отдельную статью про примеры работы с confluent-kafka.
В примере выше я использовал готовый кластер Kafka-as-a-service. Это связано с тем, что внедрение Kafka может быть связано с рядом технических сложностей, требующих внимательного и компетентного инженерного подхода.
Кластер Apache Kafka состоит из двух ключевых компонентов:
В процессе установки этих составляющих могут возникнуть проблемы, связанные с конфигурацией зависимостей, требований к аппаратному обеспечению, настроек сетевой связанности и, конечно, определением оптимальных параметров балансировки нагрузки, шардирования и производительности кластера.
В предыдущих версиях Kafka (до 2.8) в качестве контроллера использовался ZooKeeper. Он выполнял роль базы для хранения метаданных о состоянии узлов кластера и расположении сообщений, позволял организовать репликацию, отказоустойчивость и шардирование. В новой версии можно обойтись без ZooKeeper. Теперь эту работу выполняет сервер Kafka в режиме контроллера. Он реализует новый механизм управления метаданными, известный как KRaft. Если вы планируете установку новой версии Kafka, то можете использовать этот механизм управления, чтобы избежать зависимости от отдельной службы ZooKeeper.
Помимо проблем установки и настройки Kafka, отмечу еще ряд трудностей:
Чтобы упростить работу с Kafka, мы в Selectel берем на себя работу, связанную с администрированием инфраструктуры, и ответственность за пул задач, с которым можно познакомиться ниже:

Разграничение ответственности в облачных базах данных Selectel.
Рассмотрим несколько типовых бизнес-сценариев использования Managed Kafka в информационных системах:
Отдельно выделю сценарий использования Kafka-as-a-service для stateful-приложений совместно с Managed Kubernetes [22] для stateless-приложений. Такое сочетание серьезно ускорит time-to-market и снизит капитальные затраты на инфраструктуру для небольших команд.

Схема, как можно использовать облачную Kafka с другими продуктами Selectel.
Возможно, самый яркий пример использования Kafka — газета The NY Times, которая все статьи и правки за последние 160 лет хранит в Kafka. [23] Однако рассмотрим еще несколько чуть менее известных примеров того, как крупные компании успешно используют эту технологию для обработки данных в реальном времени и повышают эффективности бизнес-процессов.
Кто еще:
Итак, мы рассмотрели особенности Apache Kafka, ее функциональность как элемента инфраструктуры. Также быстро создали небольшое приложение на базе managed-решения. Kafka используют многие глобальные компании, но мне интересно, как ею пользуетесь именно вы. Расскажите в комментариях, в каких задачах она для вас незаменима и какая ее функциональность особенно ценна. Либо просто поучаствуйте в опросе ниже!
Автор: Александр Гришин
Источник [27]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/event-driven/386906
Ссылки в тексте:
[1] Kafka-as-a-service: https://slc.tl/c6ie2
[2] Введение: #1
[3] Отличия от классических баз данных: #2
[4] Отличие от классических брокеров сообщений: #3
[5] Когда стоит использовать Kafka : #4
[6] Когда не стоит использовать Kafka: #5
[7] Пример простого приложения на Python для чтения и записи в Kafka: #6
[8] Обслуживание кластеров Apache Kafka : #7
[9] Сценарии использования Kafka как сервис : #8
[10] Известные в мире примеры использования : #9
[11] Заключение: #10
[12] в этой статье: https://habr.com/ru/companies/southbridge/articles/743336/
[13] Kafka streams: https://kafka.apache.org/documentation/streams/
[14] Apache flink: https://habr.com/ru/company/beeline/blog/648729/
[15] писал: https://habr.com/ru/companies/selectel/articles/740680/
[16] панель управления: https://my.selectel.ru/login/
[17] в документации: https://docs.selectel.ru/cloud/managed-databases/kafka/connect-to-cluster/
[18] в документацию: https://kafka-python.readthedocs.io/en/master/install.html
[19] библиотеку kafka-python: https://kafka-python.readthedocs.io/en/master/
[20] confluent-kafka: https://docs.confluent.io/kafka-clients/python/current/overview.html
[21] cloud-native подходе: https://selectel.ru/solutions/microservices/
[22] Managed Kubernetes: https://selectel.ru/services/cloud/kubernetes/?utm_source=habr.com&utm_medium=referral&utm_campaign=kubernetes_article_kafka_290823_content
[23] хранит в Kafka.: https://open.nytimes.com/publishing-with-apache-kafka-at-the-new-york-times-7f0e3b7d2077
[24] Image: https://slc.tl/z7e8t
[25] Image: https://slc.tl/m99nu
[26] Image: https://slc.tl/i5aee
[27] Источник: https://habr.com/ru/companies/selectel/articles/757440/?utm_source=habrahabr&utm_medium=rss&utm_campaign=757440
Нажмите здесь для печати.