1、maven 依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version>
</dependency>
2、Server 端
/**
 * Netty TCP server bound to a single port. Accepted child channels are
 * configured by {@link NettyServerChannelInitializer}.
 */
public class HttpServer {
    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind when {@link #bind()} is called
     */
    public HttpServer(int port) {
        this.port = port;
    }

    /**
     * Binds the configured port and blocks the calling thread until the server
     * channel is closed. Both event loop groups are shut down before this method
     * returns, whether the server stopped normally or because of an error.
     */
    public void bind() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new NettyServerChannelInitializer())
                    .option(ChannelOption.SO_BACKLOG, 500)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            // Bind the port and start accepting incoming connections.
            ChannelFuture future = bootstrap.bind(port).sync();
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Keep the original non-throwing contract, but do not hide the failure.
            System.err.println("HttpServer on port " + port + " stopped with error: " + e);
        } finally {
            // Release the event loop threads on every exit path, not only on failure.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
NettyServerChannelInitializer
/**
 * Installs the initial handler chain on every accepted channel: the protocol
 * sniffer first, then the UTF-8 string codec, then the shared message handler.
 */
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the handlers to the channel's pipeline in a fixed order. The names
     * "socketChoose" and "commonhandler" are the ones the handlers are registered under.
     *
     * @param channel the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        channel.pipeline()
                .addLast("socketChoose", new SocketChooseHandle())
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast("commonhandler", new WebSocketHandler());
    }
}
SocketChooseHandle
public class SocketChooseHandle extends ByteToMessageDecoder {
/** 默认暗号长度为23 */
private static final int MAX_LENGTH = 23;
/** WebSocket握手的协议前缀 */
private static final String WEBSOCKET_PREFIX = "GET /";
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String protocol = getBufStart(in);
if (protocol.startsWith(WEBSOCKET_PREFIX)) {
ctx.pipeline().addBefore("commonhandler","http-codec",new HttpServerCodec());
// HttpObjectAggregator:将HTTP消息的多个部分合成一条完整的HTTP消息
ctx.pipeline().addBefore("commonhandler","aggregator",new HttpObjectAggregator(65535));
// ChunkedWriteHandler:向客户端发送HTML5文件,文件过大会将内存撑爆
ctx.pipeline().addBefore("commonhandler","http-chunked",new ChunkedWriteHandler());
ctx.pipeline().addBefore("commonhandler","WebSocketAggregator",new WebSocketFrameAggregator(65535));
//用于处理websocket, /ws为访问websocket时的uri
ctx.pipeline().addBefore("commonhandler","ProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
????????????// 此次要移除socket 相关的编码
ctx.pipeline().remove(StringDecoder.class);
ctx.pipeline().remove(StringEncoder.class);
}
in.resetReaderIndex();
ctx.pipeline().remove(this.getClass());
}
private String getBufStart(ByteBuf in){
int length = in.readableBytes();
if (length > MAX_LENGTH) {
length = MAX_LENGTH;
}
// 标记读位置
in.markReaderIndex();
byte[] content = new byte[length];
in.readBytes(content);
return new String(content);
}
}
WebSocketHandler
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private static Map<String, Channel> map = new ConcurrentHashMap<>();
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object object) throws Exception {
String msg = "";
if (object instanceof TextWebSocketFrame){
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame)object;
msg = textWebSocketFrame.text();
}else {
msg = String.valueOf(object);
}
System.out.println("msg:"+msg);
for (String key: map.keySet()) {
if (key.equals(ctx.channel().id().toString())){
continue;
}
Channel channel = map.get(key);
ChannelFuture channelFuture = channel.writeAndFlush(msg);
if (!channelFuture.isSuccess()) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded::"+ctx.channel().id().asLongText());
InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
String hostAddress = socketAddress.getAddress().getHostAddress();
logger.info("IP:{}",hostAddress);
String clientId = ctx.channel().id().toString();
map.put(clientId,ctx.channel());
logger.info("map:{}",map.toString());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved::"+ctx.channel().id().asLongText());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("exceptionCaught::"+cause.getMessage());
String clientId = ctx.channel().id().toString();
map.remove(clientId);
ctx.close();
}
}
最后通过spring 的CommandLineRunner 实现服务启动
/**
 * Starts the Netty server once the Spring application has been set up.
 */
