您现在的位置是:主页 > news > 在线设计平台崭露头角/朝阳网站seo
在线设计平台崭露头角/朝阳网站seo
admin2025/5/1 20:47:03【news】
简介在线设计平台崭露头角,朝阳网站seo,seo是什么意思中文,苏州高端网站设计制作环境 CentOS Linux release 7.5.1804Python 3.6.4/2.7.14简介 Airflow 是 Airbnb 开源的一个用 Python 编写的工作流管理平台,自带 web UI 和调度,目前在Apache下做孵化。 Airflow 中有两个基本概念,DAG和task。 DAG是多个task的集合&#…
环境
CentOS Linux release 7.5.1804
Python 3.6.4/2.7.14
简介
Airflow 是 Airbnb 开源的一个用 Python 编写的工作流管理平台,自带 web UI 和调度,目前在Apache下做孵化。
Airflow 中有两个基本概念,DAG和task。 DAG是多个task的集合,定义在一个Python文件中,包含了task之间的依赖关系,如task A在task B之后执行,task C可以单独执行等等。
安装并运行
# 默认目录在~/airflow,也可以使用以下命令来指定目录
export AIRFLOW_HOME=~/airflowpip install apache-airflow# 初始化数据库
airflow initdb# 启动web服务,默认端口为8080,也可以通过`-p`来指定
airflow webserver -p 8080# 启动 scheduler
airflow scheduler
定义第一个DAG
在$AIRFLOW_HOME
目录下新建dags
文件夹,后面的所有dag文件都要存储在这个目录。
新建dag文件hello_world.py
,语句含义见注释
# coding: utf-8from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta# 定义默认参数
default_args = {'owner': 'airflow', # 拥有者名称'start_date': datetime(2018, 6, 6, 20, 00), # 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期'email': ['shelmingsong@gmail.com'], # 接收通知的email列表'email_on_failure': True, # 是否在任务执行失败时接收邮件'email_on_retry': True, # 是否在任务重试时接收邮件'retries': 3, # 失败重试次数'retry_delay': timedelta(seconds=5) # 失败重试间隔
}# 定义DAG
dag = DAG(dag_id='hello_world', # dag_iddefault_args=default_args, # 指定默认参数# schedule_interval="00, *, *, *, *" # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行schedule_interval=timedelta(minutes=1) # 执行周期,表示每分钟执行一次
)# 定义要执行的Python函数1
def hello_world_1():current_time = str(datetime.today())with open('/root/tmp/hello_world_1.txt', 'a') as f:f.write('%s\n' % current_time)assert 1 == 1 # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常# 定义要执行的Python函数2
def hello_world_2():current_time = str(datetime.today())with open('/root/tmp/hello_world_2.txt', 'a') as f:f.write('%s\n' % current_time)# 定义要执行的Python函数3
def hello_world_3():current_time = str(datetime.today())with open('/root/tmp/hello_world_3.txt', 'a') as f:f.write('%s\n' % current_time)# 定义要执行的task 1
t1 = PythonOperator(task_id='hello_world_1', # task_idpython_callable=hello_world_1, # 指定要执行的函数dag=dag, # 指定归属的dagretries=2, # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
)# 定义要执行的task 2
t2 = PythonOperator(task_id='hello_world_2', # task_idpython_callable=hello_world_2, # 指定要执行的函数dag=dag, # 指定归属的dag
)# 定义要执行的task 3
t3 = PythonOperator(task_id='hello_world_3', # task_idpython_callable=hello_world_3, # 指定要执行的函数dag=dag, # 指定归属的dag
)t2.set_upstream(t1)
# 表示t2这个任务只有在t1这个任务执行成功时才执行,
# 等价于 t1.set_downstream(t2)
# 同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')t3.set_upstream(t1) # 同理
写完后执行它检查是否有错误,如果命令行没有报错,就表示没问题。
python $AIRFLOW_HOME/dags/hello_world.py
通过以下命令查看生效的dags
[root@localhost dags]# airflow list_dags
[2018-06-06 21:03:25,808] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:03:25,877] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hello_world
查看hello_world
这个dag下面的tasks
[root@localhost dags]# airflow list_tasks hello_world
[2018-06-06 21:04:45,736] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:04:45,805] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
hello_world_1
hello_world_2
hello_world_3
查看hello_world
这个dag下面tasks的层级关系
[root@localhost dags]# airflow list_tasks hello_world --tree
[2018-06-06 21:05:42,956] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:05:43,020] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
<Task(PythonOperator): hello_world_2><Task(PythonOperator): hello_world_1>
<Task(PythonOperator): hello_world_3><Task(PythonOperator): hello_world_1>
如果按照以上步骤启动了schedule
,则DAG已经开始定时执行了,我们设置了每分钟执行一次,可以访问your_domain:8080
来查看任务的执行情况。 也可以查看/root/tmp/hello_world_1.txt
、/root/tmp/hello_world_2.txt
、/root/tmp/hello_world_3.txt
文件中的内容来检查任务是否执行成功。
执行失败时email通知
如果需要在任务执行失败(执行过程中有异常抛出)的时候邮件通知,除了在DAG文件中指定接收email列表外,还需要在配置文件中指定发送邮箱的信息,打开配置文件$AIRFLOW_HOME/airflow.cfg
,修改以下配置项,修改完需要重启webserver和schedule
smtp_host = smtp.163.com # smtp邮箱地址
smtp_starttls = True # 是否tls加密
smtp_mail_from = demo@163.com # 发件人邮箱地址,需开通smtp服务
smtp_ssl = False # 是否ssl加密
smtp_port = 25 # smtp端口号
使用位位移来指定执行顺序
以下四行的作用是相同的
op1 >> op2
op1.set_downstream(op2)op2 << op1
op2.set_upstream(op1)
也可以连续使用位位移
op1 >> op2 >> op3 << op4
以上等价于
op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)
使用变量(Variables
)
变量的value可以在UI界面的Admin > Variables
里面进行增删改查
可以在代码中这样使用变量
from airflow.models import Variable
foo = Variable.get("foo", default_var='a') # 设置当获取不到时使用的默认值
bar = Variable.get("bar", deserialize_json=True) # 对json数据进行反序列化
更多
Airflow doc
转载:https://zhuanlan.zhihu.com/p/37889267