Tools/Spark

Spark 다양한 데이터 타입 다루기

칼쵸쵸 2024. 5. 8. 18:55

 

1. Boolean 데이터 타입

Boolean 타입은 True 또는 False 값을 가질 수 있습니다. 조건에 따라 데이터를 필터링할 때 주로 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Boolean Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, true), (2, false), (3, true))
val df = spark.createDataFrame(data).toDF("id", "isActive")

// Boolean 조건을 사용한 필터링
val filteredDf = df.filter($"isActive" === true)
filteredDf.show()

 

2. 수치 데이터 타입

수치 데이터 타입에는 정수, 실수 등이 있으며, 수치 연산에 자주 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Numeric Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, 20.5), (2, 30.0), (3, 25.5))
val df = spark.createDataFrame(data).toDF("id", "temperature")

// 수치 연산 (온도 환산)
val updatedDf = df.withColumn("temperatureF", col("temperature") * 1.8 + 32)
updatedDf.show()

 

3. 문자열 데이터 타입

문자열 데이터 타입은 텍스트를 처리할 때 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("String Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, "apple"), (2, "banana"), (3, "cherry"))
val df = spark.createDataFrame(data).toDF("id", "fruit")

// 문자열 함수 사용
val upperDf = df.withColumn("fruitUpper", upper(col("fruit")))
upperDf.show()

 

translate 함수

translate 함수는 DataFrame의 문자열 컬럼에서 특정 문자들을 다른 문자들로 치환할 때 사용됩니다. 이 함수는 각 문자를 개별적으로 매핑하여 치환하며, 정규 표현식을 사용하지 않습니다.

 

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Translate Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("1-800-555-2468"), ("1-800-555-ABCD"))
val df = spark.createDataFrame(data).toDF("phone")

// '-'를 제거하고 'ABC'를 '123'로 치환
val modifiedDf = df.withColumn("cleanPhone", translate($"phone", "-ABC", "123"))
modifiedDf.show()

 

contains 함수

contains 함수는 문자열 컬럼이 특정 문자열을 포함하고 있는지를 확인할 때 사용됩니다. 이 함수는 boolean 값을 반환하며, 문자열 검색에 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Contains Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("spark@example.com"), ("hello@world.com"))
val df = spark.createDataFrame(data).toDF("email")

// 'example'을 포함하는 이메일 필터링
val filteredDf = df.filter($"email".contains("example"))
filteredDf.show()

 

Varargs를 활용한 select 함수

select 함수는 DataFrame에서 특정 컬럼을 선택할 때 사용되며, Scala의 varargs 기능을 활용하여 여러 컬럼을 한 번에 선택할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Select Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, "John Doe", 30), (2, "Jane Doe", 25))
val df = spark.createDataFrame(data).toDF("id", "name", "age")

// varargs를 사용하여 여러 컬럼 선택
val selectedDf = df.select($"name", $"age")
selectedDf.show()

 

 

4. Date 데이터 타입

날짜 타입은 날짜 관련 연산에 사용됩니다.

import org.apache.spark.sql.SparkSession
import java.sql.Date
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Date Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, Date.valueOf("2023-01-01")))
val df = spark.createDataFrame(data).toDF("id", "date")

// 날짜 연산 (날짜 추가)
val updatedDf = df.withColumn("nextDay", date_add(col("date"), 1))
updatedDf.show()

 

5. Timestamp 데이터 타입

Timestamp 데이터 타입은 시간과 관련된 데이터를 처리할 때 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.sql.Timestamp

val spark = SparkSession.builder.appName("Timestamp Example").getOrCreate()

// 데이터 프레임 생성
val data = Seq((1, Timestamp.valueOf("2023-01-01 12:00:00")))
val df = spark.createDataFrame(data).toDF("id", "timestamp")

// 타임스탬프 함수 사용
val updatedDf = df.withColumn("hourAdded", expr("timestamp + interval 1 hour"))
updatedDf.show()

 

 

Date 와 Timestamp 형식

  • date: YYYY-MM-DD 형식의 날짜만 포함합니다.
  • timestamp: YYYY-MM-DD HH:MM:SS.ssssss 형식으로 날짜와 시간을 포함합니다.

date_add 와 date_sub 함수

  • date_add: 주어진 날짜에 특정 일 수를 더합니다.
  • date_sub: 주어진 날짜에서 특정 일 수를 뺍니다.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Date Functions").getOrCreate()

val df = spark.createDataFrame(Seq(
  ("2023-01-01"),
  ("2023-01-15")
)).toDF("date")

val df2 = df.select(
  col("date"),
  date_add(col("date"), 10).alias("date_add"),
  date_sub(col("date"), 5).alias("date_sub")
)

df2.show()

 

months_between 함수

  • months_between: 두 날짜 사이의 개월 수를 반환합니다. 결과는 부동 소수점 숫자로 반환되며, 두 날짜의 일수 차이도 고려합니다.\
