Tools/Spark

Spark RDD 연산 주요 개념

칼쵸쵸 2024. 3. 11. 23:53
 

RDD 연산

RDD는 두 종류의 연산을 지원합니다:

  • Transformations: 원본 RDD를 변형하여 새로운 RDD를 생성하는 연산입니다. 예를 들어, map, filter, flatMap, union, distinct 등이 있습니다. Transformations는 지연 계산(lazy evaluation) 방식을 사용합니다. 즉, 실제 연산은 액션이 호출될 때까지 실행되지 않습니다.
  • Actions: RDD에 저장된 데이터에 대해 연산을 수행하고 결과를 반환하는 연산입니다. 예를 들어, count, collect, reduce, take 등이 있습니다. 액션 연산은 즉시 실행되며, 결과는 Spark 드라이버 프로그램으로 반환됩니다.

 

Lazy Evaluation

Apache Spark의 "Lazy Evaluation"은 실제 연산이 필요할 때까지 연산을 지연시키는 프로그래밍 모델입니다. 이는 계산 작업의 최적화와 리소스 사용의 효율성을 높이기 위해 설계되었습니다. Spark에서는 transformations(변환)이 lazy하게 평가되며, actions(액션)이 호출될 때 실제 계산이 수행됩니다.

from pyspark.sql import SparkSession

# SparkSession 초기화
spark = SparkSession.builder.appName("Lazy Evaluation Example").getOrCreate()

# RDD 생성
numbers = spark.sparkContext.parallelize([1, 2, 3, 4])

# Transformation: numbers RDD에 2를 곱하는 연산 (실제 계산은 여기서 수행되지 않음)
doubled_numbers = numbers.map(lambda x: x * 2)

# 여기까지의 코드는 실제로 어떠한 계산도 수행하지 않음. Transformation은 lazy하게 평가됨.

# Action: 실제 계산을 트리거. 이제 doubled_numbers RDD의 데이터를 계산함.
result = doubled_numbers.collect()

# 결과 출력
print(result)

 

이 예제에서 map은 변환(transformation)으로, RDD의 각 요소에 함수를 적용하지만, 실제 계산은 이루어지지 않습니다. collect는 액션(action)으로, 이를 호출할 때 처음으로 실제 데이터에 대한 연산이 수행되고, 그 결과가 반환됩니다. Lazy evaluation 덕분에 Spark는 전체 데이터 처리 파이프라인을 통해 필요한 모든 변환을 분석하고, 가능한 최적의 방법으로 계산을 수행할 수 있습니다.

 

Lazy evaluation은 Spark의 핵심 특징 중 하나로, 데이터 처리 작업의 효율성을 크게 향상시킵니다. Executor의 "lazy" 실행은 이 모델의 일부로, 실제 연산이 필요할 때까지 작업을 지연시키며, 이를 통해 불필요한 계산을 최소화하고 리소스를 효율적으로 사용할 수 있습니다.

캐싱 

캐싱 안함

# RDD 캐싱 안함
data = [i for i in range(100)]
rdd = sc.parallelize(data)
odds = rdd.filter(lambda x:x%2)
odds.persist()
odds.count()
odds.collect()

캐싱 함

바로 아래의 count 함수에 대한 결과를 캐싱함

# RDD 캐싱 후 처리
data2 = [i for i in range(100)]
rdd2 = sc.parallelize(data)
odds2 = rdd.filter(lambda x:x%2)
odds2.persist()
odds2.count()
odds2.collect()

 

  1. persist()가 있는 경우:
    • odds2.persist()를 호출하면, evens RDD(또는 DataFrame/Dataset)가 계산될 때 그 결과가 메모리에 캐싱됩니다. 실제로 메모리에 저장되는 것은 odds2에 대한 연산이 처음 실행될 때입니다. 여기서는 odds2.count()가 호출될 때 evens가 처음 계산되며, 그 결과가 캐싱됩니다.
    • 이후 odds2.collect()를 호출할 때, 이미 메모리에 캐싱된 데이터를 재사용하므로, 데이터를 다시 계산할 필요가 없어 처리 속도가 빨라집니다.
  2. persist()가 없는 경우:
    • persist() 호출이 빠지면, odds에 대한 각 연산(count(), collect())이 실행될 때마다 데이터를 처음부터 다시 계산해야 합니다. 이는 데이터를 로딩하고, 필요한 변환을 적용하는 등의 작업이 매번 반복되어야 함을 의미합니다.
    • 따라서, odds를 다시 사용할 때마다 전체 데이터 처리 파이프라인을 재실행해야 하므로, 연산 시간이 더 길어지고 리소스 사용이 비효율적이게 됩니다.

RDD 리니지

