IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> diyTomcat系列三,引入线程池并处理TCP长连接 -> 正文阅读

[网络协议]diyTomcat系列三,引入线程池并处理TCP长连接

作者:token keyword

diyTomcat系列三,引入线程池并处理TCP长连接

image-20211227164819753

本节欲解决的几个问题:

线程的频繁创建和销毁、每一次HTTP请求都会产生一个新的Socket连接,当请求数过多时开销较大

解决方案:

引入线程池、利用Connection=keep-alive、连接超时时间控制TCP长连接来处理http请求

1. 引入线程池

在正常的网络请求中,需要同时处理大量的http请求,如果每个http请求都交给一个单独的线程来处理,这个线程处理完后马上又会被销毁掉,这样将会消耗掉性能,造成不必要的开销,所以我们引入线程池来处理请求

//我们先来看一下线程池构造方法要传入那些参数
ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
//参数依次为:
/核心线程池的大小,获取服务器处理器的核心数
int corePoolSize = Runtime.getRuntime().availableProcessors();
//核心线程池的最大线程数
int maxPoolSize=corePoolSize * 2;
//线程最大空闲时间
long keepAliveTime=10;
//时间单位 设置为秒
TimeUnit unit=TimeUnit.SECONDS;
//阻塞队列 容量为 maxPoolSize * 4,最多允许放入 maxPoolSize * 4 个空闲任务
BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(maxPoolSize * 4);
//线程创建工厂
ThreadFactory threadFactory = new NameThreadFactory();
//线程池拒绝策略
RejectedExecutionHandler handler = new MyIgnorePolicy();

? 如果连接数不大的情况下使用线程池其实更浪费资源,所有Tomcat默认情况下是不使用线程池的,等到了一定的请求数再使用线程池,当然,基于高内聚低耦合的设计思路,我们也可以在server.xml第74行的配置文件中手动进行设置,设置默认启动线程池

image-20220122233328801

2. 处理TCP长连接

? 首先我们要知道,HTTP协议是一个应用层的协议应用层的协议协议是定义消息对话,确保正在发送的消息得到期待的响应,并且在传输数据时调用正确的服务。HTTP协议是基于请求/响应模式的。我们平时讨论的长连接是TCP长连接应用层不关注是长连接还是短连接,只要服务端给了响应,本次HTTP连接就结束了,或者更准确的说,是本次HTTP请求就结束了,下一次又是一个新的请求和新的响应,因此根本没有长连接这一说。那么自然也就没有短连接这一说了。

对应到代码上来看TCP长连接也就是socket一直没有被关闭,一直处于连接状态。

那怎么设置长连接呢?

? 我们只需要在请求头里设置Connection:keep-alive就可以了,这样当我们的服务器接收到这个请求时,就不会主动去关闭socket连接,当一个网页打开完成后,客户端和服务器之间用于传输HTTP数据的 TCP连接不会关闭,如果客户端再次访问这个服务器上的网页,会继续使用这一条已经建立的连接。当然设置为Keep-Alive的服务器也不会永久保持连接,它有一个保持时间,可以在不同的服务器软件(如Tomcat)中设定这个时间,如果客户端在规定的时间里没有进行http请求,就好断开连接,落实到代码里也就是关闭socket。实现长连接要客户端和服务端都支持长连接。HTTP协议的长连接和短连接,实质上是TCP协议的长连接和短连接。

Tomcat中设置连接超时时间可以在server.xml中第69行Connector里进行配置,可以看到Tomcat长连接的默认的时间是20s

image-20220122221203798

在笔者的第二篇文章中没有处理长连接,可以看到每一次http请求后都关闭了连接,然后又会进行一次新的连接

image-20220122221546658

接下来我们去优化他(其实也很简单,就是不关掉socket连接并设置超时时间就可以了):

主体代码是在第二篇文章中TaskService里面进行修改的,这里先贴修改和代码,完整代码在文章最后面后贴出,笔者这里还对连接次数进行了限制,防止一个线程处理过多的请求造成异常,配合线程池可以增加活跃状态的线程数来处理请求。

其中HttpServletRequestHttpServletResponse里都不能关闭流的连接,因为关闭通过socket获得的流,socket也会关闭,这样就会造成异常。

