什么是RabbitMq?
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
通俗的说RabbitMq就是用来接收和转发消息的,本身不对消息进行任何处理,只是接收和转发。
为什么要用RabbitMq?
1、解耦 这里有两个模块,用户注册完后短信通知用户注册成功,这里用户注册就依赖于短信通知,存在耦合,当短信通知出现异常时,用户注册也会影响使用,下面引入消息队列。 引入消息队列后用户注册于短信通知已经解耦了,此后用户注册完后发送消息队列后就可以不管了,剩下的就交给消息队列了。
2、异步 ??还是上面的案例,本来用户注册完后需要等待短信通知调用完后才能给用户回馈,引入消息队列后用户注册和短信通知就可以异步执行,大大提高了响应速度。
3、流量削峰 ??举个例子,某个特殊的日子,有个商家搞整点优惠活动,力度还挺大,听到这个消息,大家都跃跃欲试。到了时间点,大家打开活动网站,准备下单,发现网站崩了,原来是到了整点,大量的请求发给了服务器,服务器一时间难以接受,就挂了。怎么解决,MQ的流量削峰。
引入MQ
RabbitMq的组成
- Producer 发送消息的一方
- Exchange(交换机)Producer 将消息发送给交换机,交换机来实现消息的分发
- Queue(队列) 存储消息的地方
- Consumer 信息的消费者
RabbitMq分发策略
简单队列
生产者的实现:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="Hello World";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行后:
消费者的实现:
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
try{
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String info=new String(message.getBody(),"utf-8");
System.out.println(" [x] Received '" + info + "'");
}
}, consumerTag -> {
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行后:
工作队列
消息生产者:
public class WorkSend {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection=null;
Channel channel=null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.basicQos(1);
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
String info="work task info--------";
for(int i=0;i<10;i++){
System.out.println(info.getBytes());
channel.basicPublish("",TASK_QUEUE_NAME,null,(info+i).getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者(这里创建了两个消费者,代码都一样):
public class WorkRecv2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("localhost");
Connection connection=null;
try {
connection=connectionFactory.newConnection();
final Channel channel=connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
channel.basicConsume(TASK_QUEUE_NAME, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String info = new String(message.getBody(), "utf-8");
System.out.println(info);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
运行后(两个消费者每人每次消费一条信息,依次消费):
分发队列
生产者产生消息,只要订阅了该队列的都能收到消息通知
消息生产者:
public class FanoutExchange {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "info: Hello World!";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
运行后: 消息消费者:
public class FanoutRecive {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
运行后:
Routing
通过路由key绑定队列与交换机,交换机可以定点投放信息 消息生产者:
public class RouteSend {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message="发给blue的";
channel.basicPublish(EXCHANGE_NAME, "blue", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent "+ message + "'");
}
}
}
运行后: 消息消费者(这里创建两个消费者,只需将绑定的路由key改下就行):
public class BlueRecv {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "blue");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
运行后:
Topics
例如 lazy.# 能匹配 lazy.orange.elephant , lazy.brown.fox ,而 lazy.*.fox 只能匹配lazy.brown.fox
消息生产者:
public class TopicSend {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message="info to lazy.#-----";
channel.basicPublish(EXCHANGE_NAME, "lazy.red.bird", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent " + message + "'");
}
}
}
消费者代码(创建了两个消费者,改一下routingKey就行,改为 lazy.*.bird):
public class TopicRecv {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println(" [*] Waiting for messages.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
运行后(两个都收到了消息):
|