IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> java操作rabbitmq消费者示例 -> 正文阅读

[Java知识库]java操作rabbitmq消费者示例

? ? rabbitmq作为消息队列,在实际应用中很常见,生产者将消息发送到某个队列,消费者消费这个队列。

? ? ?消息在队列中,消费者要消费,需要监听队列,简单的来说,就是注册一个方法到消息通道,这个方法就会在有消息的时候执行。

? ? ?下面通过java来操作rabbitmq,给出代码示例。

? ? ? 这里介绍三种消费者示例方法

? ? ? 1、最简单的注册。

? ? ? ? ? ? 直接通过channel.basicConsume()设置callback。

? ? ? 2、模拟一个队列消费者,启动线程。

? ? ? ? ? ? ?自定义一个消费者,并且将消费者当做一个线程启动。

? ? ? ?以上只需要amqp-client依赖?

<dependency>
    	<groupId>com.rabbitmq</groupId>
    	<artifactId>amqp-client</artifactId>
    	<version>5.4.3</version>
</dependency>

? ? ? 3、利用springboot对amqp的支持,通过自定义MessageListener然后,绑定到Container。

? ? ? ? ? ? ?这是最常见的做法,自定义监听器,设置消息处理方法。将监听器注入消息监听容器,最后启动容器。

? ? ?这里需要加入spring-boot-starter-amqp依赖。

<dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    	<version>2.1.4.RELEASE</version>
</dependency>

? ? 下面给出以上介绍的三种消费者示例代码:

? ? ? 1、这个做法比较原始,也是最容易看懂的。

