Tools/Spark

Spark RDD 연산 예제 모음

칼쵸쵸 2024. 3. 13. 00:12
 

TransFormation 연산

1. map

map 함수는 RDD의 각 요소에 주어진 함수를 적용하고 결과로 새 RDD를 생성합니다.

# RDD 생성
rdd = sc.parallelize([1, 2, 3, 4])
# 각 요소에 2를 곱함
result = rdd.map(lambda x: x*2)
result.collect()  # 결과: [2, 4, 6, 8]

2. filter

filter 함수는 주어진 조건 함수에 따라 요소를 필터링합니다. 조건에 맞는 요소만 포함하는 새 RDD를 반환합니다.

# RDD 생성
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 짝수만 필터링
result = rdd.filter(lambda x: x % 2 == 0)
result.collect()  # 결과: [2, 4]

3. distinct

distinct 함수는 RDD에서 중복 요소를 제거합니다.

# RDD 생성
rdd = sc.parallelize([1, 1, 2, 3, 2, 3, 4])
# 중복 제거
result = rdd.distinct()
result.collect()  # 결과: [1, 2, 3, 4]

4. groupBy

groupBy 함수는 주어진 함수의 결과 값에 따라 요소를 그룹화합니다. 이 함수는 (키, 값) 쌍의 형태로 이루어진 새 RDD를 반환합니다.

# RDD 생성
rdd = sc.parallelize(['Apple', 'Banana', 'Cherry', 'Date', 'Apple', 'Date'])
# 첫 글자에 따라 그룹화
result = rdd.groupBy(lambda x: x[0])
result.mapValues(list).collect()  # 결과: [('A', ['Apple', 'Apple']), ('B', ['Banana']), ('C', ['Cherry']), ('D', ['Date', 'Date'])]

5. sortBy

sortBy 함수는 주어진 함수의 반환 값에 따라 RDD의 요소를 정렬합니다.

# RDD 생성
rdd = sc.parallelize([4, 2, 3, 1])
# 요소를 오름차순으로 정렬
result = rdd.sortBy(lambda x: x)
result.collect()  # 결과: [1, 2, 3, 4]

 

 

Action연산

Apache Spark RDD의 액션(Action) 연산들은 실제 작업을 수행하고, 결과를 반환하거나 외부 시스템에 결과를 저장하는 데 사용됩니다. 여기에는 count, collect, take, top, first, reduce, fold, foreach 등이 포함됩니다. 각 연산의 설명과 예제를 살펴보겠습니다.

1. count

count는 RDD에 포함된 요소의 총 수를 반환합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.count()  # 결과: 5

2. collect

collect는 RDD의 모든 요소를 드라이버 프로그램으로 가져와 배열로 반환합니다. 대규모 데이터셋에는 주의해서 사용해야 합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.collect()  # 결과: [1, 2, 3, 4, 5]

3. take

take는 RDD에서 주어진 숫자만큼의 요소를 반환합니다.

rdd = sc.parallelize([5, 4, 3, 2, 1])
rdd.take(3)  # 결과: [5, 4, 3]

4. top

top은 RDD에서 가장 큰 N개의 요소를 반환합니다.

rdd = sc.parallelize([1, 3, 2, 5, 4])
rdd.top(3)  # 결과: [5, 4, 3]

5. first

first는 RDD의 첫 번째 요소를 반환합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.first()  # 결과: 1

6. reduce

reduce 연산은 RDD의 모든 요소를 두 요소의 함수에 의해 순차적으로 병합하여 단일 값으로 축소합니다. 이 함수는 결합 법칙을 만족해야 하지만, 항등원(identity element)이 필요하지 않습니다. 즉, 이 연산은 시작 값 없이 RDD의 요소들만 사용하여 결과를 도출합니다.

예제:

RDD의 모든 요소를 더하는 경우를 생각해 봅시다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.reduce(lambda a, b: a + b)  # 결과: 15

7.  fold

