HOOOS

现代 C++ 极简实战:如何用 epoll 实现万级并发的 HTTP 服务器?

0 4 码农老麦 epoll网络编程
Apple

要让单台服务器撑住万级并发(C10K 问题),传统的“一连接一线程(Thread-per-connection)”模型会因为线程上下文切换和内存开销(每个线程默认栈空间 8MB)直接崩溃。

现代 Linux 服务端的标准解法是:非阻塞 I/O + I/O 多路复用(epoll)+ 事件驱动(Reactor 模式)

本文将带你用现代 C++(C++17/20)从零构建一个基于 epoll 边缘触发(Edge-Triggered)模式的简易 HTTP 服务器,并利用 RAII 机制管理生命周期,确保高并发下的资源安全。


一、 核心架构:Reactor 模式与边缘触发 (ET)

在高并发场景下,我们使用 Epoll 边缘触发 (ET) 模式。与水平触发 (LT) 相比,ET 模式下文件描述符状态变化时只会通知一次。这意味着:

  • 极致的性能:减少了 epoll_wait 的系统调用次数。
  • 更高的编码要求:必须一次性将缓冲区的数据读完/写完,直到返回 EAGAINEWOULDBLOCK 错误,否则会发生“漏事件”导致连接饥饿。

我们的架构采用单线程 Reactor 模型(对于简易服务器足够,后续易于扩展为 Multi-Reactor + 线程池):

[Client] ---> [epoll_wait (Event Loop)]
                     |
         +-----------+-----------+
         | (Read Event)          | (Write Event)
         v                       v
   [Read Buffer]           [Write Buffer]
         |                       ^
         v                       |
   [HTTP Parser] --------> [Response Gen]

二、 现代 C++ 的 RAII 资源封装

编写 C++ 网络程序,最忌讳的是 fd(文件描述符)泄露。我们利用 C++ 的析构函数自动释放资源,编写安全的 Socket 和 Epoll 包装类。

1. 优雅的 SafeFD 封装

#include <unistd.h>
#include <utility>

class SafeFD {
public:
    SafeFD() : fd_(-1) {}
    explicit SafeFD(int fd) : fd_(fd) {}
    ~SafeFD() { reset(); }

    // 禁止拷贝,允许移动 (Move-only)
    SafeFD(const SafeFD&) = delete;
    SafeFD& operator=(const SafeFD&) = delete;
    
    SafeFD(SafeFD&& other) noexcept : fd_(other.release()) {}
    SafeFD& operator=(SafeFD&& other) noexcept {
        if (this != &other) {
            reset(other.release());
        }
        return *this;
    }

    int get() const { return fd_; }
    bool valid() const { return fd_ != -1; }

    int release() {
        return std::exchange(fd_, -1);
    }

    void reset(int new_fd = -1) {
        if (fd_ != -1) {
            ::close(fd_);
        }
        fd_ = new_fd;
    }

private:
    int fd_;
};

2. Epoll 控制器封装

#include <sys/epoll.h>
#include <stdexcept>
#include <vector>

class Epoll {
public:
    Epoll() : epoll_fd_(::epoll_create1(EPOLL_CLOEXEC)) {
        if (!epoll_fd_.valid()) {
            throw std::runtime_error("Failed to create epoll");
        }
    }

    void add(int fd, uint32_t events) {
        epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        if (::epoll_ctl(epoll_fd_.get(), EPOLL_CTL_ADD, fd, &ev) < 0) {
            throw std::runtime_error("epoll_ctl add failed");
        }
    }

    void modify(int fd, uint32_t events) {
        epoll_event ev{};
        ev.events = events;
        ev.data.fd = fd;
        if (::epoll_ctl(epoll_fd_.get(), EPOLL_CTL_MOD, fd, &ev) < 0) {
            throw std::runtime_error("epoll_ctl mod failed");
        }
    }

    void remove(int fd) {
        ::epoll_ctl(epoll_fd_.get(), EPOLL_CTL_DEL, fd, nullptr);
    }

    int wait(std::vector<epoll_event>& active_events, int timeout_ms) {
        int nfds = ::epoll_wait(epoll_fd_.get(), active_events.data(), 
                               static_cast<int>(active_events.size()), timeout_ms);
        return nfds;
    }

private:
    SafeFD epoll_fd_;
};

