在现代多核架构中,传统的基于锁和多线程的并发模型常常面临着上下文切换开销大、死锁风险、以及线程阻碍等性能瓶颈。Actor 模型通过引入“无共享内存、通过消息传递进行通信”的隔离机制,提供了一种天然安全的并发范式。
借助 C++20 协程(Coroutines) 的挂起与恢复能力,我们可以彻底消除传统 Actor 模型中每个 Actor 占用一个独立物理线程的昂贵开销。结合无锁工作窃取(Work-Stealing)调度器,能够让成千上万个轻量级 Actor 共享极少数的物理内核。
本文将深入探讨如何将 C++20 协程与无锁 Chase-Lev 队列结合,构建一个工业级、高性能的轻量级 Actor 模型底层架构。
1. 架构整体设计蓝图
要实现一个无锁且高效的协程 Actor 模型,我们需要理清三个核心组件的关系:
+--------------------------------------------------------+
| Scheduler |
| +------------------+ +------------------+ |
| | Work Thread 1 | | Work Thread 2 | |
| | +------------+ | steal | +------------+ | |
| | | LocalDeque | <----------- | LocalDeque | | |
| | +------------+ | | +------------+ | |
| +--------+---------+ +--------+---------+ |
+-----------|----------------------------|---------------+
| resume | resume
v v
+-----------------------+ +-----------------------+
| Actor A | | Actor B |
| +-----------------+ | | +-----------------+ |
| | Lock-free MPSC | | | | Lock-free MPSC | |
| | Mailbox | | | | Mailbox | |
| +-----------------+ | | +-----------------+ |
| +-----------------+ | | +-----------------+ |
| | Coroutine State | | | | Coroutine State | |
| +-----------------+ | | +-----------------+ |
+-----------------------+ +-----------------------+
- Actor 实体:封装了自身状态,拥有一个无锁的单消费者多生产者(MPSC)邮箱(Mailbox)。Actor 运行逻辑表现为一个 C++20 协程,当邮箱无消息时挂起,有消息时由调度器唤醒。
- 无锁工作窃取调度器(Scheduler):管理一个线程池。每个工作线程拥有一个私有的双端队列(Deque)。线程优先处理本地队列的任务(LIFO,保证缓存局部性),空闲时从其他线程的队列尾部窃取任务(FIFO,降低冲突)。
- 协程驱动器(Task / Promise):定制 C++20 的
promise_type,将协程的生命周期、挂起点的控制权完全交由调度器和 Actor 邮箱接管。
2. 协程驱动器:ActorTask 的定制
首先,我们需要定义 C++20 协程的返回类型。这个类型需要承载协程句柄,并能够与调度器无缝对接。
#include <coroutine>
#include <exception>
#include <iostream>
#include <utility>
class Scheduler;
// 协程任务包装器
struct [[nodiscard]] ActorTask {
struct promise_type {
ActorTask get_return_object() {
return ActorTask{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::initial_suspend_type initial_suspend() noexcept {
// 初始挂起,交由调度器决定何时开始执行
return std::suspend_always{};
}
std::final_suspend_type final_suspend() noexcept {
return std::suspend_always{};
}
void unhandled_exception() { std::terminate(); }
void return_void() noexcept {}
};
std::coroutine_handle<promise_type> handle;
ActorTask(std::coroutine_handle<promise_type> h) : handle(h) {}
~ActorTask() {
if (handle) handle.destroy();
}
ActorTask(const ActorTask&) = delete;
ActorTask& operator=(const ActorTask&) = delete;
ActorTask(ActorTask&& other) noexcept : handle(std::exchange(other.handle, nullptr)) {}
ActorTask& operator=(ActorTask&& other) noexcept {
if (this != &other) {
if (handle) handle.destroy();
handle = std::exchange(other.handle, nullptr);
}
return *this;
}
};
3. 核心机制:MPSC Mailbox 与 Awaiter 阻断设计
Actor 之间的通信是通过传递消息实现的。为了保证并发安全且高性能,Actor 的邮箱必须采用无锁的 Multi-Producer Single-Consumer (MPSC) 队列。
当 Actor 的协程调用 co_await mailbox.receive() 时:
- 如果邮箱中有消息,直接消费,不发生挂起。
- 如果邮箱为空,协程挂起,Actor 进入休眠状态。一旦其他 Actor 向其发送消息,发送方在将消息入队后,负责将该 Actor 的协程句柄重新推入调度器。
下面是基于无锁链表实现的 MPSC 邮箱以及配套的 Awaiter 实现:
#include <atomic>
#include <optional>
template <typename Msg>
class Mailbox {
struct Node {
Msg msg;
std::atomic<Node*> next{nullptr};
};
alignas(64) std::atomic<Node*> head{nullptr};
alignas(64) std::atomic<Node*> tail{nullptr};
alignas(64) std::atomic<bool> is_active{false}; // 标识 Actor 是否已在调度器中激活
public:
Mailbox() {
// 哑节点哨兵,简化无锁队列设计
Node* sentinel = new Node();
head.store(sentinel, std::memory_order_relaxed);
tail.store(sentinel, std::memory_order_relaxed);
}
~Mailbox() {
while (Node* old_head = head.load(std::memory_order_relaxed)) {
head.store(old_head->next.load(std::memory_order_relaxed), std::memory_order_relaxed);
delete old_head;
}
}
// 多个其他 Actor 调用此接口发送消息
void push(Msg msg) {
Node* new_node = new Node{std::move(msg), nullptr};
// 交换 tail,建立单向链表连接
Node* prev_tail = tail.exchange(new_node, std::memory_order_acq_rel);
prev_tail->next.store(new_node, std::memory_order_release);
}
// 单个消费者消费消息(非阻塞)
std::optional<Msg> try_pop() {
Node* h = head.load(std::memory_order_acquire);
Node* n = h->next.load(std::memory_order_acquire);
if (n != nullptr) {
head.store(n, std::memory_order_release);
Msg msg = std::move(n->msg);
delete h;
return msg;
}
return std::nullopt;
}
// 判断是否能激活该 Actor(避免重复入队调度器)
bool try_activate() {
bool expected = false;
return is_active.compare_exchange_strong(expected, true, std::memory_order_acq_rel);
}
void deactivate() {
is_active.store(false, std::memory_order_release);
}
};
接下来构建协程等待对象 ReceiveAwaiter:
template <typename Msg>
struct ReceiveAwaiter {
Mailbox<Msg>& mailbox;
std::optional<Msg> result;
ReceiveAwaiter(Mailbox<Msg>& m) : mailbox(m) {}
// 如果队列里直接有数据,则不需要挂起
bool await_ready() noexcept {
result = mailbox.try_pop();
return result.has_value();
}
// 挂起时的回调:保存协程句柄
void await_suspend(std::coroutine_handle<> h) noexcept {
// 将协程句柄挂接在 Actor 的状态中,以便后续消息到达时唤醒
// 在轻量级实现中,通常 Actor 本身持有此句柄。
}
Msg await_resume() noexcept {
return std::move(*result);
}
};
4. 极致多核吞吐:Chase-Lev 无锁工作窃取队列
为了最大化 CPU 利用率,我们需要为调度器配置无锁工作窃取(Work-Stealing)算法。其中最著名的便是 Chase-Lev Deque。
Chase-Lev 队列是一个单生产者、多消费者的无锁双端队列:
- 本地线程(Owner):在队列头部(
bottom)执行push和pop,使用 LIFO 顺序,极高地利用了 L1/L2 缓存。 - 窃取线程(Stealer):在队列尾部(
top)执行steal,采用 FIFO 顺序,降低了与本地线程的竞争。
#include <vector>
#include <atomic>
class ChaseLevDeque {
struct CircularArray {
size_t capacity;
std::vector<std::coroutine_handle<>> buffer;
CircularArray(size_t cap) : capacity(cap), buffer(cap) {}
void put(size_t index, std::coroutine_handle<> val) {
buffer[index % capacity] = val;
}
std::coroutine_handle<> get(size_t index) {
return buffer[index % capacity];
}
CircularArray* grow(size_t b, size_t t) {
CircularArray* new_array = new CircularArray(capacity * 2);
for (size_t i = t; i < b; ++i) {
new_array->put(i, get(i));
}
return new_array;
}
};
alignas(64) std::atomic<size_t> top{0};
alignas(64) std::atomic<size_t> bottom{0};
alignas(64) std::atomic<CircularArray*> array;
public:
ChaseLevDeque(size_t init_cap = 1024) {
array.store(new CircularArray(init_cap), std::memory_order_relaxed);
}
~ChaseLevDeque() {
delete array.load(std::memory_order_relaxed);
}
// 本地线程压入任务
void push(std::coroutine_handle<> task) {
size_t b = bottom.load(std::memory_order_relaxed);
size_t t = top.load(std::memory_order_acquire);
CircularArray* a = array.load(std::memory_order_relaxed);
if (b - t >= a->capacity - 1) { // 队列满,扩容
CircularArray* new_array = a->grow(b, t);
delete a;
array.store(new_array, std::memory_order_relaxed);
a = new_array;
}
a->put(b, task);
std::atomic_thread_fence(std::memory_order_seq_cst);
bottom.store(b + 1, std::memory_order_release);
}
// 本地线程弹出任务
std::coroutine_handle<> pop() {
size_t b = bottom.load(std::memory_order_relaxed) - 1;
CircularArray* a = array.load(std::memory_order_relaxed);
bottom.store(b, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
size_t t = top.load(std::memory_order_relaxed);
std::coroutine_handle<> task = nullptr;
if (t <= b) {
task = a->get(b);
if (t == b) { // 竞争最后一个元素
if (!top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
task = nullptr; // 被 Stealer 抢走了
}
bottom.store(b + 1, std::memory_order_relaxed);
}
} else {
bottom.store(b + 1, std::memory_order_relaxed);
}
return task;
}
// 外部线程(Stealer)窃取任务
std::coroutine_handle<> steal() {
size_t t = top.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
size_t b = bottom.load(std::memory_order_acquire);
if (t < b) {
CircularArray* a = array.load(std::memory_order_consume);
std::coroutine_handle<> task = a->get(t);
if (!top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
return nullptr; // 并发窃取失败,退回
}
return task;
}
return nullptr;
}
};
5. 调度器核心实现:WorkStealingScheduler
基于 Chase-Lev 队列,我们可以构建一个多核调度器。各个线程通过环路尝试本地 Pop、全局队列获取、以及邻居 Steal:
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
class WorkStealingScheduler {
size_t thread_count;
std::vector<std::unique_ptr<ChaseLevDeque>> local_queues;
std::vector<std::thread> workers;
std::atomic<bool> stop_flag{false};
static inline thread_local int thread_id{-1};
public:
WorkStealingScheduler(size_t threads = std::thread::hardware_concurrency())
: thread_count(threads) {
for (size_t i = 0; i < thread_count; ++i) {
local_queues.push_back(std::make_unique<ChaseLevDeque>());
}
}
~WorkStealingScheduler() {
stop();
}
void start() {
for (size_t i = 0; i < thread_count; ++i) {
workers.emplace_back([this, i]() {
thread_id = static_cast<int>(i);
this->run_loop(i);
});
}
}
void stop() {
stop_flag.store(true, std::memory_order_release);
for (auto& worker : workers) {
if (worker.joinable()) worker.join();
}
}
// 提交任务到当前线程,或轮询分发
void schedule(std::coroutine_handle<> handle) {
if (thread_id != -1) {
local_queues[thread_id]->push(handle);
} else {
// 外部非 Worker 线程提交,默认扔到 0 号队列
local_queues[0]->push(handle);
}
}
private:
void run_loop(size_t id) {
while (!stop_flag.load(std::memory_order_relaxed)) {
std::coroutine_handle<> task = local_queues[id]->pop();
if (!task) {
// 本地队列无任务,尝试从其他线程队列偷取
for (size_t offset = 1; offset < thread_count; ++offset) {
size_t victim = (id + offset) % thread_count;
task = local_queues[victim]->steal();
if (task) break;
}
}
if (task) {
task.resume(); // 执行或恢复协程
} else {
std::this_thread::yield(); // 极简退让机制(可配合条件变量实现被动挂起)
}
}
}
};
6. 构建 Actor 实体:将协程与邮箱融合
最终,我们将以上组件拼装成一个具体的 Actor 基类。
#include <string>
template <typename Msg>
class Actor {
protected:
Mailbox<Msg> mailbox;
WorkStealingScheduler& scheduler;
std::coroutine_handle<> coroutine_h{nullptr};
public:
Actor(WorkStealingScheduler& sched) : scheduler(sched) {}
virtual ~Actor() = default;
// 接收并处理消息(由用户派生类实现具体的协程主体)
virtual ActorTask run() = 0;
// 向该 Actor 异步发送消息
void send(Msg msg) {
mailbox.push(std::move(msg));
// 如果该 Actor 当前没在调度队列中,将其推入调度器
if (mailbox.try_activate()) {
if (coroutine_h) {
scheduler.schedule(coroutine_h);
}
}
}
void register_handle(std::coroutine_handle<> h) {
coroutine_h = h;
}
protected:
// 供派生协程 await 的接收操作
struct ReceiveAwaiterInternal {
Actor& self;
std::optional<Msg> result;
bool await_ready() {
result = self.mailbox.try_pop();
return result.has_value();
}
void await_suspend(std::coroutine_handle<> h) {
// 保存句柄
self.register_handle(h);
// 挂起前去激活,允许后续的 send 能够重新将本 Actor 激活入队
self.mailbox.deactivate();
// 双重检查,避免在此期间有新消息漏掉
std::optional<Msg> double_check = self.mailbox.try_pop();
if (double_check.has_value()) {
if (self.mailbox.try_activate()) {
self.scheduler.schedule(h);
}
}
}
Msg await_resume() {
return std::move(*result);
}
};
ReceiveAwaiterInternal receive() {
return ReceiveAwaiterInternal{*this};
}
};
7. 业务代码演示:高性能 Actor 协同工作
下面我们模拟一个极简的计算 Actor:接收整数,进行平方运算并打印。
#include <chrono>
struct Message {
int payload;
// 简单模拟,实际应用中可以利用 std::variant 携带丰富类型
};
class CalculatorActor : public Actor<Message> {
public:
using Actor<Message>::Actor;
ActorTask run() override {
while (true) {
// 使用 co_await 异步挂起,不占用底层系统线程
Message msg = co_await receive();
if (msg.payload < 0) {
std::cout << "[Actor] 收到退出信号,终止运行。\n";
co_return;
}
int res = msg.payload * msg.payload;
std::cout << "[Actor] 处理线程 ID: " << std::this_thread::get_id()
<< " | 计算平方: " << msg.payload << " -> " << res << "\n";
}
}
};
int main() {
WorkStealingScheduler scheduler(4); // 开启 4 个工作线程
scheduler.start();
CalculatorActor calc(scheduler);
// 开启 Actor 协程(此时会立刻在 initial_suspend 处挂起)
ActorTask task = calc.run();
calc.register_handle(task.handle);
// 派发消息
std::cout << "[Main] 派发任务到 Actor 邮箱\n";
calc.send(Message{10});
calc.send(Message{20});
calc.send(Message{30});
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// 发送退出信号
calc.send(Message{-1});
std::this_thread::sleep_for(std::chrono::milliseconds(100));
scheduler.stop();
return 0;
}
8. 性能优化要点(工业级实现需要考虑的问题)
在实际工业界落地该模型时,还需要针对以下细节进行微调:
- 缓存行伪共享(False Sharing):
在Mailbox以及ChaseLevDeque的原子索引中,我们引入了alignas(64)。这确保了跨核心读写的原子变量(例如top和bottom)不会落入同一个 CPU L1 Cache Line,消除了多核高速往复同步缓存造成的吞吐率雪崩。 - 轻量级退让(Parking/Unparking):
在调度器run_loop中,如果全局任务都为空,频繁的空转会消耗大量的 CPU 资源。可以引入无锁原子计数器记录active_workers的数量,配合自旋后进入条件变量挂起,以实现高效节能。 - 内存屏障(Fences)排序:
Chase-Lev Deque 对内存屏障的使用极为严苛。在push操作中,std::atomic_thread_fence(std::memory_order_seq_cst)保证了写入buffer的操作一定会先于bottom索引的更新可见。这是防止“Stealer”线程读到越界脏数据的关键。
结论
C++20 协程为重塑 C++ 领域的并发设计开启了新篇章。通过定制 Awaiter 与 promise_type,我们成功地将协程机制融入了经典的 Actor 并发范式;同时,结合 Chase-Lev 工作窃取调度算法,使系统能够在保持高吞吐的同时,实现极细粒度的任务分配。该方案具备无锁设计的所有优势,非常适合作为现代高性能网络服务、游戏引擎架构、或是分布式计算框架的底层支撑。