大家好,欢迎来到IT知识分享网。
Airflow 已逐渐成为最流行的任务调度框架,加上本身由 Python 语言编写,对比 Azkaban 灵活性,可配置性更高
配置参数介绍
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'tutorial',
default_args=default_args,
start_date=datetime(2015, 12, 1),
description='A simple tutorial DAG',
schedule_interval='@daily',
catchup=False)
- schedule_interval
任务调度周期- None 不需要周期调度
- 使用预设的注解,缺点整时整点,不够灵活
- cron 和 linux corntab 用法类似,很灵活
- start_date
- DAG
任务从什么时刻开始计时,到达设置的任务执行时间点会触发任务的执行 - 控制台
控制台上的start_date 和 dag里面的不是同一个,控制台上的是指本次任务在什么时间开始执行
- DAG
- execution_date
本次任务执行的周期时间,比如我的任务是执行T-1的数据,那今天显示的 execution_date 就是昨天 - backfill
回填,如果任务执行有缺失周期,则会自己去回补开始时间到结束时间内的任务
组件介绍
官方为我们提供了多种 operator
- BashOperator
这是我们比较常用的,我们的任务基本上通过 shell 脚本来执行
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={
{ run_id }} | dag_run={
{ dag_run }}"',
dag=dag,
)
- DingDing Operator
钉钉 Operator 这个看公司需求,支持发送报警短信和富文本消息,使用时配置好短信标题和内容,发送的手机号就可以
text_msg_remind_none = DingdingOperator(
task_id='text_msg_remind_none',
dingding_conn_id='dingding_default',
message_type='text',
message='Airflow dingding text message remind none',
at_mobiles=None,
at_all=False
)
- Python Operator
python operator可以执行 python代码,也是比较常用的 operator
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
# Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
run_this = PythonOperator(
task_id='print_the_context',
provide_context=True,
python_callable=print_context,
op_kwargs={
'random_base': float(i) / 10},
dag=dag,
)
如示例中,python_callable 放 python 方法名
op_kwargs 或 op_args,分别传递字典参数和数组参数,很方便
- 自定义Operator
除了上述常用 opeartor ,官方还为我们提供了opeartor自定义扩展支持,前提是得会python语言
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class HelloOperator(BaseOperator):
@apply_defaults
def __init__(
self,
name: str,
*args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self.name = name
def execute(self, context):
message = "Hello {}".format(self.name)
print(message)
return message
过程也不难,都是套模板,主要逻辑都在 execute
方法里面完成,调用与官方提供的opeartor
无区别
另外这里也提到 Hook,配上官网原文解释
Hooks act as an interface to communicate with the external shared resources in a DAG. For example, multiple tasks in a DAG can require access to a MySQL database. Instead of creating a connection per task, you can retrieve a connection from the hook and utilize it. Hook also helps to avoid storing connection auth parameters in a DAG. See Managing Connections for how to create and manage connections.
大概意思是比如多个task
都要操作一个mysql
数据库时,正常情况下每个 task
都需要创建 数据库连接,如果使用 hooks
就可以多任务共用一个连接,另外hooks
还可以保存授权等参数信息
from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator
def method2(id, name):
""" 获取上个任务传递过来的参数 :param context: :return: """
if '123' == id and '张三' == name:
return True
else:
return False
# 分支操作流程 继承自 PythonOperator
# 相当于IF ,如果方法返回 True,则执行后面的流程,False,流程结束
task2 = ShortCircuitOperator(
task_id='task2',
python_callable=method2,
provide_context=True,
op_kwargs={
'id': '123', 'name': '张三'},
dag=dag)
- Xcom 任务之间传参
from airflow.operators.bash_operator import BashOperator
def method1(id, **context):
# 传递参数
context['task_instance'].xcom_push(key='id', value=id)
# 传递参数,需要 provide_context=True
task1 = PythonOperator(
task_id='task1',
python_callable=method1,
provide_context=True,
op_kwargs={
'id': '123'},
dag=dag)
# 通过xcom_pull 获取上一个任务传递的参数
task3 = BashOperator(
task_id='task3',
bash_command="echo {
{ id = task_instance.xcom_pull(key='id', task_ids='task1') }}",
dag=dag
)
REST API介绍
Airflow
还有一个优势就是支持REST API
,通过插件的形式安装部署重启;通过 REST API
可以传递参数来触发任务的执行
- 下载插件
wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/master.zip
- 解压 zip 包,将plugin文件夹下的内容放入airflow/plugin/下,若不存在则新建,注意是 plugin 文件夹下的所有内容,不要去更改,不然可能会报错
- 重启
airflow
- 进入到管理界面
使用介绍
- 命令
airflow backfill -s START_DATE -e END_DATE dag_id
回填任务airflow clear dag_id -t task_regex -s START_DATE -d END_DATE
清除任务airflow test dag_id -t task_regex -s START_DATE -d END_DATE
测试任务airflow run dag_id -t task_regex -s START_DATE -d END_DATE
将任务上传至控制台airflow trigger_dag -e execution_date run_id
手动触发任务airflow dags trigger --conf '{"conf1": "value1"}' example_parametrized_dag
传递参数
- 控制台
- DAG
- 默认
off
,需要开启才能执行和调度
- 默认
- Recent Tasks
- 主要展示当前活跃任务的各个状态数,如果当前没有活跃任务,则展示最近运行过的任务状态,状态用9个环表示,从左至右一次是
success
,running
,failed
,upstream_failed
,skiped
,up_for_retry
,up_for_reschedule
,queued
,无状态
,scheduled
- 主要展示当前活跃任务的各个状态数,如果当前没有活跃任务,则展示最近运行过的任务状态,状态用9个环表示,从左至右一次是
- last_run
- 最近一次任务执行的周期时间,如果是按T-1执行,此处应该显示是昨天
- DAG runs
- 显示执行的历史任务的各个状态数,共三个状态,一次是
success
,running
,failed
- 显示执行的历史任务的各个状态数,共三个状态,一次是
- links
triggle dag 跳转到triggle界面
- tree view 以树状方式展现任务之间的关联关系和执行状态
- graph view 以图的方式展现任务关联关系和状态
- task duration 任务耗时图标
- task retry 任务重试图标
- landing times 不明白
- gantt view 各个任务的执行时间段
- code view 查看当前脚本的代码,是最新的
- logs 任务的执行日志
- DAG
实操问题
- 调度周期
我在使用的时候,设置好 start_date,schedule_intervel 之后,发现任务并没有按照自己的预期去执行,执行了也是比预期要滞后一个任务周期,我在网上搜索这个问题后发现这个问题很多人都遇到了,的确是 Airflow 的任务调度滞后一个周期,这个问题我解决方式也比较粗暴
start_date = "{% if dag_run.conf.start_date %} {
{dag_run.conf.start_date}} {% else %} {
{ds}} {% endif %}"
end_date = "{% if dag_run.conf.end_date %} {
{dag_run.conf.end_date}} {% else %} {
{tomorrow_ds}} {% endif %}"
如果外部传入参数则使用外部参数,否则,通过 {
和
{ds}}{
来将滞后的周期提前一天,这样就可以正常执行昨天的任务数据
{tomorrow_ds}}
- triggle传参
刚开始triggle
传参也是搞不清楚的一点,主要是这个参数格式,经过反复测试,triggle
直接点则无参执行,也可以通过json的方式设置参数,但是key
必须和脚本里面的key
保持一致;
当然也可以通过REST API
来triggle
,这个参数怎么传可以在demo
里面试,demo
会生成链接,参考一下就ok了
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/22004.html