Data&Processing

[talend] Hbase에서 연동 (rowkey 적용)

별이별이 2012. 12. 3. 11:10

Talend Open Studio for Big Data 는 hbase를 source로 사용가능하다.

그런데, HBaseInput에 보면 



로 되어있는데 Hbase 를 잘 아는건 아니지만 table별로 family 정보는 관련담당자에게 받어야한다.

입력할때 "이름" 이런식으로 입력해야한다.


그치만, 문제가 있는데 전체테이블만 가져올수 있다.


Hbase구조가 각 row마다 rowkey가 있고 key구조를 사용자가 설계를 할 수 있는데 batch로 데이터를 가져오려면 해당table 전체를 가져올게 아니고 rowkey를 이용해서 특정구간/기간만을 가져오려고 할때는 사용을 할 수가 없다.


5.1.2 version에서 Hbase지원에서 가능한 옵션은 다 찾아보았지만 못찾았고, 어떻게 해결해야하나 찾아보았고 Custom Code>>tJavaFlex 에서 해결이 가능했다.


1) 우선 Hbase관련한 java coding에 대해서는 아는게 없어서 기존에 HbaseInput에서 사용한 Java를 참조.

2) tJavaFlex는 Main Code 부분에서 row를 return해주는 방식으로.

3) Hbase관련한 library를 컴파일할때 넣어주어야하는데 옵션은 못찾겠고 그냥 tHbaseConnection 컴포넌트만 Job에 넣어주면 간단히 해결된다. 컴파일 옵션에 넣어주면 되긴하는데 못찾겠음.


간단한 Layout은 다음과 같고.




아래코드들은 tHbaseInput에서 복사한거라서 약간비효율적일 수도 있으나, 그냥 썻다.


1) javaFlex에 schema를 설정하고

2) Start code


org.apache.hadoop.conf.Configuration conn_tHBaseInput_1 = null;

conn_tHBaseInput_1 = org.apache.hadoop.hbase.HBaseConfiguration.create();

conn_tHBaseInput_1.clear();

conn_tHBaseInput_1.set("hbase.zookeeper.quorum","IP");

conn_tHBaseInput_1.set("hbase.zookeeper.property.clientPort", "Port");

conn_tHBaseInput_1.set("hbase.cluster.distributed", "true");


org.apache.hadoop.hbase.client.Scan scan_tHBaseInput_1 = new org.apache.hadoop.hbase.client.Scan();

scan_tHBaseInput_1.setStartRow(org.apache.hadoop.hbase.util.Bytes.toBytes("start rowkey"));

scan_tHBaseInput_1.setStopRow(org.apache.hadoop.hbase.util.Bytes.toBytes("end rowkey"));


scan_tHBaseInput_1.addColumn(org.apache.hadoop.hbase.util.Bytes.toBytes("family명")

,org.apache.hadoop.hbase.util.Bytes.toBytes("컬럼명"));

//컬럼이 여러개면 계속추가


String temp_tHBaseInput_1 = null;

byte[] rowResult_tHBaseInput_1 = null;

org.apache.hadoop.hbase.client.ResultScanner scanner_tHBaseInput_1 

= table_tHBaseInput_1.getScanner(scan_tHBaseInput_1);


// start part of your Java code

for(org.apache.hadoop.hbase.client.Result rr_tHBaseInput_1 = scanner_tHBaseInput_1.next()

; rr_tHBaseInput_1 != null

; rr_tHBaseInput_1 = scanner_tHBaseInput_1.next()) {


3) Main code


//row key return

String byteToString = new String(rr_tHBaseInput_1.getRow());

row2.rowkey = byteToString;


rowResult_tHBaseInput_1 = rr_tHBaseInput_1.getValue(org.apache.hadoop.hbase.util.Bytes.toBytes("family명")

,org.apache.hadoop.hbase.util.Bytes.toBytes("컬럼명"));

temp_tHBaseInput_1 = org.apache.hadoop.hbase.util.Bytes.toString(rowResult_tHBaseInput_1);

if (temp_tHBaseInput_1 != null && temp_tHBaseInput_1.length() > 0) { row2.컬럼명 = temp_tHBaseInput_1.toString();

} else { row2.컬럼명= null; }

//컬럼이 여러개면 위에 두statement를 반복
//컬럼정보를 가져와서 row2에 입력!!

4) End Code

}


table_tHBaseInput_1.close();

scanner_tHBaseInput_1.close();

org.apache.hadoop.hbase.client.HConnectionManager.deleteConnection(conn_tHBaseInput_1, true);

//for문 닫고 connection 종료


위와 같이 하면 Hbase를 읽어 올수 있음.

이런방식은 Talend가 지원하지않거나 옵션이 부족한 다른 Database나 Java가 지원하는 Data Source에서도 가능할 듯 하다.