IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 8.SpringBoot 与消息 -> 正文阅读

[Java知识库]8.SpringBoot 与消息

目录

消息队列

MQ作?

1 处理消息规则

JMS

JMS消息模型

JMS消息种类

AMQP

AMQP消息种类:

AMQP消息模型

MQTT?

2.Spring整合ActiveMQ

安装(以windows为例)

解压即可

启动服务器

?访问web管理服务

整合

1.导?springboot整合ActiveMQ的starter?

2.配置ActiveMQ的服务器地址?

3.使?JmsMessagingTemplate操作ActiveMQ?

4.使?消息监听器

5.切换消息模型

3. SpringBoot整合RabbitMQ

Erlang安装(windows版)

?安装RabbitMQ

启动服务器?

访问web管理服务

?整合(direct模型)

1.导?springboot整合amqp的starter

2.配置RabbitMQ的服务器地址?

3.初始化直连模式系统设置

4.使?AmqpTemplate操作RabbitMQ

5.使?消息监听器

?整合(topic模型)

1.初始化主题模式系统设置

2.使?AmqpTemplate操作RabbitMQ

3.使?消息监听器在服务器启动后,监听指定队列

4 SpringBoot整合RocketMQ?

下载:

RocketMQ?作模式?

?启动服务器

测试服务器启动状态

整合

1.导?springboot整合RocketMQ的starter

2.配置RocketMQ的服务器地址?

3.使?RocketMQTemplate操作RocketMQ?

4.使?消息监听器

5.SpringBoot整合Kafka?

下载安装

启动服务器

创建主题

?测试服务器启动状态

整合

?1.导?springboot整合Kafka的starter

2.配置Kafka的服务器地址?

3.使?KafkaTemplate操作Kafka

4.使?消息监听器


消息队列

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的?种数据结构。指把要传输的数据(消息)放在队列中,?队列机制来实现消息传递——?产者产?消息并把消息放?队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

MQ作?

1.流量消峰

在?量请求时(秒杀场景),使?消息队列做缓冲处理,削弱峰值流量,防?系统在短时间内被峰值流量冲垮。

2.应?解耦

使?消息MQ后,只需要保证消息格式不变,不需要关?发布者及消费者之间的关系,这两者不需要彼此联系

3.异步处理

对于消息的?产者与消费者的?作模式,还可以将消息划分成两种模式,同步消费与异步消息。

1 处理消息规则

?前企业级开发中?泛使?的消息处理技术共三?类,具体如下:

  • JMS
  • AMQP
  • MQTT

????????这些都是规范,就想JDBC技术,是个规范,开发针对规范开发,运?还要靠实现类,例如MySQL提供了JDBC的实现,最终运?靠的还是实现。并且这三类规范都是针对异步消息进?处理的,也符合消息的设计本质,处理异步的业务。

JMS

JMS(Java Message Service),这是?个规范,作?等同于JDBC规范,提供了与消息服务相关的API接?。

JMS消息模型

????????JMS规范中规范了消息有两种模型。分别是点对点模型和发布订阅模型。

????????点对点模型:peer-2-peer,?产者会将消息发送到?个保存消息的容器中,通常使?队列模型,使?队列保存消息。?个队列的消息只能被?个消费者消费,或未被及时消费导致超时。这种模型下,?产者和消费者是?对?绑定的。

????????发布订阅模型:publish-subscribe,?产者将消息发送到?个保存消息的容器中,也是使?队列模型来保存。但是消息可以被多个消费者消费,?产者和消费者完全独?,相互不需要感知对?的存在。

以上这种分类是从消息的?产和消费过程来进?区分,针对消息所包含的信息不同,还可以进?不同类别的划分。

JMS消息种类

????????根据消息中包含的数据种类划分,可以将消息划分成6种消息。

  • TextMessage
  • MapMessage
  • BytesMessage
  • StreamMessage
  • ObjectMessage
  • Message (只有消息头和属性)

????????JMS主张不同种类的消息,消费?式不同,可以根据使?需要选择不同种类的消息。例如ActiveMQ、Redis、HornetMQ。但是也有?些不太规范的实现,参考JMS的标准设计,但是?不完全满?其规范,例如:RabbitMQ、RocketMQ。?

