HOOOS

C++20 协程与无锁工作窃取:自研轻量级 Actor 模型的底层架构与实现

0 4 CppExplorer C20协程Actor模型
Apple

在现代多核架构中,传统的基于锁和多线程的并发模型常常面临着上下文切换开销大、死锁风险、以及线程阻碍等性能瓶颈。Actor 模型通过引入“无共享内存、通过消息传递进行通信”的隔离机制,提供了一种天然安全的并发范式。

借助 C++20 协程(Coroutines) 的挂起与恢复能力,我们可以彻底消除传统 Actor 模型中每个 Actor 占用一个独立物理线程的昂贵开销。结合无锁工作窃取(Work-Stealing)调度器,能够让成千上万个轻量级 Actor 共享极少数的物理内核。

本文将深入探讨如何将 C++20 协程与无锁 Chase-Lev 队列结合,构建一个工业级、高性能的轻量级 Actor 模型底层架构。


1. 架构整体设计蓝图

要实现一个无锁且高效的协程 Actor 模型,我们需要理清三个核心组件的关系:

+--------------------------------------------------------+
|                      Scheduler                         |
|  +------------------+         +------------------+     |
|  |   Work Thread 1  |         |   Work Thread 2  |     |
|  |  +------------+  |  steal  |  +------------+  |     |
|  |  | LocalDeque | <-----------  | LocalDeque |  |     |
|  |  +------------+  |         |  +------------+  |     |
|  +--------+---------+         +--------+---------+     |
+-----------|----------------------------|---------------+
            | resume                     | resume
            v                            v
+-----------------------+    +-----------------------+
|        Actor A        |    |        Actor B        |
|  +-----------------+  |    |  +-----------------+  |
|  | Lock-free MPSC  |  |    |  | Lock-free MPSC  |  |
|  |     Mailbox     |  |    |  |     Mailbox     |  |
|  +-----------------+  |    |  +-----------------+  |
|  +-----------------+  |    |  +-----------------+  |
|  | Coroutine State |  |    |  | Coroutine State |  |
|  +-----------------+  |    |  +-----------------+  |
+-----------------------+    +-----------------------+
  1. Actor 实体:封装了自身状态,拥有一个无锁的单消费者多生产者(MPSC)邮箱(Mailbox)。Actor 运行逻辑表现为一个 C++20 协程,当邮箱无消息时挂起,有消息时由调度器唤醒。
  2. 无锁工作窃取调度器(Scheduler):管理一个线程池。每个工作线程拥有一个私有的双端队列(Deque)。线程优先处理本地队列的任务(LIFO,保证缓存局部性),空闲时从其他线程的队列尾部窃取任务(FIFO,降低冲突)。
  3. 协程驱动器(Task / Promise):定制 C++20 的 promise_type,将协程的生命周期、挂起点的控制权完全交由调度器和 Actor 邮箱接管。

2. 协程驱动器:ActorTask 的定制

首先,我们需要定义 C++20 协程的返回类型。这个类型需要承载协程句柄,并能够与调度器无缝对接。

#include <coroutine>
#include <exception>
#include <iostream>
#include <utility>

class Scheduler;

// 协程任务包装器
struct [[nodiscard]] ActorTask {
    struct promise_type {
        ActorTask get_return_object() {
            return ActorTask{std::coroutine_handle<promise_type>::from_promise(*this)};
        }
        std::initial_suspend_type initial_suspend() noexcept { 
            // 初始挂起,交由调度器决定何时开始执行
            return std::suspend_always{}; 
        }
        std::final_suspend_type final_suspend() noexcept { 
            return std::suspend_always{}; 
        }
        void unhandled_exception() { std::terminate(); }
        void return_void() noexcept {}
    };

    std::coroutine_handle<promise_type> handle;

    ActorTask(std::coroutine_handle<promise_type> h) : handle(h) {}
    ~ActorTask() {
        if (handle) handle.destroy();
    }
    ActorTask(const ActorTask&) = delete;
    ActorTask& operator=(const ActorTask&) = delete;
    ActorTask(ActorTask&& other) noexcept : handle(std::exchange(other.handle, nullptr)) {}
    ActorTask& operator=(ActorTask&& other) noexcept {
        if (this != &other) {
            if (handle) handle.destroy();
            handle = std::exchange(other.handle, nullptr);
        }
        return *this;
    }
};

3. 核心机制:MPSC Mailbox 与 Awaiter 阻断设计

Actor 之间的通信是通过传递消息实现的。为了保证并发安全且高性能,Actor 的邮箱必须采用无锁的 Multi-Producer Single-Consumer (MPSC) 队列。

