IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ 相关整合实战项目 Day01 -> 正文阅读

[Java知识库]RabbitMQ 相关整合实战项目 Day01

RabbitMQ 整合 PD 商城实战项目流程总结

一、订单流量削峰(解耦)

简单模式,若多添加几个消费者则用工作模式。

导入商城项目

将 step5 课前资料里面 /elasticsearch/pd商城.zip 解压。

第二层目录下 pd-web 文件夹解压到 rabbitmq 工程目录下面。

在这里插入图片描述

修改 pom.xml 文件中的 SpringBoot 版本为 2.3.2.RELEASE(可拖拽至 idea 内更改)。

在这里插入图片描述

在 rabbitmq 工程中导入模块(若之前在 idea 内更改则可以直接右键 Add As Maven 导入)。
或者打开工程结构 Project Structures 手动添加也可。

使用 sqlyog 导入项目目录中的 pd.sql:

在这里插入图片描述

右键复制脚本文件的路径,在 sqlyog 里面右键连接,从 SQL 转储文件导入数据库。

如果导入失败,可以增大 mysql 缓存区
set global max_allowed_packet=100000000;
set global net_buffer_length=100000;
SET GLOBAL  interactive_timeout=28800000;
SET GLOBAL  wait_timeout=28800000

因为大环境变化,MySQL 5.5 以下版本的用起来会很费劲,请至少更新至5.7。MariaDB 要 10.4 以上。

确认工程 JDK 的配置(吐槽:坑爹的涛哥这里准备的项目是 GBK 编码的,属实难顶)。

在这里插入图片描述

为了本项目的运行,首先将数据库 pd_store 的 user、order、order_item 表清空:

delete from pd_user
delete from pd_order
delete from pd_order_item

找到主启动类 RunPdApp ,运行,然后更改启动配置中的工作目录为本项目,改完重新启动。

在这里插入图片描述

打开 localhost 进入模拟拼多商城,注册一个新用户(随意填写,注意格式即可)。

在这里插入图片描述

注册成功以后,点击地址管理,添加一个收货地址(只填必填项即可)。

在这里插入图片描述

返回起始页,选择一个商品,加入购物车并下订单(无需支付,订单会添加到数据库内)。

在这里插入图片描述

查看数据库是否正确存储数据。

在这里插入图片描述

生产者 - 发送订单

修改订单,把订单发送到 rabbitmq

返回 pd-web 项目,打开 service 下的 OrderServiceImpl 实现类,注释掉 64-89 行关于数据库操作的代码。这里是为了不直接操作数据库,而是发送到 rabbitmq 以后再继续操作。

pom.xml 添加 rabbitmq 依赖。

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

application.yml 添加 rabbitmq 连接。

spring:  
  rabbitmq:
    host: 192.168.64.140  # wht6.cn
    port: 5672
    username: admin
    password: admin
    # virtual-host: 自己的空间名

启动类(或者新建自动配置类)

  • 提供使用的队列参数:
    orderQueue,持久队列(订单不能丢),不独占(并行处理效率高),不自动删除。

  • 创建 spring 的 Queue 对象,用来封装队列的参数。RabbitAutoConfiguration 自动配置类,会自动发现 Queue 实例,使用这里的参数,连接服务器在服务器上创建队列。
    1)通过方法创建一个 Queue 队列,注意引包要用 amqp 核心包。
    import org.springframework.amqp.core.Queue
    2)直接返回一个 Queue 对象,参数为上述参数(队列名,true,false,false)。
    3)最后将方法交给 Spring 容器管理。

package com.pd;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
@MapperScan("com.pd.mapper")
public class RunPdAPP{
	
	public static void main(String[] args) {
		SpringApplication.run(RunPdAPP.class, args);
	}

	@Bean
	public Queue orderQueue(){
		return new Queue("orderQueue",true,false,false);
	}
}

修改 OrderServiceImpl,向 Rabbitmq 发送订单。

  • saveOrder 方法上添加自动注入 AmqpTemplate 对象,该对象为 Spring 提供的用来发送消息的工具对象。

  • 在注释的方法里,要求将 pdOrder 发送到 orderQueue 队列: t.convertAndSend("orderQueue",pdOrder);

  • 这里的 convertAndSend 方法是转换并发送,先转成 byte[],再发送,订单对象会被自动序列化成 byte[]。