package com.xxx.huali.hualitest.amqp;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class SimpleConsumer {
	
	private static final String host = "192.168.226.100";
	private static final String username = "huali";
	private static final String password = "huali";
	private static final String queue_name1 = "activate";
	private static final String exchange_name = "core.down.topic";
	public static void main(String[] args) {
		Connection conn = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setUsername(username);
			factory.setPassword(password);
			factory.setVirtualHost("/mec");
			
			//connection
			conn = factory.newConnection();
			//channel
			channel = conn.createChannel();
			//queue
			channel.queueDeclare(queue_name1, true, false, false, null);
			//exchange
			channel.exchangeDeclare(exchange_name, "topic",true);
			channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*");
			Consumer callback = new DefaultConsumer(channel) {
				@Override
				public void handleDelivery(String consumerTag, Envelope envelope, 
BasicProperties properties,byte[] body) throws IOException {
					System.out.println("received message -> "+new String(body));
				}
			};
			channel.basicConsume(queue_name1, true, callback);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

? ? ? 2、这种方式在第一种的基础上,增加了线程,启动线程,设置监听,有点像第三种方式。

package com.xxx.huali.hualitest.amqp;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
public class QueueConsumer implements Consumer,Runnable{
	private static final String host = "192.168.226.100";
	private static final String username = "huali";
	private static final String password = "huali";
	private static final String queue_name1 = "activate";
	private static final String exchange_name = "core.down.topic";
	Connection conn = null;
	Channel channel = null;
	public QueueConsumer(){
		
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setUsername(username);
			factory.setPassword(password);
			factory.setVirtualHost("/mec");
			
			//connection
			conn = factory.newConnection();
			//channel
			channel = conn.createChannel();
			
			//queue
			channel.queueDeclare(queue_name1, true, false, false, null);
			//exchange
			channel.exchangeDeclare(exchange_name, "topic",true);
			channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*");
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void close() throws IOException{
		try {
			channel.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
		this.conn.close();
	}

	@Override
	public void run() {
		try {
			channel.basicConsume(queue_name1, true, this);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void handleConsumeOk(String consumerTag) {}

	@Override
	public void handleCancelOk(String consumerTag) {}

	@Override
	public void handleCancel(String consumerTag) throws IOException {}

	@Override
	public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, 
byte[] body) throws IOException {
		
		System.out.println("client received msg -> "+new String(body));
	}

	@Override
	public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {}

	@Override
	public void handleRecoverOk(String consumerTag) {}
	
}

? ? 启动主函数:

package com.xxx.huali.hualitest.amqp;

public class ConsumerMain {
	public static void main(String[] args) {
		QueueConsumer consumer = new QueueConsumer();
		Thread thread = new Thread(consumer);
		thread.start();
	}
}

? ? ? 3、 第三种方式需要借助spring-amqp的支持。它在实际开发中最常见,在springboot中,ConnectionFactory都直接配置了,连代码都不需要码了,我们只需要配置一个消息监听器就可以了。

package com.xxx.springbootamqp.test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {

		System.out.println("received message -> "+new String(message.getBody()));
	}

}

启动主函数:

package com.xxx.springbootamqp.test;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
public class ConsumerMain {
	private static final String host = "192.168.226.100";
	private static final String username = "huali";
	private static final String password = "huali";
	private static final String queue_name1 = "activate";
	public static void main(String[] args) {
		try {
			CachingConnectionFactory factory = new CachingConnectionFactory();
			factory.setHost(host);
			factory.setUsername(username);
			factory.setPassword(password);
			factory.setVirtualHost("/mec");
			
			SimpleMessageListenerContainer container = new 
SimpleMessageListenerContainer(factory);
			container.setMessageListener(new MyMessageListener());
			container.setQueueNames(queue_name1);
			container.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

? ? 这段代码严格来说,并不完整,没有声明队列,交换机,以及队列和交换机的绑定关系,因为消费者一旦绑定了交换机和队列的关系,后面就不需要重复设置了,这个关系就在rabbitmq中形成了,只要不手动删除,他自己不会改变。 因为这是第三个示例,前面的两个示例已经对队列和交换机做了设置和绑定。

? ? 这里需要启动容器,类似第二种的启动线程,其实这里也是利用了线程池,算是第二种方式的增强版。而且消费者封装成了BlockingQueueConsumer,更加符合队列消费者的语义。

? ? 有一点需要说明的是,这里采用的队列是和交换机进行绑定的,而交换机的类型是主题交换机,消息生产者只需要将消息发送到交换机对应的路由key上即可。也就是routingKey。??

channel.basicPublish(exchange_name, "TOPIC.4.128.1.a1OHmhQSyrxMEC_1", null, 
message.getBytes());

? ? 代码里面的?TOPIC.4.128.1.a1OHmhQSyrxMEC_1就是routingKey。

? ? ?消费者自己绑定主题交换机和队列,一个消费者,可以申请一个队列,两个消费者申请两个队列,这样,消息就路由到了多个消费者那里,而且互不干扰。

//queue
channel.queueDeclare(queue_name1, true, false, false, null);
//exchange
channel.exchangeDeclare(exchange_name, "topic",true);
channel.queueBind(queue_name1, exchange_name, "TOPIC.*.*.*.*");

? ? 最后附上一个简单的生产者代码示例:

package com.xxx.huali.hualitest.amqp;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Publisher {
	private static final String host = "192.168.226.100";
	private static final String username = "huali";
	private static final String password = "huali";
	
	private static final String exchange_name = "core.down.topic";
	
	public static void main(String[] args) {
		Connection conn = null;
		Channel channel = null;
		try {
			ConnectionFactory factory = new ConnectionFactory();
			factory.setHost(host);
			factory.setUsername(username);
			factory.setPassword(password);
			factory.setVirtualHost("/mec");
			
			//connection
			conn = factory.newConnection();
			//channel
			channel = conn.createChannel();
			//exchange
			channel.exchangeDeclare(exchange_name, "topic",true);
			JSONObject object = new JSONObject();
			String device_id = "a1OHmhQSyrxMEC_1";
			object.put("device_id", device_id);
			object.put("opt_type", 128);
			JSONObject data = new JSONObject();
			data.put("auzcode_devname", device_id);
			data.put("regist_result", 1);
			long time_stamp = new Date().getTime();
			object.put("data", data);
			object.put("time_stamp", time_stamp);
			String message = object.toJSONString();
			//for(int i=0;i<5;i++) {
			//	System.out.println("i="+i);
				channel.basicPublish(exchange_name, "TOPIC.4.128.1.a1OHmhQSyrxMEC_1", null, message.getBytes());
			//}
			System.out.println("done");
			
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
			try {
				channel.close();
				conn.close();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (TimeoutException e) {
				e.printStackTrace();
			}
		}
	}
}

? ? 生产者代码很简单,最关键的一句就是channel.basicPublish()那句发送消息代码。它只关心发送到哪个主题,哪个路由上,不关心队列。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-10-13 11:19:57  更:2021-10-13 11:20:46 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 21:19:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码