直接上代码
1.WebSocketClientHandler.class
package com.example.springbootnetty.hanlder;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketClientHandshaker handshaker;
private ChannelPromise handshakeFuture;
public WebSocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handshaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("WebSocket Client disconnected!");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handshaker.isHandshakeComplete()) {
try {
handshaker.finishHandshake(ch, (FullHttpResponse) msg);
log.info("websocket client 连接成功");
handshakeFuture.setSuccess();
} catch (WebSocketHandshakeException e) {
log.info("websocket client 连接失败");
handshakeFuture.setFailure(e);
}
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
log.info("websocket client 接收到的消息:{}",textFrame.text());
} else if (frame instanceof PongWebSocketFrame) {
log.info("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
log.info("websocket client关闭");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
log.error("业务处理错误,websocket client关闭");
}
}
2.WebsocketClient.class
package com.example.springbootnetty.client;
import com.example.springbootnetty.hanlder.WebSocketClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.net.URI;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class WebsocketClient {
private String url = "wss:/localhost/webSocketServer";
private Channel channel;
private final static WebsocketClient WEBSOCKET_CLIENT = new WebsocketClient();
private WebsocketClient() {
}
public static WebsocketClient getInstance() {
return WEBSOCKET_CLIENT;
}
private String URL = System.getProperty("url", url);
public void startClient() throws Exception {
log.info("websocket client 连接中....");
URI uri = new URI(URL);
String scheme = uri.getScheme() == null ? "ws" : uri.getScheme();
final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost();
final int port;
if (uri.getPort() == -1) {
if ("ws".equalsIgnoreCase(scheme)) {
port = 80;
} else if ("wss".equalsIgnoreCase(scheme)) {
port = 443;
} else {
port = -1;
}
} else {
port = uri.getPort();
}
if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) {
System.err.println("Only WS(S) is supported.");
return;
}
final boolean ssl = "wss".equalsIgnoreCase(scheme);
final SslContext sslCtx;
if (ssl) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
EventLoopGroup group = new NioEventLoopGroup();
try {
final WebSocketClientHandler handler = new WebSocketClientHandler(WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
}
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),
WebSocketClientCompressionHandler.INSTANCE,
handler);
}
});
channel = b.connect(uri.getHost(), port).sync().channel();
handler.handshakeFuture().sync();
channel.writeAndFlush("发送的第一条消息");
channel.eventLoop().scheduleAtFixedRate(() -> {
channel.writeAndFlush(new TextWebSocketFrame("pong"));
}, 0, 15, TimeUnit.SECONDS);
channel.closeFuture().sync();
} catch (Exception e) {
channel.close();
log.error("websocket client 启动失败,失败原因:{}", e.getMessage());
} finally {
group.shutdownGracefully();
}
}
}
3.RunClient.class
package com.example.springbootnetty.client;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class RunClient {
@PostConstruct
public void run() {
try {
WebsocketClient.getInstance().startClient();
} catch (Exception e) {
log.error("websocket启动失败,失败原因:{}", e.getMessage());
}
}
}
项目地址
|