当 Actor 的协程调用 co_await mailbox.receive() 时:

  • 如果邮箱中有消息,直接消费,不发生挂起。
  • 如果邮箱为空,协程挂起,Actor 进入休眠状态。一旦其他 Actor 向其发送消息,发送方在将消息入队后,负责将该 Actor 的协程句柄重新推入调度器。

下面是基于无锁链表实现的 MPSC 邮箱以及配套的 Awaiter 实现:

#include <atomic>
#include <optional>

template <typename Msg>
class Mailbox {
    struct Node {
        Msg msg;
        std::atomic<Node*> next{nullptr};
    };

    alignas(64) std::atomic<Node*> head{nullptr};
    alignas(64) std::atomic<Node*> tail{nullptr};
    alignas(64) std::atomic<bool> is_active{false}; // 标识 Actor 是否已在调度器中激活

public:
    Mailbox() {
        // 哑节点哨兵,简化无锁队列设计
        Node* sentinel = new Node();
        head.store(sentinel, std::memory_order_relaxed);
        tail.store(sentinel, std::memory_order_relaxed);
    }

    ~Mailbox() {
        while (Node* old_head = head.load(std::memory_order_relaxed)) {
            head.store(old_head->next.load(std::memory_order_relaxed), std::memory_order_relaxed);
            delete old_head;
        }
    }

    // 多个其他 Actor 调用此接口发送消息
    void push(Msg msg) {
        Node* new_node = new Node{std::move(msg), nullptr};
        // 交换 tail,建立单向链表连接
        Node* prev_tail = tail.exchange(new_node, std::memory_order_acq_rel);
        prev_tail->next.store(new_node, std::memory_order_release);
    }

    // 单个消费者消费消息(非阻塞)
    std::optional<Msg> try_pop() {
        Node* h = head.load(std::memory_order_acquire);
        Node* n = h->next.load(std::memory_order_acquire);
        if (n != nullptr) {
            head.store(n, std::memory_order_release);
            Msg msg = std::move(n->msg);
            delete h;
            return msg;
        }
        return std::nullopt;
    }

    // 判断是否能激活该 Actor(避免重复入队调度器)
    bool try_activate() {
        bool expected = false;
        return is_active.compare_exchange_strong(expected, true, std::memory_order_acq_rel);
    }

    void deactivate() {
        is_active.store(false, std::memory_order_release);
    }
};

接下来构建协程等待对象 ReceiveAwaiter

template <typename Msg>
struct ReceiveAwaiter {
    Mailbox<Msg>& mailbox;
    std::optional<Msg> result;

    ReceiveAwaiter(Mailbox<Msg>& m) : mailbox(m) {}

    // 如果队列里直接有数据,则不需要挂起
    bool await_ready() noexcept {
        result = mailbox.try_pop();
        return result.has_value();
    }

    // 挂起时的回调:保存协程句柄
    void await_suspend(std::coroutine_handle<> h) noexcept {
        // 将协程句柄挂接在 Actor 的状态中,以便后续消息到达时唤醒
        // 在轻量级实现中,通常 Actor 本身持有此句柄。
    }

    Msg await_resume() noexcept {
        return std::move(*result);
    }
};

4. 极致多核吞吐:Chase-Lev 无锁工作窃取队列

为了最大化 CPU 利用率,我们需要为调度器配置无锁工作窃取(Work-Stealing)算法。其中最著名的便是 Chase-Lev Deque

Chase-Lev 队列是一个单生产者、多消费者的无锁双端队列:

  • 本地线程(Owner):在队列头部(bottom)执行 pushpop,使用 LIFO 顺序,极高地利用了 L1/L2 缓存。
  • 窃取线程(Stealer):在队列尾部(top)执行 steal,采用 FIFO 顺序,降低了与本地线程的竞争。
#include <vector>
#include <atomic>

class ChaseLevDeque {
    struct CircularArray {
        size_t capacity;
        std::vector<std::coroutine_handle<>> buffer;

        CircularArray(size_t cap) : capacity(cap), buffer(cap) {}

        void put(size_t index, std::coroutine_handle<> val) {
            buffer[index % capacity] = val;
        }

        std::coroutine_handle<> get(size_t index) {
            return buffer[index % capacity];
        }

        CircularArray* grow(size_t b, size_t t) {
            CircularArray* new_array = new CircularArray(capacity * 2);
            for (size_t i = t; i < b; ++i) {
                new_array->put(i, get(i));
            }
            return new_array;
        }
    };

    alignas(64) std::atomic<size_t> top{0};
    alignas(64) std::atomic<size_t> bottom{0};
    alignas(64) std::atomic<CircularArray*> array;

public:
    ChaseLevDeque(size_t init_cap = 1024) {
        array.store(new CircularArray(init_cap), std::memory_order_relaxed);
    }

