任务调度之时间轮实现

任务调度,时间,实现 · 浏览次数 : 148

小编点评

**代码生成说明** * **时间格** (链表,维护了这个时刻可能需要执行的所有任务) * **队列** (用于存储执行的任务,并根据执行状态进行删除) * **排版** (用于管理时间格、队列、执行任务等,以提高代码可读性) **代码生成步骤** 1. **定义时间格**链表,用于存储执行的任务 2. **定义队列**,用于存储执行的任务 3. **定义排版**,用于管理时间格、队列、执行任务等 4. **创建**时间格链表,用于存储执行的任务 5. **创建**队列,用于存储执行的任务 6. **创建**排版,用于管理时间格、队列、执行任务等 7. **将**任务添加到**队列** 8. **从****队列**中获取**任务** 9. **执行**任务 10. **删除**已完成的任务 11. **更新**时间格链表,以管理执行的任务

正文

前言

在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景:

  1. 数仓系统凌晨进行的数据同步
  2. 订单12小时未支付的状态校验
  3. rpc调用超时时间的校验
  4. 缓存数据失效时间的延长
  5. 定时开启的促销活动
  6. ……

假如现在有一个任务需要3s后执行,你会如何实现?

简单点,直接一个线程的休眠,thread.sleep(3000),一行代码就能达到目的,但是性能嘛……,由于每个任务都需要一个单独的线程,当系统中存在大量任务,

任务调度

假如,现在有一个任务需要3s后执行,你会如何实现呢?

简单点,直接一个休眠,让线程sleep 3s,不就达到目的了吗?但是性能嘛……,由于每个任务都需要一个单独的线程,在系统中存在大量任务时,这种方案的消耗是极其巨大的,那么如何实现高效的调度呢?大佬们低头看了一眼手表,一个算法出现了

时间轮的数据结构

如图所示,这就是时间轮的一个基础结构,一个存储了定时任务的环形队列,可以理解为一个时间钟,队列的每个节点称为时间槽,每个槽位又使用列表存储着需要执行的定时任务。和生活中的钟表运行机制一样,每隔固定的单位时间,就会从一个槽位跳到下一个槽位,就像秒针跳动了一次,再取出当前槽位的任务进行执行。假如固定单位时间为1S,当前槽位位2,如果需要插入一个3S后的任务,就会在槽位5的的列表里加上当前任务。等指针运行到第五个槽位时,取出任务执行就可以了。

时间轮的最大优势是在时间复杂度上的优势,一个任务简单的生命周期:

  1. 创建任务,插入到数据结构中。
  2. 查询任务,找到满足条件的任务
  3. 执行任务。
  4. 任务归档,从任务调度的列表中删出。

其中第三步的执行时间是固定的,所以1,2,4这三部就的时间复杂度就决定了整个任务调度流程的复杂度,而时间轮是链式存储结构,所以在增删和查询时,时间复杂度都是0(1),其他常见的任务调度算法例如最小堆和红黑树以及跳表。

最小堆是一颗完全二叉树而且子节点的值总是大于等于父节点的值,所以在插入时候需要判断父节点的关系,它的时间添加操作时间复杂度是O(logn),在任务执行时,只需要判断最顶节点就行,所以它的查询时间复杂度时哦O(1)。

根据红黑树的特性已经被归纳法证明它的增加的时间复杂度是O(logn),查找最小节点的时间复杂度位O(h)。

跳表的的本质是实现二分查找法的有序链表,但是他有多个层级,和红黑树的高度值相似,它的时间复杂度也是O(logn)

高级时间轮

如上图所示,如果一个刻度代表1S,那么一个周期就是1分钟,但是如果我一个任务是在3分钟后执行呢,如果是在一个12小时后执行呢?

当然如果是单纯的增加环形链表的长度也是可以的,直接扩大到3600*24,一天一个周期,直接放进来。但是还有更好的办法。

带轮次标记的任务

任务执行轮次的计算公式:((任务执行时间-当前时间)/固定单位时间)%槽位数量

根据槽位计算公式可以算出当前任务需要插入执行的轮次,我在任务上面加一个字段round,当每次执行到该槽位时,就遍历该槽位的任务列表,每个任务的round-1,取出来round=0的任务执行就行。

