Tools/Spark

RDD의 생성과 데이터 처리

칼쵸쵸 2024. 3. 11. 23:18

 

[ 목차 ]

     

    RDD(Resilient Distributed Dataset)는 Apache Spark의 핵심 개념 중 하나로, 불변성을 가지며 분산된 데이터 컬렉션을 나타냅니다. RDD는 데이터를 메모리에서 효율적으로 처리하고, 병렬로 연산을 수행할 수 있도록 설계되었습니다. 이는 Spark가 대규모 데이터를 빠르게 처리할 수 있는 주요 이유 중 하나입니다.

    RDD의 주요 특징

    • 불변성(Immutability): 한 번 생성된 RDD는 변경될 수 없습니다. 데이터를 변형하려면, 변형 연산을 적용하여 새로운 RDD를 생성해야 합니다.
    • 복원력(Resilience): RDD는 고장 내성을 가지고 있으며, 데이터 손실 시 원본 데이터로부터 자동으로 복구될 수 있습니다. 이는 RDD의 lineage(계보) 정보를 통해 가능하며, lineage는 RDD의 모든 변형 연산을 추적합니다.
    • 분산 처리(Distributed): 데이터는 클러스터의 여러 노드에 걸쳐 분산되어 저장되며, 연산 또한 병렬로 수행됩니다.
    • 내재된 병렬성(Inherent Parallelism): RDD 연산은 클러스터의 여러 노드에서 동시에 수행될 수 있어, 대규모 데이터셋을 효율적으로 처리할 수 있습니다.

     

    RDD 생성과 데이터 처리

    RDD를 생성하는 주요 방법은 두 가지가 있습니다:

    1. 외부 데이터 소스에서의 로드: SparkContext를 사용하여 파일 시스템(HDFS, S3 등), 데이터베이스 또는 기타 데이터 소스에서 데이터를 로드하여 RDD를 생성할 수 있습니다.
    2. 드라이버 프로그램의 컬렉션에서의 병렬화: SparkContext의 parallelize 함수를 사용하여 드라이버 프로그램의 기존 컬렉션(예: 리스트, 배열)을 RDD로 변환할 수 있습니다.
    from pyspark.sql import SparkSession
    
    # Spark 세션 초기화
    spark = SparkSession.builder \
        .appName("Save RDD as CSV") \
        .getOrCreate()
    
    # SparkContext 가져오기
    sc = spark.sparkContext
    
    # 임의의 데이터 생성을 위한 RDD
    data = [(i, i + 10000) for i in range(10000)]  # [(0, 100), (1, 101), ..., (99, 199)]
    rdd = sc.parallelize(data)
    
    # RDD의 각 요소를 CSV 형식의 문자열로 변환
    rdd_csv = rdd.map(lambda x: f"{x[0]},{x[1]}")
    
    # CSV 형태의 RDD를 파일로 저장
    rdd_csv.saveAsTextFile("/home1/irteam/test_rdd")
    
    # Spark 세션 종료
    spark.stop()

    RDD를 사용하여 CSV로 저장할 때 주의해야 할 점은, 저장된 파일의 각 부분(part)이 클러스터의 다른 노드에 분산되어 저장될 수 있다는 것입니다. 따라서, 최종적으로 단일 CSV 파일을 얻기 위해서는 추가적인 단계(예: Hadoop의 getmerge 사용 또는 다른 파일 시스템 병합 도구 사용)가 필요할 수 있습니다.

     

    마스터에는_SUCCESS 파일이 들어갑니다. _SUCCESS 파일은 Spark 작업이 성공적으로 완료되었음을 나타내는 빈 파일입니다. 실제 데이터는 part-xxxxx 파일들에 저장됩니다.

     

    저장 과정은 다음과 같이 진행됩니다:

     

    분산 저장: 각 Spark 작업 노드는 자신이 처리한 데이터 파티션의 일부를 로컬에 저장합니다. 따라서, 최종적으로 생성되는 파일은 여러 파트로 나뉘어 있게 됩니다. 예를 들어, part-00000, part-00001 등의 형식으로 파일이 생성됩니다. 이는 Spark가 데이터를 병렬로 처리하고, 각 노드가 독립적으로 작업을 수행하기 때문입니다.

    특정 RDD 구현

    1. 페어 RDD (Pair RDD)

    페어 RDD는 키-값 쌍으로 구성된 RDD로, 분산 환경에서 키에 따른 연산을 용이하게 합니다. 예를 들어, reduceByKey, groupByKey, sortByKey 같은 키-값 기반의 변환을 수행할 때 사용됩니다.

    만드는 방법:

    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()
    
    data = [("apple", 2), ("banana", 1), ("apple", 1)]
    pairRDD = sc.parallelize(data)

    2. 더블 RDD (Double RDD)

    더블 RDD는 숫자 데이터를 포함하는 RDD로, 통계적 연산이나 수학적 연산을 쉽게 수행할 수 있게 합니다. 예를 들어, mean, sum, max, min 등의 연산을 지원합니다.

    일반 RDD와 마찬가지로 parallelize를 사용해서 생성합니다.

    만드는 방법:

    numbers = [1, 2, 3, 4, 5]
    doubleRDD = sc.parallelize(numbers)

     

    3. 데이터 프레임 (DataFrame)

    데이터 프레임은 관계형 데이터베이스의 테이블과 유사한 구조를 가진 분산 데이터 컬렉션입니다. 각 컬럼은 이름과 타입을 가지며, SQL 쿼리, 데이터 소스와의 통합, 복잡한 데이터 처리 등 고수준의 데이터 조작을 지원합니다.

    만드는 방법:

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName("example").getOrCreate()
    data = [("James", 34), ("Anna", 20), ("Joe", 50)]
    columns = ["Name", "Age"]
    
    df = spark.createDataFrame(data, schema=columns)

    4. 시퀸스파일 RDD (SequenceFile RDD)

    시퀸스파일은 Hadoop의 바이너리 파일 형식으로, 키-값 쌍으로 데이터를 저장합니다. 시퀸스파일 RDD는 이러한 시퀸스파일을 읽거나 쓸 때 사용됩니다.

     

    만드는 방법 (시퀸스파일 읽기):

    sequenceFileRDD = sc.sequenceFile("path/to/sequenceFile")

     

    시퀸스파일 쓰기:

    pairRDD.saveAsSequenceFile("output/path")

    각 데이터 구조는 Spark에서 데이터를 다루는 특정 시나리오에 맞춰 설계되었습니다. 예를 들어, 페어 RDD와 더블 RDD는 저수준 API에서 복잡한 데이터 처리를 할 때 유용하며, 데이터 프레임은 구조화된 데이터를 고수준에서 조작할 때 강력한 성능을 발휘합니다. 시퀸스파일 RDD는 Hadoop 에코시스템과의 통합이 필요한 경우에 주로 사용됩니다.

     

    5. 하둡 RDD (Hadoop RDD)

    하둡 RDD는 Hadoop의 InputFormat을 사용하여 Hadoop 파일 시스템(HDFS)이나 다른 하둡 지원 파일 시스템에서 데이터를 읽기 위한 RDD입니다. 이는 Spark에서 Hadoop MapReduce 작업과 유사한 방식으로 데이터를 처리할 수 있게 합니다. 하둡 RDD는 구식 API인 Hadoop 1.x 버전의 InputFormat을 지원합니다.

    만드는 방법 예시:

    from pyspark import SparkContext
    sc = SparkContext.getOrCreate()
    
    # Hadoop RDD 생성
    hadoopRDD = sc.hadoopFile("hdfs://path/to/file",
                              'org.apache.hadoop.mapred.TextInputFormat',
                              'org.apache.hadoop.io.LongWritable',
                              'org.apache.hadoop.io.Text')

    6. 뉴하둡 RDD (NewHadoop RDD)

    뉴하둡 RDD는 Hadoop의 새로운 InputFormat 클래스를 사용하여 데이터를 읽기 위한 RDD입니다. 이는 Hadoop 2.x에서 도입된 새로운 API를 지원하며, 하둡 RDD에 비해 더 많은 기능과 유연성을 제공합니다. 뉴하둡 RDD를 사용하면, Hadoop의 새로운 InputFormat을 이용하여 데이터를 읽고 Spark에서 처리할 수 있습니다.

    만드는 방법 예시:

    # NewHadoop RDD 생성
    newHadoopRDD = sc.newAPIHadoopFile("hdfs://path/to/file",
                                       'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
                                       'org.apache.hadoop.io.LongWritable',
                                       'org.apache.hadoop.io.Text')

    7. 코그룹 RDD (CoGrouped RDD)

    코그룹 RDD는 두 개 이상의 RDD를 키에 따라 그룹화하여 각 키에 대한 값의 그룹을 포함하는 RDD입니다. 이는 SQL의 JOIN 연산과 유사한 방식으로 작동하며, 여러 데이터 소스로부터 동일한 키를 가진 데이터를 모아 처리할 때 유용합니다. 코그룹 연산을 통해 생성된 RDD는 각 키에 대해 여러 RDD의 값 리스트를 포함합니다.

    만드는 방법 예시:

    rdd1 = sc.parallelize([("apple", 2), ("banana", 1), ("orange", 4)])
    rdd2 = sc.parallelize([("apple", 3), ("banana", 2), ("apple", 1)])
    
    # 코그룹 연산
    coGroupedRDD = rdd1.cogroup(rdd2)
    
    # 결과 출력
    for key, value in coGroupedRDD.collect():
        print(key, [list(v) for v in value])

     

    각 데이터 구조는 Spark에서 데이터를 다루는 특정 시나리오에 맞춰 설계되었습니다. 예를 들어, 페어 RDD와 더블 RDD는 저수준 API에서 복잡한 데이터 처리를 할 때 유용하며, 데이터 프레임은 구조화된 데이터를 고수준에서 조작할 때 강력한 성능을 발휘합니다. 시퀸스파일 RDD는 Hadoop 에코시스템과의 통합이 필요한 경우에 주로 사용됩니다.

     

    이러한 RDD들은 Spark와 Hadoop의 통합을 강화하며, 다양한 형태와 출처의 데이터를 효율적으로 처리할 수 있도록 해줍니다. 하둡 RDD와 뉴하둡 RDD는 Hadoop 파일 시스템에서 데이터를 읽는 데 특화되어 있으며, 코그룹 RDD는 다양한 데이터 소스로부터 수집된 데이터를 키에 따라 그룹화하여 복잡한 분석을 수행할 때 유용합니다.

     

     

     

     

     

    'Tools > Spark' 카테고리의 다른 글

    Spark RDD 연산 예제 모음  (0) 2024.03.13
    Spark RDD 연산 주요 개념  (0) 2024.03.11
    스파크 클러스터 동작방식  (0) 2024.03.07
    Spark 클러스터 환경 구성과 실행  (0) 2024.02.19
    빅데이터, 하둡 및 Spark 소개  (0) 2024.02.18