IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 嵌入式 -> 树莓派mqtt连接onenet平台实现消息订阅与发布 -> 正文阅读

[嵌入式]树莓派mqtt连接onenet平台实现消息订阅与发布

一、引言

由于毕设需求,需要通过MQTT上传数据与下发命令,所以小白的我学习了MQTT,虽然理论部分还是懵懵懂懂,但别人的实现代码看懂也还算凑合。对于mqtt的入门我是跟着B站一位up主物联网技术大神的教学视频学习的,他是以stm32开发板来讲解的。由于个人能力薄弱所以自己整合的代码有很大一部分是直接copy这位up主的(嘿嘿 😮),特别mqtt.c中的,不过也做了些许修改以适用于树莓派,在此多谢这位up主。由于onenet是一个免费开放平台,所以对于没钱党的我选择了它。
我选择把它整合为树莓派可用也是有原因的:第一,该up主的代码适用于stm32,并且我也直接用他的mqtt框架在STM32上开发,但后来发现不跑系统直接用一个while循环后面有很多问题就难以解决,而自己又没学过可在stm32上运行的实时操作系统,但对Linux有一点了解,所以这是原因之一。第二,网上也有很多mqtt源码可适用于树莓派,但为了加深对mqtt实现过程的了解,我选择自己整合一下代码,加深印象。

up主:物联网技术大神
视频地址:https://b23.tv/mJEpQh
参考文章:https://blog.csdn.net/u010835747/article/details/108485662

推荐对于mqtt入门可参考:菜鸟教程的MQTT 入门介绍
推荐网上的文章如:MQTT 嵌入式 C语言 客户端libemqtt源码解析
C语言实现mosquitto发布、订阅消息

二、代码

话不多,说先上代码。
main.c

#include "mqtt.h"
#include "tcp.h"
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include<time.h> //C语言的头文件  
#include <pthread.h>
#include <stdlib.h>


static int subscribe_flag = 1;			//订阅标志,(订阅失败时)记载重新订阅次数;若订阅成功则置为0
static int conect_flag = 1;				//MQTT连接标志,(连接失败时)记载重新连接次数;若连接成功则置为0


/********************************************************/
/*函数:指令处理函数(可自行添加指令和处理事务)			*/
/*参数:CmdBuf----接收的指令CmdBuf					    */
/*返回值:无											    */
/********************************************************/
void Cmd_Handler(unsigned char* CmdBuf)
{
		char Cmd1[16] = "open_Led";
		char Cmd2[16] = "open_Beep";
		
		char buf1[24] = "{\"LED\":1}";				  //可上传数据到onenet显示
		if(!memcmp(CmdBuf,Cmd1,strlen(Cmd1))){
				printf("灯已打开\n");
				PayloadPublish(buf1,strlen(buf1));    //可上传数据到onenet显示
		}else if(!memcmp(CmdBuf,Cmd2,strlen(Cmd2))){
				printf("蜂鸣器已打开\n");
		}else{
				printf("未知指令\n");
		}	
}


