Tools/Airflow

Airflow Dag와 Task

칼쵸쵸 2024. 10. 5. 00:01

 

1. DAG (Directed Acyclic Graph)

  • 정의: DAG는 Python 파일에서 정의됩니다. DAG는 워크플로우의 전체적인 구조와 실행 주기, 기본 설정 등을 포함합니다.
  • 작성 예시
from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'my_dag',  # DAG 이름
    default_args=default_args,
    description='A simple DAG example',
    schedule_interval=timedelta(days=1),  # 1일 간격으로 실행
)

 

DAG의 종류

  • 정적 DAG (Static DAG):
    • DAG가 고정된 형태로 정의되어 항상 동일한 워크플로우를 실행합니다.
    • 예시: 매일 동일한 데이터 파이프라인을 실행하는 경우.
  • 동적 DAG (Dynamic DAG):
    • DAG의 구조가 동적으로 변경됩니다. 파라미터에 따라 실행 시마다 작업 수나 조건이 달라질 수 있습니다.
    • 예시: 데이터의 크기에 따라 작업 수를 가변적으로 설정하는 경우.

주요 속성

  • schedule_interval: DAG의 실행 주기 (예: 매일, 매주, 매시간 실행)
  • start_date: DAG의 시작 날짜
  • catchup: 과거 실행되지 않은 DAG를 실행할지 여부

 

2. Task (Operator 사용)

  • 정의: 각 작업(Task)은 Operator를 통해 정의되며, Python으로 각각의 Task를 정의합니다. 다양한 Operator들이 있으며, 각각의 Operator는 특정 작업을 실행합니다.
  • 작성 예시
from airflow.operators.bash import BashOperator

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',  # bash 명령어 실행
    dag=dag
)

 

주요 Task (Operator) 종류:

  1. BashOperator:
    • 기능: Bash 명령어를 실행합니다.
    • 사용 예시: bash_command="echo Hello, World!"
  2. PythonOperator:
    • 기능: Python 함수를 실행합니다.
    • 사용 예시: python_callable=my_function
  3. EmailOperator:
    • 기능: 이메일을 발송합니다.
    • 사용 예시: to='example@example.com', subject='Task Notification'
  4. HttpOperator:
    • 기능: HTTP 요청을 보냅니다.
    • 사용 예시: http_conn_id='my_http_conn', endpoint='api/endpoint'
  5. SqlOperator:
    • 기능: SQL 쿼리를 실행합니다.
    • 사용 예시: sql='SELECT * FROM my_table'
  6. S3FileTransformOperator:
    • 기능: S3 버킷에서 파일을 읽고 변환 작업을 수행합니다.
    • 사용 예시: source_s3_key='s3://my-bucket/my-file'
  7. DockerOperator:
    • 기능: Docker 컨테이너에서 작업을 실행합니다.
    • 사용 예시: image='my_docker_image'
  8. KubernetesPodOperator:
    • 기능: Kubernetes 클러스터 내에서 작업을 실행합니다.
    • 사용 예시: namespace='my_namespace', image='my_docker_image'
  9. BranchPythonOperator:
    • 기능: 조건에 따라 실행할 작업을 분기 처리합니다.
    • 사용 예시: python_callable=choose_branch
  10. SubDagOperator:
    • 기능: 하위 DAG를 실행합니다.
    • 사용 예시: DAG 내에서 또 다른 DAG를 정의하여 재사용할 수 있음.
  11. DummyOperator:
    • 기능: 아무 작업도 하지 않는 단순한 Operator로, 주로 DAG의 구조를 만들거나 테스트에 사용됩니다.
  • DAG는 정적 및 동적으로 나눌 수 있으며, 작업의 흐름과 스케줄을 정의합니다.
  • Task는 다양한 Operator를 통해 정의되며, 작업 유형에 따라 여러 종류가 있습니다.

5. 예제

예제 1: 기본적인 DAG와 Task

이 예제는 가장 기본적인 DAG와 Task의 구조를 보여줍니다. DAG는 Python 파일로 정의되며, 하나의 Task가 포함된 예제입니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
with DAG(
    'basic_dag_example',
    default_args=default_args,
    description='A simple DAG with one task',
    schedule_interval=timedelta(days=1),  # 매일 실행
    catchup=False,
) as dag:

    # Bash 명령어를 실행하는 Task
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',  # 현재 날짜 출력
    )

 

