环境
mqtt服务
方式一(fusesource):
添加依赖
implementation 'org.fusesource.mqtt-client:mqtt-client:1.16'
界面MQTTActivity
public class MQTTActivity extends AppCompatActivity {
private Unbinder unbinder;
@BindView(R.id.et_mqtt_url)
protected EditText et_mqtt_url;
@BindView(R.id.btn_mq_connect)
protected Button btn_mq_connect;
@BindView(R.id.et_message_publish)
protected EditText et_message_publish;
@BindView(R.id.et_subscribe_topic)
protected EditText et_subscribe_topic;
@BindView(R.id.et_message_content)
protected EditText et_message_content;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mqtt);
unbinder = ButterKnife.bind(this);
}
@OnClick(R.id.btn_mq_connect)
protected void mqConnect() {
String mqttUrl = et_mqtt_url.getText().toString();
if (TextUtils.isEmpty(mqttUrl)) {
ToastUtils.showShort("mq url为空");
return;
}
MqttManager.getInstance().startConnect(mqttUrl, new MqttManager.OnMessageReceiver() {
@Override
public void onReceiver(String topic, String message) {
et_message_content.append(topic + "<--" + message + "\n");
}
});
}
@OnClick(R.id.btn_message_publish)
public void publishMessage(View view) {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
String message = et_message_publish.getText().toString();
if (TextUtils.isEmpty(message)) {
ToastUtils.showShort("发布的消息内容为空");
return;
}
MqttManager.getInstance().publishMessage(topic, message, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
Log.d("MQTTActivity", "publish onSuccess");
et_message_content.append(topic + "-->" + message + "\n");
}
@Override
public void onFailure(Throwable value) {
Log.d("MQTTActivity", "publish onFailure");
}
});
}
@OnClick(R.id.btn_message_subscribe)
protected void mqSubscribe() {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
MqttManager.getInstance().addSubscribeMessage(topic);
}
@OnClick(R.id.btn_message_unsubscribe)
protected void mqUnSubscribe() {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
MqttManager.getInstance().unSubscribeMessage(topic);
}
@Override
protected void onDestroy() {
super.onDestroy();
MqttManager.getInstance().disconnect();
unbinder.unbind();
}
}
MqttManager
public class MqttManager {
private static final String TAG = "MqttManager";
public static boolean isDisconnect = true;
private final static short KEEP_ALIVE = 30;
private static MqttManager instance;
private static MQTT mqtt;
private CallbackConnection callbackConnection;
private OnMessageReceiver messageCallBack;
private MqttManager() {
mqtt = new MQTT();
mqtt.setVersion("3.1.1");
mqtt.setSendBufferSize(10 * 1024 * 1024);
mqtt.setReceiveBufferSize(10 * 1024 * 1024);
mqtt.setKeepAlive(KEEP_ALIVE);
mqtt.setClientId("test");
mqtt.setCleanSession(false);
mqtt.setUserName("admin");
mqtt.setPassword("password");
}
public static MqttManager getInstance() {
if (instance == null) {
instance = new MqttManager();
}
return instance;
}
public void startConnect(String url, OnMessageReceiver messageCallBack) {
if (url == null || url.length() <= 0) {
return;
}
this.messageCallBack = messageCallBack;
if (callbackConnection != null) {
callbackConnection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void aVoid) {
Log.i(TAG, "disconnect success");
}
@Override
public void onFailure(Throwable throwable) {
Log.i(TAG, "disconnect failed");
throwable.printStackTrace();
}
});
}
try {
mqtt.setHost(url);
mqtt.setCleanSession(true);
callbackConnection = mqtt.callbackConnection();
callbackConnection.listener(new ConnectListener());
callbackConnection.connect(new ConnectCallBack());
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
public void addSubscribeMessage(String topic) {
if (callbackConnection != null) {
Topic[] topics = {new Topic(topic, QoS.EXACTLY_ONCE)
};
callbackConnection.subscribe(topics, new Callback<byte[]>() {
@Override
public void onSuccess(byte[] bytes) {
ToastUtils.showShort("订阅成功");
Log.d(TAG, "订阅成功");
}
@Override
public void onFailure(Throwable throwable) {
ToastUtils.showShort("订阅失败");
Log.d(TAG, "订阅失败:" + throwable.toString());
isDisconnect = true;
callbackConnection.disconnect(null);
}
});
}
}
public void unSubscribeMessage(String topic) {
if (callbackConnection != null) {
UTF8Buffer[] topics = {new UTF8Buffer(topic)};
callbackConnection.unsubscribe(topics, new Callback<Void>() {
@Override
public void onSuccess(Void value) {
ToastUtils.showShort("取消订阅" + topic + "成功");
}
@Override
public void onFailure(Throwable throwable) {
ToastUtils.showShort("取消订阅" + topic + "失败");
Log.i(TAG, "取消订阅" + topic + "失败:" + throwable.toString());
}
});
}
}
public void publishMessage(String topic, String message, Callback callBack) {
if (callbackConnection != null) {
callbackConnection.getDispatchQueue().execute(new Runnable() {
@Override
public void run() {
callbackConnection.publish(topic, message.getBytes(), QoS.AT_MOST_ONCE, false, callBack);
}
});
} else {
isDisconnect = true;
}
}
private class ConnectListener implements Listener {
@Override
public void onConnected() {
Log.i(TAG, "onConnected");
isDisconnect = false;
}
@Override
public void onDisconnected() {
Log.i(TAG, "onDisconnected");
isDisconnect = true;
}
@Override
public void onPublish(UTF8Buffer utf8Buffer, Buffer buffer, Runnable runnable) {
runnable.run();
Log.d(TAG, "receive success");
String topicOrigin = new String(utf8Buffer.toByteArray());
String receiveData = new String(buffer.toByteArray());
LogUtils.d("topicOrigin->" + topicOrigin);
LogUtils.d("receiveData->" + receiveData);
if (messageCallBack != null) {
messageCallBack.onReceiver(topicOrigin, receiveData);
}
}
@Override
public void onFailure(Throwable throwable) {
Log.i(TAG, "onConnect fail");
isDisconnect = true;
callbackConnection.disconnect(null);
throwable.printStackTrace();
}
}
public void disconnect() {
if (callbackConnection != null) {
callbackConnection.disconnect(new Callback<Void>() {
@Override
public void onSuccess(Void aVoid) {
Log.i(TAG, "disconnect success");
}
@Override
public void onFailure(Throwable throwable) {
Log.i(TAG, "disconnect failed");
throwable.printStackTrace();
}
});
}
}
private class ConnectCallBack implements Callback<Void> {
@Override
public void onSuccess(Void aVoid) {
ToastUtils.showShort("连接成功");
Log.i(TAG, "连接成功");
}
@Override
public void onFailure(Throwable throwable) {
ToastUtils.showShort("连接失败");
Log.i(TAG, "连接失败:" + throwable.toString());
}
}
public interface OnMessageReceiver {
void onReceiver(String topic, String message);
}
}
布局文件activity_mqtt.xml
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:background="#FFFFFF"
android:orientation="vertical">
<EditText
android:id="@+id/et_mqtt_url"
android:layout_width="match_parent"
android:layout_height="48dp"
android:hint="tcp://ip:端口"
android:text="tcp://192.168.12.22:61613" />
<Button
android:id="@+id/btn_mq_connect"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="mqtt连接" />
<EditText
android:id="@+id/et_subscribe_topic"
android:layout_width="match_parent"
android:layout_height="48dp"
android:hint="需要订阅的topic" />
<EditText
android:id="@+id/et_message_publish"
android:layout_width="match_parent"
android:layout_height="48dp"
android:gravity="top|left"
android:hint="需要发布消息内容" />
<Button
android:id="@+id/btn_message_publish"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="发布消息" />
<Button
android:id="@+id/btn_message_subscribe"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="订阅消息" />
<Button
android:id="@+id/btn_message_unsubscribe"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="取消订阅消息" />
<EditText
android:id="@+id/et_message_content"
android:layout_width="match_parent"
android:layout_height="0dp"
android:layout_weight="1"
android:background="@color/C_E6E6E6"
android:gravity="top"
android:padding="8dp"
android:scrollbars="vertical" />
</LinearLayout>
权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
方式二(eclipse):
添加依赖
model中的build.gradle
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
implementation 'com.android.support:support-v4:29.0.3'
implementation 'org.greenrobot:eventbus:3.1.1'
根目录中的build.gradle
maven {
url "https://repo.eclipse.org/content/repositories/paho-releases/"
}
界面 MQTTEclipseActivity
public class MQTTEclipseActivity extends AppCompatActivity {
private Unbinder unbinder;
@BindView(R.id.et_mqtt_url)
protected EditText et_mqtt_url;
@BindView(R.id.et_message_publish)
protected EditText et_message_publish;
@BindView(R.id.et_subscribe_topic)
protected EditText et_subscribe_topic;
@BindView(R.id.et_message_content)
protected EditText et_message_content;
private IService iService = null;
private MyServiceConnection conn = null;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_mqtt_eclipse);
unbinder = ButterKnife.bind(this);
EventBus.getDefault().register(this);
conn = new MyServiceConnection();
Intent intent = new Intent(this, MQTTService.class);
bindService(intent, conn, BIND_AUTO_CREATE);
}
@OnClick(R.id.btn_mq_connect)
protected void mqConnect() {
String mqttUrl = et_mqtt_url.getText().toString();
if (TextUtils.isEmpty(mqttUrl)) {
ToastUtils.showShort("mq url为空");
return;
}
if (iService != null) {
iService.invokeServiceConnect(mqttUrl);
}
}
@OnClick(R.id.btn_message_publish)
public void publishMessage(View view) {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
String message = et_message_publish.getText().toString();
if (TextUtils.isEmpty(message)) {
ToastUtils.showShort("发布的消息内容为空");
return;
}
if (iService != null) {
iService.invokeServicePublish(topic, message);
}
}
@OnClick(R.id.btn_message_subscribe)
protected void mqSubscribe() {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
if (iService != null) {
iService.invokeServiceSubscribe(topic);
}
}
@OnClick(R.id.btn_message_unsubscribe)
protected void mqUnSubscribe() {
String topic = et_subscribe_topic.getText().toString();
if (TextUtils.isEmpty(topic)) {
ToastUtils.showShort("topic为空");
return;
}
if (iService != null) {
iService.invokeServiceUnSubscribe(topic);
}
}
@Subscribe(threadMode = ThreadMode.MAIN)
public void getMqttMessage(MQTTMessage mqttMessage) {
Log.i(MQTTService.TAG, "get topic:" + mqttMessage.getTopic());
Log.i(MQTTService.TAG, "get message:" + mqttMessage.getMessage());
Log.i(MQTTService.TAG, "get qos:" + mqttMessage.getQos());
et_message_content.append(mqttMessage.getTopic() + "-->" + mqttMessage.getMessage() + "\n");
}
@Override
protected void onDestroy() {
super.onDestroy();
unbindService(conn);
unbinder.unbind();
EventBus.getDefault().unregister(this);
}
class MyServiceConnection implements ServiceConnection {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
iService = (IService) service;
}
@Override
public void onServiceDisconnected(ComponentName name) {
}
}
}
布局文件
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:background="#FFFFFF"
android:orientation="vertical">
<EditText
android:id="@+id/et_mqtt_url"
android:layout_width="match_parent"
android:layout_height="48dp"
android:hint="tcp://ip:端口"
android:text="tcp://192.168.12.22:61613" />
<Button
android:id="@+id/btn_mq_connect"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="mqtt连接" />
<EditText
android:id="@+id/et_subscribe_topic"
android:layout_width="match_parent"
android:layout_height="48dp"
android:hint="需要订阅的topic" />
<EditText
android:id="@+id/et_message_publish"
android:layout_width="match_parent"
android:layout_height="48dp"
android:gravity="top|left"
android:hint="需要发布消息内容" />
<Button
android:id="@+id/btn_message_publish"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="发布消息" />
<Button
android:id="@+id/btn_message_subscribe"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="订阅消息" />
<Button
android:id="@+id/btn_message_unsubscribe"
android:layout_width="match_parent"
android:layout_height="48dp"
android:text="取消订阅消息" />
<EditText
android:id="@+id/et_message_content"
android:layout_width="match_parent"
android:layout_height="0dp"
android:layout_weight="1"
android:background="@color/C_E6E6E6"
android:gravity="top"
android:padding="8dp"
android:scrollbars="vertical" />
</LinearLayout>
消息类MQTTMessage
public class MQTTMessage {
private String message;
private String topic;
private String qos;
public MQTTMessage(String message, String topic, String qos) {
this.message = message;
this.topic = topic;
this.qos = qos;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getQos() {
return qos;
}
public void setQos(String qos) {
this.qos = qos;
}
}
MQTTService
public class MQTTService extends Service {
public static final String TAG = MQTTService.class.getSimpleName();
private static MqttAndroidClient client;
private MqttConnectOptions conOpt;
private String userName = "admin";
private String passWord = "password";
private String clientId = "test";
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
return super.onStartCommand(intent, flags, startId);
}
private void init(String uri) {
client = new MqttAndroidClient(this, uri, clientId);
client.setCallback(mqttCallback);
conOpt = new MqttConnectOptions();
conOpt.setCleanSession(true);
conOpt.setConnectionTimeout(10);
conOpt.setKeepAliveInterval(20);
conOpt.setUserName(userName);
conOpt.setPassword(passWord.toCharArray());
connection();
}
private void connection() {
if (!client.isConnected()) {
try {
client.connect(conOpt, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
ToastUtils.showShort("连接成功");
Log.i(TAG, "连接成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
ToastUtils.showShort("连接失败");
Log.i(TAG, "连接失败:" + exception.toString());
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
}
private MqttCallback mqttCallback = new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String str1 = new String(message.getPayload());
EventBus.getDefault().post(new MQTTMessage(new String(message.getPayload()), topic, String.valueOf(message.getQos())));
String str2 = topic + ";qos:" + message.getQos() + ";retained:" + message.isRetained();
Log.i(TAG, "messageArrived:" + str1);
Log.i(TAG, str2);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
@Override
public void connectionLost(Throwable cause) {
connection();
}
};
@Nullable
@Override
public IBinder onBind(Intent intent) {
LogUtils.d("onBind");
return new MyBinder();
}
private void subscribe(String topic) {
try {
client.subscribe(topic, 2, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
ToastUtils.showShort("订阅" + topic + "成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
ToastUtils.showShort("订阅" + topic + "失败");
Log.i(TAG, "订阅" + topic + "失败:" + exception.toString());
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
private void publish(String topic, String msg) {
Integer qos = 0;
Boolean retained = false;
try {
client.publish(topic, msg.getBytes(), qos.intValue(), retained.booleanValue());
} catch (MqttException e) {
e.printStackTrace();
}
}
private void unSubscribe(String topic) {
try {
client.unsubscribe(topic, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
ToastUtils.showShort("取消订阅" + topic + "成功");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
ToastUtils.showShort("取消订阅" + topic + "失败");
Log.i(TAG, "取消订阅" + topic + "失败:" + exception.toString());
}
});
} catch (MqttException e) {
e.printStackTrace();
}
}
class MyBinder extends Binder implements IService {
@Override
public void invokeServiceConnect(String url) {
init(url);
}
@Override
public void invokeServiceSubscribe(String topic) {
subscribe(topic);
}
@Override
public void invokeServiceUnSubscribe(String topic) {
unSubscribe(topic);
}
@Override
public void invokeServicePublish(String topic, String messge) {
publish(topic, messge);
}
}
@Override
public void onDestroy() {
if (client != null) {
client.unregisterResources();
client.close();
client = null;
}
super.onDestroy();
}
}
权限
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
截图(两种方案界面一致)
手机端连接mq后并订阅mytopic,PC端mqttfx发布topic为“mytopic”内容为“ceshishuju”的数据,这时手机端就可以收到对应的数据 PC端mqttfx工具连接后订阅sssa的topic,手机端mq连接后,输入topic,输入发送内容点击发布消息,这是PC端mqttfx就可以收到手机端发送的内容 demo下载地址见这里,免积分
参考: apache-apollo-1.7.1下载及安装 Android消息推送MQTT实战 Android APP必备高级功能,消息推送之MQTT
|