IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> tomcat使用AbstractQueuedSynchronizer进行限流分析 -> 正文阅读

[Java知识库]tomcat使用AbstractQueuedSynchronizer进行限流分析

tomcat源码系列导航栏

? ??tomcat源码分析环境搭建

? ??tomcat启动过程-load初始化

? ? tomcat启动过程-start启动

? ? tomcat使用AbstractQueuedSynchronizer进行限流分析

目录

代码块一acceptor.run()

代码块二countUpOrAwaitConnection()

代码块三 countUpOrAwait

代码块四 tryAcquireShared

代码块五?doAcquireSharedInterruptibly


前言

前一篇文章,分析了tomcat的启动过程,tomcat的start启动过程,一步一步的启动,主线程创建accpetor线程,accpetor线程阻塞的监听8080端口的请求进来,如果有请求进来,然后就把它放入一个事件列表中,又继续监听8080端口。主线程创建的poller线程就去轮训事件列表,如果有事件进来,那就交给线程池去处理。我们这一篇文章来分析tomcat的限流。

我们先看一下上一篇文章的代码块五acceptor.run()方法

代码块一acceptor.run()

  //后台线程监听传入的TCP/IP连接并将其传递给适当的处理器。
        @Override
        public void run() {
            //Thread.currentThread().setName("我是响应用户网页请求的线程");
            int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    //代码一这里就是进行tomcat的限流操作
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        //分析一 通过nio的方式获取客户端的请求
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        //分析二 处理这个请求
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }


? ? ? 我们看的这个方法countUpOrAwaitConnection(),接着进去。

代码块二countUpOrAwaitConnection()

    protected void countUpOrAwaitConnection() throws InterruptedException {
       //这个最大连接数,我们在tomcat启动过程中默认设置的是10000
       //可以看到如果我们修改最大连接数为-1,那就代码不进行限流操作 
       if (maxConnections==-1) {
            return;
        }
        //这里获取一个限流器,也是在tomcat启动过程中默认创建的限流器
        LimitLatch latch = connectionLimitLatch;
        if (latch!=null) {
            //进行限流操作
            latch.countUpOrAwait();
        }
    }

????????我们可以看到这里获得了一个限流器。LimitLatch 这个限流器调用Latch的countUpOrAwait进行限流

代码块三 countUpOrAwait

    /**
     * Acquires a shared latch if one is available or waits for one if no shared
     * latch is current available.
     * 如果共享锁可用,则获取共享锁;如果当前没有共享锁可用,则等待共享锁。
     * @throws InterruptedException If the current thread is interrupted
     */
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        
        sync.acquireSharedInterruptibly(1);
    }

????????可以看到这个方法就是调用了一个sync.acquireSharedInterruptibly(1);就进行了限流操作。那么它的原理是什么。首先我们看一下sync是一个什么东西。

//这是一个限流器
public class LimitLatch {

    private static final Log log = LogFactory.getLog(LimitLatch.class);
    
    //我们可以发现Sync是一个继承自AQS的内部类
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }
}

????????我们可以看到原来Sync其实是LimitLatch的内部类,并且它还继承自AQS,我们知道AQS定义了一套多线程访问共享资源的同步器框架。其实原理就是它定义了一个双向链表和节点构成的队列。我们如果获取锁成功,则往下操作。如果获取锁失败(说明锁被别人拿去了,排队等待),则将线程创建成一个节点放入到哪个双向链表中去进行等待。

????????放入到队列中的线程。底层使用LockSupport.park(this)来阻塞线程,LockSupport是使用UNSAFE这个类来使操作系统进行线程的阻塞。

????????如果你不了解AQS这里可以参考一篇文章????????Java并发:AbstractQueuedSynchronizer详解(独占模式)

????????接下来的部分需要先了解AQS才能理解更加深刻

????????我们接着往下看

sync.acquireSharedInterruptibly(1)。这个方法做了什么
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

接着进去看

代码块四 tryAcquireShared

 //尝试去操作 返回-1代表需要进行限流了。然后1代表不需要限流。
        @Override
        protected int tryAcquireShared(int ignored) {
            //count是统计的客户端的请求数,每一次请求进来,count的原子的增加1
            long newCount = count.incrementAndGet();
            //如果请求数已经大于了我们设置的最大连接数10000
            if (!released && newCount > limit) {
                // Limit exceeded
                //我们将count减少1
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

代码块五?doAcquireSharedInterruptibly

//以下这个方法就是,当我们尝试去获取锁失败后,
//我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。
//然后再根据节点的状态看是否需要进行线程阻塞
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

????????我们可以看到以上这个方法就是,当我们尝试去获取锁失败后,我们会创建一个节点,然后将该节点设置进一个由双向链表组成的队列中去。然后再根据节点的状态看是否需要进行线程阻塞。这样我们就实现了限流操作。

????????最后总结一下AbstractQueuedSynchronizer 这个类的设计方式,和具体的实现类是怎么个操作方式。首先AQS这个类提供了必须要子类实现以下四个方法,

作用分别是tryAcquire 、获取 独享锁,tryRelease、释放独享锁,tryAcquireShared获取共享锁,tryReleaseShared释放共享锁。两两配对使用的。

?????????具体怎样才算获取锁成功,这个需要我们自己业务确定。比如本篇文章,LimitLatch使用到的获取共享锁,只有我们的请求连接数小于10000。我们就认为这个动作就是获取锁成功。如果获取锁失败,那么就将当前线程放入队列,然后阻塞。

? ? ? ? 再比如ReentrantLock这个类是一个独享锁。它定义了一个state整数字段。0表示没有线程之前获取过锁,那么我们线程去操作的时候,将state由0修改为1,那么我们就认为获取锁成功。如果state是1或者大于1的整数。并且之前不是当前线程把它获取的。那么当前线程就插入队列中去阻塞。代表获取锁失败。

? ? ? ? 诸如此类,我们以此类推Semaphore、CountDownLatch、ReentrantReadWriteLock都是类似的使用。

 
 
 

                
        
        
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-25 07:59:26  更:2021-11-25 08:00:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 3:46:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码