/***********************************************************/
/*线程函数:接收和处理服务器发过来的数据								       */
/***********************************************************/
void *RcvMessage_thread(void *date)
{
		unsigned int send_len = 0;				//存放TCP接收到消息的长度
		unsigned char buff[256] = {0};			//存放TCP接收到的消息
		unsigned char MQTT_RcvData[400];		//存放处理后的TCP消息
		unsigned char back_buff[100] = {0};		//存放指令


		while(1){

				if(conect_flag == 5){
						printf("多次连接失败,请检查设备信息是否正确\n");
						printf("程序已退出\n");
						exit(0);
				}

				memset(MQTT_RcvData,0,sizeof(MQTT_RcvData));
				memset(buff,0,sizeof(buff));
				send_len = tcp_rcv(buff,sizeof(buff));	//获取TCP接收的数据和数据长度

				if(send_len < 0)						//长度小于0,说明TCP读取数据失败,准备重新连接
				{
						MY_MQTT_RECONECT();
						continue;
				}else{
						memcpy(&MQTT_RcvData[2],buff,send_len);  		  //拷贝数据
						MQTT_RcvData[0] = send_len/256;                   //记录数据长度高字节
						MQTT_RcvData[1] = send_len%256;                   //记录数据长度低字节
				}

				//若第一个字节0x20,则是MQTT连接状态回复
				if(MQTT_RcvData[2]==0x20){             			
						switch(MQTT_RcvData[5]){					
								case 0x00 : printf("CONNECT报文成功\n"); 
											MQTT_Subscribe(S_TOPIC_NAME,QoS_0);        //MQTT连接成功,开始订阅,conect_flag置0                             
											conect_flag = 0;
											break;                                                                                                   
								case 0x01 : printf("连接已拒绝,不支持的协议版本,准备重连\r\n");
											MY_MQTT_RECONECT();                   //MQTT连接失败,准备重连,下面以此类推                     
											conect_flag++;
											continue;
											break;                                                        
								case 0x02 : printf("连接已拒绝,不合格的客户端标识符,准备重连\r\n");
											MY_MQTT_RECONECT();                                             
											conect_flag++;
											continue;                                           
											break;                                                       
								case 0x03 : printf("连接已拒绝,服务端不可用,准备重连\r\n");         
											MY_MQTT_RECONECT();                                             
											continue;                                         
											conect_flag++;
											break;                                                       
								case 0x04 : printf("连接已拒绝,无效的用户名或密码,准备重连\r\n");
											MY_MQTT_RECONECT();                                             
											conect_flag++;
											continue;                                          					
											break;                                                       
								case 0x05 : printf("连接已拒绝,未授权,准备重连\r\n"); 
											MY_MQTT_RECONECT();                                             
											conect_flag++;
											continue;                                           				
											break;                                                        		
								default   : printf("连接已拒绝,未知状态,准备重连\r\n");
											MY_MQTT_RECONECT();                                             
											conect_flag++;
											continue;                                           		
											break;                                                       							
						}

				}			
				//若第一个字节是0x90,表示收到的是SUBACK报文
				else if(MQTT_RcvData[2]==0x90){ 
						switch(MQTT_RcvData[6]){					
								case 0x00 :
								case 0x01 : printf("订阅成功\r\n");        
											subscribe_flag = 0;
											break;  
								default   : printf("订阅失败,准备重新订阅\r\n");
											sleep(1);
											MQTT_Subscribe(S_TOPIC_NAME,QoS_0);
											subscribe_flag++;
											if(subscribe_flag == 5){
													printf("多次订阅失败,准备重新连接!\r\n");
													sleep(1);
													MY_MQTT_RECONECT(); 
											}
											continue;  
											break;                                					
						}					
				}
				//若第一个字节是0xD0,表示收到的是PINGRESP报文
				else if(MQTT_RcvData[2]==0xD0){ 
						printf("PING报文回复\r\n"); 		 
				}	
				//若第一个字节是0x30,表示收到的是服务器发来的推送数据
				else if((MQTT_RcvData[2]==0x30)){ 
						printf("服务器等级0推送\r\n"); 		   
						MQTT_DealPushdata_Qs0(MQTT_RcvData,back_buff);  //处理等级0推送数据
						printf("Cmd: %s\n",back_buff);					//打印指令
						Cmd_Handler(back_buff);							//调用指令处理函数
						memset(back_buff,0,sizeof(back_buff));	
				}					   
		}

}



/***************************************************/
/*线程函数:定时发送ping报文						       	   */
/***************************************************/
void *ping_thread(void *date)
{
		unsigned char buff[32] = {0};				//存放PING报文的缓冲区
		unsigned int send_len = 0;
		
		while(1)
		{	
				if(conect_flag == 0){					//MQTT连接成功时开始发送ping报文
						memset(buff,0,sizeof(buff));
						send_len = MQTT_ping(buff);
						tcp_send(buff,send_len);
						sleep(40);
				}
		}	
}