for(Task task:taskList){
  int round= task.getRound();
   round=(round-1);
   task.setRound(round);
   if(round==0){
     doTask(task);
   }
}

如果任务间隔不是很大,看起来也是不错的一种解决方式。

但是工作中有很多任务,延迟执行的时间是很久以后的,例如延保履约服务成功之后会有一个7天自动完成的定时任务,甚至有一些几年后才会执行的任务,如果都用round来处理的话,那这个round将会变的非常大的一个数字,也会在任务列表中插入很多当前不需要执行的任务,如果每次都执行上面的逻辑,显然会浪费大量的资源。

多层时间轮

多层时间轮的核心思想是:

就上上图的水表,有很多小的表盘,但是每个表盘的刻度其实是不一样,又或者手表里的时分秒或者日历上的年月日。

针对时间复杂度的问题:不做遍历计算round,只要到了当前槽位,就把任务列表的所有任务拿出来执行。

针对空间复杂度的问题:分层,每个层级的时间轮刻度不一样,多个时间轮协调工作。

如上图所示,第一次时间轮,每个刻度是1ms,一轮是20ms,第二个层时间轮的刻度是20ms,一轮就是400ms,第三层的刻度是400ms,一轮就是8000ms,每层的周期就等于 20ms *2的n次方。这要使用多层级时间轮就可以很容易把任务区分开来。每当高层次时间轮到达当前节点,就把任务降级到低层级的时间轮上。对于400ms的时间轮来说,小于1ms和小于399ms的任务都是过期任务,只要不大于400ms,都认为是过期任务。

代码实现的话,往上也有很多,最近比较火热的POWER-JOB的分布式调度框架就是才有的时间轮算法,粘贴下核心代码大家看下:

1.首先定义了一个任务接口

public interface TimerTask extends Runnable {
}

2.调度中的任务对象

public interface TimerFuture {

    /**
     * 获取实际要执行的任务
     * @return
     */
    TimerTask getTask();

    /**
     * 取消任务
     * @return
     */
    boolean cancel();

    /**
     * 任务是否取消
     * @return
     */
    boolean isCancelled();

    /**
     * 任务是否完成
     * @return
     */
    boolean isDone();
}

3.调度器接口

public interface Timer {

    /**
     * 调度定时任务
     */
    TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);

    /**
     * 停止所有调度任务
     */
    Set<TimerTask> stop();
}

4.时间轮的实现

public class HashedWheelTimer implements Timer {

    private final long tickDuration;
    private final HashedWheelBucket[] wheel;
    private final int mask;

    private final Indicator indicator;

    private final long startTime;

    private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
    private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();

    private final ExecutorService taskProcessPool;

