能力目标 1.能够使用KafkaUtils.createDstream方式读取数据到Spark Dstream 2.能够使用KafkaUtils.createDirectStream方式读取数据到Spark Dstream 3.能够编写Spark Streaming+Kafka应用程序
前言
点赞+评论+关注=养成好习惯
提示:以下是本篇文章正文内容,下面案例可供参考
一、Spark Streaming连接Kafka的两种方式
Kafka作为一个实时的分布式消息队列,实时地生产和消费消息。在大数据计算框架中,可利用Spark Streaming实时读取Kafka中的数据,再进行相关计算。在Spark1.3版本后,KafkaUtils里面提供了两个创建DStream的方式,一种是KafkaUtils.createDstream方式,另一种为KafkaUtils.createDirectStream方式。
1.KafkaUtils.createDstream方式
KafkaUtils.createDstream是通过Zookeeper连接Kafka,receivers接收器从Kafka中获取数据,并且所有receivers获取到的数据都会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据。
2.KafkaUtils.createDirectStream方式
当接收数据时,它会定期地从Kafka中Topic对应Partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,然后Spark通过调用Kafka简单的消费者API(即低级API)来读取一定范围的数据。
二、Spark Streaming使用KafkaUtils.createDstream方式连接Kafka
1.创建IDEA工程
确定pom.xml文件中添加Spark Streaming整合Kafka的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
<version>2.3.2</version>
</dependency>
注:spark项目工程已经添加该依赖
2.创建Scala类,实现词频统计
在spark项目下,创建一个名“SparkStreaming_Kafka_createDstream”的Scala类,用来编写Spark Streaming应用程序实现词频统计
SparkStreaming_Kafka_createDstream.scala
下面程序的IP地址根据实际Linux IP地址进行修改
3.创建Topic,指定消息的类别
先启动zookeeper(可以status查看,如果没有启动则启动)
[root@hadoop kafka]# /opt/zookeeper/bin/zkServer.sh start
[root@hadoop bin]# jps
20320 DataNode
38257 QuorumPeerMain
38274 Jps
20216 NameNode
21145 Master
21227 Worker
31884 RunJar
20685 ResourceManager
20797 NodeManager
20526 SecondaryNameNode
再在kafka根目录下用命令启动kafka服务
注意:kafka的配置文件要加上应当配置远程监听主机名称(ip根据实际修改)
[root@hadoop kafka]# vim /opt/kafka/config/server.properties
listeners=PLAINTEXT://192.168.10.10:9092
[root@hadoop bin]# cd /opt/kafka/
启动kafka
[root@hadoop kafka]# bin/kafka-server-start.sh config/server.properties
创建Topic
[root@hadoop ~]# kafka-topics.sh --create --topic kafka_spark --partitions 3 --replication-factor 1 --zookeeper hadoop:2181
启动Kafka的消息生产者,并观察IDEA控制台输出
可先在IDEA中运行SparkStreaming_Kafka_createDstream.scala程序,再启动kafka消息生产者,顺序也不一定这样
[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_spark
>hadoop spark hbase kafka spark
>kafka itcast itcast spark kafka spark kafka
查看IDEA运行结果,可以从kafka读取生产者的数据,实现了词频统计
-------------------------------------------
Time: 1648391930000 ms
-------------------------------------------
(spark,4)
(hadoop,1)
(hbase,1)
(itcast,2)
(kafka,4)
注意:如果我们使用KafkaUtils.createDstream方式时,一开始系统会正常运行,没有任何问题,但是当系统出现异常,重启SparkStreaming程序后,则发现程序会重复处理已经处理过的数据。由于这种方式是使用kafka高级消费者API,topic的offset偏移量是在zookeeper中。虽然这种方式会配合着WAL日志保证数据零丢失的高可靠性,但却无法保证数据只被处理一次,可能会处理两次,因此官方不再推荐使用这种方式,推荐下面的方式。
三、Spark Streaming使用KafkaUtils.createDirectStream方式连接Kafka
1.IDEA项目pom.xml文件导入依赖
确定pom.xml文件中添加Spark Streaming整合Kafka的依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_0-8_2.11</artifactId>
<version>2.3.2</version>
</dependency>
注:spark项目工程已经添加该依赖
2.IDEA项目pom.xml文件导入依赖
在spark项目下,创建一个名为“SparkStreaming_Kafka_createDirectStream”的Scala类,用来编写Spark Streaming应用程序实现词频统计。
SparkStreaming_Kafka_createDirectStream.scala
下面程序的IP地址根据实际Linux IP地址进行修改
3.创建Topic,指定消息的类别
先启动zookeeper(可以status查看,如果没有启动则启动)
[root@hadoop kafka]# /opt/zookeeper/bin/zkServer.sh start
创建Topic
[root@hadoop ~]# kafka-topics.sh --create --topic kafka_direct0 --partitions 3 --replication-factor 1 --zookeeper hadoop:2181
启动Kafka的消息生产者,并观察IDEA控制台输出
可先在IDEA中运行SparkStreaming_Kafka_createDstream.scala程序,再启动kafka消息生产者,顺序也不一定这样
[root@hadoop01 servers]# kafka-console-producer.sh --broker-list hadoop:9092 --topic kafka_direct0
>hadoop spark hbase kafka spark
>kafka itcast itcast spark kafka spark kafka
查看IDEA运行结果,可以从kafka读取生产者的数据,实现了词频统计
-------------------------------------------
Time: 1648393295000 ms
-------------------------------------------
(itcast,2)
(kafka,4)
(spark,4)
(hadoop,1)
(hbase,1)
|