掌握聚合最新动态了解行业最新趋势
API接口,开发服务,免费咨询服务

使用 Airflow 替代你的 crontab

Airflow 是什么

airflow 是 Airbnb 开发的用于工作流管理的开源项目,现在 Apache 下做孵化,地址是 https://github.com/apache/incubator-airflow

Airflow 解决什么问题

Airflow 主要解决的问题可以参考 Airbnb 官方的博客: airflow-a-workflow-management-platform,简单来说就是管理和调度各种离线定时 Job ,可以替代 crontab。

当 cron job 规模达到数百上千时,其对人的要求将会非常高的,如果你的团队经历过这样的事情,应该能体会其中痛苦,所以使用类似 airflow 这样的工具代替 cron 来做定时任务将会极大提高工作效率。

开始使用 airflow 之前需要知道和准备的

Airflow 在 pip 上已经更名为 apache-airflow,下载最新版请使用后者 pip install apache-airflow。

Airflow 1.8 版本依赖的是 MySQL 5.6 以上,5.7 以下报 1071, u'Specified key was too long; max key length is 767 bytes,如果你使用 MySQL 作为你的 airflow backend 请升级你的 MySQL 到最新版。

MySQL 5.6 升级到 5.7 在使用 airflow 时会报 1146, u"Table 'performance_schema.session_variables' doesn't exist",执行 mysql_upgrade -u root -p --force 解决。

Airflow 的 mysql driver 使用的是 mysqlclient mysql://root:@127.0.0.1/sqlalchemy_lab?charset=utf8,如果使用其他 driver 将报 syntax error。

基础概念

Airflow 中最基本的两个概念是:DAG 和 task。DAG 的全称是 Directed Acyclic Graph 是所有你想执行的任务的集合,在这个集合中你定义了他们的依赖关系,一个 DAG 是指一个 DAG object,一个 DAG object 可以在 Python 脚本中配置完成。

比如一个简单的的 DAG 包含三个 task:A、B、C,A 执行成功之后 B 才能执行,C 不依赖 A 和 B 即可执行。在这个简单的 DAG 中 A B C 可以是任何你想要执行的任务。

DAG 的定义使用 Python 完成的,其实就是一个 Python 文件,存放在 DAG 目录,Airflow 会动态的从这个目录构建 DAG object,每个 DAG object 代表了一个 workflow,每个 workflow 都可以包含任意个 task。

安装和使用

Airflow 是基于 Python 构建的,可以很容易用 pip 安装使用,pip install apache-airflow,默认情况下 airflow 会在 ~/airflow 目录存放相关配置。

Airflow 提供了一些列命令来完成 airflow 的初始化工作来和它的正确使用。

# 在 airflow 目录初始化数据库和 airflow 配置
airflow initdb
# 启动 airflow web
airflow webserver
# 开始调度
airflow scheduler

更详细的信息请参考文档 http://airflow.incubator.apache.org/

第一个 DAG

DAG 的配置用 Python 完成像这样:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1) # t2 依赖 t1
t3.set_upstream(t1)

DAG 脚本的目的只是定义 DAG 的配置,并不包含任何的数据处理,在这里 operator 就是 task。

DAG 的实例化

一个 DAG 脚本是由 DAG object 的实例化和对应的 operator 组成的,除此之外我们还可以定义默认的参数提供给每个任务。

DAG 对象实例化可以根据我们的需要提供对应的初始化参数,实例化 DAG 对象需要提供唯一的 dag_id:

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

Task 的实例化

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

task 对象的定义的就是 operator 的实例化,operator 有 task_id,用来区分任务,可以按照需要定制 bash_command,也可以传递参数等。

Task 的依赖

Task 之间是能相互建立依赖的,形如:

t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')

Airflow 会自动检测环形依赖以防止 task 无法工作的情况出现,更复杂的情况请参考文档。

执行和测试

和 airflow.cfg 同级目录下建立 dag 目录,用来存放第一个 DAG 脚本,然后执行 python tutorial.py ,如果没有报错说明 tutorial 建立成功了。

Airflow 的命令行

Airflow 提供了一些列的命令行用来查看 DAG 和 task

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

测试任务的执行

执行任务很简单,指定 DAG 并去指定 task 和执行的日期

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

test 命令会执行任务并且输出到控制台,不会把任务的执行状态进行持久化

执行任务和并记录状态

执行任务在 Airflow 中称之为 backfill,以 backfill 执行会真正开始追踪任务的执行状态和依赖,并且会记录日志

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

更多关于 DAG 和 operator

DAG 的 scope

Airflow 会默认加载任意它能导入到饿 DAG object,这就意味着只要是全局的 DAG object 都可以被导入,但是有时候为了让 DAG 不被导入,比如 SubDagOperator 就可以使用 local 的作用域。

dag_1 = DAG('this_dag_will_be_discovered')

def my_function()
    dag_2 = DAG('but_this_dag_will_not')

my_function()

DAG 可以指定默认的参数

DAG 的默认参数会应用到所有的 operator 中。

default_args=dict(
    start_date=datetime(2016, 1, 1),
    owner='Airflow')

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

扩展性极强的 operator

Airflow operator 很容易扩展,这也是 airflow 几乎支持任何形式 task 重要原因。虽然 Airflow 支持不同的 task 可以传输数据,但是如果你的两个 task 之间确实需要共享数据,最好的办法是把他们写在一起。

参考资料

wecatch.jpg

原文来自:wecatch

声明:所有来源为“聚合数据”的内容信息,未经本网许可,不得转载!如对内容有异议或投诉,请与我们联系。邮箱:marketing@think-land.com

  • 营运车判定查询

    输入车牌号码或车架号,判定是否属于营运车辆。

    输入车牌号码或车架号,判定是否属于营运车辆。

  • 名下车辆数量查询

    根据身份证号码/统一社会信用代码查询名下车辆数量。

    根据身份证号码/统一社会信用代码查询名下车辆数量。

  • 车辆理赔情况查询

    根据身份证号码/社会统一信用代码/车架号/车牌号,查询车辆是否有理赔情况。

    根据身份证号码/社会统一信用代码/车架号/车牌号,查询车辆是否有理赔情况。

  • 车辆过户次数查询

    根据身份证号码/社会统一信用代码/车牌号/车架号,查询车辆的过户次数信息。

    根据身份证号码/社会统一信用代码/车牌号/车架号,查询车辆的过户次数信息。

  • 风险人员分值

    根据姓名和身份证查询风险人员分值。

    根据姓名和身份证查询风险人员分值。

0512-88869195
数 据 驱 动 未 来
Data Drives The Future