Airflow笔记

Airflow笔记Airflow已逐渐成为最流行的任务调度框架,加上本身由Python语言编写,对比Azkaban灵活性,可配置性更高Airflow官网配置参数介绍 default_args={ ‘owner’:’Airflow’, ‘depends_on_past’:False, ’email’:[‘airflow@example.com’], ’email_on_failure’:False, ’email_on_retry’:False, ‘retries’:1, ‘ret

大家好,欢迎来到IT知识分享网。

Airflow 已逐渐成为最流行的任务调度框架,加上本身由 Python 语言编写,对比 Azkaban 灵活性,可配置性更高

Airflow官网

配置参数介绍

	default_args = { 
   
	'owner': 'Airflow',
	'depends_on_past': False,
	'email': ['airflow@example.com'],
	'email_on_failure': False,
	'email_on_retry': False,
	'retries': 1,
	'retry_delay': timedelta(minutes=5)
	}

	dag = DAG(
	'tutorial',
	default_args=default_args,
	start_date=datetime(2015, 12, 1),
	description='A simple tutorial DAG',
	schedule_interval='@daily',
	catchup=False)
  • schedule_interval
    任务调度周期
    • None 不需要周期调度
    • 使用预设的注解,缺点整时整点,不够灵活
    • cron 和 linux corntab 用法类似,很灵活
      在这里插入图片描述
  • start_date
    • DAG
      任务从什么时刻开始计时,到达设置的任务执行时间点会触发任务的执行
    • 控制台
      控制台上的start_date 和 dag里面的不是同一个,控制台上的是指本次任务在什么时间开始执行
  • execution_date
    本次任务执行的周期时间,比如我的任务是执行T-1的数据,那今天显示的 execution_date 就是昨天
  • backfill
    回填,如果任务执行有缺失周期,则会自己去回补开始时间到结束时间内的任务

组件介绍

  • OPERATOR

官方为我们提供了多种 operator

  • BashOperator
    这是我们比较常用的,我们的任务基本上通过 shell 脚本来执行
	also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={ 
   { run_id }} | dag_run={ 
   { dag_run }}"',
    dag=dag,
)
  • DingDing Operator
    钉钉 Operator 这个看公司需求,支持发送报警短信和富文本消息,使用时配置好短信标题和内容,发送的手机号就可以
	text_msg_remind_none = DingdingOperator(
    task_id='text_msg_remind_none',
    dingding_conn_id='dingding_default',
    message_type='text',
    message='Airflow dingding text message remind none',
    at_mobiles=None,
    at_all=False
)
  • Python Operator
    python operator可以执行 python代码,也是比较常用的 operator
def print_context(ds, **kwargs):
   pprint(kwargs)
   print(ds)
   return 'Whatever you return gets printed in the logs'

# Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
run_this = PythonOperator(
	task_id='print_the_context',
	provide_context=True,
	python_callable=print_context,
	op_kwargs={ 
   'random_base': float(i) / 10},
	dag=dag,
)

如示例中,python_callable 放 python 方法名
op_kwargs 或 op_args,分别传递字典参数和数组参数,很方便

  • 自定义Operator
    除了上述常用 opeartor ,官方还为我们提供了opeartor自定义扩展支持,前提是得会python语言
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

过程也不难,都是套模板,主要逻辑都在 execute方法里面完成,调用与官方提供的opeartor无区别
另外这里也提到 Hook,配上官网原文解释

Hooks act as an interface to communicate with the external shared resources in a DAG. For example, multiple tasks in a DAG can require access to a MySQL database. Instead of creating a connection per task, you can retrieve a connection from the hook and utilize it. Hook also helps to avoid storing connection auth parameters in a DAG. See Managing Connections for how to create and manage connections.

大概意思是比如多个task都要操作一个mysql数据库时,正常情况下每个 task 都需要创建 数据库连接,如果使用 hooks 就可以多任务共用一个连接,另外hooks还可以保存授权等参数信息

  • scheduler

    Airflow 中的scheduler 特征其实并不明显,基本体现就在 DAG 中的 schedule_intervel中

  • ShortCircuitOperator

from airflow.operators.python_operator import ShortCircuitOperator, PythonOperator

def method2(id, name):
    """ 获取上个任务传递过来的参数 :param context: :return: """
    if '123' == id and '张三' == name:
        return True
    else:
        return False


