DataBase

[Data] Apache Spark : 대규모 데이터 처리

디아쿠 2025. 1. 10. 12:38

 

Apache Spark란 ?

Apache Spark는 대규모 데이터를 빠르게 처리하기 위한 분산 컴퓨팅 시스템입니다. Spark는 메모리 내 처리(in-memory processing) 방식으로, 하둡(MapReduce)보다 빠르게 데이터를 처리할 수 있습니다. Spark는 다양한 데이터 소스와 통합이 가능하고, 실시간 처리(Streaming), 기계 학습(MLlib), SQL 쿼리(Spark SQL), 그래프 처리(GraphX) 등 여러 용도로 사용할 수 있습니다.
 

Spark 활용 방법

  1. 데이터 처리: Spark는 RDD(Resilient Distributed Dataset)를 사용하여 데이터를 분산 처리합니다. 이를 통해 대규모 데이터를 빠르게 처리할 수 있습니다.
  2. 실시간 데이터 처리: Spark Streaming을 사용하여 실시간 데이터 파이프라인을 구축할 수 있습니다.
  3. SQL 쿼리: Spark SQL을 사용하면 SQL 문법을 활용하여 데이터를 처리할 수 있습니다. 이는 데이터베이스와 유사하게 데이터를 쿼리할 수 있게 해줍니다.
  4. 기계 학습: MLlib 라이브러리를 통해 기계 학습 모델을 훈련시키고 예측을 수행할 수 있습니다.

 
Spark 시작하기
 
1. Spark 설치
먼저 Spark를 실행하기 위해서는 관련된 라이브러리 및 종속성들을 설치해야 합니다. PySpark을 사용하면, Python 환경에서 사용이 가능합니다.

pip install pyspark

 
2. SparkSession 생성
Spark에서는 ` SparkSession` 을 통해 Spark 작업을 설정합니다. SparkSession은 DataFrame과 SQL 쿼리 등을 처리하는 핵심 객체입니다.
 
간단한 예제 몇 개를 수행 해 봅시다.
 

예제 1: CSV 파일 읽기 및 기본 데이터 처리

다음은 CSV 파일을 읽고 기본적인 데이터를 처리하는 예제입니다.
 

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder.appName("SparkExample").getOrCreate()

# CSV 파일 읽기
data = spark.read.csv("sample_data.csv", header=True, inferSchema=True)

# 데이터 확인
data.show()

# 필터링 예시: 나이가 30 이상인 데이터만 필터링
filtered_data = data.filter(data['age'] >= 30)

# 결과 출력
filtered_data.show()

# 데이터 저장: 결과를 새로운 CSV 파일로 저장
filtered_data.write.csv("filtered_data.csv", header=True)
  1. SparkSession 생성: Spark 애플리케이션을 시작하기 위해 SparkSession 객체를 생성합니다.
  2. CSV 파일 읽기: spark.read.csv() 메서드를 사용해 CSV 파일을 읽고 DataFrame을 생성합니다. header=True는 첫 번째 줄을 컬럼 이름으로 사용하고, inferSchema=True는 데이터 유형을 자동으로 추론합니다.
  3. 필터링: DataFrame에서 age가 30 이상인 데이터를 필터링합니다.
  4. 데이터 출력: show() 메서드로 데이터를 콘솔에 출력합니다.
  5. 데이터 저장: 필터링된 데이터를 새로운 CSV 파일로 저장합니다.

 

예제 2: DataFrame과 SQL 쿼리 사용하기

Spark는 SQL을 사용할 수 있도록 지원합니다. 아래는 SQL 쿼리를 사용하여 데이터 처리하는 예제입니다.
 

from pyspark.sql import SparkSession

# SparkSession 생성
spark = SparkSession.builder.appName("SparkSQLExample").getOrCreate()

# 데이터 생성 (예시로 DataFrame을 직접 생성)
data = [(1, "John", 25), (2, "Jane", 30), (3, "Sam", 35), (4, "Anna", 40)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)

# DataFrame을 SQL 테이블로 등록
df.createOrReplaceTempView("people")

# SQL 쿼리 실행
result = spark.sql("SELECT name, age FROM people WHERE age > 30")

# 결과 출력
result.show()

 

  1. DataFrame 생성: Python 리스트를 사용해 DataFrame을 생성하고, 컬럼 이름을 지정합니다.
  2. SQL 쿼리: createOrReplaceTempView()를 사용하여 DataFrame을 SQL 테이블로 등록한 후, spark.sql()을 사용하여 SQL 쿼리를 실행합니다.
  3. 결과 출력: show() 메서드로 결과를 출력합니다.

예제 3: 실시간 데이터 스트리밍 처리 (Spark Streaming)

Spark는 실시간 데이터를 처리하는 기능도 지원합니다. 예를 들어, Kafka에서 실시간으로 데이터를 읽어 처리하는 방식입니다. 이번 예제에서는 Kafka에서 데이터를 읽어와 처리하는 방식으로 간단히 설명합니다.

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# SparkSession 생성
spark = SparkSession.builder.appName("StreamExample").getOrCreate()

# Kafka에서 데이터 읽기 (예시로 localhost:9092에서 데이터 스트리밍)
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test_topic").load()

# Kafka 데이터에서 value 컬럼을 문자열로 변환
df = df.selectExpr("CAST(value AS STRING)")

# 문자열을 공백 기준으로 분리하여 각 단어를 DataFrame으로 변환
words = df.select(explode(split(df.value, " ")).alias("word"))

# 단어의 빈도 계산
word_counts = words.groupBy("word").count()

# 스트리밍 쿼리 실행 (콘솔에 출력)
query = word_counts.writeStream.outputMode("complete").format("console").start()

# 스트리밍 대기
query.awaitTermination()

 
 

  1. Kafka 스트리밍: spark.readStream.format("kafka")를 사용해 Kafka에서 데이터를 읽어옵니다. Kafka의 bootstrap.servers와 subscribe 옵션으로 Kafka 서버와 토픽을 지정합니다.
  2. 데이터 처리: Kafka에서 읽은 데이터를 value 컬럼을 기준으로 문자열로 변환하고, 공백을 기준으로 단어를 나누어 단어 빈도를 계산합니다.
  3. 스트리밍 출력: writeStream()을 사용하여 스트리밍 데이터를 실시간으로 콘솔에 출력합니다.

 


 
Apache Spark는 대규모 데이터 처리를 위한 툴입니다. 배치처리, 실시간처리, 기계학습, SQL 쿼리 등 다양한 기능을 제공하며, 데이터 분석 및 처리를 효율적으로 수행할 수 있습니다.
위 예제들을 수행 해 보시면서, 기본적인 사용법을 익히고 다양한 기능들을 활용하기를 바랍니다 !
 
 
 

728x90