Kafka, RabbitMQ, and RocketMQ in Practice, Part 2: RocketMQ & RabbitMQ
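- This article draws on the official examples to document working examples and practical usage of the three mainstream MQs.
- It does not cover installing or configuring the environment, but it does include fairly complete code (including configuration files and Maven).
- It goes straight to code and examples, so it suits readers who have already studied or are familiar with MQs and want to learn or review. The code can be adapted for production use.
- For an introduction to the various MQs and a comparison between them, see my earlier articles.
- The design ideas behind the different MQs are much the same, so it is worth paying close attention to their design philosophy and the basic objects they serve.
- If you have questions, feel free to leave a comment or send me a private message.
- Even though I have picked out only the key code, covering three MQs in one article would still be very long, so I have split it into three parts. This is the second part, covering RabbitMQ and RocketMQ.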
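II: RocketMQ
I recommend reading the official examples. They are very detailed, organized by category (synchronous, asynchronous, subscription, and so on), and pleasant to read. Official examples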
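III: RabbitMQ
The official examples here are equally detailed, although some of them are not commonly used. The diagrams on the official site, however, are an excellent aid for learning and understanding. Official examples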
- Routing mode (direct exchange)
- Consumer (listener)
package net.xdclass.direct;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class Recv {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // The connection and channel stay open so the consumer keeps listening.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Server-named, exclusive, auto-delete queue.
        String queueName = channel.queueDeclare().getQueue();
        // One binding per routing key this queue should receive.
        channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");
        channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck = true: messages count as acknowledged once delivered.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
- Producer (sender)
package net.xdclass.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private static final String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // try-with-resources closes the channel and the connection after publishing.
        try (
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String error = "error log from the order service";
            String info = "info log from the order service";
            String debug = "debug log from the order service";

            // Each message is published with the routing key for its log level.
            channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("Sent successfully");
        }
    }
}
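As a rough mental model of the routing shown above, a direct exchange delivers a message to every queue whose binding key equals the message's routing key exactly. The sketch below simulates that lookup in plain Java with no broker involved. DirectRoutingSketch, bind, route, and the queue names are hypothetical names chosen for illustration and are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model of direct-exchange routing (not the RabbitMQ API).
final class DirectRoutingSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Mirrors the idea of channel.queueBind(queue, exchange, bindingKey).
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with routingKey.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("allLogs", "errorRoutingKey");
        exchange.bind("allLogs", "infoRoutingKey");
        exchange.bind("allLogs", "debugRoutingKey");
        exchange.bind("errorsOnly", "errorRoutingKey");

        System.out.println(exchange.route("errorRoutingKey")); // [allLogs, errorsOnly]
        System.out.println(exchange.route("infoRoutingKey"));  // [allLogs]
        System.out.println(exchange.route("warnRoutingKey"));  // []
    }
}
```

The empty result for warnRoutingKey corresponds to an unroutable message. By default the broker drops such a message unless the publisher sets the mandatory flag or an alternate exchange is configured.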
- Publish/subscribe mode
- Subscriber (message consumer)
package net.xdclass.pub;

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class Recv {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // The connection and channel stay open so the subscriber keeps listening.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Server-named, exclusive, auto-delete queue: one per subscriber.
        String queueName = channel.queueDeclare().getQueue();
        // A fanout exchange ignores the routing key, so it is left empty.
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck = true: messages count as acknowledged once delivered.
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}
- Publisher (message producer)
package net.xdclass.pub;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // try-with-resources closes the channel and the connection after publishing.
        try (
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "Hello World pub !";
            // Empty routing key: a fanout exchange copies the message to every bound queue.
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
- Topic mode
- Consumer
package net.xdclass.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class Recv2 {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // The connection and channel stay open so the consumer keeps listening.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = channel.queueDeclare().getQueue();
        // Matches three-word routing keys whose middle word is "log".
        channel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body=" + new String(body, StandardCharsets.UTF_8));
                // Manual ack for this single delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false: messages are acknowledged manually in handleDelivery.
        channel.basicConsume(queueName, false, consumer);
    }
}
- Producer
package net.xdclass.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class Send {

    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] argv) throws Exception {
        // Connection settings for the demo broker; replace with your own.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("47.98.122.108");
        factory.setUsername("admin");
        factory.setPassword("123456");
        factory.setVirtualHost("/dev");
        factory.setPort(5672);

        // try-with-resources closes the channel and the connection after publishing.
        try (
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String error = "error log from the order service";
            String info = "info log from the order service";
            String debug = "debug log from the order service";

            // Routing keys are dot-separated words: <service>.log.<level>.
            channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
            System.out.println("Topic messages sent successfully");
        }
    }
}
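The exchange uses wildcard patterns to forward messages to the matching queues: * stands for exactly one word, and # stands for zero or more words.
In practice # is the more commonly used wildcard. For example, #.order matches both info.order and sys.error.order,
whereas *.order matches info.order but not sys.error.order. Words are separated by dots (.).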
Two-word keys such as info.order and error.order are matched by both #.order and *.order.
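The wildcard rules above can be checked with a small matcher. This is a minimal sketch in plain Java, assuming the usual topic-exchange semantics (* is exactly one word, # is zero or more words, words are split on dots). TopicPatternSketch and matches are hypothetical names for illustration; this is not the broker's actual implementation, nor part of the RabbitMQ client.

```java
// Illustrative matcher for topic binding patterns (not the RabbitMQ API).
final class TopicPatternSketch {

    // Returns true if routingKey is matched by bindingPattern.
    static boolean matches(String bindingPattern, String routingKey) {
        String[] patternWords = bindingPattern.split("\\.", -1);
        String[] keyWords = routingKey.split("\\.", -1);
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return ki == k.length;
        }
        if (p[pi].equals("#")) {
            // '#' may absorb zero, one, or several words: try every split point.
            for (int next = ki; next <= k.length; next++) {
                if (matches(p, pi + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (ki == k.length) {
            // A literal word or '*' needs one more key word, but none is left.
            return false;
        }
        if (p[pi].equals("*") || p[pi].equals(k[ki])) {
            return matches(p, pi + 1, k, ki + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("#.order", "info.order"));       // true
        System.out.println(matches("#.order", "sys.error.order"));  // true
        System.out.println(matches("*.order", "info.order"));       // true
        System.out.println(matches("*.order", "sys.error.order"));  // false
        System.out.println(matches("*.log.*", "order.log.error"));  // true
        System.out.println(matches("*.log.*", "order.log"));        // false
    }
}
```

The last two calls use the *.log.* binding from Recv2: it accepts the three-word keys published by Send (such as order.log.error) and rejects a two-word key such as order.log.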