int main()
{
		pthread_t RcvMessageThread;
		pthread_t PingThread;
	
		My_MQTT_Init();
		My_MQTT_CONECT();

		pthread_create(&RcvMessageThread,NULL,RcvMessage_thread,NULL);	//创建消息处理线程1
		pthread_create(&PingThread,NULL,ping_thread,NULL);				//创建PING报文定时发布线程2

		//等待连接成功则发布消息(也可以再创建一个线程用于消息发布)
		while(conect_flag == 0);
		char ba[40]="{\"temp\":22.8,\"humi\":55.4}";
		MQTT_PublishQs0("kfb_topic",ba,strlen(ba));

		pthread_join(RcvMessageThread,NULL);							//等待线程1
		pthread_join(PingThread,NULL);									//等待线程2

		return 0;
}

mqtt.c

#include <string.h>
#include "mqtt.h"
#include <stdio.h>
#include "tcp.h"
#include <unistd.h>
#include <stdlib.h>


char ClientID[128];                                          //存放客户端ID的缓冲区
int  ClientID_len;                                           //存放客户端ID的长度

char Username[128];                                          //存放用户名的缓冲区
int  Username_len;											 //存放用户名的长度

char Passward[128];                                          //存放密码的缓冲区
int  Passward_len;

static unsigned char MQTT_CONENCT_BUF[200];					  //构建连接报文的缓冲区
static unsigned char MQTT_SUBSCRIBE_BUF[200];				  //构建订阅报文的缓冲区
static unsigned char MQTT_SEND_BUF[200];					  //构建向onenet上传数据点的缓冲区(onenet设备数据流中查看,缓冲区大小看自己上传数据进行适当调整)
static unsigned char MQTT_PUBLISH_BUF[200];					  //构建发送报文的缓冲区(缓冲区大小看自己上传数据进行适当调整)



/*----------------------------------------------------------*/
/*函数名:云初始化参数,得到客户端ID,用户名和密码          */
/*参  数:无                                            	    */
/*返回值:无                                           		    */
/*----------------------------------------------------------*/
void IoT_Parameter_Init(void)
{	
	memset(ClientID,0,128);                              //客户端ID的缓冲区全部清零
	sprintf(ClientID,"%s",DeviesID);                     //构建客户端ID,并存入缓冲区
	ClientID_len = strlen(ClientID);                     //计算客户端ID的长度
	
	memset(Username,0,128);                              //用户名的缓冲区全部清零
	sprintf(Username,"%s",ProductID);                    //构建用户名,并存入缓冲区
	Username_len = strlen(Username);                     //计算用户名的长度
	
	memset(Passward,0,128);                              //用户名的缓冲区全部清零
	sprintf(Passward,"%s",AuthInfo);               		 //构建密码,并存入缓冲区
	Passward_len = strlen(Passward);                     //计算密码的长度
}


