MQTT(消息队列遥测传输)是ISO?标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件?。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
,百度文库和官方网站介绍很详细,如下
MQTT_百度百科 (baidu.com)
首页 | MQTT中文网 (p2hp.com)
简单总结一下。使用订阅/发布模式,将消息的发送方和接受方解耦。mqtt分别由发布(发送方)和订阅(接口方)、MQTT Broker(代理服务器)组成。代理服务器安装在我们的服务器。智能硬件设备是发布也是订阅。同样APP端或小程序等客户端也是发布和订阅。硬件上报数据到服务器,服务器发到APP,这个时候硬件是发布,而APP是订阅,如果说APP发一条指令给智能硬件,app是发布,智能硬件是订阅。
一、选择下载代理服务器及启动
MQTT代理服务器有很多种,这里使用emqx,部署在windown系统上。也可以部署在linnux。
1、首先下载emqx,下载链接如下:
Download EMQX
选择下载后,保存到电脑磁盘,解压出来,不在放在带中文目录。如图下
2、进入bin目录,复制目录路径
3、 启动emqx,cmd打开命令行窗口,输入D:回车,然后进入复制的目录路径进入,emqt start启动,如下
cd?D:\emqx-5.0.0-otp24.2.1-windows-amd64\bin
emqt start
4、启动emqt 后,我们打开管理页面,输入http://192.168.2.6:18083/#/login?to=/subscriptions/subscription
也可以http://127.0.0.1:18083/#/login?to=/subscriptions/subscription?
cmd打开命令窗口,输入ipconfig回车,查看本机电脑IP,通过局域网ip方式访问。我们使用APP就得通过局域网IP进行通信,如下会介绍,在浏览器打开链接后,可以看到一个登录页面,输入账号密码登录可以进入首页,账号默认是admin,密码默认public,登录后可以看到的页面如下
点击这里可以查看到已经连接的客户端
?打开这里可以看到主题订阅
打开这里设置显示的语言
?二、客户端测试工具mqttfx使用
下载地址如下
Download (jensd.de)
1、我们百度下载mqttfx-1.7.1-windows-x64进行安装到电脑或服务器上,我这里测试的,下载windown 版本的,安装完成打开如下。
?2、查看发布(发送方)使用
?3、查看订阅(查看订阅数据)?
三、手动写安卓APP代码实现发布和订阅(发送数据和接收数据)
使用android stutio 开发工具,使用java开发
1、添加权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
2、在build.gradle添加包
plugins {
id 'com.android.application'
}
android {
compileSdk 32
defaultConfig {
applicationId "com.nyw.mqttdemo"
minSdk 21
targetSdk 32
versionCode 1
versionName "1.0"
testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
}
}
compileOptions {
sourceCompatibility JavaVersion.VERSION_1_8
targetCompatibility JavaVersion.VERSION_1_8
}
}
dependencies {
implementation 'androidx.appcompat:appcompat:1.3.0'
implementation 'com.google.android.material:material:1.4.0'
implementation 'androidx.constraintlayout:constraintlayout:2.0.4'
testImplementation 'junit:junit:4.13.2'
androidTestImplementation 'androidx.test.ext:junit:1.1.3'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.4.0'
//添加mqtt 2个包
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
在settings.gradle添加包下载链接
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
pluginManagement {
repositories {
gradlePluginPortal()
google()
mavenCentral()
maven {
url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
}
}
dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.FAIL_ON_PROJECT_REPOS)
repositories {
google()
mavenCentral()
}
}
rootProject.name = "MQTTDemo"
include ':app'
3、页面
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<TextView
android:id="@+id/tv_msg"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:text="Hello World!"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintTop_toTopOf="parent" />
<Button
android:id="@+id/btn_send_data"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="发送数据"
tools:layout_editor_absoluteX="-46dp"
tools:layout_editor_absoluteY="131dp" />
</androidx.constraintlayout.widget.ConstraintLayout>
4、java代码?
package com.nyw.mqttdemo;
import androidx.appcompat.app.AppCompatActivity;
import android.annotation.SuppressLint;
import android.content.ComponentName;
import android.content.Intent;
import android.content.ServiceConnection;
import android.os.Bundle;
import android.os.Handler;
import android.os.IBinder;
import android.os.Message;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MainActivity extends AppCompatActivity {
private Button btn_send_data;
private TextView tv_msg;
//访问的地址,这里填写mqtt代理服务器ip
private String host = "tcp://192.168.2.6:1883";
//登录账号
private String userName = "admin";
//登录密码
private String passWord = "public";
//这个是mqtt_id 唯一id,每个客户端应该生成不同的唯一id,这里测试演示数据,我就写固定的id,打包到不同的手机上安装,要手动修改或修改成动态id,获取随机数+时间方式生成一个随机数作为唯一id
private String mqtt_id="1";
private int i = 1;
private Handler handler;
private MqttClient client;
private String mqtt_sub_topic = "second3"; //订阅主题(消息接收方)
private String mqtt_pub_topic ="first";//发布主题(消息发送方)
private MqttConnectOptions options;
private ScheduledExecutorService scheduler;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
inintView();
}
private void inintView() {
btn_send_data=findViewById(R.id.btn_send_data);
tv_msg=findViewById(R.id.tv_msg);
btn_send_data.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
//发送数据
publishmessageplus(mqtt_pub_topic,"我要发布订阅,发送数据");
}
});
init();
startReconnect();
handler = new Handler() {
@SuppressLint("SetTextIl8n")
public void handleMessage(Message msg) {
super.handleMessage(msg);
switch (msg.what) {
case 1: //开机校验更新回传
break;
case 2: //反馈回转
break;
case 3: //MQTT收到消息回传
tv_msg.setText(msg.obj.toString());
break;
case 30: //连接失败
Toast.makeText(MainActivity.this,"连接失败",Toast.LENGTH_SHORT).show();
break;
case 31: //连接成功
Toast.makeText(MainActivity.this,"连接成功",Toast.LENGTH_SHORT).show();
try {
client.subscribe(mqtt_sub_topic,2);
} catch (MqttException e) {
e.printStackTrace();
}
// publishmessageplus(mqtt_pub_topic,"第一个客户端发送的信息");
break;
default:
break;
}
}
};
}
/**
* 初始化MQTT配制
*/
private void init() {
try {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, mqtt_id,
new MemoryPersistence());
//MQTT的连接设置
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置回调
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
System.out.println("connectionLost----------");
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
System.out.println("deliveryComplete---------"
+ token.isComplete());
}
@Override
public void messageArrived(String topicName, MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println("messageArrived----------");
Message msg = new Message();
msg.what = 3;
//重新编码,解决乱码问题
String strMsg= new String(message.getPayload(),"GB2312");
msg.obj = topicName + "---" + strMsg;
handler.sendMessage(msg);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 连接
*/
private void Mqtt_connect() {
new Thread(new Runnable() {
@Override
public void run() {
try {
if (!(client.isConnected())){
client.connect(options);
Message msg = new Message();
msg.what=31;
handler.sendMessage(msg);
}
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 30;
handler.sendMessage(msg);
}
}
}).start();
}
/**
* 线程池执行任务,心跳包,当断开连接后,执行自动连接代理服务器
*/
private void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (!client.isConnected()) {
Mqtt_connect();
}
}
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
}
/**
* 发布订阅
* @param topic
* @param message2
*/
private void publishmessageplus(String topic,String message2)
{
if (client == null || !client.isConnected()) {
return;
}
MqttMessage message = new MqttMessage();
message.setPayload(message2.getBytes());
try {
client.publish(topic,message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
5、运行打包APP测试,这里放截图出来了,源码下载链接如下
androidmqttdemo代码可直接使用-Android文档类资源-CSDN下载
|