12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析

ScheduledThreadPoolExecutor 介绍

ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。

简单使用

public static void main(String[] args) throws InterruptedException {
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);

    //1. execute
    // 和普通线程池执行一样
    executor.execute(() -> {
        System.out.println("execute");
    });

    //2. schedule
    // 指定延迟时间,一次性执行任务
    executor.schedule(() -> {
        System.out.println("schedule");
    },2000,TimeUnit.MILLISECONDS);

    //3. AtFixedRate
    // 周期性执行任务(周期时间:执行时间和延迟时间的最大值)
    executor.scheduleAtFixedRate(() -> {
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("at:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);

    //4. WithFixedDelay
    // 周期性执行任务(周期时间:执行时间 + 延迟时间)
    executor.scheduleWithFixedDelay(() -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("with:" + System.currentTimeMillis());
    },3000,2000,TimeUnit.MILLISECONDS);
}

核心内容

在这里插入图片描述

ScheduledFutureTask - 任务

ScheduledFutureTask 实现了 RunnableScheduledFuture 接口,间接的实现了 Delayed 接口,让任务可以放到延迟队列中,并且基于二叉堆做排序

private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {

        /** 计数器,每个任务都有一个全局唯一的序号
        	如果任务的执行时间一模一样,比对sequenceNumber */
        private final long sequenceNumber;

        /** 任务执行的时间,单位是纳秒 */
        private long time;

        /* 
        * period == 0:表示一次性执行的任务 
        * period > 0:表示使用的是 scheduleAtFixedRate
        * period < 0:表示使用的是 scheduleWithFixedDelay 
        * */
        private final long period;

        /** 周期性执行任务时,引用具体任务,方便后面重新扔到阻塞队列 */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;
		// 实现Delayed接口重写的方法,执行时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), NANOSECONDS);
        }
		// 实现Delayed接口重写的方法,比较方式
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
    }

DelayedWorkQueue - 队列

DelayedWorkQueue 包装了 RunnableScheduledFuture<?>[]

static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {
		// 初始长度
        private static final int INITIAL_CAPACITY = 16;
        // 数组
        private RunnableScheduledFuture<?>[] queue =
            new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
        // 锁
        private final ReentrantLock lock = new ReentrantLock();
        // 长度
        private int size = 0;
		// 等待拿堆顶数据的线程
        private Thread leader = null;
		// Condition 队列
        private final Condition available = lock.newCondition();
}

源码分析

execute()

直接调用 schedule(),入参 delay = 0

public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

schedule()

延迟一段时间,执行一次任务

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        // 非空校验
        if (callable == null || unit == null)
            throw new NullPointerException();
        // 将任务封装成 ScheduledFutureTask 
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        // 延迟执行任务                               
        delayedExecute(t);
        return t;
    }


	// 返回当前任务要执行的系统时间
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

delayedExecute() 延迟执行任务

private void delayedExecute(RunnableScheduledFuture<?> task) {
		// 线程池状态不是RUNNING,就拒绝任务
        if (isShutdown())
            reject(task);
        else {
			// 将任务放入延迟队列中(二叉堆)
            super.getQueue().add(task);
			// 1.线程池状态
			// 2.根据策略决定是否能执行
			// 3.将任务从延迟队列移除
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
				// 是否需要创建线程
                ensurePrestart();
        }
    }
	
	
	
// periodic - true:代表是周期性执行的任务
// periodic - false:代表是一次性的延迟任务
boolean canRunInCurrentRunState(boolean periodic) {
	// 默认情况下,如果任务扔到了延迟队列中,有两个策略
    // 如果任务是周期性执行的,默认为false(continueExistingPeriodicTasksAfterShutdown)
    // 如果任务是一次性的延迟任务,默认为true(executeExistingDelayedTasksAfterShutdown)
	
	// 此时,周期性任务返回false,一次性任务返回true
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
    
}

// 判断当前任务到底执行不执行
final boolean isRunningOrShutdown(boolean shutdownOK) {
    // 重新拿到线程池的ctl
    int rs = runStateOf(ctl.get());
    // 如果线程池是RUNNING,返回true
    // 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
    return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}



// 是否需要创建线程
void ensurePrestart() {
    // 获取线程池中的工作线程个数。
    int wc = workerCountOf(ctl.get());
    // 如果工作线程个数,小于核心线程数,
    if (wc < corePoolSize)
        // 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
        addWorker(null, true);
    // 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
    else if (wc == 0)
        // 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
        addWorker(null, false);
}

scheduleAtFixedRate()

scheduleAtFixedRate 在包装 ScheduledFutureTask 时会将 period 设置为正数,代表固定周期执行

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
        sft.outerTask = t;
        // 延迟执行任务
        delayedExecute(t);
        return t;
    }

scheduleWithFixedDelay()

scheduleWithFixedDelay 在包装 ScheduledFutureTask 时会将 period 设置为负数,代表在执行任务完毕后,再计算下次执行的时间

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
        sft.outerTask = t;
        // 延迟执行任务
        delayedExecute(t);
        return t;
    }

run()

执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。

// 执行任务
public void run() {
    // 获取任务是否是周期执行
    // true:周期执行
    // false:一次的延迟执行
    boolean periodic = isPeriodic();
    // 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
    if (!canRunInCurrentRunState(periodic))
        // 取消任务
        cancel(false);
    else if (!periodic)
        // 当前任务是一次性的延迟执行。执行任务具体的run方法
        ScheduledFutureTask.super.run();
        // 周期性任务
    else if (ScheduledFutureTask.super.runAndReset()) {
    			// 计算下次任务运行时间
                setNextRunTime();
                // 重新将任务扔到延迟队列中
                reExecutePeriodic(outerTask);
            }
}


// 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
private void setNextRunTime() {
    // 拿到当前任务的period
    long p = period;
    // period > 0:At
    if (p > 0)
        // 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
        time = time + p;
    else
        // period < 0:With
        // 任务执行完,拿当前系统时间计算下次执行的时间点
        time = now() + p;
}

// 重新将任务扔到延迟队列中
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    // 线程池状态的判断
    if (canRunInCurrentRunState(true)) {
        // 将任务扔到了延迟队列中
        super.getQueue().add(task);
        // 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            // 是否需要创建线程
            ensurePrestart();
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340