IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 学习open62541 --- [65] 使用MQTT进行订阅 -> 正文阅读

[系统运维]学习open62541 --- [65] 使用MQTT进行订阅

这篇文章里,讲了在open62541里使用MQTT协议进行发布,发布内容是系统时间,但是没有讲如何订阅,open62541自带例子里也没有订阅的例子…

经过本人的苦心研究,最终解决了这个问题。open62541使用的版本是1.3.0,测试环境是debian10,其它环境也是类似。


一 工程搭建

由于pubsub是比较新的功能,如果开启了pubsub功能,又把UA_ENABLE_AMALGAMATION置位ON,那么生成的时候就会出现一些奇怪的问题。

所以,如果要使用pubsub功能,那么就把UA_ENABLE_AMALGAMATION置位OFF

最终工程结构如下,
在这里插入图片描述
CMakeLists.txt内容如下,

cmake_minimum_required(VERSION 3.5.0)
project(demo VERSION 0.1.0)

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/src/pubsub)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/plugins)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/build/open62541/src_generated/open62541)

set(CMAKE_BUILD_TYPE "RELEASE")

set(OPEN62541_VERSION "v1.3.0")
set(UA_ENABLE_AMALGAMATION OFF CACHE BOOL "")
set(UA_ENABLE_PUBSUB ON CACHE BOOL "")
set(UA_ENABLE_PUBSUB_MQTT ON CACHE BOOL "")

add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/open62541)


add_executable(mqtt_pub ${CMAKE_CURRENT_SOURCE_DIR}/mqtt_pub.c)
target_link_libraries(mqtt_pub open62541)

add_executable(mqtt_sub ${CMAKE_CURRENT_SOURCE_DIR}/mqtt_sub.c)
target_link_libraries(mqtt_sub open62541)

mqtt_pub.c内容和这篇文章里是一样的,只是改了broker服务器的地址,

#define BROKER_ADDRESS_URL           (char*)"opc.mqtt://broker-cn.emqx.io:1883"
// #define BROKER_ADDRESS_URL           (char*)"opc.mqtt://127.0.0.1:1883"

这个地址是映云科技的MQTT公共服务器,地址是这里
在这里插入图片描述
这样可以更加真实的进行测试,当然使用mosquito的本地服务器也可以,如果使用本地服务器,记得把地址改成“opc.mqtt://127.0.0.1:1883”。

mqtt_sub.c的内容如下,


#include "open62541/server.h"
#include "open62541/server_config_default.h"
#include "ua_pubsub.h"
#include "ua_network_pubsub_mqtt.h"
#include "open62541/plugin/log_stdout.h"

#include <signal.h>

#define CONNECTION_NAME              (char*)"MQTT Subscriber Connection"
#define TRANSPORT_PROFILE_URI        (char*)"http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
#define MQTT_CLIENT_ID               (char*)"TEST_CLIENT_PUBSUB_MQTT_SUB"
#define CONNECTIONOPTION_NAME        (char*)"mqttClientId"
#define SUBSCRIBER_TOPIC             (char*)"customTopic"
// #define BROKER_ADDRESS_URL           (char*)"opc.mqtt://127.0.0.1:1883" 
#define BROKER_ADDRESS_URL           (char*)"opc.mqtt://broker-cn.emqx.io:1883"

// Uncomment the following line to enable MQTT login for the example
// #define EXAMPLE_USE_MQTT_LOGIN

#ifdef EXAMPLE_USE_MQTT_LOGIN
#define USERNAME_OPTION_NAME         "mqttUsername"
#define PASSWORD_OPTION_NAME         "mqttPassword"
#define MQTT_USERNAME                "open62541user"
#define MQTT_PASSWORD                "open62541"
#endif

// Uncomment the following line to enable MQTT via TLS for the example
// #define EXAMPLE_USE_MQTT_TLS

