Tools/Spark

Spark DataFrame의 그룹화 예제

칼쵸쵸 2024. 5. 8. 19:21

 

1. 그룹화

표현식을 이용한 그룹화

groupBy 메소드는 하나 이상의 컬럼을 기준으로 데이터를 그룹화하며, 이 때 문자열이나 컬럼 객체(Column)를 직접 지정할 수 있습니다. 그룹화된 데이터에 대해 집계 함수를 적용하여 각 그룹의 요약 정보를 계산할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Grouping Example").getOrCreate()
import spark.implicits._

// 예제 데이터 프레임 생성
val df = Seq(
  ("Banana", 1000, "USA"),
  ("Carrot", 1500, "USA"),
  ("Bean", 1600, "UK"),
  ("Orange", 2000, "USA"),
  ("Orange", 2000, "USA"),
  ("Banana", 400, "China"),
  ("Carrot", 1200, "China"),
  ("Bean", 1500, "China")
).toDF("Product", "Amount", "Country")

// 표현식을 이용한 그룹화
val groupedDf = df.groupBy($"Country", $"Product").agg(sum("Amount").alias("TotalAmount"))
groupedDf.show()

 

맵을 이용한 그룹화

맵을 이용한 그룹화는 집계를 수행할 때 매우 유용합니다. 이 방법을 사용하면 컬럼 이름과 집계 함수를 매핑하는 맵을 agg 메소드에 전달할 수 있습니다. 이는 집계 표현식을 동적으로 생성하고 관리할 때 특히 유용합니다.

// 맵을 이용한 그룹화
val groupedDf2 = df.groupBy($"Country").agg(Map("Amount" -> "sum", "Amount" -> "avg"))
groupedDf2.show()

 

위의 예제에서는 Country 별로 Amount 컬럼에 대한 합계와 평균을 계산합니다. 이렇게 agg 함수에 맵을 전달하면, 키는 집계하려는 컬럼 이름이고, 값은 사용하려는 집계 함수의 이름입니다.

 

이 두 가지 방법은 데이터를 그룹화하고 요약하는 데 각기 다른 장점이 있습니다. 표현식을 사용한 그룹화는 코드가 간결하고 직관적인 반면, 맵을 사용한 그룹화는 더 복잡한 집계를 동적으로 구성할 수 있는 유연성을 제공합니다. Spark 데이터 프레임에서 이러한 그룹화 기능을 사용하여 데이터를 효율적으로 분석하고 요약할 수 있습니다.

2. 윈도우 함수

랭크 함수

랭크 함수에는 rank(), dense_rank(), row_number() 등이 있습니다. 이들은 모두 데이터를 정렬한 후 각 로우에 순위를 할당합니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Window Functions").getOrCreate()
import spark.implicits._

val data = Seq(
  ("Banana", 1000, "USA"),
  ("Carrot", 1500, "USA"),
  ("Banana", 400, "China"),
  ("Carrot", 1200, "China"),
  ("Banana", 2000, "USA"),
  ("Carrot", 2000, "China")
)
val df = spark.createDataFrame(data).toDF("Product", "Amount", "Country")

val windowSpec = Window.partitionBy("Country").orderBy($"Amount".desc)
val rankDf = df.withColumn("rank", rank().over(windowSpec))
rankDf.show()
 

 

분석 함수

분석 함수에는 lead(), lag(), first(), last() 등이 있습니다. 이 함수들은 각 로우에 대해 전후 데이터를 참조할 수 있게 해 줍니다.

val leadDf = df.withColumn("nextAmount", lead($"Amount", 1).over(windowSpec))
leadDf.show()

 

집계 함수

집계 함수에는 sum(), avg(), min(), max() 등이 있습니다. 이 함수들은 윈도우 내에서 각각의 집계를 수행합니다.

val sumDf = df.withColumn("totalAmount", sum($"Amount").over(windowSpec))
sumDf.show()

 

위의 예제에서는 Window.partitionBy("Country").orderBy($"Amount".desc)를 사용하여 각 국가 내에서 Amount에 따라 데이터를 정렬하고, 각 윈도우 함수를 적용하여 다양한 분석을 수행합니다. 이 방식은 데이터를 그룹화하고 각 그룹 내에서 순위를 매기거나 이전/다음 데이터를 참조하거나 집계를 수행할 때 유용합니다.

 

