IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark Streaming整合Kafka实战 -> 正文阅读

[大数据]Spark Streaming整合Kafka实战

能力目标
1.能够使用KafkaUtils.createDstream方式读取数据到Spark Dstream
2.能够使用KafkaUtils.createDirectStream方式读取数据到Spark Dstream
3.能够编写Spark Streaming+Kafka应用程序


前言

点赞+评论+关注=养成好习惯

提示:以下是本篇文章正文内容,下面案例可供参考

一、Spark Streaming连接Kafka的两种方式

Kafka作为一个实时的分布式消息队列,实时地生产和消费消息。在大数据计算框架中,可利用Spark Streaming实时读取Kafka中的数据,再进行相关计算。在Spark1.3版本后,KafkaUtils里面提供了两个创建DStream的方式,一种是KafkaUtils.createDstream方式,另一种为KafkaUtils.createDirectStream方式。

1.KafkaUtils.createDstream方式

KafkaUtils.createDstream是通过Zookeeper连接Kafka,receivers接收器从Kafka中获取数据,并且所有receivers获取到的数据都会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据。
在这里插入图片描述

2.KafkaUtils.createDirectStream方式

当接收数据时,它会定期地从Kafka中Topic对应Partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,然后Spark通过调用Kafka简单的消费者API(即低级API)来读取一定范围的数据。
在这里插入图片描述

二、Spark Streaming使用KafkaUtils.createDstream方式连接Kafka

1.创建IDEA工程

确定pom.xml文件中添加Spark Streaming整合Kafka的依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
	<version>2.3.2</version>
</dependency>

注:spark项目工程已经添加该依赖

2.创建Scala类,实现词频统计

在spark项目下,创建一个名“SparkStreaming_Kafka_createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计

SparkStreaming_Kafka_createDstream.scala

下面程序的IP地址根据实际Linux IP地址进行修改

在这里插入图片描述

3.创建Topic,指定消息的类别

先启动zookeeper(可以status查看,如果没有启动则启动)
[root@hadoop kafka]# /opt/zookeeper/bin/zkServer.sh  start
[root@hadoop bin]# jps
20320 DataNode
38257 QuorumPeerMain
38274 Jps
20216 NameNode
21145 Master
21227 Worker
31884 RunJar
20685 ResourceManager
20797 NodeManager
20526 SecondaryNameNode


再在kafka根目录下用命令启动kafka服务
注意:kafka的配置文件要加上应当配置远程监听主机名称(ip根据实际修改)

[root@hadoop kafka]# vim /opt/kafka/config/server.properties
listeners=PLAINTEXT://192.168.10.10:9092
[root@hadoop bin]# cd /opt/kafka/
启动kafka
[root@hadoop kafka]# bin/kafka-server-start.sh  config/server.properties

创建Topic
[root@hadoop ~]# kafka-topics.sh --create --topic kafka_spark --partitions 3 --replication-factor 1 --zookeeper hadoop:2181

启动Kafka的消息生产者,并观察IDEA控制台输出
可先在IDEA中运行SparkStreaming_Kafka_createDstream.scala程序,再启动kafka消息生产者,顺序也不一定这样

[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_spark
>hadoop spark hbase kafka spark
>kafka itcast itcast spark kafka spark kafka

查看IDEA运行结果,可以从kafka读取生产者的数据,实现了词频统计
-------------------------------------------
Time: 1648391930000 ms
-------------------------------------------
(spark,4)
(hadoop,1)
(hbase,1)
(itcast,2)
(kafka,4)

注意:如果我们使用KafkaUtils.createDstream方式时,一开始系统会正常运行,没有任何问题,但是当系统出现异常,重启SparkStreaming程序后,则发现程序会重复处理已经处理过的数据。由于这种方式是使用kafka高级消费者API,topic的offset偏移量是在zookeeper中。虽然这种方式会配合着WAL日志保证数据零丢失的高可靠性,但却无法保证数据只被处理一次,可能会处理两次,因此官方不再推荐使用这种方式,推荐下面的方式。

三、Spark Streaming使用KafkaUtils.createDirectStream方式连接Kafka

1.IDEA项目pom.xml文件导入依赖

确定pom.xml文件中添加Spark Streaming整合Kafka的依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
	<version>2.3.2</version>
</dependency>

注:spark项目工程已经添加该依赖

2.IDEA项目pom.xml文件导入依赖

在spark项目下,创建一个名为“SparkStreaming_Kafka_createDirectStream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。

SparkStreaming_Kafka_createDirectStream.scala

下面程序的IP地址根据实际Linux IP地址进行修改
在这里插入图片描述

3.创建Topic,指定消息的类别

先启动zookeeper(可以status查看,如果没有启动则启动)
[root@hadoop kafka]# /opt/zookeeper/bin/zkServer.sh  start

创建Topic
[root@hadoop ~]# kafka-topics.sh --create --topic kafka_direct0 --partitions 3 --replication-factor 1 --zookeeper hadoop:2181

启动Kafka的消息生产者,并观察IDEA控制台输出
可先在IDEA中运行SparkStreaming_Kafka_createDstream.scala程序,再启动kafka消息生产者,顺序也不一定这样

[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_direct0
>hadoop spark hbase kafka spark
>kafka itcast itcast spark kafka spark kafka

查看IDEA运行结果,可以从kafka读取生产者的数据,实现了词频统计
-------------------------------------------
Time: 1648393295000 ms
-------------------------------------------
(itcast,2)
(kafka,4)
(spark,4)
(hadoop,1)
(hbase,1)
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-01 15:49:32  更:2022-05-01 15:52:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:42:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码