Tools/Spark

Spark와 RDD

칼쵸쵸 2023. 8. 18. 19:43

RDD, DataFrame,DataSet

https://www.databricks.com/kr/glossary/what-is-rdd

Resilient Distributed Dataset (RDD) Apache Spark에서 데이터 처리를 위한 기본 추상화입니다. Spark 대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크로, RDD 이를 구성하는 핵심 개념 하나입니다. RDD 데이터의 불변성과 분할 가능성을 결합하여 데이터 처리를 효율적으로 수행할 있도록 돕습니다.

 

다음은 RDD 대한 주요 특징과 개념입니다:

  • 불변성 (Immutability): RDD는 한 번 생성되면 변경할 수 없습니다. 즉, 데이터는 읽기 전용이며 수정이 불가능합니다. 이로써 데이터의 안정성과 일관성을 보장합니다.
  • 분할 가능성 (Partitioning): RDD는 논리적으로 분할된 여러 개의 파티션으로 구성됩니다. 각 파티션은 클러스터의 다른 머신에서 병렬로 처리될 수 있습니다. 이로써 데이터 처리 성능이 향상됩니다.
  • 장애 허용성 (Resilience): RDD는 장애 복구 메커니즘을 통해 데이터의 손실을 방지합니다. 각 RDD는 자신을 생성한 기반 데이터나 연산의 로그를 저장하여 데이터 손실 시 재계산할 수 있도록 합니다.
  • 지연 연산 (Lazy Evaluation): RDD의 연산은 지연 평가 방식으로 처리됩니다. 즉, 어떤 연산이 수행되기 전까지는 실제로 계산되지 않습니다. 이는 최적화된 실행 계획을 생성하고 필요한 연산만 수행하여 성능을 향상시킵니다.

Apache Spark 최근에는 RDD 이외에도 DataFrame Dataset 같은 고수준의 추상화도 제공하며, 이러한 추상화들은 구조화된 데이터 처리를 편리하게 있는 기능을 제공합니다.

 

* RDD는 Java의 Collection과 비슷한 개념을 가지지만, 분산 환경에서 대용량 데이터를 처리하고 불변성을 유지하며 장애 복구 기능을 제공하는 등의 차이점을 가지고 있습니다.

 

DataFrame:

 

DataFrame RDD 확장 개념으로서, 구조화된 데이터를 처리하기 위한 고수준의 추상화입니다.

DataFrame 스키마(schema) 가지고 있어 컬럼의 데이터 타입을 정의하고 이름으로 데이터에 접근할 있습니다.

Catalyst 옵티마이저를 사용하여 논리적 최적화 물리적 최적화를 수행하여 성능을 향상시킵니다.

DataFrame SQL 쿼리와 유사한 문법을 사용하여 데이터 조작이 가능하며, SQL 엔진을 사용하여 처리할 있습니다.

스칼라, 자바, 파이썬, R 다양한 언어에서 사용 가능하며, SQL 사용자에게 친숙합니다.

스파크 2.0부터는 Datasets DataFrame 유사한 형태로 통합되어 개선되었습니다.

 

Dataset:

Dataset DataFrame RDD 장점을 결합한 개념입니다. (스파크 1.6 버전 이후에 도입되었습니다)

구조화된 데이터 처리와 타입 안전성을 함께 제공합니다.

정적 타입 언어인 스칼라와 자바에서 사용될 컴파일 타임에 타입 오류를 확인할 있어 안전한 코드 작성을 도와줍니다.

데이터 프레임과 같이 Catalyst 옵티마이저를 사용하여 최적화를 수행합니다.

RDD DataFrame보다 약간의 오버헤드가 있을 있으며, 스파크 2.0 이상에서 사용할 있습니다.

DataFrame 유사한 SQL-like 문법을 사용하여 데이터를 처리할 있습니다.

 

 

RDD 다양한 방법으로 생성할  있습니다. 파일에서 데이터를 읽어오거나 메모리에 있는 데이터를 활용하거나 다른 RDD로부터 변환을 수행하여 새로운 RDD 생성할  있습니다. 

 

RDD 연산 

변환 연산(Transformation Operations): 

map, filter, flatMap: 요소별로 함수를 적용하거나, 조건에 맞게 요소를 선택하거나, 요소를 펼치는 연산 등을 수행합니다.

reduceByKey, groupByKey, join: -  형태의 RDD 대해 그룹핑이나 결합 연산을 수행합니다.

 

