阿毛
It's me !
想你所想
如何获取Airflow内部变量并经过逻辑处理后传入BashOperator?

一、问题

一个DAG里的bash task任务,每天会多次执行,并每次执行传入shell脚本的参数相同,即每天重复执行。传入的参数为时间参数,具体是前一天的时间datetime。

如果仅保证任务每次顺利执行的话,可以在代码中取“当前时间”,然后经过处理传给BashOperator,但这样的实现对于重跑隔日的任务或以往的任务时就不行了,因为时间要求是任务执行周期的前一日,如4月10重跑4月5日的任务,传入的时间参数应该为4月4日,但如果代码取当前时间,则最终传入4月9,显然不符合要求。所以需要通过airflow context的内置变量来实现,如execution_date。所以有了下面最先写的(部分)代码

'''
任务同一天需要检测两次,但两次任务传入的时间参数都应是昨日(同一)时间,任务内容就是对昨日数据进行检查,相当于两次进行的任务内容一样。
如果当前为06:00任务(execution_date=01:00 UTC 前一日17:00),则脚本传参上一次任务执行传入的execution_date,即previous_execution_date_success
'''
utc_cron = "0 17,22 * * *"

sh_date = "{{ execution_date }}"
index = sh_date.find('T') if sh_date.find(' ') == -1 else sh_date.find(' ')
if sh_date[index + 1 : index + 6] == '17:00':
    sh_date = "{{ previous_execution_date_success }}"

check_cos_upload_logs = BashOperator(
    task_id="check_cos_upload_logs",
bash_command='/check_cos_upload_logs.sh ' + sh_date,
    dag=dag
)

上述代码大致含义:UTC时间,每天17:00和22:00执行任务。因为最终时间参数应该为昨日,首先明白execution_date为上一次任务执行时间!!如果当时任务时间17:00,那么execution_date为昨日22:00,满足。如果当时任务时间22:00,那么execution_date为当日17:00,不满足昨日时间。所以需要根据execution_date去判断,如果是22:00,则不处理,如果17:00,则说明当前任务是22:00执行的任务,此时需要去上上次任务执行时间,即昨日22:00,这里参数previous_execution_date_success

根据上面的需求,所以想着在BashOperator前,通过python逻辑处理下,于是有了上面的代码。但是发现上面代码的结果,最终airflow执行永远执行/check_cos_upload_logs.sh {{ execution_date }},我原本以为应该:
如果当前时间为“2021-04-19T22:00:00+00:00”,则execution_date=2021-04-19T17:00:00+00:00,那么/check_cos_upload_logs.sh 2021-04-18T22:00:00+00:00
如果当前时间为“2021-04-19T17:00:00+00:00”,则execution_date=2021-04-18T22:00:00+00:00,那么/check_cos_upload_logs.sh 2021-04-18T22:00:00+00:00
但实际代码并不是这样的处理!

二、原因分析

其实,一开始就出现问题,sh_date = "{{ execution_date }}",我希望将内置变量execution_date取出赋值给sh_date,但实际并不会,是将{{ execution_date }}这样的字符串赋给了sh_date,这种双花括号的语法为Jinja Template,airflow只会在operator中解析该模板语言,在其他地方(operator的param参数也不会解析)并不会解析,所以上面代码中紧接着的逻辑判断处理,都是对{{ execution_date }}这个字符串的处理!导致bash_command一直为/check_cos_upload_logs.sh {{ execution_date }}

三、解决方案

从airflow支持的策略中了解分析

可以得到如下两个方向

1、在代码解释执行中进行时间逻辑的处理,需要在代码中能够取到airflow context 内部变量。

目前仅PythonOperator可以操作context变量,代码如下

# 一个函数处理时间,PythonOperator获取execution_date,执行该函数,将函数返回值通过xcom传递给后续bash task
def get_sh_date_method(**context):
    """
    任务同一天需要检测两次,但两次任务传入的时间参数都应是昨日(同一)时间,任务内容就是对昨日数据进行检查,相当于两次进行的任务内容一样。
    如果当前为06:00任务(execution_date=01:00 UTC 前一日17:00),则脚本传参上一次任务执行传入的execution_date,即previous_execution_date_success
    """
    sh_date = context['execution_date']
    if sh_date.hour == 17:
        # datetime.datetime
        return str(context['prev_execution_date_success'])
    return sh_date.to_datetime_string()


get_sh_date = PythonOperator(
    task_id='get_sh_date',
    python_callable=get_sh_date_method,
    provide_context=True,
    dag=dag
)

PythonOperator在调用python_callable的时候,会将context作为参数传入,这样就可以在callable对应函数中,使用context获取内部变量。前提是,必须provide_context=True

