引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
配置WebSocketConfig
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
核心类,sid为推送服务标识
@Component
@ServerEndpoint("/websocket/{sid}")
@Slf4j
public class WebSocketServer {
//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
private static int onlineCount = 0;
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
//接收sid
private String sid="";
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session,@PathParam("sid") String sid) {
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
log.info("有服务开始监听:"+sid+",当前在线数为" + getOnlineCount());
this.sid=sid;
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
//log.info("收到来自窗口"+sid+"的信息:"+message);
if("heart".equals(message)){
try {
sendMessage("heartOk");
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
* */
public static void sendInfo(String message,String sid) {
for (WebSocketServer item : webSocketSet) {
try {
//为null则全部推送
if(sid==null) {
item.sendMessage(message);
log.info("推送消息到"+item.sid+",推送内容:"+message);
}else if(item.sid.equals(sid)){
item.sendMessage(message);
log.info("推送消息到"+item.sid+",推送内容:"+message);
}
} catch (IOException e) {
continue;
}
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
创建通用返回类以及User类模拟传输数据
@Data
public class Result {
private Integer code;
private String msg;
private Object data;
}
@Data
@AllArgsConstructor
public class User {
private Integer id;
private String name;
private String psw;
}
创建定时任务模拟接受mq或者其他消息需要推送的场景此处10秒传递一次
@Component
public class TimeTask {
/**
* @throws Exception
*/
//10秒传递一次
@Scheduled(cron="*/10 * * * * ? ")
public void JqcaseSearch() {
try {
Result result = new Result();
result.setCode(200);
List<User> userList = Arrays.asList(new User(1,"张三","12345")
, new User(2,"李四","54321"));
result.setData(userList);
result.setMsg("查询成功!");
String resultJson= new ObjectMapper().writeValueAsString(result);
WebSocketServer.sendInfo(resultJson,"111");
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动项目,使用websocket测试网站测试,会写前端的可以自行写。
websocket在线测试
成功连接拿到正确数据,此处使用111连接使用其他id则拿不到数据!
|