IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 游戏开发 -> mq Too many publishes in progress -> 正文阅读

[游戏开发]mq Too many publishes in progress

在使用 org.eclipse.paho.client.mqttv3:1.2.5版本时,如果并发度过高或者跑了一段时间之后就会出现:Too many publishes in progress 的问题。首先看一下为什么会出现这个问题。

直接先上总结:

  • qos=0:在1.2.1版本之前token也会存在在 tokenStore里面,所以在为0时,如果并发过高就会导致释放 actualInFlight不及时的情况以及误删除其他msgId的token的情况,导致后续发送数据失败。由于现在我使用的是1.2.5版本qos=0已经不存入 tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题和no new message IDs being available 的异常信息
  • qos=1:发送消息时会存入到tokenStore当中,等待服务端的回应,回应了ack之后才会执行释放actualInFlight,由于并发度过高,客户端等待服务端回应时的数据比较慢,释放来不及就会导致出现这种问题,而no new message IDs being available 也是因为并发度过高,获取到重复的id循环执行次数超过了两次就会抛出异常;至于为什么明明没有发送消息了,后续发送数据还是会 Too many publishes in progress 我猜测,tokenStore被误删了,导致后续没有获取到token就不执行释放actualInFlight;或者是等待发送队列里面数据过多,一直处于满的状态;再或者就是mqtt服务端返回ack因为网络的问题,消费方的数据消费很慢,后续就一直再等待。嗯,以上的猜测我觉得很合理。

问题复现

首先我们使用QOS=0 的情况下来用 100个线程同时发送消息,会发现消息都是正常接收回复。让我看看源码 QOS=0的情况下框架都做了什么事
在这里插入图片描述
在这里插入图片描述

QOS=0

org.eclipse.paho.client.mqttv3.MqttAsyncClient#publish() 方法:验证了topic,封装成了 MqttDeliveryToken对象,将发送的消息设置到对象中,然后调用 comms.sendNoWait(pubMsg, token);
在这里插入图片描述
org.eclipse.paho.client.mqttv3.internal.ClientComms#sendNoWait() :首先判断连接状态,以及消息类型;如果连接断开了并且 disconnectedMessageBuffer不为空并且里面的消息数量不为0,就会把发送的消息放入其中
在这里插入图片描述

org.eclipse.paho.client.mqttv3.internal.ClientComms#internalSend(): 当前方法并没有做什么特别的事就事设置了一下token中的client信息

org.eclipse.paho.client.mqttv3.internal.ClientState#send(): 这是重点方法

getNextMessageId():这个方法是mqtt用来生成消息id的,取值范围是1 - 65535 之间,如果取出的id 存在于 inUseMsgIds,就会继续循环下一次自增,循环次数超过两次就会抛出:32001状态码,就是:Internal error, caused by no new message IDs being available 的异常情况,这个情况在qos为1并且并发情况过高下就会出现这个问题。我也不明白为什么框架会限制在 65535之间(还是太菜了,理解困难)

public void send(MqttWireMessage message, MqttToken token) throws MqttException {
		final String methodName = "send";
		//首先判断msgId是否必须要求,默认为true并且是否为0,这个msgId是框架自动生成的id,并不是我们在创建MqttMessage时生成,默认为0
		if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
				if(message instanceof MqttPublish  && (((MqttPublish) message).getMessage().getQos() != 0)){
				//
						message.setMessageId(getNextMessageId());
				}else if(message instanceof MqttPubAck ||
						message instanceof MqttPubRec ||
						message instanceof MqttPubRel ||
						message instanceof MqttPubComp ||
						message instanceof MqttSubscribe ||
						message instanceof MqttSuback ||
						message instanceof MqttUnsubscribe || 
						message instanceof MqttUnsubAck){
					message.setMessageId(getNextMessageId());
				}
		}
		if (token != null) {
			message.setToken(token);
			try {
				token.internalTok.setMessageID(message.getMessageId());
			} catch (Exception e) {
			}
		}
			
		if (message instanceof MqttPublish) {
			synchronized (queueLock) {
			//重点来了,这里 实际在飞行窗口中的数据是否大于最大飞行窗口的数量(默认为10)
				if (actualInFlight >= this.maxInflight) {
					//@TRACE 613= sending {0} msgs at max inflight window
					log.fine(CLASS_NAME, methodName, "613", new Object[]{ Integer.valueOf(actualInFlight)});
					//这里就会抛出异常情况:Too many publishes in progress
					throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
				}
				
				MqttMessage innerMessage = ((MqttPublish) message).getMessage();
				//@TRACE 628=pending publish key={0} qos={1} message={2}
				log.fine(CLASS_NAME,methodName,"628", new Object[]{ Integer.valueOf(message.getMessageId()),  Integer.valueOf(innerMessage.getQos()), message});
				//接下来根据qos的等级设置到对应的缓存当中,包括设置token
				switch(innerMessage.getQos()) {
					case 2:
						outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
						persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
						tokenStore.saveToken(token, message);
						break;
					case 1:
						outboundQoS1.put( Integer.valueOf(message.getMessageId()), message);
						persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
						tokenStore.saveToken(token, message);
						break;
				}
				//添加到待发送的队列当中
				pendingMessages.addElement(message);
				queueLock.notifyAll();
			}
		} else {
			//@TRACE 615=pending send key={0} message {1}
			log.fine(CLASS_NAME,methodName,"615", new Object[]{ Integer.valueOf(message.getMessageId()), message});
			
			if (message instanceof MqttConnect) {
				synchronized (queueLock) {
					// Add the connect action at the head of the pending queue ensuring it jumps
					// ahead of any of other pending actions.
					tokenStore.saveToken(token, message);
					pendingFlows.insertElementAt(message,0);
					queueLock.notifyAll();
				}
			} else {
				if (message instanceof MqttPingReq) {
					this.pingCommand = message;
				}
				else if (message instanceof MqttPubRel) {
					outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
					persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
				}
				else if (message instanceof MqttPubComp)  {
					persistence.remove(getReceivedPersistenceKey(message));
				}
				
				synchronized (queueLock) {
					if ( !(message instanceof MqttAck )) {
						tokenStore.saveToken(token, message);
					}
					pendingFlows.addElement(message);
					queueLock.notifyAll();
				}
			}
		}
	}

