在使用 org.eclipse.paho.client.mqttv3:1.2.5版本时,如果并发度过高或者跑了一段时间之后就会出现:Too many publishes in progress 的问题。首先看一下为什么会出现这个问题。
直接先上总结:
- qos=0:在1.2.1版本之前token也会存在在 tokenStore里面,所以在为0时,如果并发过高就会导致释放 actualInFlight不及时的情况以及误删除其他msgId的token的情况,导致后续发送数据失败。由于现在我使用的是1.2.5版本qos=0已经不存入 tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题和no new message IDs being available 的异常信息
- qos=1:发送消息时会存入到tokenStore当中,等待服务端的回应,回应了ack之后才会执行释放actualInFlight,由于并发度过高,客户端等待服务端回应时的数据比较慢,释放来不及就会导致出现这种问题,而no new message IDs being available 也是因为并发度过高,获取到重复的id循环执行次数超过了两次就会抛出异常;至于为什么明明没有发送消息了,后续发送数据还是会 Too many publishes in progress 我猜测,tokenStore被误删了,导致后续没有获取到token就不执行释放actualInFlight;或者是等待发送队列里面数据过多,一直处于满的状态;再或者就是mqtt服务端返回ack因为网络的问题,消费方的数据消费很慢,后续就一直再等待。嗯,以上的猜测我觉得很合理。
问题复现
首先我们使用QOS=0 的情况下来用 100个线程同时发送消息,会发现消息都是正常接收回复。让我看看源码 QOS=0的情况下框架都做了什么事
QOS=0
org.eclipse.paho.client.mqttv3.MqttAsyncClient#publish() 方法:验证了topic,封装成了 MqttDeliveryToken对象,将发送的消息设置到对象中,然后调用 comms.sendNoWait(pubMsg, token); org.eclipse.paho.client.mqttv3.internal.ClientComms#sendNoWait() :首先判断连接状态,以及消息类型;如果连接断开了并且 disconnectedMessageBuffer不为空并且里面的消息数量不为0,就会把发送的消息放入其中
org.eclipse.paho.client.mqttv3.internal.ClientComms#internalSend(): 当前方法并没有做什么特别的事就事设置了一下token中的client信息
org.eclipse.paho.client.mqttv3.internal.ClientState#send(): 这是重点方法
getNextMessageId():这个方法是mqtt用来生成消息id的,取值范围是1 - 65535 之间,如果取出的id 存在于 inUseMsgIds,就会继续循环下一次自增,循环次数超过两次就会抛出:32001状态码,就是:Internal error, caused by no new message IDs being available 的异常情况,这个情况在qos为1并且并发情况过高下就会出现这个问题。我也不明白为什么框架会限制在 65535之间(还是太菜了,理解困难)
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
if(message instanceof MqttPublish && (((MqttPublish) message).getMessage().getQos() != 0)){
message.setMessageId(getNextMessageId());
}else if(message instanceof MqttPubAck ||
message instanceof MqttPubRec ||
message instanceof MqttPubRel ||
message instanceof MqttPubComp ||
message instanceof MqttSubscribe ||
message instanceof MqttSuback ||
message instanceof MqttUnsubscribe ||
message instanceof MqttUnsubAck){
message.setMessageId(getNextMessageId());
}
}
if (token != null) {
message.setToken(token);
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
log.fine(CLASS_NAME, methodName, "613", new Object[]{ Integer.valueOf(actualInFlight)});
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
log.fine(CLASS_NAME,methodName,"628", new Object[]{ Integer.valueOf(message.getMessageId()), Integer.valueOf(innerMessage.getQos()), message});
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
tokenStore.saveToken(token, message);
break;
case 1:
outboundQoS1.put( Integer.valueOf(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
tokenStore.saveToken(token, message);
break;
}
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
log.fine(CLASS_NAME,methodName,"615", new Object[]{ Integer.valueOf(message.getMessageId()), message});
if (message instanceof MqttConnect) {
synchronized (queueLock) {
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttPingReq) {
this.pingCommand = message;
}
else if (message instanceof MqttPubRel) {
outboundQoS2.put( Integer.valueOf(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
else if (message instanceof MqttPubComp) {
persistence.remove(getReceivedPersistenceKey(message));
}
synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
}
}
org.eclipse.paho.client.mqttv3.internal.CommsSender#run(): 然后就是 CommsSender线程中在循环读取数据。图中指示的 get() 方法
QOS=1
接下来看看QOS=1的情况,还是线程为100个同时发送;就会看到一下情况,正在进行过多的发布。但是其中能够正常处理其中的一些消息,跑到后续就会发现 Internal error, caused by no new message IDs being available 的异常信息。接下来看看为什么QOS=1的情况会出现这种情况。
还是上面的代码步骤,但是qos=1的情况多了一个存储token的部分,在添加进队列当中时会存储一次token信息,目的就是等待服务端返回成功的ack。 org.eclipse.paho.client.mqttv3.internal.CommsReceiver#run(): 让我们看看 Receiver线程的run方法 org.eclipse.paho.client.mqttv3.internal.ClientState#notifyReceivedAck() : 当前方法就会根据收到消息的类型进行对应的操作,我们发送消息然后服务端回复的消息类型应该是 MqttPubAck对象 根据当前客户端的状态判断处理的逻辑,如果运行的状态为正在运行中,就将消息token放入到完成的队列当中,否则执行下面的逻辑,else中的 handleActionComplete() 会释放掉对应的token以及id,然后调用复写的回调 onSuccess和onFailure方法 上面的代码我们可以看到 消息被发送到了 completeQueue当中,而在消费者则是 CommsCallback线程中,接下我们看看run方法
org.eclipse.paho.client.mqttv3.internal.CommsCallback#run
org.eclipse.paho.client.mqttv3.internal.ClientState#notifyComplete:当前方法就会根据对应的消息类型进行操作 如果消息类型为 MqttPubAck的话就会执行oqs为1的释放,并且执行 decrementInFlight() 方法,将actualInFlight–,然后释放掉 inUseMsgIds 当中正在使用的id 总结:
- qos=0:在1.2.1版本之前token也会存在在 tokenStore里面,所以在为0时,如果并发过高就会导致释放 actualInFlight不及时的情况以及误删除其他msgId的token的情况,导致后续发送数据失败。由于现在我使用的是1.2.5版本qos=0已经不存入 tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题和no new message IDs being available 的异常信息
- qos=1:发送消息时会存入到tokenStore当中,等待服务端的回应,回应了ack之后才会执行释放actualInFlight,由于并发度过高,客户端等待服务端回应时的数据比较慢,释放来不及就会导致出现这种问题,而no new message IDs being available 也是因为并发度过高,获取到重复的id循环执行次数超过了两次就会抛出异常;至于为什么明明没有发送消息了,后续发送数据还是会 Too many publishes in progress 我猜测,tokenStore被误删了,导致后续没有获取到token就不执行释放actualInFlight;或者是等待发送队列里面数据过多,一直处于满的状态;再或者就是mqtt服务端返回ack因为网络的问题,消费方的数据消费很慢,后续就一直再等待。
解决方案
第一种解决方案:调整 maxInFlight 的大小,因为默认为10,这个并发数量可太小了。很可惜,不管设置的多大,这个飞行窗口在并发很高的情况下依旧会出问题
第二种就是修改源码:我直接在本地项目中复制了同一个包路径,然后复制了一个 ClientState类,修改了其中 msgId生成的策略,以及飞行窗口的判断逻辑。很可惜,他源码居然加密了,修改源码后执行时会报 :signer information does not match signer information of other classes in the same package的错误,好家伙,我放弃了。后来还是选择了方案一,调整飞行窗口的数量,能够满足日常情况就行了(因为我们业务,没那么大的并发)
第三种解决方案:在发送数据时,创建一个阻塞队列,判断飞行窗口中实际数量是否大于最大值,如果大于最大值,那么就存入到等待队列当中等待条件满足后再取出数据进行发送。还可以创建副本客户端,用于并发程度过高的压力分担
第四种解决方案:加服务器,加服务
|