Introduction

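Asio is the de facto standard for network programming in C++, yet most developers treat it as a "black box": we submit asynchronous operations and wait for callbacks to fire, but rarely ask how events travel from the kernel to user space, how the scheduler dispatches tasks, or which details the cross-platform abstraction layer hides.

This article starts from a simple echo server example and works step by step through how the asynchronous interfaces work underneath, and how they are adapted to C++20 coroutines.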

echo_server

Prerequisites

CompletionToken

For the completion callback that fires when an async op becomes ready (When), Asio lets you customize its form (How) and where it executes (Where).

flowchart TD
    async_op["Async operation"] -- associated at initiation --> init_fn["Initiating\nfunction"]
    async_op -- invoked on completion --> completion_handler["Completion\nhandler"]
    init_fn -- binds signature --> completion_signature["Completion\nsignature"]
    init_fn -. instantiates specialization .-> async_result["async_result\ntrait"]
    init_fn -. binds argument .-> completion_token["Completion\ntoken"]
    completion_token -. assembles .-> completion_handler

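Completion token: the callback function or convention that decides what happens after the async operation completes.

Completion handler: the completion routine that actually gets executed, assembled from the completion token.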

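Take the asio::async_read() interface as an example. Its completion signature is fixed as void(error_code, size_t), which carries the error code and the number of bytes read. There are two cases, depending on the kind of completion token:

  1. Callback function

    The completion token passed by the user must be a Callable object matching this signature, and it is assembled into the completion handler. When the async operation completes, the notification function complete() binds the error_code and size_t to that handler and submits it to the designated executor for execution.

  2. Convention

    The token is a marker that decides the form in which completion is delivered.

    1. use_future

      Returns a std::future. There is no continuation to place on an executor, because the result is consumed on the caller's side: the caller calls get() itself and blocks until the async operation finishes. For async_read, get() yields the size_t, and a non-zero error_code is rethrown as a system_error exception.

    2. use_awaitable

      The initiating function returns an awaitable derived from the signature, which is co_await-ed like a child coroutine. With plain use_awaitable this is awaitable<size_t, Executor> and errors are thrown. Wrapping the token, for example as_tuple(use_awaitable), gives awaitable<std::tuple<error_code, size_t>, Executor>.

      co_await suspends the current coroutine at the call site, and the token is assembled into a completion handler that carries the coroutine's resume handle. When the async operation completes, error_code and size_t are stored as the awaitable's result, and the handler is submitted to the designated executor. When the handler runs, it resumes the suspended point. The coroutine then takes error_code and size_t out of the awaitable and continues with the rest of its body (see details).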
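The following sketch makes the "How/Where" split concrete. It assumes a hypothetical single-threaded `toy::queue_executor`, which is not an Asio type. The completion step binds `error_code` and the byte count to the user's handler and posts the bound call to an executor. The handler therefore runs wherever that executor runs it, not inside the code that detected completion.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <system_error>
#include <utility>

namespace toy {

// Hypothetical executor: a FIFO of type-erased work items drained by run().
struct queue_executor {
  std::deque<std::function<void()>> q;
  void post(std::function<void()> f) { q.push_back(std::move(f)); }
  // Runs queued items in order and returns how many were executed.
  std::size_t run() {
    std::size_t n = 0;
    while (!q.empty()) {
      std::function<void()> f = std::move(q.front());
      q.pop_front();
      f();
      ++n;
    }
    return n;
  }
};

// Sketch of the "complete()" step: bind the I/O results (error_code, bytes)
// to the user's handler and submit the bound call to the chosen executor,
// instead of invoking the handler in place.
template <typename Handler>
void complete(queue_executor& ex, Handler handler, std::error_code ec,
              std::size_t bytes) {
  ex.post([handler, ec, bytes]() mutable { handler(ec, bytes); });
}

}  // namespace toy
```

The handler is not invoked until `ex.run()` is called. Until then, the results sit only in the bound closure.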

Async initiation

basic_socket_acceptor

basic_socket_acceptor.hpp
template <typename Protocol, typename Executor>
class basic_socket_acceptor : public socket_base {
  void open(const protocol_type &protocol = protocol_type()) {
    boost::system::error_code ec;
    impl_.get_service().open(impl_.get_implementation(), protocol, ec);
    boost::asio::detail::throw_error(ec, "open");
  }
};

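When the acceptor is created and the protocol is opened, it calls open() on its associated reactive_socket_service. That call creates the server socket and registers it with epoll_reactor so that connections can be accepted (see details).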

async_accept()

basic_socket_acceptor.hpp
template <BOOST_ASIO_COMPLETION_TOKEN_FOR(
              void(boost::system::error_code,
                   typename Protocol::socket::template rebind_executor<
                       executor_type>::other))
              MoveAcceptToken = default_completion_token_t<executor_type>>
auto async_accept(
    MoveAcceptToken &&token = default_completion_token_t<executor_type>())
    -> decltype(async_initiate<
                MoveAcceptToken,
                void(boost::system::error_code,
                     typename Protocol::socket::template rebind_executor<
                         executor_type>::other)>(
        declval<initiate_async_move_accept>(), token,
        declval<const executor_type &>(), static_cast<endpoint_type *>(0),
        static_cast<typename Protocol::socket::template rebind_executor<
            executor_type>::other *>(0))) {
  return async_initiate<
      MoveAcceptToken, void(boost::system::error_code,
                            typename Protocol::socket::template rebind_executor<
                                executor_type>::other)>(
      initiate_async_move_accept(this), token, impl_.get_executor(),
      static_cast<endpoint_type *>(0),
      static_cast<typename Protocol::socket::template rebind_executor<
          executor_type>::other *>(0));
}

flowchart TD
    async_accept["`async_accept
    &lt;MoveAcceptToken&gt;
    (token)`"]

    async_initiate["`async_initiate
    &lt;MoveAcceptToken, void(error_code, socket)&gt;
    (initiate_async_move_accept(this),
    token,
    impl_.get_executor(),
    endpoint_type*(0),
    socket*(0))`"]

    async_result["`async_result
    &lt;CompleteToken, ...Signatures,
    Initiate, ...Args&gt;::
    initiate(initiate,
    token,
    args...)`"]

    async_accept --> async_initiate
    async_initiate --> async_result

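Following the call path, the initiation arguments are forwarded to async_result::initiate(). That function finally calls initiate_async_move_accept(this)(handler, impl_.get_executor(), NULL, NULL), where handler is the completion handler assembled from token.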
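The async_initiate → async_result::initiate path is what lets one initiating function return different things for different tokens. Below is a simplified model under stated assumptions:

- `toy::async_result`, `toy::use_future`, `toy::simple_future` and `async_fake_read` are hypothetical stand-ins.
- The future-like object has no blocking wait.
- The fake initiation completes synchronously.

A callable token is passed straight through as the handler, and the call returns void. The future token is specialized so that it builds a handler that fills a shared state, and the call returns a future-like object.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <system_error>
#include <type_traits>
#include <utility>

namespace toy {

// Hypothetical shared state and future-like handle (stand-in for std::future).
template <typename R>
struct future_state { bool ready = false; std::error_code ec; R value{}; };

template <typename R>
struct simple_future {
  std::shared_ptr<future_state<R>> st = std::make_shared<future_state<R>>();
  R get() const {
    if (!st->ready) throw std::logic_error("not ready");  // no blocking wait here
    if (st->ec) throw std::system_error(st->ec);          // error becomes an exception
    return st->value;
  }
};

struct use_future_t {};              // hypothetical tag, not Asio's type
constexpr use_future_t use_future{};

// Primary template: the token is already a callable completion handler.
template <typename Token, typename Signature>
struct async_result {
  template <typename Initiation, typename Handler, typename... Args>
  static void initiate(Initiation&& init, Handler&& h, Args&&... args) {
    std::forward<Initiation>(init)(std::forward<Handler>(h),
                                   std::forward<Args>(args)...);
  }
};

// Specialization for the future tag: assemble a handler that fills the state.
template <typename R>
struct async_result<use_future_t, void(std::error_code, R)> {
  template <typename Initiation, typename Token, typename... Args>
  static simple_future<R> initiate(Initiation&& init, Token&&, Args&&... args) {
    simple_future<R> f;
    auto st = f.st;
    std::forward<Initiation>(init)(
        [st](std::error_code ec, R value) {
          st->ec = ec;
          st->value = std::move(value);
          st->ready = true;
        },
        std::forward<Args>(args)...);
    return f;
  }
};

// Picks the async_result specialization from the decayed token type.
template <typename Token, typename Signature, typename Initiation, typename... Args>
auto async_initiate(Initiation&& init, Token&& token, Args&&... args) {
  return async_result<std::decay_t<Token>, Signature>::initiate(
      std::forward<Initiation>(init), std::forward<Token>(token),
      std::forward<Args>(args)...);
}

// Hypothetical initiation object: "reads" n bytes and completes at once.
struct initiate_fake_read {
  template <typename Handler>
  void operator()(Handler&& h, std::size_t n) const { h(std::error_code{}, n); }
};

template <typename Token>
auto async_fake_read(std::size_t n, Token&& token) {
  return async_initiate<Token, void(std::error_code, std::size_t)>(
      initiate_fake_read{}, std::forward<Token>(token), n);
}

}  // namespace toy
```

The same initiating function yields `void` for a lambda and a future-like object for the tag. In the same way, Asio's real `async_result` specializations for `use_future` and `use_awaitable` change the return type.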

initiate_async_move_accept()

basic_socket_acceptor.hpp
class initiate_async_move_accept {
public:
  typedef Executor executor_type;

  explicit initiate_async_move_accept(basic_socket_acceptor *self)
      : self_(self) {}

  const executor_type &get_executor() const noexcept {
    return self_->get_executor();
  }

  template <typename MoveAcceptHandler, typename Executor1, typename Socket>
  void operator()(MoveAcceptHandler &&handler, const Executor1 &peer_ex,
                  endpoint_type *peer_endpoint, Socket *) const {
    detail::non_const_lvalue<MoveAcceptHandler> handler2(handler);
    self_->impl_.get_service().async_move_accept(
        self_->impl_.get_implementation(), peer_ex, peer_endpoint,
        handler2.value, self_->impl_.get_executor());
  }

private:
  basic_socket_acceptor *self_;
};

self_ gives access to the impl created by the reactive_socket_service. The service's async_move_accept() then starts the asynchronous accept on that impl (see details).

basic_stream_socket

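Constructing a socket object also calls open() on its associated reactive_socket_service; see the source code for details.

Strictly speaking, accept does not go through the normal socket creation path. The kernel returns a new fd for a connection that has already been established, and that handle is handed to the program. So the socket object's open flow is not used. Instead, the assign() function transfers the handle into the socket (the flow is similar to open; see details).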

async_read_some()

basic_stream_socket.hpp
template <typename MutableBufferSequence,
          BOOST_ASIO_COMPLETION_TOKEN_FOR(void(boost::system::error_code,
                                               std::size_t))
              ReadToken = default_completion_token_t<executor_type>>
auto async_read_some(
    const MutableBufferSequence &buffers,
    ReadToken &&token = default_completion_token_t<executor_type>())
    -> decltype(async_initiate<ReadToken, void(boost::system::error_code,
                                               std::size_t)>(
        declval<initiate_async_receive>(), token, buffers,
        socket_base::message_flags(0))) {
  return async_initiate<ReadToken,
                        void(boost::system::error_code, std::size_t)>(
      initiate_async_receive(this), token, buffers,
      socket_base::message_flags(0));
}

flowchart TD
    async_read_some["`async_read_some
    &lt;Buffer, ReadToken&gt;
    (buffers,
    token)`"]

    async_initiate["`async_initiate
    &lt;ReadToken, void(error_code, size_t)&gt;
    (initiate_async_receive(this),
    token,
    buffers, message_flags)`"]

    async_result["`async_result
    &lt;CompleteToken, ...Signatures,
    Initiate, ...Args&gt;::
    initiate(initiate,
    token,
    args...)`"]

    async_read_some --> async_initiate
    async_initiate --> async_result

Same as above; the call ends up as initiate_async_receive(this)(handler, buffers, message_flags).

initiate_async_receive()

basic_stream_socket.hpp
class initiate_async_receive {
public:
  typedef Executor executor_type;

  explicit initiate_async_receive(basic_stream_socket * self) : self_(self) {}

  const executor_type &get_executor() const noexcept {
    return self_->get_executor();
  }

  template <typename ReadHandler, typename MutableBufferSequence>
  void operator()(ReadHandler &&handler, const MutableBufferSequence &buffers,
                  socket_base::message_flags flags) const {
    detail::non_const_lvalue<ReadHandler> handler2(handler);
    self_->impl_.get_service().async_receive(self_->impl_.get_implementation(),
                                             buffers, flags, handler2.value,
                                             self_->impl_.get_executor());
  }

private:
  basic_stream_socket *self_;
};

Same as above.

io_object_impl

detail/io_object_impl.hpp
template <typename IoObjectService,
    typename Executor = io_context::executor_type>
class io_object_impl {
  io_object_impl(int, const executor_type &ex)
      : service_(&boost::asio::use_service<IoObjectService>(
            io_object_impl::get_context(ex))),
        executor_(ex) {
    service_->construct(implementation_);
  }
};

io_object_impl is the generic class that obtains the service and uses it to construct and operate on the impl.

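service_registry is Asio's mechanism for lazy, on-demand service initialization. Shared service facilities are attached to an execution_context. This avoids creating the same service twice and avoids paying for a service that is never needed. When a given service class is needed, there are two entry points:

- make_service constructs it with forwarded arguments and fails if one already exists.
- use_service constructs it from the owning context on first use and returns a reference to the existing instance afterwards.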

For the implementation, see detail::service_registry.
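Below is a minimal sketch of the lazy registry idea. It assumes type-keyed lookup through `std::type_index`; Asio's real registry uses its own service id/key scheme and a linked list of services.

- `use_service` creates a service on first use and returns the existing instance afterwards.
- `make_service` forwards arguments and refuses duplicates.

The demo services are hypothetical. `demo_socket_service` fetches its reactor in its constructor, as reactive_socket_service_base does. For that reason, construction happens outside the lock.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

namespace toy {

class execution_context;

// Base for all services (Asio's real one is execution_context::service).
struct service {
  explicit service(execution_context& owner) : owner_(owner) {}
  virtual ~service() = default;
  execution_context& owner_;
};

class execution_context {
 public:
  // Create on first use, afterwards return the same instance.
  template <typename Service>
  Service& use_service() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = services_.find(std::type_index(typeid(Service)));
      if (it != services_.end()) return static_cast<Service&>(*it->second);
    }
    // Construct without holding the lock: a service constructor may call
    // use_service() for its own dependencies.
    std::unique_ptr<service> fresh(new Service(*this));
    std::lock_guard<std::mutex> lock(mutex_);
    // If another caller registered it meanwhile, emplace keeps that entry.
    auto result = services_.emplace(std::type_index(typeid(Service)), std::move(fresh));
    return static_cast<Service&>(*result.first->second);
  }

  // Construct with forwarded arguments; refuse if it already exists.
  template <typename Service, typename... Args>
  Service& make_service(Args&&... args) {
    std::unique_ptr<service> fresh(new Service(*this, std::forward<Args>(args)...));
    std::lock_guard<std::mutex> lock(mutex_);
    auto result = services_.emplace(std::type_index(typeid(Service)), std::move(fresh));
    if (!result.second) throw std::logic_error("service already exists");
    return static_cast<Service&>(*result.first->second);
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::type_index, std::unique_ptr<service>> services_;
};

// Hypothetical services for the demo.
struct demo_reactor : service {
  explicit demo_reactor(execution_context& ctx) : service(ctx) {}
};

// Like reactive_socket_service_base, grabs its reactor in the constructor.
struct demo_socket_service : service {
  explicit demo_socket_service(execution_context& ctx)
      : service(ctx), reactor_(ctx.use_service<demo_reactor>()) {}
  demo_reactor& reactor_;
};

}  // namespace toy
```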

Using Linux epoll as an analogy, the service plays the role of the eventpoll instance, while the impl corresponds to an epitem event node inside eventpoll.

reactive_socket_service

typedef epoll_reactor reactor;

template <typename Protocol>
class reactive_socket_service :
  public execution_context_service_base<reactive_socket_service<Protocol>>,
  public reactive_socket_service_base {
  struct implementation_type
      : reactive_socket_service_base::base_implementation_type {
    implementation_type() : protocol_(endpoint_type().protocol()) {}
    // Protocol type
    protocol_type protocol_;
  };

  // Create a new socket into impl
  boost::system::error_code open(implementation_type &impl,
                                 const protocol_type &protocol,
                                 boost::system::error_code &ec) {
    if (!do_open(impl, protocol.family(),
          protocol.type(), protocol.protocol(), ec))
      impl.protocol_ = protocol;
    return ec;
  }

  // Transfer an existing native socket into impl
  boost::system::error_code assign(implementation_type &impl,
                                   const protocol_type &protocol,
                                   const native_handle_type &native_socket,
                                   boost::system::error_code &ec) {
    if (!do_assign(impl, protocol.type(), native_socket, ec))
      impl.protocol_ = protocol;
    return ec;
  }
};

The service class that manages socket operations.

reactive_socket_service_base

detail/reactive_socket_service_base.hpp
class reactive_socket_service_base {
  struct base_implementation_type {
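    // Native socket handle
    socket_type socket_;
    // State flags
    socket_ops::state_type state_;
    // Per-fd reactor state (descriptor_state) holding the queued async ops
    descriptor_state *reactor_data_;
  };

  // Obtains (creating on first use) the epoll_reactor service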
  reactive_socket_service_base(execution_context &context)
      : reactor_(use_service<reactor>(context)) {}

  boost::system::error_code
  do_open(reactive_socket_service_base::base_implementation_type &impl, int af,
          int type, int protocol, boost::system::error_code &ec) {
    if (is_open(impl)) {
      ec = boost::asio::error::already_open;
      return ec;
    }

    // Create the native socket (for an acceptor, the listening socket)
    socket_holder sock(socket_ops::socket(af, type, protocol, ec));
    if (sock.get() == invalid_socket)
      return ec;

    // Register this socket's I/O events with epoll_reactor;
    // this allocates the descriptor_state and stores it in impl.reactor_data_
    if (int err =
            reactor_.register_descriptor(sock.get(), impl.reactor_data_)) {
      ec = boost::system::error_code(err,
                                     boost::asio::error::get_system_category());
      return ec;
    }

    impl.socket_ = sock.release();
    switch (type) {
    case SOCK_STREAM:
      impl.state_ = socket_ops::stream_oriented | extra_state_;
      break;
    case SOCK_DGRAM:
      impl.state_ = socket_ops::datagram_oriented | extra_state_;
      break;
    default:
      impl.state_ = 0;
      break;
    }
    ec = boost::system::error_code();
    return ec;
  }

  boost::system::error_code do_assign(
      reactive_socket_service_base::base_implementation_type &impl, int type,
      const reactive_socket_service_base::native_handle_type &native_socket,
      boost::system::error_code &ec) {
    if (is_open(impl)) {
      ec = boost::asio::error::already_open;
      return ec;
    }

    // Register this socket's I/O events with epoll_reactor
    if (int err =
            reactor_.register_descriptor(native_socket, impl.reactor_data_)) {
      ec = boost::system::error_code(err,
                                     boost::asio::error::get_system_category());
      return ec;
    }

    // Take ownership of the native socket
    impl.socket_ = native_socket;
    switch (type) {
    case SOCK_STREAM:
      impl.state_ = socket_ops::stream_oriented | extra_state_;
      break;
    case SOCK_DGRAM:
      impl.state_ = socket_ops::datagram_oriented | extra_state_;
      break;
    default:
      impl.state_ = 0;
      break;
    }
    impl.state_ |= socket_ops::possible_dup;
    ec = boost::system::error_code();
    return ec;
  }

  void construct(base_implementation_type& impl);

  reactor& reactor_;
};

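Asio moves protocol-independent code into the reactive_socket_service_base base class and leaves only the protocol-specific parts to the derived class, which reduces duplicated binary code. The per-fd async state that it attaches to the reactor is descriptor_state (see details).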
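The binary-size argument can be illustrated with a small sketch in which all names are hypothetical, not Asio's. The bookkeeping lives in a non-template base and is compiled once. The class template only adds the protocol member and the protocol-specific `state_bits()` call.

```cpp
#include <cassert>

namespace toy {

// Protocol-independent state and logic: compiled once, shared by every
// protocol (the role reactive_socket_service_base plays).
class socket_service_base {
 public:
  struct base_implementation_type {
    int socket_ = -1;      // native handle, -1 means "not open"
    unsigned state_ = 0;   // e.g. stream/datagram flags
  };

  static bool is_open(const base_implementation_type& impl) {
    return impl.socket_ != -1;
  }

  // Common part of assign(): refuse a double open, then take the handle.
  static bool do_assign(base_implementation_type& impl, int native,
                        unsigned state) {
    if (is_open(impl)) return false;   // "already_open"
    impl.socket_ = native;
    impl.state_ = state;
    return true;
  }
};

// Thin template layer: only what depends on Protocol lives here.
template <typename Protocol>
class socket_service : public socket_service_base {
 public:
  struct implementation_type : base_implementation_type {
    Protocol protocol_{};
  };

  bool assign(implementation_type& impl, const Protocol& p, int native) {
    if (!do_assign(impl, native, p.state_bits())) return false;
    impl.protocol_ = p;
    return true;
  }
};

// Two hypothetical protocols with different state flags.
struct stream_protocol { unsigned state_bits() const { return 1; } };
struct datagram_protocol { unsigned state_bits() const { return 2; } };

}  // namespace toy
```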

async_move_accept()

detail/reactive_socket_service.hpp
template <typename PeerIoExecutor, typename Handler, typename IoExecutor>
void async_move_accept(implementation_type &impl,
                       const PeerIoExecutor &peer_io_ex,
                       endpoint_type *peer_endpoint, Handler &handler,
                       const IoExecutor &io_ex) {
  bool is_continuation =
      BOOST_ASIO_VERSIONED_NAME(handler_cont_helpers)::is_continuation(handler);

  associated_cancellation_slot_t<Handler> slot =
      boost::asio::get_associated_cancellation_slot(handler);

  typedef reactive_socket_move_accept_op<Protocol, PeerIoExecutor, Handler,
                                         IoExecutor>
      op;
  typename op::ptr p = {boost::asio::detail::addressof(handler),
                        op::ptr::allocate(handler), 0};
  p.p = new (p.v) op(success_ec_, peer_io_ex, impl.socket_, impl.state_,
                     impl.protocol_, peer_endpoint, handler, io_ex);

  if (slot.is_connected()) {
    p.p->cancellation_key_ = &slot.template emplace<reactor_op_cancellation>(
        &reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
  }

  start_accept_op(impl, p.p, is_continuation, false, &io_ex, 0);
  p.v = p.p = 0;
}

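The async accept and its completion callback are wrapped into a reactive_socket_move_accept_op. start_accept_op() then attaches it to the matching descriptor_state (the fd) in the reactor_ service, where it waits for the event to arrive. reactor_ is the epoll_reactor obtained via use_service when reactive_socket_service_base is constructed (see details).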
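The sequence `op::ptr p = {...}` / `p.p = new (p.v) op(...)` / `p.v = p.p = 0` is an ownership guard. Below is a minimal sketch that makes two assumptions, both hypothetical:

- It uses plain `operator new` instead of the handler's associated allocator.
- A global `queued` pointer stands in for the reactor's queue.

If the op's constructor throws, the guard frees the raw memory. Once the reactor owns the op, clearing both pointers stops the guard from freeing it.

```cpp
#include <cassert>
#include <new>
#include <stdexcept>

namespace toy {

int live_ops = 0;  // constructed-but-not-destroyed ops, for the demo

struct op {
  int value;
  explicit op(int v) : value(v) {
    if (v < 0) throw std::invalid_argument("bad op");  // simulate a failing ctor
    ++live_ops;
  }
  ~op() { --live_ops; }
};

// Guard in the style of op::ptr: v is raw storage, p the object living in it.
struct op_ptr {
  void* v = nullptr;
  op* p = nullptr;
  static void* allocate() { return ::operator new(sizeof(op)); }
  void reset() {
    if (p) { p->~op(); p = nullptr; }
    if (v) { ::operator delete(v); v = nullptr; }
  }
  ~op_ptr() { reset(); }  // frees whatever the guard still owns
};

op* queued = nullptr;  // hypothetical stand-in for the reactor's op queue

void launch_op(int v) {
  op_ptr ptr;
  ptr.v = op_ptr::allocate();
  ptr.p = new (ptr.v) op(v);  // may throw: the guard then frees ptr.v
  queued = ptr.p;             // ownership handed to the "reactor"
  ptr.v = ptr.p = nullptr;    // release, so the guard does not free it
}

void destroy_queued() {
  if (queued) { queued->~op(); ::operator delete(queued); queued = nullptr; }
}

}  // namespace toy
```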

flowchart LR
    start_accept_op --> do_start_accept_op --> do_start_op --> reactor_.start_op

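In do_start_accept_op, accept actually uses the op type reactor::read_op. Once a connection has completed the three-way handshake and entered the accept (full-connection) queue, the kernel marks the listening fd as readable.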

reactive_socket_move_accept_op

detail/reactive_socket_accept_op.hpp
template <typename Protocol, typename PeerIoExecutor, typename Handler,
          typename IoExecutor>
class reactive_socket_move_accept_op
    : private Protocol::socket::template rebind_executor<PeerIoExecutor>::other,
      public reactive_socket_accept_op_base<
          typename Protocol::socket::template rebind_executor<
              PeerIoExecutor>::other,
          Protocol> {
public:
  typedef Handler handler_type;
  typedef IoExecutor io_executor_type;

  // Defines the nested RAII ptr type that owns the op's memory
  BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_move_accept_op);

  reactive_socket_move_accept_op(const boost::system::error_code &success_ec,
                                 const PeerIoExecutor &peer_io_ex,
                                 socket_type socket,
                                 socket_ops::state_type state,
                                 const Protocol &protocol,
                                 typename Protocol::endpoint *peer_endpoint,
                                 Handler &handler, const IoExecutor &io_ex)
      : peer_socket_type(peer_io_ex),
        reactive_socket_accept_op_base<peer_socket_type, Protocol>(
            success_ec, socket, state, *this, protocol, peer_endpoint,
            &reactive_socket_move_accept_op::do_complete),
        handler_(static_cast<Handler &&>(handler)), work_(handler_, io_ex) {}

  // Completion function invoked once the op is ready
  static void do_complete(void *owner, operation *base,
                          const boost::system::error_code & /*ec*/,
                          std::size_t /*bytes_transferred*/) {
    reactive_socket_move_accept_op* o(
        static_cast<reactive_socket_move_accept_op*>(base));
    ptr p = { boost::asio::detail::addressof(o->handler_), o, o };

    // Transfer the socket from new_socket_ into this op's peer socket
    if (owner)
      o->do_assign();

    handler_work<Handler, IoExecutor> w(
        static_cast<handler_work<Handler, IoExecutor>&&>(
          o->work_));

    // Bind the I/O results, the error_code and the socket, to the handler
    detail::move_binder2<Handler,
      boost::system::error_code, peer_socket_type>
        handler(0, static_cast<Handler&&>(o->handler_), o->ec_,
          static_cast<peer_socket_type&&>(*o));
    p.h = boost::asio::detail::addressof(handler.handler_);
    p.reset();

    if (owner) {
      // Memory barrier
      fenced_block b(fenced_block::half);
      // Invoke the handler, submitting it to the executor associated with handler.handler_
      w.complete(handler, handler.handler_);
    }
  }

private:
  typedef typename Protocol::socket::template
    rebind_executor<PeerIoExecutor>::other peer_socket_type;
  Handler handler_;
  handler_work<Handler, IoExecutor> work_;
};

template <typename Socket, typename Protocol>
class reactive_socket_accept_op_base : public reactor_op {
public:
  reactive_socket_accept_op_base(const boost::system::error_code &success_ec,
                                 socket_type socket,
                                 socket_ops::state_type state, Socket &peer,
                                 const Protocol &protocol,
                                 typename Protocol::endpoint *peer_endpoint,
                                 func_type complete_func)
      : reactor_op(success_ec, &reactive_socket_accept_op_base::do_perform,
                   complete_func),
        socket_(socket), state_(state), peer_(peer), protocol_(protocol),
        peer_endpoint_(peer_endpoint),
        addrlen_(peer_endpoint ? peer_endpoint->capacity() : 0) {}

  // Runs when the event arrives:
  // takes the socket returned by the accept system call and stores it in new_socket_
  static status do_perform(reactor_op *base) {
    BOOST_ASIO_ASSUME(base != 0);
    reactive_socket_accept_op_base* o(
        static_cast<reactive_socket_accept_op_base*>(base));

    socket_type new_socket = invalid_socket;
    // accept system call (non-blocking)
    status result = socket_ops::non_blocking_accept(o->socket_,
        o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
        o->peer_endpoint_ ? &o->addrlen_ : 0, o->ec_, new_socket)
    ? done : not_done;
    o->new_socket_.reset(new_socket);
    return result;
  }

  // Transfer the socket from new_socket_ into peer_
  void do_assign() {
    if (new_socket_.get() != invalid_socket) {
      if (peer_endpoint_)
        peer_endpoint_->resize(addrlen_);
      peer_.assign(protocol_, new_socket_.get(), ec_);
      if (!ec_)
        new_socket_.release();
    }
  }

private:
  socket_type socket_;
  socket_ops::state_type state_;
  socket_holder new_socket_;
  Socket& peer_;
  Protocol protocol_;
  typename Protocol::endpoint* peer_endpoint_;
  std::size_t addrlen_;
};

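When the event arrives, reactor_op::perform() makes the accept system call through socket_ops and stores the resulting socket in the op's new_socket_. A reactor_op hangs off an epoll_reactor::descriptor_state and represents one async operation on that fd (see details).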
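reactor_op does not use virtual functions. Instead, the derived op supplies two plain function pointers:

- a perform function, which tries the non-blocking syscall;
- a complete function, which runs the handler.

The sketch below copies that shape with hypothetical names. `fake_accept_op` reads a fake backlog counter instead of calling accept(), and the fd numbers it hands out are made up.

```cpp
#include <cassert>
#include <utility>

namespace toy {

// Type-erased base: two function pointers instead of virtual functions,
// mirroring the perform/complete pair that reactor_op is constructed with.
struct reactor_op {
  enum status { not_done, done };
  using perform_func = status (*)(reactor_op*);
  using complete_func = void (*)(reactor_op*);

  reactor_op(perform_func p, complete_func c) : perform_(p), complete_(c) {}
  status perform() { return perform_(this); }  // try the non-blocking call
  void complete() { complete_(this); }         // run the user's handler

 private:
  perform_func perform_;
  complete_func complete_;
};

// Hypothetical accept op driven by a fake kernel backlog counter.
template <typename Handler>
struct fake_accept_op : reactor_op {
  fake_accept_op(int& backlog, Handler h)
      : reactor_op(&fake_accept_op::do_perform, &fake_accept_op::do_complete),
        backlog_(backlog), handler_(std::move(h)) {}

  static status do_perform(reactor_op* base) {
    auto* o = static_cast<fake_accept_op*>(base);
    if (o->backlog_ == 0) return not_done;  // like EAGAIN: keep waiting
    --o->backlog_;
    o->new_socket_ = 100 + o->backlog_;     // made-up fd "returned by accept"
    return done;
  }

  static void do_complete(reactor_op* base) {
    auto* o = static_cast<fake_accept_op*>(base);
    o->handler_(o->new_socket_);
  }

  int& backlog_;
  Handler handler_;
  int new_socket_ = -1;
};

template <typename Handler>
fake_accept_op<Handler> make_accept_op(int& backlog, Handler h) {
  return fake_accept_op<Handler>(backlog, std::move(h));
}

}  // namespace toy
```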

detail/handler_work.hpp
template <typename Handler, typename IoExecutor, typename = void>
class handler_work
    : handler_work_base<IoExecutor>,
      handler_work_base<associated_executor_t<Handler, IoExecutor>,
                        IoExecutor> {
public:
  typedef handler_work_base<IoExecutor> base1_type;
  typedef handler_work_base<associated_executor_t<Handler, IoExecutor>,
      IoExecutor> base2_type;

  handler_work(Handler &handler, const IoExecutor &io_ex) noexcept
      : base1_type(0, 0, io_ex),
        base2_type(base1_type::owns_work(),
                   boost::asio::get_associated_executor(handler, io_ex),
                   io_ex) {}

  template <typename Function>
  void complete(Function &function, Handler &handler) {
    if (!base1_type::owns_work() && !base2_type::owns_work()) {
      // When using a native implementation, I/O completion handlers are
      // already dispatched according to the execution context's executor's
      // rules. We can call the function directly.
      static_cast<Function&&>(function)();
    } else {
      base2_type::dispatch(function, handler);
    }
  }
};

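When the async operation completes, complete() either dispatches the handler or runs it in place. It runs in place if neither work object owns outstanding work, for example when it is an internal io_context event, or when the IoExecutor and the handler's associated executor are the same. In that case the handler is called directly inside the io_context event loop, without going through dispatch and waiting in op_queue_ to be scheduled.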

What dispatch does depends on the executor in use (see details).
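The direct-call-versus-dispatch decision in handler_work::complete() can be reduced to the following sketch. It assumes hypothetical `toy::executor` objects that are identified by the queue they feed. It also simplifies Asio's rule in two ways:

- Asio checks whether each work object owns work, rather than comparing executors directly.
- A real dispatch may also run inline when the caller is already inside the target executor.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

namespace toy {

// Hypothetical executor, identified by the queue it feeds.
struct executor {
  std::deque<std::function<void()>>* queue;
  bool operator==(const executor& o) const { return queue == o.queue; }
};

// If the handler's executor is the I/O executor we are already running on,
// call it in place; otherwise hand the bound call over to the other queue.
template <typename Function>
void complete(Function f, const executor& io_ex, const executor& handler_ex) {
  if (handler_ex == io_ex)
    f();                                                   // fast path
  else
    handler_ex.queue->push_back(std::function<void()>(std::move(f)));
}

}  // namespace toy
```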

async_receive()

detail/reactive_socket_service_base.hpp
template <typename MutableBufferSequence,
typename Handler, typename IoExecutor>
void async_receive(base_implementation_type& impl,
      const MutableBufferSequence& buffers, socket_base::message_flags flags,
      Handler& handler, const IoExecutor& io_ex) {
  bool is_continuation =
      BOOST_ASIO_VERSIONED_NAME(handler_cont_helpers)::is_continuation(handler);

  associated_cancellation_slot_t<Handler> slot =
      boost::asio::get_associated_cancellation_slot(handler);

  typedef reactive_socket_recv_op<MutableBufferSequence, Handler, IoExecutor>
      op;
  typename op::ptr p = {boost::asio::detail::addressof(handler),
                        op::ptr::allocate(handler), 0};
  p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_, buffers, flags,
                     handler, io_ex);

  if (slot.is_connected()) {
    p.p->cancellation_key_ = &slot.template emplace<reactor_op_cancellation>(
        &reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
  }

  start_op(impl,
           (flags & socket_base::message_out_of_band) ? reactor::except_op
                                                      : reactor::read_op,
           p.p, is_continuation,
           (flags & socket_base::message_out_of_band) == 0,
           ((impl.state_ & socket_ops::stream_oriented) &&
            buffer_sequence_adapter<boost::asio::mutable_buffer,
                                    MutableBufferSequence>::all_empty(buffers)),
           true, &io_ex, 0);
  p.v = p.p = 0;
}

Same as above: the async read and its completion callback are wrapped into a reactive_socket_recv_op and attached via start_op.

epoll_reactor

descriptor_state

detail/epoll_reactor.hpp
// One descriptor_state per fd,
// holding every async operation waiting on it
struct descriptor_state : operation {
  descriptor_state *next_;
  descriptor_state *prev_;

  mutex mutex_;
  epoll_reactor *reactor_;
  int descriptor_;
  uint32_t registered_events_;
  op_queue<reactor_op> op_queue_[max_ops];
  bool try_speculative_[max_ops];
  bool shutdown_;

  void set_ready_events(uint32_t events) { task_result_ = events; }
  void add_ready_events(uint32_t events) { task_result_ |= events; }

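  // Check the event types returned by epoll_wait (read_op, connect_or_write_op, except_op)
  // and collect the ops that can now finish from op_queue_[j] into ops_;
  // the first op is returned and completed inside do_complete,
  // the rest are appended to the scheduler's op_queue_ and driven by the event loop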
  operation *perform_io(uint32_t events) {
    mutex_.lock();
    perform_io_cleanup_on_block_exit io_cleanup(reactor_);
    mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);
    static const int flag[max_ops] = {EPOLLIN, EPOLLOUT, EPOLLPRI};
    for (int j = max_ops - 1; j >= 0; --j) {
      if (events & (flag[j] | EPOLLERR | EPOLLHUP)) {
        try_speculative_[j] = true;
        while (reactor_op *op = op_queue_[j].front()) {
          if (reactor_op::status status = op->perform()) {
            op_queue_[j].pop();
            io_cleanup.ops_.push(op);
            if (status == reactor_op::done_and_exhausted) {
              try_speculative_[j] = false;
              break;
            }
          } else
            break;
        }
      }
    }
    io_cleanup.first_op_ = io_cleanup.ops_.front();
    io_cleanup.ops_.pop();
    return io_cleanup.first_op_;
  }

  // Hook triggered once the descriptor is ready,
  // called from o->complete(this, ec, task_result) in the event loop
  static void do_complete(void *owner, operation *base,
                          const boost::system::error_code &ec,
                          std::size_t bytes_transferred) {
    if (owner) {
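      // A non-null owner (the scheduler) means the op is being run rather than destroyed;
      // owner is passed as void* so no polymorphic interface is needed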
      descriptor_state *descriptor_data = static_cast<descriptor_state *>(base);
      uint32_t events = static_cast<uint32_t>(bytes_transferred);
      if (operation *op = descriptor_data->perform_io(events)) {
        op->complete(owner, ec, 0);
      }
    }
  }

  // Derives from operation: &do_complete is stored in the base's func_ and invoked through complete()
  descriptor_state(bool locking, int spin_count)
      : operation(&epoll_reactor::descriptor_state::do_complete),
        mutex_(locking, spin_count) {}
};

descriptor_state is the per-fd context.
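The following single-threaded sketch shows how perform_io() drains one descriptor. It uses hypothetical event bits and ops whose `perform()` succeeds after a set number of attempts, standing in for EAGAIN. It keeps Asio's shape:

- Queues are visited from except_op down to read_op.
- Each queue stops at the first op that would block.
- The first finished op is returned for immediate completion, and the rest go to the scheduler's queue.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <vector>

namespace toy {

// Hypothetical event bits standing in for EPOLLIN/EPOLLOUT/EPOLLPRI/EPOLLERR.
enum : std::uint32_t { ev_in = 1, ev_out = 2, ev_pri = 4, ev_err = 8 };
enum { read_op = 0, write_op = 1, except_op = 2, max_ops = 3 };

struct op {
  int id;
  int attempts_needed;  // perform() calls until the fake syscall succeeds
  bool perform() { return --attempts_needed <= 0; }
};

struct descriptor_state {
  std::deque<op*> op_queue[max_ops];

  // Returns the first completed op; the other completed ops go to `more`.
  op* perform_io(std::uint32_t events, std::vector<op*>& more) {
    static const std::uint32_t flag[max_ops] = {ev_in, ev_out, ev_pri};
    std::vector<op*> ready;
    for (int j = max_ops - 1; j >= 0; --j) {
      if (!(events & (flag[j] | ev_err))) continue;  // queue not signalled
      while (!op_queue[j].empty()) {
        op* o = op_queue[j].front();
        if (!o->perform()) break;  // would block: leave it queued
        op_queue[j].pop_front();
        ready.push_back(o);
      }
    }
    if (ready.empty()) return nullptr;
    more.insert(more.end(), ready.begin() + 1, ready.end());
    return ready.front();
  }
};

}  // namespace toy
```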

start_op()

detail/impl/epoll_reactor.ipp
void epoll_reactor::start_op(int op_type, socket_type descriptor,
                        epoll_reactor::per_descriptor_data &descriptor_data,
                        reactor_op *op, bool is_continuation,
                        bool allow_speculative,
                        void (*on_immediate)(operation *, bool, const void *),
                        const void *immediate_arg) {
  if (!descriptor_data) {
    op->ec_ = boost::asio::error::bad_descriptor;
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_) {
    on_immediate(op, is_continuation, immediate_arg);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty()) {
    if (allow_speculative &&
        (op_type != read_op || descriptor_data->op_queue_[except_op].empty())) {
      if (descriptor_data->try_speculative_[op_type]) {
        if (reactor_op::status status = op->perform()) {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          on_immediate(op, is_continuation, immediate_arg);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0) {
        op->ec_ = boost::asio::error::operation_not_supported;
        on_immediate(op, is_continuation, immediate_arg);
        return;
      }

      if (op_type == write_op) {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0) {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0) {
            descriptor_data->registered_events_ |= ev.events;
          } else {
            op->ec_ = boost::system::error_code(
                errno, boost::asio::error::get_system_category());
            on_immediate(op, is_continuation, immediate_arg);
            return;
          }
        }
      }
    } else if (descriptor_data->registered_events_ == 0) {
      op->ec_ = boost::asio::error::operation_not_supported;
      on_immediate(op, is_continuation, immediate_arg);
      return;
    } else {
      if (op_type == write_op) {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = {0, {0}};
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

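Most of the function consists of state checks. The line to focus on is the final descriptor_data->op_queue_[op_type].push(op);. It attaches the current op to the fd's op_queue_ for op_type, where it waits for the matching event to fire. Before that point, if no op is queued ahead of it, the op is first tried speculatively with perform(). It completes immediately if the non-blocking call succeeds.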
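The speculative path at the top of start_op() explains why a read on a socket that already has data may complete without ever touching epoll. The sketch below simplifies this, assuming a single op type per descriptor and hypothetical `toy::op`/`toy::descriptor` types. The `immediate` queue plays the role of on_immediate. Asio's clearing of try_speculative_ on done_and_exhausted is left out.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

namespace toy {

// Hypothetical op: try_io() attempts the non-blocking syscall once.
struct op {
  explicit op(std::function<bool()> f) : try_io(std::move(f)) {}
  std::function<bool()> try_io;  // true when the syscall succeeded
  bool completed = false;
};

struct descriptor {
  std::deque<op*> queue;         // ops parked until readiness is reported
  bool try_speculative = true;
};

// If nothing is queued ahead of us, try the operation right away; only if it
// would block do we park it. Ops already queued keep their FIFO order.
void start_op(descriptor& d, op* o, bool allow_speculative,
              std::deque<op*>& immediate) {
  if (d.queue.empty() && allow_speculative && d.try_speculative) {
    if (o->try_io()) {
      o->completed = true;
      immediate.push_back(o);    // completes without waiting for epoll
      return;
    }
  }
  d.queue.push_back(o);          // wait for epoll_wait to report readiness
}

}  // namespace toy
```

The queue-empty check matters. If an earlier op is still parked, a new op must wait behind it even when data happens to be available, otherwise reads could complete out of order.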

run()

detail/impl/epoll_reactor.ipp
void epoll_reactor::run(long usec, op_queue<operation> &ops) {
  // Calculate timeout. Check the timer queues only if timerfd is not in use.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1) {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Fetch the ready events from epoll_wait
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

  for (int i = 0; i < num_events; ++i) {
    // Recover the fd's context (its descriptor_state)
    void* ptr = events[i].data.ptr;
    descriptor_state *descriptor_data = static_cast<descriptor_state *>(ptr);
    if (!ops.is_enqueued(descriptor_data)) {
      descriptor_data->set_ready_events(events[i].events);
      ops.push(descriptor_data);
    } else {
      descriptor_data->add_ready_events(events[i].events);
    }
  }

  // timer_fd-related logic omitted
}

epoll_reactor corresponds to the epoll_wait event loop. In practice, it is invoked as task_->run() when the io_context event loop dequeues task_operation_ (see details).
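The per-event loop in run() queues each descriptor_state at most once. If the descriptor is already queued, it only merges in the new event bits. In the sketch below, a hypothetical `fake_event` replaces `epoll_event`, and an `enqueued` flag replaces `ops.is_enqueued()`. The add_ready_events branch covers a descriptor that is already sitting in the ops queue when more events for it arrive.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

namespace toy {

struct descriptor_state {
  std::uint32_t task_result = 0;  // accumulated ready events
  bool enqueued = false;          // stands in for ops.is_enqueued(...)
};

// Hypothetical stand-in for one epoll_event: which descriptor, which events.
struct fake_event {
  descriptor_state* ptr;
  std::uint32_t events;
};

// Same shape as epoll_reactor::run(): queue each ready descriptor once,
// OR in the event bits when it is already queued.
void collect(const std::vector<fake_event>& events,
             std::vector<descriptor_state*>& ops) {
  for (const fake_event& e : events) {
    descriptor_state* d = e.ptr;
    if (!d->enqueued) {
      d->task_result = e.events;   // set_ready_events
      d->enqueued = true;
      ops.push_back(d);
    } else {
      d->task_result |= e.events;  // add_ready_events
    }
  }
}

}  // namespace toy
```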

Event loop

io_context

---
title: io_context relationship
---
classDiagram
    class i__basic_executor_type["io_context::basic_executor_type"] {
        +context* target_

        +basic_executor_type require(PropXXX)
        +PropXXX query(TagXXX)
    }

    class e__service["execution_context::service"] {
        +execution_context& owner_
        +service* next_
        +func* destroy_
    }

    class execution_context_service_base~Type~ {
        +static service_id~Type~ id
    }

    e__service <|-- execution_context_service_base

    class d__service_registry["detail::service_registry"] {
        +mutex mutex_
        +execution_context& owner_
        +execution_context::service* first_service_

        +execution_context::service* create~Service,Owner~(execution_context, owner, Args... args) 
        +Service& use_service~Service~()
        +Service& make_service~Service~()
    }

    class execution_context {
        +auto_allocator_ptr allocator_
        +detail::service_registry service_registry_

        +Service& use_service~Service~()
        +Service& make_service~Service~()
    }

    execution_context *-- d__service_registry

    class scheduler {
        +op_queue~operation~ op_queue_
        +thread thread_

        +size_t run(error_code)
        +size_t do_run_one(lock, thread_info, error_code)
    }

    execution_context_service_base <|-- scheduler

    class io_context {
        +scheduler impl_

        +basic_executor_type get_executor()
        +size_t run()
    }

    io_context *-- scheduler
    execution_context <|-- io_context

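io_context is Asio's bridge to the asynchronous networking system calls. It registers and initiates those calls, and it schedules the resulting completion callbacks.

  1. Event loop and scheduling hub

    io_context manages and schedules all asynchronous I/O operations.

    • Every I/O object (tcp::socket, steady_timer, …) must be bound to an io_context instance.

    • io_context dispatches each completion callback to run on the executor (worker thread) that its completion handler is bound to.

  2. Cross-platform support

    • Linux: epoll (default), io_uring
    • Windows: IOCP
    • macOS: kqueue
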
#if defined(BOOST_ASIO_HAS_IOCP)
  typedef win_iocp_io_context io_context_impl;
  class win_iocp_overlapped_ptr;
#else
  typedef scheduler io_context_impl;
#endif

io_context_impl is the platform-specific scheduler implementation that io_context is built on.

run()

impl/io_context.ipp
io_context::count_type io_context::run() {
  boost::system::error_code ec;
  count_type s = impl_.run(ec);
  boost::asio::detail::throw_error(ec);
  return s;
}

run() forwards to the actual implementation, io_context_impl::run().

scheduler

The scheduler implementation used on Linux serves as the example here.

flowchart LR
    run["run()"]-->thread_info["thread_info ths;"]
    thread_info-->do_run_one1["do_run_one(&ths)"]
    do_run_one1-- while loop -->do_run_one1

flowchart TD
    do_run_one(["do_run_one()"])-->op_empty_switch{"op_queue_.empty()?"}
    op_empty_switch-- false -->get_op{"get op and switch"}
    op_empty_switch-- true -->sleep(["sleep & wait for signal"])
    get_op-- task_operation_ -->task_run["`task_.run
    (task_usec_,
    ths.private_op_queue)`"]
    get_op-- others -->complete["op.complete()"]
    task_run-->collect["`op_queue_ +=
    ths.private_op_queue`"]
    collect & complete-->finish(["finish"])

thread_info

detail/scheduler_thread_info.hpp
struct scheduler_thread_info : public thread_info_base
{
  op_queue<scheduler_operation> private_op_queue;
  long private_outstanding_work;
};

scheduler_thread_info is a per-thread storage block. It mainly holds a private queue of ready operations. Operations are collected there and then submitted to the scheduler's op_queue_ in one batch.

run()

detail/impl/scheduler.ipp
std::size_t scheduler::run(boost::system::error_code &ec) {
  thread_info this_thread; // per-thread storage block for the calling thread
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_); // take the lock protecting op_queue_

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

The thread that calls io_context.run() effectively takes over the scheduler. It drives the scheduler's async event loop by calling do_run_one() repeatedly.

do_run_one()

detail/impl/scheduler.ipp
std::size_t scheduler::do_run_one(mutex::scoped_lock &lock,
                                  scheduler::thread_info &this_thread,
                                  const boost::system::error_code &ec) {
  while (!stopped_) {
    if (!op_queue_.empty()) {
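      // take the next ready op
      operation *o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty()); // other ops are still waiting

      if (o == &task_operation_) { // sentinel: run the reactor (epoll_wait)
        // lock.unlock()
        // if more handlers are queued, wake another event-loop thread
        // ...

        // task_operation_ stays out of the queue while this thread waits,
        // so only one thread is ever inside epoll_wait;
        // on_exit's destructor pushes task_operation_ back onto op_queue_
        task_cleanup on_exit = {this, &lock, &this_thread};

        // wait on epoll and collect ready ops into this thread's private_op_queue;
        // on_exit's destructor splices them into op_queue_
        // (timeout is 0 when other handlers are already waiting)
        task_->run(more_handlers ? 0 : task_usec_,
            this_thread.private_op_queue);

        // lock-contention optimization: on_exit's destructor re-acquires the
        // lock and updates op_queue_, so unlike a user handler this branch
        // does not return; the loop goes on to fetch the next ready op
      } else { // run a ready op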
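        // lock.unlock()
        // if more handlers are queued, wake another event-loop thread
        // ...

        work_cleanup on_exit = { this, &lock, &this_thread };

        // the handler may produce ops that are ready immediately; they are
        // collected in private_op_queue, and on_exit's destructor moves them into op_queue_
        o->complete(this, ec, o->task_result_);

        return 1; // one handler executed; run() adds these up
      }
    } else {
      // idle optimization: block on wakeup_event_ instead of spinning
      // ...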
    }
  }

  return 0;
}

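do_run_one() performs one iteration of the event loop. It takes ready asynchronous ops from the shared ready queue op_queue_ and executes them. Dequeuing task_operation_ only runs the reactor and does not count as a handler. The function therefore returns 1 once a real handler has run, and 0 when the scheduler has been stopped.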
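To make the control flow concrete, here is a single-threaded toy model of how the sentinel is handled. ToyScheduler and Op are hypothetical names, and the model has no locking and no epoll. Dequeuing the sentinel triggers a poll; its results are appended, followed by the re-armed sentinel. Only real handlers count toward run()'s return value. The stop rule (return 0 when the poll finds nothing and the queue is empty) is a simplification: Asio instead stops when outstanding_work_ drops to zero.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Single-threaded toy model of scheduler::do_run_one(); not Asio code.
struct Op {
  std::function<void()> fn;  // empty for the sentinel
};

class ToyScheduler {
 public:
  // poll stands in for task_->run(): it appends ready ops to its argument.
  explicit ToyScheduler(std::function<void(std::deque<Op*>&)> poll)
      : poll_(std::move(poll)) {
    queue_.push_back(&task_op_);  // like init_task() enqueuing task_operation_
  }

  void post(Op* op) { queue_.push_back(op); }

  // Returns 1 after running one handler, 0 when there is nothing left to do.
  std::size_t do_run_one() {
    while (!queue_.empty()) {
      Op* o = queue_.front();
      queue_.pop_front();
      if (o == &task_op_) {
        std::deque<Op*> private_ops;
        poll_(private_ops);                    // the "epoll_wait" step
        for (Op* p : private_ops) queue_.push_back(p);
        if (queue_.empty()) return 0;          // toy stop rule
        queue_.push_back(&task_op_);           // re-arm the sentinel at the back
        continue;                              // the sentinel is not a handler
      }
      o->fn();
      return 1;
    }
    return 0;
  }

  std::size_t run() {
    std::size_t n = 0;
    while (do_run_one()) ++n;
    return n;
  }

 private:
  Op task_op_;
  std::deque<Op*> queue_;
  std::function<void(std::deque<Op*>&)> poll_;
};
```

In the usage below, one op is posted directly and two more are "discovered" by the first poll. run() reports three handlers, and the sentinel never appears in the count.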

scheduler_task

detail/scheduler_task.hpp
class scheduler_task {
public:
  virtual void run(long usec, op_queue<scheduler_operation>& ops) = 0;
  virtual void interrupt() = 0;
};

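scheduler_task is the task the scheduler runs as part of its event loop. It is typically the component that waits for asynchronous I/O readiness, built on system calls such as epoll, io_uring or kqueue.

On Linux, epoll_reactor is used by default; see details.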

operation

#if defined(BOOST_ASIO_HAS_IOCP)
typedef win_iocp_operation operation;
#else
typedef scheduler_operation operation;
#endif

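The scheduler schedules work in units of operation, an alias for scheduler_operation (or win_iocp_operation when IOCP is available). op_queue_ is the scheduler's queue of ready asynchronous ops.

To manage everything through that one queue, Asio represents the reactor's blocking system call as a special sentinel entry in the same queue, task_operation_.

Whichever thread dequeues task_operation_ runs epoll_wait and moves the resulting ready ops into op_queue_.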

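| Object type | Source | Description |
| --- | --- | --- |
| task_operation_ | Internal scheduling | Sentinel object; the thread that dequeues it runs epoll_wait and collects the ready ops |
| descriptor_state | fd state | Readable/writable notifications for a socket; holds the pending reactor_ops |
| reactor_op | I/O reactor | Socket ops of different kinds (read, write, connect, exception) |
| wait_op | Timer expiry | Registered by detail::deadline_timer_service, which backs both deadline_timer and steady_timer |
| user handler op | User submission | Function objects submitted via io_context::post() |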
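  1. task_operation_

    The current thread waits in epoll_wait and collects the descriptor_state / wait_op objects of ready sockets and timers into private_op_queue. It then splices private_op_queue into op_queue_ for later scheduling.

  2. others

    Call op->complete(), which runs the op's completion function. Depending on the associated executor, the completion handler either runs immediately or is submitted to the ready queue to wait for scheduling.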

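Idle optimization: op_queue_ can be empty. This happens, for example, when io_context.run() is started (with outstanding work) before any asynchronous I/O has been registered, or while another thread is already blocked in epoll_wait holding task_operation_. In that case the thread does not spin and burn CPU. It blocks on wakeup_event_ (pthread_cond_wait on POSIX) until newly submitted work signals it.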
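The idle path is the classic condition-variable pattern. The sketch below uses std::condition_variable and a hypothetical IdleQueue class in place of Asio's internal event type and wakeup_event_. It shows a worker that blocks without spinning until post() signals it:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical model of the idle path: wait on a condition variable while the
// ready queue is empty instead of busy-polling. Not Asio's wakeup_event_ code.
class IdleQueue {
 public:
  void post(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      ops_.push_back(std::move(f));
    }
    wakeup_.notify_one();  // similar in spirit to wake_one_thread_and_unlock()
  }

  // Sleeps until an op is available, then runs it outside the lock.
  void run_one() {
    std::unique_lock<std::mutex> lock(mutex_);
    wakeup_.wait(lock, [this] { return !ops_.empty(); });
    std::function<void()> f = std::move(ops_.front());
    ops_.pop_front();
    lock.unlock();
    f();
  }

 private:
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> ops_;
};
```

The predicate passed to wait() also covers the case where post() runs before the worker starts waiting, so no wakeup is lost.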

scheduler_operation

detail/scheduler_operation.hpp
class scheduler_operation {
public:
  typedef scheduler_operation operation_type;

  void complete(void *owner, const boost::system::error_code &ec,
                std::size_t bytes_transferred) {
    func_(owner, this, ec, bytes_transferred);
  }

  void destroy() { func_(0, this, boost::system::error_code(), 0); }

protected:
  typedef void (*func_type)(void *, scheduler_operation *,
                            const boost::system::error_code &, std::size_t);

  scheduler_operation(func_type func)
      : next_(0), func_(func), task_result_(0) {}

  ~scheduler_operation() {}

private:
  friend class op_queue_access;
  scheduler_operation *next_;
  func_type func_;
protected:
  friend class scheduler;
  unsigned int task_result_; // passed to complete() as bytes_transferred (descriptor_state stores the epoll event mask here)
};

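This is the base class of all operations. When an event completes, the op is notified through complete(owner, ec, bytes_transferred). destroy() calls the same function with a null owner, which releases the op without invoking its handler.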
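This layout is type erasure through a single function pointer rather than a virtual table: each concrete op passes its own static do_complete to the base constructor. The sketch below models the pattern under some assumptions. int replaces error_code, and BaseOp/HandlerOp are hypothetical names. Modelled loosely on Asio's handler ops, do_complete moves the handler out and frees the op before invoking it. It also skips the invocation when owner is null, which is the destroy() path.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>

// Simplified model of scheduler_operation: one plain function pointer, no vtable.
class BaseOp {
 public:
  void complete(void* owner, int ec, std::size_t bytes) { func_(owner, this, ec, bytes); }
  void destroy() { func_(nullptr, this, 0, 0); }  // null owner: free without invoking

 protected:
  using func_type = void (*)(void*, BaseOp*, int, std::size_t);
  explicit BaseOp(func_type f) : func_(f) {}
  ~BaseOp() = default;  // never deleted through a BaseOp*

 private:
  func_type func_;
};

// Hypothetical concrete op that wraps a user handler.
template <typename Handler>
class HandlerOp : public BaseOp {
 public:
  explicit HandlerOp(Handler h)
      : BaseOp(&HandlerOp::do_complete), handler_(std::move(h)) {}

 private:
  static void do_complete(void* owner, BaseOp* base, int ec, std::size_t bytes) {
    auto* self = static_cast<HandlerOp*>(base);
    Handler handler(std::move(self->handler_));  // take the handler out
    delete self;                                 // release the op's memory first
    if (owner) handler(ec, bytes);               // invoke only on the complete() path
  }

  Handler handler_;
};
```

Freeing the op before the upcall means a handler that immediately starts another asynchronous operation can reuse the same memory.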

executor

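When an asynchronous task completes, some mechanism must notify the party that registered it. In Asio, that mechanism is the executor.

The executor is what actually runs tasks. Asio wraps each kind of execution context in an executor, so that work can be submitted to it from any thread at any time.

Examples include io_context::executor_type, thread_pool::executor_type, and strand<Executor>, which wraps another executor to provide serialized execution.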

io_context.hpp
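// the executor refers to its io_context through the pointer stored in target_;
// because of alignment, the low two bits of that pointer are always 0,
// so asio reuses them as flags: blocking.never and relationship.continuation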

struct io_context_bits {
  static constexpr uintptr_t blocking_never = 1;
  static constexpr uintptr_t relationship_continuation = 2;
  static constexpr uintptr_t outstanding_work_tracked = 4;
  static constexpr uintptr_t runtime_bits = 3;
};

template <typename Allocator, uintptr_t Bits>
class io_context::basic_executor_type : detail::io_context_bits, Allocator {
public:
  /// Copy constructor.
  basic_executor_type(const basic_executor_type &other) noexcept
      : Allocator(static_cast<const Allocator &>(other)),
        target_(other.target_) {}

  // returns a copy with blocking_never cleared; runtime_bits are consulted only by execute()
  constexpr basic_executor_type
  require(execution::blocking_t::possibly_t) const {
    return basic_executor_type(context_ptr(),
        *this, bits() & ~blocking_never);
  }

  // query the blocking property from runtime_bits
  constexpr execution::blocking_t query(execution::blocking_t) const noexcept {
    return (bits() & blocking_never)
      ? execution::blocking_t(execution::blocking.never)
      : execution::blocking_t(execution::blocking.possibly);
  }

  // execute(): the interface required by the standard-executors model
  template <typename Function> void execute(Function &&f) const;

  template <typename Function, typename OtherAllocator>
  void dispatch(Function &&f, const OtherAllocator &a) const;

  template <typename Function, typename OtherAllocator>
  void post(Function &&f, const OtherAllocator &a) const;

  template <typename Function, typename OtherAllocator>
  void defer(Function &&f, const OtherAllocator &a) const;

private:
  io_context *context_ptr() const noexcept {
    return reinterpret_cast<io_context *>(target_ & ~runtime_bits);
  }

  uintptr_t bits() const noexcept { return target_ & runtime_bits; }

  uintptr_t target_;
};
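The trick in target_ is ordinary pointer tagging. The sketch below reproduces it with hypothetical names (Context, TaggedRef) and the same bit values as io_context_bits. Only bits 1 and 2 are stored at runtime (runtime_bits = 3). outstanding_work_tracked = 4 is carried in the Bits template parameter instead.

```cpp
#include <cassert>
#include <cstdint>

// Minimal sketch of keeping flags in the low bits of an aligned pointer.
struct Context { int id; };

constexpr std::uintptr_t kBlockingNever = 1;
constexpr std::uintptr_t kContinuation = 2;
constexpr std::uintptr_t kRuntimeBits = 3;

static_assert(alignof(Context) >= 4, "the low two bits must be free");

class TaggedRef {
 public:
  TaggedRef(Context* c, std::uintptr_t bits)
      : target_(reinterpret_cast<std::uintptr_t>(c) | (bits & kRuntimeBits)) {}

  // Mask the flags off to recover the real pointer, as context_ptr() does.
  Context* context() const { return reinterpret_cast<Context*>(target_ & ~kRuntimeBits); }
  std::uintptr_t bits() const { return target_ & kRuntimeBits; }

  // Like require(execution::blocking.possibly): same context, blocking_never cleared.
  TaggedRef with_blocking_possibly() const {
    return TaggedRef(context(), bits() & ~kBlockingNever);
  }

 private:
  std::uintptr_t target_;
};
```

Copying such an executor copies a single integer, which is one reason io_context's executor is cheap to pass around by value.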

execute()

impl/io_context.hpp
template <typename Allocator, uintptr_t Bits>
template <typename Function>
void io_context::basic_executor_type<Allocator, Bits>::execute(
    Function &&f) const {
  typedef decay_t<Function> function_type;

  // if blocking is permitted (blocking_never not set) and the calling thread is
  // already running this io_context, invoke f directly
  if ((bits() & blocking_never) == 0 && context_ptr()->impl_.can_dispatch()) {
    function_type tmp(static_cast<Function&&>(f));

    detail::fenced_block b(detail::fenced_block::full);
    static_cast<function_type &&>(tmp)();
    return;
  }

  // otherwise wrap f in an executor_op and queue it for the scheduler
  typedef detail::executor_op<function_type, Allocator, detail::operation> op;
  typename op::ptr p = {
      detail::addressof(static_cast<const Allocator &>(*this)),
      op::ptr::allocate(static_cast<const Allocator &>(*this)), 0};
  p.p = new (p.v) op(static_cast<Function&&>(f),
      static_cast<const Allocator&>(*this));

  BOOST_ASIO_HANDLER_CREATION((*context_ptr(), *p.p,
        "io_context", context_ptr(), 0, "execute"));

  context_ptr()->impl_.post_immediate_completion(p.p,
      (bits() & relationship_continuation) != 0);
  p.v = p.p = 0;
}

post_immediate_completion

void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation) {
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

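This excerpt simply pushes the op onto the ready queue and wakes one thread. The full function also has a fast path that the excerpt omits. When the scheduler runs on a single thread or is_continuation is true, and the caller is already inside this scheduler, the op goes onto the thread's private_op_queue without taking the lock.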

Choosing when to execute

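An executor offers three submission policies for the caller to choose from: dispatch, post and defer.

| Function | Behavior |
| --- | --- |
| dispatch | Runs the function immediately when allowed (the caller is already running inside the target context); otherwise queues it |
| post | Always queues the function; never runs it inside the call |
| defer | Queues the function like post, but marks it as a continuation of the current handler, which permits optimizations such as queuing it on the thread-private queue |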

The implementations are in impl/io_context.hpp. Their flow is similar to execute(), so they are not repeated here.
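The three policies can be illustrated with a single-threaded toy context. Everything below is hypothetical. A thread_local pointer plays the role of can_dispatch(), and defer behaves exactly like post, because the toy has no thread-private queue that could exploit the continuation hint.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Toy context contrasting dispatch, post and defer; not Asio's implementation.
class ToyContext {
 public:
  void dispatch(std::function<void()> f) {
    if (running_inside()) f();             // inline when called from inside run()
    else queue_.push_back(std::move(f));
  }
  void post(std::function<void()> f) { queue_.push_back(std::move(f)); }  // never inline
  void defer(std::function<void()> f) {
    // Same observable behaviour as post here; a real scheduler may use the
    // continuation hint to queue the function on a thread-private queue.
    queue_.push_back(std::move(f));
  }

  void run() {
    Guard g(this);
    while (!queue_.empty()) {
      auto f = std::move(queue_.front());
      queue_.pop_front();
      f();
    }
  }

 private:
  struct Guard {  // marks this thread as running the context, like call_stack::context
    explicit Guard(ToyContext* c) : prev(current_) { current_ = c; }
    ~Guard() { current_ = prev; }
    ToyContext* prev;
  };
  bool running_inside() const { return current_ == this; }

  static thread_local ToyContext* current_;
  std::deque<std::function<void()>> queue_;
};

thread_local ToyContext* ToyContext::current_ = nullptr;
```

In the usage below, a dispatch made outside run() is queued, a dispatch made inside a handler runs inline, and a post made inside a handler runs only after that handler returns. Note that this toy only compares against the innermost running context, whereas Asio's can_dispatch() searches the whole thread call stack.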

can_dispatch()

Checks whether the current thread is running this io_context's event loop, i.e. whether it is inside its run().

detail/impl/scheduler.ipp
bool scheduler::can_dispatch() {
  return thread_call_stack::contains(this) != 0;
}

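thread_call_stack records, per thread (via thread-local storage), the stack of io_contexts the thread is currently running. It is an instantiation of the generic call_stack<thread_context, thread_info_base> template.

scheduler::run() (called by io_context::run()) constructs a thread_call_stack::context on entry and destroys it on exit. The contexts are linked through next_, so the list tracks the chain of nested io_context loops.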

detail/thread_context.hpp
class thread_context {
public:
  BOOST_ASIO_DECL static thread_info_base* top_of_thread_call_stack();

protected:
  typedef call_stack<thread_context, thread_info_base> thread_call_stack;
};

detail/call_stack.hpp
template <typename Key, typename Value = unsigned char> class call_stack {
public:
  class context : private noncopyable {
  public:
    explicit context(Key *k) : key_(k), next_(call_stack<Key, Value>::top_) {
      value_ = reinterpret_cast<unsigned char*>(this);
      call_stack<Key, Value>::top_ = this;
    }

    context(Key *k, Value &v)
        : key_(k), value_(&v), next_(call_stack<Key, Value>::top_) {
      call_stack<Key, Value>::top_ = this;
    }

    ~context() { call_stack<Key, Value>::top_ = next_; }

    Value *next_by_key() const {
      context* elem = next_;
      while (elem) {
        if (elem->key_ == key_)
          return elem->value_;
        elem = elem->next_;
      }
      return 0;
    }

  private:
    friend class call_stack<Key, Value>;
    Key* key_;
    Value* value_;
    context* next_;
  };

  friend class context;

  static Value *contains(Key *k) {
    context* elem = top_;
    while (elem) {
      if (elem->key_ == k)
        return elem->value_;
      elem = elem->next_;
    }
    return 0;
  }

  static Value *top() {
    context* elem = top_;
    return elem ? elem->value_ : 0;
  }

private:
  static tss_ptr<context> top_;
};
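Stripped of Asio's portability layer, call_stack is an intrusive linked list of RAII frames whose head lives in thread-local storage. Below is a minimal sketch. It assumes C++11 thread_local in place of tss_ptr and omits the single-argument constructor. CallStack is a hypothetical name.

```cpp
#include <cassert>

// Minimal sketch of call_stack: RAII frames linked into a thread-local list.
template <typename Key, typename Value>
class CallStack {
 public:
  class Context {
   public:
    Context(Key* k, Value& v) : key_(k), value_(&v), next_(top_) { top_ = this; }
    ~Context() { top_ = next_; }  // pop this frame on scope exit
    Context(const Context&) = delete;
    Context& operator=(const Context&) = delete;

   private:
    friend class CallStack;
    Key* key_;
    Value* value_;
    Context* next_;
  };

  // Walks from the innermost frame outwards, like call_stack::contains().
  static Value* contains(Key* k) {
    for (Context* e = top_; e; e = e->next_)
      if (e->key_ == k) return e->value_;
    return nullptr;
  }

 private:
  static thread_local Context* top_;
};

template <typename Key, typename Value>
thread_local typename CallStack<Key, Value>::Context* CallStack<Key, Value>::top_ = nullptr;
```

Because each frame lives on the stack of the function that created it, nesting and unwinding come for free. An inner run() pushes a frame, and its destructor restores the outer frame as the head.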

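Note that asio::thread_pool has a similar can_dispatch() mechanism. It detects whether the caller is already running inside that thread_pool, so that the call can be optimized into in-place execution. Like io_context, thread_pool uses scheduler as its underlying implementation.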

The differences are:

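  1. io_context initializes its scheduler_task lazily. When the service for the first I/O object is created (for sockets, in the reactive_socket_service_base constructor), the epoll_reactor service is constructed. The scheduler's init_task() is then called, which installs the reactor as task_ and enqueues task_operation_.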

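  2. thread_pool does not call init_task() itself, so its task_ member stays a null pointer. A thread pool only needs to run tasks submitted by users, and it has no counterpart to io_context's lazy initialization.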
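The difference comes down to whether anything ever calls init_task(). Below is a hypothetical model of the idempotent install-and-enqueue step. LazyScheduler and Task are illustrative names. In Asio the task is obtained from get_task_(context) rather than passed in as a parameter.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Stands in for epoll_reactor.
struct Task {};

// Hypothetical model of scheduler::init_task(): install the task and enqueue the
// sentinel exactly once; a context that never calls it keeps task_ null.
class LazyScheduler {
 public:
  void init_task(Task* t) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!task_) {                        // later calls are no-ops
      task_ = t;
      queue_.push_back(&task_operation_);
    }
  }
  bool has_task() const { return task_ != nullptr; }
  std::size_t queued() const { return queue_.size(); }

 private:
  std::mutex mutex_;
  Task* task_ = nullptr;
  int task_operation_ = 0;              // only its address is used, as a sentinel
  std::deque<const int*> queue_;
};
```

Creating a second I/O object therefore does not enqueue a second sentinel. That matters because the single sentinel is what guarantees only one thread waits in the reactor at a time.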

Coroutine adaptation

TODO