一、微信小程序实现WebSocket客户端程序
1. 界面实现
<input name="url" value="{{url}}" bindinput ="urlInput"/>
<button size='mini' type="warn">断开连接</button>
<button size='mini' type="primary" bindtap="connectSocket">开启连接</button>
<textarea placeholder="输入发送内容" bindinput ="msgInput"></textarea>
<button size='mini' type="primary" bindtap="sendMsg">发送</button>
<view wx:for="{{msgs}}">{{index}}: {{item}}</view>
界面效果: 
2. WXS部分
数据部分包含三个字段: url:字符串类型,代表WebSocket服务器地址; msgs:数组类型,用于存储发送和接收的消息; msg:字符串类型,用于保存发送的内容;
另外还定义了三个函数: connectSocket:提供了连接WebSocket服务的功能; msgInput:获取用户输入的内容; sendMsg:实现了向远程WebSocket发送消息的功能;
Page({
data: {
url: 'ws://localhost:8888/ws',
msgs: [],
msg: '',
}
connectSocket() {
let _this = this;
let task = wx.connectSocket({
url: _this.data.url
});
task.onMessage(function(res) {
_this.setData({
msgs: [..._this.data.msgs, "接收到消息 -> " + res.data]
});
});
_this.setData({
socketTask: task,
msgs: [..._this.data.msgs,"连接成功!"]
});
},
msgInput(e) {
this.setData({
msg: e.detail.value
});
},
sendMsg() {
let msg = this.data.msg;
this.data.socketTask.send({
data: msg
});
}
})
二、Netty实现WebSocket服务端程序
在从HTTP或HTTPS协议切换到WebSocket时,将会使用一种称为升级握手的机制。因此我们的WebSocket服务端程序将始终以HTTP作为开始,然后再执行升级。其约定为:如果被请求的URL以/ws 结尾,那么我们将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP。当连接升级完毕后,所有数据都将会使用WebSocket进行传输(如下图)。

1. 新建一个Maven工程,并引入Netty依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.netty</groupId>
<artifactId>NettyWebSocket</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.48.Final</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. 自定义处理器
1)定义一个专门处理Http协议的处理器,当浏览器第一次连接时候会读取首页的html文件,并将html文件内容返回给浏览器展示。
package io.netty.websocket;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;
import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain()
.getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
ctx.fireChannelRead(request.retain());
return;
}
handleHttpRequest(ctx, request);
}
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (HttpHeaders.is100ContinueExpected(request)) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
ctx.write(new ChunkedNioFile(file.getChannel()));
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
}
2)定义一个处理器,负责处理所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,则所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不需要的ChannelHandler会被移除掉。
- WebSocket升级前的ChannelPipeline:

- WebSocket升级后的ChannelPipeline:

WebSocket升级完成后,WebSocketServerProtocolHandler会把HttpRequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,WebSocketServerProtocolHandler会移除任何不再被WebSocket连接所需要的ChannelHandler,其中包括 HttpObjectAggregator 和 HttpRequestHandler。
实现代码:
package io.netty.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup channelGroup;
public TextWebSocketFrameHandler(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
String content = "Client " + ctx.channel().remoteAddress().toString().substring(1) + " joined";
System.out.println(content);
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
ctx.pipeline().remove(HttpRequestHandler.class);
TextWebSocketFrame msg = new TextWebSocketFrame(content);
channelGroup.writeAndFlush(msg);
channelGroup.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame msg) throws Exception {
System.out.println("读取到的消息:" + msg.retain());
channelGroup.writeAndFlush(msg.retain());
}
}
上面userEventTriggered 方法监听用户事件。当有客户端连接时候,会自动执行该方法。而channelRead0 方法负责读取客户端发送过来的消息,然后通过channelGroup 将消息输出到所有已连接的客户端。
3. 定义初始化器
定义一个ChannelInitializer的子类,其主要目的是在某个 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。
package io.netty.websocket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup channelGroup;
public ChatServerInitializer(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new HttpRequestHandler("/ws"));
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
pipeline.addLast(new TextWebSocketFrameHandler(channelGroup));
}
}
4. 创建启动类
package io.netty.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.net.InetSocketAddress;
public class ChatServer {
public void start() {
ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChatServerInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8888)).syncUninterruptibly();
System.out.println("Starting ChatServer on port 8888 ...");
future.channel().closeFuture().syncUninterruptibly();
} finally {
channelGroup.close();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new ChatServer().start();
}
}
5. 编写一个html文件
该html文件提供网页版的WebSocket客户端页面。在src/main/resources 目录下新建一个html文件。
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<form οnsubmit="return false;">
<h3>WebSocket 聊天室:</h3>
<textarea id="responseText" style="width: 500px; height: 300px;"></textarea><br/>
<input type="text" name="message" style="width: 300px" value="Hello Netty"/>
<input type="button" value="发送消息" onclick="send(this.form.message.value)"/>
<input type="button" value="清空聊天记录" onclick="clearScreen()"/>
</form>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8888/ws");
socket.onopen = function(event) {
var ta = document.getElementById('responseText');
ta.value = "连接开启!";
};
socket.onclose = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + "连接被关闭!";
};
socket.onmessage = function(event) {
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data;
};
} else {
alert("你的浏览器不支持 WebSocket!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
} else {
alert("连接没有开启.");
}
}
function clearScreen() {
document.getElementById('responseText').value = "";
}
</script>
</body>
</html>
界面效果:  最终效果: 
|