val df3 = spark.createDataFrame(Seq(
  ("2023-01-01", "2023-04-01"),
  ("2023-01-15", "2023-01-01")
)).toDF("start_date", "end_date")

val df4 = df3.select(
  col("start_date"),
  col("end_date"),
  months_between(col("end_date"), col("start_date")).alias("months_between")
)

df4.show()

 

to_date 함수

  • to_date: 문자열을 날짜 타입으로 변환합니다. 선택적으로 포맷을 지정할 수 있습니다.
val df5 = spark.createDataFrame(Seq(
  ("20230101"),
  ("15-01-2023"),
  ("01/15/2023")
)).toDF("string_date")

val df6 = df5.select(
  col("string_date"),
  to_date(col("string_date"), "yyyyMMdd").alias("date1"),
  to_date(col("string_date"), "dd-MM-yyyy").alias("date2"),
  to_date(col("string_date"), "MM/dd/yyyy").alias("date3")
)

df6.show()

 

 

SimpleDateFormat이 지원하는 포맷

SimpleDateFormat은 날짜와 시간을 파싱하고 포맷팅하는 데 사용되는 Java의 클래스로, 다음과 같은 포맷 문자를 지원합니다:

  • yyyy: 4자리 연도
  • MM: 2자리 월 (01-12)
  • dd: 2자리 일 (01-31)
  • HH: 2자리 시간 (00-23)
  • mm: 2자리 분 (00-59)
  • ss: 2자리 초 (00-59)
  • SSS: 밀리초 (000-999)

이 문자들을 조합하여 다양한 날짜 및 시간 포맷을 지정할 수 있습니다. 예를 들어, "yyyy-MM-dd HH:mm:ss"는 "2023-01-01 12:01:01"과 같은 포맷을 나타냅니다.

 

 

6. 정규 표현식 사용

문자열 필터링

rlike 함수는 정규 표현식을 사용하여 데이터를 필터링하는 데 사용됩니다. 이는 DataFrame의 문자열 컬럼에 패턴이 존재하는지 검사합니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Regex Filtering").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("john@example.com"), ("jane.doe@example.com"), ("doe@example.com"))
val df = spark.createDataFrame(data).toDF("email")

// 정규 표현식을 사용하여 이메일 필터링
val filteredDf = df.filter($"email" rlike "^[a-z]+[.][a-z]+@example.com$")
filteredDf.show()

 

문자열 추출

regexp_extract 함수는 문자열에서 정규 표현식에 해당하는 부분을 추출합니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Regex Extract").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("100-200"), ("300-400"), ("500-600"))
val df = spark.createDataFrame(data).toDF("range")

// 정규 표현식을 사용하여 숫자 추출
val extractedDf = df.withColumn("start", regexp_extract($"range", "(\\d+)-", 1))
                  .withColumn("end", regexp_extract($"range", "-(\\d+)", 1))
extractedDf.show()

 

문자열 치환

regexp_replace 함수를 사용하여 문자열 내용을 정규 표현식을 기반으로 치환할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Regex Replace").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("Mr John Smith"), ("Ms Jane Doe"), ("Dr John Doe"))
val df = spark.createDataFrame(data).toDF("name")

// 정규 표현식을 사용하여 타이틀 제거
val modifiedDf = df.withColumn("cleanName", regexp_replace($"name", "^(Mr|Ms|Dr)\\s", ""))
modifiedDf.show()

 

문자열 분할

split 함수는 정규 표현식을 사용하여 문자열을 분할할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Regex Split").getOrCreate()

// 데이터 프레임 생성
val data = Seq(("John,Smith"), ("Jane,Doe"), ("Someone,Else"))
val df = spark.createDataFrame(data).toDF("fullName")

// 쉼표로 이름 분할
val splitDf = df.withColumn("firstName", split($"fullName", ",")(0))
                .withColumn("lastName", split($"fullName", ",")(1))
splitDf.show()

 

 

7.  Null 다루기

coalesce 함수

coalesce 함수는 인자로 주어진 여러 컬럼 중에서 null이 아닌 첫 번째 값을 반환합니다. 이 함수는 주로 여러 컬럼 중 하나라도 유효한 데이터가 있을 때 그 값을 선택할 때 사용됩니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Null Handling").getOrCreate()

val data = Seq((null, "apple"), ("banana", null), (null, null), ("cherry", "berry"))
val df = spark.createDataFrame(data).toDF("col1", "col2")

val resultDf = df.select(coalesce($"col1", $"col2").alias("non_null_value"))
resultDf.show()

 

nvl 함수

nvl 함수는 두 개의 인자를 받아 첫 번째 인자가 null인 경우 두 번째 인자를 반환합니다. 이 함수는 Oracle SQL에서 가져온 개념이며, 일부 Spark 버전에서는 nvl 대신 coalesce를 사용할 수 있습니다.

 

