原理图
原生生产者代码
/**
 * Simple-mode producer: declares a queue and publishes a single text message
 * to it through the default ("") exchange, using the queue name as routing key.
 */
public class Producer {
    /** Name of the queue that is declared and published to. */
    static final String QUEUE_NAME = "simple_queue";

    /**
     * Connects to the broker at localhost:5672 (virtual host "/", user guest),
     * declares {@link #QUEUE_NAME}, publishes one message and releases the
     * channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // try-with-resources closes the channel and then the connection even
        // when queueDeclare/basicPublish throws, so nothing leaks on failure.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments after the name: durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "你好。";
            // Encode explicitly as UTF-8: the consumers in these notes decode with
            // "utf-8", and the platform default charset is not guaranteed to match.
            byte[] payload = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
            channel.basicPublish("", QUEUE_NAME, null, payload);
            System.out.println("已发送消息:" + message);
        }
    }
}
原生消费者代码
/**
 * Simple-mode consumer: declares the queue and prints every message delivered
 * to it. The connection is intentionally left open so the process keeps
 * receiving deliveries.
 */
public class ComsumerHi {
    /**
     * Queue to consume from. This must be the queue the simple-mode producer
     * publishes to ("simple_queue"); the previous value "hello_queue" meant the
     * consumer never saw the producer's messages.
     */
    static final String QUEUE_NAME = "simple_queue";

    /**
     * Connects to the broker at localhost:5672, declares {@link #QUEUE_NAME}
     * and registers a consumer that prints each delivery.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Declare through the constant so the declared queue and the consumed
        // queue can never drift apart.
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /** Prints routing key, exchange, delivery tag and the UTF-8 decoded body. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:" + envelope.getRoutingKey());
                System.out.println("交换机为:" + envelope.getExchange());
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                System.out.println("接收到的消息为:"
                        + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument true = automatic acknowledgement of deliveries.
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
2.工作队列模式(包工头)
在同一个队列中可以有多个消费者,消费者之间对于消息的接收是竞争关系。
注:在简单模式的基础上,多启动几个监听同一队列的 consumer 即可。
3.订阅发布模式(联想微博):
一个消息可以被多个消费者接收;其实是使用了订阅模式,交换机类型为:fanout广播
生产者代码:
/**
 * Publish/subscribe producer: declares a fanout exchange, binds the SMS and
 * e-mail queues to it and publishes one message to the exchange.
 */
public class PubSub {
    static final String SMS_QUEUE = "sms_queue";
    static final String EMAIL_QUEUE = "email_queue";
    static final String FANOUT_EXCHANGE = "fanout_exchange";

    /**
     * Connects to the broker at localhost:5672, declares the exchange, both
     * queues and their bindings, publishes one message and releases the
     * channel and connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // try-with-resources closes the channel and then the connection even
        // when a declare/bind/publish call throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);
            channel.queueDeclare(SMS_QUEUE, true, false, false, null);
            channel.queueDeclare(EMAIL_QUEUE, true, false, false, null);

            // Both queues are bound with an empty routing key, matching the
            // empty routing key used when publishing below.
            channel.queueBind(SMS_QUEUE, FANOUT_EXCHANGE, "");
            channel.queueBind(EMAIL_QUEUE, FANOUT_EXCHANGE, "");

            String msg = "gwiogj";
            // Explicit charset so the bytes do not depend on the platform default.
            channel.basicPublish(FANOUT_EXCHANGE, "", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者:
只需要修改一处代码,也就是修改队列名即可:
SMS消费者
// SMS consumer: start consuming from SMS_QUEUE with the defaultConsumer callback.
channel.basicConsume(SMS_QUEUE,true,defaultConsumer);
EMAIL消费者:
// E-mail consumer: start consuming from EMAIL_QUEUE with the defaultConsumer callback.
channel.basicConsume(EMAIL_QUEUE,true,defaultConsumer);
但是:如果运维先启动的是消费者,此时交换机、队列及其绑定关系可能还不存在,消费者会因为找不到队列而启动失败。因此消费者代码最好照着生产者代码把交换机、队列和绑定关系也声明一遍(重复声明参数一致的交换机和队列不会报错)。
★★★4.路由模式(分布式日志收集):
比发布/订阅模式更灵活。Routing 路由模式要求队列绑定到交换机时指定路由 key;消息发送时也需要携带路由 key;只有消息的路由 key 与队列绑定的路由 key 完全一致,该队列才能接收到消息。
生产者:
/**
 * Routing-mode producer: declares a direct exchange with an alert queue and a
 * log queue, then publishes five "info" and five "error" messages.
 *
 * <p>Bindings declared here: the alert queue is bound with "error"; the log
 * queue is bound with "info", "warning" and "error".
 */
public class ProducerRouting {
    static final String ALERT_QUEUE = "alert_queue";
    static final String LOG_QUEUE = "log_queue";
    static final String DIRECT_EXCHANGE = "direct_exchange";

    /**
     * Connects to the broker at localhost:5672, declares the exchange, queues
     * and bindings, publishes the messages and releases the channel and
     * connection.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // try-with-resources closes the channel and then the connection even
        // when a declare/bind/publish call throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
            channel.queueDeclare(ALERT_QUEUE, true, false, false, null);
            channel.queueDeclare(LOG_QUEUE, true, false, false, null);

            channel.queueBind(ALERT_QUEUE, DIRECT_EXCHANGE, "error");
            channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "info");
            channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "warning");
            channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "error");

            // Payloads contain non-ASCII text, so encode explicitly as UTF-8:
            // the routing consumer decodes with "utf-8" and the platform default
            // charset is not guaranteed to match.
            byte[] infoPayload = "正常info".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(DIRECT_EXCHANGE, "info", null, infoPayload);
            }

            byte[] errorPayload = "异常error".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            for (int i = 0; i < 5; i++) {
                channel.basicPublish(DIRECT_EXCHANGE, "error", null, errorPayload);
            }
        }
    }
}
消费者:
在生产者的队列和交换机都在消费者代码声明好的前提下,只需改动消费者监听的队列名称即可
/**
 * Routing-mode consumer: re-declares the direct exchange, both queues and
 * their bindings (so it can start before the producer), then prints every
 * message delivered to the log queue. The connection is intentionally left
 * open so the process keeps receiving deliveries.
 */
public class RoutingConsumer {
    static final String ALERT_QUEUE = "alert_queue";
    static final String LOG_QUEUE = "log_queue";
    static final String DIRECT_EXCHANGE = "direct_exchange";

    /**
     * Connects to the broker at localhost:5672, declares the routing topology
     * and registers a consumer on {@link #LOG_QUEUE}.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // Only the routing topology is declared here. The former
        // queueDeclare("hello_queue", ...) was a leftover from the simple-mode
        // consumer and created an unrelated durable queue on the broker.
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);
        channel.queueDeclare(ALERT_QUEUE, true, false, false, null);
        channel.queueDeclare(LOG_QUEUE, true, false, false, null);
        channel.queueBind(ALERT_QUEUE, DIRECT_EXCHANGE, "error");
        channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "info");
        channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "warning");
        channel.queueBind(LOG_QUEUE, DIRECT_EXCHANGE, "error");

        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            /** Prints routing key, exchange, delivery tag and the UTF-8 decoded body. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由key为:" + envelope.getRoutingKey());
                System.out.println("交换机为:" + envelope.getExchange());
                System.out.println("消息id为:" + envelope.getDeliveryTag());
                System.out.println("接收到的消息为:"
                        + new String(body, java.nio.charset.StandardCharsets.UTF_8));
            }
        };

        // Second argument true = automatic acknowledgement of deliveries.
        channel.basicConsume(LOG_QUEUE, true, defaultConsumer);
    }
}
5.topics通配符模式(路由模式升级版):
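可以根据路由 key 将消息传递到对应路由 key 的队列;队列绑定到交换机的路由 key 可以有多个;通配符模式中路由 key 可以使用 * 和 #:路由 key 由若干以 "." 分隔的单词组成,* 匹配恰好一个单词,# 匹配零个或多个单词(例如 "guangzhou.*.*" 能匹配 "guangzhou.caiwu.info",但不能匹配 "guangzhou.caiwu")。使用通配符模式之后,路由 key 的配置更加灵活。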
生产者:
public class ProducerTopics {
static final String GUANGZHOU_XIAOZHANG_QUEUE="guangzhou_xiaozhang_queue";
static final String ZONGBU_CAIWU_QUEUE="zongbu_caiwu_queue";
static final String TOPIC_EXCHANGE="topic_exchange";
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true,false,false,null);
channel.queueDeclare(GUANGZHOU_XIAOZHANG_QUEUE,true,false,false,null);
channel.queueDeclare(ZONGBU_CAIWU_QUEUE,true,false,false,null);
★★★channel.queueBind(GUANGZHOU_XIAOZHANG_QUEUE,TOPIC_EXCHANGE,"guangzhou.*.*");
★★★channel.queueBind(ZONGBU_CAIWU_QUEUE,TOPIC_EXCHANGE,"*.caiwu.*");
for (int i = 0; i < 5; i++) {
String msg ="正常info";
channel.basicPublish(TOPIC_EXCHANGE,"guangzhou.caiwu.info",null,msg.getBytes());
}
for (int i = 0; i < 5; i++) {
String msg ="异常error";
channel.basicPublish(TOPIC_EXCHANGE,"guangzhou.renshi.error",null,msg.getBytes());
}
for (int i = 0; i < 5; i++) {
String msg ="success";
channel.basicPublish(TOPIC_EXCHANGE,"Santeeyago.caiwu.info",null,msg.getBytes());
}
channel.close();
connection.close();
}
}
消费者:
/**
 * Topic-mode consumer for the Guangzhou queue: re-declares the topic exchange,
 * both queues and their wildcard bindings, then prints every message delivered
 * to {@code guangzhou_xiaozhang_queue}. The connection is left open so the
 * process keeps receiving deliveries.
 */
public class TopicsConsumerXiaozhang {
    static final String GUANGZHOU_XIAOZHANG_QUEUE = "guangzhou_xiaozhang_queue";
    static final String ZONGBU_CAIWU_QUEUE = "zongbu_caiwu_queue";
    static final String TOPIC_EXCHANGE = "topic_exchange";

    /**
     * Opens a channel to the local broker, declares the topology and subscribes
     * the printing consumer to the Guangzhou queue.
     *
     * @param args unused
     * @throws Exception if connecting, declaring or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        Connection conn = localFactory().newConnection();
        Channel ch = conn.createChannel();
        declareTopology(ch);
        ch.basicConsume(GUANGZHOU_XIAOZHANG_QUEUE, true, printingConsumer(ch));
    }

    /** Builds a factory pointing at localhost:5672, virtual host "/", user guest. */
    private static ConnectionFactory localFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    /** Declares the topic exchange, the two queues and their wildcard bindings. */
    private static void declareTopology(Channel ch) throws IOException {
        ch.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);
        ch.queueDeclare(GUANGZHOU_XIAOZHANG_QUEUE, true, false, false, null);
        ch.queueDeclare(ZONGBU_CAIWU_QUEUE, true, false, false, null);
        ch.queueBind(GUANGZHOU_XIAOZHANG_QUEUE, TOPIC_EXCHANGE, "guangzhou.*.*");
        ch.queueBind(ZONGBU_CAIWU_QUEUE, TOPIC_EXCHANGE, "*.caiwu.*");
    }

    /** Creates a consumer that prints routing key, exchange, delivery tag and body of each delivery. */
    private static DefaultConsumer printingConsumer(Channel ch) {
        return new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                System.out.println("路由key为:" + routingKey);
                String exchange = envelope.getExchange();
                System.out.println("交换机为:" + exchange);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println("消息id为:" + deliveryTag);
                String text = new String(body, "utf-8");
                System.out.println("接收到的消息为:" + text);
            }
        };
    }
}
|