    public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
        this(tickDuration, ticksPerWheel, 0);
    }

    /**
     * 新建时间轮定时器
     * @param tickDuration 时间间隔,单位毫秒(ms)
     * @param ticksPerWheel 轮盘个数
     * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
     */
    public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {

        this.tickDuration = tickDuration;

        // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余
        int ticksNum = CommonUtils.formatSize(ticksPerWheel);
        wheel = new HashedWheelBucket[ticksNum];
        for (int i = 0; i < ticksNum; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        mask = wheel.length - 1;

        // 初始化执行线程池
        if (processThreadNum <= 0) {
            taskProcessPool = null;
        }else {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
            // 这里需要调整一下队列大小
            BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
            int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
            // 基本都是 io 密集型任务
            taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
                    60, TimeUnit.SECONDS,
                    queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
        }

        startTime = System.currentTimeMillis();

        // 启动后台线程
        indicator = new Indicator();
        new Thread(indicator, "HashedWheelTimer-Indicator").start();
    }

    @Override
    public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {

        long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
        HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);

        // 直接运行到期、过期任务
        if (delay <= 0) {
            runTask(timerFuture);
            return timerFuture;
        }

        // 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列)
        waitingTasks.add(timerFuture);
        return timerFuture;
    }

    @Override
    public Set<TimerTask> stop() {
        indicator.stop.set(true);
        taskProcessPool.shutdown();
        while (!taskProcessPool.isTerminated()) {
            try {
                Thread.sleep(100);
            }catch (Exception ignore) {
            }
        }
        return indicator.getUnprocessedTasks();
    }

    /**
     * 包装 TimerTask,维护预期执行时间、总圈数等数据
     */
    private final class HashedWheelTimerFuture implements TimerFuture {

        // 预期执行时间
        private final long targetTime;
        private final TimerTask timerTask;

        // 所属的时间格,用于快速删除该任务
        private HashedWheelBucket bucket;
        // 总圈数
        private long totalTicks;
        // 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消
        private int status;

        // 状态枚举值
        private static final int WAITING = 0;
        private static final int RUNNING = 1;
        private static final int FINISHED = 2;
        private static final int CANCELED = 3;

        public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {

            this.targetTime = targetTime;
            this.timerTask = timerTask;
            this.status = WAITING;
        }

        @Override
        public TimerTask getTask() {
            return timerTask;
        }

        @Override
        public boolean cancel() {
            if (status == WAITING) {
                status = CANCELED;
                canceledTasks.add(this);
                return true;
            }
            return false;
        }

        @Override
        public boolean isCancelled() {
            return status == CANCELED;
        }

        @Override
        public boolean isDone() {
            return status == FINISHED;
        }
    }

    /**
     * 时间格(本质就是链表,维护了这个时刻可能需要执行的所有任务)
     */
    private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {

        public void expireTimerTasks(long currentTick) {

            removeIf(timerFuture -> {

                // processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
                if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
                    return true;
                }

                if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
                    log.warn("[HashedWheelTimer] impossible, please fix the bug");
                    return true;
                }

                // 本轮直接调度
                if (timerFuture.totalTicks <= currentTick) {

                    if (timerFuture.totalTicks < currentTick) {
                        log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
                    }

                    try {
                        // 提交执行
                        runTask(timerFuture);
                    }catch (Exception ignore) {
                    } finally {
                        timerFuture.status = HashedWheelTimerFuture.FINISHED;
                    }
                    return true;
                }

                return false;
            });

        }
    }

    private void runTask(HashedWheelTimerFuture timerFuture) {
        timerFuture.status = HashedWheelTimerFuture.RUNNING;
        if (taskProcessPool == null) {
            timerFuture.timerTask.run();
        }else {
            taskProcessPool.submit(timerFuture.timerTask);
        }
    }

    /**
     * 模拟时针转动的线程
     */
    private class Indicator implements Runnable {

        private long tick = 0;

        private final AtomicBoolean stop = new AtomicBoolean(false);
        private final CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void run() {

            while (!stop.get()) {

                // 1. 将任务从队列推入时间轮
                pushTaskToBucket();
                // 2. 处理取消的任务
                processCanceledTasks();
                // 3. 等待指针跳向下一刻
                tickTack();
                // 4. 执行定时任务
                int currentIndex = (int) (tick & mask);
                HashedWheelBucket bucket = wheel[currentIndex];
                bucket.expireTimerTasks(tick);

                tick ++;
            }
            latch.countDown();
        }

        /**
         * 模拟指针转动,当返回时指针已经转到了下一个刻度
         */
        private void tickTack() {

            // 下一次调度的绝对时间
            long nextTime = startTime + (tick + 1) * tickDuration;
            long sleepTime = nextTime - System.currentTimeMillis();

            if (sleepTime > 0) {
                try {
                    Thread.sleep(sleepTime);
                }catch (Exception ignore) {
                }
            }
        }

        /**
         * 处理被取消的任务
         */
        private void processCanceledTasks() {
            while (true) {
                HashedWheelTimerFuture canceledTask = canceledTasks.poll();
                if (canceledTask == null) {
                    return;
                }
                // 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)
                if (canceledTask.bucket != null) {
                    canceledTask.bucket.remove(canceledTask);
                }
            }
        }

        /**
         * 将队列中的任务推入时间轮中
         */
        private void pushTaskToBucket() {

            while (true) {
                HashedWheelTimerFuture timerTask = waitingTasks.poll();
                if (timerTask == null) {
                    return;
                }

                // 总共的偏移量
                long offset = timerTask.targetTime - startTime;
                // 总共需要走的指针步数
                timerTask.totalTicks = offset / tickDuration;
                // 取余计算 bucket index
                int index = (int) (timerTask.totalTicks & mask);
                HashedWheelBucket bucket = wheel[index];

                // TimerTask 维护 Bucket 引用,用于删除该任务
                timerTask.bucket = bucket;

                if (timerTask.status == HashedWheelTimerFuture.WAITING) {
                    bucket.add(timerTask);
                }
            }
        }

        public Set<TimerTask> getUnprocessedTasks() {
            try {
                latch.await();
            }catch (Exception ignore) {
            }

            Set<TimerTask> tasks = Sets.newHashSet();

            Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
                if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                    tasks.add(timerFuture.timerTask);
                }
            };

            waitingTasks.forEach(consumer);
            for (HashedWheelBucket bucket : wheel) {
                bucket.forEach(consumer);
            }
            return tasks;
        }
    }
}

