IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 定时器ScheduledExecutorService原理分析 -> 正文阅读

[大数据]定时器ScheduledExecutorService原理分析

一 简单使用

环境:jdk8

DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ScheduledExecutorService executorService=new ScheduledThreadPoolExecutor(1);
        executorService.scheduleWithFixedDelay(()->{
            System.out.println(dtf.format(LocalDateTime.now()));
        },1,3, TimeUnit.SECONDS);

有关spring中定时器使用,请点击https://blog.csdn.net/yegeg/article/details/121654509

从new ScheduledThreadPoolExecutor可以看出,我们这个定时器也和线程池有关系,接下来看ScheduledThreadPoolExecutor类

?它扩展了ThreadPoolExecutor ,说明它也是一个线程池,也实现了ScheduledExecutorService ,这个接口定义了定时器所拥有的方法

public interface ScheduledExecutorService extends ExecutorService {

    /**
     * Creates and executes a one-shot action that becomes enabled
     * after the given delay.
     *
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    /**
     * Creates and executes a ScheduledFuture that becomes enabled after the
     * given delay.
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the given
     * period; that is executions will commence after
     * {@code initialDelay} then {@code initialDelay+period}, then
     * {@code initialDelay + 2 * period}, and so on.
     * If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.  If any execution of this task
     * takes longer than its period, then subsequent executions
     * may start late, but will not concurrently execute.
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    /**
     * Creates and executes a periodic action that becomes enabled first
     * after the given initial delay, and subsequently with the
     * given delay between the termination of one execution and the
     * commencement of the next.  If any execution of the task
     * encounters an exception, subsequent executions are suppressed.
     * Otherwise, the task will only terminate via cancellation or
     * termination of the executor.
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

我们常用的shedule、fixedRate、fixedDelay都在这里有提前申明

二 ScheduledExecutorService和线程池有什么关系

从ScheduledThreadPoolExecutor类继承,我们就知道?ScheduledExecutorService继承于ThreadPoolExecutor ,那么我们定时任务也是用线程池中规则来执行的,这篇文章,我们重点关注是ScheduledExecutorService,现在我们来看下ScheduledThreadPoolExecutor的scheduleWithFixedDelay源码

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
         // ①
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
         
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);// ②
        sft.outerTask = t;
        delayedExecute(t);  // ③
        return t;
    }

这个函数主要做了三件事?

① 把我们的执行任务、延时、执行周期包装成ScheduledFutureTask

sequenceNumber 任务在创建时候递增,如果两个任务执行时间相同,就可以根据该变量来判断哪个任务先执行

time 任务执行需要等待时间

period 执行周期,如果是正数表示fixedRate任务,如果是负数表示fixedDelay任务,0表示非重复任务

② 执行了decorateTask方法,主要作用是一个扩展点,允许用户修改和替换task

③ 通过delayedExecute把任务放到一个队列中,并且通过ensurePrestart方法去保证至少有一个线程开始执行,ensurePrestart是线程池的方法,它保证有线程能启动去执行任务,有关线程池的原理,将在线程池文章中分析

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

到这里,把任务封装成一个task 放到队列中,并且开启了线程来执行。

三 它是怎么实现周期性调用呢

通过上面,我们把任务放到队列,并且开启线程来执行,他是怎么执行呢,我们知道用Thread去启动一个任务的时候,这个任务可以实现Runnable接口,然后去执行他里面的run,我们上面说封装的ScheduledFutureTask任务,也是实现了Runnable接口,那么线程在执行的时候,也会去执行run,我们来看下ScheduledFutureTask中重写run方法

 public void run() {
            boolean periodic = isPeriodic();//①
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();// ②
            else if (ScheduledFutureTask.super.runAndReset()) { // ③
                setNextRunTime();
                reExecutePeriodic(outerTask); 
            }
        }

①处就是判断是不是周期任务,如果ScheduledFutureTask的变量period不等于0 表示周期任务

② 如果是一次性任务就执行ScheduledFutureTask.super.run(),执行这个方法的时候会去执行FutrueTask中的run。里面调用了我们自己写的要执行的任务,把这里理解成去执行我们自己的任务。

③ 如果是周期任务 就执行ScheduledFutureTask.super.runAndReset()并且获取下次执行的时间,我们先来看下runAndReset是什么,它也是FutrueTask中的方法,执行了该方法后,它会重置为初始状态。

setNextRunTime()去设置我们任务下次执行的所需等待的时间,这里就体现了fixedRate和fixedDelay执行周期的差别。

reExecutePeriodic又把我们的任务重新放到队列中,启动一个线程去执行

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

这样就实现了我们任务周期性调用,放入队列-->调用任务-->计算下次执行时间-->放入队里-->调用任务-->计算下次执行时间....

四 怎么实现获取即将要执行的任务

从上面分析,我们知道了任务是怎么周期性的执行,我们也知道每个任务都有等待时间,那么我们是怎么获取即将要执行的等待时间最短的任务呢,等待这段时间线程做了什么?这里用到一个叫DelayedWorkQueue的队列,他是一个阻塞队列,线程池中的线程会通过他的taken来获取要执行的任务,taken方法会阻塞直到能获取最近要执行的任务才返回,任务是放到一个最小堆里面,堆顶就是等待时间最少的任务,如果任务等待时间还没有到达,线程就会被阻塞,释放cup

思考下

DelayedWorkQueue队列在新增和移除任务时候,是怎样实现最小堆存放的

调用taken时候,是怎么把线程阻塞起来,又怎么被唤醒

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-04 13:30:42  更:2021-12-04 13:32:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:13:05-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码