ifnull 함수

ifnull 함수는 nvl과 비슷하게 동작합니다. 첫 번째 인자가 null인 경우 두 번째 인자를 반환합니다. 만약 첫 번째 인자가 null이 아니면 그 값을 반환합니다.

val resultDf2 = df.select(expr("ifnull(col1, col2) as non_null_value"))
resultDf2.show()

 

nullif 함수

nullif 함수는 두 인자를 받고, 두 인자가 같으면 null을 반환하고, 다르면 첫 번째 인자를 반환합니다. 이는 특정 조건에서 값을 제거할 때 유용합니다.

val resultDf3 = df.select(expr("nullif(col1, col2) as result"))
resultDf3.show()

 

isnull 함수

isnull 함수는 컬럼이 null인지 여부를 검사하여 Boolean 값을 반환합니다. 이 함수는 데이터를 필터링하거나 조건부 로직에 사용됩니다.

val resultDf4 = df.select($"col1", $"col2", isnull($"col1").alias("col1_is_null"))
resultDf4.show()

 

isnan 함수

isnan 함수는 컬럼 값이 NaN인지를 검사합니다. 이 함수는 주로 수치 데이터에서 사용됩니다.

val data2 = Seq((Double.NaN), (1.0), (2.0))
val df2 = spark.createDataFrame(data2.map(Tuple1.apply)).toDF("value")
val resultDf5 = df2.select($"value", isnan($"value").alias("is_NaN"))
resultDf5.show()

 

dropna 함수

dropna 함수는 DataFrame에서 하나 이상의 null 값을 포함하는 로우를 제거합니다. 필터링 조건을 지정할 수 있습니다.

val cleanedDf = df.na.drop()
cleanedDf.show()

 

fillna 함수

fillna 함수는 DataFrame 내의 모든 null 값을 특정 값으로 대체할 수 있습니다. 값과 컬럼을 지정할 수 있습니다.

val filledDf = df.na.fill("default_value")
filledDf.show()

 

8. 복합 데이터 다루기

1. 구조체(Struct)

구조체는 여러 필드를 하나의 복합 데이터 유형으로 그룹화할 수 있게 해줍니다. struct 함수를 사용하여 새로운 구조체 컬럼을 생성할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder.appName("Struct Example").getOrCreate()

val data = Seq(
  (1, "John Doe", 30),
  (2, "Jane Doe", 25)
)
val df = spark.createDataFrame(data).toDF("id", "name", "age")

val structDf = df.select($"id", struct($"name", $"age").alias("personal_info"))
structDf.show(false)

 

2. 배열(Array)

배열은 여러 값의 리스트를 하나의 컬럼에 저장할 수 있게 해줍니다. split, explode, array_contains 함수를 통해 배열 데이터를 다룰 수 있습니다.

split 함수

문자열을 특정 구분자로 나눠 배열로 변환합니다.

val arrayDf = df.withColumn("name_array", split($"name", " "))
arrayDf.show(false)

 

explode 함수

배열 내 각 요소를 별도의 로우로 변환합니다.

val explodedDf = arrayDf.select($"id", explode($"name_array").alias("name_exploded"))
explodedDf.show(false)

 

array_contains 함수

배열이 특정 값을 포함하는지 검사합니다.

val containsDf = arrayDf.select($"id", array_contains($"name_array", "John").alias("contains_John"))
containsDf.show(false)

 

맵(Map)

맵은 키-값 쌍의 컬렉션을 저장하는 데이터 타입입니다. map 함수를 사용하여 맵 컬럼을 생성할 수 있습니다.

val mapDf = df.select($"id", map($"name", $"age").alias("name_age_map"))
mapDf.show(false)

 

배열과 맵의 조합

배열과 맵을 조합하여 더 복잡한 데이터 구조를 만드는 방법도 있습니다. 예를 들어, 사용자의 여러 특성을 배열 안의 맵으로 표현할 수 있습니다.

val complexData = Seq(
  (1, Array(Map("tag" -> "active", "interest" -> "soccer"), Map("tag" -> "geek", "interest" -> "tech"))),
  (2, Array(Map("tag" -> "inactive", "interest" -> "none")))
)
val complexDf = spark.createDataFrame(complexData).toDF("id", "attributes")

complexDf.show(false)

 

9. JSON

JSON 데이터 읽기

Spark에서는 DataFrameReaderjson 메소드를 사용하여 JSON 파일을 DataFrame으로 읽을 수 있습니다. 예를 들어, 로컬 시스템에 저장된 JSON 파일이나 분산 파일 시스템에 저장된 JSON 데이터를 DataFrame으로 쉽게 로드할 수 있습니다.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("JSON Example").getOrCreate()

