IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ java API -> 正文阅读

[Java知识库]RabbitMQ java API

RabbitMQ java API

队列持久化

# 生成一个队列 1、队列名称 2、队列里面的消息是否进行持久化 3、是否共享消息 4、是否自动删除 5、其他高级参
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

在这里插入图片描述

消息持久化

# 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
# 参数3:设置消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes("UTF-8"));

不公平分发(能力越大责任越大,多劳多得)

# 消费者代码处增加
channel.basicQos(1);

在这里插入图片描述

预期值

# 消费者代码处增加
channel.basicQos(3);

在这里插入图片描述

消息不丢失条件

1、队列持久化;
2、消息持久化;
3、发布确认;
3.1、单个确认发布;(1000条,耗时722ms)慢
3.2、批量发布确认;(1000条,耗时147ms)快,出问题后不知道是哪个,
3.3、异步发布确认;(1000条,耗时62ms发完)性价比最高,复杂

发布确认

开启发布确认

channel.confirmSelect();// 开启发布确认

单个确认发布

# 发一条,及时确认一条
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认

批量确认发布

# 发多条后,再确认。比如10个一组或100个一组,然后确认一次
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
......
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认

异步确认发布

发送消息前,增加监听

// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
	System.out.println("成功的消息标记:" + deliveryTag);
};

// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
	System.out.println("失败的消息标记:" + deliveryTag);
};

channel.addConfirmListener(ackCallback, ackCallback);

/**
 * 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.waitForConfirms();// 确认

失败消息处理

/**
 * 线程安全有序的哈希表 使用于高并发 1、轻松的将序号和消息关联 2、轻松批量删除条码,根据序号 3、支持高并发(多线程)
 */
ConcurrentSkipListMap<Long, String> outStandingConfirms = new ConcurrentSkipListMap<>();

// 成功
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
	if(multiple) {
		ConcurrentNavigableMap<Long, String> confirmed = outStandingConfirms.headMap(deliveryTag);
		confirmed.clear();
	}else {
		outStandingConfirms.remove(deliveryTag);
	}
	
	System.out.println("成功的消息标记:" + deliveryTag);
};

// 失败
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
	System.out.println("失败的消息标记:" + deliveryTag);
};

channel.addConfirmListener(ackCallback, ackCallback);

/**
 * 发送一个消息 1、发送到哪个交换机 2、路由的key 3、其他的参数信息 4、发送消息内容
 */
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
outStandingConfirms.put(channel.getNextPublishSeqNo(), msg);
channel.waitForConfirms();// 确认
System.out.println("消息发送完毕");

广播/订阅(fanout-扇出交换机)

订阅者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLog1 {

	public static final String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		// 生成随机队列名称,消费完成后自动删除
		String queueName = channel.queueDeclare().getQueue();

		// 绑定队列到交换机上,routingkey为空字符串
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("ReceiveLog1接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("ReceiveLog1准备就绪:");
	}
}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class EmitLog {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(ReceiveLog1.EXCHANGE_NAME, "fanout");
		String msg = "世界你好!";
		channel.basicPublish(ReceiveLog1.EXCHANGE_NAME, "", null, msg.getBytes());
		System.out.println("消息发送完成");
	}

}

路由模式(direct-直接交换机)

消费者1

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogDirect1 {

	public static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		String queueName = "console";

		// 声明一个队列
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "info");
		channel.queueBind(queueName, EXCHANGE_NAME, "warning");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("console接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("console准备就绪:");
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogDirect2 {

	public static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

		String queueName = "console";

		// 声明一个队列
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "error");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("disk接收到的消息:" + msg);
		};
		CancelCallback cancelCallback = consumerTag -> {

		};
		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("disk准备就绪:");
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

public class EmitLog {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(ReceiveLogDirect1.EXCHANGE_NAME, "direct");
		String msg = "世界你好aaaaaaaaaaaaaaaaaaaa-warning!";
		channel.basicPublish(ReceiveLogDirect1.EXCHANGE_NAME, "warning", null, msg.getBytes());
		System.out.println("消息发送完成");
	}

}

(Topics-主题交换机)(可包括:fanout、direct)

