1. DAG (Directed Acyclic Graph)
- 정의: DAG는 Python 파일에서 정의됩니다. DAG는 워크플로우의 전체적인 구조와 실행 주기, 기본 설정 등을 포함합니다.
- 작성 예시
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 10, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'my_dag', # DAG 이름
default_args=default_args,
description='A simple DAG example',
schedule_interval=timedelta(days=1), # 1일 간격으로 실행
)
DAG의 종류
- 정적 DAG (Static DAG):
- DAG가 고정된 형태로 정의되어 항상 동일한 워크플로우를 실행합니다.
- 예시: 매일 동일한 데이터 파이프라인을 실행하는 경우.
- 동적 DAG (Dynamic DAG):
- DAG의 구조가 동적으로 변경됩니다. 파라미터에 따라 실행 시마다 작업 수나 조건이 달라질 수 있습니다.
- 예시: 데이터의 크기에 따라 작업 수를 가변적으로 설정하는 경우.
주요 속성
- schedule_interval: DAG의 실행 주기 (예: 매일, 매주, 매시간 실행)
- start_date: DAG의 시작 날짜
- catchup: 과거 실행되지 않은 DAG를 실행할지 여부
2. Task (Operator 사용)
- 정의: 각 작업(Task)은 Operator를 통해 정의되며, Python으로 각각의 Task를 정의합니다. 다양한 Operator들이 있으며, 각각의 Operator는 특정 작업을 실행합니다.
- 작성 예시
from airflow.operators.bash import BashOperator
task1 = BashOperator(
task_id='print_date',
bash_command='date', # bash 명령어 실행
dag=dag
)
주요 Task (Operator) 종류:
- BashOperator:
- 기능: Bash 명령어를 실행합니다.
- 사용 예시: bash_command="echo Hello, World!"
- PythonOperator:
- 기능: Python 함수를 실행합니다.
- 사용 예시: python_callable=my_function
- EmailOperator:
- 기능: 이메일을 발송합니다.
- 사용 예시: to='example@example.com', subject='Task Notification'
- HttpOperator:
- 기능: HTTP 요청을 보냅니다.
- 사용 예시: http_conn_id='my_http_conn', endpoint='api/endpoint'
- SqlOperator:
- 기능: SQL 쿼리를 실행합니다.
- 사용 예시: sql='SELECT * FROM my_table'
- S3FileTransformOperator:
- 기능: S3 버킷에서 파일을 읽고 변환 작업을 수행합니다.
- 사용 예시: source_s3_key='s3://my-bucket/my-file'
- DockerOperator:
- 기능: Docker 컨테이너에서 작업을 실행합니다.
- 사용 예시: image='my_docker_image'
- KubernetesPodOperator:
- 기능: Kubernetes 클러스터 내에서 작업을 실행합니다.
- 사용 예시: namespace='my_namespace', image='my_docker_image'
- BranchPythonOperator:
- 기능: 조건에 따라 실행할 작업을 분기 처리합니다.
- 사용 예시: python_callable=choose_branch
- SubDagOperator:
- 기능: 하위 DAG를 실행합니다.
- 사용 예시: DAG 내에서 또 다른 DAG를 정의하여 재사용할 수 있음.
- DummyOperator:
- 기능: 아무 작업도 하지 않는 단순한 Operator로, 주로 DAG의 구조를 만들거나 테스트에 사용됩니다.
- DAG는 정적 및 동적으로 나눌 수 있으며, 작업의 흐름과 스케줄을 정의합니다.
- Task는 다양한 Operator를 통해 정의되며, 작업 유형에 따라 여러 종류가 있습니다.
5. 예제
예제 1: 기본적인 DAG와 Task
이 예제는 가장 기본적인 DAG와 Task의 구조를 보여줍니다. DAG는 Python 파일로 정의되며, 하나의 Task가 포함된 예제입니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 10, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
'basic_dag_example',
default_args=default_args,
description='A simple DAG with one task',
schedule_interval=timedelta(days=1), # 매일 실행
catchup=False,
) as dag:
# Bash 명령어를 실행하는 Task
task1 = BashOperator(
task_id='print_date',
bash_command='date', # 현재 날짜 출력
)
1. default_args 설정
default_args = {
'owner': 'airflow', # 이 DAG의 소유자를 'airflow'로 설정
'depends_on_past': False, # 이전 DAG 실행에 의존하지 않음
'start_date': datetime(2024, 10, 1), # DAG가 처음으로 실행될 시작 날짜
'retries': 1, # 실패 시 최대 1회 재시도
'retry_delay': timedelta(minutes=5), # 재시도 사이의 대기 시간(5분)
}
- owner: 이 DAG의 소유자를 나타냅니다. 여기서는 'airflow'로 설정되어 있습니다.
- depends_on_past: 이전 DAG 실행의 성공 여부에 의존하는지 여부입니다. False이므로 이전 실행에 상관없이 DAG가 실행됩니다.
- start_date: 이 DAG의 시작 날짜를 2024년 10월 1일로 설정합니다.
- retries: 만약 작업이 실패하면, 최대 1회까지 재시도하도록 설정되어 있습니다.
2. DAG 정의
with DAG(
'basic_dag_example',
default_args=default_args,
description='A simple DAG with one task',
schedule_interval=timedelta(days=1), # 매일 실행
catchup=False,
) as dag:
- dag_id: 'basic_dag_example'라는 이름을 가진 DAG를 정의합니다.
- default_args: 위에서 설정한 default_args를 적용합니다.
- description: DAG에 대한 설명으로, "A simple DAG with one task"로 설명되어 있습니다.
- schedule_interval: 이 DAG는 하루에 한 번씩 실행되도록 설정되어 있습니다. timedelta(days=1)는 매일 한 번을 의미합니다.
- catchup=False: DAG가 처음 실행될 때 과거 날짜의 실행을 catchup하지 않도록 설정합니다. 즉, 과거 실행되지 않은 DAG는 무시하고 현재부터 시작합니다.
3. BashOperator로 작업 정의
task1 = BashOperator(
task_id='print_date',
bash_command='date', # 현재 날짜 출력
)
- task_id: 'print_date'라는 이름의 태스크(task)를 정의합니다.
- bash_command: Bash 명령어로 date를 실행하여 현재 날짜를 출력합니다.
전체 요약
이 DAG는 2024년 10월 1일부터 시작하여 매일 한 번 실행됩니다. 실행할 작업은 Bash 명령어를 통해 현재 날짜를 출력하는 것입니다. 만약 태스크가 실패하면 5분 후에 한 번 더 재시도됩니다.
예제 2 : 실행순서와 병렬 실행
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 10, 1),
'retries': 1,
'retry_delay': timedelta(seconds=5),
}
# DAG 정의
with DAG(
dag_id='dag_run_together',
default_args=default_args,
description='dag_run_together',
schedule_interval=timedelta(minutes=1), # 매일 실행
catchup=False,
) as dag:
# Bash 명령어를 실행하는 Task
task1 = BashOperator(
task_id='task1',
bash_command='echo 1', # 현재 날짜 출력
)
# Bash 명령어를 실행하는 Task
task2 = BashOperator(
task_id='task2',
bash_command='echo 2', # 현재 날짜 출력
)
# Bash 명령어를 실행하는 Task
task3 = BashOperator(
task_id='task3',
bash_command='echo 3', # 현재 날짜 출력
)
# Bash 명령어를 실행하는 Task
task4 = BashOperator(
task_id='task4',
bash_command='echo 4', # 현재 날짜 출력
)
task1 >> [task2, task3] >> task4
이 코드에서 >>는 순방향 의존성을 나타냅니다. 즉, task1이 완료된 후에 task2와 task3가 동시에 실행되며, task2와 task3가 모두 완료된 후에 task4가 실행됩니다.
이 실행 순서는 다음과 같습니다:
- task1이 가장 먼저 실행됩니다.
- task2와 task3가 동시에 실행됩니다. 즉, task1이 끝난 후에 task2와 task3가 병렬로 실행됩니다.
- task2와 task3가 모두 완료된 후 task4가 실행됩니다.
따라서 순서를 다시 요약하면:
- task1 -> task2, task3 (병렬 실행) -> task4
'Tools > Airflow' 카테고리의 다른 글
Docker Compose를 이용한 Airflow 설치 (2) | 2024.10.01 |
---|