/*----------------------------------------------------------*/
/*函数名:构建连接服务器报文                         			        */
/*参  数:	获取完整报文的指针                                           */
/*返回值:完整报文的长度                                               */
/*----------------------------------------------------------*/
unsigned int MQTT_ConectPack(unsigned char *buf)
{	
	int temp,Remaining_len,Fixed_len,Variable_len,Payload_len,Total_len;
	
	
	Fixed_len = 1;                                                        //连接报文中,固定报头长度暂时先=1
	Variable_len = 10;                                                    //连接报文中,可变报头长度=10
	Payload_len = 2 + ClientID_len + 2 + Username_len + 2 + Passward_len; //连接报文中,负载长度      
	Remaining_len = Variable_len + Payload_len;                           //剩余长度=可变报头长度+负载长度
	memset(MQTT_CONENCT_BUF,0,sizeof(MQTT_CONENCT_BUF));
	
	MQTT_CONENCT_BUF[0]=0x10;                //固定报头第1个字节 :固定0x01		
	do{                                      //循环处理固定报头中的剩余长度字节,字节量根据剩余字节的真实长度变化
		temp = Remaining_len%128;            //剩余长度取余128
		Remaining_len = Remaining_len/128;   //剩余长度取整128
		if(Remaining_len>0)               	
			temp |= 0x80;                    //按协议要求位7置位          
		MQTT_CONENCT_BUF[Fixed_len] = temp;         //剩余长度字节记录一个数据
		Fixed_len++;	                     //固定报头总长度+1    
	}while(Remaining_len>0);                 //如果Remaining_len>0的话,再次进入循环
	
	MQTT_CONENCT_BUF[Fixed_len+0]=0x00;     //可变报头第1个字节 :固定0x00	            
	MQTT_CONENCT_BUF[Fixed_len+1]=0x04;     //可变报头第2个字节 :固定0x04
	MQTT_CONENCT_BUF[Fixed_len+2]=0x4D;	    //可变报头第3个字节 :固定0x4D
	MQTT_CONENCT_BUF[Fixed_len+3]=0x51;	    //可变报头第4个字节 :固定0x51
	MQTT_CONENCT_BUF[Fixed_len+4]=0x54;	    //可变报头第5个字节 :固定0x54
	MQTT_CONENCT_BUF[Fixed_len+5]=0x54;     //可变报头第6个字节 :固定0x54
	MQTT_CONENCT_BUF[Fixed_len+6]=0x04;	    //可变报头第7个字节 :固定0x04
	MQTT_CONENCT_BUF[Fixed_len+7]=0xC2; 	//可变报头第8个字节 :使能用户名和密码校验,不使用遗嘱,不保留会话
	MQTT_CONENCT_BUF[Fixed_len+8]=0x00; 	//可变报头第9个字节 :保活时间高字节 0x00
	MQTT_CONENCT_BUF[Fixed_len+9]=0x64;		//可变报头第10个字节:保活时间高字节 0x64   100s
	
	/*     CLIENT_ID      */
	MQTT_CONENCT_BUF[Fixed_len+10] = ClientID_len/256;                			  			    //客户端ID长度高字节
	MQTT_CONENCT_BUF[Fixed_len+11] = ClientID_len%256;               			  			    //客户端ID长度低字节
	memcpy(&MQTT_CONENCT_BUF[Fixed_len+12],ClientID,ClientID_len);                 				//复制过来客户端ID字串	
	/*     用户名        */
	MQTT_CONENCT_BUF[Fixed_len+12+ClientID_len] = Username_len/256; 				  		    //用户名长度高字节
	MQTT_CONENCT_BUF[Fixed_len+13+ClientID_len] = Username_len%256; 				 		    //用户名长度低字节
	memcpy(&MQTT_CONENCT_BUF[Fixed_len+14+ClientID_len],Username,Username_len);                 //复制过来用户名字串	
	/*      密码        */
	MQTT_CONENCT_BUF[Fixed_len+14+ClientID_len+Username_len] = Passward_len/256;			    //密码长度高字节
	MQTT_CONENCT_BUF[Fixed_len+15+ClientID_len+Username_len] = Passward_len%256;			    //密码长度低字节
	memcpy(&MQTT_CONENCT_BUF[Fixed_len+16+ClientID_len+Username_len],Passward,Passward_len);    //复制过来密码字串

	Total_len = Fixed_len + Variable_len + Payload_len;
	memcpy(buf,MQTT_CONENCT_BUF,Total_len);                  //加入发送数据缓冲区
	return Total_len;
}


/*----------------------------------------------------------*/
/*函数名:PING报文,心跳包                                            */
/*参  数:	获取完整报文的指针                                           */
/*返回值:PING报文的长度                                             */
/*----------------------------------------------------------*/
unsigned int MQTT_ping(unsigned char *buf)
{
	buf[0] = 0xc0;
	buf[1] = 0x00;

	return 2;
}



