使用HBase保存偏移量 (Storing Kafka consumer offsets in HBase)
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
package com.qf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class HBaseConnectionPool {
private static LinkedList<Connection> pool = new LinkedList<Connection>();
static {
try {
Configuration config = HBaseConfiguration.create();
config.addResource(HBaseConfiguration.class.getClassLoader().getResourceAsStream("hbase-site.xml"));
for (int i = 0; i < 5; i++) {
pool.push(ConnectionFactory.createConnection(config));
}
}catch (Exception e) {
e.printStackTrace();
}
}
public static Connection getConnection() {
while (pool.isEmpty()) {
try {
System.out.println("pool is empty, please wait for a moment");
Thread.sleep(1000);
}catch (Exception e) {
e.printStackTrace();
}
}
return pool.poll();
}
public static void realse(Connection connection) {
if (connection != null) pool.push(connection);
}
public static void set(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column, byte[] value) {
try {
Table table = connection.getTable(tableName);
Put put = new Put(rowkey);
put.addColumn(columnFamily, column, value);
table.put(put);
table.close();
}catch (Exception e) {
e.printStackTrace();
}
}
public static String getColValue(Connection connection, TableName tableName, byte[] rowkey, byte[] columnFamily, byte[] column) {
try {
Table table = connection.getTable(tableName);
Result result = table.get(new Get(rowkey));
return new String(result.getValue(columnFamily, column));
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static Map<Integer, Long> getColValue(Connection connection, TableName tableName, byte[] rowkey) {
Map<Integer, Long> partition2Offset = new HashMap<>();
try {
Table table = connection.getTable(tableName);
Scan scan = new Scan();
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(rowkey));
scan.setFilter(rowFilter);
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
List<Cell> cells = result.listCells();
for (Cell cell : cells) {
Integer partition = Integer.parseInt(new String(CellUtil.cloneQualifier(cell)));
Long offset = Long.parseLong(new String(CellUtil.cloneValue(cell)));
partition2Offset.put(partition, offset);
}
}
table.close();
} catch (IOException e) {
e.printStackTrace();
}
return partition2Offset;
}
@Test
public void test() {
Connection connection = HBaseConnectionPool.getConnection();
TableName tableName = TableName.valueOf("hbase-spark");
Map<Integer, Long> map = HBaseConnectionPool.getColValue(connection, tableName, "kafka-zk".getBytes());
System.out.println(map.size());
for (Map.Entry<Integer, Long> entry : map.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
}
}
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hadoop:9000/hbase</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop</value>
</property>
</configuration>
package com.qf.bigdata.spark.streaming.day2
import com.qf.HBaseConnectionPool
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.zookeeper.CreateMode
import java.{lang, util}
import scala.collection.{JavaConversions, mutable}
/**
 * Spark Streaming direct-Kafka job that persists its consumed offsets in the
 * HBase table "hbase-spark" (row "<topic>-<groupId>", cf "cf",
 * qualifier = partition id, value = offset) and restores them on restart,
 * giving at-least-once semantics across driver restarts.
 */
object Demo3_Offset_HBase extends LoggerTrait {

  def main(args: Array[String]): Unit = {
    if (args == null || args.length != 3) {
      println(
        """
          |usage <brokerList> <groupId> <topicstr>
          |""".stripMargin)
      System.exit(-1)
    }
    val Array(brokerList, groupId, topicstr) = args
    val sparkConf = new SparkConf()
      .setAppName("Demo3_Offset_HBase")
      .setMaster("local[*]")
    val duration = Seconds(2)
    val ssc = new StreamingContext(sparkConf, duration)
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> brokerList,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val topics: Set[String] = topicstr.split(",").toSet
    val messages: InputDStream[(String, String)] = createMsg(ssc, kafkaParams, topics)
    messages.foreachRDD((rdd, time) => {
      if (!rdd.isEmpty()) {
        println("-" * 30)
        println(s"Time : ${time}")
        println(s"####### RDD Count : ${rdd.count()}")
        // Persist the ranges consumed in this batch so a restart resumes here.
        saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
        println("-" * 30)
      }
    })
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Create the Kafka direct stream. When HBase holds offsets for this group,
   * resume from them; otherwise start fresh per "auto.offset.reset".
   */
  def createMsg(ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(String, String)] = {
    val fromOffsets: Map[TopicAndPartition, Long] = getFromOffsets(topics, kafkaParams("group.id"))
    if (fromOffsets.isEmpty) {
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      val messageHandler = (msgHandler: MessageAndMetadata[String, String]) => (msgHandler.key(), msgHandler.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    }
  }

  /**
   * Load saved offsets from HBase: one row per "<topic>-<groupId>", each cell
   * qualifier is a partition id and the value is the next offset to consume.
   * The pooled connection is released even if an HBase call throws.
   */
  def getFromOffsets(topics: Set[String], groupId: String): Map[TopicAndPartition, Long] = {
    val offsets: mutable.Map[TopicAndPartition, Long] = mutable.Map[TopicAndPartition, Long]()
    val connection: Connection = HBaseConnectionPool.getConnection
    val tableName: TableName = TableName.valueOf("hbase-spark")
    try {
      topics.foreach(topic => {
        val rk = Bytes.toBytes(s"${topic}-${groupId}")
        val partition2Offsets: util.Map[Integer, lang.Long] = HBaseConnectionPool.getColValue(connection, tableName, rk)
        JavaConversions.mapAsScalaMap(partition2Offsets).foreach {
          case (partition, offset) => offsets.put(TopicAndPartition(topic, partition), offset)
        }
      })
    } finally {
      HBaseConnectionPool.realse(connection)
    }
    offsets.toMap
  }

  /**
   * Persist each partition's next-offset-to-consume after a batch.
   * BUGFIXES vs the original:
   *  - row key is now "<topic>-<groupId>", matching what getFromOffsets reads
   *    (the original wrote "<topic>-<partition>", so saved offsets were never
   *    found on restart);
   *  - stores untilOffset itself: OffsetRange.untilOffset is already the
   *    exclusive upper bound, so the original "+ 1L" skipped one message per
   *    partition after every restart.
   * The pooled connection is released even if a write throws.
   */
  def saveOffsets(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
    val connection: Connection = HBaseConnectionPool.getConnection
    val tableName: TableName = TableName.valueOf("hbase-spark")
    val cf: Array[Byte] = Bytes.toBytes("cf")
    try {
      for (offsetRange <- offsetRanges) {
        val topic = offsetRange.topic
        val partition = offsetRange.partition
        val offset = offsetRange.untilOffset
        val rk = Bytes.toBytes(s"${topic}-${groupId}")
        HBaseConnectionPool.set(connection, tableName, rk, cf, Bytes.toBytes(partition.toString), Bytes.toBytes(offset.toString))
        println(s"${topic} -> ${partition} -> ${offsetRange.fromOffset} -> ${offset}")
      }
    } finally {
      HBaseConnectionPool.realse(connection)
    }
  }
}
hbase(main):002:0> create 'hbase-spark','cf'
0 row(s) in 1.4630 seconds
=> Hbase::Table - hbase-spark
## 开启16000、16020端口 (Open ports 16000 and 16020 — HBase Master and RegionServer RPC ports)
|