前言
上一章实现了websocket传输文本信息,实际上网络传输的都是二进制0和1,因而也可以传输文件。
demo
在上一章示例的基础上实现 WebSocket 文件传输。客户端(client)代码如下:
package com.feng.socket.client;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Scanner;
/**
 * Demo WebSocket client that uploads a file to the server using binary frames.
 *
 * <p>Every binary frame starts with a one-byte marker followed by the payload:
 * <ul>
 *   <li>1 - file info (JSON with {@code fileName} and {@code fileSize}), client to server</li>
 *   <li>2 - acknowledgement of the file info, server to client</li>
 *   <li>3 - a chunk of file data, client to server</li>
 *   <li>5 - end of file, client to server</li>
 *   <li>6 - acknowledgement of the end of file, server to client</li>
 * </ul>
 *
 * <p>Only one transfer should be in flight at a time: acknowledgements are counted, not
 * matched to a particular transfer.
 */
public class SocketClient {

    /** Frame marker: file metadata (JSON) sent by the client. */
    private static final byte MARK_FILE_INFO = 1;
    /** Frame marker: server acknowledged the file metadata. */
    private static final byte MARK_FILE_INFO_ACK = 2;
    /** Frame marker: chunk of file content sent by the client. */
    private static final byte MARK_FILE_DATA = 3;
    /** Frame marker: client finished sending the file. */
    private static final byte MARK_FILE_END = 5;
    /** Frame marker: server acknowledged the end of the file. */
    private static final byte MARK_FILE_END_ACK = 6;

    /** Size of one data frame, including the one-byte marker. */
    private static final int CHUNK_SIZE = 4 * 1024;

    /** File uploaded by {@link #sendFile(WebSocketClient, Object)}. */
    private static final Path DEFAULT_FILE = Path.of(
            "/Users/huahua/IdeaProjects/websocket-demo/websocket-demo/socket-client/src/main/resources/Thunder5.rar");

    /**
     * Number of acknowledgements received but not yet consumed by a sender thread.
     * Guarded by the monitor of the {@code condition} object shared with the sender.
     */
    private static int pendingAcks;

    /**
     * Connects to the demo server and uploads the default file each time the user types "1".
     *
     * @param args unused
     * @throws URISyntaxException   if the hard-coded server URI is malformed
     * @throws IOException          declared for backward compatibility
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        Object condition = new Object();
        WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:8083/websocket/server/10001")) {
            @Override
            public void onOpen(ServerHandshake serverHandshake) {
                System.out.println(serverHandshake.getHttpStatus() + " : " + serverHandshake.getHttpStatusMessage());
            }

            @Override
            public void onMessage(String s) {
                System.out.println("receive msg is " + s);
            }

            @Override
            public void onMessage(ByteBuffer bytes) {
                if (!bytes.hasRemaining()) {
                    return;
                }
                // Peek at the marker without consuming it.
                byte mark = bytes.get(bytes.position());
                if (mark == MARK_FILE_INFO_ACK) {
                    signalAck(condition);
                    System.out.println("receive ack for file info");
                } else if (mark == MARK_FILE_END_ACK) {
                    signalAck(condition);
                    System.out.println("receive ack for file end");
                }
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                System.out.println(s);
            }

            @Override
            public void onError(Exception e) {
                e.printStackTrace();
            }
        };

        // Block until the handshake finishes instead of spinning on getReadyState().
        System.out.println("wait for connecting ...");
        if (!webSocketClient.connectBlocking()) {
            System.out.println("connect to websocket server failed");
            return;
        }

        System.out.println("start websocket client...");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            if ("1".equals(scanner.next())) {
                sendFile(webSocketClient, condition);
            }
        }
    }

    /**
     * Uploads the default demo file on a background thread.
     *
     * @param webSocketClient an open client connection
     * @param condition       monitor shared with the client's binary message handler
     */
    public static void sendFile(WebSocketClient webSocketClient, Object condition) {
        sendFile(webSocketClient, condition, DEFAULT_FILE);
    }

    /**
     * Uploads {@code file} on a background thread: sends the file info, waits for the ack,
     * streams the content in {@link #CHUNK_SIZE} frames, sends the end marker and waits for
     * the final ack.
     *
     * @param webSocketClient an open client connection
     * @param condition       monitor shared with the client's binary message handler
     * @param file            the file to upload
     * @throws IllegalArgumentException if {@code file} has no file-name element
     */
    public static void sendFile(WebSocketClient webSocketClient, Object condition, Path file) {
        Path namePart = file.getFileName();
        if (namePart == null) {
            throw new IllegalArgumentException("not a file path: " + file);
        }
        String fileName = namePart.toString();

        new Thread(() -> {
            // try-with-resources closes the channel on every exit path, including failures.
            try (SeekableByteChannel byteChannel = Files.newByteChannel(file, StandardOpenOption.READ)) {
                String info = "{\"fileName\": \"" + escapeJson(fileName) + "\", \"fileSize\":" + byteChannel.size() + "}";
                webSocketClient.send(frame(MARK_FILE_INFO, info));
                awaitAck(condition);

                ByteBuffer chunk = ByteBuffer.allocate(CHUNK_SIZE);
                chunk.put(MARK_FILE_DATA);
                while (byteChannel.read(chunk) > 0) {
                    chunk.flip();
                    webSocketClient.send(chunk);
                    chunk.clear();
                    chunk.put(MARK_FILE_DATA);
                }

                webSocketClient.send(frame(MARK_FILE_END, "end"));
                awaitAck(condition);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can see the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }, "file-sender").start();
    }

    /** Builds a frame consisting of the marker byte followed by the UTF-8 bytes of {@code text}. */
    private static ByteBuffer frame(byte mark, String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1 + payload.length);
        buffer.put(mark);
        buffer.put(payload);
        buffer.flip();
        return buffer;
    }

    /** Escapes backslashes and double quotes so the value can be embedded in a JSON string. */
    private static String escapeJson(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Records one received acknowledgement and wakes up any waiting sender. */
    private static void signalAck(Object condition) {
        synchronized (condition) {
            pendingAcks++;
            condition.notifyAll();
        }
    }

    /**
     * Blocks until an acknowledgement is available and consumes it. Because acks are counted,
     * one that arrives before this call is not lost, and spurious wakeups are ignored.
     */
    private static void awaitAck(Object condition) throws InterruptedException {
        synchronized (condition) {
            while (pendingAcks == 0) {
                condition.wait();
            }
            pendingAcks--;
        }
    }
}
Server端使用Tomcat的websocket
package com.feng.socket.admin;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * WebSocket endpoint that receives text messages and file uploads.
 *
 * <p>Every binary frame starts with a one-byte marker followed by the payload:
 * 1 = file info (JSON), 2 = ack of file info, 3 = file data, 5 = end of file,
 * 6 = ack of end of file. Uploaded files are written below {@link #UPLOAD_DIR}.
 */
@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServer.class);

    /** Frame marker: file metadata (JSON) sent by the client. */
    private static final byte MARK_FILE_INFO = 1;
    /** Frame marker: acknowledgement of the file metadata. */
    private static final byte MARK_FILE_INFO_ACK = 2;
    /** Frame marker: chunk of file content sent by the client. */
    private static final byte MARK_FILE_DATA = 3;
    /** Frame marker: client finished sending the file. */
    private static final byte MARK_FILE_END = 5;
    /** Frame marker: acknowledgement of the end of the file. */
    private static final byte MARK_FILE_END_ACK = 6;

    /** Directory that receives uploaded files. */
    private static final Path UPLOAD_DIR = Path.of("/Users/huahua").normalize();

    /** Shared mapper; created once instead of once per message. */
    private static final JsonMapper JSON_MAPPER = new JsonMapper();

    private static final Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    private String sessionId = "";
    /** Channel of the file currently being received, or {@code null} when no upload is active. */
    private SeekableByteChannel byteChannel;

    /**
     * Registers the new session under the id taken from the request path.
     *
     * @param session   the newly opened session
     * @param sessionId value of the {@code sessionId} path parameter
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
        this.sessionId = sessionId;
        sessionMap.put(sessionId, session);
        LOGGER.info("new connect, sessionId is {}", sessionId);
    }

    /** Unregisters the session and releases the file of an unfinished upload, if any. */
    @OnClose
    public void onClose() {
        sessionMap.remove(sessionId);
        try {
            // The connection may drop in the middle of a transfer; do not leak the channel.
            closeChannel();
        } catch (IOException e) {
            LOGGER.error("failed to close upload file for sessionId {}", sessionId, e);
        }
        LOGGER.info("close socket, the sessionId is {}", sessionId);
    }

    /**
     * Logs an incoming text message.
     *
     * @param message the text received
     * @param session the session that sent it
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        LOGGER.info("--------- receive message: {}", message);
    }

    /**
     * Handles one binary protocol frame: opens the target file on a file-info frame, appends
     * data frames to it, and closes it on the end frame. Frames with an unknown marker are
     * ignored.
     *
     * @param byteBuffer the received frame; its first byte is the marker
     * @param session    the session that sent the frame, used for acknowledgements
     * @throws IOException if the file info is invalid or a file/socket operation fails
     */
    @OnMessage
    public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException {
        if (!byteBuffer.hasRemaining()) {
            return;
        }
        // Consume the marker; the rest of the buffer is the payload.
        byte mark = byteBuffer.get();
        switch (mark) {
            case MARK_FILE_INFO:
                handleFileInfo(byteBuffer, session);
                break;
            case MARK_FILE_DATA:
                handleFileData(byteBuffer);
                break;
            case MARK_FILE_END:
                handleFileEnd(session);
                break;
            default:
                break;
        }
    }

    /**
     * Logs an error raised for the session.
     *
     * @param session the affected session
     * @param error   the error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error(error.getMessage(), error);
    }

    /**
     * Sends a text message over the given session.
     *
     * @param session target session
     * @param message text to send
     * @throws IOException if sending fails
     */
    public static void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    /**
     * Looks up a registered session.
     *
     * @param sessionId id the session was opened with
     * @return the session, or {@code null} if none is registered under that id
     */
    public static Session getSession(String sessionId) {
        return sessionMap.get(sessionId);
    }

    /** Parses the JSON file info, opens the target file and acknowledges the frame. */
    private void handleFileInfo(ByteBuffer payload, Session session) throws IOException {
        // decode() works for heap, direct and read-only buffers alike and pins the charset.
        String info = StandardCharsets.UTF_8.decode(payload).toString();
        FileInfo fileInfo = JSON_MAPPER.readValue(info, FileInfo.class);
        Path target = resolveTarget(fileInfo.getFileName());

        // A new upload replaces an unfinished one on the same connection.
        closeChannel();
        // TRUNCATE_EXISTING: overwriting a larger file must not leave stale trailing bytes.
        byteChannel = Files.newByteChannel(target,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        sendAck(session, MARK_FILE_INFO_ACK, "receive fileinfo");
    }

    /** Appends one chunk of file content to the open file. */
    private void handleFileData(ByteBuffer payload) throws IOException {
        if (byteChannel == null) {
            LOGGER.warn("file data received before file info, sessionId is {}", sessionId);
            return;
        }
        // A single write() is allowed to be partial; loop until the chunk is fully written.
        while (payload.hasRemaining()) {
            byteChannel.write(payload);
        }
    }

    /** Closes the received file and then acknowledges the end of the transfer. */
    private void handleFileEnd(Session session) throws IOException {
        if (byteChannel == null) {
            LOGGER.warn("file end received before file info, sessionId is {}", sessionId);
            return;
        }
        // Close first so the ack is only sent once the file has been closed successfully.
        closeChannel();
        sendAck(session, MARK_FILE_END_ACK, "receive end");
    }

    /**
     * Maps a client-supplied file name to a path inside {@link #UPLOAD_DIR}, rejecting names
     * that are empty or that would escape the upload directory (absolute paths, "..").
     */
    private static Path resolveTarget(String fileName) throws IOException {
        if (fileName == null || fileName.isBlank()) {
            throw new IOException("missing file name in file info");
        }
        Path target = UPLOAD_DIR.resolve(fileName).normalize();
        if (!target.startsWith(UPLOAD_DIR) || target.equals(UPLOAD_DIR)) {
            throw new IOException("illegal file name: " + fileName);
        }
        return target;
    }

    /** Sends an acknowledgement frame: the marker byte followed by the UTF-8 text. */
    private static void sendAck(Session session, byte mark, String text) throws IOException {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(1 + payload.length);
        buffer.put(mark);
        buffer.put(payload);
        buffer.flip();
        session.getBasicRemote().sendBinary(buffer);
    }

    /** Closes the current upload file, if any, and clears the reference. */
    private void closeChannel() throws IOException {
        SeekableByteChannel channel = byteChannel;
        byteChannel = null;
        if (channel != null) {
            channel.close();
        }
    }
}
实现思路
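每个二进制帧的第一个字节作为标记(mark),其后才是内容:1 表示文件信息(JSON,包含 fileName 和 fileSize),2 表示服务端对文件信息的确认;3 表示文件数据分片;5 表示文件发送结束,6 表示服务端对结束的确认。客户端先发送文件信息并等待确认,再以 4K 为一帧依次发送文件数据,最后发送结束标记并等待服务端确认。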
经验分享
实际在使用过程中有2个问题
1. Tomcat 的 WebSocket 默认单条消息(文本或二进制)的缓冲区上限是 8K,超过该大小的消息无法被完整接收
根本原因是
org.apache.tomcat.websocket.WsSession
// Buffers
// Default buffer size: taken from the system property
// "org.apache.tomcat.websocket.DEFAULT_BUFFER_SIZE" when it is set to an integer,
// otherwise 8 * 1024 bytes.
static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(
"org.apache.tomcat.websocket.DEFAULT_BUFFER_SIZE", 8 * 1024)
.intValue();
// Both the binary and the text message buffer limits start at that default.
private volatile int maxBinaryMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
private volatile int maxTextMessageBufferSize = Constants.DEFAULT_BUFFER_SIZE;
通过系统变量,或者JVM -D参数可设置
org.apache.tomcat.websocket.DEFAULT_BUFFER_SIZE
2. json格式化问题,如果对象的属性有byte[]数组
fastjson 和 Jackson 会把 byte[] 序列化成 Base64 编码的字符串;而 Gson 则按真正的数组处理,序列化成数字数组(例如 [1,2,3])。因此两端使用不同的 JSON 库时,byte[] 属性的格式并不兼容,需要特别注意。
总结
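实际上 WebSocket 是建立在 TCP 之上的全双工协议,传输文件是没有问题的,只是需要自己定义应用层协议才行。如果使用 Tomcat 的 WebSocket 传输,要注意单条消息的大小限制。另外,WebSocket 的经典握手依赖 HTTP/1.1 的 Upgrade 机制,在 HTTP/2 和 HTTP/3 上不能直接使用(需要服务端和客户端都支持 RFC 8441、RFC 9220 定义的扩展 CONNECT 方式),尤其是 HTTP/3 基于 UDP(QUIC),更需要留意。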
|