Airflow

Airflow | Airflow DAG 작성하기, 기본 DAG 작성법, DAG 실행법

토오오끼 2025. 8. 6. 23:03
728x90
반응형

 

Airflow에서 DAG(Directed Acyclic Graph)는 워크플로우를 정의하는 핵심 개념으로 어떤 작업(Task)들이 어떤 순서로 실행되어야 하는지를 나타내는 설계도이다.

 

기본 DAG 작성법은 아래와 같다.

  1. module 추가
  2. defualt arguments 추가
  3. DAG 작성 (id, args, schedul_interval)
  4. Task 정의
  5. Dependencies 연결

 

module 추가

from airflow import DAG                              # airflow DAG 모듈
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator

 

Default Arguments

default_args = {
	'depends_on_past': False,
	'email' : ['hjyoo@nc-and.com'],
  'email_on_failure': False,
  'email_on_retry': False,
  'retries': 1,
  # 'queue': 'bash_queue',
  # 'pool': 'backfill',
  'retry_delay': timedelta(minutes=5),
}

 

DAG 선언

  • with문 이용한 선언
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator

# DAG 정의
with DAG(
    dag_id='tutorial',
    default_args={
        'depends_on_past': False,
				'email' : ['hjyoo@nc-and.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        'retry_delay': timedelta(minutes=5),
    },
    description='묘사 내용 정의',
    # Airflow 3.x 에서 권장하는 필드명
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # 예제 태스크용 callable
    def say_hello():
        print("Hello Airflow!")

    # PythonOperator 태스크
    hello_task = PythonOperator(
        task_id='hello_task',
        python_callable=say_hello,
    )

-> schedule=timedelta(days=1) 은 하루에 한 번 실행 의미

 

  • 표준 생성자로 선언
# 1) DAG 생성
my_dag = DAG(
    dag_id="my_dag_name"
    schedule="@daily",
    start_date=pendulum.datetime(2025, 6, 24, tz="UTC"),
    catchup=False,
    tags=["example"],             # 필요하다면 태그 추가
)

# 3) EmptyOperator 임포트 경로 변경
op = EmptyOperator(
    task_id="task",
    dag=my_dag
)

 

  • 데코레이터 사용 선언
@dag(
    dag_id="generate_dag",  # 생략 시 함수명이 DAG ID
    start_date=pendulum.datetime(2025, 6, 24, tz="UTC"),
    schedule="@daily",
    catchup=False,
    tags=["example"],       # 태그 필요하면 추가
)
def generate_dag():
    # 태스크 정의: dag 파라미터는 내부 컨텍스트로 자동 연결됨
    EmptyOperator(
        task_id="task"
    )

# DAG 객체 생성
dag = generate_dag()

 

 

Task 정의

  • BashOperator 모듈을 사용하여 () 안에 정의 하여 사용
  • task_id는 작업의 고유 식별자 역할
t1 = BashOperator(
	task_id='print_date'
	bash_command='date',
)

t2 = BashOperator(
	task_id='sleep',
	depends_on_past=False,
	bash_command='sleep 5',
	retries=3,
)

#jinja로 템플릿 정의하여 활용하기
templated_command = dedent(
	"""
		{% for i in range(5) %}
			echo "{{ ds }}"
			echo "{{ macros.ds_add(ds, 7) }}"
		{% endfor %}
	"""
)

t3 = BashOperator(
	task_id='templated',
	depends_on_past=False,
	bash_command=templated_command,
)

 

DAG 문서 작성 및 추가

  • dag_md라는 객체를 사용해 문서화 작업을 할 수 있음
t1.doc_md = dedent(
	"""
		doc_md 는 markdown으로 작성한 문서임 
		### 마크다운도 사용가능
		속성 'doc_md'를 사용하여 문서작업 가능 
		'doc' : plian text,
		'doc_rst', 
		'doc_json',  
		'doc_yaml',
		과 같은 문서도 사용가능 
		doc은 ui에 Task Instance Details page에 렌더링 되어 보여짐
	"""
)

dag.doc_md = __doc__
dag.doc_md = """
			dag객체에 doc_md속성을 이용하면 dag에 속한 모든 곳에 doc이 표시됨
			 """

 

Dependency 설정

# downstream 함수로 flow 그리기 가능
t1.set_downstream(t2)

# upstream 함수로 flow 그리기 가능
t2.set_upstream(t1)

# downstream 함수를 >> 로 대체 가능
t1 >> t2

# upstream 함수를 << 로 대체 가능
t2 << t1

# downstream을 연속적으로 표시하면 t1 끝나면 t2 끝나면 t3로 종송석 설정 가능
t1 >> t2 >> t3

# 배열로도 할당 가능
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

 

DAG example

import pendulum

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator

# 한국 시간대로 설정
kst = pendulum.timezone("Asia/Seoul")

#default args
default_args = {
    "owner": "Hello World",
    "email": ["hjyoo@nc-and.com"],
    "email_on_failure": False,
}

# @25/6/24 @once DAG 선언
with DAG(
    dag_id="ex_hello_world",
    default_args=default_args,
    start_date=pendulum.datetime(2025, 6, 24, tz=kst),
    description="print hello world",
    schedule="@once",
    catchup=False,
    tags=["test"],
) as dag:
		
		# task용 callable
    def print_hello():
        print("hello world")
		
		# task 정의
    t1 = EmptyOperator(
        task_id="dummy_task_id",
        retries=5,
    )

    t2 = PythonOperator(
        task_id="hello_world",
        python_callable=print_hello,
    )
		
		# dependency 설정
    t1 >> t2

 

 

DAG 실행

1. 기본 예제 DAG 실행 옵션을 꺼준다

vim /root/airflow/airflow.cfg

-> [core] load_examples=True를 False로 변경하면 된다.

source ~/.bashrc

 

2. 디폴트 db인 sqlite를 사용하고 있기 때문에 executor를 변경 해 준다.

→ SQLite는 개발용(development)으로만 쓰고 SequentialExecutor만 사용해야 한다.

vim /root/airflow/airflow.cfg
source ~/.bashrc

→ 실제 서비스 때는 postgres나 mysql로 변경 해야 한다.

 

3. db 초기화

# db 초기화
airflow db reset --yes

# migration
airflow db migrate

→ 기본 예제 실행에 대한 metadata가 저장되어 있을 수 있기 때문에 초기화 후 다시 users 등록 해야 한다.

 

4. DAG가 바라보는 경로 확인

airflow config list

→ [core] 에서 load_examples = False이 되어 있는지 확인 후

→ dags_folder = /root/airflow/dags 이 경로가 생성 되어 있는지 확인

→ dags 폴더가 없다면 생성

 

5. dags 폴더로 파일 이동

→ 작성한 DAG를 위에서 생성한 폴더로 이동 시켜 줘야 shceduler가 인지하고 web ui에 로드하게 된다.

cp example.py /root/airflow/dags

 

6. airflow 프로세스 종료 후 재시작

pkill -f "airflow scheduler"
pkill -f "airflow dag-processor"

airflow dag-processor --daemon
airflow scheduler --daemon

→ —daemon 옵션으로 실행하면 백그라운드에서 실행이 되기 때문에 별도의 로그가 출력이 안된다.

→로그 형태로 출력 하고 싶다면 airflow dag-processor만 입력해서 dags 폴더 내 파일을 제대로 바라 보고 있는지 확인 가능하다.

 

→ 재시작 후 30초 뒤에 dags 폴더 내 파일을 제대로 바라 보고 있는지 확인 가능

airflow dags list

→ 이렇게 DAG File이 뜨면 제대로 바라 보고 있는 것

 

7. web ui에서 확인

→ dags list에 옮긴 파일이 제대로 뜬다면 web ui에서도 Dags에 작성한 task가 뜰 것

→ dag_id 왼쪽에 버튼이 pause/unpaused

→ schedule을 @once 로 했기 때문에 실행하면 바로 이 dag가 실행된다.

 

dag_id를 누르면 DAGRuns를 볼 수 있다.

executor가 제대로 변경되지 않아 web ui 상에서 실행 실패 된 화면...

 

좌측 상단 Graph를 누르면 DAG를 그래프 형식으로 볼 수 있다.

→ 의존성을 작성했기 때문에 dummy_task_id와 Hello_world가 연결 되어 있는 걸 볼 수 있다.

 

728x90
반응형