IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot 整合kafka -> 正文阅读

[大数据]springboot 整合kafka

1.kafka集群配置的话需要在本地host文件里配置
如下:

192.168.5.11 kafka1
192.168.5.12 kafka2
192.168.5.13 kafka3
192.168.5.14 kafka4
192.168.5.15 kafka5
192.168.5.16 kafka6

2.先引依赖

 <dependency>
     <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>

在这里插入图片描述
这是对应的版本,千万不要引错版本,负责回报找不到类这种异常
3.在application.yml中进行配置kafka

kafka:
    bootstrap-servers: 192.168.5.11:9092,192.168.5.12:9092,192.168.5.13:9092,192.168.5.14:9092,192.168.5.15:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        linger.ms: 1

    consumer:
      enable-auto-commit: false
      auto-commit-interval: 100ms
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
      group-id: test-group-id

    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

4.可配置kafkaconfig也可让springboot自动装配

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    @Value("${spring.kafka.producer.properties.linger.ms}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<String, Object>(producerFactory());
    }

    @Bean//通过bean创建(bean的名字为initialTopic)
    public NewTopic initialTopic() {
        return new NewTopic("test",3, (short) 1 );
    }

    @Bean //创建一个kafka管理类,相当于rabbitMQ的管理类rabbitAdmin,没有此bean无法自定义的使用adminClient创建topic
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = new HashMap<>();
        //配置Kafka实例的连接地址
        //kafka的地址,不是zookeeper
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    @Bean  //kafka客户端,在spring中创建这个bean之后可以注入并且创建topic,用于集群环境,创建对个副本
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

}

5.注入kafkaTemplate,生产者发送消息

 @Slf4j
public class TestHandlerKafka {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_NOTIFY = "test";

    @Autowired
    private KafkaSendResultHandler producerListener;


    public ResponseStatusList producer() {
        ResponseStatusList responseStatusList = new ResponseStatusList();

        try {
            //发送消息前配置回调
            kafkaTemplate.setProducerListener(producerListener);
            String key = "test";
            ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send(TOPIC_NOTIFY, key, JSON.toJSONString(entity));
            SendResult<String, Object> stringObjectSendResult = send.get();
            log.info("发送消息成功,", stringObjectSendResult);
            responseStatusList.setLocalDate(new Date());
            responseStatusList.setStatusCode("0");
            responseStatusList.setStatusString("正常");
            return responseStatusList;
        } catch (Exception e) {
            log.error(Status.KAFKA_SEND_ERROR.getMsg(), e);
            responseStatusList.setLocalDate(new Date());
            responseStatusList.setStatusCode("1");
            responseStatusList.setStatusString("其他未知错误");
            return responseStatusList;
        }
    }
}


	

6.可进行简单的消费

@Component
public class KafkaConsumer {


    private static final String TOPIC_NOTIFY = "test";

    // 消费监听
    @KafkaListener(topics = {TOPIC_NOTIFY})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.err.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-14 10:59:11  更:2021-07-14 11:01:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 5:12:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码