이전 글에서는 Spark를 Kubernetes 위에 배포하고, 실행하기까지의 과정을 다뤘습니다.
2024.11.17 - [IT_Engineer/DataEngineer] - [DE] Spark + Airflow #1: Spark on Kubernetes 구현하기
이번 글에서는 Apache Airflow를 통해 Spark 작업을 자동화하고,
Airflow와 Spark를 결합한 데이터처리 파이프라인을 구축해볼게요!
[ 목표 ]
- Airflow DAG(Directed Acyclic Graph)로 Spark작업 자동화
- Spark 작업 실행 및 모니터링
- Airflow와 Spark의 통합을 통해 AWS 기반 데이터 처리 흐름 구현
1. Apache Airflow 설정 및 준비
Airflow는 작업 스케줄링과 워크플로 관리를 위해 널리 사용되는 도구입니다.
이 글에서는 Docker 기반의 Airflow 환경을 구성하고 Spark 작업을 실행할 준비를 합니다.
1-1. Docker 기반 Airflow 설치
아래는 Airflow를 Docker로 설치하는 간단한 방법입니다.
# Airflow Docker Compose 파일 다운로드
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'
# 환경 설정
$ mkdir ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
# Airflow 초기화
$ docker-compose up airflow-init
# Airflow 시작
$ docker-compose up -d
설치가 완료되면 브라우저에서 http://localhost:8080으로 접속해 Airflow UI에 로그인할 수 있습니다.
(기본 계정: airflow / 비밀번호: airflow)
2. Airflow DAG으로 Spark 작업 자동화
2-1. Airflow와 Spark 통합
Airflow에서 Spark 작업을 실행하려면 **SparkSubmitOperator**를 사용합니다.
Airflow에 Spark 관련 Python 라이브러리를 설치합니다.
$ pip install apache-airflow-providers-apache-spark
2-2. SparkSubmitOperator를 사용한 DAG 작성
다음은 Spark 작업을 실행하는 DAG의 예제입니다.
이 DAG는 S3에 저장된 데이터를 Spark로 처리한 후 결과를 다시 S3에 저장하는 작업을 수행합니다.
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta
# 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
with DAG(
'spark_data_processing',
default_args=default_args,
description='Run Spark job via Airflow',
schedule_interval='@daily',
start_date=datetime(2024, 11, 17),
catchup=False,
) as dag:
spark_job = SparkSubmitOperator(
task_id='run_spark_job',
application='/opt/spark/examples/jars/spark-examples_2.12-3.3.2.jar',
conf={
'spark.executor.instances': 3,
'spark.kubernetes.container.image': '{Image주소}/spark:v1.0'
},
conn_id='spark_k8s', # Airflow Connection 설정 필요
name='spark_example',
verbose=True
)
spark_job
3. Airflow와 Kubernetes 연결
Airflow가 Kubernetes 클러스터와 통신하려면 적절한 연결 설정이 필요합니다.
아래는 Airflow UI에서 연결을 구성하는 방법입니다.
- Airflow UI 접속: 브라우저에서 http://localhost:8080으로 이동
- Connections로 이동: Admin > Connections 클릭
- 새로운 연결 추가:
- Connection Id: spark_k8s
- Connection Type: Kubernetes
- Host: https://{k8sIP}:6443
- Extra:
{
"namespace": "default",
"service_account": "spark"
}
4. Spark 작업 결과 확인
Airflow DAG이 성공적으로 실행되면,
Spark 작업 로그는 Kubernetes Dashboard 또는 kubectl 명령을 통해 확인할 수 있습니다.
# 실행된 Spark 작업 확인
$ kubectl get pods | grep spark
# 로그 확인
$ kubectl logs <spark-driver-pod-name>
5. AWS와의 통합
앞서 Spark 작업에서 사용된 데이터는 AWS S3에 저장된 데이터셋입니다. 아래는 S3와 Spark의 통합 예제입니다.
S3와 Spark 통합 설정
spark = SparkSession.builder \
.appName("S3 Example") \
.config("spark.hadoop.fs.s3a.access.key", "AWS_ACCESS_KEY") \
.config("spark.hadoop.fs.s3a.secret.key", "AWS_SECRET_KEY") \
.config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
.getOrCreate()
df = spark.read.csv("s3a://bucket-name/input-data.csv")
df.write.csv("s3a://bucket-name/output-data.csv")
6. 결론
이 글에서는 Airflow를 통해 Spark 작업을 자동화하고 AWS S3와 통합하여 데이터 파이프라인을 구축하는 방법을 살펴보았습니다. 다음 글에서는 에어플로우 DAG 최적화와 모니터링 기법을 다룰 예정입니다. Spark + Airflow 시리즈는 데이터 처리 파이프라인을 완성하는 데 필요한 모든 단계를 상세히 안내합니다.
💡 질문이나 제안사항이 있다면 댓글로 남겨주세요.
'IT_Engineer > DataEngineer' 카테고리의 다른 글
[DE] 데이터 엔지니어를 위한 AWS#3 Glue: ETL 작업 자동화하기 (2) | 2024.11.24 |
---|---|
[DE] 에어플로우 DAG 최적화와 모니터링 기법 (0) | 2024.11.21 |
[DE] Spark + Airflow #1: Spark on Kubernetes 구현하기 (0) | 2024.11.19 |
[DE] 데이터 엔지니어를 위한 AWS#2 Lambda: S3 이벤트 기반 파일 처리 자동화 (2) | 2024.11.18 |
[DE] 데이터 엔지니어를 위한 AWS 입문#1 : AWS 기초와 S3 사용법 (0) | 2024.11.17 |