/*----------------------------------------------------------*/
/*函数名:构建与发送订阅报文                      			            */
/*参  数:QoS:订阅等级                                             */
/*参  数:topic_name:订阅topic报文名称                               */
/*返回值:无                                                     */
/*----------------------------------------------------------*/
void MQTT_Subscribe(char *topic_name, int QoS)
{	
	int Remaining_len, Fixed_len, Variable_len, Payload_len, Total_len;
	
	memset(MQTT_SUBSCRIBE_BUF,0,sizeof(MQTT_SUBSCRIBE_BUF));
	
	Fixed_len = 2;                              //SUBSCRIBE报文中,固定报头长度=2
	Variable_len = 2;                           //SUBSCRIBE报文中,可变报头长度=2	
	Payload_len = 2 + strlen(topic_name) + 1;   //计算有效负荷长度 = 2字节(topic_name长度)+ topic_name字符串的长度 + 1字节服务等级
	Total_len = Fixed_len + Variable_len + Payload_len;
	
	MQTT_SUBSCRIBE_BUF[0]=0x82;                                    //第1个字节 :固定0x82                      
	MQTT_SUBSCRIBE_BUF[1]=Variable_len + Payload_len;              //第2个字节 :可变报头+有效负荷的长度	
	MQTT_SUBSCRIBE_BUF[2]=0x00;                                    //第3个字节 :报文标识符高字节,固定使用0x00
	MQTT_SUBSCRIBE_BUF[3]=0x01;		                              //第4个字节 :报文标识符低字节,固定使用0x01
	MQTT_SUBSCRIBE_BUF[4]=strlen(topic_name)/256;                  //第5个字节 :topic_name长度高字节
	MQTT_SUBSCRIBE_BUF[5]=strlen(topic_name)%256;		          //第6个字节 :topic_name长度低字节
	memcpy(&MQTT_SUBSCRIBE_BUF[6],topic_name,strlen(topic_name));  //第7个字节开始 :复制过来topic_name字串		
	MQTT_SUBSCRIBE_BUF[6+strlen(topic_name)]=QoS;                  //最后1个字节:订阅等级
	
	tcp_send(MQTT_SUBSCRIBE_BUF,Total_len);							//发送订阅报文
}


/*----------------------------------------------------------*/
/*函数名:等级0 发布消息报文                                */
/*参  数:topic_name:topic名称                             */
/*参  数:data:数据                                        */
/*参  数:data_len:数据长度                                */
/*返回值:无                                                */
/*----------------------------------------------------------*/
void MQTT_PublishQs0(char *topic, char *data, int data_len)
{	
	int temp,Remaining_len, Fixed_len, Variable_len, Payload_len, Total_len;
	
	Fixed_len = 1;                              //固定报头长度暂时先等于:1字节
	Variable_len = 2 + strlen(topic);           //可变报头长度:2字节(topic长度)+ topic字符串的长度
	Payload_len = data_len;                     //有效负荷长度:就是data_len
	Remaining_len = Variable_len + Payload_len; //剩余长度=可变报头长度+负载长度

	memset(MQTT_PUBLISH_BUF,0,sizeof(MQTT_PUBLISH_BUF));
	
	MQTT_PUBLISH_BUF[0]=0x30;                       //固定报头第1个字节 :固定0x30   	
	do{                                      //循环处理固定报头中的剩余长度字节,字节量根据剩余字节的真实长度变化
		temp = Remaining_len%128;            //剩余长度取余128
		Remaining_len = Remaining_len/128;   //剩余长度取整128
		if(Remaining_len>0)               	
			temp |= 0x80;                    //按协议要求位7置位          
		MQTT_PUBLISH_BUF[Fixed_len] = temp;         //剩余长度字节记录一个数据
		Fixed_len++;	                     //固定报头总长度+1    
	}while(Remaining_len>0);                 //如果Remaining_len>0的话,再次进入循环
		             
	MQTT_PUBLISH_BUF[Fixed_len+0]=strlen(topic)/256;                      //可变报头第1个字节     :topic长度高字节
	MQTT_PUBLISH_BUF[Fixed_len+1]=strlen(topic)%256;		               //可变报头第2个字节     :topic长度低字节
	memcpy(&MQTT_PUBLISH_BUF[Fixed_len+2],topic,strlen(topic));           //可变报头第3个字节开始 :拷贝topic字符串	
	memcpy(&MQTT_PUBLISH_BUF[Fixed_len+2+strlen(topic)],data,data_len);   //有效负荷:拷贝data数据

	Total_len = Fixed_len + Variable_len + Payload_len;
	tcp_send(MQTT_PUBLISH_BUF,Total_len);
}



