TimerQueue 是.NET中实现定时任务的核心组件,它是一个定时任务的管理器,负责存储和调度定时任务。它被用于实现很多 .NET 中的定时任务,比如 System.Threading.Timer、Task.Delay、CancellationTokenSource 等。
笔者将用两篇文章为大家介绍 TimerQueue 的实现原理,本篇文章将以 System.Threading.Timer 为入口,揭秘 TimerQueue 对定时任务基本单元 TimerQueueTimer
的管理和调度,下一篇文章将介绍 TimerQueue 又是如何通过 native timer 被触发的。
本文将基于.NET 7.0 源码揭秘 TimerQueue 的实现原理。
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/Timer.cs
Timer 的使用方式非常简单,只需要创建一个 Timer 实例并通过构造函数指定回调函数和回调函数的参数即可。构造函数有几个不同的重载,可以用不同的时间单位指定 Timer 的首次到期时间 dueTime 和间隔时间 period,这边以 TimeSpan 为例:
new Timer(callback: TimerCallback, state: "Hello World", dueTime: TimeSpan.FromSeconds(1), period: TimeSpan.FromSeconds(2));
Console.WriteLine("Started timer at {0}", DateTime.Now);
void TimerCallback(object? state)
{
Console.WriteLine("TimerCallback: state = {0} at {1}", state, DateTime.Now);
}
Console.ReadKey();
输出结果如下:
Started timer at 2023/7/3 21:09:10
TimerCallback: state = Hello World at 2023/7/3 21:09:11
TimerCallback: state = Hello World at 2023/7/3 21:09:13
TimerCallback: state = Hello World at 2023/7/3 21:09:15
TimerCallback: state = Hello World at 2023/7/3 21:09:17
TimerCallback: state = Hello World at 2023/7/3 21:09:19
上面的代码创建了一个 Timer 实例,该 Timer 实例会在 1s 后第一次执行回调函数,之后每隔 2s 执行一次回调函数。
此外,Timer 还有一个 Change 方法,可以用来修改 Timer 的 dueTime 和 period。
Timer timer = new Timer(callback: TimerCallback, state: "Hello World", dueTime: TimeSpan.FromSeconds(1), period:
TimeSpan.FromSeconds(2));
timer.Change(dueTime: TimeSpan.FromSeconds(2), period: TimeSpan.FromSeconds(3));
Console.WriteLine("Started timer at {0}", DateTime.Now);
void TimerCallback(object? state)
{
Console.WriteLine("TimerCallback: state = {0} at {1}", state, DateTime.Now);
}
Console.ReadKey();
上面的代码通过 Change 方法修改了 Timer 的 dueTime 和 period,使得 Timer 会在 2s 后开始执行回调函数,每隔 3s 执行一次回调函数。
Timer 的实现主要由以下三个类组成:
TimerQueue 是核心的实现。
public sealed class Timer : MarshalByRefObject, IDisposable, IAsyncDisposable
{
internal TimerHolder _timer;
public Timer(TimerCallback callback,
object? state,
TimeSpan dueTime,
TimeSpan period)
{
long dueTm = (long)dueTime.TotalMilliseconds;
long periodTm = (long)period.TotalMilliseconds;
TimerSetup(callback, state, (uint)dueTm, (uint)periodTm);
}
private void TimerSetup(TimerCallback callback,
object? state,
uint dueTime,
uint period,
bool flowExecutionContext = true)
{
// 实际的任务会被封装到 TimerQueueTimer 中,并由 TimerHolder 管理其生命周期
_timer = new TimerHolder(new TimerQueueTimer(callback, state, dueTime, period, flowExecutionContext));
}
}
internal sealed class TimerQueueTimer : IThreadPoolWorkItem
{
internal TimerQueueTimer(TimerCallback timerCallback, object? state, uint dueTime, uint period, bool flowExecutionContext)
{
_timerCallback = timerCallback;
_state = state;
_dueTime = Timeout.UnsignedInfinite;
_period = Timeout.UnsignedInfinite;
if (flowExecutionContext)
{
_executionContext = ExecutionContext.Capture();
}
// 分配一个 TimerQueue
_associatedTimerQueue = TimerQueue.Instances[Thread.GetCurrentProcessorId() % TimerQueue.Instances.Length];
if (dueTime != Timeout.UnsignedInfinite)
Change(dueTime, period);
}
internal bool Change(uint dueTime, uint period, bool throwIfDisposed = true)
{
bool success;
lock (_associatedTimerQueue)
{
// ... 省略部分代码
// 将 TimerQueueTimer 添加到 TimerQueue 中
success = _associatedTimerQueue.UpdateTimer(this, dueTime, period);
}
return success;
}
void IThreadPoolWorkItem.Execute() => Fire(isThreadPool: true);
internal void Fire(bool isThreadPool = false)
{
// ... 省略部分代码
// 调用 TimerCallback
_timerCallback(_state);
// ... 省略部分代码
}
}
Timer 的任务会被封装到 TimerQueueTimer 中,并由 TimerHolder 管理其生命周期。
TimerQueue 是实现定时任务的核心,它负责存储和调度 TimerQueueTimer 实例。
TimerQueueTimer 是 TimerQueue 的基本任务单元,它封装了待执行的任务。TimerQueueTimer 实现了 IThreadPoolWorkItem 接口,可以交给线程池调度执行。
internal sealed class TimerQueueTimer : IThreadPoolWorkItem
{
// 绑定的 TimerQueue
private readonly TimerQueue _associatedTimerQueue;
// TimerQueueTimer 被存储在 TimerQueue 中,_next 和 _prev 用于构成 TimerQueue 的双向链表
internal TimerQueueTimer? _next;
internal TimerQueueTimer? _prev;
// 是否是保存在 TimerQueue 的 shortTimers 链表中
internal bool _short;
// 此次任务的开始时间
internal long _startTicks;
internal uint _dueTime;
internal uint _period;
// 回调函数
private readonly TimerCallback _timerCallback;
// 回调函数的参数
private readonly object? _state;
// 绑定的 ExecutionContext
private readonly ExecutionContext? _executionContext;
// ... 省略
TimerQueueTimer 按照创建目的不同,可以分为两类:
操作超时timer:这些timer被频繁创建和销毁,但几乎很少触发。这些timer的目的是仅在发生故障时触发。它们作为一种故障安全机制,允许系统检测和处理异常情况。比如用于检测某个http接口调用是否超时。
后台定时任务timer:这些timer被设计为在特定间隔或特定时间触发。与超时timer不同,这些timer是确实会触发的。
对超时timer而言,更多地是要考虑创建和销毁的性能。而对于后台定时任务timer,这类任务通常是长期运行的,触发上稍微多一些时间开销,就其总运行时间而言,是可以忽略不计的。
TimerQueue 的设计更多地是考虑创建和销毁的性能,而不是触发的性能,也就是更多地针对 TimerQueueTimer 的插入和删除操作进行优化。
因此 TimerQueue 将 TimerQueueTimer 存储在双向链表中,插入和删除都是 O(1) 的时间复杂度。
虽然 TimerQueue 更多地是考入插入和删除的性能,但是遍历链表的性能也是有被考虑的。
TimerQueue 使用 OS 提供的 native timer 来实现定时任务的调度,当 native timer 触发时,TimerQueue 会遍历链表,触发所有到期的 TimerQueueTimer。
为了提高遍历 Timer链表 的性能,TimerQueue 会将 TimerQueueTimer 按照到期时间进行分成了两组:shortTimers 和 longTimers。
TimerQueue 基于一个基本阈值维护了动态更新的参考时间点,在这个参考时间点之前到期的 TimerQueueTimer 会被分到 shortTimers 中,而在这个时间点之后到期的 TimerQueueTimer 会被分到 longTimers 中。
native timer 触发时,TimerQueue 会先遍历 shortTimers,如果遍历完 shortTimers 后,当前的时间还没到需要遍历 longTimers 的时间,那么 TimerQueue 会继续等待,直到当前时间到达需要遍历 longTimers 的时间,然后再遍历 longTimers。
通过对 TimerQueueTimer 分类避免了每次都需要遍历整个 TimerQueueTimer 链表,下文会更详细地介绍这个算法。
TimerQueue 的初始化是在 TimerQueue.Instances 属性的 getter 方法中完成的,TimerQueue.Instances 属性是一个静态属性,它会在第一次访问时初始化。
TimerQueue 的数量是根据当前机器的 CPU 核心数来决定的,每个 TimerQueue 对应一个 CPU 核心。
internal sealed class TimerQueue: IThreadPoolWorkItem
{
public static TimerQueue[] Instances { get; } = CreateTimerQueues();
private static TimerQueue[] CreateTimerQueues()
{
var queues = new TimerQueue[Environment.ProcessorCount];
for (int i = 0; i < queues.Length; i++)
{
queues[i] = new TimerQueue(i);
}
return queues;
}
}
internal sealed class TimerQueueTimer: IThreadPoolWorkItem
{
internal TimerQueueTimer(TimerCallback timerCallback, object? state, uint dueTime, uint period, bool flowExecutionContext)
{
_timerCallback = timerCallback;
_state = state;
_dueTime = Timeout.UnsignedInfinite;
_period = Timeout.UnsignedInfinite;
if (flowExecutionContext)
{
_executionContext = ExecutionContext.Capture();
}
// 每个 CPU 核心对应一个 TimerQueue, TimerQueueTimer 根据当前线程所在的 CPU 核心来决定加入哪个 TimerQueue
_associatedTimerQueue = TimerQueue.Instances[Thread.GetCurrentProcessorId() % TimerQueue.Instances.Length];
if (dueTime != Timeout.UnsignedInfinite)
Change(dueTime, period);
}
internal bool Change(uint dueTime, uint period, bool throwIfDisposed = true)
{
bool success;
lock (_associatedTimerQueue)
{
if (_canceled)
return throwIfDisposed ? throw new ObjectDisposedException(null, SR.ObjectDisposed_Generic) : false;
_period = period;
// 如果 dueTime == Timeout.UnsignedInfinite,表示 TimerQueueTimer 被取消,需要从 TimerQueue 中移除
if (dueTime == Timeout.UnsignedInfinite)
{
_associatedTimerQueue.DeleteTimer(this);
success = true;
}
else
{
// 表示 TimerQueueTimer 加入 TimerQueue 中或更新 TimerQueueTimer 在 TimerQueue 中的位置及到期时间
success = _associatedTimerQueue.UpdateTimer(this, dueTime, period);
}
}
return success;
}
}
在 TimerQueueTimer 关联上 TimerQueue 后,接着就是调用 TimerQueue 的 UpdateTimer 方法,将 TimerQueueTimer 加入到 TimerQueue 中,或者更新 TimerQueueTimer 在 TimerQueue 中的位置及到期时间。
TimerQueue.UpdateTimer 被调用的时机有两个:
TimerQueue 维护了一个参考时间点 currentAbsoluteThreshold,到期时间在这个时间点之前的 TimerQueueTimer 会被分到 shortTimers 中,而到期时间在这个时间点之后的
TimerQueueTimer 会被分到 longTimers 中。
TimerQueue 定义了一个阈值 ShortTimersThresholdMilliseconds = 333ms,参考时间点 currentAbsoluteThreshold 的初始值就是 TimerQueue 创建时间 +
这个阈值 333ms。每次 shortTimers 被遍历完一轮后,currentAbsoluteThreshold 会被更新为当前时间点 + 333ms。
为方便理解,下文用参考时间点来代指 currentAbsoluteThreshold。
internal sealed class TimerQueue: IThreadPoolWorkItem
{
// 系统启动时间作为基准时间,这边每个平台的实现略有不同
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/TimerQueue.Unix.cs
https://github.com/dotnet/runtime/blob/release/7.0/src/libraries/System.Private.CoreLib/src/System/Threading/TimerQueue.Windows.cs
private static long TickCount64 => Environment.TickCount64;
// 决定是否将 TimerQueueTimer 添加到 shortTimers 中的阈值,333ms
private const int ShortTimersThresholdMilliseconds = 333;
// 参考时间点,到期时间在这个时间点之前的 TimerQueueTimer 会被分到 shortTimers 中,而到期时间在这个时间点之后的 TimerQueueTimer 会被分到 longTimers 中
private long _currentAbsoluteThreshold = TickCount64 + ShortTimersThresholdMilliseconds;
// shortTimers 和 longTimers 都是双向链表
private TimerQueueTimer? _shortTimers;
private TimerQueueTimer? _longTimers;
public bool UpdateTimer(TimerQueueTimer timer, uint dueTime, uint period)
{
long nowTicks = TickCount64;
// dueTime 表示 TimerQueueTimer 离到期的时间间隔
// 比较 TimerQueueTimer 的到期时间和 _currentAbsoluteThreshold,决定是否将 TimerQueueTimer 添加到 shortTimers 中
long absoluteDueTime = nowTicks + dueTime;
bool shouldBeShort = _currentAbsoluteThreshold - absoluteDueTime >= 0;
if (timer._dueTime == Timeout.UnsignedInfinite)
{
// _dueTime == Timeout.UnsignedInfinite 表示 timer 未被添加到 TimerQueue 中,只需要直接添加即可
timer._short = shouldBeShort;
LinkTimer(timer);
++ActiveCount;
}
else if (timer._short != shouldBeShort) // _short 表示 timer 是否需要被添加到 shortTimers 中
{
// 如果 timer 已经被添加到 TimerQueue 中,
// 但是 timer 应该被添加到另外一个链表中,
// 需要先将 TimerQueueTimer 从原来的链表中移除,然后再添加到新的链表中
UnlinkTimer(timer);
timer._short = shouldBeShort;
LinkTimer(timer);
}
timer._dueTime = dueTime;
timer._period = (period == 0) ? Timeout.UnsignedInfinite : period;
timer._startTicks = nowTicks;
// 确保 native timer 在 TimerQueueTimer 的到期时间之前触发
return EnsureTimerFiresBy(dueTime);
}
}
TimerQueue 中的 shortTimers 和 longTimers 都是双向链表,TimerQueueTimer 被添加到 TimerQueue 中时,会使用头插法将 TimerQueueTimer 添加到 shortTimers 或 longTimers 中。
TimerQueue 对 TimerQueueTimer 的更新操作分为三种:
internal sealed class TimerQueue: IThreadPoolWorkItem
{
private TimerQueueTimer? _shortTimers;
private TimerQueueTimer? _longTimers;
public long ActiveCount { get; private set; }
private void LinkTimer(TimerQueueTimer timer)
{
// _short 表示 timer 是否需要被添加到 shortTimers 中
ref TimerQueueTimer? listHead = ref timer._short ? ref _shortTimers : ref _longTimers;
// 使用头插法将 timer 添加到 shortTimers 或 longTimers 中
timer._next = listHead;
if (timer._next != null)
{
timer._next._prev = timer;
}
timer._prev = null;
listHead = timer;
}
private void UnlinkTimer(TimerQueueTimer timer)
{
// 如果 timer 不是 shortTimers 或 longTimers 的尾节点,需要更新 timer 后一个节点的 prev 指针
TimerQueueTimer? t = timer._next;
if (t != null)
{
t._prev = timer._prev;
}
// 如果 timer 是 shortTimers 或 longTimers 的头结点,需要更新 shortTimers 或 longTimers 的头结点
if (_shortTimers == timer)
{
_shortTimers = t;
}
else if (_longTimers == timer)
{
_longTimers = t;
}
// 如果 timer 不是 shortTimers 或 longTimers 的头结点,需要更新 timer 前一个节点的 next 指针
t = timer._prev;
if (t != null)
{
t._next = timer._next;
}
}
public void DeleteTimer(TimerQueueTimer timer)
{
if (timer._dueTime != Timeout.UnsignedInfinite)
{
--ActiveCount;
UnlinkTimer(timer);
timer._prev = null;
timer._next = null;
timer._dueTime = Timeout.UnsignedInfinite;
timer._period = Timeout.UnsignedInfinite;
timer._startTicks = 0;
timer._short = false;
}
}
}
遍历算法的关键是对 TimerQueue 中的 TimerQueueTimer 的分类处理。
从下面两个维度对 TimerQueueTimer 进行分类:
TimerQueue 绑定的 native timer 到期后,会调用 TimerQueue 的 FireNextTimers 方法,FireNextTimers 方法会先遍历 shortTimers,如果当前时间大于参考时间点
_currentAbsoluteThreshold,则会继续遍历 longTimers。
遍历 shortTimers 的过程中需要判断是一次性的 TimerQueueTimer 还是周期性的 TimerQueueTimer。
遍历 longTimers 的过程中,会有两种情况:
TimerQueueTimer 的回调实际在哪触发分为两种情况:
下面是 TimerQueue 的 FireNextTimers 的大致流程图:
internal sealed class TimerQueue: IThreadPoolWorkItem
{
// 将 TimerQueueTimer 从 shortTimers 或 longTimers 中移除,然后添加到另外一个队列中
public void MoveTimerToCorrectList(TimerQueueTimer timer, bool shortList)
{
UnlinkTimer(timer);
timer._short = shortList;
LinkTimer(timer);
}
private void FireNextTimers()
{
// 第一次个 TimerQueueTimer 在当前线程执行,其他会被添加到线程池中
TimerQueueTimer? timerToFireOnThisThread = null;
lock (this)
{
_isTimerScheduled = false;
bool haveTimerToSchedule = false;
uint nextTimerDuration = uint.MaxValue;
long nowTicks = TickCount64;
// 分两次遍历,第一次遍历 shortTimers,遍历完 shortTimers 后,
// 如果当前时间大于参考时间点 _currentAbsoluteThreshold,则再遍历 longTimers
TimerQueueTimer? timer = _shortTimers;
for (int listNum = 0; listNum < 2; listNum++) // short == 0, long == 1
{
while (timer != null)
{
TimerQueueTimer? next = timer._next;
long elapsed = nowTicks - timer._startTicks;
long remaining = timer._dueTime - elapsed;
if (remaining <= 0)
{
// 将当前 timer 标记为待触发状态
timer._everQueued = true;
if (timer._period != Timeout.UnsignedInfinite)
{
// 如果是周期性的 TimerQueueTimer,需要将 timer 重新添加到 shortTimers 或 longTimers 中
// 如果 周期 period 设置的过小,代码执行到这里的时候,下一次触发的时间已经过去了,就设置下一次触发的时间为 1ms
timer._startTicks = nowTicks;
long elapsedForNextDueTime = elapsed - timer._dueTime;
timer._dueTime = (elapsedForNextDueTime < timer._period) ?
timer._period - (uint)elapsedForNextDueTime :
1;
// 在遍历 shortTimers 的过程中,选出最小的 nextTimerDuration,用于后面重新绑定 native timer
if (timer._dueTime < nextTimerDuration)
{
haveTimerToSchedule = true;
nextTimerDuration = timer._dueTime;
}
// 周期性的 TimerQueueTimer 会因为下次到期时间的变化而在 shortTimers 和 longTimers 之间移动
bool targetShortList = (nowTicks + timer._dueTime) - _currentAbsoluteThreshold <= 0;
if (timer._short != targetShortList)
{
MoveTimerToCorrectList(timer, targetShortList);
}
}
else
{
// 如果不是周期性的 TimerQueueTimer,直接将 timer 从 TimerQueue 中移除
DeleteTimer(timer);
}
// 第一个 TimerQueueTimer 在当前线程执行,其他会被添加到线程池中
if (timerToFireOnThisThread == null)
{
timerToFireOnThisThread = timer;
}
else
{
ThreadPool.UnsafeQueueUserWorkItemInternal(timer, preferLocal: false);
}
}
else
{
if (remaining < nextTimerDuration)
{
haveTimerToSchedule = true;
nextTimerDuration = (uint)remaining;
}
// !timer._short 表示目前遍历的是 longTimers,如果 remaining <= ShortTimersThresholdMilliseconds,
if (!timer._short && remaining <= ShortTimersThresholdMilliseconds)
{
MoveTimerToCorrectList(timer, shortList: true);
}
}
timer = next;
}
// shortTimers 已经遍历完,判断是否需要遍历 longTimers
if (listNum == 0)
{
// 计算当前时间和参考时间点 _currentAbsoluteThreshold 之间的时间差 remaining
// 如果 remaining > 0,说明还不需要遍历 longTimers
long remaining = _currentAbsoluteThreshold - nowTicks;
if (remaining > 0)
{
if (_shortTimers == null && _longTimers != null)
// 因为没有 shortTimers 了,下次遍历 longTimers 时,需要重新计算 nextTimerDuration
// 因为没办法确定准确的 nextTimerDuration,所以这里直接设置为 remaining
// +1 的延迟是为了能在下次触发时, 是为了顺带处理掉 [_currentAbsoluteThreshold, _currentAbsoluteThreshold + 1] 之间的 TimerQueueTimer
// 避免这些 TimerQueueTimer 延后到下次触发时才被处理
之间的 TimerQueueTimer
nextTimerDuration = (uint)remaining + 1;
haveTimerToSchedule = true;
}
break;
}
// 切换到 longTimers
timer = _longTimers;
// 更新参考时间点 _currentAbsoluteThreshold
_currentAbsoluteThreshold = nowTicks + ShortTimersThresholdMilliseconds;
}
}
// 重新绑定native timer
if (haveTimerToSchedule)
{
EnsureTimerFiresBy(nextTimerDuration);
}
}
// 此次遍历的第一个 TimerQueueTimer 在执行 FireNativeTimer 的线程上执行,减少线程切换
// 其他 TimerQueueTimer 在 ThreadPool 线程上执行
timerToFireOnThisThread?.Fire();
}
}
Task.Delay 和 用于超时取消的 CancellationTokenSource 也都是基于 TimerQueue 实现的。
Task.Delay 返回的 Task 是对 TimerQueueTimer 的封装,当 TimerQueueTimer 触发时,Task.Delay 返回的 Task 会被设置为完成状态。
超时取消的 CancellationTokenSource 也是对 TimerQueueTimer 的封装,当 TimerQueueTimer 触发时,CancellationTokenSource 会被取消。
欢迎关注个人技术公众号