Airflow의 execution_dateHiveOperator의 활용

Happyprg
3 min readMar 22, 2021
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 3, 15, 0, 0),
'email_on_failure': get_email_send_flag(True),
'email_on_retry': get_email_send_flag(False),
'retries': 2,
'retry_delay': timedelta(minutes=1),
}

dag = DAG(os.path.basename(__file__)[:-3],
catchup=True,
default_args=default_args,
concurrency=3, max_active_runs=3,
schedule_interval='10 13 * * *')

위와 같은 설정의 dag가 있다고 가정한다. 그리고 배포를 하게되면

2021년 3월 16일 13시10분에 스케쥴된 작업에 대한 처리가 시작된다.

이때 전달되는 taskinstance의 execution_date값은 3월 15일이다.

우리가 원하는 것은 스케쥴링 된 당일(3월 16일)의 값으로 executiondate를 활용해 HiveOperator의 SQL문이 렌더링 되길 바랬다.

execution_date= dag.startdate부터 현재시간까지의 기간을 interval(daily)로 나눈 값인거 같다. 따라서 15일에 대한 작업을 16일에 수행을 하게되며 이때의 executiondate값은 16일이 아닌 전일인 15일이 된다. 따라서 Airflow jinja2. template에서 render되는 Pendulum의 값은 nextexecutiondate를 써야 바라는 결과값은 2021-03-16일이 된다.

아래는 위 내용에 대한 Hiveql 쿼리 template 예제이다.

select "{{ (next_execution_date.in_tz('Asia/Seoul')).strftime('%Y-%m-%d %H:%M:%S') }}" as pdp_abuse_confirmation_kst_datetime,
...
where a.year >= "{{ macros.ds_format(macros.ds_add(next_execution_date.strftime('%Y-%m-%d'), -30), '%Y-%m-%d', '%Y') }}" AND
a.month >= "{{ macros.ds_format(macros.ds_add(next_execution_date.strftime('%Y-%m-%d'), -30), '%Y-%m-%d', '%-m') }}" AND
a.month <= "{{ macros.ds_format(next_execution_date.strftime('%Y-%m-%d'), '%Y-%m-%d', '%-m') }}" AND
b.year = "{{ execution_date.year }}" AND
b.month = "{{ execution_date.month }}" AND
b.day = "{{ execution_date.day }}" AND

--

--