IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 基于corundumstudio建立websocket长连接 -> 正文阅读

[网络协议]基于corundumstudio建立websocket长连接

依赖

<!-- socket io: Java client library plus the netty-socketio server implementation -->
<!-- Socket.IO Java client; the sample test clients in this article use it (IO.socket / Socket / Emitter). -->
        <dependency>
            <groupId>io.socket</groupId>
            <artifactId>socket.io-client</artifactId>
            <version>1.0.1</version>
        </dependency>
<!-- netty-socketio server (SocketIOServer, SocketIOClient, @OnConnect / @OnEvent annotations). -->
<!-- ${netty-socketio.version} is not defined in this snippet; it has to be declared in the POM properties. -->
        <dependency>
            <groupId>com.corundumstudio.socketio</groupId>
            <artifactId>netty-socketio</artifactId>
            <version>${netty-socketio.version}</version>
        </dependency>
<!-- Eclipse Paho MQTT v3 client. -->
 <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>
<!-- Apache Kafka, Scala 2.12 build. ${kafka.version} likewise has to be declared in the POM properties. -->
   <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>${kafka.version}</version>
        </dependency>

代码

/**
 * socket io 资源管理器
 * @author ken
 * @date 2021/1/6 16:18
 */
@Slf4j
@Component
public class SocketIOClientManager {

    @Autowired
    private KafkaConnectionManager kafkaConnectionManager;

    @Autowired
    private MqttConnectionManager mqttConnectionManager;

    @Resource
    private WebSocketEventHandler webSocketEventHandler;

    // 用来存已连接的客户端唯一ID, <KAFKA+" : "+URL+" : "+topics, <sessionID, CLIENT>>
    private Map<String, Map<String, SocketIOClient>> clientMap = Collections.synchronizedMap(new HashMap<>());


    public void addClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            log.error("客户端未配置参数");
            client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");
        }
        if (clientMap.containsKey(resourceID)) {
            Map<String, SocketIOClient> subMap = clientMap.get(resourceID);
            if (!subMap.containsKey(sessionID)) {
                subMap.put(sessionID, client);
                clientMap.put(resourceID, subMap);
            }
        } else {
            final HashMap<String, SocketIOClient> subMap = new HashMap<>();
            subMap.put(sessionID, client);
            clientMap.put(resourceID, subMap);
        }
        log.info("在线客户端: " + clientMap.toString());
    }


    public void removeClient(SocketIOClient client) {
        String sessionID = client.getSessionId().toString();
        String resourceID = getParamsByClient(client);
        if (resourceID == null) {
            log.error("客户端未配置参数");
            client.sendEvent("fail", 403, "Type" + splitStr + "address" + splitStr + "topic" + splitStr + "topic" + splitStr + "topic...");
            return;
        }
        if (clientMap.containsKey(resourceID)) {
            final Map<String, SocketIOClient> subMap = clientMap.get(resourceID);
            final Iterator<Map.Entry<String, SocketIOClient>> iterator = subMap.entrySet().iterator();
            while (iterator.hasNext()) {
                final Map.Entry<String, SocketIOClient> clientEntry = iterator.next();
                if (clientEntry.getKey().equals(sessionID)) {
                    iterator.remove();
                    log.info("移除客户端: {}", sessionID);
                    // 如果移除session后对应url没有对应session,那么移除url
                    if (subMap.size() == 0) {
                        clientMap.remove(resourceID);
                        log.info("移除ID: {}", resourceID);
                        if (resourceID.startsWith(String.valueOf(ResourceType.KAFKA))) {
                            kafkaConnectionManager.removeConnection(resourceID);
                        }
                        if (resourceID.startsWith(String.valueOf(ResourceType.MQTT))) {
                            mqttConnectionManager.removeConnection(resourceID);
                        }
                    } else {
                        clientMap.put(resourceID, subMap);
                    }
                }
            }
        } else {
            log.info("没有 {} 对应的{} 客户端", resourceID, sessionID);
        }
    }

    public void pushClientMesg2Kafka(SocketIOClient client, String topic, String mesg) throws ExecutionException, InterruptedException {
        String resourceID = getParamsByClient(client);
        KafkaPubSubServer kafkaServer = (KafkaPubSubServer) kafkaConnectionManager.getServerByResourceID(resourceID);
        if (kafkaServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (clientMap.containsKey(resourceID)) {
            kafkaServer.pushMesg(topic, mesg);
        }
    }

    public void pushKafkaMesg2Client(String resourceID, String mesg) {
        if (clientMap.containsKey(resourceID)) {
            Map<String, SocketIOClient> subMap = clientMap.get(resourceID);
            for (SocketIOClient ioClient : subMap.values()) {
                ioClient.sendEvent(webSocketEventHandler.getClientSubKafkaEvent(), mesg.toString());
            }
        }
    }

    public void pushClientMesg2MQTT(SocketIOClient client, String topic, String mesg) throws MqttException {
        String resourceID = getParamsByClient(client);
        MqttPubSubServer mqttServer = (MqttPubSubServer) mqttConnectionManager.getServerByResourceID(resourceID);
        if (mqttServer == null) {
            throw DataTException.asDataTException(CommonEnum.CONF_ERROR, "数据源未配置");
        }
        if (clientMap.containsKey(resourceID)) {
            mqttServer.pushMesg(topic, mesg);
        }
    }

    public void pushMQTTMesg2Client(String resourceID, String mesg) {
        if (clientMap.containsKey(resourceID)) {
            Map<String, SocketIOClient> subMap = clientMap.get(resourceID);
            for (SocketIOClient ioClient : subMap.values()) {
                ioClient.sendEvent(webSocketEventHandler.getClientSubEmqEvent(), mesg.toString());
            }
        }
    }


    /**
     * 此方法为获取client连接中的参数,可根据需求更改
     *
     * @param client
     * @return
     */
    private String getParamsByClient(SocketIOClient client) {
        // 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
        final String resourceID = client.getHandshakeData().getSingleUrlParam("resourceID");
        return resourceID;
    }

}

