728x90
[목표]
1. 효율적인 DAG 설계를 위한 베스트 프랙티스
2. DAG 성능을 개선하는 주요 기법
3. 모니터링 도구와 알림 설정을 통해 운영 환경에서 안정성을 보장
1. DAG 최적화: 베스트 프랙티스
에어플로우는 DAG 설계 시 효율적인 구조와 모범 사례를 따르는 것이 중요합니다. 아래는 최적화를 위한 주요 팁입니다.
1-1. 작업(Task) 병렬 처리
- 작업 간 의존성을 최소화하여 병렬 처리를 최대화합니다.
- DAG 정의 시 max_active_tasks와 pool을 사용해 리소스 제약 조건을 관리합니다.
-
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def sample_task(task_id): print(f"Running {task_id}") default_args = { 'owner': 'airflow', 'retries': 1, 'retry_delay': timedelta(minutes=5), } with DAG( dag_id='optimized_dag', default_args=default_args, schedule_interval='@hourly', start_date=datetime(2024, 11, 17), catchup=False, max_active_tasks=5, # 병렬 처리 제한 ) as dag: tasks = [ PythonOperator( task_id=f'task_{i}', python_callable=sample_task, op_kwargs={'task_id': f'task_{i}'}, ) for i in range(10) ] # 병렬 실행 for task in tasks: task
1-2. DAG 복잡성 줄이기
- DAG를 작은 단위로 분할하고, 상위 DAG에서 TriggerDagRunOperator를 사용해 서브 DAG 호출을 추천합니다.
- 분할로 인해 DAG 실행 시간과 디버깅이 단축됩니다.
from airflow.operators.dagrun import TriggerDagRunOperator
trigger_subdag = TriggerDagRunOperator(
task_id='trigger_subdag',
trigger_dag_id='subdag_example',
)
2. DAG 성능 개선 기법
2-1. 작업 재시도 최적화
- DAG 실패 시 비효율적인 재시도를 방지하기 위해 작업 별로 적절한 retry_delay와 retries를 설정합니다.
- ExponentialBackoff를 적용하면 네트워크나 외부 시스템과의 작업에서 효과적입니다.
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'retries': 3,
'retry_delay': timedelta(minutes=2),
'retry_exponential_backoff': True, # 지수적 증가
}
with DAG(
dag_id='retry_optimized_dag',
default_args=default_args,
start_date=days_ago(1),
schedule_interval='@daily',
) as dag:
pass
2-2. XCom 크기 제한
- XCom의 기본 데이터 저장소는 메타데이터 DB로, 대량 데이터 저장 시 성능 저하를 초래할 수 있습니다.
- XCom 데이터는 반드시 JSON 직렬화 가능한 작은 데이터를 저장하고, 큰 데이터는 S3와 같은 외부 스토리지를 이용하세요.
# 비효율적: 큰 데이터를 XCom에 저장
return_value = {'large_data': list(range(1_000_000))}
# 효율적: 데이터를 외부 저장소로 이동 후 URL만 저장
s3_path = "s3://bucket-name/path-to-large-data"
3. DAG 모니터링 및 알림 설정
3-1. SLA(Specific Level Agreement) 적용
- 특정 작업이 정의된 시간 내에 완료되지 않을 경우 경고를 보내는 SLA를 활용합니다.
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'sla': timedelta(minutes=30), # SLA 설정
}
def sample_task():
import time
time.sleep(3600) # 의도적으로 SLA 위반
with DAG(
dag_id='sla_example',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2024, 11, 17),
) as dag:
task = PythonOperator(
task_id='long_running_task',
python_callable=sample_task,
)
3-2. 알림 설정 (Slack 통합)
- Airflow에서 Slack Webhook을 사용해 작업 상태를 알릴 수 있습니다.
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
slack_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id='slack_webhook',
message="Task {{ task_instance.task_id }} failed!",
channel='#alerts',
username='airflow',
trigger_rule='one_failed', # 실패 시만 실행
)
3-3. Logs 및 Metrics 활용
- Airflow Metrics를 Prometheus와 Grafana로 통합해 실시간 대시보드를 구성합니다.
- 로깅 드라이버를 Elasticsearch로 변경해 로그 검색 및 분석 효율성을 높일 수 있습니다.
다음 글 예고:
다음 글에서는 Airflow 관련 혹은 kubernetes, AWS 관련 간단한 프랙티스 예제나
기초 내용을 정리하는 등 조금 더 DE로서 해볼 수 있는 다양한 부분을 고려하여 또 시리즈를 만들어볼게요!!
💡 도움이 되셨다면 구독하고 잘 따라와주세요!
추가 질문이나 개선 아이디어는 댓글로 남겨주세요!
728x90
'IT_Engineer > DataEngineer' 카테고리의 다른 글
[DE] 데이터 엔지니어를 위한 AWS#3 Glue: ETL 작업 자동화하기 (1) | 2024.11.24 |
---|---|
[DE] Spark + Airflow #2: Airflow와 Spark를 활용한 데이터 처리 파이프라인 완성하기 (3) | 2024.11.20 |
[DE] Spark + Airflow #1: Spark on Kubernetes 구현하기 (0) | 2024.11.19 |
[DE] 데이터 엔지니어를 위한 AWS#2 Lambda: S3 이벤트 기반 파일 처리 자동화 (2) | 2024.11.18 |
[DE] 데이터 엔지니어를 위한 AWS 입문#1 : AWS 기초와 S3 사용법 (0) | 2024.11.17 |