https://zhuanlan.zhihu.com/p/138886949
Memory barrier跟cache的实现有很强的相关性, 掌握cache的实现硬件对理解memory barrier很有帮助. 以基本的MESI协议为例, 它主要实现了4种状态:
MESI保证一条cache line在任意时刻最多只有一个owner, 这样所有的写操作都是串行的, 而不会出现同一时刻有多个owner的情况, 那样不同的CPU就可能读到不同的值了.
如果只是上面的架构, 那么从硬件角度就比较容易做到访问cache的顺序和程序顺序一致, 从而不用额外的memory barrier. 但是上面的实现会有性能问题, 所以又引人了2样东西:
加上上面的store buffers和invalidate queue之后, 系统变成这样了:
这样就相当于在读写两端都引人了另外一层缓冲, memory barrier也就有了必要.
先考虑store buffer引入的问题, 如果在CPU0上执行以下指令可能出现什么情况?
a = 1;
b = 1;
虽然program order是先写a再写b, 因为store buffer存在, 可能会出现:
上面的步骤可以看出, 在2-3之间的窗口, 对其它cpu而言, b的可见性早于a, 也就是说其它cpu读到b = 1时, 可能还看不到a的修改. 这样我们就需要增加write memory barrier来保证其它cpu至少有办法保证先看到a的修改. 这里的有办法指的是其他cpu也需要配合. 我们把代码修改成这样:
a = 1;
smp_wmb();
b = 1;
那么write memory barrier怎么实现呢? 暴力点的可以在b写cacheline之前, 把所有store buffer给flush完, 确保其它cpu先收到invalidate消息 (收到该消息就表示那个cpu的数据已经更新), 轻量级一点的可以把b也放到store buffer去, 按照先进先出的顺序写cache.
再考虑invalidate queue引入的问题, cpu0上依然执行上面加过的代码, 在cpu1上执行如下代码:
while (b == 0) continue;
assert(a == 1);
考虑如下场景:
为了确保能读到a的新值, 进行如下修改:
while (b == 0) continue;
smp_rmb();
assert(a == 1);
那么read memory barrier可以怎么实现呢? 暴力点可以在读a前, 把invalidate queue中的所有消息都apply一遍.
既然有了read/write memory barrier, 那是不是一起使用就是一个full memory barrier了呢?
smp_wmb + smp_rmb != smp_mb
注意write memory barrier只管#StoreStore的顺序, read memory barrier只管#LoadLoad的顺序, full memory barrier还要管#StoreLoad和#LoadStore的顺序, 比如在x86上, write memory barrier和read memory barrier分别都可以为空, 但是full memory barrier却不能.
一般来说, full memory barrier会同时flush store buffer和apply invalidate queue. 为什么x86上 (还有其他的架构比如spark) 不允许#StoreStore, #LoadLoad, #LoadStore乱序, 却单独允许#StoreLoad出现? 大胆猜测一下, 要防止#StoreLoad乱序, 需要在每次load的时候隐式的flush store buffer, 这个代价非常大, 而其他类型的开销则比较小, 特别地, invalidate queue有可能实现的非常快, 比如hash, 甚至不使用invalidate queue?
这个是之前我们使用的内核上碰到的一个问题, 后来定位到是upstream已经解了的一个memory barrier的问题.
/* Use either while holding wait_queue_head_t::lock or when used for wakeups
* with an extra smp_mb() like:
*
* CPU0 - waker CPU1 - waiter
*
* for (;;) {
* @cond = true; prepare_to_wait(&wq, &wait, state);
* smp_mb(); // smp_mb() from set_current_state()
* if (waitqueue_active(wq)) if (@cond)
* wake_up(wq); break;
* schedule();
* }
* finish_wait(&wq, &wait);
*
* Because without the explicit smp_mb() it's possible for the
* waitqueue_active() load to get hoisted over the @cond store such that we'll
* observe an empty wait list while the waiter might not observe @cond.
*
* Also note that this 'optimization' trades a spin_lock() for an smp_mb(),
* which (when the lock is uncontended) are of roughly equal cost.
*/
static inline int waitqueue_active(wait_queue_head_t *q)
{
return !list_empty(&q->task_list);
}
问题出现在调用这个函数的地方:
unlock_new_inode(struct inode *inode)
{
lockdep_annotate_inode_mutex_key(inode);
spin_lock(&inode->i_lock);
WARN_ON(!(inode->i_state & I_NEW));
inode->i_state &= ~I_NEW; // <-- @cond
smp_mb(); // <-- missing
wake_up_bit(&inode->i_state, __I_NEW); // <-- waitqueue_active()
spin_unlock(&inode->i_lock);
}