본문 바로가기

Data&Processing

spark 에서 join하기

spark 활용 다음 단계로 join 에 대해 설명하겠습니다


우선 Join을 하려면 당연히 두개의 RDD를 만들어야 하며

다음은 ORDER라는 주문테이블과 GOODS 라는 상품테이블 JOIN하는 예입니다.



    val conf0 = new SparkConf()

  .setMaster("local")

  .setAppName("My App")

  .set("spark.executor.memory","1g")

  .setSparkHome("/home/cloudera/Downloads/spark-0.9.0-incubating")


    val sc = new SparkContext(conf0)


    val conf_order = HBaseConfiguration.create()

    val conf_goods = HBaseConfiguration.create()


    val TableName_Order = "ORDER"

    val TableName_Goods = "GOODS"


    conf_order.set(TableInputFormat.INPUT_TABLE, TableName_Order)

    conf_goods.set(TableInputFormat.INPUT_TABLE,TableName_Goods)


    val hBaseRDD_Order = sc.newAPIHadoopRDD(conf_order, classOf[TableInputFormat],

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

      classOf[org.apache.hadoop.hbase.client.Result])


    val hBaseRDD_Goods = sc.newAPIHadoopRDD(conf_goods, classOf[TableInputFormat],

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

      classOf[org.apache.hadoop.hbase.client.Result])

     

    val t_Order = hBaseRDD_Order.map(tuple => tuple._2)

    .map(result => ( new String(result.getValue("Attr".getBytes(), "GoodsNo".getBytes()) ).toInt) , new String(result.getValue("Attr".getBytes(), "OrderNo".getBytes())) )

//이전편에 말한데로 Hbase 데이터를 읽어오게 되면 array[byte] 형태로 저장이 되어서 new String() 과 같은방식으로 String으로 변환

//Join할 컬럼을 먼저해서 Tuple을 만듬

    

    val t_Goods = hBaseRDD_Goods.map(result => ( new String(result._1.get()), new String(result._2.getValue("Attr".getBytes(), "Color".getBytes()) )) )

// GOODS테이블의 경우 GoodsNo가 Key 여서 result._1 로 받음


    val f_Order = new PairRDDFunctions(t_Order) //PairRDD생성

    val flow1 = f_Order .join(t_Goods) //join

    

   println(flow1.count) //건수를 확인