基本概念
什么是消息队列
消息队列是一种应用(进程)间的通信方式。
生产者只需把消息发布到MQ,消费者只需重MQ中取出,可靠传递由消息队列中的消息系统来确保。
消息队列有什么用
消息队列是一种异步协作机制,最根本的用处在于将一些不需要即时生效的操作拆分出来异步执行,从而达到可靠传递、流量削峰等目的。
比如如果有一个业务需要发送短信,可以在主流程完成之后发送消息到MQ后,让主流程完结。而由另外的线程拉取MQ的消息,完成发送短信的操作。
常用的消息队列
常用的MQ大概有ActiveMQ、RabbitMQ、RocketMQ、Kafka
优点:对Java的JMS支持最好;多线程并发;
缺点:历史悠久,版本更新慢。现在慢慢用的少了;
优点:生态丰富,是现在主流的MQ;支持多种客户端、支持AJAX;
缺点:对想深入源码的Java选手不太友好;
优点:为海量数据打造;主张拉模式;天然集群、HA、负载均衡;
缺点:生态较小
优点:分布式高可拓展;高性能;容错强
缺点:消息重复;乱序;维护成本高
什么是RabbitMQ
消息中间件
erlang:一种并发函数式语言
AMQP:Advanced Message Queuing Protocol,高级消息队列协议。由Exchange、Queue和Bind组成
RabbitMQ是一个erlang开发的AMQP实现
生产者将消息发送到Exchange上,通过Exchange从而Binding到Queues上。
Exchange有三种具体类型:
- direct:如果消息中的RoutingKey和Binding中的BindingKey一致就转发
- fanout:消息被分发到所有队列中
- topic:将RoutingKey和队列的模式进行匹配
应用场景
异步
可以理解为将遇到非必须的业务时,立即响应客户端,不关系业务何时完成
比如在用户注册时,有将信息写入数据库和发送注册成功邮件两项业务。
数据库写入完成即标志着用户注册成功,此时如果继续处理发送邮件的业务,会给客户端带来不必要的等待时间。引入消息队列后,在队列中写入完成注册的消息后,即可完成整个注册流程。至于邮件,可以等到邮件业务从消息队列中取出消息再发送。
把不紧急的业务从主线中剥离出来,主线不必考虑不紧急的业务何时完成的时候,可以考虑使用消息队列实现异步。
解耦
考虑两个系统间存在消息传递,一个系统的故障会影响到整个业务的正常运转。可以用消息队列来保证消息可靠传递
比如一个订单系统和一个库存系统,完成订单之后,需要进行库存调度。考虑到如果库存系统故障,会引起已完成的订单消息的丢失,而做很多异常处理会使业务变得臃肿。这个时候可考虑引入消息队列,使用消息队列保证可靠传输,从而减少业务逻辑。
削峰
考虑短时间的大量请求,可能会带来内存溢出、大面积连接超时等情况,使得服务器崩溃。引入消息队列后,可以控制请求到业务处理系统的流量,从而防止崩溃现象的出现。
比如秒杀场景。大量请求同时涌入,服务器不能分配足够的资源响应,或者带宽不足,导致宕机。可以引入消息队列来限流,MQ通过限制同一时间的出口消息,使得流量在服务器能够承受的范围之内。等待一部分请求处理完成之后,再向业务处理系统导入新的消息。
Go语言使用RabbitMQ
docker安装RabbitMQ
sudo docker pull rabbitmq
sudo docker images
sudo docker run --name rabbitmq -d -p 5672:5672
sudo docker ps
go安装rabbitmq客户端
go get github.com/streadway/amqp
HelloRabbitMQ
send.go
func main() {
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
q, _ := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
body := "Hello World!"
_ = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
}
recv.go
func main() {
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
q, _ := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
msgs, _ := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
<-forever
}
Reference
几种常见的MQ总结对比
消息队列之RabbitMQ
服务为什么会崩溃
17 | 消息队列:秒杀时如何处理每秒上万次的下单请求?
docker安装RabbitMq
RabbitMQ Go语言客户端教程1——HelloWorld
|