最近项目中引入了实时接收服务器数据的功能,考量后通过WebSocket长链接来实现。
接下来了解一下webSocket 的特点: 1、建立在 TCP 协议之上,服务器端的实现比较容易。 2、与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。 3、支持双向通信,实时性更强。 4、数据格式比较轻量,性能开销小,通信高效。 5、可以发送文本,也可以发送二进制数据。 6、没有同源限制,客户端可以与任意服务器通信。 7、协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
在我的项目计划中是通过Okhttp3实现socket长连接,先看一下效果: 依次展示的效果图为开启长链接------关闭长链接-------重连长链接
接下来看一下具体的代码 1、长链接初始化,创建Service服务,通过bindService启动Service服务
/**
* 获取单例,非appContext,要先init
*/
public static ImWebSocketBackgroundService getInstance(Context context) {
if (instance == null) {
synchronized (ImWebSocketBackgroundService.class) {
if (instance == null) {
instance = new ImWebSocketBackgroundService(context);
}
}
}
return instance;
}
private ImWebSocketBackgroundService(Context context) {
this.context = context;
}
private void onCreate() {
if(webSocketServiceManager == null){
webSocketServiceManager = new WebSocketServiceManager(context, this);
webSocketServiceManager.bindService();
sendGeneration = 0;
}
}
public class MyWebSocketService extends Service implements WebSocketResultListener {
private WebSocketThread webSocketThread;
private IWebsocketResponseDispatcher responseDispatcher;
private SocketResultListenerStorage socketResultListenerStorage = new SocketResultListenerStorage();
@Override
public void onCreate() {
super.onCreate();
webSocketThread = new WebSocketThread();
webSocketThread.setWebSocketResultListener(this);
webSocketThread.start();
responseDispatcher = new WebsocketResponseDispatcher();
}
@Nullable
@Override
public IBinder onBind(Intent intent) {
return new WebSocketBinder();
}
@Override
public void onDestroy() {
super.onDestroy();
if (webSocketThread.getHandler() != null)
webSocketThread.getHandler().sendEmptyMessage(MessageType.QUIT);
}
public boolean sendText(String text) {
if (webSocketThread.getHandler() != null) {
Message message = Message.obtain();
message.obj = text;
message.what = MessageType.SEND;
return webSocketThread.getHandler().sendMessage(message);
}
return false;
}
}
在WebSocketServiceManager中创建ServiceConnection类型实例,并重写onServiceConnected()方法和onServiceDisconnected()方法。 当执行到onServiceConnected回调时通过IBinder实例得到Service,实现client与Service的连接 onServiceDisconnected回调被执行时,表示client与Service断开连接
public WebSocketServiceManager(Context context, WebSocketResultListener socketResultListener) {
this.context = context;
this.socketResultListener = socketResultListener;
}
private ServiceConnection serviceConnection = new ServiceConnection() {
@Override
public void onServiceConnected(ComponentName name, IBinder service) {
Log.e(WebSocketThread.TAG, "WebSocketService 已经连接");
serviceBindSucc = true;
binding = false;
bindTime = 0; //连接成功后归零
myWebSocketService = ((MyWebSocketService.WebSocketBinder)service).getWebSocketService();
myWebSocketService.addSocketListener(socketResultListener);
if(myWebSocketService.isSendConnected()) {
if (socketResultListener != null) {
socketResultListener.connection();
}
}
}
@Override
public void onServiceDisconnected(ComponentName name) { //service异常被关闭 回调该方法
binding = false;
serviceBindSucc = false;
if (bindTime<5 && !binding){
Log.e(WebSocketThread.TAG, String.format("WebSocketService 连接断开,开始第%s次重连", bindTime));
bindService();
}
}
};
//绑定服务
public void bindService(){
serviceBindSucc = false;
binding = true;
Intent intent = new Intent(context, MyWebSocketService.class);
context.bindService(intent,serviceConnection, Context.BIND_AUTO_CREATE);
bindTime++;
}
2、在service创建时初始化Thread线程,通过Handler发送消息建立连接 ? ?2.1?配置OkHttpClient 其中head中需要的参数依照大家项目中需要参数随时改变
private void initHeader() {
headerMap.put("sourse", "android");
headerMap.put("appVersion", Constants.VERSION_CODE);
headerMap.put("token", Constants.TOKEN);
headerMap.put("devCode",Constants.DevCode);
}
OkHttpClient mClient = new OkHttpClient.Builder()
.readTimeout(3, TimeUnit.SECONDS)//设置读取超时时间
.writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
.connectTimeout(3, TimeUnit.SECONDS)//设置连接超时时间
.build();
?2.2 使用Url,构建WebSocket请求 WEB_SOCKET_URL = "ws://47.74.235.101:8190/ws-community-im-websocket"; WEB_SOCKET_URL 依照大家项目中具体路径改变
private void initRequest() {
Request.Builder requestBuilder = new Request.Builder().url(WEB_SOCKET_URL);
if(headerMap == null){
initHeader();
}
if (headerMap != null && !headerMap.isEmpty()) {
Set<String> strings = headerMap.keySet();
StringBuffer stringBuffer = new StringBuffer();
for (String string : strings) {
stringBuffer.append(string).append(" ").append(headerMap.get(string)).append("\t");
}
Log.e("web", stringBuffer.toString());
requestBuilder.headers(Headers.of(headerMap));
}
request = requestBuilder.build();
}
2.3 发起连接,配置回调。
- onOpen(),连接成功
- onMessage(String text),收到字符串类型的消息,一般我们都是使用这个
- onMessage(ByteString bytes),收到字节数组类型消息,我这里没有用到
- onClosed(),连接关闭
- onFailure(),连接失败,一般都是在这里发起重连操作
//开始连接
WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//连接成功...
}
@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
//收到消息...(一般是这里处理json)
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
//收到消息...(一般很少这种消息)
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
//连接关闭...
}
@Override
public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
super.onFailure(webSocket, throwable, response);
//连接失败...
}
});
2.4 使用WebSocket对象发送消息,msg为消息内容,send方法会马上返回发送结果
//发送消息
webSocket.send(msg);
3、通过第2步连接成功后启动心跳,每隔10秒钟发起一次心跳,在onMessageReceive中接收到后台返回的json信息,在本项目中通过后台返回的code值判断此时时在线收到消息还是断开连接,大家依照各自项目改变
SEND_MESSAGE = 601; //在线接收消息
private static final int DISCONNECT_MESSAGE = 602; //断开连接
//开始心跳
public void startHeartBeat() {
Log.e(WebSocketThread.TAG, "启动心跳");
runHeartBeat = true;
if (scheduler == null) {
scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(this, getDelay(), TimeUnit.SECONDS);
}
}
@Override
public void onMessageReceive(String jsonText) {
try {
JSONObject jsonObject = new JSONObject(jsonText);
Log.i(WebSocketThread.TAG,jsonObject+"");
if(jsonObject.has("msgType") && Integer.valueOf(jsonObject.getInt("msgType")) != null){
int msgType = jsonObject.getInt("msgType");
if (msgType == SEND_MESSAGE) { //发送消息
String msg = jsonObject.getString("msg");
Log.d(WebSocketThread.TAG, msg);
} else if (msgType == DISCONNECT_MESSAGE) { //断开连接
stopHeartBeat();
}
}
} catch (JSONException e) {
e.printStackTrace();
}
}
@Override
public void connection() {
Log.e(WebSocketThread.TAG, "连接成功");
startHeartBeat();
}
@Override
public void run() {
if (runHeartBeat) {
if (sendText(defineText())) {
sendGeneration++;
Log.d(WebSocketThread.TAG,"发送心跳成功"+sendGeneration);
}
scheduler.schedule(this, getDelay(), TimeUnit.SECONDS);
}
}
private int getDelay() {
return 10;
}
4、重连机制----处理切换网络,被挤掉线等情况 在第3步中提到的onFailure()方法中进行重连,连接成功后重新接收到后台返回的json信息
public class ReconnectWebSocketManager {
private WebSocketThread webSocketThread;
private volatile boolean retrying; //正在重新连接
private volatile boolean destroyed ;
private final ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); //单线程
public ReconnectWebSocketManager(WebSocketThread webSocketThread) {
this.webSocketThread = webSocketThread;
this.retrying = false;
this.destroyed = false;
}
synchronized void performReconnect(){
retrying = false;
retry();
}
private synchronized void retry() {
if (!retrying){
retrying = true;
synchronized (singleThreadPool){
singleThreadPool.execute(new ReconnectRunnable());
}
}
}
//销毁
public void destroy(){
destroyed = true;
if (singleThreadPool !=null)
singleThreadPool.shutdownNow();
webSocketThread = null;
}
public class ReconnectRunnable implements Runnable {
public void run() {
retrying = true;
for (int i = 0;i <20 ; i++){
if (destroyed){
retrying = false;
return;
}
Handler handler = webSocketThread.getHandler();
if (handler !=null){
if (webSocketThread.getConnectStatus() ==2) //已连接
break;
if (webSocketThread.getConnectStatus() ==1) //正在连接
continue;
if (webSocketThread.getConnectStatus() ==0)
handler.sendEmptyMessage(MessageType.CONNECT);
}else {
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
retrying = false;
}
}
}
}
到此长链接主要代码已经讲完了,别忘了在清单文件中注册service,添加网络权限
<service android:name="com.example.msg.imwebsocket.MyWebSocketService"/>
<uses-permission android:name="android.permission.INTERNET" />
在build.gradle中添加okhttp3依赖
implementation 'com.squareup.okhttp3:okhttp:3.10.0'
希望能对于长链接需求的同学帮助,欢迎讨论,附上项目的代码地址demo ?
|