/**
 * Spring configuration for the netty-socketio server. Host and port are read
 * from the {@code socket-io.host} and {@code socket-io.port} properties.
 */
@Configuration
public class SocketIOConfig {

    @Value("${socket-io.host}")
    private String host;

    @Value("${socket-io.port}")
    private int port;

    public SocketIOConfig() {
    }

    /**
     * @return the plain-HTTP base URL built from the configured host and port
     */
    public String getUrl() {
        return new StringBuilder("http://").append(host).append(':').append(port).toString();
    }

    /**
     * Builds the Socket.IO server bean; the server is only constructed here, not started.
     *
     * @return a server configured with the host, port and timeout settings below
     */
    @Bean
    public SocketIOServer socketIOServer() {
        final com.corundumstudio.socketio.Configuration settings =
                new com.corundumstudio.socketio.Configuration();

        // Listening address: host name (defaults to 0.0.0.0) and port.
        settings.setPort(port);
        settings.setHostname(host);

        // Ping interval in milliseconds (default 25000): how often a client sends a heartbeat.
        settings.setPingInterval(25000);
        // Ping timeout in milliseconds (default 60000): a timeout event fires when no
        // heartbeat arrives within this window.
        settings.setPingTimeout(60000);
        // Upgrade timeout in milliseconds (default 10000): time allowed for the HTTP
        // handshake to be upgraded to the ws protocol.
        settings.setUpgradeTimeout(10000);

        final SocketIOServer socketIOServer = new SocketIOServer(settings);
        return socketIOServer;
    }

    /**
     * Registers the scanner bean that is constructed around the Socket.IO server.
     *
     * @param socketServer the server bean
     * @return the annotation scanner bound to {@code socketServer}
     */
    @Bean
    public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
        final SpringAnnotationScanner scanner = new SpringAnnotationScanner(socketServer);
        return scanner;
    }
}
@Component
@Slf4j
public class WebSocketEventHandler {

    @Autowired
    private SocketIOClientManager socketIOClientManager;

    public String getClientSubKafkaEvent() {
        return clientSubKafkaEvent;
    }

    public String getClientPubKafkaEvent() {
        return clientPubKafkaEvent;
    }

    public String getClientSubEmqEvent() {
        return clientSubEmqEvent;
    }

    public String getClientPubEmqEvent() {
        return clientPubEmqEvent;
    }


    private final String clientSubKafkaEvent = "subKafka";

    private final String clientPubKafkaEvent = "pubKafka";

    private final String clientSubEmqEvent = "subEmq";

    private final String clientPubEmqEvent = "pubEmq";

    @OnConnect
    public void onConnect(SocketIOClient client) {
        log.info("客户端发起连接. sessionId->{}", client.getSessionId());
        socketIOClientManager.addClient(client);
    }

    @OnDisconnect
    public void onDisconnect(SocketIOClient client) {
        final String sessionID = client.getSessionId().toString();
        log.info("客户端断开连接, sessionId->{}" + sessionID);
        socketIOClientManager.removeClient(client);
        client.disconnect();
    }

