이전에 소개드린데로 apache hudi가 EMR에 적용이 되었고 예제는 https://aws.amazon.com/ko/blogs/korea/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/에 잘 나와있습니다.
여기에 glue metastore 기반한 테이블 생성 및 unique 데이터 insert하는 방법에 대해서만 추가해서 설명해보도록 하겠으며 이 예제를 활용하게되면 운영데이터의 ODS 데이터 적재도 별다른 테이블작업 없이 가능해보입니다.
Hive 테이블 생성
- Hive(glue metastore)와 동기화하려면 HIVE_DATABASE_OPT_KEY 와 HIVE_SYNC_ENABLED_OPT_KEY 를 설정해줍니다.
- 예제가 되는 데이터는 elb에 들어오는 로그데이터로 보여지는데 ip가 unique하지 않지만 insert 했기때문에 unique하지 않은 request_ip 가 들어가게 되고 count결과를 보시면 unique하지 않음을 확인할수 있습니다.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val inputDataPath = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=1/"
val inputDF = spark.read.format("parquet").load(inputDataPath)
val hudiTableName = "elb_logs_hudi_cow"
val hudiTablePath = "s3://location..../" + hudiTableName
inputDF.write.format("org.apache.hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "request_timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "request_ip").
option(PARTITIONPATH_FIELD_OPT_KEY, "request_verb").
option(HIVE_SYNC_ENABLED_OPT_KEY,"true").
option(TABLE_NAME, hudiTableName).
option(HIVE_TABLE_OPT_KEY, hudiTableName).
option(HIVE_DATABASE_OPT_KEY,"db_name").
option(HIVE_PARTITION_FIELDS_OPT_KEY,"request_verb").
option(HIVE_ASSUME_DATE_PARTITION_OPT_KEY,"false").
option(OPERATION_OPT_KEY,INSERT_OPERATION_OPT_VAL).
option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Overwrite).
save(hudiTablePath)
scala> inputDF.count()
res13: Long = 10491958
scala> spark.sql("select count(*),count(distinct request_ip) from db_name.elb_logs_hudi_cow").show()
+--------+--------------------------+
|count(1)|count(DISTINCT request_ip)|
+--------+--------------------------+
|10491958| 10136077|
+--------+--------------------------+
데이터 중복없는 테이블 생성
- 다음과 같이 OPERATION_OPT_KEY 옵션을 UPSERT_OPERATION_OPT_VAL로 설정하면 pk인 request_ip에 대해서 unique한 테이블이 만들어지게 됩니다.
val hudiTableName = "elb_logs_hudi_cow_test01"
val hudiTablePath = "s3://location..../" + hudiTableName
inputDF.write.format("org.apache.hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "request_timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "request_ip").
option(PARTITIONPATH_FIELD_OPT_KEY, "request_verb").
option(HIVE_SYNC_ENABLED_OPT_KEY,"true").
option(TABLE_NAME, hudiTableName).
option(HIVE_TABLE_OPT_KEY, hudiTableName).
option(HIVE_DATABASE_OPT_KEY,"db_name").
option(HIVE_PARTITION_FIELDS_OPT_KEY,"request_verb").
option(HIVE_ASSUME_DATE_PARTITION_OPT_KEY,"false").
option(OPERATION_OPT_KEY,UPSERT_OPERATION_OPT_VAL).
option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Overwrite).
save(hudiTablePath)
spark.sql("select count(*),count(distinct request_ip) from db_name.elb_logs_hudi_cow_test01").show()
+--------+--------------------------+
|count(1)|count(DISTINCT request_ip)|
+--------+--------------------------+
|10136077| 10136077|
+--------+--------------------------+
중복없는 데이터 Insert
- 테이블이 만들어진후 데이터를 insert하기위해서는 mode를 Append로 해서 실행합니다.
- Append할때도 OPERATION_OPT_KEY를 UPSERT_OPERATION_OPT_VAL 로 하지 않고 INSERT로 하게되면 중복있는데이터가 들어갑니다.
val inputDataPath2 = "s3://athena-examples-us-west-2/elb/parquet/year=2015/month=1/day=2/"
val inputDF2 = spark.read.format("parquet").load(inputDataPath2)
inputDF2.write.format("org.apache.hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "request_timestamp").
option(RECORDKEY_FIELD_OPT_KEY, "request_ip").
option(PARTITIONPATH_FIELD_OPT_KEY, "request_verb").
option(HIVE_SYNC_ENABLED_OPT_KEY,"true").
option(TABLE_NAME, hudiTableName).
option(HIVE_TABLE_OPT_KEY, hudiTableName).
option(HIVE_DATABASE_OPT_KEY,"db_name").
option(HIVE_PARTITION_FIELDS_OPT_KEY,"request_verb").
option(HIVE_ASSUME_DATE_PARTITION_OPT_KEY,"false").
option(OPERATION_OPT_KEY,UPSERT_OPERATION_OPT_VAL).
option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName).
mode(Append).
save(hudiTablePath)
spark.sql("select count(*),count(distinct request_ip) from db_name.elb_logs_hudi_cow_test02").show()
+--------+--------------------------+
|count(1)|count(DISTINCT request_ip)|
+--------+--------------------------+
|14915438| 14915438|
+--------+--------------------------+