Создаем сенсор Airflow


Создаем сенсор Airflow
Скриншот ИИ-обзора

Что такое Airflow

Согласно ИИ обзору от Google, Apache Arflow — самый популярный на сегодняшний день оркестратор данных. Как и другие оркестраторы, он выполняет задачи автоматизации запуска задач, пайплайнов (запуск по расписанию, запуск по событию). Управляет зависимостями между задачами. Позволяет удобно мониторить статус выполнения задач.

Отмечу, что в целом область применения оркестратора не ограничивается Дата Инжинирингом. Благодаря расширяемости и 500 существующим операторам Airflow способен отослать email в день рождения, отправить сообщение в Slack или управлять рекламной компанией Google.

Что такое оператор

Оператор — минимальный компонент Airflow. Из совокупности операторов и настроенных зависимостей между ними строится flow (DAG).

Например:

PythonOperator — выполняет код на Python

EmailOperator — посылает письмо

SqlOperator — выполняет SQL запрос

Что такое сенсор

Сенсор — тип оператора, который настроен на ожидание внешнего события или условия.

Например: появление файла; результат SQL запроса; результат вызова API; ждать час.

Виды сенсоров

Сейчас есть два вида сенсоров:

  • Стандартный сенсор
  • Defferrable сенсор

Стандартные сенсоры (ранее smart sensors) появились в Airflow 2.0.

Их преимуществами являются возможность работы с Python версий <3.7, понятная модель и доступность реализации. Недостатками — повышенное потребление ресурсов и сложности с логированием работы.

Даже когда сенсор «спит», он потребляет полный рабочий слот. Это означает выделение потока операционной системы, оперативной памяти, захват (lock) файлов. Последняя особенность приводит к тому, что сенсор не обновляет лог в UI в процессе своей работы, а выводит все обновления разом — после своего завершения.

Пример стандартного сенсора

from typing import Dict
from airflow.sensors.base import BaseSensorOperator
 
class MySensor(BaseSensorOperator):
  def __init__(
      self,
      task_id: str,
      poke_interval: float = 60 * 15, # таймер вызова сенсора - в секундах
      **kwargs,
  ) -> None:
    super().__init__(task_id=task_id, **kwargs)
    self.poke_interval = poke_interval
    self.soft_fail = False
    self.mode = 'reschedule'
 
  def poke(self, context: Dict) -> bool:
    if do_request(): # заменить на свою логику
      return True # условие выполнено. выполнение перейдет на следующую задачу.
    else:
      return False

В отличие от стандартного сенсора, defferrable сенсор использует асинхронную (async) модель выполнения. Это означает, что он не потребляет ресурсы 100% времени своего выполнения, а делает это короткими промежутками. Помимо этого, сенсор оперативно обновляет лог в UI.

Пример defferrable сенсора

import time
from datetime import timedelta
from typing import Any
 
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context
 
class MySensor(BaseSensorOperator):
  def __init__(
      self,
      task_id: str,
      deferrable: bool = True,
      defer_interval: float = 10, # в минутах
      **kwargs,
  ) -> None:
    super().__init__(task_id=task_id, **kwargs)
    self.deferrable = deferrable
    self.defer_interval = defer_interval
 
  def update(self) -> bool:
    if do_request(): # заменить на свою логику
      return True # условие выполнено
    else:
      return False
 
  def execute(self, context: Context, event: Any = None) -> None: # необходимы именно такие входные аргументы!
    if not self.update():
      if self.deferrable:
        self.defer(trigger=TimeDeltaTrigger(timedelta(minutes=self.defer_interval)), method_name="execute",)
      else:
        time.sleep(self.defer_interval * 60) # эмуляция defer
    else:
      return

Выводы и рекомендации

Если у вас современное окружение, используйте deferrable операторы. Они потребляют значительно меньше ресурсов и своевременно обновляют лог.

Ссылки

https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html#sensors

https://airflow.apache.org/docs/apache-airflow/2.1.3/_modules/airflow/sensors/base.html

https://www.astronomer.io/docs/learn/what-is-a-sensor/

https://airflow.apache.org/docs/apache-airflow/2.11.0/authoring-and-scheduling/deferring.html

Бесплатный
Airflow1
Комментарии
avatar
Здесь будут комментарии к публикации