/* 设备通过 MQTT 消息上报坐标信息并保存到时序数据库,之后可查询历史数据以回放设备轨迹。 */
#include <stdio.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include "mosquitto.h"
#include <cjson/cJSON.h>
#include "influxdb.h"
/*
 * Integer constants 0-3. None of these macros is referenced in the visible
 * part of this file.
 * NOTE(review): the names suggest client/session states - confirm against
 * whichever code consumes them.
 */
#define LOGOUT 0
#define LOGIN 1
#define STREAMING 2
#define TALKBACK 3
/* MQTT broker host name; passed to mosquitto_connect() in main(). */
char mqtt_host[512] = "localhost";
/* MQTT broker port; passed to mosquitto_connect() in main(). */
int mqtt_port = 1883;
/* Credentials handed to mosquitto_username_pw_set() in main(). */
char username[48] = "test";
char password[48] = "12345";
/* Client id given to mosquitto_new() in main(). */
char clientid[48] = "presence";
/*
 * Append "<sep><key>=<value>" to the NUL-terminated string held in buf.
 *
 * buf/cap: destination buffer and its total capacity in bytes.
 * Returns 1 when the text fit completely, 0 when it was truncated or
 * snprintf reported an error.
 */
static int uwb_append_pair(char *buf, size_t cap, const char *sep,
                           const char *key, const char *value)
{
    size_t used = strlen(buf);
    int n = snprintf(buf + used, cap - used, "%s%s=%s", sep, key, value);

    return n >= 0 && (size_t)n < cap - used;
}

/*
 * Look up key in the JSON object root and return its valuestring.
 * Returns NULL when the key is absent or when the item's valuestring is NULL
 * (e.g. the item is not a JSON string), so callers never hand NULL to "%s".
 */
static const char *uwb_string_value(cJSON *root, const char *key)
{
    cJSON *item = cJSON_GetObjectItem(root, key);

    return item ? item->valuestring : NULL;
}

/*
 * Build one "uwb" record from a parsed JSON message and insert it into the
 * InfluxDB database "mydb" at localhost:8086.
 *
 * The record has the shape
 *     uwb,tagid=<TagId> x=<x>,y=<y>,sn=<sn>,datetime=<unix seconds>
 * "TagId", "x", "y" and "sn" are optional string members of root; a member
 * that is missing or has no string value is left out, and the separators are
 * chosen so that the record stays well-formed without it. "datetime" is the
 * current time and is always present.
 *
 * Nothing is inserted when the record does not fit into the local buffer or
 * when the InfluxDB client cannot be created; both cases are reported on
 * stderr.
 *
 * NOTE(review): values are copied verbatim; spaces or commas inside them are
 * not escaped - confirm that senders never emit such characters.
 *
 * root: parsed JSON object; ownership stays with the caller.
 */
void saveUwb(cJSON *root)
{
    static const char *const field_keys[] = { "x", "y", "sn" };
    char buf[512] = "uwb";
    int fits = 1;
    int have_field = 0;

    const char *tagid = uwb_string_value(root, "TagId");
    if (tagid) {
        fits &= uwb_append_pair(buf, sizeof buf, ",", "tagid", tagid);
    }

    /* The first field is introduced by a space, every later one by a comma. */
    for (size_t i = 0; i < sizeof field_keys / sizeof field_keys[0]; i++) {
        const char *value = uwb_string_value(root, field_keys[i]);
        if (!value) {
            continue;
        }
        fits &= uwb_append_pair(buf, sizeof buf, have_field ? "," : " ",
                                field_keys[i], value);
        have_field = 1;
    }

    /* time_t has no dedicated conversion specifier; cast to match %ld. */
    time_t now = time(NULL);
    size_t used = strlen(buf);
    int n = snprintf(buf + used, sizeof buf - used, "%sdatetime=%ld",
                     have_field ? "," : " ", (long)now);
    if (n < 0 || (size_t)n >= sizeof buf - used) {
        fits = 0;
    }

    if (!fits) {
        /* A cut-off record would store corrupted values; drop it instead. */
        fprintf(stderr, "saveUwb: record exceeds %zu bytes, not inserted\n",
                sizeof buf);
        return;
    }

    printf("buf : %s\n",buf);

    s_influxdb_client *client = influxdb_client_new("localhost:8086","admin","admin","mydb",0);
    if (!client) {
        fprintf(stderr, "saveUwb: influxdb_client_new failed\n");
        return;
    }

    int status = influxdb_insert(client,buf);
    printf("status : %d,%s\n",status,client->dbname);
    influxdb_client_free(client);
}
/*
 * libmosquitto message callback (registered in main()).
 *
 * Parses the payload as JSON, logs payload and topic, and forwards the parsed
 * object to saveUwb() when the topic equals "/mvms/client/uwb/"
 * (compared case-insensitively). The parsed tree is always released before
 * returning.
 *
 * mosq, obj: unused.
 * message:   the received publish; owned by the caller.
 *
 * NOTE(review): main() subscribes to "/mvms/client/uwb/#", but only the exact
 * topic "/mvms/client/uwb/" is stored here; messages on sub-topics are logged
 * and dropped. Confirm whether a prefix match was intended.
 * NOTE(review): the payload is treated as a NUL-terminated C string - confirm
 * that the library guarantees the terminator for received messages.
 */
void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    (void)mosq;
    (void)obj;

    /*
     * A zero-length publish may carry no payload buffer at all. There is
     * nothing to parse, and passing NULL to "%s" below would be undefined.
     */
    if (!message->payload || message->payloadlen <= 0) {
        printf("Received empty payload, ignored\n");
        return;
    }

    const char *payload = (const char *)message->payload;

    cJSON *root = cJSON_Parse(payload);
    if (!root) {
        printf("Received msg payload[%s], json parse failed\n", payload);
        return;
    }

    printf("Received message is %s\n", payload);
    printf("topic is %s\n", (char *)message->topic);

    if (!strcasecmp((char *)message->topic,"/mvms/client/uwb/")) {
        saveUwb(root);
    }

    /* root is known to be non-NULL here. */
    cJSON_Delete(root);
}
int main(int argc, char * argv[])
{
int rc = 0;
struct mosquitto *mosq;
mosquitto_lib_init();
mosq = mosquitto_new(clientid, true, NULL);
if (mosq){
mosquitto_username_pw_set(mosq, username, password);
mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_message_callback_set(mosq, message_callback);
rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 45);
char topic_uwb[512]={0};
snprintf(topic_uwb, sizeof(topic_uwb)-1, "/mvms/client/uwb/#");
mosquitto_subscribe(mosq, NULL, topic_uwb, 2);
mosquitto_loop_forever(mosq, -1, 1);
mosquitto_destroy(mosq);
}
mosquitto_lib_cleanup();
return rc;
}
|