前言
最近做项目,需要双工通信。首先考虑的是HTTP协议,但它是单向的请求-响应模型,只能由客户端请求、服务端响应,服务端不能主动推送。如果要服务器推送,方式有很多:HTTP轮询、长轮询、WebSocket等。实际上TCP传输层本身就是双向通信的,用原始的socket就可以实现。现在最常用的是WebSocket,因为它可以复用HTTP底层的TCP连接,比较方便;当然HTTP/3使用UDP通信,基于QUIC保证连接可靠。
Java socket方式
TCP是面向连接的全双工协议,刚好符合条件。server端暴露端口用于连接,并实现面向多个client的收发器
package com.feng.socket.admin;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Blocking-I/O TCP server that keeps one socket per connected client.
 *
 * <p>Every accepted connection gets a numeric session id (starting at 10001), is stored in a
 * shared registry and is served by a {@code ReadRunnable} on the worker pool. A single
 * {@code WriteRunnable} pushes console input to the clients.
 */
public class SocketServer {
    /** Source of session ids; the first id handed out is 10001. */
    private static final AtomicInteger sessionId = new AtomicInteger(10000);

    /** Registry of live client sockets, keyed by session id. */
    private static final Map<String, Socket> socketMap = new ConcurrentHashMap<>();

    /*
     * Every task submitted here blocks for the lifetime of a connection, so tasks must never
     * wait in a queue: with a bounded LinkedBlockingQueue the pool only grows beyond the core
     * size once the queue is full, which left queued clients connected but never read.
     * A SynchronousQueue hands each task directly to a (possibly new) thread, up to the
     * maximum of 10, and rejects anything beyond that.
     */
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            5, 10, 60, TimeUnit.SECONDS, new java.util.concurrent.SynchronousQueue<Runnable>());

    private final ServerSocket serverSocket;

    /**
     * Binds the listening socket.
     *
     * @param port TCP port to listen on
     * @throws IllegalStateException if the port cannot be bound; failing here avoids a later
     *     {@code NullPointerException} in {@link #start()}
     */
    public SocketServer(int port) {
        try {
            this.serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            throw new IllegalStateException("cannot listen on port " + port, e);
        }
    }

    /**
     * Starts the console writer and then accepts clients until the listening socket fails.
     * This method blocks the calling thread.
     */
    public void start() {
        // server to client content
        threadPoolExecutor.execute(new WriteRunnable());
        try {
            while (true) {
                Socket socket = this.serverSocket.accept();
                String id = String.valueOf(sessionId.incrementAndGet());
                System.out.println("connect from client, the sessionId is :\t" + id);
                socketMap.put(id, socket);
                try {
                    // client to server content
                    threadPoolExecutor.execute(new ReadRunnable(id));
                } catch (java.util.concurrent.RejectedExecutionException e) {
                    // Pool exhausted: drop this client, but keep accepting others.
                    socketMap.remove(id);
                    closeQuietly(socket);
                    System.err.println("too many clients, rejected sessionId " + id);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                this.serverSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Looks up the socket of a session.
     *
     * @param sessionId id assigned when the client connected
     * @return the socket, or {@code null} when the session is not registered
     */
    public static Socket getSessionSocket(String sessionId) {
        return socketMap.get(sessionId);
    }

    /**
     * Removes a session from the registry; the socket itself is not closed here.
     *
     * @param sessionId id of the session to forget
     */
    public static void removeSessionSocket(String sessionId) {
        socketMap.remove(sessionId);
    }

    /** Closes a socket, reporting but not propagating a close failure. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new SocketServer(8083).start();
    }
}
其中收的线程
package com.feng.socket.admin;
import java.io.*;
import java.net.Socket;
/**
 * Reads newline-terminated messages from one client and prints them to the console.
 *
 * <p>The task ends when the client sends {@code shutdown} (case-insensitive), the stream
 * reaches end-of-file, the socket is closed, or a read fails. In every case the session is
 * removed from the {@code SocketServer} registry and the socket is closed.
 */
public class ReadRunnable implements Runnable {
    private final String sessionId;

    /**
     * @param sessionId id under which the client socket is registered in {@code SocketServer}
     */
    public ReadRunnable(String sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public void run() {
        Socket socket = SocketServer.getSessionSocket(sessionId);
        if (socket == null) {
            // The session was already unregistered before this task got to run.
            System.out.println("client已断开连接------");
            return;
        }
        try {
            if (!socket.isClosed()) {
                // Not try-with-resources on purpose: closing the reader closes the socket,
                // which would make the isClosed() check in the catch block meaningless.
                BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while (!socket.isClosed() && (line = br.readLine()) != null) {
                    System.out.println(sessionId + ":" + line);
                    if ("shutdown".equalsIgnoreCase(line)) {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            // A failed read ends the session; retrying on a broken connection would spin
            // forever. A socket closed by the writer side is an expected way to get here.
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        } finally {
            SocketServer.removeSessionSocket(sessionId);
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("client已断开连接------");
        }
    }
}
发的线程,使用console输入
package com.feng.socket.admin;
import java.io.*;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Forwards console input to connected clients.
 *
 * <p>Each console line must have the form {@code <sessionId>:<message>}; the text after the
 * first colon is sent to the client registered under that session id. Sending
 * {@code shutdown} (case-insensitive) also closes that client's connection. The task ends
 * when standard input reaches end-of-file.
 */
public class WriteRunnable implements Runnable {
    /** One writer per session, reused for as long as the session is alive. */
    Map<String, PrintWriter> cache = new ConcurrentHashMap<>();

    @Override
    public void run() {
        try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
            String line;
            // readLine() returns null at end-of-file; stop then instead of spinning.
            while ((line = br.readLine()) != null) {
                dispatch(line);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Parses one console line and sends its payload to the addressed client. */
    private void dispatch(String line) {
        // Splitting on the first colon supports session ids of any length.
        int separator = line.indexOf(':');
        if (separator <= 0) {
            System.out.println("invalid input, expected <sessionId>:<message>");
            return;
        }
        String sessionId = line.substring(0, separator);
        String data = line.substring(separator + 1);

        Socket socket = SocketServer.getSessionSocket(sessionId);
        if (socket == null || socket.isClosed()) {
            cache.remove(sessionId);
            System.out.println("server连接已断开-----");
            return;
        }

        PrintWriter pw = cache.get(sessionId);
        if (pw == null) {
            try {
                pw = new PrintWriter(socket.getOutputStream());
            } catch (IOException e) {
                // One broken client must not terminate the writer for everybody else.
                e.printStackTrace();
                closeSession(sessionId, socket);
                return;
            }
            cache.put(sessionId, pw);
        }

        pw.println(data);
        pw.flush();
        // PrintWriter never throws on write; checkError() is the only failure signal.
        if (pw.checkError() || "shutdown".equalsIgnoreCase(data)) {
            closeSession(sessionId, socket);
        }
    }

    /** Drops the cached writer, closes the socket and unregisters the session. */
    private void closeSession(String sessionId, Socket socket) {
        PrintWriter pw = cache.remove(sessionId);
        if (pw != null) {
            pw.close();
        }
        try {
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        SocketServer.removeSessionSocket(sessionId);
    }
}
client端代码
package com.feng.socket.client;
import java.io.*;
import java.net.Socket;
import java.util.Scanner;
/**
 * Console TCP client: one thread prints every line received from the server, another sends
 * every line typed on the console.
 *
 * <p>Either side ends the conversation by sending {@code shutdown} (case-insensitive).
 */
public class SocketClient {
    private static final String DEFAULT_HOST = "192.168.79.104";
    private static final int DEFAULT_PORT = 8083;

    /**
     * @param args optional {@code host} and {@code port}; the built-in defaults are used for
     *     whichever is missing
     * @throws IOException if the connection cannot be established
     */
    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
        final Socket socket = new Socket(host, port);
        final Scanner scanner = new Scanner(System.in);
        new Thread(new Runnable() {
            @Override
            public void run() {
                receive(socket);
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                send(socket, scanner);
            }
        }).start();
    }

    /** Prints server messages until the connection ends, then terminates the JVM. */
    private static void receive(Socket socket) {
        try {
            // Not try-with-resources: closing the reader would close the socket before the
            // catch block can tell an expected close from a real failure.
            BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            boolean shutdownReceived = false;
            String line;
            while (!socket.isClosed() && (line = br.readLine()) != null) {
                System.out.println(line);
                if ("shutdown".equalsIgnoreCase(line)) {
                    shutdownReceived = true;
                    break;
                }
            }
            if (!shutdownReceived) {
                System.out.println("server断开连接==========");
            }
        } catch (IOException e) {
            // The sender thread closing the socket is an expected way to end up here.
            if (!socket.isClosed()) {
                e.printStackTrace();
            }
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // The sender thread may be blocked on System.in; exiting is what releases it.
            System.exit(0);
        }
    }

    /** Sends each console line to the server until "shutdown", end of input or a failure. */
    private static void send(Socket socket, Scanner scanner) {
        try {
            PrintWriter pw = new PrintWriter(socket.getOutputStream());
            // hasNextLine() returns false at end of input, so the loop ends instead of spinning.
            while (!socket.isClosed() && scanner.hasNextLine()) {
                // Whole lines, not whitespace-separated tokens: one typed line is one message.
                String line = scanner.nextLine();
                if (line.isEmpty()) {
                    continue;
                }
                pw.println(line);
                pw.flush();
                // PrintWriter never throws on write; checkError() is the only failure signal.
                if (pw.checkError() || "shutdown".equalsIgnoreCase(line)) {
                    break;
                }
            }
            if (socket.isClosed()) {
                System.out.println("client断开连接==========");
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
相对比较简单,测试如下
实现了双工,简单模式BIO。
websocket协议
实际上WebSocket就是用来解决这个问题的一个标准协议(RFC 6455);Java侧对应的标准API是JSR 356,参见Oracle官网:JSR 356, Java API for WebSocket (oracle.com)
目前的成熟方案是在Java EE上实现协议升级,复用Tomcat等底层容器的TCP连接。下面是Spring Boot的demo:先配置bean。当然也可以使用Oracle官网的方式,这里Spring Boot已经做了封装。
package com.feng.socket.admin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * Spring configuration for the WebSocket demo.
 *
 * <p>Its only job is to publish a {@link ServerEndpointExporter} bean.
 * NOTE(review): per Spring's documentation this exporter is what registers
 * {@code @ServerEndpoint}-annotated beans with the WebSocket container — confirm it is only
 * enabled when running on an embedded container.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Creates the exporter bean.
     *
     * @return a new {@link ServerEndpointExporter}
     */
    @Bean
    public ServerEndpointExporter initServerEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
写一个server端websocket连接收发器
package com.feng.socket.admin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket endpoint that registers every connection under the {@code sessionId} taken from
 * the request path, so that other components can push messages to a specific client.
 *
 * <p>NOTE(review): the per-connection fields assume the container creates one endpoint
 * instance per connection (the JSR 356 default) — confirm no custom configurator changes that.
 */
@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServer.class);

    /** Open sessions keyed by the client-chosen session id. */
    private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    private String sessionId = "";

    /** The session registered by this endpoint instance; needed for a safe removal on close. */
    private Session session;

    /**
     * Registers the new connection; a previous session with the same id is replaced.
     *
     * @param session the session that was just opened
     * @param sessionId id taken from the {@code {sessionId}} path segment
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
        this.sessionId = sessionId;
        this.session = session;
        Session previous = sessionMap.put(sessionId, session);
        if (previous != null && previous != session) {
            LOGGER.warn("sessionId {} was already connected, the old session is replaced", sessionId);
        }
        LOGGER.info("new connect, sessionId is {}", sessionId);
    }

    /** Unregisters this connection. */
    @OnClose
    public void onClose() {
        // Two-argument remove: a stale connection closing late must not unregister a newer
        // session that has since reused the same id.
        if (session != null) {
            sessionMap.remove(sessionId, session);
        }
        LOGGER.info("close socket, the sessionId is {}", sessionId);
    }

    /**
     * Logs a text message received from the client.
     *
     * @param message the received text
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        LOGGER.info("--------- receive message: {}", message);
    }

    /**
     * Logs an error raised on the connection.
     *
     * @param session the affected session
     * @param error the reported failure
     */
    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error(error.getMessage(), error);
    }

    /**
     * Sends a text message synchronously.
     *
     * <p>NOTE(review): concurrent blocking sends on the same session may be rejected by some
     * containers — confirm whether callers need to serialize access.
     *
     * @param session target session
     * @param message text to send
     * @throws IOException if the session is {@code null} or closed, or the send fails
     */
    public static void sendMessage(Session session, String message) throws IOException {
        if (session == null || !session.isOpen()) {
            throw new IOException("websocket session is not open");
        }
        session.getBasicRemote().sendText(message);
    }

    /**
     * Looks up the session registered under an id.
     *
     * @param sessionId id used by the client when connecting
     * @return the session, or {@code null} when no such client is registered
     */
    public static Session getSession(String sessionId) {
        return sessionMap.get(sessionId);
    }
}
写一个发送接口
package com.feng.socket.admin;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.websocket.Session;
import java.io.IOException;
/**
 * Spring Boot entry point that also exposes an HTTP endpoint for pushing a message to a
 * connected WebSocket client.
 */
@RestController
@SpringBootApplication
public class SocketMain {
    public static void main(String[] args) {
        SpringApplication.run(SocketMain.class, args);
    }

    /**
     * Sends {@code msg} to the WebSocket client registered under {@code sessionId}.
     *
     * @param sessionId id the client used in its WebSocket URL
     * @param msg text to push to the client
     * @return a confirmation, or a short explanation when nothing was sent
     * @throws IOException if the WebSocket send fails
     */
    @RequestMapping("/msg")
    public String sendMsg(String sessionId, String msg) throws IOException {
        // Both request parameters are optional at the HTTP level; reject missing ones here
        // instead of failing with a NullPointerException further down.
        if (sessionId == null || msg == null) {
            return "sessionId and msg are required";
        }
        Session session = SocketServer.getSession(sessionId);
        if (session == null || !session.isOpen()) {
            return "session " + sessionId + " is not connected";
        }
        SocketServer.sendMessage(session, msg);
        return "send " + sessionId + " : " + msg;
    }
}
再使用java-websocket写一个client端
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.3</version>
</dependency>
code
package com.feng.socket.client;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;
/**
 * Console WebSocket client built on Java-WebSocket: prints every message pushed by the
 * server and sends every line typed on the console.
 */
public class SocketClient {
    private static final String DEFAULT_URI = "ws://127.0.0.1:8083/websocket/server/10001";

    /**
     * @param args optional WebSocket URI; the built-in default is used when absent
     * @throws URISyntaxException if the URI is malformed
     */
    public static void main(String[] args) throws URISyntaxException {
        String uri = args.length > 0 ? args[0] : DEFAULT_URI;
        WebSocketClient webSocketClient = new WebSocketClient(new URI(uri)) {
            @Override
            public void onOpen(ServerHandshake serverHandshake) {
                System.out.println(serverHandshake.getHttpStatus() + " : " + serverHandshake.getHttpStatusMessage());
            }

            @Override
            public void onMessage(String s) {
                System.out.println("receive msg is " + s);
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                System.out.println(s);
            }

            @Override
            public void onError(Exception e) {
                e.printStackTrace();
            }
        };

        System.out.println("wait for connecting ...");
        try {
            // Blocks until the handshake succeeds or fails, instead of polling the ready
            // state in a busy loop that never ends when the server is unreachable.
            if (!webSocketClient.connectBlocking()) {
                System.out.println("connect failed");
                return;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        System.out.println("start websocket client...");
        webSocketClient.send("hello");
        Scanner scanner = new Scanner(System.in);
        try {
            while (scanner.hasNextLine()) {
                // Whole lines, not whitespace-separated tokens: one typed line is one message.
                String line = scanner.nextLine();
                if (!webSocketClient.isOpen()) {
                    // send() on a closed connection would throw; stop reading instead.
                    break;
                }
                if (!line.isEmpty()) {
                    webSocketClient.send(line);
                }
            }
        } finally {
            webSocketClient.close();
        }
    }
}
实现服务端与client端双向发送,点对点传输。
websocket协议原理
通过抓包,发现websocket实际上是http协议上的升级
首先client通过HTTP协议发起协商:请求头 Connection: Upgrade,并且 Upgrade: websocket,表示要升级到WebSocket协议。按照这个理论,我们也可以自定义一种其他的Upgrade协议来传输数据。
协商的key和版本
然后服务端返回协商的结果(HTTP 101 Switching Protocols)
协议达成。如果需要TLS加密传输(wss),还要进行证书传递等TLS握手
后面双方就会通过ping/pong帧保持心跳
总结
这里的双向通信,利用的是TCP全双工的特性;WebSocket也是在HTTP的基础上升级而来。实际上现在的QUIC协议可以在UDP上实现可靠通信,也就是HTTP/3,基于TCP连接升级的WebSocket握手方式在它上面就不再直接适用了,估计会出现新的成熟技术。