如何解决出于排序的目的,原子读-修改-写是一种操作还是两种操作?
考虑原子读-修改-写操作,例如x.exchange(...,std::memory_order_acq_rel)
。出于对其他对象的加载和存储进行排序的目的,这是否被视为:
-
具有获取-释放语义的单个操作?
如果它是 #2,那么尽管在加载之前或存储之后不能对同一线程中的其他操作进行重新排序,但仍然存在在两者之间重新排序的可能性。
举一个具体的例子,考虑:
x
std::atomic<int> x,y;
void thread_A() {
x.exchange(1,std::memory_order_acq_rel);
y.store(1,std::memory_order_relaxed);
}
void thread_B() {
// These two loads cannot be reordered
int yy = y.load(std::memory_order_acquire);
int xx = x.load(std::memory_order_acquire);
std::cout << xx << "," << yy << std::endl;
}
是否可以输出 thread_B
?
如果将 0,1
替换为 x.exchange()
,那么 x.store(1,std::memory_order_release);
肯定可以输出 thread_B
。 0,1
中的额外隐式负载是否应该排除这种情况?
cppreference 听起来像 #1 是这种情况,而 exchange()
是被禁止的:
具有这种内存顺序的读-修改-写操作既是获取操作又是释放操作。在此存储之前或之后,无法对当前线程中的任何内存读取或写入进行重新排序。
但是我在标准中找不到任何明确的内容来支持这一点。实际上,该标准对原子读-修改-写操作几乎没有说明,除了 N4860 中的 31.4 (10) 这只是读取必须读取写入之前写入的最后一个值的明显属性。所以虽然我不想质疑 cppreference,但我想知道这是否真的正确。
我也在研究它是如何在 ARM64 上实现的。 gcc 和 clang 都将 0,1
编译为本质上
thread_A
(See on godbolt.) 根据我对 ARM64 语义的理解,以及一些测试(负载为 ldaxr [x]
stlxr #1,[x]
str #1,[y]
而不是存储),我认为 y
可以在之前变得可见str [y]
(当然不是在 stlxr [x]
之前)。这将使ldaxr
能够观察到thread_B
。所以如果 #1 是真的,那么 gcc 和 clang 似乎都是错误的,我不敢相信。
最后,据我所知,将 0,1
替换为 memory_order_acq_rel
不会改变此分析的任何内容,因为它只会增加与其他 seq_cst
操作相关的语义,并且我们这里没有。
我发现 What exact rules in the C++ memory model prevent reordering before acquire operations?,如果我理解正确,它似乎同意 #2 是正确的,并且可以观察到 seq_cst
。我仍然很感激您的确认,以及检查 cppreference 引用是否实际上是错误的或我是否误解了它。
解决方法
不是语言标准层面的答案,而是一些证据,在实践中,答案可以是“两个”。正如我在问题中猜测的那样,即使 RMW 为 seq_cst
,也可能发生这种情况。
我无法像在原始问题中那样观察到存储被重新排序,但这里有一个示例,它显示了原子 seq_cst
RMW 的存储被重新排序后的 relaxed
负载。
下面的程序是根据 What's are practical example where acquire release memory order differs from sequential consistency? 中 LWimsey 的示例改编的 Peterson 算法的实现。正如那里所解释的,该算法的正确版本涉及
me.store(true,std::memory_order_seq_cst);
if (other.load(std::memory_order_seq_cst) == false)
// lock taken
在商店之后负载变得可见是必不可少的。
如果 RMW 是用于排序语义的单个操作,我们希望这样做是安全的
me.exchange(true,std::memory_order_seq_cst);
if (other.load(std::memory_order_relaxed) == false) {
// Ensure critical section doesn't start until we know we have the lock
std::atomic_thread_fence(std::memory_order_seq_cst);
// lock taken
}
理论上,由于交换操作已经获得语义,因此在交换完成后加载必须变为可见,特别是在 true
到 me
的存储变为可见之后。
但实际上在 ARMv8-a 上,使用 gcc 或 clang,此类代码经常失败。看起来,实际上,exchange
确实包含一个获取加载和一个发布存储,并且 other.load
可能在发布存储之前变得可见。 (虽然不是在 exchange
的获取加载之前,但这在这里无关紧要。)
clang 生成如下代码:
mov w11,#1
retry:
ldaxrb wzr,[me]
stlxrb w12,w11,[me]
cbnz w12,retry
ldrb w11,[other]
参见 https://godbolt.org/z/fhjjn7,汇编输出的第 116-120 行。 (gcc 是相同的,但隐藏在库函数中。)通过 ARM64 内存排序语义,可以使用以下加载和存储对 release-store stlxrb
重新排序。它是排他性的这一事实并没有改变这一点。
为了更频繁地进行重新排序,我们将存储的数据安排为依赖于错过缓存的先前加载,我们通过使用 dc civac
逐出该行来确保这一点。我们还需要将两个标志 me
和 other
放在不同的缓存行上。否则,据我所知,即使线程 A 在存储之前进行加载,线程 B 也必须等到 A 的存储完成后才开始其 RMW,尤其是在 A 的存储可见之前不会进行加载。
在多核 Cortex A72 (Raspberry Pi 4B) 上,断言通常会在几乎是瞬时的几千次迭代后失败。
代码需要使用 -O2
构建。我怀疑如果为 ARMv8.2 或更高版本构建,swpalb
可用,它将无法工作。
// Based on https://stackoverflow.com/a/41859912/634919 by LWimsey
#include <thread>
#include <atomic>
#include <cassert>
// size that's at least as big as a cache line
constexpr size_t cache_line_size = 256;
static void take_lock(std::atomic<bool> &me,std::atomic<bool> &other) {
alignas(cache_line_size) bool uncached_true = true;
for (;;) {
// Evict uncached_true from cache.
asm volatile("dc civac,%0" : : "r" (&uncached_true) : "memory");
// So the release store to `me` may be delayed while
// `uncached_true` is loaded. This should give the machine
// time to proceed with the load of `other`,which is not
// forbidden by the release semantics of the store to `me`.
me.exchange(uncached_true,std::memory_order_seq_cst);
if (other.load(std::memory_order_relaxed) == false) {
// taken!
std::atomic_thread_fence(std::memory_order_seq_cst);
return;
}
// start over
me.store(false,std::memory_order_seq_cst);
}
}
static void drop_lock(std::atomic<bool> &me) {
me.store(false,std::memory_order_seq_cst);
}
alignas(cache_line_size) std::atomic<int> counter{0};
static void critical_section(void) {
// We should be the only thread inside here.
int tmp = counter.fetch_add(1,std::memory_order_seq_cst);
assert(tmp == 0);
// Delay to give the other thread a chance to try the lock
for (int i = 0; i < 100; i++)
asm volatile("");
tmp = counter.fetch_sub(1,std::memory_order_seq_cst);
assert(tmp == 1);
}
static void busy(std::atomic<bool> *me,std::atomic<bool> *other)
{
for (;;) {
take_lock(*me,*other);
std::atomic_thread_fence(std::memory_order_seq_cst); // paranoia
critical_section();
std::atomic_thread_fence(std::memory_order_seq_cst); // paranoia
drop_lock(*me);
}
}
// The two flags need to be on separate cache lines.
alignas(cache_line_size) std::atomic<bool> flag1{false},flag2{false};
int main()
{
std::thread t1(busy,&flag1,&flag2);
std::thread t2(busy,&flag2,&flag1);
t1.join(); // will never happen
t2.join();
return 0;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。