在这里插入图片描述

重启启动类,重新下订单进行测试:

  • 下订单以后,会发现数据库里没有新订单。
    此时查看 Rabbitmq 服务器,在 orderQueue 队列会发现有一条就绪的消息。
    点开 orderQueue 队列,查看下方的 Get messages 会有很长的一串代码(消息):

在这里插入图片描述

有消息即可。

订单 - 消费者

1.找到项目目录,复制一份 pd-web 起名为 pd-web-consumer 作为消费者端。

2.修改 pom.xml 文件:

  • 将 artifactId 改为 pd-web-consumer(可直接拖拽 pom 文件至 idea)。

在这里插入图片描述

3.修改 application.yml 文件:

  • 因为之前的生产者端已经占用了 80 端口,所以这里将 server.port 改为 81。

4.添加依赖、添加连接、队列参数:前面做过了所以省略。

5.新建消费者类(OrderConsumer),从 orderQueue 接收消息:

  • 在 pd 包内创建 OrderConsumer 类。

  • 添加 @Component 注解自动创建实例,添加 @RabbitListener 注解指定接收消息的队列。

  • 在接收订单时,需要调用 OrderService 对象来接收,因此添加自动注入 OrderService 对象。

  • 创建 receive 方法,参数为 PdOrder 对象,并添加 @RabbitHandler 注解。

    @RabbitHandler 注解是用来配合 @RabbitListener 注解的。

    因此指定处理消息的方法的类中,只能有一个 @RabbitHandler 注解。

6.调用原来的业务代码,存储订单

  • 通过 OrderService 对象调用 saveOrder 方法接收订单,最后输出提示 “订单已存储” 。
  • Spring 会自动创建实例,自动启动消费者,自动开始从队列获取消息,自动执行处理消息的方法。
package com.pd;

import com.pd.pojo.PdOrder;
import com.pd.service.OrderService;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "orderQueue")
public class OrderConsumer {

    @Autowired
    private OrderService orderService;
    @RabbitHandler
    public void receive(PdOrder pdOrder) throws Exception {
        orderService.saveOrder(pdOrder);
        System.out.println("-----------------------------------订单已存储!wow!");
    }
}

7.修改 OrderServiceImpl,还原成原来的数据库操作的代码即可。

  • 将注入的 AmqpTemplate 对象删掉,saveOrder 前面自动序列化的代码也删掉,数据库注释部分还原。

  • 因为之前的是随机生成的 orderId,现在因为前面已经获取了 PdOrder 的对象。

    所以直接添加一行代码:获取 pdOrder 的 orderId。
    String orderId = pdOrder.getOrderId();

8.消息传递测试:

  • 由于之前发送的订单对象需要序列化,所以这里需要重新下一个订单来测试传递。
  • 重新下完订单以后,启动 81 端口的 RunPdApp 服务,消息会自动传递到消费者这里。
  • 控制台显示 “订单已存储” 即可。

在这里插入图片描述

9.合理分发相关:

  • ack – spring 封装的 rabbitmq,已经是手动 ack 模式了,spring 会自动发送回执。

  • qos=1 – spring 默认设置的 qos 是250,yml 中 可以添加 prefetch = 1 把 qos 设置成 1。

spring:  
  rabbitmq:  
	listener:
      simple:
        prefetch: 1

10.持久化相关:

队列持久化参数设定为 true;

生产者发送的消息默认是持久消息(即生产者利用 AmqpTemplate 调用 convertAndSend 的方法)。

二、SpringBoot 整合 RabbitMQ

创建项目

  • 在 rabbitmq 目录下创建 SpringBoot 工程模块:rabbitmq-spring,只添加 rabbitmq 依赖即可。

  • 修改 pom.xml 文件:Spring Boot 版本改为 2.3.2.RELEASE

  • 修改 application.yml 文件:添加 rabbitmq 连接

    spring:
      rabbitmq:
        host: 192.168.64.140  # wht6.cn
        port: 5672
        username: admin
        password: admin
        # virtual-host:你的空间名
    

