在这篇文章里,讲了在open62541里使用MQTT协议进行发布,发布内容是系统时间,但是没有讲如何订阅,open62541自带例子里也没有订阅的例子…
经过本人的苦心研究,最终解决了这个问题。open62541使用的版本是1.3.0,测试环境是debian10,其它环境也是类似。
一 工程搭建
由于pubsub是比较新的功能,如果开启了pubsub功能,又把UA_ENABLE_AMALGAMATION置为ON,那么生成的时候就会出现一些奇怪的问题。
所以,如果要使用pubsub功能,那么就把UA_ENABLE_AMALGAMATION置为OFF。
最终工程结构如下, CMakeLists.txt内容如下,
cmake_minimum_required(VERSION 3.5.0)
project(demo VERSION 0.1.0)

# Header search paths. Besides the public headers, mqtt_sub.c includes
# "ua_pubsub.h" and "ua_network_pubsub_mqtt.h", which live in src/pubsub and
# plugins of the open62541 source tree.
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/include)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/src/pubsub)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/open62541/plugins)
# Generated headers are written below the build tree. Use the actual binary
# directory instead of assuming that it is called "build".
include_directories(${CMAKE_CURRENT_BINARY_DIR}/open62541/src_generated/open62541)

# Default to a release build, but honour -DCMAKE_BUILD_TYPE=... if given.
if(NOT CMAKE_BUILD_TYPE)
    set(CMAKE_BUILD_TYPE "RELEASE")
endif()

set(OPEN62541_VERSION "v1.3.0")

# open62541 options: amalgamation off, PubSub with the MQTT transport on.
set(UA_ENABLE_AMALGAMATION OFF CACHE BOOL "")
set(UA_ENABLE_PUBSUB ON CACHE BOOL "")
set(UA_ENABLE_PUBSUB_MQTT ON CACHE BOOL "")

add_subdirectory(${CMAKE_CURRENT_SOURCE_DIR}/open62541)

# Publisher demo.
add_executable(mqtt_pub ${CMAKE_CURRENT_SOURCE_DIR}/mqtt_pub.c)
target_link_libraries(mqtt_pub open62541)

# Subscriber demo.
add_executable(mqtt_sub ${CMAKE_CURRENT_SOURCE_DIR}/mqtt_sub.c)
target_link_libraries(mqtt_sub open62541)
mqtt_pub.c内容和这篇文章里是一样的,只是改了broker服务器的地址,
#define BROKER_ADDRESS_URL (char*)"opc.mqtt://broker-cn.emqx.io:1883"
这个地址是映云科技的MQTT公共服务器,地址是这里。这样可以更加真实地进行测试,当然使用mosquitto的本地服务器也可以;如果使用本地服务器,记得把地址改成“opc.mqtt://127.0.0.1:1883”。
mqtt_sub.c的内容如下,
#include "open62541/server.h"
#include "open62541/server_config_default.h"
#include "ua_pubsub.h"
#include "ua_network_pubsub_mqtt.h"
#include "open62541/plugin/log_stdout.h"
#include <signal.h>
/* Name given to the PubSub connection created by addPubSubConnection(). */
#define CONNECTION_NAME (char*)"MQTT Subscriber Connection"
/* Transport profile URI stored in the connection configuration. */
#define TRANSPORT_PROFILE_URI (char*)"http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt"
/* Value of the "mqttClientId" connection option. */
#define MQTT_CLIENT_ID (char*)"TEST_CLIENT_PUBSUB_MQTT_SUB"
/* Key of the connection option that carries MQTT_CLIENT_ID. */
#define CONNECTIONOPTION_NAME (char*)"mqttClientId"
/* Queue name registered with the channel by addSubscription(). */
#define SUBSCRIBER_TOPIC (char*)"customTopic"
/* Broker address that main() passes to addPubSubConnection(). */
#define BROKER_ADDRESS_URL (char*)"opc.mqtt://broker-cn.emqx.io:1883"

/* Optional connection options: broker login (two extra key/value pairs). */
#ifdef EXAMPLE_USE_MQTT_LOGIN
#define USERNAME_OPTION_NAME "mqttUsername"
#define PASSWORD_OPTION_NAME "mqttPassword"
#define MQTT_USERNAME "open62541user"
#define MQTT_PASSWORD "open62541"
#endif

/* Optional connection options: TLS (two extra key/value pairs). */
#ifdef EXAMPLE_USE_MQTT_TLS
#define USE_TLS_OPTION_NAME "mqttUseTLS"
#define MQTT_CA_FILE_PATH_OPTION_NAME "mqttCaFilePath"
#define CA_FILE_PATH "/path/to/server.cert"
#endif