fold 연산은 reduce와 유사하지만, 추가적으로 "제로 값"(zero value)을 가집니다. 이 제로 값은 연산의 초기값으로 사용되며, RDD가 비어있을 경우에 반환되는 값입니다. 또한, 이 제로 값은 연산의 항등원이어야 합니다. 즉, 이 값과 어떤 요소를 연산해도 그 요소의 값이 변경되지 않는 값이어야 합니다. fold는 병렬 처리 시 각 파티션에 대해 제로 값을 초기값으로 사용하고, 최종적으로 각 파티션의 결과를 다시 제로 값과 병합합니다.

예제:

동일한 작업에 fold를 사용해 봅시다. 여기서는 0을 제로 값으로 사용합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.fold(0, lambda a, b: a + b)  # 결과: 15

 

이 예에서 fold는 각 파티션의 연산에 0을 시작 값으로 사용하고, 최종적으로 모든 파티션의 결과를 합합니다. 덧셈의 경우 0은 항등원이므로 연산 결과에 영향을 주지 않습니다.

 

만약 제로 값이 10인경우 (항등원이 아닌 경우)

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.fold(10, lambda a, b: a + b)  # 결과: ??

이 경우, 각 파티션의 연산 시작 시 10이 초기값으로 사용됩니다. 그리고 모든 파티션의 연산이 끝난 후, 각 파티션의 결과를 다시 10과 병합합니다.

만약 RDD가 2개의 파티션으로 나뉘어 있고, 각각 [1, 2, 3]과 [4, 5]를 포함한다고 가정하면, 각 파티션에 대한 fold 연산은 다음과 같이 진행될 것입니다.

  1. 첫 번째 파티션: 초기값 10에 [1, 2, 3]을 더합니다. 결과는 10 + 1 + 2 + 3 = 16
  2. 두 번째 파티션: 초기값 10에 [4, 5]를 더합니다. 결과는 10 + 4 + 5 = 19
  3. 최종 집계: 두 파티션의 결과 16과 19에 대해, 다시 제로 값 10을 사용하여 병합합니다. 즉, 10 + 16 + 19 = 45

RDD가 여러 파티션에 걸쳐 있고, 각 파티션에서 제로 값이 적용되었을 때의 계산 방식을 고려해야 합니다. 예시에서는 단순화를 위해 2개의 파티션을 가정했지만, 실제 파티션의 수와 데이터 분배 방식에 따라 결과는 달라질 수 있습니다.

 

파티션을 4개로 정해놓은경우

# RDD를 4개의 파티션으로 재분배
repartitionedRDD = rdd.repartition(4)
# 이후 fold 연산 수행
repartitionedRDD.fold(10, lambda x, y: x + y)

 

  1. 각 파티션의 초기값으로 10을 사용합니다.
  2. 각 파티션에서 연산이 수행됩니다.
  3. 모든 파티션의 결과가 집계되고, 이때도 제로 값 10이 사용됩니다.

파티션 수가 4개일 경우, 예를 들어 데이터가 파티션에 다음과 같이 분배될 수 있습니다(실제 분배는 Spark에 의해 결정됩니다):

  • 파티션 1: [1]
  • 파티션 2: [2]
  • 파티션 3: [3]
  • 파티션 4: [4, 5]

각 파티션에 대한 연산 후 결과는 각각 다음과 같이 됩니다:

  • 파티션 1: 10 + 1 = 11
  • 파티션 2: 10 + 2 = 12
  • 파티션 3: 10 + 3 = 13
  • 파티션 4: 10 + 4 + 5 = 19

이제 이러한 결과들을 집계할 때, 초기값 10이 다시 적용됩니다. 따라서 최종 결과는 모든 파티션의 결과 합계에 제로 값 10을 더한 값입니다:

11 + 12 + 13 + 19 + 10 = 65

 

