- PVSM.RU - https://www.pvsm.ru -
Меня зовут Дмитрий и я вляпался в Airflow. Цель этой статьи — помочь начинающим пользователям Airflow ознакомиться с таблицами PostgreSQL. Время узнать насколько глубока аирфлоуольная нора.
Общая информация про Airflow
Airflow - это инструмент планирования и управления рабочими процессами, он использует базу данных для хранения метаданных о задачах, DAGs (Directed Acyclic Graphs), запусках и результатах.
База данных в Airflow:
SQLAlchemy: Airflow использует библиотеку SQLAlchemy для взаимодействия с различными системами управления базами данных;
SQLite: По умолчанию Airflow использует встроенную базу данных SQLite, которая хранится в файле airflow.db. Этот вариант подходит для маленьких проектов, но не рекомендуется для продуктовой среды;
Postgres, MySQL, MSSQL: Для более крупных проектов и продуктовых сред рекомендуется использовать реляционные базы данных (Postgres, MySQL, MSSQL).
Как было сказано выше основной инструмент это SQLAlchemy посредством которой проиcходит взаимодействие с БД. Рассказывать про то, как её развернуть и подключить я не буду, но есть замечательная статья https://habr.com/ru/articles/860900/ [1]. Упомяну, что настройка БД осуществляется в файле ~/airflow/airflow.cfg в строчке sql_alchemy_conn, где можно настроить подключение именно на постгрес. Сам постгрес настраивается отдельно.
Метаданные в Airflow:
Задачи: Информация о каждой задаче (имя, тип, зависимости, параметры и т.д.);
DAGs: Описание графика зависимостей между задачами;
Запуски: Информация о каждом запуске DAGs (дата и время запуска, статус и т.д.);
Результаты: Информация о результатах выполнения задач (успешно, с ошибкой, время выполнения и т.д.).
Таблицы и их содержимое
Далее коротко расскажу про некоторые таблицы и их поля.
Dag_code
fileloc_hash - хеш дага;
fileloc - расположение файла дага в системе;
last_updated - дата последнего обновления дага;
source_code - весь код дага с управляющими символами.
Из данной таблички можно вытянуть весь код DAG и сохранить в файл для переиспользования или корректировки, а также использовать в системе контроля версий.
Dag_pickle
id - идентификатор дага;
pickle - основное поле, хранит сериализованные данные дага;
created_dttm - информация о времени создания или обновления записи;
pickle_hash - хеш сериализованного объекта дага, позволяет Airflow быстро проверять изменился ли даг с последнего сохранения.
Отслеживать изменения дагов при их выполнении.
Log
id - идентификатор записи;
dttm - дата и время записи события;
dag_id - идентификатор DAG, к которому относится событие;
task_id - идентификатор задачи внутри DAG;
map_index - если задач внутри DAG несколько, то можно так получить инстансы задачи;
event - тип события (например, success, failed, skipped, и т.д.). Это более высокоуровневая информация, чем подробные сообщения в log.
execution_date - дата и время выполнения задачи;
owner - автор DAG или группа пользователей, отвечающих за него;
extra - дополнительная информация в формате JSON, может выводить автор DAG, хост, полный путь выполнения задачи и т.п.
По необходимости проанализировать события для конкретных пользователей или для определенных задач. Стоит так же отметить, что место хранения логов определяется в вышеупомянтом конфигурационном файле airflow.cfg в разделе [core], параметр base_log_folder(если у вас версия 2.0 и выше, то иначе – в разделе [logging]), и по умолчанию Airflow сохраняет логи задач в файлах на локальной файловой системе сервера Airflow. Поэтому если своевременно не ухаживать за своими логами, то есть все шансы потратить кучу времени на очистку в будущем. Либо использовать удаленное хранилище для логов. Но никто не запрещает вам испытать на себе роль Нео. Тук-тук-тук, так сказать.
Slot_pool
id: уникальный идентификатор записи в таблице;
pool: имя пула слотов. Это текстовая строка, которая используется для идентификации пула в Airflow;
slots: максимальное количество слотов, доступных в пуле. Это целое число, которое определяет, сколько задач могут выполняться одновременно из этого пула;
description: Дополнительное описание пула слотов (необязательное поле).
Из этой таблички мы можем узнать сколько можно в целом запускать DAGs одновременно. То есть таким образом мы можем распределять задачи в зависимости от потребностей и потребляемых ресурсов по разных пулам. Нет времени объяснять – просто запускай!
Connection
conn_id: уникальный идентификатор соединения;
conn_type: тип соединения. Это определяет, к какой системе подключается соединение Postgres, MySQL, S3, Hive, OpenAI Cloud Storage и т.п.;
description: описание соединения. Полезно для документирования и понимания назначения соединения;
host: хост или адрес сервера, к которому подключается соединение. Например, это может быть имя хоста базы данных, URL-адрес веб-сервиса или имя файла;
schema: схема (или база данных) в пределах сервера, если это применимо. Это поле используется, например, при работе с базами данных, такими как PostgreSQL или MySQL;
login: имя пользователя для аутентификации на сервере;
password: пароль для аутентификации на сервере. Важно: В производственных средах не рекомендуется хранить пароли в открытом виде в таблице;
port: порт сервера, к которому подключается соединение. Обычно используется для баз данных или других сетевых служб;
Если вы используете шифрование, то могут быть дополнительные поля is_encrypted и is_extra_encrypted. Первое указывает зашифрован ли пароль, а второе зашифрованы ли дополнительные параметры;
extra: дополнительные параметры соединения в формате JSON.
Таблица connections в Airflow хранит информацию о соединениях с внешними системами, которые используются DAGs для доступа к данным или выполнения других операций.
Dag
dag_id: уникальный идентификатор DAG. Да, это может быть и текст;
root_dag_id: идентификатор корневого DAG, если это под-DAG;
is_paused: DAG приостановлен? Если TRUE, DAG не будет запускаться автоматически;
is_subdag: DAG является под-DAGом?
is_active: DAG активен?
last_parsed_time: время последней успешной парсинга DAG-файла. Когда Airflow загружает DAG из файла, он записывает время в это поле;
last_pickled: сериализованный (pickled) объект DAG. Это позволяет Airflow быстро загружать DAG из базы данных, минуя повторный парсинг кода. В современных версиях Airflow это поле используется реже из-за перехода на более эффективные методы сериализации.
last_expired: время когда DAG получал сигнал об обновлении;
scheduler_lock: время установки блокировки планировщиком. Это предотвращает одновременный доступ к DAG нескольким планировщикам;
pickle_id: Идентификатор записи в таблице dag_pickle.
Fileloc: путь к файлу DAG в файловой системе;
processor_subdir: подкаталог для обработки DAG.
owners: владельцы DAG (обычно в формате JSON).
description: описание DAG.
default_view: предпочтительный способ просмотра DAG в интерфейсе Airflow.
schedule_interval: интервал планирования DAG (например, @daily, 0 0 * * *).
timetable_description: описание расписания запусков.
max_active_tasks: максимальное количество одновременно выполняемых задач в DAG.
max_active_runs: максимальное количество одновременно выполняемых запусков DAG.
has_task_concurrency_limits: есть ли ограничения на параллелизм задач?
has_import_errors: есть ли ошибки импорта в DAG-файле?
next_dagrun: планируемое время следующего запуска DAG.
next_dagrun_data_interval_start: начало интервала данных для следующего запуска.
next_dagrun_data_interval_end: конец интервала данных для следующего запуска.
next_dagrun_create_after: время, после которого можно создать следующий запуск DAG.
Таблица dag (или dag_model в более новых версиях Airflow) в Airflow хранит метаданные о ваших DAGs (Directed Acyclic Graphs). Это центральное хранилище информации о всех ваших DAG-файлах, их состоянии и истории выполнения.
Какая статья без кода. Посмотрим, сколько активных дагов, на паузе, активных на паузе, всего дагов, и какую долю составляют вышеупомянутые.
WITH dags AS (
SELECT
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True) AS active,
(SELECT COUNT(dag_id) FROM dag WHERE is_paused = True) AS paused,
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True AND is_paused = True) AS a_n_p,
(SELECT COUNT(dag_id) FROM dag) AS all_dags
)
SELECT
active,
paused,
a_n_p,
all_dags,
CAST(active AS FLOAT) * 100 / all_dags AS active_ratio,
CAST(paused AS FLOAT) * 100 / all_dags AS paused_ratio,
CAST(a_n_p AS FLOAT) * 100 / all_dags AS a_n_p_ratio
FROM dags;
Вы сможете это протестить самостоятельно(при некоторых условиях), а у меня получилось следующее:
|
Активных |
Паузанутых |
Активно паузанутых |
Всего |
Доля активных |
Доля паузанутых |
Доля активно паузанутых |
|
360 |
278 |
166 |
574 |
45% |
35% |
20% |

