IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> MQTT协议框架MOP代码结构解析 -> 正文阅读

[网络协议]MQTT协议框架MOP代码结构解析

一、MoP是啥

MoP 即MQTT on Pulsar ,是一个在Pulsar基础上实现的MQTT协议,git地址:https://github.com/streamnative/mop
通过MoP可以快速的搭建一个MQTT服务器,下面介绍一下MoP的主要的代码结构。

二、MQTT消息定义

MQTT的消息在MoP中的定义是MqttMesssage,其代码如下:

public class MqttMessage {
    private final MqttFixedHeader mqttFixedHeader;
    private final Object variableHeader;
    private final Object payload;
    private final DecoderResult decoderResult;
    public static final MqttMessage PINGREQ;
    public static final MqttMessage PINGRESP;
    public static final MqttMessage DISCONNECT;
}
  • (1)MqttFixedHeader(固定头) :存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。
  • (2)variableHeader(可变头):存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。
  • (3)消息体(payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

三、MoP主要代码模块

  • 1、MQTTProtocolHandler:代码入口,继承自ProtocolHandler。
  • 2、MQTTProxyService:采用netty的方式,实现网络通信
    3、MQTTCommonInboundHandler:继承自ChannelInboundHandlerAdapter,实现ChannelInboundHandlerAdapter中的channelRead方法,获取客户端发送过来的消息,根据消息类型进入不同的处理流程。
public class MQTTCommonInboundHandler extends ChannelInboundHandlerAdapter {

    protected ProtocolMethodProcessor processor;  //协议处理

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) {
        checkArgument(message instanceof MqttMessage);
        checkNotNull(processor);
        MqttMessage msg = (MqttMessage) message;
        try {
            checkState(msg);
            MqttMessageType messageType = msg.fixedHeader().messageType();
            if (log.isDebugEnabled()) {
                log.debug("Processing MQTT Inbound handler message, type={}", messageType);
            }
            switch (messageType) {
                case CONNECT:
                    checkArgument(msg instanceof MqttConnectMessage);
                    processor.processConnect((MqttConnectMessage) msg);
                    break;
                case SUBSCRIBE:
                    checkArgument(msg instanceof MqttSubscribeMessage);
                    processor.processSubscribe((MqttSubscribeMessage) msg);
                    break;
                case UNSUBSCRIBE:
                    checkArgument(msg instanceof MqttUnsubscribeMessage);
                    processor.processUnSubscribe((MqttUnsubscribeMessage) msg);
                    break;
                case PUBLISH:
                    checkArgument(msg instanceof MqttPublishMessage);
                    processor.processPublish((MqttPublishMessage) msg);
                    break;
                case PUBREC:
                    processor.processPubRec(msg);
                    break;
                case PUBCOMP:
                    processor.processPubComp(msg);
                    break;
                case PUBREL:
                    processor.processPubRel(msg);
                    break;
                case DISCONNECT:
                    processor.processDisconnect(msg);
                    break;
                case PUBACK:
                    checkArgument(msg instanceof MqttPubAckMessage);
                    processor.processPubAck((MqttPubAckMessage) msg);
                    break;
                case PINGREQ:
                    processor.processPingReq();
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
            }
        } catch (Throwable ex) {
            ReferenceCountUtil.safeRelease(msg);
            log.error("Exception was caught while processing MQTT message, ", ex);
            ctx.close();
        }
    }

如上,先将消息解析成MqttMessage类型,再从mqttFixedHeader中获取消息类型messageType,然后根据不同类型进行不同的处理流程。

4、MQTTProxyInboundHandler:继承自MQTTCommonInboundHandler,代码如下:

public class MQTTProxyInboundHandler extends MQTTCommonInboundHandler {

    private final MQTTProxyService proxyService;

    public MQTTProxyInboundHandler(MQTTProxyService proxyService) {
        this.proxyService = proxyService;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.processor = new MQTTProxyProtocolMethodProcessor(proxyService, ctx);
    }
}

MQTTProxyInboundHandler通过实现channelActive,初始化processor MQTTProxyProtocolMethodProcessor类型。即处理mqtt消息主要是在MQTTProxyProtocolMethodProcessor中进行。

5、MQTTProxyProtocolMethodProcessor:继承自AbstractCommonProtocolMethodProcessor,主要用来处理MQTT消息。下面以处理CONNECT消息和PUBLISH消息为例来说明一下主要的逻辑。

? ? ?1)处理CONNECT消息

 @Override
    public void processConnect(MqttConnectMessage msg) {
        MqttConnectPayload payload = msg.payload();
        MqttConnectMessage connectMessage = msg;
        final int protocolVersion = msg.variableHeader().version();
        final String username = payload.userName();
        String clientId = payload.clientIdentifier();
        if (log.isDebugEnabled()) {
            log.debug("process CONNECT message. CId={}, username={}", clientId, username);
        }
        // Check MQTT protocol version.
        if (!MqttUtils.isSupportedVersion(protocolVersion)) {
            log.error("MQTT protocol version is not valid. CId={}", clientId);
            channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().unsupportedVersion());
            channel.close();
            return;
        }
        if (!MqttUtils.isQosSupported(msg)) {
            channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().willQosNotSupport(protocolVersion));
            channel.close();
            return;
        }
        // Client must specify the client ID except enable clean session on the connection.
        if (StringUtils.isEmpty(clientId)) {
            if (!msg.variableHeader().isCleanSession()) {
                channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().identifierInvalid(protocolVersion));
                channel.close();
                log.error("The MQTT client ID cannot be empty. Username={}", username);
                return;
            }
            clientId = MqttMessageUtils.createClientIdentifier(channel);
            connectMessage = MqttMessageUtils.stuffClientIdToConnectMessage(msg, clientId);
            if (log.isDebugEnabled()) {
                log.debug("Client has connected with generated identifier. CId={}", clientId);
            }
        }
        String userRole = null;
        if (!authenticationEnabled) {
            if (log.isDebugEnabled()) {
                log.debug("Authentication is disabled, allowing client. CId={}, username={}", clientId, username);
            }
        } else {
            MQTTAuthenticationService.AuthenticationResult authResult = authenticationService.authenticate(payload);
            if (authResult.isFailed()) {
                channel.writeAndFlush(MqttConnectAckHelper.errorBuilder().authFail(protocolVersion));
                channel.close();
                log.error("Invalid or incorrect authentication. CId={}, username={}", clientId, username);
                return;
            }
            userRole = authResult.getUserRole();
        }
        doProcessConnect(connectMessage, userRole);
    }

