IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> netty 客户端重新连接 -> 正文阅读

[Java知识库]netty 客户端重新连接

一般的情况网上说的使用channelFuture.channel().closeFuture().sync();然后在finally中

if(channelFuture!=null){
??????????????? if(channelFuture.channel()!=null && channelFuture.channel().isOpen()){
??????????????????? channelFuture.channel().close();
??????????????? }
??????????????? System.out.println("重新连接");
??????????????? startClient(ip,port,message);
??????????? } 这样重新连接,这样虽然可以重新连接,但是正常连接,就走不下去了,这个不太好

下面的方法虽然解决了同步问题,但是在生产环境应该还是不能使用,原因是生产换进更不能使用那么多的任务进行定时进行重新连接


import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;
import io.netty.channel.nio.NioEventLoopGroup;

public class ConnectionListener implements ChannelFutureListener{

??? private MyNettyClient1 myNettyClient1;
?? ?
??? public ConnectionListener(MyNettyClient1 myNettyClient1){
??????? this.myNettyClient1 = myNettyClient1;
??? }
?? ?
??? @Override
??? public void operationComplete(ChannelFuture channelFuture) throws Exception {
??????? if(!channelFuture.isSuccess()){
??????????? EventLoop eventLoopGroup = channelFuture.channel().eventLoop();
??????????? eventLoopGroup.schedule(new Runnable() {
?????????????? ?
??????????????? @Override
??????????????? public void run() {
??????????????????? System.out.println("ConnectionListenerConnectionListenerConnectionListener");
??????????????????? myNettyClient1.startClient(myNettyClient1.getIp(), myNettyClient1.getPort(), "发送信息");
??????????????? }
??????????? }, 5, TimeUnit.SECONDS);
??????? }else{
??????????? System.out.println(" ConnectionListener 服务器里连接OK");
??????? }
?????? ?
??? }

}

package com.wang.wzrtunetty.client;

import static org.hamcrest.CoreMatchers.nullValue;

import java.nio.charset.StandardCharsets;
import java.util.Timer;

import com.wang.wzrtunetty.Test1InHandler;
import com.wang.wzrtunetty.Test1OutHandler;
import com.wang.wzrtunetty.Test1SingleGroupNultiIpPort;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;


public class MyNettyClient1 {

??? Bootstrap bootstrap;
??? ChannelFuture channelFuture = null;
??? String ip;
??? int port;
??? String message;
??? Timer timer = null;
?? ?

??? public void startClient(String ip, final int port,String message) {
??????? try {
??????????? this.ip = ip;
??????????? this.port = port;
??????????? this.message = message;

??????????? bootstrap.handler(new ChannelInitializer<Channel>() {

??????????????? @Override
??????????????? protected void initChannel(Channel channel) throws Exception {
??????????????????? channel.pipeline().addLast(new Test3InHandler(MyNettyClient1.this));
??????????????????? channel.pipeline().addLast(new Test04OutHandler());
??????????????? }
??????????? }).option(ChannelOption.TCP_NODELAY, true);
?????????? ?
?????????? ?
??????????? channelFuture =? bootstrap.connect(ip, port).sync();
?????????? ?

??????????? ByteBuf byteBuf = UnpooledByteBufAllocator.DEFAULT.buffer();
??????????? byteBuf.writeBytes(message.getBytes(StandardCharsets.UTF_8));
??????????? System.out.println("channelFuture.channel().getClass().getName()="+channelFuture.channel().getClass().getName());
??????????? channelFuture.channel().writeAndFlush(byteBuf);
?????????? ?
??????????? System.out.println("发送完毕"+port);
??????????? channelFuture.addListener(new ConnectionListener(this));
//??????????? channelFuture.addListener(new ConnectionListener(MyNettyClient1.this) {
//?????????????? ?
//??????????????? @Override
//??????????????? public void operationComplete(ChannelFuture arg0) throws Exception {
//??????????????????? System.out.println("发送成功"+port);
//??????????????? }
//??????????? });
??????????? System.out.println("发送完毕222"+port);
??????? } catch (Exception e) {
??????????? e.printStackTrace();

??????? }finally {
//??????????? if(channelFuture!=null){
//??????????????? if(channelFuture.channel()!=null && channelFuture.channel().isOpen()){
//??????????????????? channelFuture.channel().close();
//??????????????? }
//??????????????? System.out.println("重新连接");
//??????????????? startClient(ip,port,message);
//??????????? }
?????????? ?
??????? }

??? }
?? ?
??? public Bootstrap getBootstrap() {
??????? return bootstrap;
??? }

??? public void setBootstrap(Bootstrap bootstrap) {
??????? this.bootstrap = bootstrap;
??? }

??? public String getIp() {
??????? return ip;
??? }

??? public void setIp(String ip) {
??????? this.ip = ip;
??? }

??? public int getPort() {
??????? return port;
??? }

??? public void setPort(int port) {
??????? this.port = port;
??? }

??? public Timer getTimer() {
??????? return timer;
??? }

??? public void setTimer(Timer timer) {
??????? this.timer = timer;
??? }

??? public static void main(String[] args) {
??????? NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
??????? Bootstrap bootstrap = new Bootstrap();
?????? ?
??????? bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class);
?????? ?
??????? MyNettyClient1 myNettyClient1 = new MyNettyClient1();
??????? myNettyClient1.setBootstrap(bootstrap);
??????? myNettyClient1.startClient("192.168.1.31", 8087, "aaaaMyNettyClient1");
??? }
}


