?方式一:
Topic 匹配模式
发送发 EmitLogsTopic.java
package com.zheng.seven;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;
//匹配模式
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
HashMap<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但是只被队列Q2接受一次");
bindingKeyMap.put("quick.brown,fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "不会匹配到");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但是会匹配到");
Channel channel = RabbitMqUtil.getChannel();
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
System.out.println("生产者发出消息"+message);
}
}
}
接收方1 ReceiveLogsTopic.java
package com.zheng.seven;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;
//匹配模式
import java.util.HashMap;
import java.util.Map;
public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
HashMap<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但是只被队列Q2接受一次");
bindingKeyMap.put("quick.brown,fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "不会匹配到");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但是会匹配到");
Channel channel = RabbitMqUtil.getChannel();
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("utf-8"));
System.out.println("生产者发出消息"+message);
}
}
}
接收方2 ReceiveLogsTopic.java
package com.zheng.seven;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
//声明主题交换机及先相关队列
public class ReceiveLogsTopic02 {
//交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
//接受消息
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q2",false,false,false,null);
channel.queueBind("Q2",EXCHANGE_NAME,"*.*.rabbit");
//*代表一个字符串 #代表多个字符串
channel.queueBind("Q2",EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息");
DeliverCallback deliverCallback =(consumerTag,message)->{
System.out.println("已确认消息"+new String(message.getBody(),"utf-8"));
System.out.println("接受队列:"+"Q1"+"绑定键"+message.getEnvelope().getRoutingKey());
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("未确认消息");
};
channel.basicConsume("Q2",true,deliverCallback,cancelCallback,null);
}
}
方式二:
fanout 发布订阅模式
发送方 EmitLogs.java
package com.zheng.five;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;
//任何人都发
import java.util.Scanner;
public class EmitLogs {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));
System.out.println("发送成功");
}
}
}
接收方1?ReceiveLogs01.java
package com.zheng.five;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
public class ReceiveLogs01 {
//交换机的名称
public static final String EXCHANGE_NAME ="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明一个队列 临时队列
/**
* 生成一个临时队列 队列的名称是随机的
* 当消费者断开与队列的连接时 队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机与队列
*/
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("c1等待接收消息,打印在屏幕上");
DeliverCallback deliverCallback =(consumerTag, message)->{
System.out.println("消息已确认"+new String(message.getBody(),"utf-8"));
};
CancelCallback cancelCallback=( consumerTag)->{
System.out.println("消息未确认");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback,null);
}
}
接收方2?ReceiveLogs02.java
package com.zheng.five;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("c2等待接收消息,打印在屏幕上");
DeliverCallback deliverCallback =(consumerTag, message)->{
System.out.println("消息已确认"+new String(message.getBody(),"utf-8"));
};
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息未确认");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback,null);
}
}
方式三
direct 指定发送模式
发送方? DirectLogs.java
package com.zheng.six;
import com.rabbitmq.client.Channel;
import com.zheng.utils.RabbitMqUtil;
//指定会发给谁
import java.util.Scanner;
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("utf-8"));
System.out.println("发送成功"+message);
}
}
}
接收方?ReceiveLogs03.java
package com.zheng.six;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
public class ReceiveLogs03 {
//交换机的名称
public static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//声明一个队列 临时队列
/**
* 生成一个临时队列 队列的名称是随机的
* 当消费者断开与队列的连接时 队列就自动删除
*/
channel.queueDeclare("console",false,false,false,null);
/**
* 绑定交换机与队列
*/
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warming");
System.out.println("c1等待接收消息,打印在屏幕上");
DeliverCallback deliverCallback =(consumerTag, message)->{
System.out.println("消息已确认"+new String(message.getBody(),"utf-8"));
};
CancelCallback cancelCallback=( consumerTag)->{
System.out.println("消息未确认");
};
channel.basicConsume("console",true,deliverCallback,cancelCallback,null);
}
}
接收方?ReceiveLogs04.java
package com.zheng.six;
import com.rabbitmq.client.*;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zheng.utils.RabbitMqUtil;
public class ReceiveLogs04 {
//交换机的名称
public static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
//声明一个队列 临时队列
/**
* 生成一个临时队列 队列的名称是随机的
* 当消费者断开与队列的连接时 队列就自动删除
*/
channel.queueDeclare("disk",false,false,false,null);
/**
* 绑定交换机与队列
*/
channel.queueBind("disk",EXCHANGE_NAME,"error");
System.out.println("c2等待接收消息,打印在屏幕上");
DeliverCallback deliverCallback =(consumerTag, message)->{
System.out.println("消息已确认"+new String(message.getBody(),"utf-8"));
};
CancelCallback cancelCallback=( consumerTag)->{
System.out.println("消息未确认");
};
channel.basicConsume("disk",true,deliverCallback,cancelCallback,null);
}
}
|