SSE是一种可以主动从服务端推送消息的技术。SSE的本质其实就是一个HTTP的长连接,只不过它给客户端发送的不是一次性的数据包,而是一个stream流,格式为text/event-stream。所以客户端不会关闭连接,会一直等着服务器发过来的新的数据流。
原理
- SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
- SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
- SSE 默认支持断线重连,WebSocket 需要自己实现。
- SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
- SSE 支持自定义发送的消息类型。
?应用场景
股票行情、新闻推送的这种只需要服务器发送消息给客户端场景中。
客户端实现
-
在浏览器端创建一个EventSource实例,向服务器发起连接 -
open:连接一旦建立,就会触发open事件,可以在onopen属性定义回调函数 -
message:客户端收到服务器发来的数据,就会触发message事件,可以在onmessage属性定义回调函数。 -
error:如果发生通信错误(如连接中断,服务器返回数据失败),就会触发error事件,可以在onerror属性定义回调函数。 -
close:用于关闭 SSE 连接。source.close(); -
自定义事件:EventSource规范允许服务器端执行自定义事件,客户端监听该事件即可,需要使用addEventListener
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>订阅消息</title>
<style>
.left-container {
float: left;
width: 350px;
min-height: 300px;
border-right: 3px solid #4b4b4b;
}
.left-container li{
text-overflow: ellipsis;
white-space: nowrap;
overflow: hidden;
}
.right-container{
padding-left: 30px;
float: left;
width: 350px;
}
</style>
</head>
<body>
<div class="left-container">
<label>订阅主题</label>
<input type="text" id="topic">
<button onclick="subscribe()">订阅</button>
<div>收到消息如下:</div>
<ul id="message"></ul>
</div>
<div class="right-container">
<div>
<label>消息主题</label>
<input type="text" id="pub_topic">
</div>
<div>
<label>消息内容</label>
<input type="text" id="pub_content">
</div>
<button onclick="publish()">发布</button>
<div>发布消息和内容如下:</div>
<ul id="pub_message"></ul>
</div>
<script>
function subscribe() {
let topic = document.getElementById('topic').value;
let url = location.origin + '/subscribe?topic=' + topic;
send(url, null, process);
}
//发送订阅消息
function send(url, data, callback) {
/* let xhr = new XMLHttpRequest();
xhr.onreadystatechange = function(){
if (xhr.readyState == 3 || xhr.readyState == 4){
if (callback) {
callback(xhr.responseText);
}
}
};
xhr.open('get', url, true);
xhr.send(data); */
if (callback) {
let eventSource = new EventSource(url);
eventSource.onmessage = function (e) {
//let temperature = JSON.parse(e.data);
process(e.data);
};
eventSource.onopen = function (e) {
process('Connection opened');
};
eventSource.onerror = function (e) {
process('Connection closed');
};
} else {
let xhr = new XMLHttpRequest();
xhr.onreadystatechange = function(){
};
xhr.open('get', url, true);
xhr.send(data);
}
}
let len = 0;
//处理订阅消息
function process(messsage) {
let li = document.createElement('li');
li.innerHTML = messsage;
//len = messsage.length;
let ul = document.getElementById('message');
ul.appendChild(li);
}
//发布消息
function publish() {
let topic = document.getElementById('pub_topic').value;
let content = document.getElementById('pub_content').value;
let url = location.origin + '/publish?topic=' + topic + '&content=' + content;
send(url, null, null);
let li = document.createElement('li');
li.innerHTML = `发布主题:${topic}; 发布内容:${content}`;
let ul = document.getElementById('pub_message');
ul.appendChild(li);
}
</script>
</body>
</html>
?服务器端实现
-
事件流的对应MIME格式为text/event-stream。 -
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本 -
服务端返回数据需要特殊的格式,分为四种消息类型,且消息的每个字段使用"\n"来做分割,
-
Event: 事件类型,支持自定义事件 -
Data: 发送的数据内容,如果数据很长,可以分成多行,用\n结尾,最后一行用\n\n结尾。 -
ID: 每一条事件流的ID,相当于每一条数据的编号, -
Retry:指定浏览器重新发起连接的时间间隔。在自动重连过程中,之前收到的最后一个ID会被发送到服务端。
//Controller
package com.example.ssedemo.controller;
import com.example.ssedemo.utils.ReqContextUtils;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@RestController
public class SubscribeController {
@RequestMapping(value = "/subscribe", method = RequestMethod.GET, produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe(HttpServletRequest req, HttpServletResponse res, @RequestParam("topic") String topic) {
return ReqContextUtils.addSubscrib(topic, req, res);
}
@RequestMapping("/publish")
public void publish(@RequestParam("topic") String topic, @RequestParam("content") String content) {
ReqContextUtils.publishMessage(topic, content);
}
}
//处理逻辑
package com.example.ssedemo.utils;
import com.google.gson.JsonObject;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
public class ReqContextUtils {
//超时时间
private static int DEFAULT_TIME_OUT = 60*60*1000;
//订阅列表,存储所有主题的订阅请求,每个topic对应一个ArrayList,ArrayList里该topic的所有订阅请求
private static HashMap<String, ArrayList<SseEmitter>> subscribeArray = new LinkedHashMap<>();
//添加订阅消息
public static SseEmitter addSubscrib(String topic, HttpServletRequest request, HttpServletResponse response) {
if (null == topic || "".equals(topic)) {
return null;
}
SseEmitter emitter = new SseEmitter();
ArrayList<SseEmitter> emitterList = subscribeArray.get(topic);
if (null == emitterList) {
emitterList = new ArrayList<SseEmitter>();
subscribeArray.put(topic, emitterList);
}
emitterList.add(emitter);
return emitter;
}
//获取订阅列表
public static ArrayList<SseEmitter> getSubscribList(String topic) {
return subscribeArray.get(topic);
}
//推送消息
public static void publishMessage(String topic, String content) {
ArrayList<SseEmitter> emitterList = subscribeArray.get(topic);
if (null != emitterList) {
for(SseEmitter emitter :emitterList) {
try {
//emitter.send(content);
emitter.send(SseEmitter.event().name("message").data(content));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
浏览器兼容性
?
|