Airflow Source Code Deep Dive, Part 1

Airflow Overview

Scalable

Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.

Dynamic

Airflow pipelines are defined in Python, allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.

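Tasks are ordinary Python objects, so a loop can create and chain them at parse time. A minimal sketch, assuming Airflow 2.x (the dag id and task names are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dynamic_example",            # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
    previous = None
    for i in range(3):
        # each iteration instantiates a new task object
        step = BashOperator(task_id=f"step_{i}", bash_command=f"echo step {i}")
        if previous is not None:
            previous >> step             # chain the tasks created in the loop
        previous = step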

Extensible

Easily define your own operators and extend libraries to fit the level of abstraction that suits your environment.

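For example, a custom operator only needs to subclass BaseOperator and implement execute(). A minimal sketch (the operator name and greeting are illustrative):

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):       # illustrative custom operator
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # execute() runs when the task is scheduled; its return value is pushed to XCom
        return f"Hello {self.name}"

It can then be used in a DAG like any built-in operator, e.g. HelloOperator(task_id="greet", name="World").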

Elegant

Airflow pipelines are lean and explicit. Parametrization is built into its core using the powerful Jinja templating engine.

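A sketch of the built-in templating, assuming Airflow 2.x (the dag id is illustrative): the Jinja expression {{ ds }} is rendered at runtime into the run's logical date.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="template_example", start_date=datetime(2023, 1, 1), schedule="@daily") as dag:
    # {{ ds }} renders to the logical date in YYYY-MM-DD form, e.g. 2023-01-01
    BashOperator(task_id="print_date", bash_command="echo run date is {{ ds }}")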

Features

Pure Python

No more command-line or XML black-magic! Use standard Python features to create your workflows, including date time formats for scheduling and loops to dynamically generate tasks. This allows you to maintain full flexibility when building your workflows.

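For instance, schedules are expressed with standard Python datetime objects rather than a separate DSL. A minimal sketch, assuming Airflow 2.4+ where the schedule argument is available (the dag id is illustrative):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="schedule_example",           # illustrative name
    start_date=datetime(2023, 1, 1),
    schedule=timedelta(hours=6),         # a standard timedelta; a cron string like "0 */6 * * *" also works
) as dag:
    EmptyOperator(task_id="noop")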

Useful UI

Monitor, schedule and manage your workflows via a robust and modern web application. No need to learn old, cron-like interfaces. You always have full insight into the status and logs of completed and ongoing tasks.

Robust Integrations

Airflow provides many plug-and-play operators that are ready to execute your tasks on Google Cloud Platform, Amazon Web Services, Microsoft Azure and many other third-party services. This makes Airflow easy to apply to current infrastructure and extend to next-gen technologies.

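A sketch using the Google provider, assuming apache-airflow-providers-google is installed and a default GCP connection is configured (the dag id and query are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

with DAG(dag_id="provider_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # submits a query job to BigQuery via the plug-and-play provider operator
    BigQueryInsertJobOperator(
        task_id="run_query",
        configuration={"query": {"query": "SELECT 1", "useLegacySql": False}},
    )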

Easy to Use

Anyone with Python knowledge can deploy a workflow. Apache Airflow does not limit the scope of your pipelines; you can use it to build ML models, transfer data, manage your infrastructure, and more.

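A small TaskFlow-style sketch, assuming Airflow 2.x decorators (names and values are illustrative), showing a data-transfer pipeline in plain Python:

from datetime import datetime

from airflow.decorators import dag, task

@dag(start_date=datetime(2023, 1, 1), schedule=None)
def transfer_example():                  # illustrative pipeline name
    @task
    def extract():
        return {"value": 42}

    @task
    def load(payload: dict):
        print(payload["value"])

    load(extract())                      # return values flow between tasks via XCom

transfer_example()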

Open Source

Wherever you want to share your improvement you can do this by opening a PR. It’s simple as that, no barriers, no prolonged procedures. Airflow has many active users who willingly share their experiences. Have any questions? Check out our buzzing slack.

Installation

Versions & Requirements

Starting with Airflow 2.3.0, Airflow is tested with the following (minimum 4 GB of memory):

  • Python: 3.7, 3.8, 3.9, 3.10

  • Databases:

      • PostgreSQL: 11, 12, 13, 14, 15

      • MySQL: 5.7, 8

      • SQLite: 3.15.0+

      • MSSQL (Experimental): 2017, 2019

  • Kubernetes: 1.20.2, 1.21.1, 1.22.0, 1.23.0, 1.24.0

Notes

  • MariaDB is not supported: there are known problems running Airflow on MariaDB, and the community does not support it.

  • MySQL 5.x cannot run more than one scheduler, or can only do so with limitations.

  • Airflow runs on POSIX-compliant operating systems; only Linux-based distributions should be used as the "production" execution environment.

Installing Dependencies

# Ubuntu
sudo apt-get install -y --no-install-recommends \
        freetds-bin \
        krb5-user \
        ldap-utils \
        libffi6 \
        libsasl2-2 \
        libsasl2-modules \
        libssl1.1 \
        locales  \
        lsb-release \
        sasl2-bin \
        sqlite3 \
        unixodbc

Installing from Sources

Verifying package integrity

PGP signatures can be verified with either GPG or PGP.

# Import the release keys (any one of the following)
gpg --import KEYS
pgpk -a KEYS
pgp -ka KEYS
# Verify the integrity of the binaries/sources (any one of the following)
gpg --verify apache-airflow-********.asc apache-airflow-*********
pgpv apache-airflow-********.asc
pgp apache-airflow-********.asc
# Verify the SHA512 checksum
shasum -a 512 apache-airflow-********  | diff - apache-airflow-********.sha512

Example output

gpg --verify apache-airflow-2.5.1-source.tar.gz.asc apache-airflow-2.5.1-source.tar.gz
  gpg: Signature made Sat 11 Sep 12:49:54 2021 BST
  gpg:                using RSA key CDE15C6E4D3A8EC4ECF4BA4B6674E08AD7DE406F
  gpg:                issuer "kaxilnaik@apache.org"
  gpg: Good signature from "Kaxil Naik <kaxilnaik@apache.org>" [unknown]
  gpg:                 aka "Kaxil Naik <kaxilnaik@gmail.com>" [unknown]
  gpg: WARNING: The key's User ID is not certified with a trusted signature!
  gpg:          There is no indication that the signature belongs to the owner.
  Primary key fingerprint: CDE1 5C6E 4D3A 8EC4 ECF4  BA4B 6674 E08A D7DE 406F
  
  
  shasum -a 512 apache-airflow-2.5.1-source.tar.gz  | diff - apache-airflow-2.5.1-source.tar.gz.sha512

  • Don't worry about the WARNING: most of the certificates used by release managers are self-signed.

Verifying PyPI releases

For the .whl package: use the script below to download the package, its signature, and its SHA512 checksum from PyPI locally for verification.

#!/bin/bash
AIRFLOW_VERSION="2.5.1"
# Temporary folder; can be deleted after verification
airflow_download_dir="$(mktemp -d)"
pip download --no-deps "apache-airflow==${AIRFLOW_VERSION}" --dest "${airflow_download_dir}"
curl "https://downloads.apache.org/airflow/${AIRFLOW_VERSION}/apache_airflow-${AIRFLOW_VERSION}-py3-none-any.whl.asc" \
    -L -o "${airflow_download_dir}/apache_airflow-${AIRFLOW_VERSION}-py3-none-any.whl.asc"
curl "https://downloads.apache.org/airflow/${AIRFLOW_VERSION}/apache_airflow-${AIRFLOW_VERSION}-py3-none-any.whl.sha512" \
    -L -o "${airflow_download_dir}/apache_airflow-${AIRFLOW_VERSION}-py3-none-any.whl.sha512"
echo
echo "Please verify files downloaded to ${airflow_download_dir}"
ls -la "${airflow_download_dir}"
echo

Installation from PyPI

Install Airflow from PyPI with pip and the matching constraints file:

pip install "apache-airflow[celery]==2.5.1" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-3.7.txt"

Constraints files

Airflow is both a library and an application at the same time, which is a contradiction:

  • A library should keep its dependencies open.

  • An application should pin its dependencies.

  • Airflow keeps its dependencies as open as possible (in setup.cfg and setup.py).

A set of "known-to-be-working" constraint files is kept in branches, with a tag created for each released version, e.g. constraints-2.5.1:

https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt

The constraints-no-providers file contains only the constraints required to install Airflow core, independently of providers:

https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-no-providers-${PYTHON_VERSION}.txt
# latest
https://raw.githubusercontent.com/apache/airflow/constraints-latest/constraints-3.7.txt

Installing Airflow with extras and providers

AIRFLOW_VERSION=2.5.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow[async,postgres,google]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"


AIRFLOW_VERSION=2.5.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# Install providers separately; the constraint file only applies during the pip install command it is passed to
pip install "apache-airflow[postgres,google]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install "apache-airflow-providers-google==8.0.0"

# Without specifying any extra providers
AIRFLOW_VERSION=2.5.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-no-providers-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.5.1/constraints-no-providers-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Troubleshooting the environment

# Make sure ~/.local/bin is on the PATH
PATH=$PATH:~/.local/bin

# start airflow
python -m airflow

# "Symbol not found: _Py_GetArgcArgv" means the wrong Python binary/version is in use
# Example of pinning a specific Python version
# Note: these instructions are for python3.7 but can be loosely modified for other versions
brew install python@3.7
virtualenv -p /usr/local/opt/python@3.7/Frameworks/Python.framework/Versions/3.7/bin/python3 .toy-venv
source .toy-venv/bin/activate
pip install apache-airflow
python
>>> import setproctitle
# Success!

Setting up the database & Upgrading

Upgrading between versions

If you need to upgrade the database manually:

# Normally the upgrade is done with this command
airflow db upgrade
# Get the SQL for a specific revision range
airflow db upgrade -r "2.0.0:2.2.0" 
airflow db upgrade --revision-range "e959f08ac86c:142555e44c17"

# Manual fix-up SQL (MySQL)
SHOW CREATE TABLE task_reschedule;
SHOW CREATE TABLE xcom;
SHOW CREATE TABLE task_fail;
SHOW CREATE TABLE rendered_task_instance_fields;
SHOW CREATE TABLE task_instance;
# Confirm that the dag_id, run_id, task_id and key columns are utf8 or utf8mb3
# If there is a mismatch, fix only the columns with the wrong encoding
# Drop the foreign keys first
ALTER TABLE task_reschedule DROP FOREIGN KEY task_reschedule_ti_fkey;
ALTER TABLE xcom DROP FOREIGN KEY xcom_task_instance_fkey;
ALTER TABLE task_fail DROP FOREIGN KEY task_fail_ti_fkey;
ALTER TABLE rendered_task_instance_fields DROP FOREIGN KEY rtif_ti_fkey;

ALTER TABLE task_instance MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_reschedule MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE rendered_task_instance_fields MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE rendered_task_instance_fields MODIFY dag_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_fail MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_fail MODIFY dag_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE sla_miss MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE sla_miss MODIFY dag_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE task_map MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY dag_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE task_map MODIFY run_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

ALTER TABLE xcom MODIFY task_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY dag_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY run_id VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;
ALTER TABLE xcom MODIFY `key` VARCHAR(255) CHARACTER SET utf8mb3 COLLATE utf8mb3_bin;

# Here you have to copy the statements from SHOW CREATE TABLE output
# Recreate the foreign keys and indexes
ALTER TABLE <TABLE> ADD CONSTRAINT `<CONSTRAINT_NAME>` <CONSTRAINT>

Post-upgrade warnings

Drop the unused tables flagged by the post-upgrade warnings:

# Connect to the database with the db shell command
airflow db shell

# Or exec into a Kubernetes pod
kubectl exec -it <your-webserver-pod> python

from airflow.settings import Session

# Use Airflow's own SQLAlchemy session to drop one of the moved tables
session = Session()
session.execute("DROP TABLE _airflow_moved__2_2__task_instance")
session.commit()

# For reference, run SQL like the following
SELECT * FROM <table>;
DROP TABLE <table>;

Best practice

It is recommended to make a copy of the database first and test the upgrade against the copy.