routing_key格式要求:
1、必须是一个单词列表,以点号分割开;
2、比如:hao.ok.very;
3、* 可以代替一个单词;
4、# 可以代替零个或多个单词;
5、若只有 # ,则表示所有记录,相当于:fanout;
6、若没有 * 和 # ,则表示绑定具体队列,相当于:direct;在这里插入图片描述
消费者1

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C1 {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String queueName = "Q1";
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C1接收到的消息:" + msg+" 绑定键:"+message.getEnvelope().getRoutingKey());
		};
		CancelCallback cancelCallback = consumerTag -> {

		};

		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("C1准备就绪:");
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C2 {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String queueName = "Q2";
		channel.queueDeclare(queueName, false, false, false, null);
		channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
		channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C2接收到的消息:" + msg + " 绑定键:" + message.getEnvelope().getRoutingKey());
		};
		CancelCallback cancelCallback = consumerTag -> {

		};

		channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
		System.out.println("C2准备就绪:");
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class P {

	public static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

		String msg = "topic模式测试走起";
		channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, msg.getBytes("UTF-8"));
		System.out.println("消息发送成功");
	}

}

死信队列

死信队列的3大来源:
1、超时;
2、队列满;
3、消息被拒绝;
在这里插入图片描述

TTL死信

消费者

import java.util.HashMap;
import java.util.Map;

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C1 {

	public static final String EXCHANGE_NORMAL_NAME = "normal_exchange";

	public static final String EXCHANGE_DEAD_NAME = "dead_exchange";

	public static final String QUEUE_NORMAL_NAME = "normal_queue";
	public static final String QUEUE_DEAD_NAME = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		channel.exchangeDeclare(EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
		channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT);

		Map<String, Object> arguments = new HashMap<String, Object>();
		// 过期时间 10s=10000ms(一般过期时间在生产方控制,此处不写)
		// arguments.put("x-message-ttl",10000);
		// 过期后进入的死信交换机
		arguments.put("x-dead-letter-exchange", EXCHANGE_DEAD_NAME);
		// 设置死信routingkey
		arguments.put("x-dead-letter-routing-key", "lisi");
		channel.queueDeclare(QUEUE_NORMAL_NAME, false, false, false, arguments);

		channel.queueBind(QUEUE_NORMAL_NAME, EXCHANGE_NORMAL_NAME, "zhangsan");

		// 死信队列
		channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
		channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C1接收到消息:" + msg);

		};
		channel.basicConsume(QUEUE_NORMAL_NAME, deliverCallback, consumerTag -> {
		});
	}

}

消费者2

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class C2 {

	public static final String EXCHANGE_DEAD_NAME = "dead_exchange";

	public static final String QUEUE_DEAD_NAME = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		// 死信队列
		//channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null);
		//channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "lisi");

		DeliverCallback deliverCallback = (consumerTag, message) -> {
			String msg = new String(message.getBody(), "UTF-8");
			System.out.println("C2接收到消息:" + msg);

		};
		channel.basicConsume(QUEUE_DEAD_NAME, deliverCallback, consumerTag -> {
		});
	}

}

生产者

import com.asia.tip.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

public class P {

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();

		channel.exchangeDeclare(C1.EXCHANGE_NORMAL_NAME, BuiltinExchangeType.DIRECT);
		String msg = "死信消息要来了";
		// 过期时间,单位毫秒,设置10s
		BasicProperties properties = new BasicProperties().builder().expiration("10000").build();
		channel.basicPublish(C1.EXCHANGE_NORMAL_NAME, "zhangsan", properties, msg.getBytes("UTF-8"));
		System.out.println("死信消息发送成功!");
	}

}

MAX死信

# C1增加代码
// 设置队列长度限制(队列里面超过6条时,则进入到死信队列)
arguments.put("x-max-length", 6);

# P 删除超时设置即可

在这里插入图片描述

消息被拒绝-死信

消费者增加以下代码进行拒绝

DeliverCallback deliverCallback = (consumerTag, message) -> {
	String msg = new String(message.getBody(), "UTF-8");
	if("拒绝条件".equals(msg)) {
		System.out.println("C2拒绝接收消息接收到消息:" + msg);
		channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
	}else {
		System.out.println("C2接收消息接收到消息:" + msg);
		channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
	}
};
// 需要开启手动应答
channel.basicConsume(QUEUE_NORMAL_NAME,false, deliverCallback, consumerTag -> {
});

延迟队列(死信队列的一种)

使用场景:
1、订单在十分钟内未支付自动取消;
2、新创建的店铺,如果10天内都没上传过商品,则自动发送消息提醒;
3、新注册用户,如果3天内都没进行登录,则短信提醒;
4、用户发起退款,如果3天都没处理,则通知相关运营人员介入;
5、预定会议,在预定会议开始前10分钟发消息通知参会人员;
总结:虽然和定时任务非常相似。但特点是:数据量大,时效性强;

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-16 22:07:36  更:2022-03-16 22:08:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:59:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码