public void run() {
    long connectionTime = 0L;//记录连接时间
    while (flag){
        try {
            connectionCount++;
            logger.info(Thread.currentThread().getName()+",此长连接处理了"+connectionCount+"次请求");
            if(connectionCount > MAX_REQUEST_COUNT){
                connection = "close";
            }
            //http请求封装,如果没有请求会在这里堵塞
            HttpServletRequest req = new HttpServletRequest(socket);
            //根据请求做出响应
            HttpServletResponse resp = new HttpServletResponse(req, socket);
        } catch (Exception e) {
            logger.error("线程运行错误",e);
            connection = "close";
        }finally {
            //超时关闭连接
            if(connectionTime == 0){
                connectionTime = System.currentTimeMillis();
            }else {
                connectionTime = System.currentTimeMillis() - connectionTime;
                if(connectionTime > 20000){//超时时间可以从xml配置文件中读取
                    flag = false;
                    connection = "close";//关闭连接
                }
            }
            //超过连接次数关闭连接
            if("close".equals(connection)){
                flag = false;
                //不是长链接即关闭资源
                try {
                    if(socket != null){
                        socket.close();
                    }
                } catch (IOException ioException) {
                    logger.error(ioException.getMessage());
                }
                try {
                    if(in != null){
                        in.close();
                    }
                } catch (IOException ioException) {
                    logger.error(ioException.getMessage());
                }
                try {
                    if(out != null){
                        out.close();
                    }
                } catch (IOException ioException) {
                    logger.error(ioException.getMessage());
                }
            }
        }
    }
}

3. 项目完整代码

参照第二期的代码,这里只做了少许修改。

这里要切记HttpServletRequestHttpServletResponse里都不能关闭流的连接,因为关闭通过socket获得的流,socket也会关闭,这样就很造成异常,关闭Socket连接在TaskService进行处理

3.1 MyCatServer

package com.fx.tomcat;


import org.apache.log4j.Logger;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author:fengxian
 * @date: 2022/1/17 17:02
 * Description:
 */
public class MyCatServer {
    private final static Logger logger = Logger.getLogger(MyCatServer.class);
    private static Map<String, Map<String,String>> map;//用来包装配置文件
    private static boolean flag=true;
    public static void main(String[] args) {
        MyCatServer myCatServer = new MyCatServer();
        //从配置文件中获得服务的端口
        int tomcatServicePort = Integer.parseInt(map.get("Connector").get("port"));
        //根据配置文件使用不同的多线程策略
        if("true".equalsIgnoreCase(map.get("Connector").get("executor"))){
            myCatServer.startServerByThreadPool(tomcatServicePort);
        }else {
            myCatServer.startServerByThread(tomcatServicePort);
        }
    }

    /**
     * 不通过线程池
     */
    private void startServerByThread(int port){
        try (ServerSocket ss = new ServerSocket(port)) {
            while (flag){
                Socket socket = ss.accept();
                logger.debug("有用户进行请求,他是"+socket.getLocalSocketAddress());
                new Thread(new TaskService(socket)).start();
            }
        } catch (Exception e) {
            logger.error("服务器启动异常"+e);
        }
    }

    /**
     * 线程池开启线程
     */
    private void startServerByThreadPool(int port) {
        //核心线程池的大小,获取服务器处理器的核心数
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        //核心线程池的最大线程数
        int maxPoolSize=corePoolSize * 2;
        //线程最大空闲时间
        long keepAliveTime=10;
        //时间单位
        TimeUnit unit=TimeUnit.SECONDS;
        //阻塞队列 容量为2 最多允许放入两个空闲任务
        BlockingQueue<Runnable> workQueue=new ArrayBlockingQueue<>(maxPoolSize * 4);
        //线程创建工厂
        ThreadFactory threadFactory = new NameThreadFactory();
        //线程池拒绝策略
        RejectedExecutionHandler handler = new MyIgnorePolicy();
        //线程池执行者
        ThreadPoolExecutor executor;
        //不允许无节制的创建线程,必须使用线程池
        executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);
        //预启动所有核心线程,提升效率
        executor.prestartAllCoreThreads();
        try (ServerSocket ss = new ServerSocket(port)) {
            while(flag){
                Socket socket = ss.accept();
                logger.debug("有用户进行请求,他是"+socket.getLocalSocketAddress());
                //执行任务
                executor.submit(new TaskService(socket));
            }
        } catch (Exception e) {
        	logger.error("服务器启动异常:{}",e);
        } finally {
            //线程池关闭
            executor.shutdown();
        }
    }

    /**
     * 线程工厂
     */
    static class NameThreadFactory implements ThreadFactory {
        Logger logger = Logger.getLogger(NameThreadFactory.class);
        //线程id, AtomicInteger 原子类
        private final AtomicInteger threadId=new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "线程" + threadId.getAndIncrement());//相当于i++
            logger.info(t.getName()+"已经被创建了");
            return t;
        }
    }

    /**
     * 线程池BlockingQueue满后的拒绝策略
     */
    public static class MyIgnorePolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            logger.error("线程池+"+ e.toString()+r.toString()+"被拒绝");
        }
    }

    //dom4j 解析 xml
    static {
        map = new ConcurrentHashMap<>();
        Map<String,String> sonMap = new HashMap<>();
        SAXReader saxReader = new SAXReader();
        Document document = null;
        try {
            document = saxReader.read("conf/server.xml");
        } catch (DocumentException e) {
            logger.error("配置文件解析失败{}",e);
        }
        assert document != null;
        Element root = document.getRootElement();
        Element connector = root.element("Service").element("Connector");
        sonMap.put("port",connector.attributeValue("port"));
        sonMap.put("protocol",connector.attributeValue("protocol"));
        sonMap.put("connectionTimeout",connector.attributeValue("connectionTimeout"));
        sonMap.put("redirectPort",connector.attributeValue("redirectPort"));
        sonMap.put("executor",connector.attributeValue("executor"));
        map.put("Connector",sonMap);
    }
}