차이점 정리

  • 제로 값의 유무: reduce는 시작 값이 없습니다. 반면, fold는 연산의 시작 값으로 사용되는 제로 값을 필요로 합니다.
  • 항등원: fold에서 사용하는 제로 값은 연산의 항등원이어야 합니다. 즉, 이 값으로 연산을 수행해도 다른 요소의 값이 변하지 않아야 합니다.
  • 빈 RDD 처리: reduce는 빈 RDD에서 사용할 수 없습니다. 만약 빈 RDD에 reduce를 사용하려 하면 오류가 발생합니다. 반면, fold는 제로 값이 있기 때문에 빈 RDD에 대해서도 사용할 수 있으며, 이 경우 제로 값이 결과로 반환됩니다.

8. foreach

foreach는 RDD의 각 요소에 대해 주어진 함수를 실행합니다. 주로 RDD 요소에 대한 부수 효과(side-effect)를 적용할 때 사용됩니다.

def printElement(element):
    print(element)

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(printElement)  # 콘솔에 1, 2, 3, 4, 5 출력

 

Pair Rdd 연산

Pair RDD는 키-값 쌍(key-value pairs)으로 이루어진 RDD를 말하며, 키-값 쌍에 대한 병렬 연산을 용이하게 해줍니다. 이러한 Pair RDD는 그룹화, 집계 및 조인과 같은 복잡한 데이터 처리 작업을 위해 특별히 설계되었습니다.

1. reduceByKey

reduceByKey 함수는 같은 키를 가진 값들에 대해 reduce 연산을 수행합니다. 이는 각 키에 대한 값들을 병합하여 단일 값으로 집계할 때 유용합니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3), ("apple", 1), ("orange", 4)])
result = rdd.reduceByKey(lambda a, b: a + b)
result.collect()  # 결과: [('apple', 3), ('orange', 7)]

2. groupByKey

groupByKey 함수는 같은 키를 가진 모든 요소를 그룹화합니다. 이 함수는 키-값 쌍에서 키를 기준으로 값의 리스트를 생성합니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3), ("apple", 1), ("orange", 4)])
result = rdd.groupByKey().mapValues(list)
result.collect()  # 결과: [('apple', [2, 1]), ('orange', [3, 4])]

3. mapValues

mapValues 함수는 Pair RDD의 값에 함수를 적용하되 키는 변경하지 않습니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3)])
result = rdd.mapValues(lambda x: x * x)
result.collect()  # 결과: [('apple', 4), ('orange', 9)]

4. flatMapValues

flatMapValues 함수는 각 키에 대해 여러 개의 값이 생길 수 있도록 값을 "펼치는" 작업을 수행합니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3)])
result = rdd.flatMapValues(lambda x: range(x))
result.collect()  # 결과: [('apple', 0), ('apple', 1), ('orange', 0), ('orange', 1), ('orange', 2)]

5. keys와 values

keys 함수와 values 함수는 각각 Pair RDD의 키 또는 값을 추출합니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3)])
keys = rdd.keys()
values = rdd.values()
keys.collect()  # 결과: ['apple', 'orange']
values.collect()  # 결과: [2, 3]

6. sortByKey

sortByKey 함수는 Pair RDD를 키에 따라 정렬합니다.

rdd = sc.parallelize([("apple", 2), ("orange", 3), ("banana", 1)])
result = rdd.sortByKey()
result.collect()  # 결과: [('apple', 2), ('banana', 1), ('orange', 3)]

7. join

join 연산은 두 Pair RDD를 키를 기준으로 조인합니다. 각 키에 대해 왼쪽과 오른쪽 RDD 모두에 있는 값들의 쌍을 생성합니다.

 

rdd1 = sc.parallelize([("apple", 2), ("orange", 3)])
rdd2 = sc.parallelize([("apple", 5), ("orange", 6), ("banana", 7)])
result = rdd1.join(rdd2)
result.collect()  # 결과: [('apple', (2, 5)), ('orange', (3, 6))]

 

8. flatMapValues

Pair RDD의 각 키-값 쌍에 대해 값 부분에 함수를 적용하고, 그 결과로 나오는 여러 값을 키와 함께 펼쳐서(flatten) 새로운 RDD를 생성합니다. 즉, 각 키에 대해 여러 개의 새로운 키-값 쌍을 생성할 수 있습니다. 이는 mapValues 연산과 유사하지만, flatMapValues는 단일 값을 반환하는 대신, 여러 값을 반환하고 이들을 모두 RDD에 포함시킬 수 있다는 점에서 차이가 있습니다.

