博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Airflow 调度基础
阅读量:5309 次
发布时间:2019-06-14

本文共 8240 字,大约阅读时间需要 27 分钟。

1. Airflow

Airflow是一个调度、监控工作流的平台。用于将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。

 

2. 安装

pip安装airflow

pip3 install apache-airflow

 

初始化db

airflow initdb

 

启动web server

airflow webserver -p 8081

 

启动scheduler

airflow scheduler

 

3. 例子

下面是一个基本的管道定义,接下来我们会对它们进行详细解释:

from airflow import DAG

from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
   
'owner': 'tang-airflow',
   
'depends_on_past': False,
   
'start_date': datetime(2019, 6, 23),
   
'email': ['402877015@qq.com'],
   
'email_on_failure': False,
   
'email_on_retry': False,
   
'retries': 1,
   
'retry_delay': timedelta(minutes=5),
   
# 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'first', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
   
task_id='print_date',
   
bash_command='date',
   
dag=dag)
t2 = BashOperator(
   
task_id='sleep',
   
bash_command='sleep 5',
   
retries=3,
   
dag=dag)
templated_command =
"""
    {% for i in range(5) %}
        echo "{
{ ds }}"
        echo "{
{ macros.ds_add(ds, 7)}}"
        echo "{
{ params.my_param }}"
    {% endfor %}
"""
t3 = BashOperator(
   
task_id='templated',
   
bash_command=templated_command,
   
params={
'my_param': 'Parameter I passed in'},
   
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)

 

它是一个DAG定义文件

一件必须要注意的一件事是:Airflow Python脚本仅仅是一个配置文件,以代码的方式指定了DAG的结构。而真正执行的任务会以不同的上下文执行,不是以这个脚本的上下文。

对于这个DAG定义文件来说,它们并不执行任何真正的数据处理,它也不是用于此用途。这个脚本的目的是:定义一个DAG对象。它需要很快地执行(秒级别,而不是分级别),因为scheduler会定期执行它,以反映出任何变化(如果有的话)

 

引入模块

一个Airflow pipeline 仅仅是一个Python脚本,用于定义一个Airflow DAG对象。首先我们需要import需要的库:

# DAG对象;我们需要它实例化一个DAG from airflow import DAG # Operators;我们需要它去做操作 from airflow.operators.bash_operator import BashOperator

 

默认参数

我们接下来会创建一个DAG以及一些tasks任务,并且可以显式地传递一组参数到每个task的构造器中(但是此操作会有些重复工作)。另外一种更好的方法是:我们可以定义一个默认参数的字典,在创建task时使用。