액션 연산(Action Operations): 

count, collect, take: RDD 요소 수를 세거나, 모든 요소를 수집하거나, 처음  개의 요소를 가져오는  결과를 반환합니다.

saveAsTextFile, saveAsSequenceFile: RDD 내용을 파일로 저장합니다.

 

  1. 메모리에서 연산 (In-Memory Operations): 많은 스파크 연산은 메모리 상에서 수행됩니다. 예를 들어, 맵(map), 필터(filter), 그룹화(groupBy), 조인(join)과 같은 연산은 데이터를 파티션 별로 로드하여 메모리 상에서 처리합니다. 이렇게 하면 중간 결과를 메모리에 캐싱하거나 재사용함으로써 성능을 향상시킬 수 있습니다.
  2. 디스크에서 연산 (Disk-Based Operations): 하지만 모든 데이터를 메모리에 로드하기 어려운 경우도 있습니다. 특히 데이터 크기가 매우 큰 경우나 메모리 자원이 제한된 경우, 스파크는 디스크에서 데이터를 읽어와 연산을 수행할 수 있습니다. 이 경우 데이터는 필요할 때마다 디스크에서 로드되어 처리되며, 메모리 부족 상황을 피할 수 있습니다.
  • spark.memory.fraction: 이 환경 변수는 스파크 애플리케이션이 사용하는 메모리 중 실행 메모리에 할당되는 비율을 설정합니다. 값은 0과 1 사이의 실수로 지정하며, 예를 들어 0.6은 60%의 메모리를 실행 메모리로 할당함을 의미합니다. 
  • spark.memory.storageFraction: 이 변수는 스파크의 메모리 내 스토리지(RDD 데이터 등)에 할당되는 비율을 설정합니다. 보통 실행 메모리와 스토리지 메모리 사이의 균형을 맞추는 데 사용됩니다. 예를 들어 0.4는 40%의 메모리를 스토리지 메모리로 할당함을 나타냅니다. 
  • spark.memory.offHeap.enabled: 이 환경 변수는 오프-힙(off-heap) 메모리 사용 여부를 설정합니다. 오프-힙 메모리는 JVM 힙 공간 외부의 메모리를 사용하여 가비지 컬렉션 영향을 줄일 수 있습니다. 
  • spark.memory.offHeap.size: 오프-힙 메모리에 할당되는 크기를 지정합니다. 기본 단위는 메가바이트(MB)이며, 예를 들어 spark.memory.offHeap.size=1024는 1GB의 오프-힙 메모리를 할당합니다.

 

spark.local.dir: 로컬 디스크 디렉토리를 설정합니다. 스파크는 임시 파일 데이터를 저장하기 위해 디렉토리를 사용합니다.

  • 작업 실행 계획 확인: 스파크의 실행 계획을 확인하여 어떤 연산이 메모리에서 처리되는지 파악할 수 있습니다. 실행 계획에서 메모리 캐시에 저장된 테이블이나 데이터셋을 찾는 것이 연산이 메모리에서 이루어졌는지 확인하는 지표가 될 수 있습니다. 
  • 스파크 UI 사용: 스파크의 웹 기반 UI를 사용하여 클러스터에서 각 작업 및 스테이지의 메모리 사용량과 디스크 I/O 등을 모니터링할 수 있습니다. 메모리 및 디스크 사용률을 분석하여 어떤 연산이 메모리에서 처리되고 있는지 확인할 수 있습니다.
  • 자원 상태 및 구성 고려: 클러스터의 자원 상태와 구성도 연산 처리 방식에 영향을 줄 수 있습니다. 만약 클러스터가 메모리 부족 상태이거나 사용 가능한 메모리가 제한적이라면 스파크는 일부 연산을 디스크에서 수행할 가능성이 높습니다. 
  • 최적화 옵션 및 메서드 사용: 스파크는 연산 처리를 최적화하기 위한 다양한 옵션과 메서드를 제공합니다. 캐싱, 체크포인트, 메모리 사용량 조절 등을 통해 메모리 및 디스크 활용 방식을 조정할 수 있습니다. 
  • 실행 속도 및 성능 분석: 메모리에서 수행되는 연산은 일반적으로 디스크에서 수행되는 연산보다 훨씬 빠른 결과를 제공할 수 있습니다. 따라서 실행 속도나 성능을 분석하여 어떤 연산이 메모리에서 처리되는지를 확인할 수 있습니다.

 

