카테고리 없음

Spark 파니셔닝

칼쵸쵸 2024. 3. 21. 11:42

Spark 파티셔닝

 

Apache Spark에서 RDD(Resilient Distributed Dataset) 파티셔닝은 데이터를 분산 처리하기 위해 데이터셋을 물리적으로 분할하는 방식입니다. 파티셔닝은 Spark의 병렬 처리와 직접적으로 관련이 있으며, 데이터를 여러 노드에 분산시켜 처리 속도와 효율성을 높입니다. RDD 파티셔닝의 주요 목적은 네트워크 통신 비용을 최소화하면서 데이터 처리 작업을 최적화하는 것입니다.

 

spark 파티셔닝의 종류

- 해시 파티셔닝 (Hash Partitioning): 이 방법은 키를 기준으로 데이터를 파티션에 할당합니다. 각 키는 해시 함수를 통해 파티션 번호에 매핑되며, 이는 키가 동일한 데이터가 같은 파티션에 위치하도록 보장합니다. 이 방식은 키 기반의 집계나 조인 작업에 유용하게 사용됩니다.

 

 

- 범위 파티셔닝 (Range Partitioning): 범위 파티셔닝은 키의 범위를 기준으로 데이터를 파티션에 분배합니다. 이 방식에서는 모든 파티션이 특정 범위의 키를 포함하도록 데이터를 분할합니다. 이는 정렬된 데이터에 대한 작업을 최적화할 때 유용합니다.

 

 

 

 

파티션 관련 함수

1. partitionBy

  • 사용 대상: pairRDD (키-값 쌍을 가진 RDD)
  • 목적: 키-값 쌍을 포함하는 RDD를 특정 파티셔닝 스키마에 따라 재분배합니다. 일반적으로 해시 파티셔닝 또는 사용자 정의 파티셔너를 사용합니다.
  • 예제:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([(1, 'apple'), (2, 'banana'), (1, 'cherry'), (2, 'date')])
partitionedRDD = rdd.partitionBy(2) # 2개의 파티션으로 분할
print(partitionedRDD.glom().collect()) # 각 파티션의 내용을 출력

 

 

2. repartition

  • 목적: RDD, DataFrame, Dataset의 파티션 수를 재조정합니다. 셔플을 발생시키며, 데이터를 더 균일하게 분배할 때 사용됩니다.
  • 예제:
df = spark.read.csv("path/to/data.csv")
repartitionedDF = df.repartition(5) # 5개의 파티션으로 재분배

 

3. coalesce

  • 목적: repartition과 유사하지만, 주로 파티션 수를 줄일 때 사용하며 셔플을 최소화합니다. 효율적인 데이터 축소에 사용됩니다.
  • 예제:
coalescedRDD = repartitionedDF.rdd.coalesce(2) # 2개의 파티션으로 축소

 

4. repartitionAndSortWithinPartitions

  • 사용 대상: pairRDD
  • 목적: 특정 파티셔닝 스키마에 따라 RDD를 재분배하고, 각 파티션 내에서 데이터를 정렬합니다.
  • 예제:
sortedRDD = rdd.repartitionAndSortWithinPartitions(2)
print(sortedRDD.glom().collect()) # 각 파티션의 정렬된 내용을 출력

 

5. foreachPartition

  • 목적: 각 파티션에 대해 반복적으로 함수를 실행합니다. 이는 대량의 데이터를 DB에 저장할 때 등 파티션 단위로 작업을 최적화할 때 유용합니다.
  • 예제:
def f(iterator):
    for x in iterator:
        print(x)
rdd.foreachPartition(f)

 

6. glom

  • 목적: 각 파티션의 데이터를 배열로 변환하여, 파티션별 데이터 구조를 RDD로 반환합니다.
  • 예제:
print(rdd.glom().collect()) # 각 파티션의 데이터를 배열로 출력

 

7. lookup

  • 사용 대상: pairRDD
  • 목적: 주어진 키에 해당하는 모든 값을 반환합니다.
  • 예제:
lookupResult = rdd.lookup(1) # 키가 1인 모든 값 반환
print(lookupResult)

 

8. mapPartitions

  • 목적: 각 파티션에 대해 병렬 처리를 위한 함수를 적용합니다. map 함수와 비슷하지만, mapPartitions는 파티션 단위로 작동하여 전체 파티션의 데이터에 대해 한 번에 함수를 적용합니다. 이는 리소스 사용을 최적화하고 성능을 향상시킬 수 있습니다.
  • 예제:
def process(iterator):
    """파티션 내의 각 요소에 대해 2를 곱합니다."""
    return [x * 2 for x in iterator]

processedRDD = rdd.mapPartitions(process)
print(processedRDD.collect())

 

 

 

각 함수의 사용은 작업의 특성과 요구 사항에 따라 달라집니다.

예를 들어, 데이터셋의 파티션 수를 조정하려면 repartition 또는 coalesce를 사용할 수 있고,

각 파티션에 동일한 연산을 적용하고 싶다면 mapPartitions 또는 foreachPartition을 사용할 수 있습니다. 또한, partitionByrepartitionAndSortWithinPartitions 같은 함수를 사용해 데이터를 특정 방식으로 분배하거나 정렬할 수도 있습니다.