#ifdef EXAMPLE_USE_MQTT_TLS
#define USE_TLS_OPTION_NAME             "mqttUseTLS"
#define MQTT_CA_FILE_PATH_OPTION_NAME   "mqttCaFilePath"
#define CA_FILE_PATH                    "/path/to/server.cert"
#endif

#ifdef UA_ENABLE_JSON_ENCODING
static UA_Boolean useJson = true;
#else
static UA_Boolean useJson = false;
#endif

static UA_NodeId connectionIdent;
static UA_NodeId publishedDataSetIdent;
static UA_NodeId writerGroupIdent;

static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
    connectionConfig.enabled = UA_TRUE;

    /* configure address of the mqtt broker (local on default port) */
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    /* Changed to static publisherId from random generation to identify
     * the publisher on Subscriber side */
    connectionConfig.publisherId.numeric = 2234;

    /* configure options, set mqtt client id */
    const int connectionOptionsCount = 1
#ifdef EXAMPLE_USE_MQTT_LOGIN
    + 2
#endif
#ifdef EXAMPLE_USE_MQTT_TLS
    + 2
#endif
    ;

    UA_KeyValuePair connectionOptions[connectionOptionsCount];

    size_t connectionOptionIndex = 0;
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);

#ifdef EXAMPLE_USE_MQTT_LOGIN
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USERNAME_OPTION_NAME);
    UA_String mqttUsername = UA_STRING(MQTT_USERNAME);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUsername, &UA_TYPES[UA_TYPES_STRING]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, PASSWORD_OPTION_NAME);
    UA_String mqttPassword = UA_STRING(MQTT_PASSWORD);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttPassword, &UA_TYPES[UA_TYPES_STRING]);
#endif

#ifdef EXAMPLE_USE_MQTT_TLS
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USE_TLS_OPTION_NAME);
    UA_Boolean mqttUseTLS = true;
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttUseTLS, &UA_TYPES[UA_TYPES_BOOLEAN]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, MQTT_CA_FILE_PATH_OPTION_NAME);
    UA_String mqttCaFile = UA_STRING(CA_FILE_PATH);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttCaFile, &UA_TYPES[UA_TYPES_STRING]);
#endif

    connectionConfig.connectionProperties = connectionOptions;
    connectionConfig.connectionPropertiesSize = connectionOptionIndex;

    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}


/* Periodically refreshes the MQTT stack (sending/receiving) */
static void
mqttYieldPollingCallback(UA_Server *server, UA_PubSubConnection *connection) {
    connection->channel->yield(connection->channel, 500);
}

static void callback(UA_ByteString *encodedBuffer, UA_ByteString *topic) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Message Received");
    
    UA_StatusCode ret = UA_STATUSCODE_GOOD;

#ifdef UA_ENABLE_JSON_ENCODING
    // For example try to decode as a Json Networkmessage...
    UA_NetworkMessage dst;
    ret = UA_NetworkMessage_decodeJson(&dst, encodedBuffer);
    if( ret != UA_STATUSCODE_GOOD) {
        return;
    }
    
#else
    UA_NetworkMessage dst;
    memset(&dst, 0, sizeof(dst));
    size_t offset = 0;
    UA_NetworkMessage_decodeBinary(encodedBuffer, &offset, &dst);