3. 다차원 분석

롤업(Rollup)

롤업은 계층적인 집계를 수행하며, 지정된 컬럼의 계층을 따라서 부분적인 합계를 계산합니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Grouping Example").getOrCreate()
import spark.implicits._

val data = Seq(
  ("Banana", 1000, "USA"),
  ("Carrot", 1500, "USA"),
  ("Banana", 400, "China"),
  ("Carrot", 1200, "China"),
  ("Banana", 2000, "USA"),
  ("Carrot", 2000, "China")
)
val df = spark.createDataFrame(data).toDF("Product", "Amount", "Country")

val rollupDf = df.rollup("Country", "Product").agg(sum("Amount").alias("TotalAmount"))
rollupDf.show()


결과

+-------+-------+-----------+
|Country|Product|TotalAmount|
+-------+-------+-----------+
|   null|   null|       7100|
|  China|   null|       3600|
|  China| Carrot|       3200|
|  China| Banana|        400|
|    USA|   null|       3500|
|    USA| Banana|       3000|
|    USA| Carrot|       1500|
+-------+-------+-----------+

 

큐브(Cube)

큐브는 롤업과 비슷하지만, 모든 차원에 대한 서브토탈을 계산합니다.

val cubeDf = df.cube("Country", "Product").agg(sum("Amount").alias("TotalAmount"))
cubeDf.show()

 

결과

+-------+-------+-----------+
|Country|Product|TotalAmount|
+-------+-------+-----------+
|   null|   null|       7100|
|   null| Banana|       3400|
|   null| Carrot|       3700|
|  China|   null|       3600|
|  China| Carrot|       3200|
|  China| Banana|        400|
|    USA|   null|       3500|
|    USA| Banana|       3000|
|    USA| Carrot|       1500|
+-------+-------+-----------+

 

 

피벗(Pivot)

피벗은 특정 컬럼의 고유 값을 새로운 컬럼으로 변환하여 데이터를 요약합니다.

val pivotDf = df.groupBy("Country").pivot("Product").sum("Amount")
pivotDf.show()

 

결과

+-------+------+------+
|Country|Banana|Carrot|
+-------+------+------+
|  China|   400|  3200|
|    USA|  3000|  1500|
+-------+------+------+

 

그룹화 메타 데이터

그룹화 메타 데이터는 rollup, cube 등에서 생성된 데이터 프레임에 대해 어떤 컬럼이 집계를 위해 그룹화되었는지 식별할 수 있습니다.

val rollupMetaDf = rollupDf.withColumn("grouping_id", grouping_id())
rollupMetaDf.show()

 

결과

+-------+-------+-----------+-----------+
|Country|Product|TotalAmount|grouping_id|
+-------+-------+-----------+-----------+
|   null|   null|       7100|          3|
|  China|   null|       3600|          1|
|  China| Carrot|

 

 

4. 사용자 정의 집계 함수

Apache Spark에서 사용자 정의 집계 함수(User Defined Aggregate Function, UDAF)는 사용자가 직접 집계 로직을 정의할 수 있게 해 주며, 이를 통해 복잡한 데이터 집계를 수행할 수 있습니다. Spark 2.x 이상에서는 UDAF를 정의하기 위해 UserDefinedAggregateFunction 추상 클래스를 상속받아 사용합니다. 여기서는 UDAF의 생성 방법과 UserDefinedAggregateFunction 클래스에 대해 설명하겠습니다.

UserDefinedAggregateFunction 클래스

UserDefinedAggregateFunction 클래스를 상속받아 사용자 정의 집계 함수를 생성하려면 다음 메소드들을 구현해야 합니다:

  1. inputSchema: UDAF 입력 데이터의 스키마를 정의하는 StructType을 반환합니다.
  2. bufferSchema: 집계 중간 결과를 저장할 버퍼의 스키마를 정의하는 StructType을 반환합니다.
  3. dataType: 반환 데이터의 타입을 정의합니다.
  4. deterministic: 함수가 동일한 입력에 대해 항상 동일한 출력을 반환하는지 여부를 반환합니다 (보통 true).
  5. initialize: 집계 버퍼를 초기 상태로 설정하는 메소드입니다.
  6. update: 입력 데이터로 버퍼를 업데이트하는 메소드입니다.
  7. merge: 두 집계 버퍼를 병합하는 메소드입니다.
  8. evaluate: 최종 결과를 계산하여 반환하는 메소드입니다.

