IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SpringBoot教程(六)——集成Kafka -> 正文阅读

[大数据]SpringBoot教程(六)——集成Kafka

作者:language-java

1.pom依赖:

????????springboot与kafka版本对应关系:https://spring.io/projects/spring-kafka#overview

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.yml配置:

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=192.168.33.128:9092,192.168.33.128:9093,192.168.33.128:9094
###########【初始化生产者配置】###########
# 重试次数。
spring.kafka.producer.retries=3
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
spring.kafka.producer.acks=-1
# 批量发送的消息数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# 提交延时
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
spring.kafka.producer.properties.linger.ms=0
# 生产端缓冲区大小 32MBproduce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
#spring.kafka.producer.properties.partitioner.class=com.springboot.kafka.config.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID(此处name和消费端注解名称必须一致)
spring.kafka.consumer.properties.group.id=zlx-group
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=false
# enable_auto_commit=false时才生效,有以下几种:
# record 每处理一条commit一次
# batch (默认)每次poll的时候批量提交一次,频率取决于每次poll的调用频率
# time 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
# count 累积达到ackCount次的ack去commit
# count_time ackTime或ackCount哪个条件先满足,就commit
# manual listener负责ack,但是背后也是批量上去
# manual_immediate listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
# 提交offset延时(接收到消息后多久提交offset,enable-auto-commit=true才会生效)
spring.kafka.consumer.auto-commit-interval=1000
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=earliest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
###########【批量配置】###########
# 设置批量消费
#spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
#spring.kafka.consumer.max-poll-records=50

3.Kafka配置类:

  1. 定义主题topic
  2. 自定义消费端异常处理
/**
 * @Description 配置类
 * @Date 2021/7/5 13:55
 * @Created by zlx
 */
@Configuration
public class KafkaConfig {

    /**
     * @description: 创建Topic并设置分区数为3,分区副本数为2,如果要修改分区数,只需修改配置值重启项目即可,修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
     * @author: zlx
     * @time: 2021/7/5 14:23
     */
    @Bean
    public NewTopic createTopic1() {
        return new NewTopic(KafkaConstant.TOPIC_ONE_NAME, KafkaConstant.NUM_PARTITIONS, KafkaConstant.REPLICATION);
    }

    

    /**
     * @description: 自定义消费端异常处理器,只有消费端出现异常且未catch才会触发
     * @author: zlx
     * @time: 2021/7/5 14:32
     */
    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return (message, exception, consumer) -> {
            System.out.println("进入异常。。。。。消费异常:" + message.getPayload());
            //do something
            return null;
        };
    }
}

4.消息生产者:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * @param topic     主题
     * @param partition 分区
     * @param message   消息体
     * @description: 发送消息。addCallback回调方法,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
     * @author: zlx
     * @time: 2021/7/5 14:18
     */
    public void sendMessage(String topic, Integer partition, String message) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, partition, null, message);
        kafkaTemplate.send(producerRecord)
                .addCallback(result -> {
                    if (result != null && null != result.getRecordMetadata()) {
                        System.out.println("消费发送成功,消息发送到的topic:" + result.getRecordMetadata().topic() +
                                ",消息发送到的分区:" + result.getRecordMetadata().partition() +
                                ",消息在分区内的offset:" + result.getRecordMetadata().offset());
                    }
                }, throwable -> {
                    //可做补偿机制
                    System.out.println("消费发送失败:" + throwable.getMessage());
                });
    }
}

5.消息消费者:

@Component
public class KafkaConsumer {

    //单条接收消息,
    @KafkaListener(groupId = KafkaConstant.GROUP_NAME, topics = KafkaConstant.TOPIC_ONE_NAME, errorHandler = "consumerAwareErrorHandler")
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("topic:" + record.topic() + "|partition:" + record.partition() + "|offset:" + record.offset() + "|value:" + record.value());
        ack.acknowledge();
    }

    //批量接收消息时用List来接收,对应配置文件最后两个
    /*@KafkaListener(groupId = KafkaConstant.GROUP_NAME, topics = KafkaConstant.TOPIC_ONE_NAME)
    public void onMessage2(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        System.out.println(">>>批量消费一次,records.size()=" + records.size());
        for (ConsumerRecord<?, ?> record : records) {
            System.out.println(record.value());
        }
        ack.acknowledge();
        System.out.println("-------------------------------------");
    }*/
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-15 23:47:47  更:2021-07-15 23:49:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/7 20:43:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码