spark 에서 join하기
spark 활용 다음 단계로 join 에 대해 설명하겠습니다
우선 Join을 하려면 당연히 두개의 RDD를 만들어야 하며
다음은 ORDER라는 주문테이블과 GOODS 라는 상품테이블 JOIN하는 예입니다.
val conf0 = new SparkConf()
.setMaster("local")
.setAppName("My App")
.set("spark.executor.memory","1g")
.setSparkHome("/home/cloudera/Downloads/spark-0.9.0-incubating")
val sc = new SparkContext(conf0)
val conf_order = HBaseConfiguration.create()
val conf_goods = HBaseConfiguration.create()
val TableName_Order = "ORDER"
val TableName_Goods = "GOODS"
conf_order.set(TableInputFormat.INPUT_TABLE, TableName_Order)
conf_goods.set(TableInputFormat.INPUT_TABLE,TableName_Goods)
val hBaseRDD_Order = sc.newAPIHadoopRDD(conf_order, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val hBaseRDD_Goods = sc.newAPIHadoopRDD(conf_goods, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val t_Order = hBaseRDD_Order.map(tuple => tuple._2)
.map(result => ( new String(result.getValue("Attr".getBytes(), "GoodsNo".getBytes()) ).toInt) , new String(result.getValue("Attr".getBytes(), "OrderNo".getBytes())) )
//이전편에 말한데로 Hbase 데이터를 읽어오게 되면 array[byte] 형태로 저장이 되어서 new String() 과 같은방식으로 String으로 변환
//Join할 컬럼을 먼저해서 Tuple을 만듬
val t_Goods = hBaseRDD_Goods.map(result => ( new String(result._1.get()), new String(result._2.getValue("Attr".getBytes(), "Color".getBytes()) )) )
// GOODS테이블의 경우 GoodsNo가 Key 여서 result._1 로 받음
val f_Order = new PairRDDFunctions(t_Order) //PairRDD생성
val flow1 = f_Order .join(t_Goods) //join
println(flow1.count) //건수를 확인