一、问题
一个DAG里的bash task任务,每天会多次执行,并每次执行传入shell脚本的参数相同,即每天重复执行。传入的参数为时间参数,具体是前一天的时间datetime。
如果仅保证任务每次顺利执行的话,可以在代码中取“当前时间”,然后经过处理传给BashOperator,但这样的实现对于重跑隔日的任务或以往的任务时就不行了,因为时间要求是任务执行周期的前一日,如4月10重跑4月5日的任务,传入的时间参数应该为4月4日,但如果代码取当前时间,则最终传入4月9,显然不符合要求。所以需要通过airflow context的内置变量来实现,如execution_date
。所以有了下面最先写的(部分)代码
''' 任务同一天需要检测两次,但两次任务传入的时间参数都应是昨日(同一)时间,任务内容就是对昨日数据进行检查,相当于两次进行的任务内容一样。 如果当前为06:00任务(execution_date=01:00 UTC 前一日17:00),则脚本传参上一次任务执行传入的execution_date,即previous_execution_date_success ''' utc_cron = "0 17,22 * * *" sh_date = "{{ execution_date }}" index = sh_date.find('T') if sh_date.find(' ') == -1 else sh_date.find(' ') if sh_date[index + 1 : index + 6] == '17:00': sh_date = "{{ previous_execution_date_success }}" check_cos_upload_logs = BashOperator( task_id="check_cos_upload_logs", bash_command='/check_cos_upload_logs.sh ' + sh_date, dag=dag )
上述代码大致含义:UTC时间,每天17:00和22:00执行任务。因为最终时间参数应该为昨日,首先明白execution_date
为上一次任务执行时间!!如果当时任务时间17:00,那么execution_date
为昨日22:00,满足。如果当时任务时间22:00,那么execution_date
为当日17:00,不满足昨日时间。所以需要根据execution_date
去判断,如果是22:00,则不处理,如果17:00,则说明当前任务是22:00执行的任务,此时需要去上上次任务执行时间,即昨日22:00,这里参数previous_execution_date_success
。
根据上面的需求,所以想着在BashOperator前,通过python逻辑处理下,于是有了上面的代码。但是发现上面代码的结果,最终airflow执行永远执行/check_cos_upload_logs.sh {{ execution_date }}
,我原本以为应该:
如果当前时间为“2021-04-19T22:00:00+00:00”,则execution_date=2021-04-19T17:00:00+00:00
,那么/check_cos_upload_logs.sh 2021-04-18T22:00:00+00:00
;
如果当前时间为“2021-04-19T17:00:00+00:00”,则execution_date=2021-04-18T22:00:00+00:00
,那么/check_cos_upload_logs.sh 2021-04-18T22:00:00+00:00
。
但实际代码并不是这样的处理!
二、原因分析
其实,一开始就出现问题,sh_date = "{{ execution_date }}"
,我希望将内置变量execution_date取出赋值给sh_date,但实际并不会,是将{{ execution_date }}这样的字符串赋给了sh_date,这种双花括号的语法为Jinja Template,暂发现airflow只在operator的command参数中会解析该模板语言,在其他地方(operator的param以及on_failure_callback参数也不会解析)并不会解析,所以上面代码中紧接着的逻辑判断处理,都是对{{ execution_date }}这个字符串的处理!导致bash_command一直为/check_cos_upload_logs.sh {{ execution_date }}
。
三、解决方案
从airflow支持的策略中了解分析
可以得到如下两个方向
1、在代码解释执行中进行时间逻辑的处理,需要在代码中能够取到airflow context 内部变量。
目前仅PythonOperator可以操作context变量,代码如下
# 一个函数处理时间,PythonOperator获取execution_date,执行该函数,将函数返回值通过xcom传递给后续bash task def get_sh_date_method(**context): """ 任务同一天需要检测两次,但两次任务传入的时间参数都应是昨日(同一)时间,任务内容就是对昨日数据进行检查,相当于两次进行的任务内容一样。 如果当前为06:00任务(execution_date=01:00 UTC 前一日17:00),则脚本传参上一次任务执行传入的execution_date,即previous_execution_date_success """ sh_date = context['execution_date'] if sh_date.hour == 17: # datetime.datetime return str(context['prev_execution_date_success']) return sh_date.to_datetime_string() get_sh_date = PythonOperator( task_id='get_sh_date', python_callable=get_sh_date_method, provide_context=True, dag=dag )
PythonOperator在调用python_callable
的时候,会将context作为参数传入,这样就可以在callable对应函数中,使用context获取内部变量。前提是,必须provide_context=True
。
函数内部可以直接操作入参context,以字典方式获取指定key的变量值。具体key参考airflow package的“airflow/models/taskinstance.py”
return { 'conf': conf, 'dag': task.dag, 'ds': ds, 'next_ds': next_ds, 'next_ds_nodash': next_ds_nodash, 'prev_ds': prev_ds, 'prev_ds_nodash': prev_ds_nodash, 'ds_nodash': ds_nodash, 'ts': ts, 'ts_nodash': ts_nodash, 'ts_nodash_with_tz': ts_nodash_with_tz, 'yesterday_ds': yesterday_ds, 'yesterday_ds_nodash': yesterday_ds_nodash, 'tomorrow_ds': tomorrow_ds, 'tomorrow_ds_nodash': tomorrow_ds_nodash, 'END_DATE': ds, 'end_date': ds, 'dag_run': dag_run, 'run_id': run_id, 'execution_date': pendulum.instance(self.execution_date), 'prev_execution_date': prev_execution_date, 'prev_execution_date_success': lazy_object_proxy.Proxy( lambda: self.previous_execution_date_success), 'prev_start_date_success': lazy_object_proxy.Proxy(lambda: self.previous_start_date_success), 'next_execution_date': next_execution_date, 'latest_date': ds, 'macros': macros, 'params': params, 'tables': tables, 'task': task, 'task_instance': self, 'ti': self, 'task_instance_key_str': ti_key_str, 'test_mode': self.test_mode, 'var': { 'value': VariableAccessor(), 'json': VariableJsonAccessor() }, 'inlets': task.inlets, 'outlets': task.outlets, }
上面罗列的就是当前context支持内部变量key。context['execution_date']
即可获取execution_date,和Jinja模板{{ execution_date }}
等同。
注意:
- context[‘execution_date’]的结果是pendulum.datetime的类型,而context[‘prev_execution_date_success’]的结果是datetime.datetime的类型!
- context[‘prev_execution_date_success’]对应TaskInstance中的previous_execution_date_success
最后就可以在该函数内进行我的时间处理逻辑了。
接下来会有一个新的问题,如果将PythonOperator处理后的结果给到BashOperator使用呢?这里就要通过xcom的机制了。
xom就是为了解决task之间传递参数出现的。xcom的原理其实就是通过一个DB来实现写入K-V和读取K-V。该K-V关联了DAG和task。当PythonOperator执行了一个有返回值的call_back函数时,就会xcom写入(push)一个K-V,如下
K就是return_value
,V就是我函数的返回值。
然后在BashOperator中就可以通过Jinja模板的形式,以xom.pull获取K-V。如下:
check_kodo_upload_logs = BashOperator(task_id="check_kodo_upload_logs", bash_command="/check_kodo_upload_logs.sh " + '{{ ti.xcom_pull(task_ids="get_sh_date") }}', dag=dag)
当然上面PythonOperator中的call_back对应函数,可以无返回值,在函数体中通过xcom.push
显式插入一个K-V也是可以的。
还有你可能注意到了,xcom中的数据在随着任务不断执行,在累积。这样可能随时间推移,占用空间。xcom支持你在每次DAG运行成功后,清除对应xom的数据!
具体关于xcom以及push和pull内容,可以参考这里https://kiwijia.work/2020/04/06/xcom/
2、把时间逻辑处理过程放在Operator执行中处理,并不在代码解释中执行。
因为Operator本身对Jinja的支持,而Jinja语言支持一定的逻辑性编写和时间函数。因为我觉得模板语言写起来麻烦,没实践,具体可以自行度娘Jinja的内容。可参考:http://docs.jinkan.org/docs/jinja2/templates.html
四、补充
1、airflow operator env参数,bash_command可以$取参。
bash_task = BashOperator( task_id="bash_task", bash_command='echo "here is the message: '$message'"', env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, )
2、airflow UI上添加Variables,代码中可以Variables.get() api获取。
https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html
本文参考
- “气流中的execution_date:需要以变量形式访问”:https://www.itranslater.com/qa/details/2583867886343291904
- “Airflow中PythonOperator参数传递方法”:http://bigbigben.com/2019/09/20/airflow-python-operator-variable/
- “Airflow中利用xcom实现task间信息传递”:https://kiwijia.work/2020/04/06/xcom/
- “Airflow源码分析(2)-xcom部分”:http://chace.in/2019/09/01/Airflow%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90(2)-xcom%E9%83%A8%E5%88%86/
发表评论