AMQP

????????JMS的设计是J2EE规范,站在Java开发的?度思考问题。但是现实往往是复杂度很?的。?如我有?个.NET开发的系统A,有?个Java开发的系统B,现在要从A系统给B系统发业务消息,结果两边数据格式不统?,没法操作。JMS不是可以统?数据格式吗?提供了6种数据种类,因为A系统的底层语?不是Java语?开发的,根本不?持那些对象。这就意味着如果想使?现有的业务系统A继续开发已经不可能了,必须推翻重新做使?Java语?开发的A系统。

????????AMQP的出现解决的是消息传递时使?的消息种类的问题,化繁为简,但是其并没有完全推翻JMS的操作API,所以说AMQP仅仅是?种协议,规范了数据传输的格式?已。

????????AMQP(advanced message queuing protocol):?种协议(?级消息队列协议,也是消息代理规范),规范了?络交换的数据格式,兼容JMS操作。

优点
????????具有跨平台性,服务器供应商,?产者,消费者可以使?不同的语?来实现

AMQP消息种类:

????????byte[]

????????AMQP在JMS的消息模型基础上?进?了进?步的扩展,除了点对点和发布订阅的模型,开发了?种全新的消息模型,适应各种各样的消息发送。

AMQP消息模型

  • direct exchange
  • fanout exchange
  • topic exchange
  • headers exchange
  • system exchange

?????????前实现了AMQP协议的消息中间件技术也很多,?且都是较为流?的技术,例如:RabbitMQ、StormMQ、RocketMQ?

MQTT?

????????MQTT(Message Queueing Telemetry Transport)消息队列遥测传输,专为?设备设计,是物联?(IOT)?态系统中主要成分之?。由于与JavaEE企业级开发没有交集,此处不作过多的说明。

2.Spring整合ActiveMQ

????????ActiveMQ是MQ产品中的元?级产品,早期标准MQ产品之?,在AMQP协议没有出现之前,占据了消息中间件市场的绝?部分份额,后期因为AMQP系列产品的出现,迅速?弱,?前仅在?些线上运?的产品中出现,新产品开发较少采?。

安装(以windows为例)

??????? 官方下载地址https://activemq.apache.org/components/classic/download/

解压即可

启动服务器

运?bin?录下的win32或win64?录下的activemq.bat命令即可,根据??的操作系统选择即可,默认对外服务端?61616。

?访问web管理服务

????????ActiveMQ启动后会启动?个Web控制台服务,可以通过该服务管理ActiveMQ。

????????http://127.0.0.1:8161/

????????web管理服务默认端?8161,访问后可以打开ActiveMQ的管理界?,如下:

??????????先输?访问?户名和密码,初始化?户名和密码相同,均为:admin,成功登录

点击Mange ActiveMQ broker进入主页面

进?管理后台界?,如下:

?????????看到上述界?视为启动ActiveMQ服务成功。

点对点消息列表

列表各列信息含义如下:

Number Of Pending Messages? :等待消费的消息 ?这个是当前未出队列的数量。

Number Of Consumers? :消费者??? 这个是消费者端的消费者数量

Messages Enqueued? :进入队列的消息 ???进入队列的总数量,包括出队列的。

Messages Dequeued ?:出了队列的消息??? 可以理解为是消费这消费掉的数量。

整合

需求: 订单业务发送短信需求。

处理订单信息的时候会有消息发送。因此会有消息的创建和消息的消费。
我们模拟消息的创建在队列中,消费的时候从队列中获取消息即可。

1.导?springboot整合ActiveMQ的starter?

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2.配置ActiveMQ的服务器地址?

server:
  port: 80
spring:
  activemq:
    # 地址端口
    broker-url: tcp://localhost:61616
  jms:
    template:
      # 给mq起一个名字,不然报错
      default-destination: zzx

3.使?JmsMessagingTemplate操作ActiveMQ?

创建消息服务

public interface MessageService {
    String doMessage();
    void sendMessage(String id);
}
@Service
public class MessageServiceImpl implements MessageService {

    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    //消费消息,从mq中取消息消费
    @Override
    public String doMessage() {
        String id = messagingTemplate.receiveAndConvert(String.class);
        System.out.println("已完成信息业务 id=" + id);
        return id;
    }

