import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.stream.ChunkedWriteHandler;
import net.sf.json.JSONObject;
import java.net.URI;
/**
 * Minimal Netty WebSocket client.
 *
 * <p>Connects to {@code ws://localhost:5000}, performs the WebSocket opening handshake,
 * sends a single JSON text frame ({@code {"cmd":"test"}}) and then blocks until the
 * channel is closed.
 */
public class WebSocketNettyClient {

    /**
     * Runs the client end to end: connect, handshake, send one message, wait for close.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        final ClientHandler handler = new ClientHandler();
        try {
            URI websocketURI = new URI("ws://localhost:5000");
            HttpHeaders httpHeaders = new DefaultHttpHeaders();

            // Build the handshaker and hand it to the handler BEFORE connecting, so the
            // handler never processes inbound data without a handshaker in place.
            WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
                    websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders);
            handler.setHandshaker(handshaker);

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // HTTP codec for the upgrade request/response.
                            pipeline.addLast(new HttpClientCodec());
                            // Support for writing large data streams in chunks.
                            pipeline.addLast(new ChunkedWriteHandler());
                            // Aggregates HTTP message parts into a single full message
                            // (limit: 64 KiB), which the handler expects for the handshake.
                            pipeline.addLast(new HttpObjectAggregator(1024 * 64));
                            pipeline.addLast(handler);
                        }
                    });

            final Channel channel =
                    bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();

            // Send the upgrade request. sync() rethrows a failed write here instead of
            // leaving the wait on handshakeFuture() below blocked forever.
            handshaker.handshake(channel).sync();

            // Block until the handler reports the handshake outcome.
            handler.handshakeFuture().sync();
            System.out.println("握手成功");

            // Send the message.
            JSONObject clientJson = new JSONObject();
            clientJson.put("cmd", "test");
            channel.writeAndFlush(new TextWebSocketFrame(clientJson.toString()));

            // Wait until the connection is closed.
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }
}
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
/**
 * Client-side inbound handler: completes the WebSocket opening handshake and then prints
 * the frames received from the server.
 *
 * <p>The handshake outcome is published through {@link #handshakeFuture()}. The promise is
 * created once in {@link #handlerAdded(ChannelHandlerContext)} and is completed with
 * success when the handshake finishes, or with a failure when an exception is caught or
 * the channel becomes inactive first, so a caller blocked on it is always released.
 */
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketClientHandshaker handshaker;
    ChannelPromise handshakeFuture;

    /**
     * Called when the connection to the server becomes active.
     *
     * <p>The handshake promise is deliberately NOT re-created here: a caller may already
     * hold the promise created in {@link #handlerAdded(ChannelHandlerContext)}, and
     * replacing it would leave that caller waiting on a promise that never completes.
     *
     * @param channelHandlerContext ChannelHandlerContext
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) {
        System.out.println("客户端Active .....");
    }

    /**
     * Fails a still-pending handshake when the connection goes away, then forwards the
     * event to the next handler.
     *
     * @param ctx context of this handler
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ChannelPromise promise = this.handshakeFuture;
        if (promise != null) {
            // No-op when the handshake has already completed.
            promise.tryFailure(new IllegalStateException(
                    "Channel became inactive before the WebSocket handshake completed"));
        }
        ctx.fireChannelInactive();
    }

    /**
     * Prints the error, fails a still-pending handshake with it, and closes the channel.
     *
     * @param ctx   context of this handler
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("\n\t???????exception?????????\n" +
                cause.getMessage());
        ChannelPromise promise = this.handshakeFuture;
        if (promise != null) {
            // Release anyone blocked on the handshake; no-op if it already completed.
            promise.tryFailure(cause);
        }
        ctx.close();
    }

    /**
     * Sets the handshaker used to validate the server's handshake response.
     *
     * @param handshaker handshaker for this connection
     */
    public void setHandshaker(WebSocketClientHandshaker handshaker) {
        this.handshaker = handshaker;
    }

    /**
     * Creates the promise that reports the handshake outcome.
     *
     * @param ctx context of this handler
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        this.handshakeFuture = ctx.newPromise();
    }

    /**
     * Returns the future that completes when the handshake has succeeded or failed.
     *
     * @return the handshake future; {@code null} until {@code handlerAdded} has run
     */
    public ChannelFuture handshakeFuture() {
        return this.handshakeFuture;
    }

    /**
     * Handles an inbound message: the handshake response while the handshake is pending,
     * WebSocket frames afterwards.
     *
     * @param ctx context of this handler
     * @param o   the received message
     * @throws Exception if the message cannot be processed; the exception is then reported
     *                   to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object o) throws Exception {
        System.out.println("22222");
        // Handshake response received: finish the handshake.
        if (!this.handshaker.isHandshakeComplete()) {
            if (!(o instanceof FullHttpResponse)) {
                // Explicit error instead of a ClassCastException; exceptionCaught will
                // fail the handshake promise and close the channel.
                throw new IllegalStateException(
                        "Expected FullHttpResponse during WebSocket handshake but received "
                                + o.getClass().getName());
            }
            FullHttpResponse response = (FullHttpResponse) o;
            // A rejected handshake throws here and is handled by exceptionCaught.
            this.handshaker.finishHandshake(ctx.channel(), response);
            this.handshakeFuture.setSuccess();
            System.out.println("WebSocketClientHandler::channelRead0 HandshakeComplete...");
            return;
        }

        if (o instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) o;
            System.out.println("WebSocketClientHandler::channelRead0 textFrame: " + textFrame.text());
        } else if (o instanceof CloseWebSocketFrame) {
            System.out.println("WebSocketClientHandler::channelRead0 CloseWebSocketFrame");
            // The server asked to close: close our side so closeFuture() waiters return.
            ctx.close();
        }
    }
}
|