# Pair RDD 생성
rdd = sc.parallelize([("apple", 2), ("banana", 3)])

# flatMapValues를 사용하여 각 과일에 대해 0부터 해당 숫자 미만까지의 범위를 생성
result = rdd.flatMapValues(lambda x: range(x))

# 결과 출력
result.collect()

 

이 코드는 ("apple", 2)와 ("banana", 3)이라는 두 개의 키-값 쌍을 포함하는 Pair RDD에서 flatMapValues를 사용합니다. lambda x: range(x) 함수는 각 값에 대해 0부터 그 값 미만까지의 범위를 생성합니다. 따라서 이 함수는 다음과 같은 새로운 키-값 쌍을 생성합니다:

  • ("apple", 0)
  • ("apple", 1)
  • ("banana", 0)
  • ("banana", 1)
  • ("banana", 2)

 

RDD 간의 집합 연산

 

1. union

union 연산은 두 RDD를 결합하여, 두 RDD의 모든 요소를 포함하는 새로운 RDD를 생성합니다. 중복 요소는 제거되지 않고 그대로 유지됩니다.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.union(rdd2)
result.collect()  # 결과: [1, 2, 3, 3, 4, 5]

 

 

2. intersect

intersect 연산은 두 RDD의 교집합을 찾아, 두 RDD에 모두 존재하는 요소만을 포함하는 새로운 RDD를 생성합니다.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.intersect(rdd2)
result.collect()  # 결과: [3]

 

3. subtract

subtract 연산은 첫 번째 RDD에서 두 번째 RDD에 존재하는 요소를 제거한 후, 남은 요소만을 포함하는 새로운 RDD를 생성합니다.

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.subtract(rdd2)
result.collect()  # 결과: [1, 2]

 

4. subtractByKey

subtractByKey는 Pair RDD에만 사용될 수 있는 연산으로, 첫 번째 RDD에서 두 번째 RDD에 있는 키를 가진 요소들을 제거합니다. 이 연산의 결과로, 첫 번째 RDD에서만 고유한 키-값 쌍을 가진 새로운 RDD가 생성됩니다.

rdd1 = sc.parallelize([("a", 1), ("b", 2), ("c", 3)])
rdd2 = sc.parallelize([("a", 4), ("a", 5), ("b", 6)])
result = rdd1.subtractByKey(rdd2)
result.collect()  # 결과: [('c', 3)]

 

 

Numeric RDD 연산

1. sum

sum 함수는 RDD의 모든 숫자형 요소의 합을 계산합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.sum()  # 결과: 15

2. mean

mean (또는 평균) 함수는 RDD의 모든 숫자형 요소의 평균값을 계산합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.mean()  # 결과: 3.0

3. max와 min

max와 min 함수는 각각 RDD의 최대값과 최소값을 찾습니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.max()  # 결과: 5
rdd.min()  # 결과: 1

4. stdev와 variance

stdev와 variance 함수는 각각 RDD의 표준편차와 분산을 계산합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.stdev()  # 결과: 표준편차
rdd.variance()  # 결과: 분산

5. stats

stats 함수는 RDD의 숫자형 요소에 대한 요약 통계를 제공하는 StatCounter 객체를 반환합니다. 이 객체는 평균, 표준편차, 최소값, 최대값, 합계 및 개수를 포함합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
stats = rdd.stats()
# stats.mean, stats.stdev, stats.max, stats.min 등으로 접근 가능

6. histogram

histogram 함수는 주어진 버킷 수 또는 경계값을 기반으로 데이터의 히스토그램을 계산합니다. 이 함수는 데이터 분포를 분석할 때 유용합니다.

rdd = sc.parallelize([1, 2, 3, 4, 5])
buckets = [0, 3, 5]
histo = rdd.histogram(buckets)
# histo는 (버킷 경계값, 각 버킷에 속하는 요소의 개수)의 튜플을 반환