HTML5 浏览器一方面普及了 WebSocket 的应用,同时我们也感受到 WebSocket 所带来的好处。那么怎么在 Java 中实现 WebSocket 呢?在本文中我们为大家介绍一下,如有不足,敬请提出:)
WebSocket 的特点
WebSocket 的特点就是全双工,不仅浏览器可以发消息给服务端,而且可以反过来,服务器端也能发消息给浏览器,——此为最重要的一点。想想看没有 WebSocket 的日子,服务器端怎么主动发消息给浏览器?客户端轮询?长链接?——都是 Hack 的方法,而且并非服务端自己主动要求发消息给浏览器的。如今,有了 WebSocket,大家就可以互通有无,十分畅快的沟通。
WebSocket 与 Socket 的关系?
抱歉,没有半毛钱的关系哦。
WebSocket 服务端与客户端
毫无疑问,服务端与客户端对应有两种不同的逻辑,我们分别来看看。本文使用同一种语言 Java 去描述服务端、客户端。
WebSocket 服务端
一般较新的 Servlet 规范(Servlet > v3.0)已经支持 WebSocket,Tomcat 里面直接支持,不需要引入其他 jar 包。下面,我们创建一个基类提供基本的 WebSocket 服务端功能。
该源码在这里。
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.Session;
import com.ajaxjs.util.logger.LogHelper;
import com.ajaxjs.util.map.JsonHelper;
public abstract class BaseWebsocketServer {
private static final LogHelper LOGGER = LogHelper.getLog(BaseWebsocketServer.class);
protected static final Set<WebSocketEntity> CONNECTIONS = new CopyOnWriteArraySet<>();
public void sendMessageJson(Object obj) {
sendMessage(JsonHelper.toJson(obj));
}
public void sendMessage(String msg) {
for (WebSocketEntity clients : CONNECTIONS) {
clients.sendText(msg);
}
}
@OnClose
public void onClose(Session session, CloseReason reason) {
LOGGER.info("WebSocket 关闭");
WebSocketEntity toRemove = null;
for (WebSocketEntity e : CONNECTIONS) {
if (e.getSession().equals(session)) {
toRemove = e;
break;
}
}
if (toRemove != null)
CONNECTIONS.remove(toRemove);
}
@OnError
public void onError(Session session, Throwable e) {
LOGGER.warning(session.getId() + " 连接发生错误 " + e.getMessage());
e.printStackTrace();
}
}
WebSocket 通讯有几种事件,对应不同的 Java 注解(@OnOpen 、@OnClose 等),添加到 Java 方法上。然而这些方法的参数,如 Session session 、CloseReason reason ,不是固定的,可以比较自由地配搭。
静态变量 CONNECTIONS = new CopyOnWriteArraySet<WebSocketEntity>() 是记住已连接的客户端所用。WebSocketEntity 是我们封装的客户端 Bean,当前比较简单,只保存的 session 对象。你可以根据业务增加相应的字段。
public class WebSocketEntity {
private Session session;
public WebSocketEntity(Session session) {
this.session = session;
}
public void sendText(String message) {
session.getAsyncRemote().sendText(message);
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
}
需要客户端 id 标识吗?其实 session.getId() 可返回 id。
这里为什么用 CopyOnWriteArraySet ?原来这是一种不需要加锁的多并发机制,原先这点子是参考这博客的, CopyOnWriteArraySet 原理参考这里。
子类
有基类自然有子类,下面以一个告警的通知为例:创建一个 WebSocket 服务端类WarningWebSocketServer ,并在类前添加@ServerEndpoint(value = "/MessageCenter/warning") 注解,该注释端点表示将 WebSocket 服务端运行在 ws://[Server 端 IP 或域名]:[Server 端口]/项目名/MessageCenter/warning 的访问端点。
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import com.ajaxjs.net.websocket.BaseWebsocketServer;
import com.ajaxjs.net.websocket.WebSocketEntity;
import com.ajaxjs.util.logger.LogHelper;
@ServerEndpoint("/MessageCenter/warning")
@Component
public class WarningWebSocketServer extends BaseWebsocketServer {
private static final LogHelper LOGGER = LogHelper.getLog(WarningWebSocketServer.class);
@OnOpen
public void onOpen(Session session) {
LOGGER.info("已连接告警 WebSocket");
CONNECTIONS.add(new WebSocketEntity(session));
}
@OnMessage
public void onMessage(String message) {
LOGGER.info("WebSocket.onMessage: " + message);
}
}
通过 @Component 注解,我们把 WarningWebSocketServer 作为 Spring 的一个 Component,即 Bean 去调用,当执行 WarningWebSocketServer.sendMessage() /sendMessageJson() 时候就可以给 WebSocket 客户端发送消息。
WebSocket 客户端
WebSocket 客户端源码在这里。创建客户端实例依赖一个 ws://服务端地址 ,通过 connect(String server) 连接 WebSocket 服务器,然后回调函数中BiConsumer<Session, String> onMessage 处理客户端发过来的消息。
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.DeploymentException;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import com.ajaxjs.util.ThreadUtil;
import com.ajaxjs.util.logger.LogHelper;
/**
* WebSocket 客户端
*
* @author xinzhang
*
*/
@ClientEndpoint
public class WebSocketClient {
private static final LogHelper LOGGER = LogHelper.getLog(WebSocketClient.class);
protected WebSocketContainer container;
protected Session userSession;
private String server;
/**
* 创建 WebSocket 客户端
*/
public WebSocketClient() {
container = ContainerProvider.getWebSocketContainer();
}
/**
* 连接 WebSocket 服务器
*
* @param server 服务器地址
*/
public void connect(String server) {
this.server = server;
connect();
}
/**
* 连接 WebSocket 服务器
*/
public void connect() {
try {
userSession = container.connectToServer(this, new URI(server));
} catch (DeploymentException | URISyntaxException | IOException e) {
LOGGER.warning("WS 地址: " + server);
LOGGER.warning(e);
}
}
/**
* 发送信息
*
* @param msg 信息
* @throws IOException
*/
public void sendMessage(String msg) {
try {
userSession.getBasicRemote().sendText(msg);
} catch (IOException e) {
LOGGER.warning(e);
}
}
@OnOpen
public void onOpen(Session session) {
LOGGER.info("WebSocket Connected");
tryReconnect.set(false);
circlePing();
}
@OnClose
public void onClose(Session session, CloseReason closeReason) {
LOGGER.info("WebSocket 连接断开!");
if (end.get())
return;
needReconnect();
}
private BiConsumer<Session, String> onMessage;
/**
* 有消息推到的时候触发
*
* @param session
* @param msg
*/
@OnMessage
public void onMessage(Session session, String msg) {
LOGGER.info(msg);
if (onMessage != null)
onMessage.accept(session, msg);
}
/**
* 需要ping标识
*/
private AtomicBoolean needPing = new AtomicBoolean(true);
/**
* 尝试重连标识
*/
private AtomicBoolean tryReconnect = new AtomicBoolean(false);
/**
* 重连次数
*/
private AtomicInteger reConnectTimes = new AtomicInteger(0);
/**
* 连接结束标识
*/
private AtomicBoolean end = new AtomicBoolean(false);
private static ByteBuffer PING_PAYLOAD = null;
public void circlePing() {
if (PING_PAYLOAD == null)
PING_PAYLOAD = ByteBuffer.wrap("Ping".getBytes());
new Thread(() -> {
while (needPing.get()) {
if (userSession != null && userSession.isOpen())
try {
userSession.getBasicRemote().sendPing(PING_PAYLOAD);
} catch (IllegalArgumentException | IOException e) {
LOGGER.warning(e);
}
ThreadUtil.sleep(5, TimeUnit.SECONDS);
}
LOGGER.warning("[]Ping循环关闭");
}).start();
}
/**
* 重新连接
*/
private void needReconnect() {
ThreadUtil.sleep(3);
int cul = reConnectTimes.incrementAndGet();
if (cul > 3) {
disconnect();// close("real stop");
throw new NullPointerException("服务端断连,3次重连均失败");
}
LOGGER.warning("[{0}]第[{1}]次断开重连", cul);
if (tryReconnect.get()) {
LOGGER.warning("第[{0}]次断开重连结果 -> 连接正在重连,本次重连请求放弃", cul);
needReconnect();
return;
}
try {
tryReconnect.set(true);
if (userSession != null && userSession.isOpen()) {
LOGGER.warning("[第[{0}]次断开重连,关闭旧连接", cul);
disconnect();
}
container = ContainerProvider.getWebSocketContainer();
connect();
} catch (Exception exception) {
LOGGER.warning("[第[{0}]次断开重连结果 -> 连接正在重连,重连异常:[{1}]", cul, exception.getMessage());
needReconnect();
} finally {
tryReconnect.set(false);
}
}
/**
* 关闭链接
*/
public void disconnect() {
try {
userSession.close();
} catch (IOException e) {
LOGGER.warning(e);
}
}
public BiConsumer<Session, String> getOnMessage() {
return onMessage;
}
public void setOnMessage(BiConsumer<Session, String> onMessage) {
this.onMessage = onMessage;
}
}
发送消息给客户端是执行 sendMessage(String msg) 发送信息。这里我们使用了 userSession.getBasicRemote().sendText(msg) 同步的方法,而系统还提供了异步的方法 session.getAsyncRemote().sendText(msg) ,它们的区别在于:
getAsyncRemote() 和getBasicRemote() 确实是异步与同步的区别,大部分情况下,推荐使用getAsyncRemote() 。……由于同步特性,第二行的消息必须等待第一行的发送完成才能进行,而第一行的剩余部分消息要等第二行发送完才能继续发送,所以在第二行会抛出IllegalStateException 异常。如果要使用getBasicRemote() 同步发送消息,则避免尽量一次发送全部消息,使用部分消息来发送。出处
需要注意的问题
心跳机制
开始时候,发现 WebSocket 每隔一定时间会自动断开连接,搜了很多博客都说设置一下 Nginx的 proxy_read_timeout ,的确修改可以解决此问题。但是这个时间过长会影响服务器性能,于是可以改用心跳的机制,告诉服务端此连接一直保持有效,不要断开我。
前端加入心跳的方法参见这里, Java 客户端的参见这里。
所谓心跳,就是利用 WebSocket 协议中 Ping 的方法,隔一定时间发消息给服务端,保持住连接。
加上心跳后,如果仍然掉线,那么就要考虑不是 Nginx 问题所导致的,就要考虑具体是什么原因导致断线的。WebSocket 断开的原因有很多,最好在 WebSocket 断开时,将错误打印出来。典型的原因有网络波动,服务端断连的情况,会导致客户端被动断开连接。
@OnClose
public void onClose(Session session, CloseReason reason) {
LOGGER.info("WebSocket 连接断开!code: {0}, reson: {1}", reason.getCloseCode(), reason.getReasonPhrase());
}
具体 code 的含义如下表。
心跳机制的另外一个含义就是告诉双方彼此是否还连接着,所以不但客户端可以对服务端进行心跳,而且反过来,服务端也可以对客户端发心跳。客户端定时发心跳检测,服务端收到心跳检测就回复一个数据包,如果客户端超时未收到回复的心跳包,就可以认为已经离线了。服务端的检测也是类似,只不过服务端可以直接发送 ping 帧,如果超时未收到 pong 帧就可以认为客户端已经断线了。
自动重连
WebSocket 当前貌似没有一种断开自动重新连接的机制,得自己写。其实之前介绍心跳的文章中就包含了自动重连逻辑,特别是 Java 的重连方法,考虑比较周到。
缓冲区问题
WebSocket定时发送 sendPing() 后,还会反复出现接收/发送几个请求就断开连接的情况
原因分析:
无论是作服务端还是客户端,发现每次都是接收到同一个请求的信息后连接就断开了,经过反复的摸索发现,是由于接收到的这个请求传输的数据量过大,超出了 WebSocket 会话接收信息的缓冲区的大小(可使用session.getMaxTextMessageBufferSize() 查看缓冲大小,默认为8192),引起的 WebSocket 连接的异常断开。出处
解决方法:重新设置 WebSocket 缓冲区大小,
int maxSize = 200 * 1024;
session.setMaxBinaryMessageBufferSize(maxSize);
session.setMaxTextMessageBufferSize(maxSize);
用线程池解决大批量消息
服务端/客户端接收到客户端/服务端一次性发来的几百条或更多的请求,瞬间都堆积在会话的缓冲区,又没做多线程处理,并且每接收到一条请求还要查询阿里云服务器数据库,加上网络带宽过小,处理一条请求就要花费几十秒;导致线程队列严重堵塞,无法及时响应处理后续的其他请求。
解决方法:使用了线程池开启多条线程同时进行处理
private static ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
@OnMessage
public void onMessage(String datas,Session session) {
Runnable t = new Runnable() {
@Override
public void run() {
}
};
fixedThreadPool.submit(t);
}
注意,要给 session 加上同步锁,否则会出现多个线程同时往同一个 session 写数据,导致报错的情况。
public void send(String data) throws Exception {
synchronized (session) {
session.getBasicRemote().sendText(data);
}
}
最后推荐两篇关于 WebSocket 的优秀资源:
|