Mqtt连接池
使用的技术:org.apache.commons.pool2、vertx-mqtt。本文基于 commons-pool2 提供的通用对象池(GenericObjectPool)实现 MQTT 连接池,同样的思路也可以用来池化其他类型的客户端连接。该实现暂未投入生产使用,如遇问题需自行维护,也欢迎指正!源码地址:gitee源码参考地址
maven依赖
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
<version>4.1.1</version>
</dependency>
mqtt连接配置
@Component
@ConfigurationProperties(prefix = "mqtt.client")
public class MqttClientProperties {
private String host = MqttClientOptions.DEFAULT_HOST;
private Integer port = MqttClientOptions.DEFAULT_PORT;
private String username;
private String password;
private Integer connectTimeout;
private Integer ackTimeout =MqttClientOptions.DEFAULT_ACK_TIMEOUT;
private boolean autoGeneratedClientId = true;
private boolean autoKeepAlive ;
private boolean cleanSession ;
private boolean willFlag ;
private int willQoS;
private boolean willRetain;
private int keepAliveInterval = MqttClientOptions.DEFAULT_KEEP_ALIVE_INTERVAL;
private int maxInflightQueue;
private int maxMessageSize = MqttClientOptions.DEFAULT_MAX_MESSAGE_SIZE;
private String willMessage;
private String willTopic;
private int minIdle = 3;
private int maxTotal = 8;
private int maxIdle = 8;
private int maxWaitMillis = 6000;
private boolean blockWhenExhausted = true;
private boolean testOnBorrow = true;
private boolean testOnReturn;
private boolean testWhileIdle;
private boolean jmxEnabled = false;
private boolean fairness;
private boolean maxWaitDuration;
setter...
getter...
}
mqtt连接工厂
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
@Slf4j
public class MqttClientFactory extends BasePooledObjectFactory<MqttClient> {
private MqttClientProperties config;
public MqttClientFactory(MqttClientProperties config) {
this.config = config;
}
@Override
public MqttClient create() throws InterruptedException {
MqttClientOptions options = new MqttClientOptions();
options.setMaxMessageSize(config.getMaxMessageSize());
options.setPassword(config.getPassword());
options.setUsername(config.getUsername());
options.setAckTimeout(config.getAckTimeout());
options.setAutoGeneratedClientId(config.isAutoGeneratedClientId());
options.setClientId(config.getClientId());
options.setAutoKeepAlive(config.isAutoKeepAlive());
options.setCleanSession(config.isCleanSession());
options.setKeepAliveInterval(config.getKeepAliveInterval());
options.setWillFlag(config.isWillFlag());
options.setWillQoS(config.getWillQoS());
MqttClient client = MqttClient.create(Vertx.vertx(), options);
MqttClient connect = client.connect(config.getPort(), config.getHost(), r -> {
if (r.succeeded()) {
log.info("{}:连接成功回调",log.getName());
}
if (r.failed()) {
log.error("{}:连接失败回调",log.getName());
}
});
return connect;
}
@Override
public PooledObject<MqttClient> wrap(MqttClient client) {
return new DefaultPooledObject<>(client);
}
@Override
public void destroyObject(PooledObject<MqttClient> pooled) {
if (pooled == null) {
return;
}
MqttClient client = pooled.getObject();
if (client.isConnected()) {
client.disconnect();
}
}
@Override
public boolean validateObject(PooledObject<MqttClient> pooled) {
MqttClient client = pooled.getObject();
boolean flag = false;
while (!flag){
flag = client.isConnected();
}
return flag;
}
}
mqtt的连接template
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
@Slf4j
public class MqttTemplate {
private GenericObjectPool<MqttClient> clientPool;
public MqttTemplate() {
}
public MqttTemplate(MqttClientFactory factory){
GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
config.setMinIdle(1);
config.setBlockWhenExhausted(true);
config.setTestOnBorrow(true);
config.setTestOnReturn(true);
config.setTestWhileIdle(true);
config.setJmxEnabled(false);
this.clientPool = new GenericObjectPool<>(factory,config);
}
public MqttTemplate(MqttClientFactory factory,MqttClientProperties properties){
GenericObjectPoolConfig<MqttClient> config = new GenericObjectPoolConfig<>();
config.setMinIdle(properties.getMinIdle());
config.setBlockWhenExhausted(properties.isBlockWhenExhausted());
config.setTestOnBorrow(properties.isTestOnBorrow());
config.setTestOnReturn(properties.isTestOnReturn());
config.setTestWhileIdle(properties.isTestWhileIdle());
config.setMaxIdle(properties.getMaxIdle());
config.setMaxTotal(properties.getMaxTotal());
config.setJmxEnabled(properties.isJmxEnabled());
this.clientPool = new GenericObjectPool<>(factory,config);
}
public boolean publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain){
try {
MqttClient client = clientPool.borrowObject();
if(client.isConnected()){
log.info("{}获取连接成功",log.getName());
client.publish(topic, payload, qosLevel, isDup, isRetain, r -> {
log.info("{}消息推送成功",log.getName());
clientPool.returnObject(client);
});
return true;
}else{
log.error("{}获取的客户端是断开的!",log.getName());
}
} catch (Exception e) {
log.error("{}获取连接失败:{}",log.getName(),e.getMessage());
}
return false;
}
...可以再自定义的去封装一些方法
}
mqtt连接池配置
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that wires the MQTT client factory and the {@link MqttTemplate}
 * from the bound {@link MqttClientProperties}.
 */
@Configuration
@EnableConfigurationProperties(MqttClientProperties.class)
public class MqttConfig {

    /** Bound MQTT settings, supplied through setter injection. */
    private MqttClientProperties mqttClientProperties;

    /**
     * Receives the bound properties from the container.
     *
     * @param mqttClientProperties MQTT connection and pool settings
     */
    @Autowired
    public void setClientProperties(MqttClientProperties mqttClientProperties) {
        this.mqttClientProperties = mqttClientProperties;
    }

    /** Exposes the pooled-client factory built from the injected properties. */
    @Bean
    public MqttClientFactory getClientFactory() {
        MqttClientFactory clientFactory = new MqttClientFactory(mqttClientProperties);
        return clientFactory;
    }

    /**
     * Exposes the template backed by a pool that uses the factory above.
     * Within a {@code @Configuration} class Spring normally routes the
     * {@code getClientFactory()} call to the container-managed bean.
     */
    @Bean
    public MqttTemplate getMqttTemplate() {
        MqttClientFactory clientFactory = getClientFactory();
        return new MqttTemplate(clientFactory, mqttClientProperties);
    }
}
使用
@Autowired
private MqttTemplate mqttTemplate;
/**
 * Demo endpoint: publishes a fixed test payload through the pooled template and
 * returns the template's result.
 */
@GetMapping
public boolean pool() {
    final String topic = "20220428testtopic";
    final Buffer payload = Buffer.buffer("aaa");
    // QoS 0, not a duplicate, not retained.
    return mqttTemplate.publish(topic, payload, MqttQoS.AT_MOST_ONCE, false, false);
}
|