一般的情况网上说的使用channelFuture.channel().closeFuture().sync();然后在finally中
if(channelFuture!=null){ if(channelFuture.channel()!=null && channelFuture.channel().isOpen()){ channelFuture.channel().close(); } System.out.println("重新连接"); startClient(ip,port,message); } 这样重新连接。这样虽然可以重新连接,但是在连接正常的时候,线程会一直阻塞在 closeFuture().sync() 上,后面的代码就走不下去了,这个不太好。
下面的方法虽然解决了同步问题,但是在生产环境应该还是不能使用,原因是生产环境更不能使用那么多的定时任务来重新连接。
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; import io.netty.channel.nio.NioEventLoopGroup;
public class ConnectionListener implements ChannelFutureListener{
??? private MyNettyClient1 myNettyClient1; ?? ? ??? public ConnectionListener(MyNettyClient1 myNettyClient1){ ??????? this.myNettyClient1 = myNettyClient1; ??? } ?? ? ??? @Override ??? public void operationComplete(ChannelFuture channelFuture) throws Exception { ??????? if(!channelFuture.isSuccess()){ ??????????? EventLoop eventLoopGroup = channelFuture.channel().eventLoop(); ??????????? eventLoopGroup.schedule(new Runnable() { ?????????????? ? ??????????????? @Override ??????????????? public void run() { ??????????????????? System.out.println("ConnectionListenerConnectionListenerConnectionListener"); ??????????????????? myNettyClient1.startClient(myNettyClient1.getIp(), myNettyClient1.getPort(), "发送信息"); ??????????????? } ??????????? }, 5, TimeUnit.SECONDS); ??????? }else{ ??????????? System.out.println(" ConnectionListener 服务器里连接OK"); ??????? } ?????? ? ??? }
}
package com.wang.wzrtunetty.client;
import static org.hamcrest.CoreMatchers.nullValue;
import java.nio.charset.StandardCharsets; import java.util.Timer;
import com.wang.wzrtunetty.Test1InHandler; import com.wang.wzrtunetty.Test1OutHandler; import com.wang.wzrtunetty.Test1SingleGroupNultiIpPort;
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;
public class MyNettyClient1 {
??? Bootstrap bootstrap; ??? ChannelFuture channelFuture = null; ??? String ip; ??? int port; ??? String message; ??? Timer timer = null; ?? ?
??? public void startClient(String ip, final int port,String message) { ??????? try { ??????????? this.ip = ip; ??????????? this.port = port; ??????????? this.message = message;
??????????? bootstrap.handler(new ChannelInitializer<Channel>() {
??????????????? @Override ??????????????? protected void initChannel(Channel channel) throws Exception { ??????????????????? channel.pipeline().addLast(new Test3InHandler(MyNettyClient1.this)); ??????????????????? channel.pipeline().addLast(new Test04OutHandler()); ??????????????? } ??????????? }).option(ChannelOption.TCP_NODELAY, true); ?????????? ? ?????????? ? ??????????? channelFuture =? bootstrap.connect(ip, port).sync(); ?????????? ?
??????????? ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer(); ??????????? byteBuf.writeBytes(message.getBytes(StandardCharsets.UTF_8)); ??????????? System.out.println("channelFuture.channel().getClass().getName()="+channelFuture.channel().getClass().getName()); ??????????? channelFuture.channel().writeAndFlush(byteBuf); ?????????? ? ??????????? System.out.println("发送完毕"+port); ??????????? channelFuture.addListener(new ConnectionListener(this)); //??????????? channelFuture.addListener(new ConnectionListener(MyNettyClient1.this) { //?????????????? ? //??????????????? @Override //??????????????? public void operationComplete(ChannelFuture arg0) throws Exception { //??????????????????? System.out.println("发送成功"+port); //??????????????? } //??????????? }); ??????????? System.out.println("发送完毕222"+port); ??????? } catch (Exception e) { ??????????? e.printStackTrace();
??????? }finally { //??????????? if(channelFuture!=null){ //??????????????? if(channelFuture.channel()!=null && channelFuture.channel().isOpen()){ //??????????????????? channelFuture.channel().close(); //??????????????? } //??????????????? System.out.println("重新连接"); //??????????????? startClient(ip,port,message); //??????????? } ?????????? ? ??????? }
??? } ?? ? ??? public Bootstrap getBootstrap() { ??????? return bootstrap; ??? }
??? public void setBootstrap(Bootstrap bootstrap) { ??????? this.bootstrap = bootstrap; ??? }
??? public String getIp() { ??????? return ip; ??? }
??? public void setIp(String ip) { ??????? this.ip = ip; ??? }
??? public int getPort() { ??????? return port; ??? }
??? public void setPort(int port) { ??????? this.port = port; ??? }
??? public Timer getTimer() { ??????? return timer; ??? }
??? public void setTimer(Timer timer) { ??????? this.timer = timer; ??? }
??? public static void main(String[] args) { ??????? NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(); ??????? Bootstrap bootstrap = new Bootstrap(); ?????? ? ??????? bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class); ?????? ? ??????? MyNettyClient1 myNettyClient1 = new MyNettyClient1(); ??????? myNettyClient1.setBootstrap(bootstrap); ??????? myNettyClient1.startClient("192.168.1.31", 8087, "aaaaMyNettyClient1"); ??? } }
import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.EventLoop;
public class Test3InHandler extends ChannelInboundHandlerAdapter{
??? private MyNettyClient1 myNettyClient1; ??? private Timer timer;//为什么不用这个timer,而必须使用client.Timer,因为每一个新连接就是一个新的线程,原来的timer就不能清除了,每次进入active就是新的timer ?? ? ??? public Test3InHandler(){ ?????? ? ??? } ?? ? ??? public Test3InHandler(MyNettyClient1 myNettyClient1) { ??????? this.myNettyClient1 = myNettyClient1; ??? } ?? ? ??? @Override ??? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ??????? System.out.println("Test3InHandler? channelRead"); //??????? super.channelRead(ctx, msg); ??????? ctx.writeAndFlush(msg); ??? } ?? ? ??? @Override ??? public void channelActive(ChannelHandlerContext ctx) throws Exception { ??????? System.out.println("Test3InHandler? channelActive");
??????? if(myNettyClient1.getTimer()!=null){ ??????????? myNettyClient1.getTimer().cancel(); ??????????? myNettyClient1.setTimer(null);; ??????????? System.out.println("清空定时器"); ??????? } ??????? super.channelActive(ctx); ??? } ?? ? ??? @Override ??? public void channelInactive(ChannelHandlerContext ctx) throws Exception { ???????? System.out.println("Test3InHandler? channelInactive"); ?????? //使用过程中断线重连 ???????? final EventLoop eventLoop = ctx.channel().eventLoop(); ??????? ? ???????? if(myNettyClient1.getTimer()!=null){ ???????????? myNettyClient1.getTimer().cancel(); ???????????? myNettyClient1.setTimer(null); ???????? } ???????? myNettyClient1.setTimer(new Timer()); ???????? myNettyClient1.getTimer().scheduleAtFixedRate(new TimerTask() { ?????????? ? ??????????? @Override ??????????? public void run() { ??????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler"); ??????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接"); ? ??????????? } ??????? }, 5 *1000, 5*1000); ??????? ? //???????? boolean flag = false; //???????? if(!flag){ //??????????? ? //???????????? eventLoop.scheduleAtFixedRate(new Runnable() { //??????????????? ? //???????????????? @Override //???????????????? public void run() { //???????????????????? if(!flag){ //???????????????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler"); //???????????????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接"); //???????????????????? }??????????????????? ? //???????????????? } //???????????? }, 5, 5, TimeUnit.SECONDS); //???????? } ??????? ? ??????? ? ??????? ? //???????? final ChannelFuture channelFuture =? ctx.channel().closeFuture(); //???????? if(!channelFuture.isSuccess()){//这个虽然能够多次连接,但是连接成功后,还是会走定时任务,不行,而且会把连接数弄死,曾经使用 //???????? timer,但是handler中的timer不行,必须使用client的timer //???????????? 
eventLoop.scheduleAtFixedRate(new Runnable() { //??????????????? ? //???????????????? @Override //???????????????? public void run() { //???????????????????? if(!channelFuture.isSuccess()){ //???????????????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler"); //???????????????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接"); //???????????????????? } //???????????????? } //???????????? }, 5, 5, TimeUnit.SECONDS); //???????? } ??????? ? //???????? eventLoop.schedule(new Runnable() { //???????????? @Override //???????????? public void run() { //???????????????? 这个只能走一次重新连接,如果连接不上就再也连接不了,不行 //???????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler"); //???????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接"); //???????????? } //???????? }, 1L, TimeUnit.SECONDS); ??????? super.channelInactive(ctx); ??? } }
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise;
public class Test04OutHandler extends ChannelOutboundHandlerAdapter { ?? ? ??? @Override ??? public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ??????? System.out.println("Test04OutHandler write"); ??????? super.write(ctx, msg, promise); ??? }
}
|