作者:京东保险 陈建华

来源:京东云开发者社区

与任务调度之时间轮实现相似的内容:

任务调度之时间轮实现

在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景

[转帖]被误解的CPU利用率、超线程、动态调频 —— CPU 性能之迷 Part 1

https://www.modb.pro/db/555820 引 TL;DR 这“引”部分写得有点多了,不喜直接跳到下一节。 性能测试、压力测试、业务系统性能容量评估。这 3 件事,可以认为是大部分程序员/软件开发从业者都需要面对的事。但,奇怪的是,很多人花了很多时间去做完成这些工作任务,却很少有人

解密Prompt系列32. LLM之表格理解任务-文本模态

这一章我们聊聊大模型表格理解任务,在大模型时代主要出现在包含表格的RAG任务,以及表格操作数据抽取文本对比等任务中。这一章先聊单一的文本模态,我们分别介绍微调和基于Prompt的两种方案。

[转帖]开发成长之路(21)-- 不可不知的操作系统知识(1)

https://cloud.tencent.com/developer/article/1879252?areaSource=&traceId= 文章目录 知识框图 操作系统的硬件环境 计算机系统的层次结构 操作系统主要作用 多道程序设计的基本概念 分时系统 实时系统 实时任务的类型 实时系统与分时

如何使用ChatGPT来自动化Python任务

1.概述 最近,比较火热的ChatGPT很受欢迎。今天,笔者为大家来介绍一下ChatGPT能做哪些事情。 2.内容 ChatGPT是一款由OpenAI开发的专门从事对话的AI聊天机器人。它的目标是让AI系统更加自然的与之交互,但它也可以在我们编写代码的时候提供一些帮助。 2.1 使用ChatGPT来

一种通用的业务监控触发方案设计

业务监控是指通过技术手段监控业务代码执行的最终结果或者状态是否符合预期,实现业务监控主要分成两步:一、在业务系统中选择节点发送消息触发业务监控;二、系统在接收到mq消息或者定时任务调度时,根据消息中或者任务中的业务数据查询业务执行的结果或状态并与业务预期的结果相对比。目前供销系统的方案如下:

.NET有哪些好用的定时任务调度框架

前言 定时任务调度的相关业务在日常工作开发中是一个十分常见的需求,经常有小伙伴们在技术群提问:有什么好用的定时任务调度框架推荐的?今天大姚给大家分享5个.NET开源、简单、易用、免费的任务调度框架,帮助大家在做定时任务调度框架技术选型的时候有一个参考。 以下开源任务调度收录地址:https://gi

Vue任务调度。

### 1、作用 vue中一个非常重要的功能,批量更新或者叫异步更新 响应式数据发生变化出发副作用函数重新执行时,我们有能力去决定副作用函数的执行时机、次数和方式。 ### 2、例子 ```javascript const state = reactive({ num: 1 }) effect(()

【面试题】多线程面试题总结

最近在看面试题,所以想用自己的理解总结一下,便于加深印象。 为什么使用多线程 使用多线程可以充分利用CPU,提高CPU的使用率。 提高系统的运行效率,对于一些复杂或者耗时的功能,可以对其进行拆分,比如将某个任务拆分了A、B、C三个子任务,如果子任务之间没有依赖关系,那么就可以使用多线程同时运行A、B

论文阅读: 面向Planning的端到端智驾Planning-oriented Autonomous Driving

设计一个更优的、可理解的、面向最终目标的框架。基于这个面向Planning的思想,他们提出了 Unified Autonomous Driving (UniAD)方案,一种新的自动驾驶框架。这个方案从全局视角出发,让智驾的各个模块特征提取可以互相补充,各个任务之间可以通过统一的查询接口通信。在此基础...