目的是为了实时监控室外广告屏的亮度,界面,声音,开关机等等…… 因为室外的网络情况是随时可变的,所以采用的MQTT协议,作为Android客户端来说因为用MQTT发送消息太繁琐,我们采用的是客户端只接收命令,然后用Http进行数据反馈,这个项目近期也做完了,故记录一下。
第一步:导入在线库
// mqtt 包导入
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
第二步:声明广播(这个是在线库自带的 直接这么写就行,不用自己写这个service)
<service android:name="org.eclipse.paho.android.service.MqttService" />
第三步:MqttManager的全部代码
public class MqttManager {
public final static String TAG = MqttManager.class.getSimpleName();
@SuppressLint("StaticFieldLeak")
private static volatile MqttManager mInstance = null;
private MqttCallback mCallback;
public MqttClient client;
private MqttConnectOptions conOpt;
private Context context;
public String[] topic;
private MqttManager(Context context) {
mCallback = new MqttCallbackBus(context);
this.context = context;
}
public static MqttManager getInstance(Context context) {
if (mInstance == null) {
synchronized (MqttManager.class) {
if (mInstance == null) {
mInstance = new MqttManager(context);
}
}
}
return mInstance;
}
/**
* 释放单例, 及其所引用的资源
*/
public static void release() {
try {
if (mInstance != null) {
mInstance.disConnect();
mInstance = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 创建Mqtt 连接
*
* @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
* @param userName 用户名
* @param password 密码
* @return
*/
public boolean createConnect(String brokerUrl, String userName, String password) {
topic = new String[]{PUBLISH_TOPIC};
String deviceId = Utils.getDeviceNo();
if (client != null && client.isConnected()) {
return true;
}
boolean flag = false;
try {
conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true); //不接收离线期间的消息 每次都是重新登陆
conOpt.setAutomaticReconnect(true); //自动重连
if (!TextUtils.isEmpty(password)) {
conOpt.setPassword(password.toCharArray());
}
if (!TextUtils.isEmpty(userName)) {
conOpt.setUserName(userName);
}
client = new MqttClient(brokerUrl, deviceId, new MemoryPersistence());
client.setCallback(mCallback);
flag = doConnect();
client.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
//重连
String msg = "exception_reconnect";
EventBus.getDefault().post(msg);
}
return flag;
}
/**
* 建立连接
*
* @return
*/
private boolean doConnect() {
boolean flag = false;
if (client != null) {
try {
client.connect(conOpt);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
}
return flag;
}
public boolean publish(String topicName, int qos, byte[] payload) {
boolean flag = false;
try {
if (client == null) {
createConnect(HOST, USERNAME, PASSWORD);
}
if (!client.isConnected()) {
this.reconnect();
}
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
client.publish(topicName, message);
flag = true;
} catch (MqttException e) {
e.printStackTrace();
}
return flag;
}
private boolean subscribe(String topicName, int qos) {
boolean flag = false;
if (client != null && client.isConnected()) {
try {
client.subscribe(topicName, qos);
flag = true;
} catch (MqttException e) {
e.printStackTrace();
}
}
return flag;
}
private boolean subscribe(String[] topicName, int qos[]) {
boolean flag = false;
if (client != null && client.isConnected()) {
try {
client.subscribe(topicName, qos);
flag = true;
} catch (MqttException e) {
e.printStackTrace();
}
}
return flag;
}
/**
* 取消连接
*
* @throws MqttException
*/
public void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
client.disconnect();
}
}
/**
* 关闭连接
*/
public void close() {
if (client != null && client.isConnected()) {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
/**
* 重新连接
*/
private void reconnect() {
if (client != null && !client.isConnected()) {
try {
client.setCallback(mCallback);
client.connect(conOpt);
client.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
}
里面用eventBus做了下重连处理,后续响应如下
boolean isHavNet = isConnectIsNomarl(); //判断是否有网
if (isHavNet) {
handler.removeMessages(4);
MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD);
}else{
handler.sendEmptyMessageDelayed(4, 10000);//无网等待有网再连
}
第四步:mqtt消息回调类
public class MqttCallbackBus implements MqttCallbackExtended {
private Context mContext;
public MqttCallbackBus(Context context) {
this.mContext = context;
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
Log.e("MqttCallbackBus", "MQTT_connectComplete:");
//断开连接必须重新订阅才能收到之前订阅的session连接的消息
if(reconnect){
Log.e("MqttCallbackBus_重连订阅主题", "MQTT_connectComplete:");
//这里是发送消息去重新订阅
String msg = "reconnect";
EventBus.getDefault().postSticky(msg);
}
}
@Override
public void connectionLost(Throwable cause) { //掉线
Log.e("MqttCallbackBus>>>", "MQTT_connectionLost 掉线原因:"+cause.getMessage());
cause.printStackTrace();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
MqttReceivedMessage msg = (MqttReceivedMessage) message;
String s = msg.toString();
Log.e("MQTT_messageArrived_msg", "MQTT_msg"+topic);
HashMap hs = new Gson().fromJson(s, HashMap.class);
EventBus.getDefault().post(hs); //抛出收到的消息 eventbus响应做自己的业务处理
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 token.isComplete()
Log.e("MqttCallbackBus>>>", "MQTT_deliveryComplete:");
}
}
其中connectComplete的重连回调reconnect = true 的eventbus的响应是:
//重连 能进重连 说明必有网了 不必再else 定时重试网络
String[] topic = new String[]{PUBLISH_TOPIC};
try {
MqttManager.getInstance(BaseApplication.getContext()).client.subscribe(topic);
} catch (MqttException e) {
e.printStackTrace();
}
MainActivity的使用如下:
//网络变化监听 首次连接mqtt
boolean isHavNet = isConnectIsNomarl();
if (isHavNet) {
handler.removeMessages(1);
MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD);
} else {
handler.sendEmptyMessageDelayed(1, 5000);
}
MQTT全局的配置参数:
?
附赠个判断是否有网的工具类
/**
* 判断网络是否连接
*/
private boolean isConnectIsNomarl() {
ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext().getSystemService(Context.CONNECTIVITY_SERVICE);
@SuppressLint("MissingPermission") NetworkInfo info = connectivityManager.getActiveNetworkInfo();
if (info != null && info.isConnected()) {
String name = info.getTypeName();
return true;
} else {
return false;
}
}
|