/* NodeId of the PubSub connection; written by addPubSubConnection() and used
 * by main() to look the connection up again. */
static UA_NodeId connectionIdent;
/**
 * Create the MQTT PubSub connection on the server.
 *
 * Fills a UA_PubSubConnectionConfig with the connection name, the transport
 * profile URI, the broker address and the MQTT connection options (client id
 * and, depending on the build flags, login and TLS options), then adds it to
 * the server. The resulting NodeId is stored in the file-scope
 * connectionIdent. A failure is logged; the function itself returns nothing,
 * so callers detect failure by looking the connection up afterwards.
 *
 * @param server     server that receives the connection
 * @param addressUrl broker URL stored in the connection's network address
 */
static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
    /* Number of key/value pairs is known at compile time; an enum constant
     * gives a fixed-size array instead of a variable-length array. */
    enum {
        CONNECTION_OPTIONS_COUNT = 1 /* mqttClientId */
#ifdef EXAMPLE_USE_MQTT_LOGIN
            + 2 /* username, password */
#endif
#ifdef EXAMPLE_USE_MQTT_TLS
            + 2 /* use-TLS flag, CA file path */
#endif
    };

    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING(CONNECTION_NAME);
    connectionConfig.transportProfileUri = UA_STRING(TRANSPORT_PROFILE_URI);
    connectionConfig.enabled = UA_TRUE;

    /* Network interface is left empty, only the URL is set. */
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL, UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    connectionConfig.publisherId.numeric = 2234;

    UA_KeyValuePair connectionOptions[CONNECTION_OPTIONS_COUNT];
    size_t connectionOptionIndex = 0;

    /* The variants below only reference the local values, they do not own them.
     * NOTE(review): presumably the server copies the configuration when the
     * connection is added - confirm against the open62541 version in use. */
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, CONNECTIONOPTION_NAME);
    UA_String mqttClientId = UA_STRING(MQTT_CLIENT_ID);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
                         &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);

#ifdef EXAMPLE_USE_MQTT_LOGIN
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USERNAME_OPTION_NAME);
    UA_String mqttUsername = UA_STRING(MQTT_USERNAME);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
                         &mqttUsername, &UA_TYPES[UA_TYPES_STRING]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, PASSWORD_OPTION_NAME);
    UA_String mqttPassword = UA_STRING(MQTT_PASSWORD);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
                         &mqttPassword, &UA_TYPES[UA_TYPES_STRING]);
#endif

#ifdef EXAMPLE_USE_MQTT_TLS
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, USE_TLS_OPTION_NAME);
    UA_Boolean mqttUseTLS = true;
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
                         &mqttUseTLS, &UA_TYPES[UA_TYPES_BOOLEAN]);

    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, MQTT_CA_FILE_PATH_OPTION_NAME);
    UA_String mqttCaFile = UA_STRING(CA_FILE_PATH);
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value,
                         &mqttCaFile, &UA_TYPES[UA_TYPES_STRING]);
#endif

    connectionConfig.connectionProperties = connectionOptions;
    connectionConfig.connectionPropertiesSize = connectionOptionIndex;

    /* Do not drop the status silently: a wrong broker URL or an unreachable
     * broker shows up here. */
    UA_StatusCode rv = UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
    if(rv != UA_STATUSCODE_GOOD) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Adding the PubSub connection failed: %s",
                       UA_StatusCode_name(rv));
    }
}
/**
 * Repeated server callback that drives the MQTT channel.
 *
 * Calls the channel's yield() hook with an argument of 500 so that the MQTT
 * layer gets a chance to process traffic. The server argument is unused.
 *
 * @param server     server the callback was registered on (unused)
 * @param connection PubSub connection whose channel is polled
 */
static void
mqttYieldPollingCallback(UA_Server *server, UA_PubSubConnection *connection) {
    (void)server;
    UA_PubSubChannel *mqttChannel = connection->channel;
    mqttChannel->yield(mqttChannel, 500);
}
/**
 * Receive callback registered with the MQTT channel.
 *
 * Decodes the received buffer into a UA_NetworkMessage (JSON when
 * UA_ENABLE_JSON_ENCODING is defined, binary otherwise), walks all key-frame
 * DataSetMessages that are not raw-encoded and logs every scalar DateTime
 * field it finds.
 *
 * The function deletes both encodedBuffer and topic before returning, on
 * every path, including decode failures.
 *
 * @param encodedBuffer received payload; deleted by this function
 * @param topic         topic the payload arrived on; deleted by this function
 */
