본문 바로가기
IT_Engineer/DataEngineer

[DE] 에어플로우 DAG 최적화와 모니터링 기법

by 좋은데이피치 2024. 11. 21.
728x90


[목표]

1. 효율적인 DAG 설계를 위한 베스트 프랙티스

2. DAG 성능을 개선하는 주요 기법

3. 모니터링 도구와 알림 설정을 통해 운영 환경에서 안정성을 보장


1. DAG 최적화: 베스트 프랙티스

에어플로우는 DAG 설계 시 효율적인 구조와 모범 사례를 따르는 것이 중요합니다. 아래는 최적화를 위한 주요 팁입니다.

1-1. 작업(Task) 병렬 처리

  • 작업 간 의존성을 최소화하여 병렬 처리를 최대화합니다.
  • DAG 정의 시 max_active_tasks와 pool을 사용해 리소스 제약 조건을 관리합니다.
  • from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime
    
    def sample_task(task_id):
        print(f"Running {task_id}")
    
    default_args = {
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    with DAG(
        dag_id='optimized_dag',
        default_args=default_args,
        schedule_interval='@hourly',
        start_date=datetime(2024, 11, 17),
        catchup=False,
        max_active_tasks=5,  # 병렬 처리 제한
    ) as dag:
    
        tasks = [
            PythonOperator(
                task_id=f'task_{i}',
                python_callable=sample_task,
                op_kwargs={'task_id': f'task_{i}'},
            ) for i in range(10)
        ]
    
        # 병렬 실행
        for task in tasks:
            task
 

1-2. DAG 복잡성 줄이기

  • DAG를 작은 단위로 분할하고, 상위 DAG에서 TriggerDagRunOperator를 사용해 서브 DAG 호출을 추천합니다.
  • 분할로 인해 DAG 실행 시간과 디버깅이 단축됩니다.
from airflow.operators.dagrun import TriggerDagRunOperator

trigger_subdag = TriggerDagRunOperator(
    task_id='trigger_subdag',
    trigger_dag_id='subdag_example',
)

2. DAG 성능 개선 기법

2-1. 작업 재시도 최적화

  • DAG 실패 시 비효율적인 재시도를 방지하기 위해 작업 별로 적절한 retry_delay와 retries를 설정합니다.
  • ExponentialBackoff를 적용하면 네트워크나 외부 시스템과의 작업에서 효과적입니다.
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
    'retry_exponential_backoff': True,  # 지수적 증가
}

with DAG(
    dag_id='retry_optimized_dag',
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval='@daily',
) as dag:
    pass

 

2-2. XCom 크기 제한

  • XCom의 기본 데이터 저장소는 메타데이터 DB로, 대량 데이터 저장 시 성능 저하를 초래할 수 있습니다.
  • XCom 데이터는 반드시 JSON 직렬화 가능한 작은 데이터를 저장하고, 큰 데이터는 S3와 같은 외부 스토리지를 이용하세요.
# 비효율적: 큰 데이터를 XCom에 저장
return_value = {'large_data': list(range(1_000_000))}  

# 효율적: 데이터를 외부 저장소로 이동 후 URL만 저장
s3_path = "s3://bucket-name/path-to-large-data"

 

3. DAG 모니터링 및 알림 설정

3-1. SLA(Specific Level Agreement) 적용

  • 특정 작업이 정의된 시간 내에 완료되지 않을 경우 경고를 보내는 SLA를 활용합니다.
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'sla': timedelta(minutes=30),  # SLA 설정
}

def sample_task():
    import time
    time.sleep(3600)  # 의도적으로 SLA 위반

with DAG(
    dag_id='sla_example',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 11, 17),
) as dag:

    task = PythonOperator(
        task_id='long_running_task',
        python_callable=sample_task,
    )

 

3-2. 알림 설정 (Slack 통합)

  • Airflow에서 Slack Webhook을 사용해 작업 상태를 알릴 수 있습니다.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

slack_alert = SlackWebhookOperator(
    task_id='slack_alert',
    http_conn_id='slack_webhook',
    message="Task {{ task_instance.task_id }} failed!",
    channel='#alerts',
    username='airflow',
    trigger_rule='one_failed',  # 실패 시만 실행
)

 

3-3. Logs 및 Metrics 활용

  • Airflow Metrics를 Prometheus와 Grafana로 통합해 실시간 대시보드를 구성합니다.
  • 로깅 드라이버를 Elasticsearch로 변경해 로그 검색 및 분석 효율성을 높일 수 있습니다.

다음 글 예고:

다음 글에서는 Airflow 관련 혹은 kubernetes, AWS 관련 간단한 프랙티스 예제나

기초 내용을 정리하는 등 조금 더 DE로서 해볼 수 있는 다양한 부분을 고려하여 또 시리즈를 만들어볼게요!!

 

💡 도움이 되셨다면 구독하고 잘 따라와주세요!
     추가 질문이나 개선 아이디어는 댓글로 남겨주세요!

728x90

최근댓글

최근글