这玩意能干什么?我只能说,这是一个物联网的方案,能通过java客户端监听来自单片机发送的消息,单片机有什么消息?常见的有:比如持久性的传感器数据上报,这你得1s上传一次吧,还有一些控制设备的信息,比如灯,电机之类一次操作持续运转的东西。
写在前面:
曾经用过的一种需要接入Internet的物联网方案(这是目前的主流):
我之前就受益与免费的云平台(云服务器),比如我以前博客里介绍过了的巴法云平台,还有我以前用过的小熊派华为云平台,它们的好处显而易见,就是不需要你去搭建服务器,不需要去了解数据传输的各种细节,你只需要用你的单片机去连上云就行了,用它们的API接口(特别是巴法云平台的接口真的适合小白实现前端和设备的控制),面向接口编程确实给人很舒服。缺点就是你必须得有网络吧,没网络你就连不上云。
本次主角:
一种局域网内物联网方案(非主流,但实用)
优点:不需要接入外网,除了这个优点没什么优点。
步骤:
首先,你得有个MQTT的服务器吧,怎么办,用EMQX搭建一个局域网的MQTT服务器。
然后,用MQTTX或paho或是它本身自带的web测试接口测试是否能正常订阅和发布消息以及能否正常收到消息。
然后,你得有单片机吧,单片机得支持MQTT协议吧,推荐用ESP32(ESP8266),基于开源的arduino,你能在太极创客的文档上轻松学会MQTT的相关知识。然后你得把单片机的数据格式转换成通用的JSON格式去发。
附上太极创客的相关文档链接地址:
零基础入门学用物联网 – MQTT基础篇 – 目录 – 太极创客 (taichi-maker.com)
然后,也就是本次的主题了,用JAVA建立一个客户端,由于要用MQTT,所以java是一个springboot的项目,用其去监听EMQX建立的MQTT服务器的报文(也就是对于主题所发布个各种消息),客户端不局限于用JAVA语言去写,具体可参考EMQX的文档,报文采用JSON格式,为什么要用JSON格式,因为方便用JSON格式反过来创建对象然后将对象的属性持久化到数据库。
MQTT Java 客户端库 | EMQX 文档
你可以用这文档里的相关代码,然后稍微修改,就以我的为参考(已经设置了断开服务器后客户端自动重连)
连接Mysql(主要是拿到数据库的操作对象connection)
package emqx.demo.Console;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.*;
import java.util.Properties;
public class JDBC {
static String driverClass=null;
static String url=null;
static String name=null;
static String password=null;
static {
try {
//读取配置文件的信息
Properties properties= new Properties();
InputStream is=new FileInputStream("MyInfo.properties");//放在工程文件下使用
//InputStream is=JDBCUtil.class.getClassLoader().getResourceAsStream("jdbc.properties");//放在src下使用
//导入输入流
properties.load(is);
//读取属性
driverClass=properties.getProperty("driverClass");
url=properties.getProperty("url");
name=properties.getProperty("name");
password=properties.getProperty("password");
System.out.println(driverClass);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 获取连接对象
* @return
* */
public static Connection getConn() {
Connection conn=null;
try {
Class.forName(driverClass);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
conn = DriverManager.getConnection(url,name,password);
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return conn;
}
/** 释放资源
*
*
**/
public static void release(Connection conn,Statement st,ResultSet rs) {
closeRs(rs);
closeSt(st);
closeConn(conn);
}
public static void release(Connection conn,Statement st) {
closeSt(st);
closeConn(conn);
}
private static void closeRs(ResultSet rs) {
try {
if(rs!=null)
rs.close();
} catch (SQLException e) {
e.printStackTrace();
}finally {
rs=null;
}
}
private static void closeSt(Statement st) {
try {
if(st!=null)
st.close();
} catch (SQLException e) {
e.printStackTrace();
}finally {
st=null;
}
}
private static void closeConn(Connection conn) {
try {
if(conn!=null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}finally {
conn=null;
}
}
}
连接EMQX的MQTT服务器
package emqx.demo.Console;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class ConnectServer {
String subTopic ="ESP32/DEVICE";//订阅主题
int qos =1;//消息服务等级
String broker ="tcp://127.0.0.1:1883";//emqx搭建的mqtt服务器的地址
String clientId ="JavaClient";//这个客户端的名字在emqx上显示的
String content="Java Client is online";//发布消息
String pubtopic="esp32";//发布主题
public ConnectServer() {
MemoryPersistence persistence = new MemoryPersistence();//保存形式以内容保存
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
//设置消息对象
MqttMessage message = new MqttMessage(content.getBytes());//待发送的信息
message.setQos(qos);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_java");
connOpts.setPassword("random".toCharArray());
// 保留会话
connOpts.setCleanSession(false);
//设置超时时间
connOpts.setConnectionTimeout(10);
/*
设置会话心跳时间 单位为秒 服务器会每隔
1.5*20秒的时间向客户端发送个消息判断客户端是否在线,
但这个方法并没有重连的机制
*/
connOpts.setKeepAliveInterval(20);
// 设置回调
client.setCallback(new MyMqttCallback());
// 建立连接
while (!client.isConnected()) {
System.out.println(" client is Connecting to broker: " + broker);
client.connect(connOpts);
}
System.out.println("连接成功");
// 订阅
client.subscribe(subTopic,qos );
System.out.println("订阅主题"+subTopic);
// 发布上线消息
client.publish(pubtopic,message);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
编写回调函数类(这个根据自己的需求写就好了)
回调函数类必须实现MqttCallback接口
package emqx.demo.Console;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.sql.SQLException;
import java.sql.Statement;
/*
* MQTT回调
* 收消息持久化到mysql
*
*
* */
public class MyMqttCallback implements MqttCallback {
public static Statement stmt= null;
ESP32 myesp32;
String mystring;
public MyMqttCallback() {
try {
stmt= JDBC.getConn().createStatement();
}
catch (Exception e){
System.out.println("无法创建statement实例化对象");
}
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("连接断开,可以做重连");
new ConnectServer();//每次连接断开就又创建一个连接对象
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收来自主题:" + topic+"的信息内容:");
mystring=new String(message.getPayload());
System.out.println(mystring);
myesp32= JSON.parseObject( mystring,ESP32.class);
System.out.println(myesp32.toString());
System.out.println("--------------------------------");
/*
*
* 更新瞬间性操作传感器比如灯,电机的状态
* */
if(myesp32.command==1){
try {
MysqlDao.updata(myesp32.name,myesp32.state,stmt);
System.out.println("更新传感器状态");
}
catch (Exception e){
System.out.println("更新数据失败");
}
}
/*
* 插入传感器持久性上传的数据
*
* */
else if(myesp32.command==2) {
try {
MysqlDao.insert(myesp32.name, myesp32.data, stmt);
System.out.println("插入新数据");
}
catch ( Exception e){
System.out.println("插入新数据失败");
}
}
mystring="";
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
以及编写我的Mydao类,用于数据持久化到数据库
package emqx.demo.Console;
import com.sun.source.tree.LineMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;
import sun.nio.cs.FastCharsetProvider;
import java.security.Key;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Repository
public class MysqlDao {
/*
*
* 供接口拿数据的方法
*
* */
//获取某传感器的全部数据
public static Map FindAll(Statement statement, String name, String tablename) throws SQLException {
ResultSet rs=statement.executeQuery( "SELECT id, name, data FROM "+tablename+" where name ='"+name+"'");
Map map=new HashMap();
List list=new ArrayList();
map.put(name,list);
while (rs.next()) {
// 通过字段检索
list.add(rs.getString("data")) ;
}
map.put(name,list);
return map;
}
//获取传感器对应的最新数据
public static String find(String Sensor_name,String key,Statement statement) throws SQLException {
String sql= "SELECT "+ key+" FROM devices.sensor WHERE id=(SELECT MAX(id) FROM sensor GROUP BY `name` HAVING `name`="+"\""+Sensor_name+"\")";
System.out.println(sql);
ResultSet rs=statement.executeQuery(sql);
System.out.println(rs.toString());
rs.next();
System.out.println(rs.getString(key));
return rs.getString(key);
}
public static String GetState(String name ,Statement statement){
String sql= "SELECT state FROM action WHERE `sname` ="+"'"+name+"'";
System.out.println(sql);
try {
ResultSet rs=statement.executeQuery(sql);
rs.next();
return rs.getString("state");
} catch (SQLException e) {
e.printStackTrace();
return "error";
}
}
/*
*
*
* 监听单片机发出的数据
*
* */
//在数据库的表中插入数据
public static boolean insert(String name,String dat,Statement sta) throws SQLException {
String sql="insert into devices.sensor(name,data) VALUES ('"+name+"" +"','"+dat+"')";
System.out.println(sql);
if (sta.execute(sql))return true;
return false;
}
//更新持续性外设如灯,电机的状态一次改变永久使用
public static void updata(String dev,String state,Statement statement) throws SQLException {
String sql="UPDATE devices.action SET state='"+state+"'WHERE sname='"+dev+"'";
System.out.println(sql);
statement.executeUpdate(sql);
}
}
以上就做完了,Mqtt服务器的消息就JAVA客户端监听并被被持久化报存到了数据库中。
最后就是用JAVA的JDBC写数据库相关的API接口供安卓端,微信小程序端去调用,让它们拿数据放前端界面了就ok了。可以的话,可用JAVA的第三方框架Mybatis去简化API接口的编写。(主要简化的是sql语句)
|