    ~ChaseLevDeque() {
        delete array.load(std::memory_order_relaxed);
    }

    // 本地线程压入任务
    void push(std::coroutine_handle<> task) {
        size_t b = bottom.load(std::memory_order_relaxed);
        size_t t = top.load(std::memory_order_acquire);
        CircularArray* a = array.load(std::memory_order_relaxed);

        if (b - t >= a->capacity - 1) { // 队列满,扩容
            CircularArray* new_array = a->grow(b, t);
            delete a;
            array.store(new_array, std::memory_order_relaxed);
            a = new_array;
        }

        a->put(b, task);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        bottom.store(b + 1, std::memory_order_release);
    }

    // 本地线程弹出任务
    std::coroutine_handle<> pop() {
        size_t b = bottom.load(std::memory_order_relaxed) - 1;
        CircularArray* a = array.load(std::memory_order_relaxed);
        bottom.store(b, std::memory_order_relaxed);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        size_t t = top.load(std::memory_order_relaxed);

        std::coroutine_handle<> task = nullptr;
        if (t <= b) {
            task = a->get(b);
            if (t == b) { // 竞争最后一个元素
                if (!top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                    task = nullptr; // 被 Stealer 抢走了
                }
                bottom.store(b + 1, std::memory_order_relaxed);
            }
        } else {
            bottom.store(b + 1, std::memory_order_relaxed);
        }
        return task;
    }

    // 外部线程(Stealer)窃取任务
    std::coroutine_handle<> steal() {
        size_t t = top.load(std::memory_order_acquire);
        std::atomic_thread_fence(std::memory_order_seq_cst);
        size_t b = bottom.load(std::memory_order_acquire);

        if (t < b) {
            CircularArray* a = array.load(std::memory_order_consume);
            std::coroutine_handle<> task = a->get(t);
            if (!top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
                return nullptr; // 并发窃取失败,退回
            }
            return task;
        }
        return nullptr;
    }
};

5. 调度器核心实现:WorkStealingScheduler

基于 Chase-Lev 队列,我们可以构建一个多核调度器。各个线程通过环路尝试本地 Pop、全局队列获取、以及邻居 Steal:

#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>

class WorkStealingScheduler {
    size_t thread_count;
    std::vector<std::unique_ptr<ChaseLevDeque>> local_queues;
    std::vector<std::thread> workers;
    std::atomic<bool> stop_flag{false};

    static inline thread_local int thread_id{-1};

public:
    WorkStealingScheduler(size_t threads = std::thread::hardware_concurrency()) 
        : thread_count(threads) {
        for (size_t i = 0; i < thread_count; ++i) {
            local_queues.push_back(std::make_unique<ChaseLevDeque>());
        }
    }

    ~WorkStealingScheduler() {
        stop();
    }

    void start() {
        for (size_t i = 0; i < thread_count; ++i) {
            workers.emplace_back([this, i]() {
                thread_id = static_cast<int>(i);
                this->run_loop(i);
            });
        }
    }

    void stop() {
        stop_flag.store(true, std::memory_order_release);
        for (auto& worker : workers) {
            if (worker.joinable()) worker.join();
        }
    }

    // 提交任务到当前线程,或轮询分发
    void schedule(std::coroutine_handle<> handle) {
        if (thread_id != -1) {
            local_queues[thread_id]->push(handle);
        } else {
            // 外部非 Worker 线程提交,默认扔到 0 号队列
            local_queues[0]->push(handle);
        }
    }

private:
    void run_loop(size_t id) {
        while (!stop_flag.load(std::memory_order_relaxed)) {
            std::coroutine_handle<> task = local_queues[id]->pop();

            if (!task) {
                // 本地队列无任务,尝试从其他线程队列偷取
                for (size_t offset = 1; offset < thread_count; ++offset) {
                    size_t victim = (id + offset) % thread_count;
                    task = local_queues[victim]->steal();
                    if (task) break;
                }
            }

            if (task) {
                task.resume(); // 执行或恢复协程
            } else {
                std::this_thread::yield(); // 极简退让机制(可配合条件变量实现被动挂起)
            }
        }
    }
};

6. 构建 Actor 实体:将协程与邮箱融合

最终,我们将以上组件拼装成一个具体的 Actor 基类。

#include <string>

template <typename Msg>
class Actor {
protected:
    Mailbox<Msg> mailbox;
    WorkStealingScheduler& scheduler;
    std::coroutine_handle<> coroutine_h{nullptr};

public:
    Actor(WorkStealingScheduler& sched) : scheduler(sched) {}

