본문 바로가기
IT_Engineer/DataEngineer

[DE] Spark + Airflow #2: Airflow와 Spark를 활용한 데이터 처리 파이프라인 완성하기

by 좋은데이피치 2024. 11. 20.
728x90

이전 글에서는 Spark를 Kubernetes 위에 배포하고, 실행하기까지의 과정을 다뤘습니다.

2024.11.17 - [IT_Engineer/DataEngineer] - [DE] Spark + Airflow #1: Spark on Kubernetes 구현하기

 

[DE] Spark + Airflow #1: Spark on Kubernetes 구현하기

1. 배경현대 데이터 처리 시스템은 데이터 보관 > 데이터 통합 > 데이터 처리의 단계를 거칩니다. AWS에서 이 과정을 지원하는 대표적인 서비스는 아래와 같습니다:AWS S3: 확장성, 데이터 가용성,

g1-kim.tistory.com

 

 

이번 글에서는 Apache Airflow를 통해 Spark 작업을 자동화하고,
Airflow와 Spark를 결합한 데이터처리 파이프라인을 구축해볼게요!


[ 목표 ] 

- Airflow DAG(Directed Acyclic Graph)로 Spark작업 자동화

- Spark 작업 실행 및 모니터링

- Airflow와 Spark의 통합을 통해 AWS 기반 데이터 처리 흐름 구현


 

 

1. Apache Airflow 설정 및 준비

Airflow는 작업 스케줄링과 워크플로 관리를 위해 널리 사용되는 도구입니다.

이 글에서는 Docker 기반의 Airflow 환경을 구성하고 Spark 작업을 실행할 준비를 합니다.

1-1. Docker 기반 Airflow 설치

아래는 Airflow를 Docker로 설치하는 간단한 방법입니다.

# Airflow Docker Compose 파일 다운로드
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.6.3/docker-compose.yaml'

# 환경 설정
$ mkdir ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env

# Airflow 초기화
$ docker-compose up airflow-init

# Airflow 시작
$ docker-compose up -d

설치가 완료되면 브라우저에서 http://localhost:8080으로 접속해 Airflow UI에 로그인할 수 있습니다.
(기본 계정: airflow / 비밀번호: airflow)


2. Airflow DAG으로 Spark 작업 자동화

2-1. Airflow와 Spark 통합

Airflow에서 Spark 작업을 실행하려면 **SparkSubmitOperator**를 사용합니다.

Airflow에 Spark 관련 Python 라이브러리를 설치합니다.

$ pip install apache-airflow-providers-apache-spark

2-2. SparkSubmitOperator를 사용한 DAG 작성

다음은 Spark 작업을 실행하는 DAG의 예제입니다.

이 DAG는 S3에 저장된 데이터를 Spark로 처리한 후 결과를 다시 S3에 저장하는 작업을 수행합니다.

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime, timedelta

# 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG 정의
with DAG(
    'spark_data_processing',
    default_args=default_args,
    description='Run Spark job via Airflow',
    schedule_interval='@daily',
    start_date=datetime(2024, 11, 17),
    catchup=False,
) as dag:
    
    spark_job = SparkSubmitOperator(
        task_id='run_spark_job',
        application='/opt/spark/examples/jars/spark-examples_2.12-3.3.2.jar',
        conf={
            'spark.executor.instances': 3,
            'spark.kubernetes.container.image': '{Image주소}/spark:v1.0'
        },
        conn_id='spark_k8s',  # Airflow Connection 설정 필요
        name='spark_example',
        verbose=True
    )

    spark_job

3. Airflow와 Kubernetes 연결

Airflow가 Kubernetes 클러스터와 통신하려면 적절한 연결 설정이 필요합니다.

아래는 Airflow UI에서 연결을 구성하는 방법입니다.

  1. Airflow UI 접속: 브라우저에서 http://localhost:8080으로 이동
  2. Connections로 이동: Admin > Connections 클릭
  3. 새로운 연결 추가:
    • Connection Id: spark_k8s
    • Connection Type: Kubernetes
    • Host: https://{k8sIP}:6443
    • Extra:
{
  "namespace": "default",
  "service_account": "spark"
}

4. Spark 작업 결과 확인

Airflow DAG이 성공적으로 실행되면,

Spark 작업 로그는 Kubernetes Dashboard 또는 kubectl 명령을 통해 확인할 수 있습니다.

# 실행된 Spark 작업 확인
$ kubectl get pods | grep spark

# 로그 확인
$ kubectl logs <spark-driver-pod-name>

5. AWS와의 통합

앞서 Spark 작업에서 사용된 데이터는 AWS S3에 저장된 데이터셋입니다. 아래는 S3와 Spark의 통합 예제입니다.

S3와 Spark 통합 설정

spark = SparkSession.builder \
    .appName("S3 Example") \
    .config("spark.hadoop.fs.s3a.access.key", "AWS_ACCESS_KEY") \
    .config("spark.hadoop.fs.s3a.secret.key", "AWS_SECRET_KEY") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com") \
    .getOrCreate()

df = spark.read.csv("s3a://bucket-name/input-data.csv")
df.write.csv("s3a://bucket-name/output-data.csv")

6. 결론

이 글에서는 Airflow를 통해 Spark 작업을 자동화하고 AWS S3와 통합하여 데이터 파이프라인을 구축하는 방법을 살펴보았습니다. 다음 글에서는 에어플로우 DAG 최적화와 모니터링 기법을 다룰 예정입니다. Spark + Airflow 시리즈는 데이터 처리 파이프라인을 완성하는 데 필요한 모든 단계를 상세히 안내합니다.


💡 질문이나 제안사항이 있다면 댓글로 남겨주세요.

 
728x90

최근댓글

최근글