Hi大家好,今天给大家分享下MQTT的入门教学,能帮助大家快速上手使用。
首先在学习MQTT协议前,让我们先了解下MQTT是个什么东西,以下链接简单的介绍了MQTT协议的含义,以及它的特性。没了解过的同学先花几分钟熟悉下,看不懂没关系,在接下来的步骤中会慢慢解释。
https://www.runoob.com/w3cnote/mqtt-intro.htmlhttps://www.runoob.com/w3cnote/mqtt-intro.html
我们先需要下载MQTT的服务器:emqx-broker (MQTT的服务器不止这一种,本次主要以为emqx来讲解)
下载地址为:https://www.emqx.io/
?点击下载按钮,选择EMQX开源版,选择需要下载的版本号和操作系统windows(如果你是使用linux系统也可以下载相应的centos)
?下载完毕之后解压,我们会得到一个EMQX的文件夹,我们进入emqx中,在进入bin中,并用cmd打开这个路径,然后使用emqx start启动这个emqx的服务器。
??
此时代表emqx启动成功,(如果要关闭emqx则需要使用命令cd bin,先进入bin目录下,再使用emqx stop关闭服务器)
我们现在需要判断emqx是否启动成功,先随便打开一个浏览器输入:
http://127.0.0.1:18083
127.0.0.1代表的是你的本机的ip地址,18083代表的是端口号,进入登录界面的端口号。
登录账号为:admin 密码为public
此时登录成功,可以在设置中开启中文,此时也可以点击客户端,查看此时连接emqx服务器的客户端。(此时并没有客户端连接
我们接着打开cmd 输入命令ipconfig,可以看到适配器的ipv4的ip为:192.168.15.1
?我们也可以使用http://192.168.15.1:18083 进入登录页面,账号密码相同,其实本质上来使用192.168.15.1还是127.0.0.1用谁打开服务器都没影响,因为他们都是属于你本机ip地址,但是在后面的客户端连接服务器时,需要使用192.168.15.1这个ip,不然的话可能到时候会连接不上。
接下来我们来考虑客户端连接服务端的问题:
我使用的是android studio来进行安卓开发,首先我们先新建一个项目。
第一步我们先导入我们所需要的mqtt的jar包:org.eclipse.paho.client.mqttv3-1.2.5.jar(因为mqtt不上java自动的库文件,我们需要自己下载导入)
mqtt jar 下载地址:
https://repo.eclipse.org/content/repositories/paho-releases/org/eclipse/paho/
将下载的jar包复制至libs目录下,并右击mqtt jar包 ADD As Libray.. ,将mqtt jar包导入库文件中。
?
首先在MainActivity的java文件中确定编写mqtt的客户端代码如下:
package com.example.mqtt_test1;
import androidx.appcompat.app.AppCompatActivity;
import android.annotation.SuppressLint;
import android.content.DialogInterface;
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.json.JSONObject;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class MainActivity extends AppCompatActivity {
????private String host = "tcp://192.168.15.1:1883";
????private String userName = "admin";
????private String passWord = "public";
????private String mqtt_id="111111";
????private int i = 1;
????private Handler handler;
????private MqttClient client;
????private String mqtt_sub_topic = "second"; //为了保证你不受到别人的消息 ?哈哈
????private String mqtt_pub_topic ="first";
????private MqttConnectOptions options;
????private ScheduledExecutorService scheduler;
????@Override
????protected void onCreate(Bundle savedInstanceState) {
????????super.onCreate(savedInstanceState);
????????setContentView(R.layout.activity_main);
????????TextView text1 = findViewById(R.id.test1);
????????init();
????????startReconnect();
????????handler = new Handler() {
????????????@SuppressLint("SetTextIl8n")
????????????public void handleMessage(Message msg) {
????????????????super.handleMessage(msg);
????????????????switch (msg.what) {
????????????????????case 1: //开机校验更新回传
????????????????????????break;
????????????????????case 2: //反馈回转
????????????????????????break;
????????????????????case 3: //MQTT收到消息回传
????????????????????????text1.setText(msg.obj.toString());
????????????????????????break;
????????????????????case 30: //连接失败
????????????????????????Toast.makeText(MainActivity.this,"连接失败",Toast.LENGTH_SHORT).show();
????????????????????????break;
????????????????????case 31: //连接成功
????????????????????????Toast.makeText(MainActivity.this,"连接成功",Toast.LENGTH_SHORT).show();
????????????????????????try {
????????????????????????????client.subscribe(mqtt_sub_topic,2);
????????????????????????} catch (MqttException e) {
????????????????????????????e.printStackTrace();
????????????????????????}
????????????????????????publishmessageplus(mqtt_pub_topic,"第一个客户端发送的信息");
????????????????????????break;
????????????????????default:
????????????????????????break;
????????????????}
????????????}
????????};
????}
????private void init() {
????????try {
????????????//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
????????????client = new MqttClient(host, mqtt_id,
????????????????????new MemoryPersistence());
????????????//MQTT的连接设置
????????????options = new MqttConnectOptions();
????????????//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
????????????options.setCleanSession(true);
????????????//设置连接的用户名
????????????options.setUserName(userName);
????????????//设置连接的密码
????????????options.setPassword(passWord.toCharArray());
????????????// 设置超时时间 单位为秒
????????????options.setConnectionTimeout(10);
????????????// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
????????????options.setKeepAliveInterval(20);
????????????//设置回调
????????????client.setCallback(new MqttCallback() {
????????????????@Override
????????????????public void connectionLost(Throwable cause) {
????????????????????//连接丢失后,一般在这里面进行重连
????????????????????System.out.println("connectionLost----------");
????????????????}
????????????????@Override
????????????????public void deliveryComplete(IMqttDeliveryToken token) {
????????????????????//publish后会执行到这里
????????????????????System.out.println("deliveryComplete---------"
????????????????????????????+ token.isComplete());
????????????????}
????????????????@Override
????????????????public void messageArrived(String topicName, MqttMessage message)
????????????????????????throws Exception {
????????????????????//subscribe后得到的消息会执行到这里面
????????????????????System.out.println("messageArrived----------");
????????????????????Message msg = new Message();
????????????????????msg.what = 3;
????????????????????msg.obj = topicName + "---" + message.toString();
????????????????????handler.sendMessage(msg);
????????????????}
????????????});
????????} catch (Exception e) {
????????????e.printStackTrace();
????????}
????}
????private void Mqtt_connect() {
????????new Thread(new Runnable() {
????????????@Override
????????????public void run() {
????????????????try {
????????????????????if (!(client.isConnected())){
????????????????????????client.connect(options);
????????????????????????Message msg = new Message();
????????????????????????msg.what=31;
????????????????????????handler.sendMessage(msg);
????????????????????}
????????????????} catch (Exception e) {
????????????????????e.printStackTrace();
????????????????????Message msg = new Message();
????????????????????msg.what = 30;
????????????????????handler.sendMessage(msg);
????????????????}
????????????}
????????}).start();
????}
????private void startReconnect() {
????????scheduler = Executors.newSingleThreadScheduledExecutor();
????????scheduler.scheduleAtFixedRate(new Runnable() {
????????????@Override
????????????public void run() {
????????????????if (!client.isConnected()) {
????????????????????Mqtt_connect();
????????????????}
????????????}
????????}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
????}
????private void publishmessageplus(String topic,String message2)
????{
????????if (client == null || !client.isConnected()) {
????????????return;
????????}
????????MqttMessage message = new MqttMessage();
????????message.setPayload(message2.getBytes());
????????try {
????????????client.publish(topic,message);
????????} catch (MqttException e) {
????????????e.printStackTrace();
????????}
????}
}
在AndroidManifest.xml文件中添加网络状态,确保连接mqtt服务器!
???<uses-permission android:name="android.permission.INTERNET"/>
????<!-- ???获取网络状态-->
????<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
?运行启动,此时打开网页的192.168.15.1:18083可以发现有一个客户端连接成功!
?两个客户端之间传递信息:首先保证两客户端的id不相同,这样才能同时连接上mqtt的服务器。
在连接上mqtt成功的位置确定要订阅的主题mqtt_sub_topic(为了测试,这个订阅的主题为另一个客户端发布的主题)
try {
????????????????????????????client.subscribe(mqtt_sub_topic,1);
????????????????????????} catch (MqttException e) {
????????????????????????????e.printStackTrace();
????????????????????????}
?发布信息使用的方法:
publishmessageplus(mqtt_pub_topic,"第一个客户端发送的信息");
接着再创建一个安卓项目,与第一个步骤相同,只要保证第二个项目的订阅主题是第一个项目的发布主题,发布主题是第一个的订阅主题就行!
?我们查看启动两台虚拟机查看信息结果:
?
?
?
|