Tools/Spark

스파크 클러스터 동작방식

칼쵸쵸 2024. 3. 7. 20:17

yarn에 대한 설명 : https://chalchichi.tistory.com/79

애플리케이션 컴포넌트

  • Application: Spark에서 구동되는 사용자 정의 프로그램으로, 데이터 처리를 위해 클러스터 상의 드라이버 프로그램과 실행기(Executors)로 구성됩니다.
  • Driver program: 애플리케이션의 진입점으로, main() 함수를 실행하고 데이터 처리 작업을 관리하는 SparkContext를 생성합니다.
  • Cluster manager: 클러스터 상에서 리소스(예: 메모리, CPU 등)를 관리하고 할당하는 역할을 하는 외부 서비스입니다. 예로는 Apache Mesos, Hadoop YARN, Kubernetes, Spark 자체의 독립 실행 모드(Standalone) 등이 있습니다.
  • Deploy mode: Spark 애플리케이션을 실행할 때 드라이버 프로그램의 위치를 결정하는 모드입니다. "클러스터" 모드에서는 드라이버가 클러스터 내부의 노드 중 하나에서 실행되고, "클라이언트" 모드에서는 제출하는 사용자의 머신 또는 클러스터 외부의 노드에서 실행됩니다.
  • Worker node: 클러스터 내에서 실제 데이터 처리 작업을 수행하는 노드입니다. Spark 애플리케이션의 실행기가 배치되어 작업을 처리합니다.
  • Executor: Spark 애플리케이션의 구성 요소로, 실제 데이터 처리 작업을 실행하는 노드(Worker node) 상의 프로세스입니다. 각 애플리케이션은 하나 이상의 실행기를 가지며, 이들은 데이터를 처리하고 결과를 저장하는 데 사용됩니다.

 Spark Context 와 Spark Session

SparkContext

  • SparkContext는 Spark 1.x에서 도입되었으며, Spark 애플리케이션의 "진입점" 역할을 합니다. Spark 작업을 실행하기 위한 기본 설정 정보를 포함하고, Spark 클러스터와의 연결을 관리합니다.
  • RDD(Resilient Distributed Dataset)와 같은 저수준 API 작업을 수행하는 데 사용됩니다.
  • SparkContext를 사용하여 Spark 작업을 제어하고, 클러스터 리소스를 할당하며, RDDs를 생성하고 조작할 수 있습니다.

SparkSession

  • SparkSession은 Spark 2.0에서 도입되었으며, SparkContext와 SQLContext, HiveContext의 기능을 통합하여 단일 진입점을 제공합니다. SparkSession은 Spark 2.0 이후의 버전에서 Spark 애플리케이션을 개발하는 데 권장되는 방식입니다.
  • DataFrame과 Dataset API를 사용하여 구조화된 데이터를 처리하는 데 사용됩니다. 이는 RDD보다 높은 수준의 추상화를 제공하며, 최적화된 실행 계획을 자동으로 생성하는 Catalyst 옵티마이저와 Tungsten 실행 엔진을 활용합니다.
  • SparkSession을 통해 SQL 쿼리를 실행하고, 데이터를 읽고 쓰며, 데이터베이스와 테이블을 관리할 수 있습니다.

주요 차이점

  • 통합된 진입점: SparkSession은 Spark 애플리케이션에서 단일 진입점을 제공하여 SparkContext, SQLContext 및 HiveContext의 기능을 통합합니다. 반면, SparkContext는 RDD와 저수준 API에 초점을 맞춥니다.
  • API 수준: SparkSession은 구조화된 데이터 처리를 위한 고수준 API(DataFrame, Dataset)를 제공합니다. SparkContext는 RDD와 같은 저수준 API를 다룹니다.
  • SparkSession은 Spark 2.0 이후의 애플리케이션 개발에 있어 더 권장되는 접근 방식입니다. 그러나 RDD와 같은 저수준 API를 직접 다루어야 하는 경우 SparkContext를 여전히 사용할 수 있습니다.