1. default_args 설정

default_args = {
    'owner': 'airflow',  # 이 DAG의 소유자를 'airflow'로 설정
    'depends_on_past': False,  # 이전 DAG 실행에 의존하지 않음
    'start_date': datetime(2024, 10, 1),  # DAG가 처음으로 실행될 시작 날짜
    'retries': 1,  # 실패 시 최대 1회 재시도
    'retry_delay': timedelta(minutes=5),  # 재시도 사이의 대기 시간(5분)
}

 

 

  • owner: 이 DAG의 소유자를 나타냅니다. 여기서는 'airflow'로 설정되어 있습니다.
  • depends_on_past: 이전 DAG 실행의 성공 여부에 의존하는지 여부입니다. False이므로 이전 실행에 상관없이 DAG가 실행됩니다.
  • start_date: 이 DAG의 시작 날짜를 2024년 10월 1일로 설정합니다.
  • retries: 만약 작업이 실패하면, 최대 1회까지 재시도하도록 설정되어 있습니다.

2. DAG 정의

with DAG(
    'basic_dag_example',
    default_args=default_args,
    description='A simple DAG with one task',
    schedule_interval=timedelta(days=1),  # 매일 실행
    catchup=False,
) as dag:

 

 

  • dag_id: 'basic_dag_example'라는 이름을 가진 DAG를 정의합니다.
  • default_args: 위에서 설정한 default_args를 적용합니다.
  • description: DAG에 대한 설명으로, "A simple DAG with one task"로 설명되어 있습니다.
  • schedule_interval: 이 DAG는 하루에 한 번씩 실행되도록 설정되어 있습니다. timedelta(days=1)는 매일 한 번을 의미합니다.
  • catchup=False: DAG가 처음 실행될 때 과거 날짜의 실행을 catchup하지 않도록 설정합니다. 즉, 과거 실행되지 않은 DAG는 무시하고 현재부터 시작합니다.

3. BashOperator로 작업 정의

task1 = BashOperator(
    task_id='print_date',
    bash_command='date',  # 현재 날짜 출력
)
  • task_id: 'print_date'라는 이름의 태스크(task)를 정의합니다.
  • bash_command: Bash 명령어로 date를 실행하여 현재 날짜를 출력합니다.

전체 요약

이 DAG는 2024년 10월 1일부터 시작하여 매일 한 번 실행됩니다. 실행할 작업은 Bash 명령어를 통해 현재 날짜를 출력하는 것입니다. 만약 태스크가 실패하면 5분 후에 한 번 더 재시도됩니다.

 

 

예제 2 : 실행순서와 병렬 실행

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 1),
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

# DAG 정의
with DAG(
        dag_id='dag_run_together',
        default_args=default_args,
        description='dag_run_together',
        schedule_interval=timedelta(minutes=1),  # 매일 실행
        catchup=False,
) as dag:
    # Bash 명령어를 실행하는 Task
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo 1',  # 현재 날짜 출력
    )

    # Bash 명령어를 실행하는 Task
    task2 = BashOperator(
        task_id='task2',
        bash_command='echo 2',  # 현재 날짜 출력
    )

    # Bash 명령어를 실행하는 Task
    task3 = BashOperator(
        task_id='task3',
        bash_command='echo 3',  # 현재 날짜 출력
    )

    # Bash 명령어를 실행하는 Task
    task4 = BashOperator(
        task_id='task4',
        bash_command='echo 4',  # 현재 날짜 출력
    )

    task1 >> [task2, task3] >> task4

 

 

이 코드에서 >>는 순방향 의존성을 나타냅니다. 즉, task1이 완료된 후에 task2와 task3가 동시에 실행되며, task2와 task3가 모두 완료된 후에 task4가 실행됩니다.

 

이 실행 순서는 다음과 같습니다:

  1. task1이 가장 먼저 실행됩니다.
  2. task2와 task3가 동시에 실행됩니다. 즉, task1이 끝난 후에 task2와 task3가 병렬로 실행됩니다.
  3. task2와 task3가 모두 완료된 후 task4가 실행됩니다.

따라서 순서를 다시 요약하면:

  • task1 -> task2, task3 (병렬 실행) -> task4

'Tools > Airflow' 카테고리의 다른 글

Docker Compose를 이용한 Airflow 설치  (2) 2024.10.01