@Component
@Order(1)
public class TaskRunner implements CommandLineRunner {
    private final HttpServer httpServer = new HttpServer(10050);

    /**
     * Launches the server on its own thread. {@code HttpServer.bind()} blocks
     * until the server channel closes, so calling it directly here would keep
     * this runner from ever returning.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        Thread serverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                httpServer.bind();
            }
        }, "netty-http-server");
        serverThread.start();
    }
}
3、Client 端
3.1 websocket client
hello.html 代码如下,直接在浏览器中运行即可
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    // Browser test client: connects to the server's /ws endpoint, appends every
    // received message to the "response" textarea and sends the text typed into
    // the "requestComment" textarea.
    var webSocket;
    if (window.WebSocket) {
        webSocket = new WebSocket("ws://localhost:10050/ws");
        webSocket.onmessage = function (ev) {
            console.log("=====");
            var res = document.getElementById("response");
            res.value = res.value + "\n" + ev.data;
        };
        webSocket.onopen = function (ev) {
            var req = document.getElementById("response");
            req.value = "链接开启...." + "\n";
        };
        webSocket.onclose = function (ev) {
            var req = document.getElementById("response");
            req.value += "链接关闭...." + "\n";
        };
    } else {
        alert("浏览器不支持websocket")
    }

    // Declared at script level so the button handler can always resolve it.
    function sendMessage(msg) {
        if (!window.WebSocket) {
            return;
        }
        if (webSocket.readyState === WebSocket.OPEN) {
            webSocket.send(msg);
            document.getElementById("requestComment").value = '';
        } else {
            alert("链接未开启!");
        }
    }
</script>
<!-- Attribute names must use the Latin letter "o"; "return false" cancels form submission. -->
<form onsubmit="return false">
    <textarea style="height: 300px;width: 300px" id="requestComment" name="reqestcom"></textarea>
    <button type="button" onclick="sendMessage(this.form.reqestcom.value)">发送</button>
    <textarea style="height: 300px;width: 300px" id="response"></textarea>
    <button type="button" onclick="document.getElementById('response').value=''">清空</button>
</form>
</body>
</html>
3.2 socket client
/**
 * Console test client: connects to the server over a plain socket, prints what
 * it receives (via {@link ClientHandler}) and sends every line typed on stdin.
 */
public class TestClient {
    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public TestClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards stdin to it line by line until stdin
     * reaches end of input. The event loop group is always shut down on exit.
     *
     * @throws Exception if connecting is interrupted or fails
     */
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
                            channel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                            channel.pipeline().addLast(new ClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(host, port).sync();
            Scanner scanner = new Scanner(System.in);
            // hasNextLine() pairs with nextLine(); hasNext() looks for a token and
            // would treat blank or whitespace-only lines inconsistently.
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                if (msg.isEmpty()) {
                    // An empty line carries no bytes to send.
                    continue;
                }
                future.channel().writeAndFlush(msg);
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TestClient("127.0.0.1", 10050).run();
    }
}
ClientHandler
/**
 * Client-side inbound handler that echoes each received string to standard
 * output after trimming surrounding whitespace.
 *
 * @author xiaocg
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the trimmed message text.
     *
     * @param ctx     context of the channel the message arrived on (unused)
     * @param message decoded text received from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {
        String trimmed = message.trim();
        System.out.println(trimmed);
    }
}
4、最终测试截图
websocket 端发送
socket client 接收
socket client 发送
websocket 接收
代码工程详见
https://gitee.com/xiaochangg/study.git 下的 data-transfer 工程
|