SparkSessionSparkContext의 관계

  • SparkSession은 내부적으로 SparkContext를 포함하고 있습니다. 즉, SparkSession을 통해 SparkContext에 접근할 수 있으며, Spark 2.0 이상에서는 SparkSession을 사용하는 것이 권장됩니다.
  • Spark 2.0 이상에서는 대부분의 작업을 SparkSession을 통해 수행할 수 있으며, RDD를 직접 조작할 필요가 있는 경우에만 SparkContext를 직접 사용합니다.
  • SparkSession을 사용하면 SparkContext, SQLContext, HiveContext 등 이전 버전의 Spark에서 사용되던 여러 컨텍스트를 하나로 통합하여 사용할 수 있기 때문에, 애플리케이션을 더 간결하고 효율적으로 작성할 수 있습니다.
  • 간단히 말해서, SparkSession은 Spark 2.0 이후의 모든 기능을 위한 통합된 진입점이며, SparkContext는 RDD와 저수준 API에 초점을 맞춘 초기 버전의 Spark 진입점입니다. Spark 애플리케이션 개발 시 Spark 2.0 이상을 사용한다면, SparkSession을 주로 사용하는 것이 좋습니다
  • 사용 예
# SparkContext 사용 예
from pyspark import SparkContext
sc = SparkContext(master="local", appName="Example")
rdd = sc.parallelize([1, 2, 3, 4, 5])
#SparkSession 사용 예
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Example") \
    .getOrCreate()
df = spark.read.json("examples/src/main/resources/people.json")

 

 

내부 구조 

@InterfaceStability.Stable
object SparkSession extends Logging {

  /**
   * Builder for [[SparkSession]].
   */
  @InterfaceStability.Stable
  class Builder extends Logging {

    private[this] val options = new scala.collection.mutable.HashMap[String, String]

    private[this] val extensions = new SparkSessionExtensions

    private[this] var userSuppliedContext: Option[SparkContext] = None

    private[spark] def sparkContext(sparkContext: SparkContext): Builder = synchronized {
      userSuppliedContext = Option(sparkContext)
      this
    }

/// 생략