函数内部可以直接操作入参context,以字典方式获取指定key的变量值。具体key参考airflow package的“airflow/models/taskinstance.py”

        return {
            'conf': conf,
            'dag': task.dag,
            'ds': ds,
            'next_ds': next_ds,
            'next_ds_nodash': next_ds_nodash,
            'prev_ds': prev_ds,
            'prev_ds_nodash': prev_ds_nodash,
            'ds_nodash': ds_nodash,
            'ts': ts,
            'ts_nodash': ts_nodash,
            'ts_nodash_with_tz': ts_nodash_with_tz,
            'yesterday_ds': yesterday_ds,
            'yesterday_ds_nodash': yesterday_ds_nodash,
            'tomorrow_ds': tomorrow_ds,
            'tomorrow_ds_nodash': tomorrow_ds_nodash,
            'END_DATE': ds,
            'end_date': ds,
            'dag_run': dag_run,
            'run_id': run_id,
            'execution_date': pendulum.instance(self.execution_date),
            'prev_execution_date': prev_execution_date,
            'prev_execution_date_success': lazy_object_proxy.Proxy(
                lambda: self.previous_execution_date_success),
            'prev_start_date_success': lazy_object_proxy.Proxy(lambda: self.previous_start_date_success),
            'next_execution_date': next_execution_date,
            'latest_date': ds,
            'macros': macros,
            'params': params,
            'tables': tables,
            'task': task,
            'task_instance': self,
            'ti': self,
            'task_instance_key_str': ti_key_str,
            'test_mode': self.test_mode,
            'var': {
                'value': VariableAccessor(),
                'json': VariableJsonAccessor()
            },
            'inlets': task.inlets,
            'outlets': task.outlets,
        }

上面罗列的就是当前context支持内部变量key。context['execution_date']即可获取execution_date,和Jinja模板{{ execution_date }}等同。

注意:

  • context[‘execution_date’]的结果是pendulum.datetime的类型,而context[‘prev_execution_date_success’]的结果是datetime.datetime的类型!
  • context[‘prev_execution_date_success’]对应TaskInstance中的previous_execution_date_success

最后就可以在该函数内进行我的时间处理逻辑了。

接下来会有一个新的问题,如果将PythonOperator处理后的结果给到BashOperator使用呢?这里就要通过xcom的机制了。

xom就是为了解决task之间传递参数出现的。xcom的原理其实就是通过一个DB来实现写入K-V和读取K-V。该K-V关联了DAG和task。当PythonOperator执行了一个有返回值的call_back函数时,就会xcom写入(push)一个K-V,如下

https://file.blog.humh.cn/2021/04/d2b5ca33bd970f64a6301fa75ae2eb22.png

K就是return_value,V就是我函数的返回值。
然后在BashOperator中就可以通过Jinja模板的形式,以xom.pull获取K-V。如下:

check_kodo_upload_logs = BashOperator(task_id="check_kodo_upload_logs", bash_command="/check_kodo_upload_logs.sh " + '{{ ti.xcom_pull(task_ids="get_sh_date") }}', dag=dag)

当然上面PythonOperator中的call_back对应函数,可以无返回值,在函数体中通过xcom.push显式插入一个K-V也是可以的。

还有你可能注意到了,xcom中的数据在随着任务不断执行,在累积。这样可能随时间推移,占用空间。xcom支持你在每次DAG运行成功后,清除对应xom的数据!

具体关于xcom以及push和pull内容,可以参考这里https://kiwijia.work/2020/04/06/xcom/

2、把时间逻辑处理过程放在Operator执行中处理,并不在代码解释中执行。

因为Operator本身对Jinja的支持,而Jinja语言支持一定的逻辑性编写和时间函数。因为我觉得模板语言写起来麻烦,没实践,具体可以自行度娘Jinja的内容。可参考:http://docs.jinkan.org/docs/jinja2/templates.html

四、补充

1、airflow operator env参数,bash_command可以$取参。

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/bash/index.html?highlight=airflow%20operators%20bash#module-airflow.operators.bash

bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "here is the message: '$message'"',
    env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
)

2、airflow UI上添加Variables,代码中可以Variables.get() api获取。

https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html


本文参考

首页      code      如何获取Airflow内部变量并经过逻辑处理后传入BashOperator?

发表评论

textsms
account_circle
email

想你所想

如何获取Airflow内部变量并经过逻辑处理后传入BashOperator?
一、问题 一个DAG里的bash task任务,每天会多次执行,并每次执行传入shell脚本的参数相同,即每天重复执行。传入的参数为时间参数,具体是前一天的时间datetime。 如果仅保证任务…
扫描二维码继续阅读
2021-04-21