写在前面
本文是使用Go语言实现各种RabbitMQ的中间件模型
1. 介绍
1.1 什么是MQ
MQ (Message Quene) : 翻译为 消息队列 ,通过典型的 生产者 和消费者 模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。
别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
目前市面上有很多消息中间件:RabbitMQ,RocketMQ,Kafka等等…
1.2 什么是RabbitMQ
RabbitMQ 是使用Erlang语言开发的开源消息队列系统,基于AMQP协议 来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。
1.3 AMQP 协议
AMQP(advanced message queuing protocol) 在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。
顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:
2. Go语言操作RabbitMQ
2.1 下载
下载rabbitmq过程就省了,可以直接到官网网站下载安装,像安装qq一样。
2.2 引入驱动
go get github.com/streadway/amqp
var MQ *amqp.Connection
func RabbitMQ(connString string) {
conn, err := amqp.Dial(connString)
if err != nil {
panic(err)
}
MQ = conn
}
2.3 HelloWorld 模型
P代表生产者,C代表消费者,红色部分是队列。 生产者生成消息到队列中,消费者进行消费,直连单点模式。
2.3.1 生产者
var ProductMQ *amqp.Connection
ch, err := ProductMQ.Channel()
q, err := ch.QueueDeclare("hello",
false,
false,
false,
false,
nil,
)
参数1(name):队列名字 参数2(durable):持久化,队列中所有的数据都是在内存中的,如果为true的话,这个通道关闭之后,数据就会存在磁盘中持久化,false的话就会丢弃 参数3(autoDelete):不需要用到队列的时候,是否将消息删除 参数4(exclusive):是否独占队列,true的话,就是只能是这个进程独占这个队列,其他都不能对这个队列进行读写 参数5(noWait):是否阻塞 参数6(args):其他参数
body := "Hello World!"
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
参数1(exchange):交换机,后续会讲到 参数2(route-key):队列名字 参数3(mandatory):是否强制性,
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return 方法将消息返回给生产者 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
参数4(immediate):是否立即处理
当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者
也就是说,mandatory 标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
参数5(msg):发布的消息,ContentType是传输类型,Body是发送的消息。
2.3.2 消费者
ch, err := ConsumerMQ.Channel()
q, err := ch.QueueDeclare(
"hello",
false,
false,
false,
false,
nil,
)
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
由于消费者端需要一直监听,所以我们要用一个for循环+channel去阻塞主进程,使得主进程一直处于监听状态。
forever := make(chan bool)
go func() {
for d := range msgs {
fmt.Printf("Received a message: %s", d.Body)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
2.3.3 结果
2.4 Work Queues 模型
Work queues ,也被称为(Task queues ),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work queues模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
2.4.1 生产者
生成10条消息到队列中
body := "Hello World! "
for i := 0; i < 10; i++ {
msg := strconv.Itoa(i)
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body+msg),
})
}
2.4.2 消费者
创建两个一样的消费者进行监听消费,与上面2.3.2 的消费者保持一致
2.4.3 结果
消费者1号 消费者2号
2.5 Publish/Subscribe 模型
fanout 扇出 也称为广播
在广播模式下,消息发送流程如下:
- 可以有多个消费者
- 每个消费者有自己的queue(队列)
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
2.5.1 生产者
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)
参数1(name):交换机名称 参数2(kind):交换机类型
_ = ch.Publish("logs", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.5.2 消费者
_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "", "logs", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
2.5.3 结果
2.6 Routing 模型
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
在fanout 模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct 类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
2.6.1 生产者
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_direct", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.6.2 消费者
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
2.7 Topics 模型
Topic 类型的Exchange 与Direct 相比,都是可以根据RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!
这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 统配符
* 匹配不多不少恰好1个词 # 匹配一个或多个词 - 如:
fan.# 匹配 fan.one.two 或者 fan.one 等 fan.* 只能匹配 fan.one
2.7.1 生产者
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
body := "Hello World "
_ = ch.Publish("logs_topic", "", false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
2.7.2 消费者
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, )
q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, )
_ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, )
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )
2.8 RPC 模型
日后补充
|