# 分支操作流程 继承自 PythonOperator
# 相当于IF ,如果方法返回 True,则执行后面的流程,False,流程结束
task2 = ShortCircuitOperator(
    task_id='task2',
    python_callable=method2,
    provide_context=True,
    op_kwargs={ 
   'id': '123', 'name': '张三'},
    dag=dag)
  • Xcom 任务之间传参
from airflow.operators.bash_operator import BashOperator

def method1(id, **context):
    # 传递参数
    context['task_instance'].xcom_push(key='id', value=id)
    
# 传递参数,需要 provide_context=True
task1 = PythonOperator(
    task_id='task1',
    python_callable=method1,
    provide_context=True,
    op_kwargs={ 
   'id': '123'},
    dag=dag)

# 通过xcom_pull 获取上一个任务传递的参数
task3 = BashOperator(
    task_id='task3',
    bash_command="echo { 
   { id = task_instance.xcom_pull(key='id', task_ids='task1') }}",
    dag=dag
)

REST API介绍

Airflow 还有一个优势就是支持REST API,通过插件的形式安装部署重启;通过 REST API 可以传递参数来触发任务的执行

  • 下载插件
    wget https://github.com/teamclairvoyant/airflow-rest-api-plugin/archive/master.zip
  • 解压 zip 包,将plugin文件夹下的内容放入airflow/plugin/下,若不存在则新建,注意是 plugin 文件夹下的所有内容,不要去更改,不然可能会报错
  • 重启 airflow
  • 进入到管理界面
    在这里插入图片描述

使用介绍

  • 命令
    • airflow backfill -s START_DATE -e END_DATE dag_id
      回填任务
    • airflow clear dag_id -t task_regex -s START_DATE -d END_DATE
      清除任务
    • airflow test dag_id -t task_regex -s START_DATE -d END_DATE
      测试任务
    • airflow run dag_id -t task_regex -s START_DATE -d END_DATE
      将任务上传至控制台
    • airflow trigger_dag -e execution_date run_id
      手动触发任务
    • airflow dags trigger --conf '{"conf1": "value1"}' example_parametrized_dag
      传递参数
  • 控制台
    • DAG
      • 默认 off,需要开启才能执行和调度
    • Recent Tasks
      • 主要展示当前活跃任务的各个状态数,如果当前没有活跃任务,则展示最近运行过的任务状态,状态用9个环表示,从左至右一次是
        success,running,failed,upstream_failed,skiped,up_for_retry,up_for_reschedule,queued,无状态,scheduled
    • last_run
      • 最近一次任务执行的周期时间,如果是按T-1执行,此处应该显示是昨天
    • DAG runs
      • 显示执行的历史任务的各个状态数,共三个状态,一次是
        success,running,failed
    • links
      • triggle dag 跳转到triggle界面
      • tree view 以树状方式展现任务之间的关联关系和执行状态
      • graph view 以图的方式展现任务关联关系和状态
      • task duration 任务耗时图标
      • task retry 任务重试图标
      • landing times 不明白
      • gantt view 各个任务的执行时间段
      • code view 查看当前脚本的代码,是最新的
      • logs 任务的执行日志

实操问题

  • 调度周期
    我在使用的时候,设置好 start_date,schedule_intervel 之后,发现任务并没有按照自己的预期去执行,执行了也是比预期要滞后一个任务周期,我在网上搜索这个问题后发现这个问题很多人都遇到了,的确是 Airflow 的任务调度滞后一个周期,这个问题我解决方式也比较粗暴
start_date = "{% if dag_run.conf.start_date %} { 
   {dag_run.conf.start_date}} {% else %} { 
   {ds}} {% endif %}"
end_date = "{% if dag_run.conf.end_date %} { 
   {dag_run.conf.end_date}} {% else %} { 
   {tomorrow_ds}} {% endif %}"

如果外部传入参数则使用外部参数,否则,通过 {
{ds}}
{
{tomorrow_ds}}
来将滞后的周期提前一天,这样就可以正常执行昨天的任务数据

  • triggle传参
    刚开始triggle传参也是搞不清楚的一点,主要是这个参数格式,经过反复测试,triggle直接点则无参执行,也可以通过json的方式设置参数,但是 key必须和脚本里面的key保持一致;
    当然也可以通过 REST APItriggle,这个参数怎么传可以在demo里面试,demo会生成链接,参考一下就ok了

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/22004.html

(0)

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信