728x90

Airflow란
파이썬으로 작성된 데이터 파이프라인(ETL) 프레임워크로 Airbnb에서 시작한 Apache 오픈소스 프로젝트이다.
→ 가장 많이 사용되는 데이터 파이프라인 관리 및 작성 프레임워크로 python 코드로 workflow를 작성하고 스케줄링, 모니터링 하는 플랫폼이며 데이터 파이프라인 스케줄링을 지원한다. (한 ETL 실행 끝나면 다음 ETL 실행)
→ 웹 UI를 제공하며 다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합하는 모듈도 제공하고 있다.
Documentation
Platform created by the community to programmatically author, schedule and monitor workflows.
airflow.apache.org

Airflow 구성
- Web Server
- 스케줄러와 DAG 실행 상황 시각화
- python Flask로 구현되어 있음
- Scheduler
- DAG들을 워커들에게 배정하는 역할 수행
- 정해진 시간에 실행되게 할 수 있으며 순차적 태스크 진행
- 스케줄러와 각 DAG의 실행 결과는 별도 DB에 저장
- Worker
- DAG를 실행하는 역할
- Metadata Database
- SQLite가 기본 설치
- 실제 프로덕션에서는 MySQL이나 Postgres를 사용해야 함
- Queue
- 다수 서버 구성인 경우에만 사용

→ 서버 한대 일 때 Airflow 구조

→ 다수 서버일 때 Airflow 구조
→ 스케일링 시 워커를 별도 서버에서 돌리고 워커가 있는 서버의 수를 늘리는 형태로 용량 증대
Airflow 구조

- 사용자는 UI를 통해 웹 서버와 통신
- Airflow 서버 안에 DAG directory가 있고 여기에 파이썬으로 작성된 파이프라인 코드가 있음
- 이 directory를 Airflow가 주기적으로 파싱해서 Metadata DB에 기록
- 스케줄러가 Executor를 통해 워커들에게 일 할당
- Executor 특성에 따라 Queue가 있기도 하고 없기도 함
Airflow 구동 순서
- web server 구동
- folder DAGs 파일 받아와 보여줌
- Scheduler에서 folder DAGs 보고 Metasotre에 정보 전달 후 DAGRun 생성
- DagRun에 해당하는 Task Instance 생성 및 Executor 전달
- Task가 여러 개면 Metasotre와 Task의 처리 상태 업데이트 하면서 Execute 진행
- 완료 시 DagRun에 완료 상태 보낸 후 Scheduler에 완료 상태 업데이트 알려 끝냄
⇒ Airflow 구동 시 Webserver를 구동하고 난 후 꼭 scheduler도 같이 구동해야 Dag 파일 불러와 읽음
Airflow 구성 요소
- workflow
- 가장 작은 단위인 Operator들이 모여 Task를 만들고 Task가 모여 DAG, DAG가 모여 Workflow가 됨
- DAG (Directed Acyclic Graph)
- Airflow에서 데이터 파이프라인을 DAG라고 부름
- 비선형 구조의 그래프라는 뜻으로 한쪽으로 방향이 흐르는 것을 의미

- 관계 의존성을 그래프로 그려 사용자가 flow를 잘 파악할 수 있도록 돕는 역할
- 1개의 DAG는 1개 이상의 task로 구성
- 3개의 task로 구성된다면 Extract, Transform, Load로 구성
- 기본 DAG 작성법
- DAG 선언 / task 정의 / >>, << 등으로 stream 정의
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner' : 'airflow',
'retries' : 1,
'retry_delay' : timedelta(minutes=5),
}
with DAG(
'tutorial',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=['example'],
) as dag:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t1 >> t2
- task
- Airflow 기본 실행 단위로 하나의 작업 단위를 task라고 함
- 여러 개의 task를 이용해 하나의 DAG 생성
- Airflow의 Operator로 만들어짐(Operator가 인스턴스화 된 것)
- Airflow에서 다양한 종류의 Operator를 제공하기 때문에 경우에 맞게 결정하거나 필요 시 직접 개발
- task 간 >>(downStream), <<(upstream) 이용해서 어떤 작업 할지 정할 수 있음

- Operator
- DAG를 구성하기 위한 가장 작은 단위
- DAGs가 작동하는 동안 workflow를 어떻게 작동하는지 묘사하는 것
- 하나의 task에 대해 정의할 수 있음
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
- Sensor
- 특정 조건이 채워지기를 기다리면 조건을 만족하는 경우 이후 Task로 넘어가게 하는 역할
from airflow.providers.http.sensors.http import HttpSensor
# Http가 호출 될때 까지 기다리는 Sensor
is_api_avaiable = HttpSensor(
task_id='is_api_available',
http_conn_id='opensea_api',
endpoint='api/~~/~~'
)
- pool
- 동시성 제어를 위해 Pool 제공
- 동시에 몇개의 DAG까지 진행할 수 있는지 조작 가능
- Xcom
- Airflow task 간 데이터 공유가 필요할 때 사용
- Airflow 간 데이터 공유 위해 push, pull로 해 전달 및 가져오기 가능
- Python Operator 사용 해 return 값이 자동으로 Xcom에 push 됨
- Variable
- Airflow에서 공통적으로 사용 가능한 변수들을 모아놓는 곳
- Web UI에서 관리가 가능하며 key-value 값으로 보관
- password, secret, passwd, authorization 등 키워드가 포함된 key를 가지는 경우 Web UI에서 정보가 자동으로 가려짐
- Connection
- 외부 시스템과 연결하는 방식에 대한 정보 저장하는 곳
- Operator, Hook 등에서 Connection의 정보를 사용 → UI에서 설정 가능
- Hook
- 외부 플랫폼에 대한 인터페이스를 제공하는 것
- Hive, S3, MySQL 등에 접근 가능한 다양한 Hook 제공
- 독립적으로 task가 될 수 없어 Operator와 함께 사용 됨
728x90