项目结构

  • 创建结构和之前类似,虽然项目为我们自动创建了一个启动类(RabbitmqSpringApplication),但是我们想同时启动多个模式下的生产者和消费者时会起冲突,因此为了方便测试运行我们每个包内单独创建一个启动类(Main)来运行该包下的测试程序。
  • 分别创建 m1/m2/m3/m4/m5 包,目录下创建 Main 启动类、Producer 生产者类和 Consumer 消费者类。

M1 项目测试

M1 项目为默认模式的测试,即只有一个消费者使用一个队列接收一个生产者的模式。

Producer 类

首先我们完成生产者类,生产者类负责向某个队列发送消息,通过对象转换并发送自己想发送的信息。

  • 添加 @Component 注解自动创建实例。

  • 自动注入 AmqpTemplate 对象。

  • 自定义一个发送方法(send),用 AmqpTemplate 对象调用 convertAndSend 方法将信息发送。

    方法参数为:( 队列名,具体信息内容 )这里该方法会自动转换字符串内容。

  • 这里同意规定测试队列名为 ”helloworld“,发送信息随意。

package cn.tedu.rabbitmqspring.m1;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(){
        t.convertAndSend("helloworld","Hello-THE-FKING-World");
    }
}

Consumer 类

  • 添加 @Component 注解自动创建实例。

  • 通常我们在消费者端写接收方法时会在类上添加 @RabbitListener 注解,并在接收方法上添加 @RabbitHandler 注解来配合。

    这里我们选择直接在方法上添加 @RabbitListener 注解来表示接收。

    每个 @RabbitListener 注解,都会注册启动一个消费者。

    直接加在方法上面表示该队列只有这一个方法用来接收。

    这样做的好处是接收代码可以集中,维护起来更加方便。

  • 自定义一个接收方法(receive),上面通过 @RabbitListener 注解限定接收的队列。

    方法参数为 String 对象(发送过来的数据),并输出 “收到: ” + 内容。

package cn.tedu.rabbitmqspring.m1;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @RabbitListener(queues = "helloworld")
    public void receive(String str){
        System.out.println("收到: "+str);
    }
}

Main 启动类

  • 完成 Spring 启动类的基本配置。

  • 创建一个 Queue 实例,封装队列参数:

    定义一个方法(helloWorldQueue),返回 Queue 队列,该队列设定为非持久队列,并交给容器管理。

  • 添加测试代码,调用生产者发送消息:

    为了调用生产者,所以进行 Producer 的注入。

    定义测试方法(test),利用 Producer 对象调用 send 方法发送信息。

    这里可以添加 @PostConstruct 注解,该注解用于初始化资源,可以事先将这些文件加载到内存里,需要时直接从内存调用。

  • spring 的执行流程:
    包扫描创建所有实例 --> 完成所有依赖的注入 --> @PostConstruct --> 后续流程

  • 这里执行代码时,自动配置类有可能还没有创建队列,消费者可能也还没有启动第一次,执行可能丢失信息。

package cn.tedu.rabbitmqspring.m1;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class,args);
    }
    @Bean
    public Queue helloWorldQueue(){
        return new Queue("helloworld",false);
    }
    @Autowired
    private Producer p;
    @PostConstruct
    public void test(){
        p.send();
    }
}

启动测试:测试有收到消息即可。

在这里插入图片描述

M2 项目测试

M2 项目测试为轮询模式(工作模式),即有多台消费者用一个队列接收生产者的模式。

基本项目结构和 M1 类似,我们可以直接复制 M1 项目的三个类到 M2 里。

Producer 类

  • 本项目的生产者采用工作模式,向 task_queue 队列发送消息。

  • 主要更改自定义发送方法的方法体,工作模式需要不断地向消费者发送信息,所以采用 while-true

    死循环来模拟频繁的发送。增加输入消息提示并获取信息。

  • 最后还是调用 convertAndSend 方法发送,参数为( 队列名,消息内容,消息预处理器)

    此处的消息预处理器,可以重新设置消息的属性,本次测试不采用所以可以不写。

package cn.tedu.rabbitmqspring.m2;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(){
        while (true) {
            System.out.println("输入消息: ");
            String str = new Scanner(System.in).nextLine();
            t.convertAndSend("task_queue",str);
        }
    }
}

Consumer 类

  • 工作模式下,通常是多个消费者共用一个队列来接收消费者的消息,我们模拟两个消费者来测试。
  • 在 M1 的基础上,将队列名改为 task_queue,并复制出另一个发送方法模拟两个同时启动。
  • 相应的方法名和输出提示也要更改。