    def getOrCreate(): SparkSession = synchronized {
      assertOnDriver()
      // Get the session from current thread's active session.
      var session = activeThreadSession.get()
      if ((session ne null) && !session.sparkContext.isStopped) {
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        if (options.nonEmpty) {
          logWarning("Using an existing SparkSession; some configuration may not take effect.")
        }
        return session
      }
      
/// 생략

        val sparkContext = userSuppliedContext.getOrElse {
          val sparkConf = new SparkConf()
          options.foreach { case (k, v) => sparkConf.set(k, v) }

          // set a random app name if not given.
          if (!sparkConf.contains("spark.app.name")) {
            sparkConf.setAppName(java.util.UUID.randomUUID().toString)
          }

          SparkContext.getOrCreate(sparkConf)
          // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
        }

 

SparkSession 객체는 싱글턴 객체를 제공하지만 jvm 내에서 다른 SparkSession이 생성 되는것을 허용합니다.

SparkSession 객체에 SparkContext가 할당되며 따로 지정하지 않을시에는 현재 존재하는 다른 Session을 참고하여 context를 가져오며 다른 객체가 없을 시에는 SparkContext Object를 가져옵니다.

class SparkContext(config: SparkConf) extends Logging {

  // The call site where this SparkContext was constructed.
  private val creationSite: CallSite = Utils.getCallSite()

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
  private val allowMultipleContexts: Boolean =
    config.getBoolean("spark.driver.allowMultipleContexts", false)

  // In order to prevent multiple SparkContexts from being active at the same time, mark this
  // context as having started construction.
  // NOTE: this must be placed at the beginning of the SparkContext constructor.
  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
  
    private def assertNoOtherContextIsRunning(
      sc: SparkContext,
      allowMultipleContexts: Boolean): Unit = {
    SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
      Option(activeContext.get()).filter(_ ne sc).foreach { ctx =>
          val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
            " To ignore this error, set spark.driver.allowMultipleContexts = true. " +
            s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
          val exception = new SparkException(errMsg)
          if (allowMultipleContexts) {
            logWarning("Multiple running SparkContexts detected in the same JVM!", exception)
          } else {
            throw exception
          }
        }

 

SparkContext는 기본적으로 1개만 존재하도록 구성되어 있습니다. allowMultipleContexts를 허용하여야 복수개가 존재할수 있습니다.

 

Spark 애플리케이션이 리소스를 할당 받는 과정

from pyspark import SparkContext, SparkConf

# Spark 설정 및 SparkContext 생성
conf = SparkConf().setAppName("MySparkJob")
sc = SparkContext(conf=conf)

# YARN 리소스 설정
# 'yarn'을 Spark 마스터로 설정하여 YARN 클러스터에서 실행하도록 지정합니다.
conf.set("spark.master", "yarn")

# YARN 리소스 할당 설정
# Spark 잡이 요청할 YARN 리소스를 설정합니다.
conf.set("spark.executor.instances", "2")  # 실행자(노드) 수
conf.set("spark.executor.memory", "2g")     # 각 실행자의 메모리
conf.set("spark.executor.cores", "2")       # 각 실행자의 CPU 코어 수

# SparkContext를 새로 생성하여 설정을 적용
sc.stop()  # 이미 생성된 SparkContext가 있다면 종료
sc = SparkContext(conf=conf)

# 여기에 Spark 잡 코드를 작성합니다.

# SparkContext를 종료
sc.stop()

이 코드에서는 SparkConf를 사용하여 Spark 설정을 구성하고, SparkContext를 생성합니다. 그리고 YARN 리소스 관련 설정을 conf 객체에 추가합니다. spark.master를 "yarn"으로 설정하여 YARN 클러스터에서 실행하도록 지정하고,

spark.executor.instances, spark.executor.memory, 그리고 spark.executor.cores와 같은 설정을 통해 YARN에서 할당되는 리소스를 조절할 수 있습니다.

마지막으로, Spark 잡 코드를 실행하고 SparkContext를 종료합니다. 이 코드를 실행하면 Spark 잡은 YARN 클러스터에서 설정한 리소스에 따라 실행될 것입니다.

 

 

Spark 애플리케이션 실행 과정

  1. Spark 애플리케이션 제출: spark-submit 명령어 또는 프로그램을 통해 애플리케이션을 클러스터에 제출합니다.
  2. Cluster Manager 선택: YARN, Mesos 등의 클러스터 매니저 중 하나를 선택합니다.
  3. Resource 요청: 실행자의 개수, 메모리, CPU 코어 등 필요한 리소스를 클러스터 매니저에 요청합니다.
  4. 리소스 할당: 클러스터 매니저는 가용 리소스를 기반으로 리소스를 할당합니다.
  5. 애플리케이션 실행자 시작: 할당된 리소스로 클러스터 노드에 실행자를 시작합니다.
  6. 작업 실행: 실행자는 데이터를 처리하고 결과를 생성합니다.
  7. 상태 모니터링: 실행 중에 애플리케이션은 실행자의 상태를 클러스터 매니저에 주기적으로 보고합니다.
  8. 작업 완료 및 자원 반환: 모든 작업이 완료되면 리소스를 반환하고 클러스터 매니저가 이를 다른 애플리케이션에 재할당할 수 있습니다.

클라이언트 모드 vs. 클러스터 모드

 

 

  • 클라이언트 모드:
    • 드라이버가 사용자의 클라이언트 머신에서 실행됩니다.
    • 드라이버 메모리 설정은 클라이언트 머신에서 이루어지며, 클러스터 리소스와 무관합니다.
    • 드라이버 실패 시, 사용자가 수동으로 재시작해야 합니다.
    • 애플리케이션 코드와 의존성이 클라이언트 머신에 위치합니다.
    • 개발 및 디버깅에 유리합니다.
    • 마스터 노드에서 Running Drivers를 확인 할 수 없습니다.

 

 

spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://dev-yunhoo-test001-ncl:7077 \
  --deploy-mode client \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  $SPARK_HOME/examples/jars/spark-examples*.jar \
  100000

 

 

  • 클러스터 모드:
    • 드라이버가 클러스터 내부, 관리 노드에서 실행됩니다.
    • 드라이버 메모리는 클러스터 관리자에 의해 관리됩니다.
    • 드라이버 실패 시, 관리노드에서 자동으로 재시작을 시도합니다.
    • 애플리케이션 코드와 의존성은 클러스터 내에서 관리됩니다.
    • 실제 운영 환경에서 사용에 적합합니다.
    • 마스터 노드 UI에서 Running Drivers를 확인 할수 있습니다.

 

spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://dev-yunhoo-test001-ncl:7077 \
  --deploy-mode cluster \
  --driver-memory 512m \
  --executor-memory 512m \
  --executor-cores 1 \
  $SPARK_HOME/examples/jars/spark-examples*.jar \
  100000

 

'Tools > Spark' 카테고리의 다른 글

Spark RDD 연산 주요 개념  (0) 2024.03.11
RDD의 생성과 데이터 처리  (1) 2024.03.11
Spark 클러스터 환경 구성과 실행  (0) 2024.02.19
빅데이터, 하둡 및 Spark 소개  (0) 2024.02.18
Apache Spark 설정 및 튜닝  (0) 2023.10.17