    //创建消息,并发送给订单(生产者)
    @Override
    public void sendMessage(String id) {
        System.out.println("发送的消息已经进入mq队列 id=" + id);
        messagingTemplate.convertAndSend(id);
    }
}

????????发送消息需要先将消息的类型转换成字符串,然后再发送,所以是convertAndSend,定义消息发送的位置,和具体的消息内容,此处使?id作为消息

????????接收消息需要先将消息接收到,然后再转换成指定的数据类型,所以是receiveAndConvert,接收消息除了提供读取的位置,还要给出转换后的数据的具体类型。
内容。?

创建订单服务

/**
 * 处理订单的servic
 */
public interface OrderService {

    public void order(String id);
}
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private MessageService messageService;

    @Override
    public void order(String id) {
        System.out.println("订单开始处理...");
        //订单处理
        messageService.sendMessage(id);

        System.out.println("订单处理结束.");
    }
}

创建controller

@RestController
@RequestMapping("/msgs")
public class MessageController {

    @Autowired
    private MessageService service;

    //消费消息
    @GetMapping
    public String doMessage(){
        String id = service.doMessage();
        return id;
    }
}
/**
 * 订单表现层
 */
@RestController
@RequestMapping("/orders")
public class OrderController{

    @Autowired
    private OrderService orderService;

    @PostMapping("{id}")
    public void order(@PathVariable String id){
        orderService.order(id);
    }

}

postman发送post创建消息请求http://localhost/orders/1

?

?postman发送get消费消息请求http://localhost/msgs

?如果将消息发送到某个mq队列也可以这样设置

@Service
public class MessageServiceImpl implements MessageService {

    @Autowired
    private JmsMessagingTemplate messagingTemplate;

    //消费消息,从mq中取消息消费
    @Override
    public String doMessage() {
        //String id = messagingTemplate.receiveAndConvert(String.class);
        String id = messagingTemplate.receiveAndConvert("order.queue.id", String.class);
        System.out.println("已完成信息业务 id=" + id);
        return id;
    }

    //创建消息,并发送给订单(生产者)
    @Override
    public void sendMessage(String id) {
        System.out.println("发送的消息已经进入mq队列 id=" + id);
        //messagingTemplate.convertAndSend(id);
        messagingTemplate.convertAndSend("order.queue.id",id);
    }
}

?上述使用postman发送请求

4.使?消息监听器

在服务器启动后,监听指定位置,当消息出现后,?即消费消息

@Component
public class MessageListener {

    //配置监听器监听队列
    @JmsListener(destination = "order.queue.id")
    //转发到队列
    @SendTo("order.queue.demo")
    public String recrive(String id){
        System.out.println("已完成信息业务 id=" + id);
        return "listener: "+id;
    }
}

?使?注解@JmsListener定义当前?法监听ActiveMQ中指定名称的消息队列。

如果当前消息队列处理完还需要继续向下传递当前消息到另?个队列中使?注解@SendTo即可,这样即可构造连续执?的顺序消息队列。

5.切换消息模型

由点对点模型到发布订阅模型,修改jms配置即可

server:
  port: 80
spring:
  activemq:
    # 地址端口
    broker-url: tcp://localhost:61616
  jms:
    template:
      # 给mq起一个名字,不然报错
      default-destination: zzx
    #由点对点模型到发布订阅模型
    pub-sub-domain: true

????????pub-sub-domain默认值为false,即点对点模型,修改为true后就是发布订阅模型。?

总结
1. springboot整合ActiveMQ提供了JmsMessagingTemplate对象作为客户端操作消息队列
2. 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端?61616
3. 企业开发时通常使?监听器来处理消息队列中的消息,设置监听器使?注解@JmsListener
4. 配置jms的pub-sub-domain属性可以在点对点模型和发布订阅模型间切换消息模型

3. SpringBoot整合RabbitMQ

RabbitMQ是MQ产品中的?前较为流?的产品之?,它遵从AMQP协议。
RabbitMQ的底层实现语?使?的是Erlang,所以安装RabbitMQ需要先安装Erlang。