/*----------------------------------------------------------*/
/*函数名:处理服务器发来的等级0的推送                       */
/*参  数:redata:接收的数据                                */
/*返回值:无                                                */
/*----------------------------------------------------------*/
unsigned int MQTT_DealPushdata_Qs0(unsigned char *redata,unsigned char *buf)
{
	int  re_len;               	           //定义一个变量,存放接收的数据总长度
	int  pack_num;                         //定义一个变量,当多个推送一起过来时,保存推送的个数
    int  temp,temp_len;                    //定义一个变量,暂存数据
    int  totle_len;                        //定义一个变量,存放已经统计的推送的总数据量
	int  topic_len;              	       //定义一个变量,存放推送中主题的长度
	int  cmd_len;                          //定义一个变量,存放推送中包含的命令数据的长度
	int  cmd_loca;                         //定义一个变量,存放推送中包含的命令的起始位置
	int  i;                                //定义一个变量,用于for循环
	int  local,multiplier;
	unsigned char tempbuff[400];	   //临时缓冲区
	unsigned char *data;                   //redata过来的时候,第一个字节是数据总量,data用于指向redata的第2个字节,真正的数据开始的地方

	
	re_len = redata[0]*256+redata[1];                               //获取接收的数据总长度		
	data = &redata[2];                                              //data指向redata的第2个字节,真正的数据开始的 
	pack_num = temp_len = totle_len = temp = 0;                	    //各个变量清零
	local = 1;
	multiplier = 1;
	do{
		pack_num++;                                     			//开始循环统计推送的个数,每次循环推送的个数+1	
		do{			
			temp = data[totle_len + local];   
			temp_len += (temp & 127) * multiplier;
			multiplier *= 128;
			local++;
		}while ((temp & 128) != 0);
		totle_len += (temp_len + local);                          	//累计统计的总的推送的数据长度
		re_len -= (temp_len + local) ;                              //接收的数据总长度 减去 本次统计的推送的总长度      
		local = 1;
		multiplier = 1;
		temp_len = 0;
	}while(re_len!=0);                                  			//如果接收的数据总长度等于0了,说明统计完毕了
	printf("本次接收了%d个推送数据\r\n",pack_num);
	temp_len = totle_len = 0;                		            	//各个变量清零
	local = 1;
	multiplier = 1;
	for(i=0;i<pack_num;i++){                                        //已经统计到了接收的推送个数,开始for循环,取出每个推送的数据 		
		do{
			temp = data[totle_len + local];   
			temp_len += (temp & 127) * multiplier;
			multiplier *= 128;
			local++;
		}while ((temp & 128) != 0);				
		topic_len = data[local+totle_len]*256+data[local+1+totle_len] + 2;    //计算本次推送数据中主题占用的数据量
		cmd_len = temp_len-topic_len;                               //计算本次推送数据中命令数据占用的数据量
		cmd_loca = totle_len + local +  topic_len;                  //计算本次推送数据中命令数据开始的位置
		memcpy(tempbuff,&data[cmd_loca],cmd_len);                   //命令数据拷贝出来		                 
		memcpy(buf,tempbuff,cmd_len);
		totle_len += (temp_len+local);                              //累计已经统计的推送的数据长度
		local = 1;
		multiplier = 1;
		temp_len = 0;
	}
	return cmd_len;
}



/*******************************************************************************/


/**********************************/
/*函数:MQTT初始化函数                    */
/*参数:无                			  */
/*返回值:无                    		  */
/**********************************/
void My_MQTT_Init(){
	int a=0;

	for(int i=0;i<4;i++){			//异常处理:若初始化失败,提供3次重新初始化机会
		a = tcp_init();
		if(a<0){
			if(i==3){
				printf("TCP多次初始化失败,请根据提示消息修正\n");
				printf("程序已退出\n");
				exit(-1);
			}
			printf("TCP初始化失败,准备重新初始化...\n");
			sleep(1);
		}else{
			break;
		}
	}
	IoT_Parameter_Init();
}


/**********************************/
/*函数:MQTT连接服务器函数                   */
/*参数:无                			  */
/*返回值:无                    		  */
/**********************************/
void My_MQTT_CONECT(){
	unsigned int send_len = 0;
	unsigned char sendbuff[200] = {0};

	memset(sendbuff,0,sizeof(sendbuff));
	send_len = MQTT_ConectPack(sendbuff);
	tcp_send(sendbuff,send_len);
}