잡, 스테이지, 테스크

 

스파크에서의 (Jobs), 스테이지(Stages), 태스크(Tasks) 클러스터 내에서 작업을 조직화하고 실행하는 단위를 나타내는 용어입니다. 이들은 스파크의 실행 모델을 이해하는데 중요한 역할을 합니다.

 

  • 잡 (Jobs): 잡은 스파크 애플리케이션의 최상위 논리적 단위입니다. 하나의 스파크 애플리케이션은 여러 개의 잡으로 구성될 수 있습니다. 잡은 사용자 코드에서 작업을 스케줄링하고 실행하는 단위로, 애플리케이션 내에서 고유한 식별자를 가집니다. 예를 들어, 사용자가 RDD 변환 작업을 호출하는 것은 새로운 잡을 생성하는 것입니다. 
  • 스테이지 (Stages): 스테이지는 잡을 더 작은 단위로 분할한 것으로, 연산 그래프에서의 단계를 의미합니다. 스테이지는 크게 두 종류로 나뉩니다: 셔플(shuffle) 스테이지와 셔플이 없는 스테이지입니다. 셔플 스테이지는 셔플 작업을 필요로 하는 스테이지이며, 셔플이 없는 스테이지는 데이터 셔플이 필요 없는 스테이지입니다. 
  • 각 스테이지는 데이터 의존성에 따라 순차적으로 실행됩니다. 먼저 셔플 스테이지가 실행되고, 그 다음에 셔플이 없는 스테이지가 실행됩니다. 이렇게 스테이지 단위로 분할함으로써 작업의 병렬 처리와 최적화를 촉진합니다. 
  • 태스크 (Tasks): 태스크는 각 스테이지 내에서 실제로 작업을 수행하는 단위입니다. 스파크 클러스터의 익스큐터 노드에서 실행되며, 데이터를 로드하고 변환하며, 결과를 생성합니다. 각 스테이지는 여러 개의 태스크로 분할되어 병렬로 실행됩니다. 태스크는 데이터의 분할(partition) 별로 실행되며, 스파크 클러스터의 리소스를 효율적으로 활용하여 처리합니다.

 

요약하자면, 스파크에서의 잡은 애플리케이션의 최상위 논리적 단위이며, 스테이지는 작업을 작은 단위로 분할한 것입니다. 스테이지는 셔플 스테이지와 셔플이 없는 스테이지로 나뉘며, 스테이지 내에서 실제 작업이 수행되는 단위가 태스크입니다. 이들은 스파크의 작업 실행과 최적화에 중요한 역할을 합니다.

 

import org.apache.spark.sql.SparkSession

// Create a Spark Session
val spark = SparkSession.builder
  .appName("Spark Job Stage Task Example")
  .getOrCreate()

// Read a CSV file - this is a transformation and doesn't trigger a job
val data = spark.read.option("header", "true").csv("path/to/your/file.csv")

// Perform a transformation to create a new DataFrame with an added column
// This also doesn't trigger a job, as it's a transformation (not an action)
val transformedData = data.withColumn("new_column", data("existing_column") * 2)

// Now, call an action - this triggers a Spark job
val result = transformedData.count() 

println(result)

spark.stop()

1. 액션을 호출할 (Job) 트리거됩니다. Spark 작업을 실행하기 위해 태스크를 스케줄합니다.

 

2. 스테이지(Stage) 변환(transformations) 기반하여 생성됩니다. 예시에서는 "read.csv" "withColumn"이라는 개의 변환 작업이 있습니다. 그러나 변환이 같은 스테이지에 속합니다. 왜냐하면 이들 사이에 데이터 셔플이 없기 때문입니다.

 

3. 태스크(Task) 작업의 가장 작은 단위로, 하나의 익스큐터(Executor) 전송됩니다. 태스크의 개수는 데이터 파티션의 개수에 따라 달라집니다. 태스크는 데이터의 작은 조각에 대해 변환 작업을 수행합니다.

 