from datetime import datetime, timedelta  default_args = {
    'owner': 'tang-airflow',     'depends_on_past': False,     'start_date': datetime(2019, 6, 23),     'email': ['402877015@qq.com'],     'email_on_failure': False,     'email_on_retry': False,     'retries': 1,     'retry_delay': timedelta(minutes=5),     # 'queue': 'bash_queue',     # 'pool': 'backfill',     # 'priority_weight': 10,     # 'end_date': datetime(2016, 1, 1), }

 

以上仅是一组参数定义示范,在实际应用中可以有更多且多样的参数配置。

 

实例化一个DAG

我们需要一个DAG对象用于放置tasks。这里我们传递一个String定义dag_id,作为DAG的唯一标识符。我们也会将之前定义的参数字典传递给此方法,并定义调度DAG的间隔为1天(schedule_interval)。

dag = DAG('first', default_args=default_args, schedule_interval=timedelta(days=1))

 

Tasks

Task任务是在实例化operator对象时生成的。从operator实例化的对象称为constructor。第一个参数task_id作为task的唯一标志符。

t1 = BashOperator(      task_id='print_date',      bash_command='date',      dag=dag)  t2 = BashOperator(      task_id='sleep',      bash_command='sleep 5',      retries=3,      dag=dag)

 

这里我们使用的是BashOperator,执行bash命令,参数部分较为简单。在一个task中,使用的参数优先级为:

1.     显式传递的参数值

2.     default_args 字典中存在的参数值

3.     operator的默认值(如果有的话)

一个task必须包含的两个参数为:task_id以及owner,否则Airflow会抛出异常。

 

使用Jinja构建模版

JinjaPython设计的一种模板语言。Airflow使用Jinja模板语言,为pipeline编写者提供了一组内置的的参数与宏。同时,它也提供了hooks,让用户定义它们自己的参数、宏、以及模板。

提供的例子仅片面地介绍了在Airflow使用模板语言,不过提供这个例子的主要的目的有两个:1.让读者知道模板这个功能是存在的;2. 让读者了解双花括号的使用,以及最常见的模板变量: {

{ ds }} (今天的”data stamp”)

templated_command = """      {% for i in range(5) %}          echo "{
{ ds }}"         echo "{
{ macros.ds_add(ds, 7)}}"         echo "{
{ params.my_param }}"     {% endfor %} """

 

t3 = BashOperator(      task_id='templated',      bash_command=templated_command,      params={
'my_param': 'Parameter I passed in'},     dag=dag)

 

需要注意的是 templated_command 的代码逻辑包含在{% %} 块中,引用的参数为{

{ ds }}。调用的方法例如 {
{ macros.ds_add(ds, 7) }},并且在 {
{ params.my_param }} 中引用了一个用户定义的参数。

BashOperator 中的params hook,允许你传递一个参数字典、以及/或对象到你的模板中。这里需要仔细看一下传递参数时的对应映射关系。

文件也可以作为参数传递给bash_command,例如 bash_command=’templated_command.sh’,文件的地址为pipeline文件(这里是tutorial.py)所在文件夹的相对地址。这个功能对于很多场景是有用的,例如将脚本逻辑与pipeline代码分离、允许执行其他语言的代码文件、以及构建pipeline更多的灵活性等。也可以在DAG构造器调用中定义你的template_searchpath,指向任何目录地址。

使用同样的DAG构造器调用,也可以定义user_defined_macros,指定你自己的变量。例如,传递dict(foo=’bar’)到这个参数,可以让你在模板中使用{

{ foo }}。此外,指定user_defined_filters,可以注册自定义的过滤器。例如,传递dict(hello=lambda name: ‘Hello %s’ % name) 到这个变量,可以让你在模板中使用{
{ ‘world’ | hello }}。对于更多的用户自定义过滤器,可以阅读以下Jinja官方文档:

 

对于更多有关可在模板中使用的变量与宏的信息,可以参考以下文档:

 

设置依赖关系

现在我们有三个taskst1, t2 t3。它们之间并没有相互依赖关系。下面是几种可以用于定义它们之间依赖的方法:

t1.set_downstream(t2) # This means that t2 will depend on t1  # running successfully to run.  # It is equivalent to: t2.set_upstream(t1) # The bit shift operator can also be  # used to chain operations: t1 >> t2 # And the upstream dependency with the  # bit shift operator: t2 << t1 # Chaining multiple dependencies becomes  # concise with the bit shift operator: t1 >> t2 >> t3 # A list of tasks can also be set as  # dependencies. These operations  # all have the same effect: t1.set_downstream([t2, t3])  t1 >> [t2, t3]  [t2, t3] << t1

 

需要注意的是,在执行脚本时,如果Airflow发现在DAG中有回环、或是一个依赖被引用超过一次,会抛出异常。

 

4. 测试

我们将以上代码保存在文件tutorial.py中,保存位置为airflow.cfg文件中定义的DAGs目录。默认的DAGs目录地址为~/airflow/dags

# The folder where your airflow pipelines live, most likely a

# subfolder in a code repository

# This path must be absolute

dags_folder = /home/hadoop/airflow/dags

执行脚本:

python3 ~/airflow/dags/tutorial.py

 

命令行验证元数据

执行脚本后,我们执行几个命令进一步验证脚本:

# 打印出activeDAGs

> airflow list_dags

tutorial

 

# 打印 tutorial DAGtasks

> airflow list_tasks tutorial

print_date

sleep

templated

 

# 打印tutorial DAG tasks 的树状结构

> airflow list_tasks tutorial --tree

<Task(BashOperator): sleep>

    <Task(BashOperator): print_date>

<Task(BashOperator): templated>

    <Task(BashOperator): print_date>

 

测试

我们可以通过执行task实例进行测试,这里除了传入task外,还需要传入一个date(日期)。这里的date在执行上下文中是一个execution_date,模拟了scheduler在某个特定时间点(data + time)执行task

# command layout: command subcommand dag_id task_id date  # testing print_date

> airflow test tutorial print_date 2019-02-02

[2019-06-25 03:51:36,370] {bash_operator.py:90} INFO - Exporting the following env vars:

AIRFLOW_CTX_DAG_ID=tutorial

AIRFLOW_CTX_TASK_ID=print_date

AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

[2019-06-25 03:51:36,370] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmpc9ntvif0/print_datehrv9r95p

[2019-06-25 03:51:36,370] {bash_operator.py:114} INFO - Running command: date

[2019-06-25 03:51:36,374] {bash_operator.py:123} INFO - Output:

[2019-06-25 03:51:36,376] {bash_operator.py:127} INFO - Tue 25 Jun 03:51:36 UTC 2019

[2019-06-25 03:51:36,376] {bash_operator.py:131} INFO - Command exited with return code 0

 

# testing sleep > airflow test tutorial sleep 2019-02-02
 

[2019-06-25 03:53:15,203] {bash_operator.py:90} INFO - Exporting the following env vars:

AIRFLOW_CTX_DAG_ID=tutorial

AIRFLOW_CTX_TASK_ID=sleep

AIRFLOW_CTX_EXECUTION_DATE=2019-02-02T00:00:00+00:00

[2019-06-25 03:53:15,203] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp175xwnf8/sleepdsa5lg3t

[2019-06-25 03:53:15,203] {bash_operator.py:114} INFO - Running command: sleep 5

[2019-06-25 03:53:15,207] {bash_operator.py:123} INFO - Output:

 

[2019-06-25 03:53:20,209] {bash_operator.py:131} INFO - Command exited with return code 0

 

# testing 模板

> airflow test tutorial templated 2019-02-02

...

[2019-06-25 05:00:21,412] {bash_operator.py:114} INFO - Running command:

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

    echo "Parameter I passed in"

 

    echo "2019-02-02"

    echo "2019-02-09"

echo "Parameter I passed in"

...

需要注意的是,airflow test 命令是在本地运行task实例,将输出打印到stdout,并没有依赖考虑,也没有与数据库沟通状态(running, success, failed, …)。此命令仅测试一个单task实例。

 

Backfill

从本地运行来看,未出现任何问题,现在我们运行一个backfillBackfill可以测试某个DAG在设定的日期区间的运行状况。它会考虑到task之间的依赖、写入日志文件、与数据库交互并记录状态信息。如果启动了一个webserver,则可以在webserver上跟踪它的进度。

需要注意的是,如果使用depends_on_past=True,则单个task实例的运行取决于它的上游task实例的成功运行。

在这个上下文中,时间区间是start_date,以及一个可选的end_date

# optional, start a web server in debug mode in the background  # airflow webserver --debug &  # start your backfill on a date range airflow backfill tutorial -s 2019-02-02 -e 2019-02-09

 

执行之后可在Web Server 界面跟踪它们的执行状态。

 

 

 

References:

转载于:https://www.cnblogs.com/zackstang/p/11082322.html

你可能感兴趣的文章
读《我是一只IT小小鸟》有感
查看>>
linux中系统管理指令
查看>>
JS常用各种正则表达式
查看>>
Java 定时任务
查看>>
二分查找的平均查找长度详解【转】
查看>>
读阿里巴巴Java开发手册v1.2.0之编程规约有感【架构篇】
查看>>
基于page的简单页面推送技术
查看>>
js 日期格式化函数(可自定义)
查看>>
git报错:failed to push some refs to 'git@github.com:JiangXiaoLiang1988/CustomerHandl
查看>>
Eureka高可用,节点均出现在unavailable-replicas下
查看>>
day 21 - 1 包,异常处理
查看>>
机器学习等知识--- map/reduce, python 读json数据。。。
查看>>
字符串编码
查看>>
预编译语句(Prepared Statements)介绍,以MySQL为例
查看>>
Noip2011提高组总结
查看>>
HDU 4416 Good Article Good sentence(后缀自动机)
查看>>
Java异常之try,catch,finally,throw,throws
查看>>
spring的配置文件详解
查看>>
Spring框架第一篇之Spring的第一个程序
查看>>
操作文件
查看>>