3.2 TaskService

package com.fx.tomcat;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;

/**
 * @author: fengxian
 * @date: 2022/1/18 9:09
 * Description:
 */
public class TaskService implements Runnable{
    private final Logger logger = Logger.getLogger(TaskService.class);
    private Socket socket;
    private InputStream in = null;
    private OutputStream out = null;
    private String connection="keep-alive";
    private boolean flag;
    private final int MAX_REQUEST_COUNT = 30;
    private int connectionCount;//连接次数
    public TaskService(Socket socket){
        this.socket=socket;
        flag = true;
    }
    @Override
    public void run() {
        long connectionTime = 0L;
        while (flag){
            try {
                connectionCount++;
                logger.info(Thread.currentThread().getName()+",此长连接处理了"+connectionCount+"次请求");
                if(connectionCount > MAX_REQUEST_COUNT){
                    connection = "close";
                }
                //http请求封装,如果没有请求会在这里堵塞
                HttpServletRequest req = new HttpServletRequest(socket);
                //根据请求做出响应
                HttpServletResponse resp = new HttpServletResponse(req, socket);
            } catch (Exception e) {
                logger.error("线程运行错误",e);
                connection = "close";
            }finally {
                //超时关闭连接
                if(connectionTime == 0){
                    connectionTime = System.currentTimeMillis();
                }else {
                    connectionTime = System.currentTimeMillis() - connectionTime;
                    if(connectionTime > 20000){//超时时间可以从xml配置文件中读取
                        flag = false;
                        connection = "close";//关闭连接
                    }
                }
                //超过连接次数关闭连接
                if("close".equals(connection)){
                    flag = false;
                    //不是长链接即关闭资源
                    try {
                        if(socket != null){
                            socket.close();
                        }
                    } catch (IOException ioException) {
                        logger.error(ioException.getMessage());
                    }
                    try {
                        if(in != null){
                            in.close();
                        }
                    } catch (IOException ioException) {
                        logger.error(ioException.getMessage());
                    }
                    try {
                        if(out != null){
                            out.close();
                        }
                    } catch (IOException ioException) {
                        logger.error(ioException.getMessage());
                    }
                }
            }
        }
    }
}

HttpServletResponse和HttpServletRequest只是将关闭流的代码全部剔除了,其他未进行修改

4. 项目演示效果和新问题思考

image-20220122234040199

这里又有一个有意思的点

**为什么设置了长连接还是会产生这么多新的socket连接呢?**是不是超时了?

我们将超时时间设置为天长和地久再测试一遍

image-20220122234313036

测试结果:

image-20220122234857290

我们会发现还是会产生6个socket连接,但是我们多测试几次后就会发现,每次都会产生6个socket连接

这是为什么呢?

image-20220122235218520

? 那是因为浏览器在请求资源的时候不可能只是一个线程进行资源请求,这样请求的话会导致速度很慢,所以浏览器都会对同一个服务器设置有默认的HTTP最大并发连接数,主流浏览器默认并发数如下:

BrowserHTTP/1.1HTTP/1.0
IE 8,966
IE 6,724
Firefox 1766
Firefox 366
Firefox 228
Safari 3,444
Chrome 1,26?
Chrome 344
Chrome 4+6?

? 我们都知道在Tomcat中可以使用@WebServlet代替xml文件来进行地址映射,下一篇文章笔者将自定义这个注解并实现Servlet动态资源的访问。自定义注解的过程可以看下笔者的文章,注解学习一、Java内置注解及注解书写注解学习二、使用注解仿写junit测试框架,先尝试着写几个注解。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-01-24 11:18:35  更:2022-01-24 11:20:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/8 5:23:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码