三、 非阻塞网络 I/O 与事件循环

在 ET 模式下,我们要确保 Socket 是非阻塞的。

1. 设置非阻塞模式

#include <fcntl.h>

void set_nonblocking(int fd) {
    int flags = ::fcntl(fd, F_GETFL, 0);
    if (flags == -1) return;
    ::fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

2. 核心服务器骨架

我们将每个连接的状态抽象为 Connection 对象,使用 std::string_view 进行零拷贝解析。

#include <sys/socket.h>
#include <netinet/in.h>
#include <string>
#include <unordered_map>
#include <memory>
#include <iostream>

struct Connection {
    SafeFD fd;
    std::string read_buffer;
    std::string write_buffer;
    size_t write_idx = 0;
};

class HttpServer {
public:
    HttpServer(int port) : epoll_() {
        setup_listener(port);
    }

    void run() {
        std::vector<epoll_event> events(1024);
        while (true) {
            int nfds = epoll_.wait(events, -1);
            if (nfds < 0) {
                if (errno == EINTR) continue;
                break;
            }

            for (int i = 0; i < nfds; ++i) {
                int fd = events[i].data.fd;
                uint32_t revents = events[i].events;

                if (fd == listen_fd_.get()) {
                    handle_accept();
                } else {
                    if (revents & (EPOLLIN | EPOLLPRI)) {
                        handle_read(fd);
                    }
                    if (revents & EPOLLOUT) {
                        handle_write(fd);
                    }
                    if (revents & (EPOLLERR | EPOLLHUP)) {
                        close_connection(fd);
                    }
                }
            }
        }
    }

private:
    void setup_listener(int port) {
        int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
        if (fd < 0) throw std::runtime_error("socket creation failed");
        listen_fd_.reset(fd);

        int opt = 1;
        ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));

        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = INADDR_ANY;
        addr.sin_port = htons(port);

        if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0) {
            throw std::runtime_error("bind failed");
        }

        if (::listen(fd, SOMAXCONN) < 0) {
            throw std::runtime_error("listen failed");
        }

        epoll_.add(listen_fd_.get(), EPOLLIN);
    }

    void handle_accept() {
        while (true) {
            sockaddr_in client_addr;
            socklen_t client_len = sizeof(client_addr);
            int client_fd = ::accept4(listen_fd_.get(), reinterpret_cast<sockaddr*>(&client_addr), 
                                      &client_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
            if (client_fd < 0) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; // 所有挂起的连接已处理完
                }
                break;
            }

            connections_.emplace(client_fd, std::make_unique<Connection>(SafeFD(client_fd)));
            epoll_.add(client_fd, EPOLLIN | EPOLLET | EPOLLRDHUP);
        }
    }

    void handle_read(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;
        auto& conn = it->second;

        char buf[4096];
        bool closed = false;
        while (true) {
            ssize_t n = ::recv(fd, buf, sizeof(buf), 0);
            if (n > 0) {
                conn->read_buffer.append(buf, n);
            } else if (n == 0) {
                closed = true; // 客户端关闭连接
                break;
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; // 数据读完了
                }
                if (errno == EINTR) continue;
                closed = true; // 发生真实错误
                break;
            }
        }

        if (closed) {
            close_connection(fd);
        } else {
            process_request(*conn);
        }
    }

    void handle_write(int fd) {
        auto it = connections_.find(fd);
        if (it == connections_.end()) return;
        auto& conn = it->second;

        while (conn->write_idx < conn->write_buffer.size()) {
            size_t to_write = conn->write_buffer.size() - conn->write_idx;
            ssize_t n = ::send(fd, conn->write_buffer.data() + conn->write_idx, to_write, 0);
            if (n > 0) {
                conn->write_idx += n;
            } else {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    return; // 写缓冲区满了,等待下一次可写事件
                }
                if (errno == EINTR) continue;
                close_connection(fd);
                return;
            }
        }

        // 数据写完后,在长连接中应重置状态并继续监听读事件
        if (conn->write_idx == conn->write_buffer.size()) {
            epoll_.modify(fd, EPOLLIN | EPOLLET | EPOLLRDHUP);
            conn->read_buffer.clear();
            conn->write_buffer.clear();
            conn->write_idx = 0;
        }
    }

    void close_connection(int fd) {
        epoll_.remove(fd);
        connections_.erase(fd); // 析构时 SafeFD 自动关闭 socket
    }

    void process_request(Connection& conn);

    Epoll epoll_;
    SafeFD listen_fd_;
    std::unordered_map<int, std::unique_ptr<Connection>> connections_;
};

