Airflow에서 DAG(Directed Acyclic Graph)는 워크플로우를 정의하는 핵심 개념으로 어떤 작업(Task)들이 어떤 순서로 실행되어야 하는지를 나타내는 설계도이다.
기본 DAG 작성법은 아래와 같다.
- module 추가
- defualt arguments 추가
- DAG 작성 (id, args, schedul_interval)
- Task 정의
- Dependencies 연결
module 추가
from airflow import DAG # airflow DAG 모듈
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
Default Arguments
default_args = {
'depends_on_past': False,
'email' : ['hjyoo@nc-and.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
'retry_delay': timedelta(minutes=5),
}
DAG 선언
- with문 이용한 선언
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
# DAG 정의
with DAG(
dag_id='tutorial',
default_args={
'depends_on_past': False,
'email' : ['hjyoo@nc-and.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
# 'queue': 'bash_queue',
# 'pool': 'backfill',
'retry_delay': timedelta(minutes=5),
},
description='묘사 내용 정의',
# Airflow 3.x 에서 권장하는 필드명
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['example'],
) as dag:
# 예제 태스크용 callable
def say_hello():
print("Hello Airflow!")
# PythonOperator 태스크
hello_task = PythonOperator(
task_id='hello_task',
python_callable=say_hello,
)
-> schedule=timedelta(days=1) 은 하루에 한 번 실행 의미
- 표준 생성자로 선언
# 1) DAG 생성
my_dag = DAG(
dag_id="my_dag_name"
schedule="@daily",
start_date=pendulum.datetime(2025, 6, 24, tz="UTC"),
catchup=False,
tags=["example"], # 필요하다면 태그 추가
)
# 3) EmptyOperator 임포트 경로 변경
op = EmptyOperator(
task_id="task",
dag=my_dag
)
- 데코레이터 사용 선언
@dag(
dag_id="generate_dag", # 생략 시 함수명이 DAG ID
start_date=pendulum.datetime(2025, 6, 24, tz="UTC"),
schedule="@daily",
catchup=False,
tags=["example"], # 태그 필요하면 추가
)
def generate_dag():
# 태스크 정의: dag 파라미터는 내부 컨텍스트로 자동 연결됨
EmptyOperator(
task_id="task"
)
# DAG 객체 생성
dag = generate_dag()
Task 정의
- BashOperator 모듈을 사용하여 () 안에 정의 하여 사용
- task_id는 작업의 고유 식별자 역할
t1 = BashOperator(
task_id='print_date'
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
#jinja로 템플릿 정의하여 활용하기
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7) }}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
DAG 문서 작성 및 추가
- dag_md라는 객체를 사용해 문서화 작업을 할 수 있음
t1.doc_md = dedent(
"""
doc_md 는 markdown으로 작성한 문서임
### 마크다운도 사용가능
속성 'doc_md'를 사용하여 문서작업 가능
'doc' : plian text,
'doc_rst',
'doc_json',
'doc_yaml',
과 같은 문서도 사용가능
doc은 ui에 Task Instance Details page에 렌더링 되어 보여짐
"""
)
dag.doc_md = __doc__
dag.doc_md = """
dag객체에 doc_md속성을 이용하면 dag에 속한 모든 곳에 doc이 표시됨
"""
Dependency 설정
# downstream 함수로 flow 그리기 가능
t1.set_downstream(t2)
# upstream 함수로 flow 그리기 가능
t2.set_upstream(t1)
# downstream 함수를 >> 로 대체 가능
t1 >> t2
# upstream 함수를 << 로 대체 가능
t2 << t1
# downstream을 연속적으로 표시하면 t1 끝나면 t2 끝나면 t3로 종송석 설정 가능
t1 >> t2 >> t3
# 배열로도 할당 가능
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
DAG example
import pendulum
from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
# 한국 시간대로 설정
kst = pendulum.timezone("Asia/Seoul")
#default args
default_args = {
"owner": "Hello World",
"email": ["hjyoo@nc-and.com"],
"email_on_failure": False,
}
# @25/6/24 @once DAG 선언
with DAG(
dag_id="ex_hello_world",
default_args=default_args,
start_date=pendulum.datetime(2025, 6, 24, tz=kst),
description="print hello world",
schedule="@once",
catchup=False,
tags=["test"],
) as dag:
# task용 callable
def print_hello():
print("hello world")
# task 정의
t1 = EmptyOperator(
task_id="dummy_task_id",
retries=5,
)
t2 = PythonOperator(
task_id="hello_world",
python_callable=print_hello,
)
# dependency 설정
t1 >> t2
DAG 실행
1. 기본 예제 DAG 실행 옵션을 꺼준다
vim /root/airflow/airflow.cfg
-> [core] load_examples=True를 False로 변경하면 된다.
source ~/.bashrc
2. 디폴트 db인 sqlite를 사용하고 있기 때문에 executor를 변경 해 준다.
→ SQLite는 개발용(development)으로만 쓰고 SequentialExecutor만 사용해야 한다.
vim /root/airflow/airflow.cfg
source ~/.bashrc
→ 실제 서비스 때는 postgres나 mysql로 변경 해야 한다.
3. db 초기화
# db 초기화
airflow db reset --yes
# migration
airflow db migrate
→ 기본 예제 실행에 대한 metadata가 저장되어 있을 수 있기 때문에 초기화 후 다시 users 등록 해야 한다.
4. DAG가 바라보는 경로 확인
airflow config list
→ [core] 에서 load_examples = False이 되어 있는지 확인 후
→ dags_folder = /root/airflow/dags 이 경로가 생성 되어 있는지 확인
→ dags 폴더가 없다면 생성
5. dags 폴더로 파일 이동
→ 작성한 DAG를 위에서 생성한 폴더로 이동 시켜 줘야 shceduler가 인지하고 web ui에 로드하게 된다.
cp example.py /root/airflow/dags
6. airflow 프로세스 종료 후 재시작
pkill -f "airflow scheduler"
pkill -f "airflow dag-processor"
airflow dag-processor --daemon
airflow scheduler --daemon
→ —daemon 옵션으로 실행하면 백그라운드에서 실행이 되기 때문에 별도의 로그가 출력이 안된다.
→로그 형태로 출력 하고 싶다면 airflow dag-processor만 입력해서 dags 폴더 내 파일을 제대로 바라 보고 있는지 확인 가능하다.
→ 재시작 후 30초 뒤에 dags 폴더 내 파일을 제대로 바라 보고 있는지 확인 가능
airflow dags list

→ 이렇게 DAG File이 뜨면 제대로 바라 보고 있는 것
7. web ui에서 확인

→ dags list에 옮긴 파일이 제대로 뜬다면 web ui에서도 Dags에 작성한 task가 뜰 것
→ dag_id 왼쪽에 버튼이 pause/unpaused
→ schedule을 @once 로 했기 때문에 실행하면 바로 이 dag가 실행된다.

dag_id를 누르면 DAGRuns를 볼 수 있다.

좌측 상단 Graph를 누르면 DAG를 그래프 형식으로 볼 수 있다.

→ 의존성을 작성했기 때문에 dummy_task_id와 Hello_world가 연결 되어 있는 걸 볼 수 있다.