https://medium.com/p/fa9fae40c3ed

 

  • Catalyst Optimizer: 스파크는 "Catalyst Optimizer"라는 고급 최적화 프레임워크를 사용하여 스파크 작업의 실행 계획을 최적화합니다. Catalyst는 계산 계획에 논리적 최적화를 적용하며, 이는 예를 들어 조건부 미루기(predicate pushdown) 및 상수 폴딩(constant folding)과 같은 최적화를 포함합니다. 이후에는 물리적 최적화가 수행되며, 조인 알고리즘 및 데이터 구조에 대한 결정을 내립니다. Catalyst의 작동 방식을 이해하면 더 효율적인 변환 작업을 작성하는 데 도움이 됩니다. 
  • Narrow와 Wide 변환:  "Narrow" 변환은 map이나 filter와 같이 데이터를 파티션 간에 셔플하지 않고 처리하는 변환을 의미합니다. 반면에 "Wide" 변환은 groupByKey나 reduceByKey와 같이 데이터 셔플이 필요한 변환입니다. "Wide" 변환을 최소화하면 성능을 크게 향상시킬 수 있습니다.

예시)

Stage 1 → 집에서 은행까지 차로 가는 과정

  • 시작하는 차량: 하나의 태스크
  • GPS를 시작하는 것: 다른 하나의 태스크
  • 도로를 운전하는 것: 또 다른 하나의 태스크

Stage 2 → 은행으로 들어가는 과정

  • 차에서 내려오는 것: 하나의 태스크
  • 은행까지 걸어가는 것: 다른 하나의 태스크
  • 창구 창구로 이동하는 것: 또 다른 하나의 태스크

Stage 3 → 창구에서 현금 인출하는 과정

  • 텔러에게 직불 카드를 제공하는 것: 하나의 태스크
  • 현금 인출 금액 요청: 다른 하나의 태스크
  • 돈을 받는 것: 또 다른 하나의 태스크

예시에서 Stage 3 Stage 2 완료를 기다리며, Stage 2 Stage 1 완료를 기다리는 종속성 체인이 있습니다. 이런 종속성 체인은 셔플링이나 와이드 변환과 관련된 경우에 일반적으로 나타나며, 이러한 패턴을 "ShuffleMapStage"라고도 부르곤 합니다. 셔플 작업은 서로에게 종속성이 있는 스테이지를 구분짓는 경계를 형성하기 때문입니다.

 

또한, 출금한 돈을 사용하여 빌을 지불하는 것을 최종 스테이지로 생각하는 것은 "ResultStage" 개념과 일치합니다. ResultStage 결과를 생성하는 액션을 수행하는 스파크 작업의 마지막 스테이지를 나타냅니다. 예시에서 최종 스테이지는 출금한 현금을 사용하여 결제를 완료하는 것으로, 전체 변환과 액션의 시퀀스의 최종 목표를 대표합니다.

 

스테이지 종속성 ShuffleMapStage ResultStage 차이점에 대한 개념은 복잡한 데이터 처리 워크플로우를 효율적으로 구성하고 실행하는 방법을 이해하는 기본적입니다.

 

애플리케이션 컴포넌트

Application Spark 구축된 사용자 프로그램클러스터의 드라이버 프로그램  실행기  구성됩니다 .
Application jar 사용자의 Spark 애플리케이션이 포함된 jar입니다경우에 따라 사용자는 종속성과 함께 애플리케이션을 포함하는 "uber jar" 생성하려고 합니다사용자의 jar에는 Hadoop 또는 Spark 라이브러리가 포함되어서는 되지만 런타임 추가됩니다.
Driver program 애플리케이션의 main() 함수를 실행하고 SparkContext 생성하는 프로세스
Cluster manager 클러스터에서 리소스를 획득하기 위한 외부 서비스(: 독립형 관리자, Mesos, YARN, Kubernetes)
Deploy mode 드라이버 프로세스가 실행되는 위치를 구별합니다. "클러스터" 모드에서 프레임워크는 클러스터 내부의 드라이버를 시작합니다. "클라이언트" 모드에서 제출자는 클러스터 외부에서 드라이버를 시작합니다.
Worker node 클러스터에서 애플리케이션 코드를 실행할 있는 모든 노드
Executor 작업을 실행하고 데이터를 메모리 또는 디스크 스토리지에 보관하는 작업자 노드의 애플리케이션에 대해 시작된 프로세스입니다 응용 프로그램에는 자체 실행기가 있습니다.

 

'Tools > Spark' 카테고리의 다른 글

RDD의 생성과 데이터 처리  (1) 2024.03.11
스파크 클러스터 동작방식  (0) 2024.03.07
Spark 클러스터 환경 구성과 실행  (0) 2024.02.19
빅데이터, 하둡 및 Spark 소개  (0) 2024.02.18
Apache Spark 설정 및 튜닝  (0) 2023.10.17