И всё в целом хорошо, но как могут быть активно паузанутые?
А это вопрос для самоизучения. Мало ли на практике нужно будет проанализировать DAGs, а тут такая подстава.
Log_template
id: уникальный идентификатор записи лога;
filename: путь к файлу лога;
elasticsearch_id: ID записи в Elasticsearch (если используется Elasticsearch для хранения логов). Это поле будет NULL, если Elasticsearch не используется;
created_at: время создания записи лога.
Таблица log_template в Airflow используется для хранения шаблонов имен файлов логов. Эти шаблоны определяют, как Airflow формирует имена файлов для логов задач. Она не так часто используется напрямую разработчиками DAG, а больше управляется самой системой Airflow. Так же в таблице может присутствовать поле template, в котором мы можем задать шаблон для записи логов. Как и определить это самостоятельно новым классом/методом и т.п. Это довольно сложный и специфический процесс под конкретные цели и задачи, со своими плюсами и минусами. Главное - оно работает из коробки...

Ab_user
id: уникальный идентификатор пользователя;
first_name: имя пользователя;
last_name: фамилия пользователя;
username: имя пользователя для входа, иногда известно как логин, но это уже совсем другая история;
password: хешированный пароль;
active: указывает, активен ли аккаунт пользователя;
email: адрес электронной почты пользователя;
last_login: время последнего(если кому-то это очень важно, то крайнего) входа пользователя;
login_count: количество успешных входов пользователя;
fail_login_count: количество неудачных попыток входа пользователя;
created_on: время создания аккаунта пользователя;
changed_on: время последнего изменения данных пользователя;
created_by_fk: идентификатор пользователя, который создал данного;
changed_by_fk: идентификатор пользователя, который последний раз изменил данные этого пользователя.
Таблица ab_user содержит базовую информацию о пользователях Airflow, необходимую для управления доступом и аутентификации. В зависимости от вашей конфигурации, она может содержать дополнительные поля для более сложного управления доступом и пользовательскими профилями.
В этой статье мы рассмотрели структуру нескольких ключевых таблиц в базе данных Airflow, включая dag, connections, ab_user и таблицы логов. Понимание структуры этих таблиц необходимо для эффективного управления и мониторинга рабочих процессов в Airflow. Более подробную информацию можно найти в официальной документации Airflow.
Автор: LunarBirdMYT
Источник [2]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/airflow/405030
Ссылки в тексте:
[1] https://habr.com/ru/articles/860900/: https://habr.com/ru/articles/860900/
[2] Источник: https://habr.com/ru/articles/866542/?utm_source=habrahabr&utm_medium=rss&utm_campaign=866542
Нажмите здесь для печати.