    // kafka消息接收入口
    @OnEvent(value = clientPubKafkaEvent)
    public void pushKafka(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
        if (StrUtil.isEmpty(topic)) {
            ackRequest.sendAckData(400, "topic不能为空");
        }
        if (StrUtil.isEmpty(mesg)) {
            ackRequest.sendAckData(400, "mesg不能为空");
        }
        try {
            socketIOClientManager.pushClientMesg2Kafka(client, topic, mesg);
            ackRequest.sendAckData(200, "id");
        } catch (Exception e) {
            e.printStackTrace();
            ackRequest.sendAckData(500, e.getMessage());
        }
    }

    // emq信息接收入口
    @OnEvent(value = clientPubEmqEvent)
    public void pushEmq(SocketIOClient client, AckRequest ackRequest, String topic, String mesg) {
        if (StrUtil.isEmpty(topic)) {
            ackRequest.sendAckData(400, "topic不能为空");
        }
        if (StrUtil.isEmpty(mesg)) {
            ackRequest.sendAckData(400, "mesg不能为空");
        }
        try {
            socketIOClientManager.pushClientMesg2MQTT(client, topic, mesg);
            ackRequest.sendAckData(200, "id");
        } catch (Exception e) {
            e.printStackTrace();
            ackRequest.sendAckData(500, e.getMessage());
        }
    }


}
/**
 * Command-line runner that starts the Socket.IO server when Spring invokes it.
 */
@Component
@Order(1)
public class ServerRunner implements CommandLineRunner {
    private static final Logger LOG = LoggerFactory.getLogger(ServerRunner.class);

    private final SocketIOServer socketIOServer;

    /**
     * @param server the Socket.IO server to start
     */
    @Autowired
    public ServerRunner(SocketIOServer server) {
        socketIOServer = server;
    }

    /**
     * Logs the start-up message and starts the server.
     *
     * @param args command-line arguments; not used
     */
    @Override
    public void run(String... args) {
        LOG.info("SocketIO 启动...");
        socketIOServer.start();
    }
}
@Slf4j
public class SocketClientEMQTest {
    public static void main(String[] args) {
        final SocketClientEMQTest socketClientTest = new SocketClientEMQTest();
        try {
            socketClientTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run(String... args) throws Exception {
        URI uri = URI.create("http://127.0.0.1:9201");
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.query = "resourceID=" + "mqtt$$tcp://localhost:1883$$test";
        Socket socket = IO.socket(uri, options);
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("connect: {}", args);
            }
        });
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("disconnect: {}", args);
            }
        });

        socket.on("subEmq", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_mqtt {}", args);
            }
        });
      /*  socket.on("push_kafka", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_kafka {}" , args);
            }
        });*/

        final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")
        int i = 0;
        while (true) {
            i += 1;
            socket.emit("pubEmq", "test", "testmesg" + i, new Ack() {
                @Override
                public void call(Object... objects) {
                    log.info("userChat ack:{}|{}", objects[0], objects[1]);
                }
            });
            if (i >= 10) {
                break;
            }
            Thread.sleep(2000);
        }
        socket.connect();
        LockSupport.park();
    }
}
@Slf4j
public class SocketClientKAFKATest {
    public static void main(String[] args) {
        final SocketClientKAFKATest socketClientTest = new SocketClientKAFKATest();
        try {
            socketClientTest.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void run(String... args) throws Exception {
        URI uri = URI.create("http://127.0.0.1:9201");
        IO.Options options = new IO.Options();
        options.transports = new String[]{"websocket"};
        options.reconnectionAttempts = 2;
        options.query = "resourceID=" + "kafka$$localhost:9092$$test12399";
//        options.query = "loginUserNum=" + "mqtt$$tcp://localhost:1883$$test";
        Socket socket = IO.socket(uri, options);
        socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("connect: {}", args);
            }
        });
        socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("disconnect: {}", args);
            }
        });
        socket.on("subKafka", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_kafka {}", args);
            }
        });

      /*  socket.on("push_kafka", new Emitter.Listener() {
            @Override
            public void call(Object... args) {
                log.info("push_kafka {}" , args);
            }
        });*/

        final ArrayList<String> arrayList = new ArrayList<>();
//        arrayList.add("")
        int i = 0;
        while (true) {
            i += 1;
            socket.emit("pubKafka", "TEST", "testmesg" + i, new Ack() {
                @Override
                public void call(Object... objects) {
                    log.info("userChat ack:{}|{}", objects[0], objects[1]);
                }
            });
            if (i >= 10) {
                break;
            }
            Thread.sleep(2000);
        }
        socket.connect();
        LockSupport.park();
    }
}

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-11-20 18:45:47  更:2021-11-20 18:47:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/6 19:24:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码