    virtual ~Actor() = default;

    // 接收并处理消息(由用户派生类实现具体的协程主体)
    virtual ActorTask run() = 0;

    // 向该 Actor 异步发送消息
    void send(Msg msg) {
        mailbox.push(std::move(msg));
        // 如果该 Actor 当前没在调度队列中,将其推入调度器
        if (mailbox.try_activate()) {
            if (coroutine_h) {
                scheduler.schedule(coroutine_h);
            }
        }
    }

    void register_handle(std::coroutine_handle<> h) {
        coroutine_h = h;
    }

protected:
    // 供派生协程 await 的接收操作
    struct ReceiveAwaiterInternal {
        Actor& self;
        std::optional<Msg> result;

        bool await_ready() {
            result = self.mailbox.try_pop();
            return result.has_value();
        }

        void await_suspend(std::coroutine_handle<> h) {
            // 保存句柄
            self.register_handle(h);
            // 挂起前去激活,允许后续的 send 能够重新将本 Actor 激活入队
            self.mailbox.deactivate();
            
            // 双重检查,避免在此期间有新消息漏掉
            std::optional<Msg> double_check = self.mailbox.try_pop();
            if (double_check.has_value()) {
                if (self.mailbox.try_activate()) {
                    self.scheduler.schedule(h);
                }
            }
        }

        Msg await_resume() {
            return std::move(*result);
        }
    };

    ReceiveAwaiterInternal receive() {
        return ReceiveAwaiterInternal{*this};
    }
};

7. 业务代码演示:高性能 Actor 协同工作

下面我们模拟一个极简的计算 Actor:接收整数,进行平方运算并打印。

#include <chrono>

struct Message {
    int payload;
    // 简单模拟,实际应用中可以利用 std::variant 携带丰富类型
};

class CalculatorActor : public Actor<Message> {
public:
    using Actor<Message>::Actor;

    ActorTask run() override {
        while (true) {
            // 使用 co_await 异步挂起,不占用底层系统线程
            Message msg = co_await receive();
            if (msg.payload < 0) {
                std::cout << "[Actor] 收到退出信号,终止运行。\n";
                co_return; 
            }
            int res = msg.payload * msg.payload;
            std::cout << "[Actor] 处理线程 ID: " << std::this_thread::get_id()
                      << " | 计算平方: " << msg.payload << " -> " << res << "\n";
        }
    }
};

int main() {
    WorkStealingScheduler scheduler(4); // 开启 4 个工作线程
    scheduler.start();

    CalculatorActor calc(scheduler);
    
    // 开启 Actor 协程(此时会立刻在 initial_suspend 处挂起)
    ActorTask task = calc.run();
    calc.register_handle(task.handle);

    // 派发消息
    std::cout << "[Main] 派发任务到 Actor 邮箱\n";
    calc.send(Message{10});
    calc.send(Message{20});
    calc.send(Message{30});

    std::this_thread::sleep_for(std::chrono::milliseconds(200));

    // 发送退出信号
    calc.send(Message{-1});

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    scheduler.stop();
    return 0;
}

8. 性能优化要点(工业级实现需要考虑的问题)

在实际工业界落地该模型时,还需要针对以下细节进行微调:

  1. 缓存行伪共享(False Sharing)
    Mailbox 以及 ChaseLevDeque 的原子索引中,我们引入了 alignas(64)。这确保了跨核心读写的原子变量(例如 topbottom)不会落入同一个 CPU L1 Cache Line,消除了多核高速往复同步缓存造成的吞吐率雪崩。
  2. 轻量级退让(Parking/Unparking)
    在调度器 run_loop 中,如果全局任务都为空,频繁的空转会消耗大量的 CPU 资源。可以引入无锁原子计数器记录 active_workers 的数量,配合自旋后进入条件变量挂起,以实现高效节能。
  3. 内存屏障(Fences)排序
    Chase-Lev Deque 对内存屏障的使用极为严苛。在 push 操作中,std::atomic_thread_fence(std::memory_order_seq_cst) 保证了写入 buffer 的操作一定会先于 bottom 索引的更新可见。这是防止“Stealer”线程读到越界脏数据的关键。

结论

C++20 协程为重塑 C++ 领域的并发设计开启了新篇章。通过定制 Awaiterpromise_type,我们成功地将协程机制融入了经典的 Actor 并发范式;同时,结合 Chase-Lev 工作窃取调度算法,使系统能够在保持高吞吐的同时,实现极细粒度的任务分配。该方案具备无锁设计的所有优势,非常适合作为现代高性能网络服务、游戏引擎架构、或是分布式计算框架的底层支撑。

点评评价

captcha
健康