Error details
Dispatcher has no subscribers for channel 'mqttOutboundChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage []
Code that triggers the error
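import java.time.Instant;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * @author aqin1012 AQin.
 * @date 2022/2/23 9:16 AM
 * @Version 1.0
 */
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

    @Autowired
    private MqttProperties prop;

    @Autowired
    MqttEncryptHelper mqttEncryptHelper;

    // Client factory carrying the broker connection options
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setServerURIs(new String[]{prop.getHostUrl()});
        mqttConnectOptions.setUserName(prop.getUsername());
        mqttConnectOptions.setPassword(prop.getPassword().toCharArray());
        // Keep the session when the client disconnects; it is only dropped after it times out
        mqttConnectOptions.setCleanSession(false);
        mqttConnectOptions.setKeepAliveInterval(prop.getKeepAliveInterval());
        mqttConnectOptions.setAutomaticReconnect(true);
        factory.setConnectionOptions(mqttConnectOptions);
        return factory;
    }

    // Outbound handler, intended to consume messages from mqttOutboundChannel
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                prop.getClientId() + "-pub-" + Instant.now().toEpochMilli(),
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(0);
        messageHandler.setDefaultTopic(prop.getSenderDispatchTopic());
        return messageHandler;
    }

    // The channel that ends up with no subscribers at runtime
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}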
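Error analysis
The error message shows that the channel has no subscribers. In my code, mqttOutboundChannel() and mqttOutbound() are supposed to be bound together through @ServiceActivator(inputChannel = "mqttOutboundChannel"), yet the binding evidently did not take effect. After consulting the Q&A at https://www.infoflowing.com (many thanks~), I added the subscriber to the channel manually.
Modify mqttOutboundChannel() as follows: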
    @Bean
    public MessageChannel mqttOutboundChannel() {
        DirectChannel dc = new DirectChannel();
        dc.subscribe(mqttOutbound());
        return dc;
    }
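To see why the manual subscribe() call makes the error go away, here is a minimal plain-Java sketch of the idea behind a direct channel. It is a simplified stand-in and not Spring Integration's actual implementation: the names DirectChannelSketch and MiniDirectChannel are hypothetical, messages are plain strings, and the first subscriber always receives the message, whereas the real DirectChannel can balance across several subscribers. The point is only that send() has nowhere to deliver a message until something has subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified model of a direct channel; NOT the real Spring Integration class. */
public class DirectChannelSketch {

    /** Hypothetical miniature channel: hands each message to a subscriber on the caller's thread. */
    static class MiniDirectChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        MiniDirectChannel(String name) {
            this.name = name;
        }

        /** Registers a handler; until this is called, send() has no target. */
        boolean subscribe(Consumer<String> handler) {
            return subscribers.add(handler);
        }

        /** Delivers the payload to the first subscriber, or fails if there is none. */
        boolean send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            subscribers.get(0).accept(payload);
            return true;
        }
    }

    public static void main(String[] args) {
        MiniDirectChannel channel = new MiniDirectChannel("mqttOutboundChannel");

        // Before subscribing: sending fails, analogous to the error in this post.
        try {
            channel.send("hello");
        } catch (IllegalStateException e) {
            System.out.println("send failed: " + e.getMessage());
        }

        // After subscribing: the same send is delivered to the handler.
        channel.subscribe(payload -> System.out.println("handler received: " + payload));
        channel.send("hello");
    }
}
```

In the real configuration, dc.subscribe(mqttOutbound()) plays the role of subscribe() here: it attaches the MQTT handler to the channel directly, without relying on the @ServiceActivator annotation being processed.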
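Run the service again.
Fixed~ Hooray!!!
Many blog posts suggest adding the @EnableIntegration annotation as the fix, but that had no effect in my case (if anyone knows why, I would be grateful for your guidance~~ thank you 🙏)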