场景:
为什么要使用协处理器呢? 答: 使用hbase内部的组件, 可以提高我们数据的发送保存效率
图解:
如何使用hbase的协处理器呢?
- 创建类( 继承BaseRegionObserver类),实现prePut或者postPut
- 关联表(在表创建时,就进行关联)
- 将jar包(及依赖jar包) 放到hbase的lib当中,并集群分发
- 注意: 如果使用协处理器之后, 导致HregionServer进程掉线,查看日志,可能是协处理器代码出现问题了
实操代码:
public class InsertCalleeCoprocessor extends BaseRegionObserver {
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
Table table = e.getEnvironment().getTable(TableName.valueOf(Names.TABLE.value()));
PartitionNum part = new PartitionNum();
String rowkey = Bytes.toString(put.getRow());
String[] str = rowkey.split("_");
String call1 = str[1];
String call2 = str[2];
String callTime = str[3];
String duration = str[4];
String flag = str[5];
if("1".equals(flag)) {
String calleeRowkey=part.getPartitionNum(call2,callTime)+"_"+call2+"_"+call1+"_"+callTime+"_"+duration+"_0";
Put calleePut = new Put( calleeRowkey.getBytes());
calleePut.addColumn(Names.CF_CALLEE.value().getBytes(), Bytes.toBytes("call1"),Bytes.toBytes(call2));
calleePut.addColumn(Names.CF_CALLEE.value().getBytes(), Bytes.toBytes("call2"),Bytes.toBytes(call1));
calleePut.addColumn(Names.CF_CALLEE.value().getBytes(), Bytes.toBytes("callTime"),Bytes.toBytes(callTime));
calleePut.addColumn(Names.CF_CALLEE.value().getBytes(), Bytes.toBytes("duration"),Bytes.toBytes(duration));
calleePut.addColumn(Names.CF_CALLEE.value().getBytes(), Bytes.toBytes("flag"),Bytes.toBytes("0"));
table.put(calleePut);
table.close();
}
}
private class PartitionNum extends BaseDao{
}
}
|