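1. Introduction
I recently ran into a real-world scenario: fault messages from a message queue had to be pushed to a PC client, which then announces them by voice automatically. The feature involves speech synthesis and notification push. I used WebSocket for the push part, and this post records how.
2. Steps for using WebSocket
Integrating WebSocket with Spring Boot
2.1 Add the dependency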
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
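2.2 Create the WebSocket configuration class
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    // Detects beans annotated with @ServerEndpoint and registers them
    // with the embedded container's WebSocket runtime.
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}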
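2.3 WebSocket server class
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.concurrent.CopyOnWriteArraySet;

// Assumes Spring Boot 2.x (javax.websocket); Spring Boot 3.x uses jakarta.websocket instead.
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.stereotype.Component;

@Slf4j
@ServerEndpoint("/websocket/{userId}")
@Component
public class WebSocketServer {
    // Number of currently open connections.
    private static int onlineCount = 0;
    // One WebSocketServer instance exists per connection; this set holds all of them.
    private static final CopyOnWriteArraySet<WebSocketServer> webSocketSet =
            new CopyOnWriteArraySet<>();
    // The session of this particular connection.
    private Session session;
    // The userId taken from the connection path.
    private String userId = "";
    // Address of the speech synthesis service.
    public static String baseUrl = "http://118.89.68.236:9000";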

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        this.session = session;
        this.userId = userId;
        webSocketSet.add(this);
        addOnlineCount();
        log.info("New window listening: {}, online count: {}", userId, getOnlineCount());
        try {
            sendMessage("Connected");
        } catch (IOException e) {
            log.error("WebSocket I/O error while greeting window {}", userId, e);
        }
    }

    @OnClose
    public void onClose() {
        webSocketSet.remove(this);
        subOnlineCount();
        log.info("A connection closed, online count: {}", getOnlineCount());
    }

    // Broadcasts every text message received from one client to all connected clients.
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("Message from window {}: {}", userId, message);
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                log.error("Failed to forward message to window {}", item.userId, e);
            }
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket error for window {}", userId, error);
    }
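
    public void sendMessage(String message) throws IOException {
        this.session.getBasicRemote().sendText(message);
    }

    public void sendAudio(byte[] data) throws IOException {
        this.session.getBasicRemote().sendBinary(ByteBuffer.wrap(data));
    }

    // Synthesizes the text once, then pushes the same audio bytes to every client.
    public static void sendToAudio(String message) {
        byte[] bytes;
        // try-with-resources closes the stream; a null resource is simply skipped.
        try (InputStream inputStream = sendGet(message)) {
            if (inputStream == null) {
                log.error("No audio returned for message: {}", message);
                return;
            }
            bytes = IOUtils.toByteArray(inputStream);
        } catch (IOException e) {
            log.error("Failed to read synthesized audio", e);
            return;
        }
        for (WebSocketServer item : webSocketSet) {
            try {
                item.sendAudio(bytes);
            } catch (IOException e) {
                log.error("Failed to send audio to window {}", item.userId, e);
            }
        }
    }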

    // Pushes a text message to one window, or to all windows when userId is null.
    public static void sendInfo(String message, String userId) {
        log.info("Pushing to window {}, content: {}", userId, message);
        for (WebSocketServer item : webSocketSet) {
            if (userId != null && !userId.equals(item.userId)) {
                continue;
            }
            try {
                item.sendMessage(message);
            } catch (IOException e) {
                log.error("Failed to push to window {}", item.userId, e);
            }
        }
    }

    public static synchronized int getOnlineCount() {
        return onlineCount;
    }

    public static synchronized void addOnlineCount() {
        WebSocketServer.onlineCount++;
    }

    public static synchronized void subOnlineCount() {
        WebSocketServer.onlineCount--;
    }

    // Requests the speech synthesis service and returns the audio stream, or null on failure.
    // The caller is responsible for closing the returned stream.
    public static InputStream sendGet(String message) {
        try {
            String url = encode(baseUrl, message);
            URLConnection connection = new URL(url).openConnection();
            connection.setConnectTimeout(5000);
            connection.setReadTimeout(15000);
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.connect();
            return connection.getInputStream();
        } catch (Exception e) {
            log.error("Speech synthesis request failed", e);
            return null;
        }
    }

    // Builds the /synthesize request URL. The text and the synthesis options
    // (a YAML string passed as kwargs) are both URL-encoded as UTF-8.
    public static String encode(String baseUrl, String message) throws UnsupportedEncodingException {
        String title = "format: yaml\nmode: mspk\naudio: 14\nspeaker: Aida\nvocoder: melgan";
        String encodeContent = URLEncoder.encode(message, "UTF-8");
        String encodeTitle = URLEncoder.encode(title, "UTF-8");
        String result = baseUrl + "/synthesize?text=" + encodeContent + "&kwargs=" + encodeTitle;
        log.info("request path : {}", result);
        return result;
    }
}
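A side note on sendInfo(): it scans every connection to find one userId, and the online count lives in a separate synchronized counter. One possible alternative, sketched here rather than taken from the original code, keys connections by userId in a ConcurrentHashMap so that lookup and counting come from one structure. ConnectionRegistry and its method names are hypothetical, the type parameter stands in for WebSocketServer or Session, and the sketch assumes a single connection per userId (a second connection with the same ID replaces the first).

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, not part of the original post.
// C is whatever represents a connection (for example WebSocketServer or Session).
final class ConnectionRegistry<C> {
    private final ConcurrentMap<String, C> byUserId = new ConcurrentHashMap<>();

    /** Registers a connection and returns the one it replaced, if any. */
    Optional<C> register(String userId, C connection) {
        return Optional.ofNullable(byUserId.put(userId, connection));
    }

    /** Removes the entry only if it still maps to this exact connection. */
    boolean unregister(String userId, C connection) {
        return byUserId.remove(userId, connection);
    }

    /** Looks up the connection for one user without scanning all connections. */
    Optional<C> find(String userId) {
        return Optional.ofNullable(byUserId.get(userId));
    }

    /** The online count is simply the number of registered users. */
    int onlineCount() {
        return byUserId.size();
    }
}
```

In this sketch, onOpen would call register, onClose would call unregister, and a targeted push would start from find(userId). The conditional remove keeps a late onClose of an old connection from evicting a newer one with the same userId.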
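The two methods in WebSocketServer that turn text into audio are sendGet() and encode(). They call the speech synthesis service and return the resulting audio file as a stream. The service behind them is ttskit, which you can find by searching GitHub.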
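To make the request format concrete, here is a small standalone sketch of the same URL construction that encode() performs, so the encoded result can be inspected without running the server. SynthesizeUrlExample and buildUrl are hypothetical names, and the localhost address in the usage note is a placeholder, not the real service address.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical standalone version of the URL building done in encode().
final class SynthesizeUrlExample {

    /** Joins the base URL with the URL-encoded text and synthesis options. */
    static String buildUrl(String baseUrl, String text, String kwargs) {
        try {
            return baseUrl + "/synthesize?text=" + URLEncoder.encode(text, "UTF-8")
                    + "&kwargs=" + URLEncoder.encode(kwargs, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is required on every Java platform, so this should not happen.
            throw new IllegalStateException("UTF-8 is not available", e);
        }
    }
}
```

For example, buildUrl("http://localhost:9000", "hi there", "format: yaml\nmode: mspk") returns http://localhost:9000/synthesize?text=hi+there&kwargs=format%3A+yaml%0Amode%3A+mspk. Spaces become +, and the colon and newline of the YAML options become %3A and %0A.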
IOUtils.toByteArray(inputStream) converts the InputStream into a byte[] array. IOUtils is a utility class from the commons-io package:
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
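If pulling in commons-io for this single call feels heavy, the JDK can do the same job: on Java 9 and later InputStream.readAllBytes() is available, and on Java 8 a short read loop is enough. The following is a minimal sketch of that loop. StreamBytes is a hypothetical name, and wrapping IOException in UncheckedIOException is my own choice to keep call sites short.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical JDK-only stand-in for IOUtils.toByteArray(InputStream).
final class StreamBytes {

    /** Reads the stream to its end and returns everything that was read. */
    static byte[] toByteArray(InputStream in) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        try {
            // read() returns -1 once the end of the stream is reached.
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Like the commons-io method, this sketch does not close the stream; that stays with the caller.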
Finally, the WebSocket service methods can be called from business code. Here a scheduled task simulates the message push.
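import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Slf4j
@Configuration
@EnableScheduling
public class TTSTask {
    // Counter used to vary the simulated tower number.
    public static int i = 1;

    // Fires every 30 seconds, at second 0 and second 30 of each minute.
    @Scheduled(cron = "0/30 * * * * ?")
    private void configureTasks() {
        log.info("Scheduled task fired at: {}", LocalDateTime.now());
        // Text to synthesize, kept in Chinese as in the original. It reads:
        // "Fault message received: tower No. i on Gaoxin Road, Jiangbei New Area, Nanjing has failed."
        String message = "收到一条故障信息,南京市江北新区高新路" + i + "号杆塔故障";
        log.info("Announcing: {}", message);
        WebSocketServer.sendToAudio(message);
        i++;
    }
}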
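The cron expression "0/30 * * * * ?" deserves a quick illustration. Its first field is the seconds field, and 0/30 means "start at second 0, then every 30 seconds", so the task fires at :00 and :30 of every minute. The sketch below computes the next firing time for that one pattern to make the semantics concrete. HalfMinuteSchedule is a hypothetical name, and this is not how Spring evaluates cron expressions internally.

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical illustration of when "0/30 * * * * ?" fires.
final class HalfMinuteSchedule {

    /** Returns the next time strictly after 'now' whose second is 0 or 30. */
    static LocalDateTime nextTrigger(LocalDateTime now) {
        // Drop fractions of a second so only whole seconds are compared.
        LocalDateTime base = now.truncatedTo(ChronoUnit.SECONDS);
        if (base.getSecond() < 30) {
            return base.withSecond(30);
        }
        // Second is 30..59: roll over to second 0 of the next minute.
        return base.withSecond(0).plusMinutes(1);
    }
}
```

For instance, a call at 10:00:10 yields 10:00:30, and a call at exactly 10:00:30 yields 10:01:00.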
2.4 Front-end page
<!DOCTYPE html>
<html lang="en" xmlns:th="http://www.thymeleaf.org">
<head>
<meta charset="UTF-8">
<title>Automatic voice announcement</title>
<script type="text/javascript" src="/js/jquery.js" th:src="@{/js/jquery.js}"></script>
</head>
<body>
<header>
<h1 align="center">Automatic voice announcement</h1>
</header>
<div>
<input type="button" id="btnConnection" value="Connect" />
<input type="button" id="btnClose" value="Close" />
</div>
<div align="center" style="margin-top:10px">
<audio id="audio" controls autoplay hidden></audio>
</div>
<script type="text/javascript">
$(function() {
    var socket;
    if (typeof WebSocket === "undefined") {
        alert("Your browser does not support WebSocket");
        return;
    }
    $("#btnConnection").click(function() {
        socket = new WebSocket("ws://127.0.0.1:8890/websocket/88888");
        socket.onopen = function() {
            console.log("Socket opened");
        };
        socket.onmessage = function(msg) {
            console.log("Message received:", msg);
            console.log("Message data:", msg.data);
            // Text frames (such as the server's greeting) arrive as strings;
            // only binary frames carry audio, so skip everything else.
            if (!(msg.data instanceof Blob)) {
                return;
            }
            q('#audio').src = URL.createObjectURL(msg.data);
            q('#audio').hidden = false;
        };
        socket.onclose = function() {
            console.log("Socket closed");
        };
        socket.onerror = function() {
            console.log("An error occurred");
        };
    });
    $("#btnClose").click(function() {
        // The socket only exists after the Connect button has been clicked.
        if (socket) {
            socket.close();
        }
    });
});
function q(selector) {
    return document.querySelector(selector);
}
</script>
</body>
</html>
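One point worth noting: the byte[] array sent by the server arrives in the socket.onmessage callback as a Blob, and the front-end audio tag can play it directly once it is wrapped in an object URL.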
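The browser hands over each binary message as one complete Blob. A non-browser Java client would see things a little differently: with the JDK 11 java.net.http.WebSocket API, for example, Listener.onBinary(WebSocket, ByteBuffer, boolean) may deliver a message in several parts, with the boolean marking the last one. The sketch below shows only the part-collecting step, under the assumption that such a client is wanted at all. BinaryMessageAssembler is a hypothetical helper, and the wiring into a real listener is left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical helper for a Java client; not thread-safe, use one per connection.
// It would be called from Listener.onBinary(webSocket, data, last).
final class BinaryMessageAssembler {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Adds one part and returns the whole message once the last part arrives. */
    Optional<byte[]> accept(ByteBuffer part, boolean last) {
        byte[] chunk = new byte[part.remaining()];
        part.get(chunk);
        pending.write(chunk, 0, chunk.length);
        if (!last) {
            return Optional.empty();
        }
        byte[] message = pending.toByteArray();
        // Start from an empty buffer for the next message.
        pending.reset();
        return Optional.of(message);
    }
}
```

The returned byte[] is then the same audio payload the browser receives as a Blob.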