Создаем сенсор Airflow

Что такое Airflow
Согласно ИИ обзору от Google, Apache Arflow — самый популярный на сегодняшний день оркестратор данных. Как и другие оркестраторы, он выполняет задачи автоматизации запуска задач, пайплайнов (запуск по расписанию, запуск по событию). Управляет зависимостями между задачами. Позволяет удобно мониторить статус выполнения задач.
Отмечу, что в целом область применения оркестратора не ограничивается Дата Инжинирингом. Благодаря расширяемости и 500 существующим операторам Airflow способен отослать email в день рождения, отправить сообщение в Slack или управлять рекламной компанией Google.
Что такое оператор
Оператор — минимальный компонент Airflow. Из совокупности операторов и настроенных зависимостей между ними строится flow (DAG).
Например:
PythonOperator — выполняет код на Python
EmailOperator — посылает письмо
SqlOperator — выполняет SQL запрос
Что такое сенсор
Сенсор — тип оператора, который настроен на ожидание внешнего события или условия.
Например: появление файла; результат SQL запроса; результат вызова API; ждать час.
Виды сенсоров
Сейчас есть два вида сенсоров:
- Стандартный сенсор
- Defferrable сенсор
Стандартные сенсоры (ранее smart sensors) появились в Airflow 2.0.
Их преимуществами являются возможность работы с Python версий <3.7, понятная модель и доступность реализации. Недостатками — повышенное потребление ресурсов и сложности с логированием работы.
Даже когда сенсор «спит», он потребляет полный рабочий слот. Это означает выделение потока операционной системы, оперативной памяти, захват (lock) файлов. Последняя особенность приводит к тому, что сенсор не обновляет лог в UI в процессе своей работы, а выводит все обновления разом — после своего завершения.
Пример стандартного сенсора
from typing import Dict
from airflow.sensors.base import BaseSensorOperator
class MySensor(BaseSensorOperator):
def __init__(
self,
task_id: str,
poke_interval: float = 60 * 15, # таймер вызова сенсора - в секундах
**kwargs,
) -> None:
super().__init__(task_id=task_id, **kwargs)
self.poke_interval = poke_interval
self.soft_fail = False
self.mode = 'reschedule'
def poke(self, context: Dict) -> bool:
if do_request(): # заменить на свою логику
return True # условие выполнено. выполнение перейдет на следующую задачу.
else:
return False
В отличие от стандартного сенсора, defferrable сенсор использует асинхронную (async) модель выполнения. Это означает, что он не потребляет ресурсы 100% времени своего выполнения, а делает это короткими промежутками. Помимо этого, сенсор оперативно обновляет лог в UI.
Пример defferrable сенсора
import time
from datetime import timedelta
from typing import Any
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
class MySensor(BaseSensorOperator):
def __init__(
self,
task_id: str,
deferrable: bool = True,
defer_interval: float = 10, # в минутах
**kwargs,
) -> None:
super().__init__(task_id=task_id, **kwargs)
self.deferrable = deferrable
self.defer_interval = defer_interval
def update(self) -> bool:
if do_request(): # заменить на свою логику
return True # условие выполнено
else:
return False
def execute(self, context: Context, event: Any = None) -> None: # необходимы именно такие входные аргументы!
if not self.update():
if self.deferrable:
self.defer(trigger=TimeDeltaTrigger(timedelta(minutes=self.defer_interval)), method_name="execute",)
else:
time.sleep(self.defer_interval * 60) # эмуляция defer
else:
return
Выводы и рекомендации
Если у вас современное окружение, используйте deferrable операторы. Они потребляют значительно меньше ресурсов и своевременно обновляют лог.
Ссылки
https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#sensors
https://airflow.apache.org/docs/apache-airflow/2.1.3/_modules/airflow/sensors/base.html
https://www.astronomer.io/docs/learn/what-is-a-sensor/
https://airflow.apache.org/docs/apache-airflow/2.11.0/authoring-and-scheduling/deferring.html