IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkStreaming02增强 上集群 -> 正文阅读

[大数据]SparkStreaming02增强 上集群

SparkStreaming02增强 上集群

代码展示

package com.hpznyf.sparkstreaming.ss64

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 *
spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.6 \
--jars /home/hadoop/app/hive/lib/mysql-connector-java-5.1.47.jar \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss


--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
 */
object OffsetClusterApp {
  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println(
        """
          |Usage : OffsetClusterApp <batch> ,<groupid>, <brokers>, <topic>
          | <batch> : spark 流处理作业运行得时间间隔
          | <groupid> : 消费组编号
          | <brokers> : Kafka集群地址
          | <topic> : 消费得Topic名称
          |""".stripMargin)
      System.exit(1)
    }

    val Array(batch, groupid, brokers, topic) = args

    val sparkConf = new SparkConf()
      //.setAppName(this.getClass.getCanonicalName)
      //.setMaster("local[3]")
      //.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(batch.toInt))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid, // 换组 重新开始消费
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array(topic)
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      if(!rdd.isEmpty()){

        /**
         * TODO... 获取offset 必须要是kafkaRDD, 所以上方必须也是要一手得rdd
         * driver
         * 如果上方改成了mappartitionRDD, 就无法获得offsetRange
         */
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          println(s"${x.topic}, ${x.partition}, ${x.fromOffset}, ${x.untilOffset}")
        })

        /**
         * TODO.. 业务处理
         * executor
         */
        rdd.flatMap(_.value().split(",")).map((_,1)).reduceByKey(_+_).foreach(println)

        /**
         * TODO.. 提交Offset 提交之后 ke页面就可以查看了
         * Driver
         * 异步,但是kafka是没有事务得,输出需要保持幂等性,无法保证精准一次消费
          */
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }else{
        println("该批次没有数据")
      }
    })
    // wc 操作
//    stream.map(_.value()).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1. 上集群报错解决

脚本:

spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss
报错

noClassDefFoundError:org/apache/kafka./common/stringDeserializer

原因:用到kafka util  但是没有

解决1:

spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.6 \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss

添加了package得依赖
报错
缺少mysql驱动

spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
--conf spark.serializer=org.apache.spark.serializer.KryoSerialize \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:2.4.6 \
--jars /home/hadoop/app/hive/lib/mysql-connector-java-5.1.47.jar \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss

也有可能序列化得类找不到 这个时候再另想办法吧
----------------------但是有个大问题,集群不能上网就完了

2. 上集群报错解决 使用胖包

瘦包:
仅包含源码

如果应用中需要相关得依赖,需要将依赖传入服务器

似胖非胖:
包含源码
但是不包含所有pom依赖中得dependencies
但是包含一些服务器上没有得
对于该应用只需要 mysql sparkstreaming kafka

胖包:
把所有得依赖都打进去

在目录里执行mvn命令
将target文件清除

mvn clean

在这里插入图片描述

mvn assembly:assembly

打包需要狠长时间

但是包太大了,那该怎么办呢?
在这里插入图片描述

这个时候需要解决一下这个问题 就是要把pom文件内 不需要得依赖给屏蔽掉

给不需要得依赖加上
            <scope>provided</scope>

然后git
mvn clean
mvn assembly:assembly  -DskipTests

在这里插入图片描述

现在只需要

spark-submit \
--master local[4] \
--class com.hpznyf.sparkstreaming.ss64.OffsetClusterApp \
/home/hadoop/lib/hpznyf-spark-core-1.0-SNAPSHOT-jar-with-dependencies.jar \
10 ruoze hadoop003:9092,hadoop004:9093,hadoop005:9094 pkss

可以开启producer测一下

kafka-console-producer.sh --broker-list hadoop003:9092,hadoop004:9093,hadoop005:9094 --topic pkss

然后启动消费者
在这里插入图片描述
完成~

3. 复习SS对接Kafka场景

SS对接Kafka数据

  • 业务逻辑处理
  • 保存结果
  • 提交Offset

对于业务逻辑处理结果

  1. 聚合: 数据拉到driver 然后进行保存
  2. 非聚合:数据量巨大,不能拉到driver

那么非聚合得怎么处理呢?

非聚合 典型: ETL 操作
数据 - kafka - ETL - NOSQL(HBase/ES/…)

Executor数据拉入到Driver端进行保存, 不可行啊!!!!! 数据量太大了,Driver扛不住!!!!
这种场景,需要在Executor端完成两件事情

  1. 数据落到NoSQL - a表
  2. Offset也要落到NoSQL里 - b表

HBase保证行级别得事务

关键点 == 如何将 数据存到一个行里面
表设计:
HBase表 – 设计两个CF
CF1 : O 表示数据得CF
CF2 : offset 存储Offset得CF

rk1, 数据cf,offsetCf
rk2, 数据cf,offsetCf
rk3, 数据cf,offsetCf
.....

场景: 数据写成功,如何保证Offset写成功。
offset能保证嘛? 可以得 ,因为是行级别得

场景2:如果数据重复,怎么办?
没问题,RK相同,数据会做Update操作,可以覆盖 多版本就行

要做的事情:

  1. executor端获取offset+data写入HBase
  2. 还需要从HBase中 获取已经有的offset
    只要记录partition内得最后一条记录 存offset

4. 实现非聚合executor端数据进入HBase

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
        </dependency>

创建topic

[hadoop@hadoop003 ~]$ kafka-topics.sh --create --zookeeper hadoop003:2181/kafka --replication-factor 1 --partitions 3 --topic hbaseoffset

[hadoop@hadoop003 ~]$ kafka-topics.sh --list --zookeeper hadoop003:2181/kafka

4.1 简单代码展示: 读取hbaseoffset topic数据

package com.hpznyf.sparkstreaming.ss64

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object HBaseOffsetApp {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[3]")
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val groupId  = "kafka-ss-hbase-offset"
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop003:9092,hadoop004:9093,hadoop005:9094",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId, // 换组 重新开始消费
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )



    val topics = Array("hbaseoffset")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD(rdd => {
      if(!rdd.isEmpty()){

        /**
         * TODO... 获取offset 必须要是kafkaRDD, 所以上方必须也是要一手得rdd
         * driver
         * 如果上方改成了mappartitionRDD, 就无法获得offsetRange
         */
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        offsetRanges.foreach(x => {
          println(s"${x.topic}, ${x.partition}, ${x.fromOffset}, ${x.untilOffset}")
        })

      }else{
        println("该批次没有数据")
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

4.2 遇到报错: jar包冲突

noSuchMethodError: io.netty.buffer.PooledByteByufAllocator

报错为jar包冲突
如何解决?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:43:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 17:08:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码