SparkStreaming 02 Enhanced: Running on the Cluster
Code
package com.hpznyf.sparkstreaming.ss64

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object OffsetClusterApp {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println(
        """
          |Usage: OffsetClusterApp <batch> <groupid> <brokers> <topic>
          |  <batch>   : batch interval of the Spark Streaming job, in seconds
          |  <groupid> : consumer group id
          |  <brokers> : Kafka broker list
          |  <topic>   : topic to consume
          |""".stripMargin)
      System.exit(1)
    }

    val Array(batch, groupid, brokers, topic) = args
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(batch.toInt))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array(topic)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // offsets can only be read from the original KafkaRDD
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          println(s"${x.topic}, ${x.partition}, ${x.fromOffset}, ${x.untilOffset}")
        })

        // business logic: word count over comma-separated values
        rdd.flatMap(_.value().split(",")).map((_, 1)).reduceByKey(_ + _).foreach(println)

        // commit offsets back to Kafka only after the batch has been processed
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      } else {
        println("No data in this batch")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
1. Fixing errors when submitting to the cluster
Submit script:
spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss
Error:
NoClassDefFoundError: org/apache/kafka/common/serialization/StringDeserializer
Cause: the job uses the Kafka integration classes, but the spark-streaming-kafka dependency is not on the classpath.
Fix 1: pull the dependency in with --packages:
spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.6 \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss
This adds the missing dependency via --packages.
Next error: the MySQL driver is missing.
Fix 2: ship the driver jar with --jars:
spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.6 \
--jars /home/hadoop/app/hive/lib/mysql-connector-java-5.1.47.jar \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss
A serialization class might also fail to resolve; that would need its own fix. The bigger problem: if the cluster has no internet access, --packages cannot download anything at all.
2. Fixing cluster errors by building a fat jar
Thin jar: contains only our own code. Any dependencies the application needs have to be shipped to the server separately.
Semi-fat jar: contains our code plus only the dependencies that are not already on the server, rather than everything declared in the pom; for this application that means MySQL, Spark Streaming and Kafka.
Fat jar: bundles every dependency.
In the project directory, run mvn to clear the target directory:
mvn clean

mvn assembly:assembly
Packaging takes quite a while.
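For reference, mvn assembly:assembly implies the maven-assembly-plugin is configured in the pom. The notes do not show that part of the pom, so the snippet below is only a sketch of the usual configuration that produces the *-jar-with-dependencies.jar used later:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- produces hpznyf-spark-core-1.0-SNAPSHOT-jar-with-dependencies.jar -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>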
But the resulting jar is far too big. What can we do?
The fix is to stop bundling dependencies that do not need to be in the jar: in the pom, mark each dependency the cluster already provides with
<scope>provided</scope>
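For example (a sketch, assuming the Spark 2.4.6 / Scala 2.12 build referenced by the --packages coordinate above): Spark itself is already installed on the cluster, so its artifacts do not need to be bundled:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.6</version>
    <!-- provided: present on the cluster, excluded from the fat jar -->
    <scope>provided</scope>
</dependency>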
Then rebuild:
mvn clean
mvn assembly:assembly -DskipTests

Now the submit command only needs the fat jar:
spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT-jar-with-dependencies.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss
Start a console producer to test:
kafka-console-producer.sh --broker-list hadoop003:9092,hadoop004:9093,hadoop005:9094 --topic pkss
Then start the consumer side and check the output. Done.
3. Review: Spark Streaming + Kafka scenarios
Spark Streaming pulls data from Kafka.
For the results of the business-logic processing there are two cases:
- Aggregation: the result is small, so it can be pulled to the driver and saved from there.
- Non-aggregation: the data volume is huge and cannot be pulled to the driver.
So how do we handle the non-aggregation case?
Typical non-aggregation workload: ETL, i.e. data -> Kafka -> ETL -> NoSQL (HBase/ES/…).
Pulling the data from the executors into the driver to save it is not feasible: the volume is far too large and the driver cannot handle it. In this scenario, two things have to happen on the executor side:
- write the data to NoSQL (table a)
- write the offsets to NoSQL as well (table b)
HBase guarantees row-level transactions.
Key point: store the data and its offset in the same row. Table design: one HBase table with two column families, CF1 "o" holding the data and CF2 "offset" holding the offset.
rk1, data CF, offset CF
rk2, data CF, offset CF
rk3, data CF, offset CF
.....
Scenario 1: the data is written successfully; can we guarantee the offset is written successfully too? Yes, because both columns live in the same row and HBase writes are atomic at the row level.
Scenario 2: what if data is duplicated? Not a problem: the row key is the same, so the write becomes an update that overwrites the previous cell (or simply adds a new version).
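A minimal sketch of this row design (not the final implementation from these notes; the table name ss_offset, the column families o / offset and the topic_partition row-key layout are all placeholder choices):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object HBaseRowSketch {
  // Write one record and its offset into the same HBase row, so that
  // HBase's row-level atomicity covers both the data and the offset.
  def writeRecordWithOffset(topic: String, partition: Int,
                            value: String, untilOffset: Long): Unit = {
    val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("ss_offset"))
    try {
      // same row key => a duplicate write simply becomes an update (overwrite / new version)
      val put = new Put(Bytes.toBytes(s"${topic}_$partition"))
      put.addColumn(Bytes.toBytes("o"), Bytes.toBytes("value"), Bytes.toBytes(value))
      put.addColumn(Bytes.toBytes("offset"), Bytes.toBytes("until"), Bytes.toBytes(untilOffset.toString))
      table.put(put) // both column families land in one atomic row write
    } finally {
      table.close()
      conn.close()
    }
  }
}

In the real job this kind of write would run inside rdd.foreachPartition on the executors, not on the driver.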
What we need to implement:
- on the executor, write the offset together with the data into HBase (as in the write sketch above)
- on startup, read the offsets that are already stored in HBase (see the sketch after this list)
Only the offset of the last record in each partition needs to be stored.
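And a sketch of reading those offsets back at startup (same placeholder table/CF names as above), so the stream can resume from them:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._
import scala.collection.mutable

object HBaseOffsetReaderSketch {
  // Scan the offset CF and rebuild a Map[TopicPartition, Long] for Subscribe.
  def readOffsets(topic: String): Map[TopicPartition, Long] = {
    val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
    val table = conn.getTable(TableName.valueOf("ss_offset"))
    val offsets = mutable.Map[TopicPartition, Long]()
    try {
      val scanner = table.getScanner(new Scan())
      for (result <- scanner.asScala) {
        // row key layout assumed to be "<topic>_<partition>", as in the write sketch
        val Array(t, p) = Bytes.toString(result.getRow).split("_")
        if (t == topic) {
          val until = Bytes.toString(
            result.getValue(Bytes.toBytes("offset"), Bytes.toBytes("until"))).toLong
          offsets += new TopicPartition(t, p.toInt) -> until
        }
      }
      scanner.close()
    } finally {
      table.close()
      conn.close()
    }
    offsets.toMap
  }
}

The recovered map can then be passed as the third argument of Subscribe, e.g. Subscribe[String, String](topics, kafkaParams, HBaseOffsetReaderSketch.readOffsets("hbaseoffset")).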
4. Implementation: non-aggregation, executor-side data into HBase
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
</dependency>
Create the topic:
[hadoop@hadoop003 ~]$ kafka-topics.sh --create --zookeeper hadoop003:2181/kafka --replication-factor 1 --partitions 3 --topic hbaseoffset
[hadoop@hadoop003 ~]$ kafka-topics.sh --list --zookeeper hadoop003:2181/kafka
4.1 Minimal code: reading data from the hbaseoffset topic
package com.hpznyf.sparkstreaming.ss64

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object HBaseOffsetApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[3]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val groupId = "kafka-ss-hbase-offset"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop003:9092,hadoop004:9093,hadoop005:9094",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId, // switching to a new group restarts consumption from the beginning
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("hbaseoffset")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        /**
         * TODO... offsets can only be obtained from the original KafkaRDD,
         * so this must still be the first-hand RDD (this runs on the driver).
         * If the RDD had already been transformed into a MapPartitionsRDD,
         * the offset ranges could not be recovered.
         */
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          println(s"${x.topic}, ${x.partition}, ${x.fromOffset}, ${x.untilOffset}")
        })
      } else {
        println("No data in this batch")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
4.2 Error encountered: jar conflict
NoSuchMethodError: io.netty.buffer.PooledByteBufAllocator
The error indicates a jar conflict. How do we resolve it?