IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SpringBoot整合Kafka实现生产消费 -> 正文阅读

[大数据]SpringBoot整合Kafka实现生产消费

首先我们看下项目的基本结构:

?在上一篇文章中我们搭建zookeeper和kafka?https://blog.csdn.net/lchmyhua88/article/details/119080644

这你就不在多说这个了,我们先把zookeeper和kafka 服务气起来:

sudo bin/zkServer.sh start

bin/zkCli.sh -server 127.0.0.1:2181

kafka服务:

下面我们来 实现:

application.yml配置如下:

server:
  port: 9990

spring:
  application:
    name:JAVA  Kafka  URL
  # kafka相关配置
  kafka:
    # kafka集群配置
    bootstrap-servers: 192.168.33.10:9092
    ####初始化生产者配置#####
    producer:
      # 重试次数
      retries: 0
      # 应答级别:多少个分区副本北非结束之后向生产者发送ack确认(可选 0、1、all\-1)
      acks: 1
      # 批量大小
      batch-size: 16348
      # 提交延时
      properties:
        linger: 0
      #当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
      # 生产者缓冲区大小(总字节数)
      buffer-memory: 33554432
      # Kafka提供的序列化和反序列化类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    ####初始化消费者配置#####
    consumer:
      # 默认消费组
      group-id: group-user
    properties:
      # 消费会话超时时间
      session:
        timeout: 120000
      # 消费请求超时时间
      request:
        timeout: 120000
      # 是否自动体骄傲offset
      enable-auto-commit: true
      # 提交offset延时(收到消息之后多久提交offset)
      auto-commit-interval: 1000ms
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset;
      # latest:重置为分区中最新的offset(消费分区中新产生的数据);
      # none:只要有一个分区不存在已提交的offset,就抛出异常;
      auto-offset-reset: earliest
      # Kafka提供的序列化和反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer


    # 消费端监听的topic不存在时,项目启动会报错(关掉)
    listener:
      missing-topics-fatal: false


# 自定义配置
kafka:
  bootstrap:
    servers: 192.168.33.10:9092
  topic:
    user: huage
  consumer:
    id: group-user


设置一个主题,10个分区:
@Configuration
public class KafkaInitialConfiguration {

    @Bean
    public NewTopic initTopic(){
        //创建一个主题名称为:topic1  分区数为10,分区副本为2
        return new NewTopic("topic1",10,(short)1);
    }
}

生产者:

@Slf4j
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @Value("${kafka.topic.user}")
    private String topicName;

    public void sendMessage(Message message){
        GsonBuilder gson = new GsonBuilder();
        gson.setPrettyPrinting();
        gson.setDateFormat("yyyy-mm-dd HH:mm:ss");
        String toJson = gson.create().toJson(message);
        kafkaTemplate.send(topicName,toJson);
        log.info("生产者发送消息至kafka:" + message);
    }

}

消费者读取方法:

@Slf4j
@Component
public class KafkaConsumer {

    @Value("${kafka.topic.user}")
    private String topicName;

    public void consume(){
        Properties props = new Properties();
        //设置配置属性
        props.put("bootstrap.servers", "192.168.33.10:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "group-user");
        //自动提交offset,每1s提交一次(提交后的消息不再消费,避免重复消费问题)
        props.put("enable.auto.commit","true");
        //自动提交时间间隔
        props.put("auto.commit.interval.ms","1000");
        //earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
        props.put("auto.offset.reset", "earliest");
        //根据上面的配置,新增消费者对象
        org.apache.kafka.clients.consumer.KafkaConsumer<String,String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
        //订阅topic
        consumer.subscribe(Collections.singletonList(topicName));

        while (true){
            //从服务器拉取数据
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record->{
                System.out.printf("成功消费消息:topic = %s ,partition = %d,offset = %d, key = %s, value = %s%n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            });
        }
    }
}

通过URL来消费

@RestController
@RequestMapping("/kafka/consumer")
public class KafkaConsumerController {

    @KafkaListener(topics = {"topic1"})
    public void receiveMessage(ConsumerRecord<String,Object> record){
        System.out.println("消费信息:" + record.topic() +":" + record.value());
    }

}

通过URL来生成消息

@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @GetMapping("/send/{msg}")
    private void sendMessageByKafkaProducer(@PathVariable("msg") String msg){
        kafkaTemplate.send("topic1",msg);
    }

    /**
     * 添加回调监听
     * @param msg
     */
    @GetMapping("/send/callback/{msg}")
    private void sendMessageByKafkaProducerAndCallback(@PathVariable("msg") String msg){
        kafkaTemplate.send("topic1",msg).addCallback(sucess->{
            //消息成功发送到topic
            String topic = sucess.getRecordMetadata().topic();
            //消息发送到的分区
            int partition = sucess.getRecordMetadata().partition();
            //消息所在分区内地offset
            long offset = sucess.getRecordMetadata().offset();
            System.out.println("消息成功发送到:" + topic + partition + offset);
        },failure->{
            System.out.println("消息发送失败:" + failure.getMessage());
        });
    }
}

消息的消费读取

@RestController
@RequestMapping("/kafka")
public class KafkaMessageController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Autowired
    private KafkaConsumer kafkaConsumer;
    /**
     * 生产者发送信息
     * */
    @RequestMapping("/send")
    public void kafkaSend(){
        Message message = Message.builder()
                .msg("我是从KafkaMessageController 通过KafkaProducer.sendMessage() 方法发送过来的消息")
                .code("200")
                .sentTime("生产者发送消息")
                .sender("huage")
                .build();
        kafkaProducer.sendMessage(message);
    }

    /**
     * 消费者读取信息
     */
    @RequestMapping("/read")
    public void kafkaRead(){
        kafkaConsumer.consume();
    }
}

入口方法

@SpringBootApplication
@RestController
public class huageApplication {

    public static void main(String[] args) {
        SpringApplication.run(huageApplication.class, args);
    }

    @RequestMapping("/")
    public String hello(){
        return "hello huage ,this is springboot project";
    }

}

接着 启动项目:

?

?好了,让我们通过url来访问一下看看是否启动服务:?

在浏览器地址输入:

http://192.168.33.1:9990/看到如下:

?这个时候我们可以发送消息了:

发送消息

http://192.168.33.1:9990/kafka/producer/send/计算机看到几个点

打印 消息

http://192.168.33.1:9990/kafka/producer/send/callback/计算机看到几个点

?

以上可以看到消息发送了

这个时候我们代码来消费消息:

http://192.168.33.1:9990/kafka/read

?

查询消费者组

?bin/kafka-consumer-groups.sh --bootstrap-server 192.168.33.10:9092 --list

查询消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.33.10:9092 --describe --group group-user?

?可以看到现在LAG消息堆积量为0,最大偏移量18,消费正常

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-02 10:53:03  更:2021-08-02 10:54:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 21:48:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码