RabbitMQ与Erlang版本对应表RabbitMQ Erlang Version Requirements — RabbitMQ

Erlang安装(windows版)

安装包下载地址Downloads - Erlang/OTP

下载完毕后得到exe安装?件,?键傻?式安装,安装完毕需要重启,需要重启,需要重启。

安装的过程中可能会出现依赖Windows组件的提示,根据提示下载安装即可,都是?动执?的,如下:

?Erlang安装后需要配置环境变量,否则RabbitMQ将?法找到安装的Erlang。需要配置项如下,作?等同JDK配置环境变量的作?。

  • ERLANG_HOME
  • PATH

?安装RabbitMQ

????????下载地址:Installing on Windows — RabbitMQ

????????下载完毕后得到exe安装?件,?键傻?式安装

启动服务器?

以管理员身份在安装目录的sbin目录下执行命令

rabbitmq-service.bat start # 启动服务
rabbitmq-service.bat stop # 停?服务
rabbitmqctl status # 查看服务状态

或者运?sbin?录下的rabbitmq-service.bat ,默认对外服务端?5672,后台管理端口15672。

访问web管理服务

????????RabbitMQ也提供有web控制台服务,但是此功能是?个插件,需要先启?才可以使?。

rabbitmq-plugins.bat list # 查看当前所有插件的运?状态
rabbitmq-plugins.bat enable rabbitmq_management # 安装rabbitmq_management插件

启动插件后可以在插件运?状态中查看是否运?,运?后通过浏览器即可打开服务后台管理界??

http://localhost:15672

????????web管理服务默认端?15672,访问后可以打开RabbitMQ的管理界?,如下:?

??????????先输?访问?户名和密码,初始化?户名和密码相同,均为:guest,成功登录后进?管理后台界?,如下:

?整合(direct模型)

????????RabbitMQ满?AMQP协议,因此不同的消息模型对应的制作不同,先使?最简单的direct模型开发。

1.导?springboot整合amqp的starter

amqp协议默认实现为rabbitmq?案

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置RabbitMQ的服务器地址?

server:
  port: 80
spring:
  rabbitmq:
    #地址
    host: localhost
    #对外提供服务端口
    port: 5672

3.初始化直连模式系统设置

????????由于RabbitMQ不同模型要使?不同的交换机,因此需要先初始化RabbitMQ相关
的对象,例如队列,交换机等

@Configuration
public class RabbitConfigDirect {

    //创建消息队列
    @Bean
    public Queue directQueue(){
        return new Queue("direct_queue");
    }
    @Bean
    public Queue directQueue2(){
        return new Queue("direct_queue2");
    }

    //创建交换机
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("directExchange");
    }

    //交换机和队列绑定
    @Bean
    public Binding bindingDirect(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
    }
    @Bean
    public Binding bindingDirect2(){
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");
    }
}

????????队列Queue与直连交换机DirectExchange创建后,还需要绑定他们之间的关系Binding,这样就可以通过交换机操作对应队列。

4.使?AmqpTemplate操作RabbitMQ

创建服务

public interface MessageService {
    String doMessage();
    void sendMessage(String id);
}
@Service
public class MessageServiceRabbitMQDirectImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳?处理队列(rabbitmqdirect),id:"+id);
        amqpTemplate.convertAndSend("directExchange","direct",id);

    }

    @Override
    public String doMessage() {
        return null;
    }
}
/**
 * 处理订单的servic
 */
public interface OrderService {

    public void order(String id);
}
@Service
public class OrderServiceImpl implements OrderService {

    @Autowired
    private MessageService messageService;

    @Override
    public void order(String id) {
        System.out.println("订单开始处理...");
        //订单处理
        messageService.sendMessage(id);

        System.out.println("订单处理结束.");
    }
}

5.使?消息监听器

在服务器启动后,监听指定位置,当消息出现后,?即消费消息

@Component
public class RabbitMessageListener {

    //配置监听器监听队列
    @RabbitListener(queues = "direct_queue")
    public void recrive(String id){
        System.out.println("已经完成消息获取 id=" + id);
    }
}

????????使?注解@RabbitListener定义当前?法监听RabbitMQ中指定名称的消息队列。?

创建controller

/**
 * 订单表现层
 */
