Tools/Spark

Spark DataSet

칼쵸쵸 2024. 5. 29. 18:40

 

Apache Spark에서 DataSet은 데이터를 강력하게 타입화된 방식으로 처리할 수 있는 분산 컬렉션입니다. DataSet API는 Spark 1.6에서 도입되었으며, DataFrame API와 RDD의 장점을 결합합니다. DataFrame이 제공하는 최적화된 실행 엔진을 이용하면서도, RDD처럼 컴파일 시간에 타입 안정성을 제공합니다.

 

python에서의 DataSet 예제

다음은 python를 사용한 DataSet 예제입니다. 이 예제에서는 간단한 클래스 Person을 정의하고 이를 이용하여 DataSet을 생성하고 조작하는 방법을 보여줍니다.

 

from pyspark.sql import SparkSession

def main():
    
    # SparkSession 초기화
    spark = SparkSession.builder \
        .appName("DataFrame Example") \
        .getOrCreate()
    
    # Person 데이터 클래스 정의 (Python에서는 namedtuple을 사용할 수 있습니다)
    from collections import namedtuple
    Person = namedtuple('Person', ['name', 'age'])
    
    # Person 객체 리스트 생성
    people = [Person("Alice", 28), Person("Bob", 22)]
    
    # 리스트로부터 DataFrame 생성
    peopleDF = spark.createDataFrame(people)
    
    # DataFrame 출력
    peopleDF.show()
    
    # SparkSession 종료
    spark.stop()

if __name__ == "__main__":
    main()

 

이 예제에서 peopleDS는 Person 타입의 DataSet이며, filter 메소드를 이용해 성인만 필터링하고 있습니다.

DataSet을 사용하는 상황

  • 타입 안정성이 요구되는 상황: DataSet은 컴파일 시간에 타입을 확인하므로 실행 중인 오류를 최소화할 수 있습니다. 이는 데이터와 관련된 작업을 수행할 때 오류를 사전에 방지할 수 있게 해줍니다.
  • 성능 최적화가 필요한 상황: Spark SQL의 Catalyst 최적화 엔진을 활용하여 DataFrame과 동일한 수준의 성능을 제공합니다. 이는 큰 데이터셋을 처리할 때 유리하며, 자동으로 불필요한 계산을 줄여주고 실행 계획을 최적화합니다.
  • 함수형 프로그래밍을 선호하는 경우: Scala 또는 Java와 같은 정적 타입 언어를 사용하는 경우, DataSet은 데이터 처리를 위해 함수형 방식을 적용할 수 있게 해줍니다. 이는 코드의 가독성과 유지 관리를 개선할 수 있습니다.

DataFrame과의 차이점

DataFrame과 DataSet은 모두 Spark SQL의 핵심 부분이지만 주요 차이점이 있습니다:

  1. 타입 안정성(Type Safety):
    • DataFrame은 런타임에만 스키마를 확인할 수 있는 동적 타입 API입니다. 이는 Python과 R에서 사용하기에 적합합니다.
    • DataSet은 컴파일 시간에 타입을 검사하는 정적 타입 API로, Scala와 Java에서 사용됩니다. 이는 타입 오류를 사전에 방지할 수 있게 해주며, IDE의 자동완성 기능 등을 효율적으로 활용할 수 있습니다.
  2. 성능:
    • DataFrame과 DataSet은 내부적으로 동일한 실행 엔진인 Catalyst 최적화 엔진과 Tungsten 실행 엔진을 사용합니다. 따라서 성능 차이는 거의 없으나, DataFrame은 DataSet보다 약간 더 최적화되어 있을 수 있습니다. 이는 DataFrame이 런타임에 타입을 해석하기 때문에 더 많은 최적화 기회를 갖기 때문입니다.
  3. 사용 용이성:
    • DataFrame은 더 유연하고 간단하게 사용할 수 있습니다. SQL과 유사한 연산자를 통해 쉽게 데이터를 조작할 수 있습니다.
    • DataSet은 타입 안정성과 자세한 데이터 처리 기능을 제공합니다. 복잡한 데이터 처리 작업을 수행할 때 오류 가능성을 줄일 수 있습니다.

 

DataSet의 장점

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset

// SparkSession 초기화
val spark = SparkSession.builder()
  .appName("Dataset Example")
  .master("local[*]") // 모든 사용 가능한 코어를 사용
  .getOrCreate()

// 암시적 변환을 위한 import
import spark.implicits._

// Person 케이스 클래스 정의
case class Person(name: String, age: Int)

// Seq[Person]으로부터 Dataset[Person]을 생성
val peopleDS: Dataset[Person] = Seq(Person("Alice", 28), Person("Bob", 22)).toDS()

// DataSet을 사용하여 특정 연령 이상의 사람들을 필터링
val adults = peopleDS.filter(_.age >= 18)

// 결과 출력
adults.show()

설명

  1. 타입 안정성과 정적 타입: Person 케이스 클래스는 name과 age라는 두 필드를 가집니다. 이 클래스는 Dataset[Person]의 타입을 명시적으로 정의하는 데 사용됩니다. 데이터셋 peopleDS는 Person 타입을 가지므로, 컴파일 시 타입 체크가 가능합니다. 이로 인해 잘못된 타입의 데이터 접근을 컴파일 단계에서 차단할 수 있습니다.
  2. 객체지향 프로그래밍 지원: DataSet API를 사용하여, 객체의 메소드(_.age)를 직접 호출하고, 객체지향적 방식으로 데이터를 필터링할 수 있습니다. 이는 DataSet이 각 데이터 항목을 Person 객체로 처리할 수 있음을 의미합니다.
  3. 성능 최적화: DataSet은 DataFrame의 최적화된 실행 엔진을 사용하면서도 타입 안정성을 제공합니다. 이는 성능과 안전성 사이의 좋은 균형을 제공합니다.

 

 