package cn.tedu.rabbitmqspring.m2;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @RabbitListener(queues = "task_queue")
    public void receive1(String str){
        System.out.println("Consumer 1 收到: "+str);
    }
    @RabbitListener(queues = "task_queue")
    public void receive2(String str){
        System.out.println("Consumer 2 收到: "+str);
    }
}

Main 启动类

  • 在工作模式中,队列为共用的,所以要创建一个持久型队列,更改队列方法名和队列参数。

  • 由于前面发送方法为死循环,send 方法不会停止跳出, 所以PostConstruct也不会继续。

    这样就会导致后续流程不会进行,虽然可以一直输出消息,但是消费者端却不会收到消息。

  • 所以我们要把 send 方法放到一个新线程里面去执行,不会占用主线程。方法可以用 lambda 表达式省略写。

package cn.tedu.rabbitmqspring.m2;

import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class,args);
    }

    @Bean
    public Queue taskQueue(){
        return new Queue("task_queue");
    }
    @Autowired
    private Producer p;
    @PostConstruct
    public void test(){
        new Thread(()->p.send()).start();
    }
}

启动测试:有消息输出即可。

在这里插入图片描述

M3 项目测试

M3 项目测试为发布订阅模式(广播模式),即生产者向 fanout 交换机发送消息。

只有与之绑定队列的消费者才能收到信息。

基本项目结构和 M2 类似,我们可以直接复制 M2 项目的三个类到 M3 里。

Producer 类

  • 因为本项目是订阅模式测试,所以并不是向队列发送消息,而是向 fanout 交换机发送消息。
  • 只需要将发送方法(send)里面调用 convertAndSend 方法更改即可。
  • 方法参数为( 交换机名,路由键,发送消息),订阅模式不设置路由键(空串即可)。
  • 交换机名设定为 “logs”(其他的也可以,但注意后面的名字需要与其一致)。
package cn.tedu.rabbitmqspring.m3;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Scanner;

@Component
public class Producer {
    @Autowired
    private AmqpTemplate t;

    public void send(){
        while (true) {
            System.out.println("输入消息: ");
            String str = new Scanner(System.in).nextLine();
            t.convertAndSend("logs","",str);
        }
    }
}

Consumer 类

  • 订阅模式下,Consumer 需要创建随机队列,然后和交换机绑定。

  • 具体更改的地方只有 @RabbitListener 注解的参数部分:

    M1-M2 测试中都是与队列进行绑定参数,这里我们要使用绑定参数(bindings)。

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, 
            exchange = @Exchange(name = "logs", declare = "false")
    ))
    

    其中:

    由于群发没有关键词,所以 @QueueBinding 处可以直接绑定。

    value = @Queue 当后面没有配置参数时就是默认:随机命名,非持久,独占,自动删除的队列。

    交换机参数里面的 declare = “false” 代表:不重新定义,不新创建交换机。

    ( 吐槽:这注解里面套注解,是真的不做人 … )

  • 最后将配置复制到第二个方法上即可。

package cn.tedu.rabbitmqspring.m3;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive1(String str){
        System.out.println("Consumer 1 收到: "+str);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue,
            exchange = @Exchange(name = "logs",declare = "false")
    ))
    public void receive2(String str){
        System.out.println("Consumer 2 收到: "+str);
    }
}

Main 启动类

  • 创建队列方法改为创建交换机,并封装交换机参数:

    定义一个创建交换机方法(logsExchange),返回 Fanout 交换机(FanoutExchange)。

    交换机参数为( 交换机名,持久化参数,自动删除参数)。

    我们这里设置交换机为非持久,非自动删除的交换机,交换机名应于上面一致。

package cn.tedu.rabbitmqspring.m3;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import javax.annotation.PostConstruct;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class,args);
    }
    @Bean
    public FanoutExchange logsExchange(){
        return new FanoutExchange("logs",false,false);
    }
    @Autowired
    private Producer p;
    @PostConstruct
    public void test(){
        new Thread(()->p.send()).start();
    }
}

启动测试:测试两个消费者均收到消息即为成功。

在这里插入图片描述
Day01结束

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-09-18 10:00:25  更:2021-09-18 10:03:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 17:00:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码