static void callback(UA_ByteString *encodedBuffer, UA_ByteString *topic) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "Message Received");

    /* Zero-initialise so that the final clear is well-defined even when
     * decoding stops early.
     * NOTE(review): assumes a failed decode leaves dst in a clearable state,
     * which the binary path relied on before as well. */
    UA_NetworkMessage dst;
    memset(&dst, 0, sizeof(dst));

#ifdef UA_ENABLE_JSON_ENCODING
    UA_StatusCode ret = UA_NetworkMessage_decodeJson(&dst, encodedBuffer);
#else
    size_t offset = 0;
    UA_StatusCode ret = UA_NetworkMessage_decodeBinary(encodedBuffer, &offset, &dst);
#endif
    if(ret != UA_STATUSCODE_GOOD) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not decode the received message: %s",
                       UA_StatusCode_name(ret));
        goto cleanup;
    }

    /* The payload is a union; only DataSet messages carry dataSetMessages. */
    if(dst.networkMessageType != UA_NETWORKMESSAGE_DATASET ||
       !dst.payload.dataSetPayload.dataSetMessages)
        goto cleanup;

    /* Without a payload header the message holds exactly one DataSetMessage. */
    UA_Byte dataSetCount = 1;
    if(dst.payloadHeaderEnabled)
        dataSetCount = dst.payloadHeader.dataSetPayloadHeader.count;

    for(UA_Byte m = 0; m < dataSetCount; m++) {
        UA_DataSetMessage *dataSetMsg = &dst.payload.dataSetPayload.dataSetMessages[m];

        /* Only key frames with non-raw field encoding are evaluated. */
        if(dataSetMsg->header.dataSetMessageType != UA_DATASETMESSAGE_DATAKEYFRAME)
            continue;
        if(dataSetMsg->header.fieldEncoding == UA_FIELDENCODING_RAWDATA)
            continue;

        UA_UInt16 fieldCount = dataSetMsg->data.keyFrameData.fieldCount;
        for(UA_UInt16 i = 0; i < fieldCount; i++) {
            UA_DataValue *field = &dataSetMsg->data.keyFrameData.dataSetFields[i];
            if(!field->hasValue)
                continue;

            /* An empty variant has no type and an array has no single value:
             * skip both instead of dereferencing blindly. */
            UA_Variant *value = &field->value;
            if(!value->type || !UA_Variant_isScalar(value))
                continue;
            if(!UA_NodeId_equal(&value->type->typeId, &UA_TYPES[UA_TYPES_DATETIME].typeId))
                continue;

            UA_DateTime rawDate = *(UA_DateTime *)value->data;
            UA_DateTimeStruct dts = UA_DateTime_toStruct(rawDate);
            /* Casts make the arguments match the %u conversions exactly. */
            UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "date is: %u-%u-%u %u:%u:%u.%03u",
                        (unsigned)dts.year, (unsigned)dts.month, (unsigned)dts.day,
                        (unsigned)dts.hour, (unsigned)dts.min, (unsigned)dts.sec,
                        (unsigned)dts.milliSec);
        }
    }

cleanup:
    /* Single exit point: the buffers are released on every path. */
    UA_NetworkMessage_clear(&dst);
    UA_ByteString_delete(encodedBuffer);
    UA_ByteString_delete(topic);
}
/**
 * Subscribe the connection's MQTT channel to SUBSCRIBER_TOPIC.
 *
 * Builds broker transport settings (queue name SUBSCRIBER_TOPIC, best-effort
 * delivery), wraps them in a decoded extension object and registers them with
 * the channel together with callback(). On success a repeated server callback
 * (interval argument 200) is added that calls mqttYieldPollingCallback();
 * failures of either step are logged.
 *
 * @param server     server on which the polling callback is registered
 * @param connection PubSub connection whose channel is subscribed
 */