DataFrame의 최적화 이점

Spark의 최적화 엔진인 Catalyst는 실행 계획을 최적화할 때 다음과 같은 기술을 사용합니다:

  1. 프로젝션 푸시다운 (Projection Pushdown): 필요한 데이터만 읽어들임으로써 I/O를 최소화합니다.
  2. 필터 푸시다운 (Filter Pushdown): 데이터 소스 수준에서 필터를 적용하여 불필요한 데이터를 조기에 걸러냅니다.
  3. 조인 최적화: 조인 순서를 변경하여 더 효율적인 조인 실행 계획을 생성합니다.
  4. 물리적 데이터 소스와의 연동: Parquet 같은 저장소 형식을 이용할 때, 스키마 정보를 활용하여 데이터를 효율적으로 읽습니다.

예제: DataFrame 최적화

다음 예제는 DataFrame의 필터 푸시다운과 프로젝션 푸시다운을 보여주며, DataFrame과 Dataset의 차이를 간단히 비교합니다.

from pyspark.sql import SparkSession

# SparkSession 초기화
spark = SparkSession.builder \
    .appName("DataFrame Optimization Example") \
    .master("local") \  # 모든 사용 가능한 코어를 사용
    .getOrCreate()

# DataFrame 생성
peopleDF = spark.read.json("examples/src/main/resources/people.json")

# 특정 컬럼만 선택하고 조건에 맞는 데이터만 필터링
namesDF = peopleDF.select("name").filter("age > 30")

# 실행 계획을 출력해 최적화 확인
namesDF.explain()


이 코드에서 select와 filter는 DataFrame의 스키마 정보를 사용하여 실행됩니다. explain() 메소드를 호출하면 Spark가 이 쿼리를 어떻게 최적화하는지 볼 수 있습니다. 필요한 데이터만 읽고 필요한 필터를 적용하는 방식으로 최적화가 진행됩니다.

Dataset과의 비교

Dataset은 DataFrame과 다르게 타입 정보를 컴파일 시간에 체크합니다. 이는 타입 안정성을 제공하지만, 최적화에서는 DataFrame만큼 유연하지 않을 수 있습니다. Dataset은 타입 정보를 런타임에 접근할 수 없기 때문에, DataFrame이 제공하는 동적 스키마 최적화의 이점을 누리지 못합니다. 따라서 DataFrame은 동적 타이핑과 스키마 기반의 최적화를 통해 더 넓은 범위의 최적화 전략을 적용할 수 있습니다.

 

Projection Pushdown, Predicate Pushdown, Filter Pushdown 개념 설명

이 세 가지 최적화 기법은 데이터 처리 엔진이 불필요한 데이터를 조기에 제거하여 전체 쿼리 성능을 향상시키는 방법입니다.

  1. Projection Pushdown (프로젝션 푸시다운):
    • 이 최적화 기술은 쿼리에서 실제로 필요로 하는 컬럼(필드)만을 데이터 소스로부터 읽어들입니다. 예를 들어, 사용자가 여러 컬럼을 가진 테이블에서 몇 개의 컬럼만을 요청하는 경우, 전체 데이터 대신 필요한 컬럼만을 읽어 처리 시간과 I/O를 줄입니다.
  2. Predicate Pushdown (조건 푸시다운):
    • 조건 푸시다운은 쿼리의 조건(예: WHERE 절)을 가능한 한 데이터 소스 가까이에 적용하여, 필요한 데이터만을 처리 엔진으로 전송합니다. 예를 들어, 데이터베이스 엔진이나 외부 시스템에 미리 조건을 적용함으로써 전송되는 데이터의 양을 줄이고, 결과적으로 쿼리의 성능을 향상시킵니다.
  3. Filter Pushdown (필터 푸시다운):
    • 필터 푸시다운은 조건 푸시다운의 특별한 경우로, 특히 데이터 필터링 조건을 데이터 저장소 수준에서 적용하여, 적합하지 않은 데이터는 처리 단계로 전송되지 않게 합니다. 이는 네트워크 I/O를 줄이고, 쿼리의 처리 속도를 빠르게 합니다.

 

결론

DataSet은 정적 타입 언어를 사용하는 Spark 개발자에게 매력적인 선택이 될 수 있습니다. 타입 안정성과 성능 최적화를 동시에 제공하멀로, 대규모 데이터 처리 작업에서 신뢰성과 효율성을 높이고 싶을 때 특히 유용합니다. 반면, DataFrame은 다양한 언어를 지원하고 더 간단한 API를 제공하여 데이터 사이언스 작업에 더 적합할 수 있습니다. 사용 사례에 따라 적합한 도구를 선택하는 것이 중요합니다.

 

 

Apache Spark에서 DataFrame이 Dataset보다 런타임에 타입을 해석할 수 있어 최적화에 더 유리하다는 주장은 DataFrame의 구조적 API가 타입 정보를 런타임까지 유지하지 않고 SQL처럼 동작하기 때문입니다. DataFrame은 내부적으로 스키마를 이용해 데이터를 관리하며, 이 스키마 정보를 사용하여 쿼리 실행 계획을 최적화합니다.