Apache Spark에서 RDD(Resilient Distributed Dataset)의 리니지(Lineage)는 RDD의 변환 과정을 추적하는 정보입니다. 이 리니지 정보를 통해 Spark는 데이터 손실이 발생했을 때 장애 복구를 수행할 수 있습니다. 리니지는 RDD가 어떤 변환을 거쳐 생성되었는지, 원본 데이터 소스는 무엇인지 등의 정보를 포함합니다.

RDD 변환에는 크게 두 가지 유형이 있습니다: Narrow Transformation과 Wide Transformation.

Narrow Transformation

Narrow Transformation은 각 입력 파티션에 대한 출력이 하나의 파티션으로만 이동하는 변환을 의미합니다. 이러한 변환은 셔플(shuffle)을 일으키지 않으며, 예를 들어 map, filter, flatMap 등이 여기에 해당합니다. 이 변환들은 종속성이 좁은(narrow) 범위 내에서만 발생하기 때문에, 네트워크를 통한 데이터 이동이 필요하지 않습니다.

Narrow Transformation 예제:

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# 초기 RDD 생성
rdd = sc.parallelize([1, 2, 3, 4])

# Narrow Transformation: 각 요소를 제곱하는 map 변환
squared_rdd = rdd.map(lambda x: x**2)

# 계산을 트리거하는 액션 실행
print(squared_rdd.collect())

 

이 예제에서 map은 narrow transformation입니다. 각 요소의 제곱을 계산하는데, 이는 입력 데이터의 파티션과 동일한 파티션에서 이루어집니다.

Wide Transformation

Wide Transformation(또는 Shuffle Transformation)은 결과 데이터의 파티션이 여러 입력 파티션에 걸쳐 있을 때 발생하는 변환입니다. 이는 셔플을 유발하며, 예를 들어 reduceByKey, groupBy, join 등이 여기에 해당합니다. 이러한 변환은 네트워크를 통한 데이터의 광범위한 이동을 필요로 하며, 결과적으로 성능에 영향을 줄 수 있습니다.

Wide Transformation 예제:

# 초기 RDD 생성
pairs_rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Wide Transformation: 키별로 값 합계 계산
summed_rdd = pairs_rdd.reduceByKey(lambda a, b: a + b)

# 계산을 트리거하는 액션 실행
print(summed_rdd.collect())

이 예제에서 reduceByKey는 wide transformation입니다. 각 키("a", "b")에 대한 값들이 합산되는데, 이 과정에서 여러 파티션의 데이터가 하나의 파티션으로 모이게 됩니다.

 

아래는 Spark에서 복잡한 RDD 변환을 포함하는 예제 코드입니다. 이 코드는 여러 narrow transformation과 wide transformation을 결합하여, DAG(Directed Acyclic Graph) 시각화에서 리니지가 여러 개 표현되는 복잡한 작업 흐름을 생성합니다.

 

이 예제에서는 초기 RDD를 생성하고, 이에 대해 map (narrow transformation)과 filter (narrow transformation), reduceByKey (wide transformation), 그리고 join (wide transformation)을 적용합니다. 결과적으로, 이러한 변환들은 Spark UI의 DAG 시각화에서 다양한 스테이지와 리니지를 보여주는 복잡한 작업 흐름을 생성할 것입니다.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# 초기 RDD 생성
rdd1 = sc.parallelize([("apple", 2), ("banana", 1), ("orange", 4), ("apple", 3), ("banana", 2), ("orange", 3)])

# Narrow Transformation: 키별로 값 더하기
rdd2 = rdd1.map(lambda x: (x[0], x[1] + 1))

# Narrow Transformation: 값이 4보다 큰 요소만 필터링
rdd3 = rdd2.filter(lambda x: x[1] > 4)

# Wide Transformation: 키별로 값 합계 계산
rdd4 = rdd2.reduceByKey(lambda a, b: a + b)

# 또 다른 RDD 생성 및 변환 적용
rdd5 = sc.parallelize([("apple", 5), ("orange", 6)])
rdd6 = rdd5.map(lambda x: (x[0], x[1] * 2))

# Wide Transformation: 두 RDD 조인
rdd7 = rdd4.join(rdd6)

# 최종 결과 계산을 위한 액션 실행
final_result = rdd7.collect()

# 결과 출력
print(final_result)

#[('orange', (9, 12)), ('apple', (7, 10))]

 

 

 

 

리니지는 Spark가 RDD의 전체 변환 과정을 추적할 수 있게 해주며, 이를 통해 장애 발생 시 데이터를 복구할 수 있는 기능을 제공합니다. Narrow transformation은 데이터 셔플링 없이 처리되며, Wide transformation은 셔플링을 포함하는 더 복잡한 데이터 처리 작업입니다. 이러한 이해는 Spark 애플리케이션을 최적화하는 데 중요합니다.