四、 轻量级、零拷贝的 HTTP 解析与响应

为了榨干系统性能,避免不必要的内存拷贝,我们可以利用 C++17 的 std::string_view 来对缓冲区进行“滑动窗口式”的解析。

#include <string_view>

void HttpServer::process_request(Connection& conn) {
    std::string_view request(conn.read_buffer);
    
    // 寻找请求结束标记 (CRLF CRLF)
    size_t header_end = request.find("\r\n\r\n");
    if (header_end == std::string_view::npos) {
        // 请求头还不完整,继续等待接收
        return;
    }

    // 简单解析首行,例如 "GET /index.html HTTP/1.1"
    size_t first_line_end = request.find("\r\n");
    std::string_view first_line = request.substr(0, first_line_end);
    
    size_t method_end = first_line.find(' ');
    if (method_end == std::string_view::npos) return;
    std::string_view method = first_line.substr(0, method_end);

    size_t path_end = first_line.find(' ', method_end + 1);
    if (path_end == std::string_view::npos) return;
    std::string_view path = first_line.substr(method_end + 1, path_end - method_end - 1);

    // 构建一个极其简易的 HTTP 响应
    if (method == "GET") {
        std::string body = "<html><body><h1>Hello from Modern C++ Web Server!</h1></body></html>";
        conn.write_buffer = "HTTP/1.1 200 OK\r\n"
                            "Content-Type: text/html; charset=utf-8\r\n"
                            "Content-Length: " + std::to_string(body.size()) + "\r\n"
                            "Connection: keep-alive\r\n"
                            "\r\n" + body;
        
        // 注册可写事件,准备发送数据
        epoll_.modify(conn.fd.get(), EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP);
    }
}

五、 如何突破万级并发?(系统级调优)

只写出上述代码,在默认的 Linux 环境下压测是绝对无法达到万级并发的。因为操作系统默认设置了诸多限制。

要想真正支撑 C10K+,必须进行以下系统调优:

1. 突破文件描述符限制 (nofile)

Linux 默认每个进程的最大打开文件数是 1024

# 查看限制
ulimit -n

# 临时修改为 100 万
ulimit -n 1000000

若要永久生效,需要修改 /etc/security/limits.conf

* soft nofile 1000000
* hard nofile 1000000

2. 调整系统全局文件描述符上限

sysctl -w fs.file-max=2000000

3. 优化 TCP 半连接与全连接队列

  • SYN Queue (半连接队列): 调大 /proc/sys/net/ipv4/tcp_max_syn_backlog,防止高并发时 SYN 包被直接丢弃。
  • Accept Queue (全连接队列): listen(fd, backlog) 中的 backlog 参数决定了全连接队列大小。我们需要同步调大内核参数:
    sysctl -w net.core.somaxconn=32768
    

4. 端口释放与重用 (解决 TIME_WAIT 积压)

在高并发短连接压测下,客户端主动断开连接会产生大量的 TIME_WAIT 状态套接字,占用本地端口。
打开快速回收和重用选项:

sysctl -w net.ipv4.tcp_tw_reuse=1

结语

在实际工业级的高性能网络库(如 muduo、seastar、workflow)中,还需要引入以下高级特性:

  1. Timer Wheel (时间轮 / 最小堆):用于剔除心跳超时的死连接。
  2. Thread Pool (线程池):计算密集型的 HTTP 业务逻辑不能直接在 Reactor 线程执行,否则会阻塞整个事件循环,需要派发给 Worker 线程。
  3. 零拷贝优化:在发送静态文件时,使用 sendfile 系统调用避免用户态与内核态之间的数据拷贝。

基于现代 C++ 的 RAII 包装和 epoll 机制,我们用极其简短优雅的代码就搭起了高性能服务器的骨架。这正是现代 C++ 在系统级开发中的魅力所在。

点评评价

captcha
健康