@RestController
@RequestMapping("/orders")
public class OrderController{

    @Autowired
    private OrderService orderService;

    @PostMapping("{id}")
    public void order(@PathVariable String id){
        orderService.order(id);
    }

}

postman发送post请求http://localhost/orders/3

?整合(topic模型)

1.初始化主题模式系统设置

@Configuration
public class RabbitConfigTopic {

    //创建消息队列
    @Bean
    public Queue topicQueue(){
        return new Queue("topic_queue");
    }

    //创建交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    //交换机和队列绑定
    @Bean
    public Binding bindingTopic(){
        return BindingBuilder.bind(topicQueue()).to(topicExchange()).with("topic.order.id");
    }

}

????????主题模式?持routingKey匹配模式,*表示匹配?个单词,#表示匹配任意内容,这样就可以通过主题交换机将消息分发到不同的队列中

2.使?AmqpTemplate操作RabbitMQ

@Service
public class MessageServiceRabbitMQTopicImpl implements MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳?处理队列(rabbitmq topic),id:"+id);
        amqpTemplate.convertAndSend("topicExchange","topic.order.id",id);

    }

    @Override
    public String doMessage() {
        return null;
    }
}

????????发送消息后,根据当前提供的routingKey与绑定交换机时设定的routingKey进?匹配,规则匹配成功消息才会进?到对应的队列中。?

3.使?消息监听器在服务器启动后,监听指定队列

@Component
public class RabbitMessageTopicListener {

    //配置监听器监听队列
    @RabbitListener(queues = "topic_queue")
    public void recrive(String id){
        System.out.println("已经完成消息获取 id=" + id);
    }
}

????????使?注解@RabbitListener定义当前?法监听RabbitMQ中指定名称的消息队列。

其他与direct模型一样

?总结

1. springboot整合RabbitMQ提供了AmqpTemplate对象作为客户端操作消息队列
2. 操作ActiveMQ需要配置ActiveMQ服务器地址,默认端?5672
3. 企业开发时通常使?监听器来处理消息队列中的消息,设置监听器使?注@RabbitListener
4. RabbitMQ有5种消息模型,使?的队列相同,但是交换机不同。交换机不同,对应的消息进?的策略也不同

4 SpringBoot整合RocketMQ?

????????RocketMQ 是阿?巴巴在2012年开源的分布式消息中间件,?前已经捐赠给Apache 软件基?会,并于2017年9?25?成为 Apache 的顶级项?。作为经历过多次阿?巴巴双??的洗礼并有稳定出?表现的国产中间件,以其?性能、低延时和?可靠等特性近年来已经也被越来越多的企业使?,它遵从AMQP协议。

下载:

下载地址:下载 | RocketMQ????????Binary 下载

RocketMQ安装后需要配置环境变量,具体如下:

  • ROCKETMQ_HOME
  • PATH
  • NAMESRV_ADDR (建议): 127.0.0.1:9876

??????? RocketMQ是基于jdk8环境完成的,所以最好有一个免安装的jdk8

RocketMQ?作模式?

?????????在RocketMQ中,处理业务的服务器称为broker,?产者与消费者不是直接与broker联系的,?是通过命名服务器进?通信。broker启动后会通知命名服务器??已经上线,这样命名服务器中就保存有所有的broker信息。当?产者与消费者需要连接broker时,通过命名服务器找到对应的处理业务的broker,因此命名服务器在整套结构中起到?个信息中?的作?。并且broker启动前必须保障命名服务器先启动。

?

?启动服务器

在jdk8的环境下,在安装目录的bin目录下运行

mqnamesrv.cmd # 启动命名服务器
mqbroker.cmd # 启动broker

测试服务器启动状态

?RocketMQ提供有?套测试服务器功能的测试程序,运?bin?录下的tools命令即可使?。

tools org.apache.rocketmq.example.quickstart.Producer # ?产消息
tools org.apache.rocketmq.example.quickstart.Consumer # 消费消息?

整合

1.导?springboot整合RocketMQ的starter

此坐标不由springboot维护版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version> 2.2.1</version>
</dependency>

2.配置RocketMQ的服务器地址?