/**********************************/
/*函数:MQTT重连接函数                   */
/*参数:无                			  */
/*返回值:无                    		  */
/**********************************/
void MY_MQTT_RECONECT(){
	printf("Ready for reconnection!");
	close_tcp();
	sleep(2);
	My_MQTT_Init();
	My_MQTT_CONECT();
}


/******************************************/
/*函数:向onenet上传数据点                         */
/*参数:data:上传的数据(最好为Json格式)*/
/*参数:data_len:数据长度			              */
/*返回值:无                    		          */
/******************************************/
void PayloadPublish(char *data, int data_len){
		int temp, Remaining_len, Fixed_len, Variable_len, Total_len;
		int len2=0;
		int i;

		Fixed_len = 1;                              			//固定报头长度暂时先等于:8字节
		len2=6;
		Variable_len = 2 + data_len;          				 	//可变报头长度:2字节(topic长度)+ topic字符串的长度
		Remaining_len = Variable_len + len2; 			//剩余长度=可变报头长度+负载长度

		memset(MQTT_SEND_BUF,0,sizeof(MQTT_SEND_BUF));
		
		MQTT_SEND_BUF[0]=0x30;                       				//固定报头第1个字节 :固定0x30 
		do{                                      //循环处理固定报头中的剩余长度字节,字节量根据剩余字节的真实长度变化
			temp = Remaining_len%128;            //剩余长度取余128
			Remaining_len = Remaining_len/128;   //剩余长度取整128
			if(Remaining_len>0)               	
				temp |= 0x80;                    //按协议要求位7置位          
			MQTT_SEND_BUF[Fixed_len] = temp;         //剩余长度字节记录一个数据
			Fixed_len++;	                     //固定报头总长度+1    
		}while(Remaining_len>0);  
	
		MQTT_SEND_BUF[Fixed_len+0]=0x00  ; 
		MQTT_SEND_BUF[Fixed_len+1]=0x03  ; 
		MQTT_SEND_BUF[Fixed_len+2]=0x24  ; 
		MQTT_SEND_BUF[Fixed_len+3]=0x64  ; 
		MQTT_SEND_BUF[Fixed_len+4]=0x70  ; 
		MQTT_SEND_BUF[Fixed_len+5]=0x03  ; 
	
		MQTT_SEND_BUF[Fixed_len+6]=data_len/256;                      //可变报头第1个字节     :topic长度高字节
		MQTT_SEND_BUF[Fixed_len+7]=data_len%256;		               //可变报头第2个字节     :topic长度低字节
	
		memcpy(&MQTT_SEND_BUF[Fixed_len+8],data,strlen(data));           //可变报头第3个字节开始 :拷贝topic字符串
		Total_len = Fixed_len + len2 + Variable_len;
		tcp_send(MQTT_SEND_BUF,Total_len);
}

mqtt.h

#ifndef __MQTT_H
#define __MQTT_H

#define  	DeviesID 		 "*********"			//设备ID
#define 	ProductID		 "******"				//产品ID
#define 	AuthInfo 		 "*****"				//鉴权信息
#define 	S_TOPIC_NAME     "********"            //需要订阅的主题
#define     QoS_0             0						//等级0(只支持等级0)				

	

void IoT_Parameter_Init(void);
unsigned int MQTT_ConectPack(unsigned char *buf);
void MQTT_Subscribe(char *topic_name, int QoS);
unsigned int MQTT_ping(unsigned char *buf);
void MQTT_PublishQs0(char *topic, char *data, int data_len);
unsigned int MQTT_DealPushdata_Qs0(unsigned char *redata,unsigned char *buf);

/**************************************/
void My_MQTT_Init();
void My_MQTT_CONECT();
void MY_MQTT_RECONECT();
void PayloadPublish(char *data, int data_len);

#endif

tcp.c

#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h> 
#include <string.h> 
#include <stdlib.h> 
#include <stdio.h>
#include "tcp.h"


int socket_fd = -1;


