Новинка в Spark 4.1: декларативный фреймворк пайплайнов (Spark Declarative Pipelines/SDP)

Компания Databricks (разработчик фреймворка Spark) передала в публичное пользование часть обкатанного в ее облачном сервисе решения Lakeflow Spark Declarative Pipelines, а именно, сам движок решения.
О чем речь?
Речь идет о внедрении возможности декларативного описания пайплайна (переход от концепции «как делать» к «что делать»). Пример на Python:
from pyspark import pipelines as dp
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, \
StructField
# Путь к данным
file_path = f"/databricks-datasets/songs/data-001/"
# Определение streaming table для забора данных
schema = StructType(
[
StructField("artist_id", StringType(), True),
StructField("artist_lat", DoubleType(), True),
StructField("artist_long", DoubleType(), True),
StructField("artist_location", StringType(), True),
StructField("artist_name", StringType(), True),
StructField("duration", DoubleType(), True),
StructField("end_of_fade_in", DoubleType(), True),
StructField("key", IntegerType(), True),
StructField("key_confidence", DoubleType(), True),
StructField("loudness", DoubleType(), True),
StructField("release", StringType(), True),
StructField("song_hotnes", DoubleType(), True),
StructField("song_id", StringType(), True),
StructField("start_of_fade_out", DoubleType(), True),
StructField("tempo", DoubleType(), True),
StructField("time_signature", DoubleType(), True),
StructField("time_signature_confidence", DoubleType(), True),
StructField("title", StringType(), True),
StructField("year", IntegerType(), True),
StructField("partial_sequence", IntegerType(), True)
]
)
@dp.table(
comment="Raw songs data"
)
def songs_raw():
return (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "csv")
.option("sep","\t")
.load(file_path))
# Описываем materialized view для валидации данных и переименования колонки
@dp.materialized_view(
comment="Million Song Dataset with data cleaned and prepared for analysis"
)
def songs_prepared():
return (
spark.read.table("songs_raw")
.withColumnRenamed("title", "song_title")
.select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year")
)
# Описываем materialized view для фильтрации, агрегации, сортировки данных
@dp.materialized_view(
comment="A table summarizing counts of songs released by the artists who released the most songs each year."
)
def top_artists_by_year():
return (
spark.read.table("songs_prepared")
.filter(expr("year > 0"))
.groupBy("artist_name", "year")
.count().withColumnRenamed("count", "total_number_of_songs")
.sort(desc("total_number_of_songs"), desc("year"))
)
*) Как мы видим, при описании пайплайнов можно ссылаться на ранее определенные сущности.
Пайплайн можно написать и на языке SQL:
CREATE STREAMING TABLE orders AS SELECT * FROM STREAM orders_source; CREATE MATERIALIZED VIEW customers AS SELECT * FROM customers_source; CREATE MATERIALIZED VIEW customer_orders AS SELECT c.customer_id, o.order_number, c.state, date(timestamp(int(o.order_datetime))) order_date FROM orders o INNER JOIN customers c ON o.customer_id = c.customer_id; CREATE MATERIALIZED VIEW daily_orders_by_state AS SELECT state, order_date, count(*) order_count FROM customer_orders GROUP BY state, order_date;
Разбираемся дальше
Новым фреймворком вводятся следующие основные понятия:
Flow — поток данных (определяется запросом). Flow может быть типа streaming и типа batch.
DataSet — объект, являющийся результатом работы одного или нескольких потоков. Может выступать в качестве источника данных для потоков «ниже по течению».
Датасеты бывают следующих видов:
Streaming Table — финальная цель записи одного или нескольких streaming потоков.
Materialized View — view, материализованный в таблицу. Результат работы одного batch потока.
Temporary View — служит для логической инкапсуляции (множественных) трансформаций внутри потока. Своего рода узкая часть воронки.
Pipeline — коллекция из одного или нескольких потоков (flows):

Также в рамках фреймворка существуют конфигурационные файлы:
#spark-pipeline.yaml
name: my_pipeline
libraries:
- glob:
include: transformations/**
catalog: my_catalog
database: my_db
configuration:
spark.sql.shuffle.partitions: "1000"
и команды запуска пайплайна на исполнение:
spark-pipelines run
и проверку синтаксических ошибок, аналитических ошибок, циклических зависимостей:
spark-pipelines dry-run
Ближайшее будущее
В ближайшем будущем разработчики обещают интеграцию нового фреймворка в планировщик Airflow (в виде нового оператора).
Так ли уж революционен новый фреймворк?
На мой взгляд, нет. Все существующие серьезные команды уже используют облегчающие работу со Spark библиотеки (как правило, самописные). Так что вся обвязка уже по сути есть, нужно только применить ее к изменившемуся прототипу.
Но, безусловно, это облегчит порог вхождения/использования Spark для небольших команд и/или новичков.
И, что немаловажно для Databricks, SDP делает Spark конкурентом dbt и SQL Mesh.
Ссылки
https://spark.apache.org/docs/latest/declarative-pipelines-programming-guide.html
https://docs.databricks.com/aws/en/getting-started/data-pipeline-get-started