static void
addSubscription(UA_Server *server, UA_PubSubConnection *connection) {
    if(!connection || !connection->channel) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                       "register channel failed: no channel available!");
        return;
    }

    /* The struct type must be the one named by the extension object's type
     * tag below. The reader type has one more member than the writer-group
     * type, so a writer-group struct would be too small for it. */
    UA_BrokerDataSetReaderTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(brokerTransportSettings));
    brokerTransportSettings.queueName = UA_STRING(SUBSCRIBER_TOPIC);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    brokerTransportSettings.requestedDeliveryGuarantee =
        UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    /* Decoded extension object that only points at the local settings. */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(transportSettings));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type =
        &UA_TYPES[UA_TYPES_BROKERDATASETREADERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    UA_StatusCode rv = connection->channel->regist(connection->channel,
                                                   &transportSettings, &callback);
    if(rv != UA_STATUSCODE_GOOD) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "register channel failed: %s!",
                       UA_StatusCode_name(rv));
        return;
    }

    /* Without the polling callback no message is ever delivered, so a failure
     * here must not go unnoticed. */
    UA_UInt64 subscriptionCallbackId;
    rv = UA_Server_addRepeatedCallback(server, (UA_ServerCallback)mqttYieldPollingCallback,
                                       connection, 200, &subscriptionCallbackId);
    if(rv != UA_STATUSCODE_GOOD) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_SERVER,
                       "adding the MQTT polling callback failed: %s!",
                       UA_StatusCode_name(rv));
    }
}
/* Run flag polled by the server main loop. It is written from a signal
 * handler, hence volatile so the store cannot be cached or optimised away. */
volatile UA_Boolean running = true;

/**
 * Signal handler for SIGINT/SIGTERM: logs the event and clears the run flag
 * so that the server loop terminates.
 *
 * NOTE(review): logging from a signal handler is not async-signal-safe; it is
 * kept because the message is part of the program's visible output.
 *
 * @param sign signal number (unused)
 */
static void stopHandler(int sign) {
    (void)sign;
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}
/**
 * Entry point of the MQTT subscriber demo.
 *
 * Creates a server with the minimal configuration on port 4841, adds the MQTT
 * PubSub transport layer, creates the PubSub connection to
 * BROKER_ADDRESS_URL, subscribes to the topic and runs the server until the
 * run flag is cleared by stopHandler().
 *
 * @return 0 when the server ran and shut down with a good status,
 *         -1 when setup or the server run failed
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_Server *server = UA_Server_new();
    if(!server) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not create the server");
        return -1;
    }

    /* Configure the server; stop at the first failing step. */
    UA_ServerConfig *config = UA_Server_getConfig(server);
    UA_StatusCode retval = UA_ServerConfig_setMinimal(config, 4841, NULL);
    if(retval == UA_STATUSCODE_GOOD)
        retval = UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerMQTT());
    if(retval != UA_STATUSCODE_GOOD) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not configure the server: %s",
                       UA_StatusCode_name(retval));
        UA_Server_delete(server);
        return -1;
    }

    addPubSubConnection(server, BROKER_ADDRESS_URL);

    /* addPubSubConnection() returns nothing, so success is verified by
     * looking the connection up through the NodeId it stored. */
    UA_PubSubConnection *connection = UA_PubSubConnection_findConnectionbyId(server, connectionIdent);
    if(!connection) {
        UA_LOG_WARNING(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND,
                       "Could not create a PubSubConnection");
        UA_Server_delete(server);
        return -1;
    }

    addSubscription(server, connection);

    /* Propagate the result of the main loop to the exit code. */
    retval = UA_Server_run(server, &running);
    UA_Server_delete(server);
    return retval == UA_STATUSCODE_GOOD ? 0 : -1;
}
函数addSubscription()添加订阅,并注册一个回调函数callback,里面会把收到的消息进行解析,然后把系统时间打印出来,注意这个系统时间是mqtt_pub.c发出的。
可以看出订阅的代码比发布简单很多。注意细节:这2个程序都是open62541 server,各自都要监听一个OPC UA端口,为了避免在同一台机器上冲突,发布用的端口是4840,订阅用的端口是4841(这两个端口与MQTT broker的1883端口无关)。
发布和订阅使用相同的broker服务器以及相同的topic,这样2者就可以通信了。
最后需要注意的是,由于发布和订阅相对于broker服务器来说,都是client,他们都有自己的client id,要保证这个值不同,即MQTT_CLIENT_ID
二 使用
cd到build目录,然后输入以下命令进行编译,
cmake .. && make
完成之后,在一个终端下运行mqtt_sub,另外一个终端下运行mqtt_pub,然后双方就可以通信了。
mqtt_sub这边的显示如下, 由于mqtt_pub是每隔500ms发送一次,可以看出mqtt_sub这边显示的系统时间也是间隔~500ms,注意这个时间是UTC时间,换算成中国所在时区要加上8个小时。
三 源码包
工程代码打包放到百度云里了,地址是https://pan.baidu.com/s/1CNvkw4wiIxXELTH0Ji4cVg,提取码是myxl
希望能对大家有所帮助。
|