简单模拟MQ的作用 – 完成异步通讯
如图显示:有了RabbitMQ,你就可以和三个妹子同时进行短信聊天,就不需要为无法同时和三个妹子语音聊天而苦恼
MQ存在的意义 (解决同步通讯的存在的缺点)
如图所示:当用户完成了支付功能,我们需要仓储服务对应商品数量减1。当我们的支付服务频繁调用仓储服务之后,仓储服务挂掉了,这个时候支付服务想再调仓储服务就调不了,然后支付服务也阻塞在这里了,然后系统崩了。
同步存在的问题
初探MQ的作用
1. 服务解耦合
2. 性能提升,吞吐量提高
3. 服务没有强依赖,不担心级联失败问题
ps:概念啥的,谁想看啊
我们直接进入RabbitMQ的使用
安装RabbitMQ
PS:默认你们安装好了docker在自己的虚拟机或者自己的服务器上面了
docker pull rabbitmq:3-management(远程安装,版本可以自主选择)
--------------------------------------
也可以在自己电脑上面下载rabbitmq.tar,然后上传到你的虚拟机或者服务器,然后
docker load -i mq.tar
---------------------------
接下来,我们就可以使用docker运行我们的RabbitMQ了
docker run \
-e RABBITMQ_DEFAULT_USER=xxx \
-e RABBITMQ_DEFAULT_PASS=xxx\
--name mq \
--hostname mq1 \
-p 15672:15672 \ (注意:这是管理页面端口)
-p 5672:5672 \ (注意:这是通讯端口)
-d \ (后台运行)
rabbitmq:3-management
别直接复制上面的运行指令直接运行了,xxx,xxx这两个你自主设置一下,把括号和括号里面的内容删除一下
祝你好运
登录管理页面( ip地址:15672 )
如果你能登录进去说明你的安装配置都没有问题了,恭喜你。
整合RabbitMQ到项目里面
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
我们可以创建一个普通的maven项目为父项目,把依赖放在里面,再创建两个springboot子项目,分别作为消息的发布者和消息的消费者 ps:需要这个项目文件的可加qq:2338244917 然后我们在连个子项目中连接上我们的RabbitMQ
spring:
rabbitmq:
host: 47.98.251.192
port: 5672
username: xxx
password: xxx
virtual-host: /
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test01() {
String queueName = "simple.queue";
String msg = "hello spring amqp!";
rabbitTemplate.convertAndSend(queueName,msg);
}
@RabbitListener(queues = "simple.queue")
public void listenSimpleMessage(String msg) {
System.out.println("消费者接收到simple.queue的消息:"+msg);
}
通过上面的操作,我们就可以让我们的消费者时刻消费发布者的消息了
上面的例子,我们就会出现一个问题。假设:我们有两个消费者(我们可以按照上面的多做一个@RabbitListener(queues=“xxx.xxx”))就行了,这个时候,我们的发布者发送50条信息,消费者我们搞连个来消费这些消息,一个1秒处理50条,一个1秒处理20条。
这样,这两个消费者处理一样数量的消息
原因是因为要一个处理完,下一个才能处理,这样有个乱用,我们要能者多劳
spring:
rabbitmq:
host: 47.98.251.199
username: root
password: 922815
port: 5672
virtual-host: /
listener:
simple:
prefetch: 1 # 每次取一条消息,处理完成才能获取下一条消息
在consumer项目配置一下就可以解决这个问题了
最后聊一聊交换机(Exchange)
交换机不储存消息
FanoutExchange
作用: 当我们消息的发送者发送消息成功之后,我们的FanoutExchange就会让绑定在它上面的队列queue全部都能消费到每一个消息
package cn.itcast.mq.cofig;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("lin.fanout");
}
@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}
@Bean
public Binding bindingQueue1(Queue fanoutQueue1,FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2,FanoutExchange fanoutExchange) {
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
}
@Test
public void testSendFanoutExchange() {
String exchangeName = "lin.fanout";
String message = "Hello Everyone!";
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
前面的代码是在consumer项目里面的,后面的代码是publisher项目的代码
DirectExchange
作用: 我们可以通过DirectExchange(路由交换机),通过发送信息时,绑定routingKey来指定我们发送给哪个消息队列消费
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "lin.direct",type = ExchangeTypes.DIRECT),
key = {"red","blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1消息:["+msg+"]");
}
@Test
public void testSendDirectExchange1() {
String exchangeName = "lin.direct";
String message = "Hello yellow!";
rabbitTemplate.convertAndSend(exchangeName,"yellow",message);
}
TopicExchange
作用: 这玩意的作用相当于把消息队列分类
#:代表0个或者1个单词,*:代表1个单词
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "lin.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue2(String msg) {
System.out.println("消费者接收到topic.queue1的消息:["+msg+"]");
}
@Test
public void testSendTopicExchange1() {
String exchangeName = "lin.topic";
String msg = "Hello,topicExchange!";
rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);
}
ps:最后的最后渴望变成天使 大家别走,听我bb完最后一个知识点,b完我就下班了
SpringAMQP-消息转换器
它的作用就是,当我们消息的发布者发布一个消息是一个对象给rabbitMQ的时候,我们jdk有它一套方式,这方式不但性能不行,还不安全,我们要取代它
引入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
注入@Bean
这个@Bean我们可以在springboot项目入口那个类里面注入 当然在配置类里面注入也可以
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
我们简单来使用一下它
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String,Object> msg) {
System.out.println("接收到object.queue的消息:"+msg);
}
@Test
public void testSendDirectExchange5() {
String queryName = "object.queue";
Map<String,Object> msg = new HashMap<>();
msg.put("name","柳岩");
msg.put("age",21);
rabbitTemplate.convertAndSend(queryName,msg);
}
好了,到此浅谈RabbitMQ就告一段落了
感谢能看到这里还不走的读者
要是还能给我点个赞就更加感谢了
如有问题,可以联系qq:2338244917
作者:随风
|