예제: 사용자 정의 집계 함수

다음은 간단한 사용자 정의 집계 함수 예제로, 입력된 숫자의 평균을 계산하는 UDAF입니다.

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class AverageUDAF extends UserDefinedAggregateFunction {
  // 입력 데이터 타입 정의
  def inputSchema: StructType = StructType(StructField("inputColumn", DoubleType) :: Nil)

  // 버퍼 데이터 타입 정의
  def bufferSchema: StructType = StructType(
    StructField("sum", DoubleType) ::
    StructField("count", LongType) :: Nil
  )

  // 반환 타입 정의
  def dataType: DataType = DoubleType

  // 함수의 결정성 여부
  def deterministic: Boolean = true

  // 버퍼 초기화
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0  // sum
    buffer(1) = 0L   // count
  }

  // 버퍼 업데이트
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getDouble(0) + input.getDouble(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }

  // 버퍼 병합
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // 최종 결과 계산
  def evaluate(buffer: Row): Double = buffer.getDouble(0) / buffer.getLong(1)
}

 

사용 방법

위에서 정의한 AverageUDAF를 사용하는 방법은 다음과 같습니다.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("UDAF Example").getOrCreate()
import spark.implicits._

val df = Seq(1.0, 2.0, 3.0, 4.0, 5.0).toDF("numbers")

val averageUdaf = new AverageUDAF()
val result = df.agg(averageUdaf($"numbers").alias("average"))
result.show()

이 예제는 입력된 숫자의 평균을 계산하여 출력합니다. 사용자 정의 집계 함수는 표준 집계 함수로 처리할 수 없는 복잡한 데이터 집계 작업에 유용합니다.

 

AggregationBuffer

AggregationBuffer, 정확히는 MutableAggregationBuffer,는 Apache Spark에서 사용자 정의 집계 함수(UDAF)를 구현할 때 중요한 구성 요소입니다. 이 버퍼는 사용자 정의 집계 함수의 중간 계산 결과를 저장하는 데 사용됩니다. 각 그룹별로 집계가 진행되는 동안 필요한 모든 데이터를 이 버퍼에 저장하고, 최종 결과를 계산할 때까지 이를 유지 관리합니다.

 

MutableAggregationBuffer의 역할

MutableAggregationBuffer는 집계 함수가 데이터를 그룹별로 처리할 때 각 그룹의 중간 결과를 저장하는 역할을 합니다. 이는 SQL의 GROUP BY 절과 유사한 방식으로 작동합니다. 다음과 같은 주요 기능을 수행합니다:

  1. 초기화(initialize): 집계가 시작될 때, 이 버퍼를 초기 상태로 설정합니다. 예를 들어, 합계를 계산하는 UDAF의 경우, 초기 합계를 0으로 설정할 수 있습니다.
  2. 업데이트(update): 새로운 데이터 행이 처리될 때마다 이 버퍼를 업데이트합니다. 예를 들어, 각 행의 값을 합계에 추가할 수 있습니다.
  3. 병합(merge): 여러 태스크 또는 파티션에서 독립적으로 계산된 중간 결과를 병합할 때 사용됩니다. 예를 들어, 두 개의 합계 버퍼를 하나로 합칠 수 있습니다.
  4. 평가(evaluate): 최종적으로 버퍼에 저장된 데이터를 기반으로 최종 결과를 계산하고 반환합니다.

동작 방식

UDAF는 데이터 프레임의 각 파티션에서 독립적으로 실행될 수 있으며, 각 파티션의 결과는 최종적으로 병합됩니다. MutableAggregationBuffer는 이 과정에서 각 단계의 중간 결과를 유지하는 데 중요한 역할을 합니다.

예를 들어, 전체 데이터셋에 대한 평균을 계산하는 경우, 각 파티션의 합계와 개수를 저장하고, 모든 파티션에서의 결과를 병합한 후 최종 평균을 계산할 수 있습니다.