IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkStreaming消费Kafka无法消费持续阻塞(无异常信息) -> 正文阅读

[大数据]SparkStreaming消费Kafka无法消费持续阻塞(无异常信息)

产生背景:由于工作需要,目前现有查询业务,其他厂商数据库无法支持,高效率的查询响应速度,于是和数据总线对接,实现接入数据,自己进行数据结构化处理。

技术选型:SparkStreamingKafkaElasticSearch

本人集群:SparkStreaming 版本2.3,Kafka的Scala版本2.11-Kafka版本0.10.0.0
(Kafka_2.11-0.10.0.0.jar)?????
消息总线集群:Kafka总线版本,Kafka_2.10-0.10.2.1.jar

由上述可知,Kafka版本均为0.10 只不过Scala版本相差0.01 直接上SparkStreaming消费Kafka代码。

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.util.*;


public class ShengPanChePai {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ShengPanChePai")
                                        .set("spark.dynamicAllocation.enabled", "false")
                    .set("spark.serializer","org.apache.spark.serializer.KryoSerializer");
        conf.setMaster("local[5]");

        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("ERROR");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(60));
        String groupId = "test-10";                                              //指定消费者组id
        String topics = "test-sjzx";                                                  //指定topic
        String brokers = "ip:9092,ip:9092,ip:9092";								 //指定kafka地
        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //设置当前流消费数据为5MB ~ (10MB)
        kafkaParams.put("fetch.message.max.bytes","5485760");

        //每次程序启动获取最新的消费者偏移量
        //kafkaParams.put("auto.offset.reset", "latest");
        //开启消费之偏移量自动提交
        kafkaParams.put("enable.auto.commit", true);

        //链接kafka,获得DStream对象
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream
                (ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                consumerRecordJavaRDD.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
                    @Override
                    public void call(Iterator<ConsumerRecord<String, String>> consumerRecordIterator) throws Exception {
                        System.out.println(consumerRecordIterator);
					}
                });
            }
        });
        ssc.start();
        System.out.println("调用函数开始");
        try {
        ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

直接运行,打印,发现无法遍历答应结果,发现程序已经阻塞(传统代码消费不存在问题,只有SparkStreaming消费存在阻塞),于是断点Debug发现,阻塞在启动函数ssc.start方法中,一直无法调用ssc.awaitTermination(); 回调函数。没有任何异常,没有任何信息,于是第一步考虑是否是,网络和端口的问题,通过telnet ip:9092 端口没有任何问题,远程调用控制台打印话题数据,完全没有问题,切换自己Kafka集群,同样没有问题,于是猜测是否SparkStreaming和 Kafka版本不一致问题,于是远程在官网查询,发现不是0.8客户端,通过0.10不存在集成问题。

?问题反思:只有还原真实场景才能发现问题

搭建本地环境:
? ? ? ? 在本地虚拟机搭建一套一摸一样的环境,使用SprakStreaming消费Kafka发现完全可以消费,没有问题,于是不是环境问题,继续研究发现,如果使用普通Java代码连接数据总线Kafka,测试是否出现问题)

代码一:
package kafka.demo01;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringEncoder;

public class Consumer01 {

?????? public static void main(String[] args) {
????????????? Properties prop = new Properties();
????????????? prop.put("zookeeper.connect", “zkip:2181: zkip:2181: zkip:2181”);
              prop.put("metadata.broker.list", “bkip:端口, bkip:端口, bkip:端口”);????????????
              prop.put("serializer.class", StringEncoder.class.getName());
????????????? prop.put("group.id", "group1");
????????????? ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

????????????? Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
????????????? // 创建3个线程去消费topic中的内容,每一个线程一个stream流
????????????? topicCountMap.put(“kafka_topic”, 3);
????????????? Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer
??????????????????????????? .createMessageStreams(topicCountMap);

????????????? System.out.println(messageStreams);
????????????? KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(
??????????????????????????? Config.TOPIC_NAME).get(0);

????????????? ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
????????????? while (iterator.hasNext()) {
???????????????????? String msg = new String(iterator.next().message());
???????????????????? System.out.println("收到消息:" + msg);
????????????? }
?????? }
}

由于上述两个版本客户端消费Kafka,所以尚未明确问题到底出现在什么地方,于是将两份测试代码拷贝到本地环境,进行本地环境测试,发现本地环境两份代码运行没有任何问题,于是,考虑对方集群问题。于是在对方集群,查找一个空白的Kafka话题进行数据推送,自己推送,自己消费,首先使用第一中客户端,发现数据消费没有问题,于是使用第二种客户端进行数据消费,发现也可以消费,于是马上使用原有自己环境中的SparkStreaming 进行消费Kafka打印数据,发现数据惊人的将数据打印出来了,于是断定是对方Kafka集群存在问题,于是将对方kafka集群所有话题进行查询,发现一半的话题是可以消费,一半话题无法消费,于是查看所有Kafka的Topic发现,无法消费的Kafka话题的Topic 中的Leader均不相同,于是想到了Kakfa的选举算法,可能是选举出现了问题,说明之前集群一定出现问题,于是电话沟通数据总线,证实了结论。于是查找解决办法,将Kafka 有问题问题的Topic的话题Leader 全部对元数据在Zookeeper进行修改,修改为正常选举状态发现问题都解决了~~~~。?

?

?集群异常对比发现(使用Kafka的查看集群命令)

由上述操作可知,对方集群存在问题,如果使用当前方法进行Kafka 消费,会永远紫色,回调函数无法调用,程序无法运行,于是 有两个思路,将kafka 数据通过代码消费,然后推送到自己的kafka 集群,于是通过测试发现,当前方法会出现,手动记录偏移量,并且需要维护偏移量,操作繁琐,于是寻求第二种方案,Kafka 数据跨集群同步方案。详情见后续文档,待完善.......

参考文档:Kafka数据集群异常(设计SparkStreaming)

kafka broker Leader -1引起spark Streaming不能消费的故障解决方法_微步229的博客-CSDN博客

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-30 08:47:05  更:2022-04-30 08:48:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 1:00:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码