IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> Java 双工通信与websocket协议 -> 正文阅读

[网络协议]Java 双工通信与websocket协议

前言

最近做项目,需要双工通信,考虑http协议,但是是单向的通信,只能请求响应,不能从服务端推送,如果要服务器推送,方式有很多,http轮训,长轮训,websocket等,实际上tcp传输层是双向通信的,原始的socket就可以实现。现在最常用的是websocket,因为可以复用http的底层tcp连接,方便,当然http3使用udp通信,基于QUIC保证连接可靠。

Java socket方式

TCP是面向双向连接的协议,刚好符合条件。server端暴露端口用于连接,实现多方client收发器

package com.feng.socket.admin;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SocketServer {
    private static AtomicInteger sessionId = new AtomicInteger(10000);
    private static Map<String, Socket> socketMap = new ConcurrentHashMap<>();

    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20));
    private ServerSocket serverSocket;

    public SocketServer(int port) {
        try {
            this.serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void start() {
        //server to client content
        threadPoolExecutor.execute(new WriteRunnable());
        try {
            while (true) {
                Socket socket = this.serverSocket.accept();
                String id = sessionId.incrementAndGet()+"";
                System.out.println("connect from client, the sessionId is :\t"+id);
                socketMap.put(id, socket);
                //client to server content
                threadPoolExecutor.execute(new ReadRunnable(id));

            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static Socket getSessionSocket(String sessionId){
        return socketMap.get(sessionId);
    }
    public static void removeSessionSocket(String sessionId){
        socketMap.remove(sessionId);
    }

    public static void main(String[] args) {
        new SocketServer(8083).start();
    }
}

其中收的线程

package com.feng.socket.admin;

import java.io.*;
import java.net.Socket;

public class ReadRunnable implements Runnable{

    private String sessionId;

    public ReadRunnable(String sessionId) {
        this.sessionId = sessionId;
    }

    @Override
    public void run() {
        try {
            Socket socket = SocketServer.getSessionSocket(sessionId);
            if (socket.isClosed()) {
                System.out.println("client已断开连接------");
                return;
            }
            InputStream in = socket.getInputStream();
            Reader reader = new InputStreamReader(in);
            BufferedReader br = new BufferedReader(reader);

            while (true) {
                if (socket.isClosed()) {
                    SocketServer.removeSessionSocket(sessionId);
                    System.out.println("client已断开连接------");
                    break;
                }
                try {
                    String line = br.readLine();
                    if (line == null) {
                        System.out.println("client已断开连接------");
                        break;
                    }
                    System.out.println(sessionId + ":" + line);
                    if ("shutdown".equalsIgnoreCase(line)) {
                        socket.close();
                        SocketServer.removeSessionSocket(sessionId);
                        System.out.println("client已断开连接------");
                        break;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            br.close();
            reader.close();
            in.close();
            socket.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

?发的方式,使用console输入

package com.feng.socket.admin;

import java.io.*;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WriteRunnable implements Runnable{

    Map<String, PrintWriter> cache = new ConcurrentHashMap<>();

    @Override
    public void run() {
        String line = "";
        PrintWriter pw;
        try (InputStreamReader inputStreamReader = new InputStreamReader(System.in);
             BufferedReader br = new BufferedReader(inputStreamReader)) {
            while (true) {
                if ((line = br.readLine()) != null) {
                    // 使用 sessionId:xxx 表示发送某个client,发送所有client广播就是迭代发送
                    String sessionId = line.substring(0,5);
                    String data = line.substring(6);
                    Socket socket = SocketServer.getSessionSocket(sessionId);
                    if (socket.isClosed()) {
                        System.out.println("server连接已断开-----");
                    }
                    if (cache.containsKey(sessionId)) {
                        pw = cache.get(sessionId);
                    } else {
                        try {
                            OutputStream out = socket.getOutputStream();
                            pw = new PrintWriter(out);
                            cache.put(sessionId, pw);
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }
                    pw.println(data);
                    pw.flush();
                    if ("shutdown".equalsIgnoreCase(data)) {
                        try {
                            cache.get(sessionId).close();
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

client端代码

package com.feng.socket.client;

import java.io.*;
import java.net.Socket;
import java.util.Scanner;

public class SocketClient {
    public static void main(String[] args) throws IOException {
        final Socket socket = new Socket("192.168.79.104", 8083);
        final Scanner scanner = new Scanner(System.in);
        new Thread(new Runnable() {
            @Override
            public void run() {
                String line = "";
                try (InputStream in = socket.getInputStream();
                     Reader reader = new InputStreamReader(in);
                     BufferedReader br = new BufferedReader(reader)) {
                    while (true) {
                        if (socket.isClosed()) {
                            System.out.println("server断开连接==========");
                            break;
                        }
                        line = br.readLine();
                        if (line != null) {
                            System.out.println(line);
                            if ("shutdown".equalsIgnoreCase(line)) {
                                break;
                            }
                        } else {
                            System.out.println("server断开连接==========");
                            scanner.close();
                            break;
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        socket.close();
                        System.exit(0);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                String line;
                try (OutputStream out = socket.getOutputStream();
                     PrintWriter pw = new PrintWriter(out)) {
                    while (true) {
                        if (socket.isClosed()) {
                            System.out.println("client断开连接==========");
                            break;
                        }
                        if (scanner.hasNext()) {
                            line = scanner.next();
                            pw.println(line);
                            pw.flush();
                            out.flush();
                            if ("shutdown".equalsIgnoreCase(line)) {
                                break;
                            }
                        }
                    }
                    if (!socket.isClosed()) {
                        socket.close();
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }).start();


    }
}

?相对比较简单,测试如下

实现了双工,简单模式BIO。

websocket协议

实际上websocket就是用来解决这个问题的一个标准,来自于Oracle官网JSR 356, Java API for WebSocket (oracle.com)

?目前的成熟方案是在javaee上实现协议升级,复用Tomcat等的底层容器的TCP连接。spring boot demo,配置bean,当然也可以使用Oracle官网的方式,这里springboot封装了。

package com.feng.socket.admin;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter initServerEndpointExporter(){
        return new ServerEndpointExporter();
    }
}

写一个server端websocket连接收发器

package com.feng.socket.admin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@ServerEndpoint("/websocket/server/{sessionId}")
public class SocketServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServer.class);
    private static Map<String, Session> sessionMap = new ConcurrentHashMap<>();

    private String sessionId = "";

    @OnOpen
    public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
        this.sessionId = sessionId;
        sessionMap.put(sessionId, session);
        LOGGER.info("new connect, sessionId is " + sessionId);
    }

    @OnClose
    public void onClose() {
        sessionMap.remove(sessionId);
        LOGGER.info("close socket, the sessionId is " + sessionId);

    }

    @OnMessage
    public void onMessage(String message, Session session) {
        LOGGER.info("--------- receive message: " + message);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        LOGGER.error(error.getMessage(), error);
    }

    public static void sendMessage(Session session, String message) throws IOException {
        session.getBasicRemote().sendText(message);
    }

    public static Session getSession(String sessionId){
        return sessionMap.get(sessionId);
    }
}

写一个发送接口

package com.feng.socket.admin;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.websocket.Session;
import java.io.IOException;

@RestController
@SpringBootApplication
public class SocketMain {
    public static void main(String[] args) {
        SpringApplication.run(SocketMain.class, args);
    }

    @RequestMapping("/msg")
    public String sendMsg(String sessionId, String msg) throws IOException {
        Session session = SocketServer.getSession(sessionId);
        SocketServer.sendMessage(session, msg);
        return "send " + sessionId + " : " + msg;
    }
}

在使用java-websocket写一个client端

        <dependency>
            <groupId>org.java-websocket</groupId>
            <artifactId>Java-WebSocket</artifactId>
            <version>1.5.3</version>
        </dependency>

code

package com.feng.socket.client;

import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Scanner;

public class SocketClient {
    public static void main(String[] args) throws URISyntaxException {
        WebSocketClient webSocketClient = new WebSocketClient(new URI("ws://127.0.0.1:8083/websocket/server/10001")) {
            @Override
            public void onOpen(ServerHandshake serverHandshake) {
                System.out.println(serverHandshake.getHttpStatus() + " : " + serverHandshake.getHttpStatusMessage());
            }

            @Override
            public void onMessage(String s) {
                System.out.println("receive msg is " + s);
            }

            @Override
            public void onClose(int i, String s, boolean b) {
                System.out.println(s);
            }

            @Override
            public void onError(Exception e) {
                e.printStackTrace();
            }
        };
        webSocketClient.connect();
        while (!ReadyState.OPEN.equals(webSocketClient.getReadyState())) {
            System.out.println("wait for connecting ...");
        }

        webSocketClient.send("hello");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String line = scanner.next();
            webSocketClient.send(line);
        }
        System.out.println("start websocket client...");
    }
}

实现服务端与client端双向发送,点对点传输。?

websocket协议原理

通过抓包,发现websocket实际上是http协议上的升级

?先client通过http协议发送协商,connection :Upgrade ,Upgrade的模式是websocket,按照这个理论,我们也可以自定义一种其他Upgrade的协议传输数据。

协商的key和版本

然后服务端返回协商的结果

?

协议达成,如果需要TLS加密传输还要证书传递等

?后面就会ping pong

?

总结

这里的双向通信,使用的TCP的特性,websocket也是在http的基础上升级而来,实际上现在的QUIC协议也可以实现在UDP上通信,就是HTTP3.0,就不再适合websocket了,估计会出现新的成熟技术。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-07-03 11:09:04  更:2022-07-03 11:09:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 23:19:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码