#endif

    UA_Byte anzDataSets = 1;
    if(dst.payloadHeaderEnabled)
    {
        anzDataSets = dst.payloadHeader.dataSetPayloadHeader.count;
    }
        
    for(UA_Byte iterator = 0; iterator < anzDataSets; iterator++) 
    {
        UA_DataSetMessage* dataSetMsg = &dst.payload.dataSetPayload.dataSetMessages[iterator];
        
        if(dataSetMsg->header.dataSetMessageType == UA_DATASETMESSAGE_DATAKEYFRAME) 
        {
            if(dataSetMsg->header.fieldEncoding != UA_FIELDENCODING_RAWDATA) 
            {
                size_t anzFields = dataSetMsg->data.keyFrameData.fieldCount;

                UA_StatusCode retVal = UA_STATUSCODE_GOOD;
                for(UA_UInt16 i = 0; i < anzFields; i++) 
                {
                    if(dataSetMsg->data.keyFrameData.dataSetFields[i].hasValue) 
                    {
                        UA_Variant *pValue = &(dataSetMsg->data.keyFrameData.dataSetFields[i].value);
                        
                        if (UA_NodeId_equal(&(pValue->type->typeId), &UA_TYPES[UA_TYPES_DATETIME].typeId) == UA_FALSE)
                        {
                            continue;
                        }
                        
                        UA_DateTime raw_date = *(UA_DateTime *) (pValue->data);
                        UA_DateTimeStruct dts = UA_DateTime_toStruct(raw_date);
                        UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "date is: %u-%u-%u %u:%u:%u.%03u",
                            dts.year, dts.month, dts.day, dts.hour, dts.min, dts.sec, dts.milliSec);

                    }

                }

            }

        }
    }
    

    UA_ByteString_delete(encodedBuffer);
    UA_ByteString_delete(topic);
    
    UA_NetworkMessage_clear(&dst);
}

/* Adds a subscription */
static void 
addSubscription(UA_Server *server, UA_PubSubConnection *connection) {
    
    //Register Transport settings
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    
    /* QOS */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    UA_StatusCode rv = connection->channel->regist(connection->channel, &transportSettings, &callback);
    if (rv == UA_STATUSCODE_GOOD) {
            UA_UInt64 subscriptionCallbackId;
            UA_Server_addRepeatedCallback(server, (UA_ServerCallback)mqttYieldPollingCallback,
                                          connection, 200, &subscriptionCallbackId);
        } else {
            UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "register channel failed: %s!",
                           UA_StatusCode_name(rv));
    }
    return; 
}


UA_Boolean running = true;
static void stopHandler(int sign) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}

int main(int argc, char **argv) {
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_StatusCode retval = UA_STATUSCODE_GOOD;

    UA_Server *server = UA_Server_new();
    UA_ServerConfig *config = UA_Server_getConfig(server);
    UA_ServerConfig_setMinimal(config, 4841, NULL);

    UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerMQTT());

    addPubSubConnection(server, BROKER_ADDRESS_URL);
    UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);

    if(!connection) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not create a PubSubConnection");
        UA_Server_delete(server);
        return -1;
    }

    addSubscription(server, connection);

    UA_Server_run(server, &running);
    UA_Server_delete(server);
    return 0;
}

函数addSubscription()添加订阅,并注册一个回调函数callback,里面会把收到的消息进行解析,然后把系统时间打印出来,注意这个系统时间是mqtt_pub.c发出的。

可以看出订阅的代码比发布简单很多。注意细节:这2个都是open62541 server,所以发布用的端口是4840,订阅用的端口是4841

发布和订阅使用相同的broker服务器以及相同的topic,这样2者就可以通信了。

最后需要注意的是,由于发布和订阅相对于broker服务器来说,都是client,他们都有自己的client id,要保证这个值不同,即MQTT_CLIENT_ID


二 使用

cd到build目录,然后输入以下命令进行编译,

cmake .. && make

完成之后,在一个终端下运行mqtt_sub,另外一个终端下运行mqtt_pub,然后双方就可以通信了。

mqtt_sub这边的显示如下,
在这里插入图片描述
由于mqtt_pub是每隔500ms发送一次,可以看出mqtt_sub这边显示的系统时间也是间隔~500ms,注意这个时间是UTC时间,换算成中国所在时区要加上8个小时。


三 源码包

工程代码打包放到百度云里了,地址是https://pan.baidu.com/s/1CNvkw4wiIxXELTH0Ji4cVg,提取码是myxl

希望能对大家有所帮助。

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-06-04 00:07:03  更:2022-06-04 00:08:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 13:38:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码