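I. What Is MoP
MoP (MQTT on Pulsar) is an implementation of the MQTT protocol built on top of Pulsar. The source is at https://github.com/streamnative/mop. With MoP you can quickly stand up an MQTT server. The rest of this post walks through MoP's main code structure.
II. MQTT Message Definition
In MoP, an MQTT message is represented by MqttMessage. An abridged version of the class: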
public class MqttMessage {
    private final MqttFixedHeader mqttFixedHeader;
    private final Object variableHeader;
    private final Object payload;
    private final DecoderResult decoderResult;

    // Shared instances for packets that consist of a fixed header only
    // (initializers omitted in this excerpt)
    public static final MqttMessage PINGREQ;
    public static final MqttMessage PINGRESP;
    public static final MqttMessage DISCONNECT;

    // Constructors and accessors omitted
}
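- (1) MqttFixedHeader (fixed header): present in every MQTT packet. It identifies the packet type and carries the flags specific to that type.
- (2) variableHeader (variable header): present in some MQTT packets. The packet type determines whether it exists and what it contains.
- (3) payload (message body): present in some MQTT packets. It carries the actual content delivered to the client.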
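To make the fixed header concrete, here is a minimal sketch of how its bytes are laid out on the wire according to the MQTT specification: the upper four bits of the first byte are the packet type, the lower four bits are the flags, and the following one to four bytes encode the remaining length, seven bits per byte. The class FixedHeaderSketch and its method names are hypothetical and are not part of MoP; in MoP the decoded result is what ends up in MqttFixedHeader.

```java
// Hypothetical helper for illustration only; not part of MoP.
final class FixedHeaderSketch {
    // Packet type names indexed by the 4-bit type code.
    // Code 0 is reserved; code 15 is reserved in MQTT 3.1.1 and AUTH in MQTT 5.
    private static final String[] TYPES = {
        "RESERVED", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC", "PUBREL", "PUBCOMP",
        "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT", "AUTH"
    };

    // Upper 4 bits of the first byte select the packet type.
    static String packetType(byte first) {
        return TYPES[(first & 0xF0) >> 4];
    }

    // Lower 4 bits of the first byte are the type-specific flags.
    static int flags(byte first) {
        return first & 0x0F;
    }

    // Decodes the variable-length "remaining length" that starts at index 1.
    // Each byte contributes 7 bits; the top bit says whether another byte follows.
    static int remainingLength(byte[] packet) {
        int value = 0;
        int multiplier = 1;
        for (int i = 1; i < packet.length && i <= 4; i++) {
            int b = packet[i] & 0xFF;
            value += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0) {
                return value;
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("Malformed remaining length");
    }
}
```

For example, a first byte of 0x30 decodes to PUBLISH with flags 0, and the length bytes 0xC1 0x02 decode to a remaining length of 321.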
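III. Main Code Modules in MoP
- 1. MQTTProtocolHandler: the entry point. It implements Pulsar's ProtocolHandler interface.
- 2. MQTTProxyService: handles network communication using Netty.
3. MQTTCommonInboundHandler: extends ChannelInboundHandlerAdapter and overrides its channelRead method to receive messages sent by the client and route each one to a handler according to its message type.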
public class MQTTCommonInboundHandler extends ChannelInboundHandlerAdapter {

    protected ProtocolMethodProcessor processor; // protocol processor, set by subclasses

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) {
        checkArgument(message instanceof MqttMessage);
        checkNotNull(processor);
        MqttMessage msg = (MqttMessage) message;
        try {
            checkState(msg);
            MqttMessageType messageType = msg.fixedHeader().messageType();
            if (log.isDebugEnabled()) {
                log.debug("Processing MQTT Inbound handler message, type={}", messageType);
            }
            switch (messageType) {
                case CONNECT:
                    checkArgument(msg instanceof MqttConnectMessage);
                    processor.processConnect((MqttConnectMessage) msg);
                    break;
                case SUBSCRIBE:
                    checkArgument(msg instanceof MqttSubscribeMessage);
                    processor.processSubscribe((MqttSubscribeMessage) msg);
                    break;
                case UNSUBSCRIBE:
                    checkArgument(msg instanceof MqttUnsubscribeMessage);
                    processor.processUnSubscribe((MqttUnsubscribeMessage) msg);
                    break;
                case PUBLISH:
                    checkArgument(msg instanceof MqttPublishMessage);
                    processor.processPublish((MqttPublishMessage) msg);
                    break;
                case PUBREC:
                    processor.processPubRec(msg);
                    break;
                case PUBCOMP:
                    processor.processPubComp(msg);
                    break;
                case PUBREL:
                    processor.processPubRel(msg);
                    break;
                case DISCONNECT:
                    processor.processDisconnect(msg);
                    break;
                case PUBACK:
                    checkArgument(msg instanceof MqttPubAckMessage);
                    processor.processPubAck((MqttPubAckMessage) msg);
                    break;
                case PINGREQ:
                    processor.processPingReq();
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
            }
        } catch (Throwable ex) {
            // On any failure: release the message, log, and close the channel
            ReferenceCountUtil.safeRelease(msg);
            log.error("Exception was caught while processing MQTT message, ", ex);
            ctx.close();
        }
    }
}
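As shown above, the incoming object is first cast to MqttMessage, the message type (messageType) is then read from its fixed header, and the message is routed to the processor method that matches that type.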
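The routing step can be tried in isolation with a small stand-alone sketch. Everything in it (PacketType, PacketProcessor, RecordingProcessor, DispatchSketch) is a hypothetical stand-in written for this illustration with plain strings instead of real MQTT messages; the handler above switches on MqttMessageType and calls ProtocolMethodProcessor instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not MoP classes.
enum PacketType { CONNECT, PUBLISH, PINGREQ, DISCONNECT, AUTH }

interface PacketProcessor {
    void processConnect(String body);
    void processPublish(String body);
    void processPingReq();
    void processDisconnect();
}

// Records which callback was invoked so the routing can be observed.
final class RecordingProcessor implements PacketProcessor {
    final List<String> calls = new ArrayList<>();

    @Override public void processConnect(String body) { calls.add("connect:" + body); }
    @Override public void processPublish(String body) { calls.add("publish:" + body); }
    @Override public void processPingReq() { calls.add("pingreq"); }
    @Override public void processDisconnect() { calls.add("disconnect"); }
}

final class DispatchSketch {
    // Routes one packet to the processor method that matches its type.
    // Types without a branch are rejected, mirroring the default case above.
    static void dispatch(PacketType type, String body, PacketProcessor processor) {
        switch (type) {
            case CONNECT:
                processor.processConnect(body);
                break;
            case PUBLISH:
                processor.processPublish(body);
                break;
            case PINGREQ:
                processor.processPingReq();
                break;
            case DISCONNECT:
                processor.processDisconnect();
                break;
            default:
                throw new UnsupportedOperationException("Unknown MessageType: " + type);
        }
    }
}
```

Keeping the switch in one place means the handler owns only the routing, while each packet's behavior lives in the processor, which is why subclasses can swap in a different processor without touching channelRead.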
4. MQTTProxyInboundHandler: extends MQTTCommonInboundHandler. Its code is as follows:
public class MQTTProxyInboundHandler extends MQTTCommonInboundHandler {

    private final MQTTProxyService proxyService;

    public MQTTProxyInboundHandler(MQTTProxyService proxyService) {
        this.proxyService = proxyService;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.processor = new MQTTProxyProtocolMethodProcessor(proxyService, ctx);
    }
}
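MQTTProxyInboundHandler overrides channelActive to initialize processor as an MQTTProxyProtocolMethodProcessor when the channel becomes active. In other words, the actual handling of MQTT messages happens in MQTTProxyProtocolMethodProcessor.
5. MQTTProxyProtocolMethodProcessor: extends AbstractCommonProtocolMethodProcessor and does the main work of handling MQTT messages. The CONNECT and PUBLISH messages are used below to illustrate its main logic.
1) Handling the CONNECT message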
@Override
public void processConnect(MqttConnectMessage msg) {
    MqttConnectPayload payload = msg.payload();
    MqttConnectMessage connectMessage = msg;
    final int protocolVersion = msg.variableHeader().version();
    final String username = payload.userName();
    String clientId = payload.clientIdentifier();
    if (log.isDebugEnabled()) {
        log.debug("process CONNECT message. CId={}, username={}", clientId, username);
    }
    // Check MQTT protocol version.
    if (!MqttUtils.isSupportedVersion(protocolVersion)) {
        log.error("MQTT protocol version is not valid. CId={}", clientId);
        channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().unsupportedVersion());
        channel.close();
        return;
    }
    if (!MqttUtils.isQosSupported(msg)) {
        channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().willQosNotSupport(protocolVersion));
        channel.close();
        return;
    }
    // Client must specify the client ID except enable clean session on the connection.
    if (StringUtils.isEmpty(clientId)) {
        if (!msg.variableHeader().isCleanSession()) {
            channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().identifierInvalid(protocolVersion));
            channel.close();
            log.error("The MQTT client ID cannot be empty. Username={}", username);
            return;
        }
        clientId = MqttMessageUtils.createClientIdentifier(channel);
        connectMessage = MqttMessageUtils.stuffClientIdToConnectMessage(msg, clientId);
        if (log.isDebugEnabled()) {
            log.debug("Client has connected with generated identifier. CId={}", clientId);
        }
    }
    String userRole = null;
    if (!authenticationEnabled) {
        if (log.isDebugEnabled()) {
            log.debug("Authentication is disabled, allowing client. CId={}, username={}", clientId, username);
        }
    } else {
        MQTTAuthenticationService.AuthenticationResult authResult = authenticationService.authenticate(payload);
        if (authResult.isFailed()) {
            channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().authFail(protocolVersion));
            channel.close();
            log.error("Invalid or incorrect authentication. CId={}, username={}", clientId, username);
            return;
        }
        userRole = authResult.getUserRole();
    }
    doProcessConnect(connectMessage, userRole);
}
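When MoP receives a CONNECT message it calls processConnect, which validates the protocol version, the will QoS, the client ID, and (if enabled) the client's credentials, and then calls doProcessConnect(connectMessage, userRole) to create the connection: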
public void doProcessConnect(MqttConnectMessage msg, String userRole) {
    connection = Connection.builder()
            .protocolVersion(msg.variableHeader().version())
            .clientId(msg.payload().clientIdentifier())
            .userRole(userRole)
            .cleanSession(msg.variableHeader().isCleanSession())
            .connectMessage(msg)
            .keepAliveTime(msg.variableHeader().keepAliveTimeSeconds())
            .channel(channel)
            .connectionManager(connectionManager)
            .serverReceivePubMaximum(proxyConfig.getReceiveMaximum())
            .build();
    connection.sendConnAck();
}
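The resulting connection object is then used to communicate with the client, and a CONNACK is sent to tell the client that the connection was established successfully.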
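The order of the checks in processConnect matters, because each failed check ends the handshake early. The following is a simplified sketch of the version and client ID checks only; the will QoS and authentication steps are left out. ConnectCheckSketch, Outcome, and Result are hypothetical names, the set of accepted protocol levels is an assumption (what MqttUtils.isSupportedVersion actually accepts is not shown in the source), and the generated ID format is made up for the example.

```java
import java.util.UUID;

// Hypothetical sketch for illustration only; not MoP code.
final class ConnectCheckSketch {
    enum Outcome { ACCEPTED, UNSUPPORTED_VERSION, IDENTIFIER_REJECTED }

    static final class Result {
        final Outcome outcome;
        final String clientId;

        Result(Outcome outcome, String clientId) {
            this.outcome = outcome;
            this.clientId = clientId;
        }
    }

    // MQTT protocol levels: 3 = MQTT 3.1, 4 = MQTT 3.1.1, 5 = MQTT 5.0.
    // Assumption: all three are accepted here.
    static boolean isSupportedVersion(int protocolLevel) {
        return protocolLevel == 3 || protocolLevel == 4 || protocolLevel == 5;
    }

    static Result check(int protocolLevel, String clientId, boolean cleanSession) {
        // 1. Reject unknown protocol versions first.
        if (!isSupportedVersion(protocolLevel)) {
            return new Result(Outcome.UNSUPPORTED_VERSION, clientId);
        }
        // 2. An empty client ID is only allowed together with a clean session,
        //    in which case the server generates an ID for the client.
        if (clientId == null || clientId.isEmpty()) {
            if (!cleanSession) {
                return new Result(Outcome.IDENTIFIER_REJECTED, clientId);
            }
            clientId = "generated-" + UUID.randomUUID();
        }
        return new Result(Outcome.ACCEPTED, clientId);
    }
}
```

An empty client ID with a persistent session is rejected because the server would have no stable key under which to store that client's session state.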
2) Handling the PUBLISH message
// Handle the PUBLISH message
public void processPublish(MqttPublishMessage msg) {
    if (log.isDebugEnabled()) {
        log.debug("[Proxy Publish] publish to topic = {}, CId={}",
                msg.variableHeader().topicName(), connection.getClientId());
    }
    final int packetId = msg.variableHeader().packetId();
    // Map the MQTT topic name to a Pulsar topic name
    final String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(msg.variableHeader().topicName(),
            proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),
            TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));
    // Look up the broker that serves the topic
    CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(
            TopicName.get(pulsarTopicName));
    lookupResult
            .thenCompose(brokerAddress -> writeToBroker(brokerAddress, pulsarTopicName, msg))
            .exceptionally(ex -> {
                msg.release();
                log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}",
                        msg.variableHeader().topicName(), connection.getClientId(), ex);
                MopExceptionHelper.handle(MqttMessageType.PUBLISH, packetId, channel, ex);
                return null;
            });
}

// Write the message to the broker
private CompletableFuture<Void> writeToBroker(InetSocketAddress mqttBroker, String topic, MqttMessage msg) {
    CompletableFuture<MQTTProxyExchanger> proxyExchanger = connectToBroker(mqttBroker, topic);
    return proxyExchanger.thenCompose(exchanger -> writeToBroker(exchanger, msg));
}
The method reads topicName from msg, looks up the broker address for that topic, and then uses the broker address and topic name to write the message to the broker.
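The lookup-then-write flow is an asynchronous chain: thenCompose runs the write only after the lookup succeeds, and exceptionally catches a failure from either step. The sketch below reproduces that shape with the JDK's CompletableFuture only. PublishFlowSketch, its in-memory findBroker and writeToBroker, the broker name "broker-1", and the returned strings are all hypothetical; the real methods talk to Pulsar and the network.

```java
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch for illustration only; not MoP code.
final class PublishFlowSketch {
    // Stand-in for the broker lookup: fails for an empty topic,
    // otherwise resolves to a fixed, made-up broker address.
    static CompletableFuture<InetSocketAddress> findBroker(String topic) {
        if (topic.isEmpty()) {
            CompletableFuture<InetSocketAddress> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty topic"));
            return failed;
        }
        return CompletableFuture.completedFuture(InetSocketAddress.createUnresolved("broker-1", 1883));
    }

    // Stand-in for the write: returns a description instead of sending bytes.
    static CompletableFuture<String> writeToBroker(InetSocketAddress broker, String topic, String payload) {
        String target = broker.getHostString() + ":" + broker.getPort();
        return CompletableFuture.completedFuture(payload + " -> " + topic + " @ " + target);
    }

    // Lookup, then write; any failure in either step is turned into a message.
    static CompletableFuture<String> publish(String topic, String payload) {
        return findBroker(topic)
                .thenCompose(broker -> writeToBroker(broker, topic, payload))
                .exceptionally(ex -> {
                    // Failures from an earlier stage arrive wrapped in CompletionException.
                    Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
                    return "failed: " + cause.getMessage();
                });
    }
}
```

Using thenCompose rather than thenApply keeps the result a flat future instead of a future of a future, which is why the real writeToBroker can itself return a CompletableFuture.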