当MoP接收到connect类型消息时,调用processConnect方法,进行一系列的消息校验,然后调用?doProcessConnect(connectMessage, userRole)来创建连接

 public void doProcessConnect(MqttConnectMessage msg, String userRole) {
        connection = Connection.builder()
                .protocolVersion(msg.variableHeader().version())
                .clientId(msg.payload().clientIdentifier())
                .userRole(userRole)
                .cleanSession(msg.variableHeader().isCleanSession())
                .connectMessage(msg)
                .keepAliveTime(msg.variableHeader().keepAliveTimeSeconds())
                .channel(channel)
                .connectionManager(connectionManager)
                .serverReceivePubMaximum(proxyConfig.getReceiveMaximum())
                .build();
        connection.sendConnAck();
    }

通过创建好的connection对象来和客户端通信,并发送消息ack通知客户端建立连接成功。

? ? ?2)处理PUBLISH消息

//处理PUBLISH消息
public void processPublish(MqttPublishMessage msg) {
        if (log.isDebugEnabled()) {
            log.debug("[Proxy Publish] publish to topic = {}, CId={}",
                    msg.variableHeader().topicName(), connection.getClientId());
        }
        final int packetId = msg.variableHeader().packetId();
        final String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(msg.variableHeader().topicName(),
                proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),
                TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));
        CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(
                TopicName.get(pulsarTopicName));
        lookupResult
                .thenCompose(brokerAddress -> writeToBroker(brokerAddress, pulsarTopicName, msg))
                .exceptionally(ex -> {
                    msg.release();
                    log.error("[Proxy Publish] Failed to publish for topic : {}, CId : {}",
                            msg.variableHeader().topicName(), connection.getClientId(), ex);
                    MopExceptionHelper.handle(MqttMessageType.PUBLISH, packetId, channel, ex);
                    return null;
                });
    }
//写入消息到broker中
    private CompletableFuture<Void> writeToBroker(InetSocketAddress mqttBroker, String topic, MqttMessage msg) {
        CompletableFuture<MQTTProxyExchanger> proxyExchanger = connectToBroker(mqttBroker, topic);
        return proxyExchanger.thenCompose(exchanger -> writeToBroker(exchanger, msg));
    }

从msg中获取topicName,再根据topicName获取broker地址,再根据broker地址、topicName将消息写入broker

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-01-24 11:18:35  更:2022-01-24 11:20:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:29:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码