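Environment setup
Installing RabbitMQ: I first installed VMware Workstation on a Windows 10 laptop, then installed CentOS as the guest operating system, and installed RabbitMQ on top of that.
For the installation steps, you can refer to this blog post: https://blog.csdn.net/hsxy123123/article/details/104006744
Pay attention to the Erlang/RabbitMQ version compatibility listed on the official RabbitMQ website, and install matching versions.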
Message producer demo code
private static final String EXCHANGE_NAME = "elon_exchange";

public void produceMessage2Exchange(String messageBody) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.*.*.*");
    factory.setPort(5672);
    factory.setUsername("***");
    factory.setPassword("*******");
    // try-with-resources closes the channel and the connection once the message is sent
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        // A fanout exchange ignores the routing key and copies each message to every bound queue
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        channel.basicPublish(EXCHANGE_NAME, "", null, messageBody.getBytes(StandardCharsets.UTF_8));
        LOGGER.info("Sent {}", messageBody);
    } catch (Exception e) {
        LOGGER.error("Produce message fail.", e);
    }
}
Message consumer demo code
private static final String EXCHANGE_NAME = "elon_exchange";

public void consumeMessageFromExchange() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("192.*.*.*");
    factory.setPort(5672);
    factory.setUsername("***");
    factory.setPassword("*******");
    try {
        // The connection and channel are left open on purpose so the consumer keeps receiving messages
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // queueDeclare() with no arguments creates a server-named, exclusive, auto-delete queue
        String queueName = channel.queueDeclare().getQueue();
        LOGGER.info("Get queue name:{}", queueName);
        // The routing key is left empty because a fanout exchange ignores it
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            LOGGER.info("Receive message from binding exchange:{}", message);
        };
        // autoAck = true: a message counts as acknowledged as soon as it is delivered
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    } catch (Exception e) {
        LOGGER.error("Consume message exception.", e);
    }
}
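The core logic is to declare a randomly named queue and bind it to the exchange, then receive the messages that the exchange forwards through that queue. Multiple queues can be bound to the same exchange, which gives the effect of broadcasting each message.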
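To make the broadcast behaviour concrete without needing a running broker, here is a minimal in-memory sketch of the fanout idea. It is only a toy model, not the RabbitMQ client API: the names FanoutSketch, FanoutExchange and declareAndBindQueue are made up for this illustration, and a real broker does much more (acknowledgements, persistence, networking).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FanoutSketch {

    /** Hypothetical stand-in for a fanout exchange: no routing keys, every bound queue gets a copy. */
    static final class FanoutExchange {
        private final List<Queue<String>> boundQueues = new ArrayList<>();

        /** Plays the role of queueDeclare() followed by queueBind() in the consumer demo. */
        Queue<String> declareAndBindQueue() {
            Queue<String> queue = new ConcurrentLinkedQueue<>();
            boundQueues.add(queue);
            return queue;
        }

        /** Plays the role of basicPublish(): the message is copied into every queue bound so far. */
        void publish(String message) {
            for (Queue<String> queue : boundQueues) {
                queue.add(message);
            }
        }
    }

    public static void main(String[] args) {
        FanoutExchange exchange = new FanoutExchange();
        Queue<String> consumerA = exchange.declareAndBindQueue();
        Queue<String> consumerB = exchange.declareAndBindQueue();

        exchange.publish("hello");

        // Both consumers see the same message because each has its own queue.
        System.out.println("A received: " + consumerA.poll());
        System.out.println("B received: " + consumerB.poll());
    }
}
```

In this model, as with a real fanout exchange, a queue only receives messages published after it was bound. That is why the consumer in the demo should be started before the producer sends anything.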