본문 바로가기

Data&Processing

Spark dataframe 에서 중첩컬럼(nested column) 처리rename/drop/convert to map

spark dataframe nested column rename/drop/convert to map

Spark dataframe 에서 중첩컬럼(nested column) 처리

Spark Dataframe을 다루다보면 중첩컬럼(a.b 형식과 같은) 데이터를 다루는 경우가 있는데 이는 다른 컬럼과 달리 drop이나 rename이 간단하지는 않아서 여기저기서 찾은 내용을 정리해봅니다.

 

SampleData 준비

val jsonData = sc.parallelize(List(""" {"id":1, "attributes": { "c0":100, "c1":200, "c2":300 } }"""))
val logs1 = spark.read.json(jsonData)
logs1.printSchema

//결과
root
 |-- attributes: struct (nullable = true)
 |    |-- c0: long (nullable = true)
 |    |-- c1: long (nullable = true)
 |    |-- c2: long (nullable = true)
 |-- id: long (nullable = true)

 

Nested Column Rename

- attributes.c1 컬럼을 attributes.column1으로 변경

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import scala.collection.mutable
import org.apache.spark.sql.Column
val schema: StructType = logs1.schema.fields.find(_.name=="attributes").get.dataType.asInstanceOf[StructType]
val newSchema = StructType.apply(schema.fields.map(sf => StructField.apply({
    if (sf.name == "c1") "column1"
    else sf.name
},sf.dataType)))
logs1.withColumn("attributes",$"attributes".cast(newSchema)).printSchema

//결과
root
 |-- attributes: struct (nullable = true)
 |    |-- c0: long (nullable = true)
 |    |-- column1: long (nullable = true)
 |    |-- c2: long (nullable = true)
 |-- id: long (nullable = true)

- 다른방법으로 attributes에서 필요한 컬럼만 선택 후 컬럼명 변경. 위와 결과는 동일합니다.

import org.apache.spark.sql.{functions => f}
val selectColumns = Array("c0", "c1","c2")
val renameColumns = Map("c1"-> "column1")

Try(logs1.schema.fields.filter(_.name == "attributes").head).flatMap(t => {
    Try(t.dataType.asInstanceOf[StructType])
})
.map(t => t.fieldNames.intersect( selectColumns ))
.map(t => {
    f.struct(t.map(x => f.col("attributes").getItem(x).alias( if(renameColumns.keySet.contains(x)) renameColumns(x) else x  )): _* )
})
.map(logs1.withColumn("attributes",_))
.getOrElse(logs1)
.printSchema

//결과
root
 |-- attributes: struct (nullable = true)
 |    |-- c0: long (nullable = true)
 |    |-- column1: long (nullable = true)
 |    |-- c2: long (nullable = true)
 |-- id: long (nullable = true)

 

Nested Column Drop

- 이부분은 인터넷에 몇가지 예제가 있는데, 다음 함수가 간결하고 이해도 쉬워서 소개합니다.https://intellipaat.com/community/16624/dropping-a-nested-column-from-spark-dataframe

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField}
import org.apache.spark.sql.{functions => f}
import scala.util.Try
case class DFWithDropFrom(df: DataFrame) {
    def getSourceField(source: String): Try[StructField] = {
    Try(df.schema.fields.filter(_.name == source).head)
    }
    def getType(sourceField: StructField): Try[StructType] = {
    Try(sourceField.dataType.asInstanceOf[StructType])
    }
    def genOutputCol(names: Array[String], source: String): Column = {
    f.struct(names.map(x => f.col(source).getItem(x).alias(x)): _*)
    }
    def dropFrom(source: String, toDrop: Array[String]): DataFrame = {
    getSourceField(source)
        .flatMap(getType)
        .map(_.fieldNames.diff(toDrop))
        .map(genOutputCol(_, source))
        .map(df.withColumn(source, _))
        .getOrElse(df)
    }
}

- attributes.c1 컬럼을 삭제하는 예제. Array에 여러개 컬럼을 넣어서 삭제할 수도 있습니다.

DFWithDropFrom(logs1).dropFrom("attributes", Array("c1")).printSchema

//결과
root
 |-- attributes: struct (nullable = false)
 |    |-- c0: long (nullable = true)
 |    |-- c2: long (nullable = true)
 |-- id: long (nullable = true)

Nested Column 을 map으로 변환

- nested column 형태의 데이터가 보기에도 명시적이고, select 등에서도 편하지만, json 데이터 등을 parsing해서 dataframe을 만들경우 nested column에 모든 column이 동일하게 있지 않을수도 있어서 해당 데이터를 union하거나 테이블에 insert할때 에러가 생기게 됩니다. 하나하나씩 필요한 컬럼만 parsing을 하는 방법도 있지만, 여기서는 nested column을 map으로 변환해서 nested column중 일부 column들이 달라도 활용할수 있도록 해보겠습니다.

val index = logs1.schema.fieldIndex("attributes")
val propSchema = logs1.schema(index).dataType.asInstanceOf[StructType]
val columns = mutable.LinkedHashSet[Column]()
propSchema.fields.foreach(field =>{
  columns.add(lit(field.name))
  columns.add(col("attributes." + field.name))
})

logs1.withColumn("attributes",map(columns.toSeq:_*)).printSchema
logs1.withColumn("attributes",map(columns.toSeq:_*)).show()

//결과1(Schema)
root
 |-- attributes: map (nullable = false)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- id: long (nullable = true)
 
 //결과2(Data)
 +--------------------+---+
|          attributes| id|
+--------------------+---+
|[c0 -> 100, c1 ->...|  1|
+--------------------+---+