rocketmq:
  #地址
  name-server: loclhost:9876
  #生产者组
  producer:
    group: group_rocketmq

????????设置默认的?产者消费者所属组group。?

3.使?RocketMQTemplate操作RocketMQ?

@Service
public class MessageServiceRocketMQImpl implements MessageService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳?处理队列(rocketMQ),id:"+id);
        //rocketMQTemplate.convertAndSend("order_id",id);//同步执行

        //异步执行
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送失败");
            }
        };
        rocketMQTemplate.asyncSend("order_id",id,callback);
    }


}

?使?asyncSend?法发送异步消息。

4.使?消息监听器

监听指定位置,当消息出现后,?即消费消息

@Component
@RocketMQMessageListener(topic = "order_id",consumerGroup = "group_rocketmq")
public class MessageListener implements RocketMQListener<String> {

    @Override
    public void onMessage(String id) {
        System.out.println("已完成信息业务 id=" + id);
    }
}

?RocketMQ的监听器必须按照标准格式开发,实现RocketMQListener接?,泛型为消息类型。

使?注解@RocketMQMessageListener定义当前类监听RabbitMQ中指定组、指定名称的消息队列。?

总结
1. springboot整合RocketMQ使?RocketMQTemplate对象作为客户端操作消息队列
2. 操作RocketMQ需要配置RocketMQ服务器地址,默认端?9876
3. 企业开发时通常使?监听器来处理消息队列中的消息,设置监听器使?注解
@RocketMQMessageListener?

5.SpringBoot整合Kafka?

?????????Kafka是由Apache软件基?会开发的?个开源流处理平台,由Scala和Java编写。Kafka 是?种?吞吐量的分布式发布订阅消息系统,它可以处理消费者在?站中的所有动作流数据。

下载安装

下载地址Apache Kafka

解压安装即可

启动服务器

????????kafka服务器的功能相当于RocketMQ中的broker,kafka运?还需要?个类似于命名服务器的服务。在kafka安装?录中?带?个类似于命名服务器的?具,叫做zookeeper,它的作?是注册中?

??????? 在安装目录的bin目录里的windows目录下执行命令

zookeeper-server-start.bat ..\..\config\zookeeper.properties # 启动zookeeper
kafka-server-start.bat ..\..\config\server.properties # 启动kafka

????????运?bin?录下的windows?录下的zookeeper-server-start命令即可启动注册中?,默认对外服务端?2181。
????????运?bin?录下的windows?录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端?9092。?

创建主题

????????kakfa也是基于主题操作,操作之前需要先初始化topic。

# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

?测试服务器启动状态

????????Kafka提供有?套测试服务器功能的测试程序,运?bin?录下的windows?录下的命令即可使?

kafka-console-producer.bat --broker-list localhost:9092 --topic test # 测试?产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning # 测试消息消费

整合

?1.导?springboot整合Kafka的starter

此坐标由springboot维护版本

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.配置Kafka的服务器地址?

spring:
  kafka:
    #地址
    bootstrap-servers: localhost:9092
    # id
    consumer:
      group-id: order

????????设置默认的?产者消费者所属组id。?

3.使?KafkaTemplate操作Kafka

@Service
public class MessageServiceKafkaImpl implements MessageService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void sendMessage(String id) {
        System.out.println("待发送短信的订单已纳?处理队列(Kafka),id:"+id);
        kafkaTemplate.send("test",id);

    }

}

?????????使?send?法发送消息,需要传?topic名称。

4.使?消息监听器

服务器启动后,监听指定位置,当消息出现后,?即消费消息?

@Component
public class MessageListener{

    @KafkaListener(topics = "test")
    public void onMessage(ConsumerRecord<String,String> record) {
        System.out.println("已完成信息业务 id=" + record.value());
    }
}

?????????使?注解@KafkaListener定义当前?法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。

总结
1. springboot整合Kafka使?KafkaTemplate对象作为客户端操作消息队列
2. 操作Kafka需要配置Kafka服务器地址,默认端?9092
3. 企业开发时通常使?监听器来处理消息队列中的消息,设置监听器使?注解
@KafkaListener。接收消息保存在形参ConsumerRecord对象中?

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-15 01:51:23  更:2022-09-15 01:53:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:22:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码