IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> spring boot kafka初试,多线程尝试 -> 正文阅读

[大数据]spring boot kafka初试,多线程尝试

通用配置:

pom.xml增加配置

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>

application.properties增加kafka配置

spring.kafka.bootstrap-servers=ip:9092

一、生产者

a.增加kafka话题配置?

package com.test.demo.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaInitialConfiguration {
    // 创建一个名为testtopic的Topic并设置分区数为3,分区副本数为1
    // 因为我使用的是单台kafka,所以分区副本只能为1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic1",3, (short) 1 );
    }
}

b.发送消息的服务

package com.test.demo.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;


@Component
public class Pro {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    public void sendMessage1(String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
}

c.线程池配置(不使用多线程跳过此步)

package com.test.demo.kafka;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;


@Configuration
@EnableAsync
public class AsyncConfiguration {
    @Bean("doSomethingExecutor")
    public Executor doSomethingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数:线程池创建时候初始化的线程数
        executor.setCorePoolSize(100);
        // 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(500);
        // 缓冲队列:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        // 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("do-something-");
        // 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        executor.initialize();
        return executor;
    }
}

d.单线程执行方法(不使用多线程跳过此步)

package com.test.demo.kafka;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;


@Component
public class ProThread{

    final Pro pro;

    public ProThread(Pro pro) {
        this.pro = pro;
    }

    @Async("doSomethingExecutor")
    public void run(int i) {
        String name = Thread.currentThread().getName();
        System.out.println(System.currentTimeMillis()+name+"执行异步任务: "+i);
        pro.sendMessage1(i+"");
    }
}

e.测试controller

package com.test.demo.ctr;

import com.test.demo.kafka.ProThread;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/test")
public class TestCtr {
    
    //不使用多线程,此类更换为Pro
    @Autowired
    ProThread pro;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable("message")String message)throws Exception{
        for (int i = 0; i < 1000; i++) {
            pro.run(i);
        }
        return "成功";
    }
}

二、消费者

配置消息监听

package com.test.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;


@Component
public class Cus {

    @Autowired
    KafkaTemplate<String,Object> kafkaTemplate;

    @KafkaListener(topics = "topic1",groupId = "test")
    public void topic_test1(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            System.out.println(System.currentTimeMillis()+"topic1 消费了: Topic:" + topic + ",Message:" + msg);
        }
    }
}

三、测试

测试结果:

四、记录一下kafka部署

a.使用下载命令下载kafka包

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz

b.解压包

tar -zxcf kafka_2.13-2.8.0.tgz

c.修改config/server.properties下的zookeep-ip和服务器ip

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://ip:9092

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeep-ip:2181

d.启动

bin/kafka-server-start.sh config/server.properties

#后台运行在 命令 后加“&”

e.测试

#创建话题 test
bin/kafka-topics.sh --create --bootstrap-server ip:9092 --replication-factor 1 --partitions 1 --topic test

#查询话题列表,检查是否创建成功
bin/kafka-topics.sh --list --bootstrap-server ip:9092

#发送消息
bin/kafka-console-producer.sh --bootstrap-server ip:9092 --topic test
>消息内容


#消费者测试命令
bin/kafka-console-consumer.sh --bootstrap-server ip --topic test --from-beginning

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:00:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/28 10:24:11-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码