// JSON 파일 읽기
val df = spark.read.json("path_to_json_file.json")

// DataFrame 출력
df.show()

 

DataFrame에서 JSON 데이터로 쓰기

DataFrame을 JSON 형식으로 파일 시스템에 저장할 수도 있습니다. 이 경우, DataFrameWriterjson 메소드를 사용합니다.

// DataFrame을 JSON 파일로 저장
df.write.json("path_to_output_directory")

 

DataFrame을 JSON 문자열로 변환

DataFrame의 각 로우를 JSON 문자열로 변환할 수 있는 toJSON 메소드가 있습니다. 이 메소드는 DataFrame의 각 로우를 JSON 형식의 문자열로 변환한 후, 이를 Dataset[String]으로 반환합니다.

// DataFrame을 JSON 문자열로 변환
val jsonString = df.toJSON
jsonString.show(false)

 

JSON 문자열을 DataFrame으로 변환

반대로, JSON 형식의 문자열을 포함하는 DataFrame이나 Dataset을 Spark DataFrame으로 변환할 수도 있습니다. 이 작업을 수행하려면 JSON 문자열을 DataFrame으로 읽어들이는 방법을 사용할 수 있습니다.

import org.apache.spark.sql.functions._

// 예제 JSON 문자열
val jsonData = Seq("""{"name":"John","age":30}""", """{"name":"Jane","age":25}""").toDS()

// JSON 문자열을 DataFrame으로 파싱
val dfFromJson = spark.read.json(jsonData)
dfFromJson.show()

 

복잡한 JSON 구조 다루기

JSON 데이터가 중첩된 구조를 가지고 있다면, DataFrame의 select와 explode 함수를 사용하여 중첩된 배열을 다룰 수 있습니다. 또한 get_json_object 또는 json_tuple과 같은 함수를 사용하여 JSON 객체 내의 특정 필드를 추출할 수 있습니다.

// 중첩된 JSON 예제
val complexJsonData = Seq(
  """{"name":"John","details":{"age":30, "address":"123 Main St"}}""",
  """{"name":"Jane","details":{"age":25, "address":"456 Maple Ave"}}"""
).toDS()

// 중첩된 JSON 데이터를 DataFrame으로 파싱
val complexDf = spark.read.json(complexJsonData)
complexDf.select(
  col("name"),
  col("details.age").alias("age"),
  col("details.address").alias("address")
).show()

 

10. 사용자 정의 함수

UDF 생성하기

스칼라에서 UDF를 생성하기 위해서는 먼저 함수를 정의하고, 이를 Spark SQL의 UDF로 등록해야 합니다. 등록된 UDF는 SQL 쿼리에서 사용할 수 있으며, DataFrame의 select와 같은 변환에서도 사용할 수 있습니다.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._

// 간단한 사용자 정의 함수 정의
val addOne = (x: Int) => x + 1

// UDF로 등록
val addOneUDF = udf(addOne)

// 데이터 프레임 생성
val df = Seq((1), (2), (3)).toDF("number")

// UDF 사용
val resultDf = df.select(addOneUDF($"number").alias("plus_one"))
resultDf.show()

 

UDF 사용하기

등록된 UDF는 SQL 문장에서도 사용할 수 있습니다. 먼저 UDF를 SQL 쿼리에서 사용할 수 있도록 Spark SQL에 등록해야 합니다.

// UDF를 SQL에서 사용 가능하게 등록
spark.udf.register("addOneSQL", addOne)

// SQL 쿼리 실행
val resultDfSQL = spark.sql("SELECT addOneSQL(number) as plus_one FROM values (1), (2), (3) AS data(number)")
resultDfSQL.show()

 

복잡한 데이터 타입을 반환하는 UDF

UDF는 단순한 데이터 타입뿐만 아니라, 배열이나 구조체 같은 복잡한 데이터 타입을 반환할 수도 있습니다. 예를 들어, 이름과 나이를 입력받아 구조체로 반환하는 UDF를 만들 수 있습니다.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// 반환할 구조체의 스키마 정의
val schema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)

// 구조체를 반환하는 함수
val createPerson = (name: String, age: Int) => Row(name, age)

// UDF 등록
val createPersonUDF = udf(createPerson, schema)

// 사용
val namesAges = Seq(("John", 30), ("Jane", 25))
val df2 = spark.createDataFrame(namesAges).toDF("name", "age")

val resultDf2 = df2.select(createPersonUDF($"name", $"age").alias("person"))
resultDf2.show(false)

 

이렇게 UDF를 사용하면 Spark에서 제공하지 않는 복잡한 데이터 변환도 쉽게 구현할 수 있습니다. UDF는 유연성을 제공하지만, UDF가 네이티브 함수보다 성능이 떨어질 수 있으므로 성능을 고려해 적절히 사용해야 합니다.