/***********************************************/
/*函数:TCP客户端初始化函数                     */
/*参数:无                			          */
/*返回值:-1:初始化失败;	0:初始化成功      */
/**********************************************/
int tcp_init(void)
{
	struct sockaddr_in c_addr = {0};

	//创建套接字
	socket_fd = socket(AF_INET, SOCK_STREAM, 0);
	if(socket_fd==-1){
		perror("socket");
		return -1;
    }

	c_addr.sin_family = AF_INET;
	c_addr.sin_port = htons(ServerPort);					//端口
	c_addr.sin_addr.s_addr = inet_addr(ServerIP);			//服务器IP

	//TCP连接服务器
	if(connect(socket_fd,(struct sockaddr*)&c_addr,sizeof(struct sockaddr))==-1){
		perror("connect");
		return -1;
    }else{
		printf("The TCP connection to the server succeeded.!\n");
   	}

	return 0;
}


/*********************************************************/
/*函数:TCP客户端发送数据函数                             */
/*参数:msg:要发送的数据            msg_len:数据长度     */
/*返回值:无               							    */
/********************************************************/
void tcp_send(char *msg,int msg_len)
{
	write(socket_fd, msg, msg_len);
}


/*****************************************************/
/*函数:TCP客户端接收数据函数                         */
/*参数:msg:接收缓冲区       msg_len:接收缓冲区大小 */
/*返回值:                                          */
/***************************************************/
int tcp_rcv(char *msg,int msg_len)
{
	int n_read = 0;
	
	n_read=read(socket_fd,msg,msg_len);
	if(n_read<0){
		perror("read");
		printf("tcp_rcv failed!");
		return -1;    
    }else{
        return n_read;
    }
}


/******关闭套接字******/
void close_tcp(void)
{
	close(socket_fd);
	socket_fd = -1;
}

tcp.h

#ifndef __TCP_H
#define __TCP_H

#define  ServerIP    "183.230.40.39"	//服务器IP
#define  ServerPort   6002				//端口


int tcp_init(void);
void tcp_send(char *msg,int msg_len);
int tcp_rcv(char *msg,int msg_len);
void close_tcp(void);


#endif

三、测试效果

测试工具:mqtt.fx 去官网下载。
先在onenet平台创建一个产品,产品下再创建两个设备。记得创建前选择多协议接入。
在这里插入图片描述

如上,首先我们在代码里填好“树莓派”的产品ID、设备ID、鉴权信息和要订阅的topic(此处我订阅的是手机APP的topic)后编译运行,记得编译时链接线程库 -lpthread ,之后我们便接入onenet的“树莓派”了,此时“树莓派”也显示在线状态。

然后打开mqtt.fx ,输入“手机APP”的信息后连接,再订阅树莓派的topic。
在这里插入图片描述
此时“手机APP”也在线了。
在这里插入图片描述
如果你先连接的是“手机APP”并且订阅了树莓派的topic,那么当你再连接“树莓派”成功时,mqtt.fx会收到树莓派发布过来的一条信息:在这里插入图片描述

接着 mqtt.fx 发布命令"open_Led", “树莓派”会收到命令并执行 printf(“灯已打开\n”);和上传数据"{“LED”:1}"到onenet平台,可在设备“树莓派”的数据流中查看。
在这里插入图片描述
在这里插入图片描述

四、说明

1、由于代码中很多调试信息以及注释用的都是中文,如果开发环境不支持中文可以安装中文包,或者直接把中文改为英文。
2、如果上传或发布数据的频率较大如温湿度上传,那么你可以重新创建一个线程进行操作。如果上传或发布的数据比较长,那么你可以重新分配一下代码中相应的数据缓冲区。
3、对于可能出现的异常如连接失败等我也做了异常处理,但由于我使用时并未触碰过异常,所以不清楚异常处理逻辑是否合理,出现异常时能否正确处理,希望自己在以后使用中能够发现问题。也欢迎大佬批评指正。

  嵌入式 最新文章
基于高精度单片机开发红外测温仪方案
89C51单片机与DAC0832
基于51单片机宠物自动投料喂食器控制系统仿
《痞子衡嵌入式半月刊》 第 68 期
多思计组实验实验七 简单模型机实验
CSC7720
启明智显分享| ESP32学习笔记参考--PWM(脉冲
STM32初探
STM32 总结
【STM32】CubeMX例程四---定时器中断(附工
上一篇文章      下一篇文章      查看所有文章
加:2021-08-08 11:29:42  更:2021-08-08 11:30:03 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/13 13:59:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码