org.eclipse.paho.client.mqttv3.internal.CommsSender#run(): 然后就是 CommsSender线程中在循环读取数据。图中指示的 get() 方法

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

QOS=1

接下来看看QOS=1的情况,还是线程为100个同时发送;就会看到一下情况,正在进行过多的发布。但是其中能够正常处理其中的一些消息,跑到后续就会发现 Internal error, caused by no new message IDs being available 的异常信息。接下来看看为什么QOS=1的情况会出现这种情况。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
还是上面的代码步骤,但是qos=1的情况多了一个存储token的部分,在添加进队列当中时会存储一次token信息,目的就是等待服务端返回成功的ack。
在这里插入图片描述
org.eclipse.paho.client.mqttv3.internal.CommsReceiver#run(): 让我们看看 Receiver线程的run方法
在这里插入图片描述
org.eclipse.paho.client.mqttv3.internal.ClientState#notifyReceivedAck() : 当前方法就会根据收到消息的类型进行对应的操作,我们发送消息然后服务端回复的消息类型应该是 MqttPubAck对象
在这里插入图片描述
在这里插入图片描述
根据当前客户端的状态判断处理的逻辑,如果运行的状态为正在运行中,就将消息token放入到完成的队列当中,否则执行下面的逻辑,else中的 handleActionComplete() 会释放掉对应的token以及id,然后调用复写的回调 onSuccess和onFailure方法
在这里插入图片描述
上面的代码我们可以看到 消息被发送到了 completeQueue当中,而在消费者则是 CommsCallback线程中,接下我们看看run方法

org.eclipse.paho.client.mqttv3.internal.CommsCallback#run

在这里插入图片描述
org.eclipse.paho.client.mqttv3.internal.ClientState#notifyComplete:当前方法就会根据对应的消息类型进行操作
在这里插入图片描述
如果消息类型为 MqttPubAck的话就会执行oqs为1的释放,并且执行 decrementInFlight() 方法,将actualInFlight–,然后释放掉 inUseMsgIds 当中正在使用的id
在这里插入图片描述
总结:

  • qos=0:在1.2.1版本之前token也会存在在 tokenStore里面,所以在为0时,如果并发过高就会导致释放 actualInFlight不及时的情况以及误删除其他msgId的token的情况,导致后续发送数据失败。由于现在我使用的是1.2.5版本qos=0已经不存入 tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题和no new message IDs being available 的异常信息
  • qos=1:发送消息时会存入到tokenStore当中,等待服务端的回应,回应了ack之后才会执行释放actualInFlight,由于并发度过高,客户端等待服务端回应时的数据比较慢,释放来不及就会导致出现这种问题,而no new message IDs being available 也是因为并发度过高,获取到重复的id循环执行次数超过了两次就会抛出异常;至于为什么明明没有发送消息了,后续发送数据还是会 Too many publishes in progress 我猜测,tokenStore被误删了,导致后续没有获取到token就不执行释放actualInFlight;或者是等待发送队列里面数据过多,一直处于满的状态;再或者就是mqtt服务端返回ack因为网络的问题,消费方的数据消费很慢,后续就一直再等待。

解决方案

第一种解决方案:调整 maxInFlight 的大小,因为默认为10,这个并发数量可太小了。很可惜,不管设置的多大,这个飞行窗口在并发很高的情况下依旧会出问题

第二种就是修改源码:我直接在本地项目中复制了同一个包路径,然后复制了一个 ClientState类,修改了其中 msgId生成的策略,以及飞行窗口的判断逻辑。很可惜,他源码居然加密了,修改源码后执行时会报 :signer information does not match signer information of other classes in the same package的错误,好家伙,我放弃了。后来还是选择了方案一,调整飞行窗口的数量,能够满足日常情况就行了(因为我们业务,没那么大的并发)

第三种解决方案:在发送数据时,创建一个阻塞队列,判断飞行窗口中实际数量是否大于最大值,如果大于最大值,那么就存入到等待队列当中等待条件满足后再取出数据进行发送。还可以创建副本客户端,用于并发程度过高的压力分担

第四种解决方案:加服务器,加服务

  游戏开发 最新文章
6、英飞凌-AURIX-TC3XX: PWM实验之使用 GT
泛型自动装箱
CubeMax添加Rtthread操作系统 组件STM32F10
python多线程编程:如何优雅地关闭线程
数据类型隐式转换导致的阻塞
WebAPi实现多文件上传,并附带参数
from origin ‘null‘ has been blocked by
UE4 蓝图调用C++函数(附带项目工程)
Unity学习笔记(一)结构体的简单理解与应用
【Memory As a Programming Concept in C a
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:42:53  更:2022-04-04 12:44:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 18:47:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码