import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;

public class Test3InHandler extends ChannelInboundHandlerAdapter{

??? private MyNettyClient1 myNettyClient1;
??? private Timer timer;//为什么不用这个timer,而必须使用client.Timer,因为每一个新连接就是一个新的线程,原来的timer就不能清除了,每次进入active就是新的timer
?? ?
??? public Test3InHandler(){
?????? ?
??? }
?? ?
??? public Test3InHandler(MyNettyClient1 myNettyClient1) {
??????? this.myNettyClient1 = myNettyClient1;
??? }
?? ?
??? @Override
??? public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
??????? System.out.println("Test3InHandler? channelRead");
//??????? super.channelRead(ctx, msg);
??????? ctx.writeAndFlush(msg);
??? }
?? ?
??? @Override
??? public void channelActive(ChannelHandlerContext ctx) throws Exception {
??????? System.out.println("Test3InHandler? channelActive");

??????? if(myNettyClient1.getTimer()!=null){
??????????? myNettyClient1.getTimer().cancel();
??????????? myNettyClient1.setTimer(null);;
??????????? System.out.println("清空定时器");
??????? }
??????? super.channelActive(ctx);
??? }
?? ?
??? @Override
??? public void channelInactive(ChannelHandlerContext ctx) throws Exception {
???????? System.out.println("Test3InHandler? channelInactive");
?????? //使用过程中断线重连
???????? final EventLoop eventLoop = ctx.channel().eventLoop();
??????? ?
???????? if(myNettyClient1.getTimer()!=null){
???????????? myNettyClient1.getTimer().cancel();
???????????? myNettyClient1.setTimer(null);
???????? }
???????? myNettyClient1.setTimer(new Timer());
???????? myNettyClient1.getTimer().scheduleAtFixedRate(new TimerTask() {
?????????? ?
??????????? @Override
??????????? public void run() {
??????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler");
??????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接"); ?
??????????? }
??????? }, 5 *1000, 5*1000);
??????? ?
//???????? boolean flag = false;
//???????? if(!flag){
//??????????? ?
//???????????? eventLoop.scheduleAtFixedRate(new Runnable() {
//??????????????? ?
//???????????????? @Override
//???????????????? public void run() {
//???????????????????? if(!flag){
//???????????????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler");
//???????????????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接");
//???????????????????? }??????????????????? ?
//???????????????? }
//???????????? }, 5, 5, TimeUnit.SECONDS);
//???????? }
??????? ?
??????? ?
??????? ?
//???????? final ChannelFuture channelFuture =? ctx.channel().closeFuture();
//???????? if(!channelFuture.isSuccess()){//这个虽然能够多次连接,但是连接成功后,还是会走定时任务,不行,而且会把连接数弄死,曾经使用
//???????? timer,但是handler中的timer不行,必须使用client的timer
//???????????? eventLoop.scheduleAtFixedRate(new Runnable() {
//??????????????? ?
//???????????????? @Override
//???????????????? public void run() {
//???????????????????? if(!channelFuture.isSuccess()){
//???????????????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler");
//???????????????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接");
//???????????????????? }
//???????????????? }
//???????????? }, 5, 5, TimeUnit.SECONDS);
//???????? }
??????? ?
//???????? eventLoop.schedule(new Runnable() {
//???????????? @Override
//???????????? public void run() {
//???????????????? 这个只能走一次重新连接,如果连接不上就再也连接不了,不行
//???????????????? System.out.println("Test3InHandlerTest3InHandlerTest3InHandlerTest3InHandler");
//???????????????? myNettyClient1.startClient(myNettyClient1.getIp(),myNettyClient1.getPort(),"在Test3InHandler中重新连接");
//???????????? }
//???????? }, 1L, TimeUnit.SECONDS);
??????? super.channelInactive(ctx);
??? }
}


import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class Test04OutHandler extends ChannelOutboundHandlerAdapter {
?? ?
??? @Override
??? public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
??????? System.out.println("Test04OutHandler write");
??????? super.write(ctx, msg, promise);
??? }

}

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-22 20:22:50  更:2022-03-22 20:24:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:31:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码