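Introduction
Asio is the de facto standard for network programming in C++, yet most developers treat it as a "black box": we submit asynchronous operations and wait for callbacks to fire, but rarely ask how events travel from the kernel to user space, how the scheduler dispatches work, or which details the cross-platform abstraction layer hides.
This article starts from a simple echo-server example and works step by step through the internals of the asynchronous interfaces and how they are adapted to C++20 coroutines.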
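Prerequisites
CompletionToken
When an asynchronous op becomes ready (the When), Asio lets you customize the form of its completion callback (the How) and where that callback runs (the Where).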
flowchart TD
async_op["Async operation"] -- associated at launch --> init_fn["Initiating\nfunction"]
async_op -- invoked on completion --> completion_handler["Completion\nhandler"]
init_fn -- binds signature --> completion_signature["Completion\nsignature"]
init_fn -. selects specialization .-> async_result["async_result\ntrait"]
init_fn -. takes argument .-> completion_token["Completion\ntoken"]
completion_token -. assembled into .-> completion_handler
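Completion token: the callback function, or convention, to apply when the asynchronous operation completes
Completion handler: the callback routine that actually runs on completion, assembled from the completion token
Take the asynchronous read interface asio::async_read() as an example. Its completion signature is fixed as void(error_code, size_t), carrying the error code and the number of bytes read. Depending on the type of completion token, there are two cases:
Callback function
The completion token passed in by the user must be a Callable object matching this signature, and it is turned into the completion handler. When the operation completes, the notification function complete() binds error_code and size_t to the handler and submits it to the designated executor.
Convention
The token determines the form in which the completion is delivered: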
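use_future
Returns a std::future. With this token no executor can be specified for the completion, because the result is consumed on the caller's side: the caller calls get() on the future and blocks until the operation completes. get() returns the size_t, and a non-success error_code is rethrown as a system_error exception.
use_awaitable
For the signature void(error_code, size_t), co_await on the operation yields awaitable<size_t, Executor>. A non-success error_code is thrown as an exception; wrapping the token as as_tuple(use_awaitable) yields std::tuple<error_code, size_t> instead. co_await first suspends the current coroutine. The operation is then launched with a completion handler that carries the coroutine's resume handle. On completion, error_code and size_t are stored and that handler is submitted to the designated executor. Running the handler resumes the coroutine at its suspension point, which takes the results and continues with the rest of the coroutine body.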
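The token mechanism above can be illustrated with a self-contained sketch that uses only the standard library. async_result_model, use_future_model and async_read_model are hypothetical names invented for illustration; they are not Asio APIs. The operation completes inline rather than through a reactor. The point is that the initiating function stays the same, while a trait specialization decides what the call returns.

```cpp
#include <cassert>
#include <cstddef>
#include <exception>
#include <future>
#include <memory>
#include <system_error>
#include <type_traits>
#include <utility>

// Hypothetical tag modelling asio::use_future (not the real Asio type).
struct use_future_model_t {};
inline constexpr use_future_model_t use_future_model{};

// Primary template: the token is already a callable completion handler.
template <typename Token>
struct async_result_model {
  template <typename Initiation, typename T>
  static void initiate(Initiation&& init, T&& token) {
    std::forward<Initiation>(init)(std::forward<T>(token));
  }
};

// Specialization: turn the future token into a handler that fulfils a promise.
template <>
struct async_result_model<use_future_model_t> {
  template <typename Initiation>
  static std::future<std::size_t> initiate(Initiation&& init, use_future_model_t) {
    auto promise = std::make_shared<std::promise<std::size_t>>();
    std::future<std::size_t> result = promise->get_future();
    std::forward<Initiation>(init)([promise](std::error_code ec, std::size_t n) {
      if (ec)  // like use_future: an error becomes an exception thrown by get()
        promise->set_exception(std::make_exception_ptr(std::system_error(ec)));
      else
        promise->set_value(n);
    });
    return result;
  }
};

// Hypothetical initiating function with completion signature
// void(std::error_code, std::size_t). It completes inline; a real I/O object
// would register the handler with a reactor instead.
template <typename Token>
auto async_read_model(std::size_t available, Token&& token) {
  auto initiation = [available](auto handler) {
    if (available == 0)
      handler(std::make_error_code(std::errc::connection_reset), std::size_t{0});
    else
      handler(std::error_code{}, available);
  };
  return async_result_model<std::decay_t<Token>>::initiate(
      initiation, std::forward<Token>(token));
}
```

With a lambda token the call returns void and invokes the lambda. With use_future_model the same call returns a std::future<std::size_t>.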
Initiating asynchronous operations
basic_socket_acceptor
basic_socket_acceptor.hpp
template <typename Protocol, typename Executor>
class basic_socket_acceptor : public socket_base {
void open(const protocol_type &protocol = protocol_type()) {
boost::system::error_code ec;
impl_.get_service().open(impl_.get_implementation(), protocol, ec);
boost::asio::detail::throw_error(ec, "open");
}
};
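When an acceptor is created and its protocol is opened, the call reaches open() on the associated reactive_socket_service. That function creates the server socket and registers it with epoll_reactor so that connections can be accepted.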
async_accept()
basic_socket_acceptor.hpp
template <BOOST_ASIO_COMPLETION_TOKEN_FOR(
void(boost::system::error_code,
typename Protocol::socket::template rebind_executor<
executor_type>::other))
MoveAcceptToken = default_completion_token_t<executor_type>>
auto async_accept(
MoveAcceptToken &&token = default_completion_token_t<executor_type>())
-> decltype(async_initiate<
MoveAcceptToken,
void(boost::system::error_code,
typename Protocol::socket::template rebind_executor<
executor_type>::other)>(
declval<initiate_async_move_accept>(), token,
declval<const executor_type &>(), static_cast<endpoint_type *>(0),
static_cast<typename Protocol::socket::template rebind_executor<
executor_type>::other *>(0))) {
return async_initiate<
MoveAcceptToken, void(boost::system::error_code,
typename Protocol::socket::template rebind_executor<
executor_type>::other)>(
initiate_async_move_accept(this), token, impl_.get_executor(),
static_cast<endpoint_type *>(0),
static_cast<typename Protocol::socket::template rebind_executor<
executor_type>::other *>(0));
}
flowchart TD
async_accept["`async_accept
<MoveAcceptToken>
(token)`"]
async_initiate["`async_initiate
<MoveAcceptToken, void(error_code, socket)>
(initiate_async_move_accept(this),
token,
impl_.get_executor(),
endpoint_type*(0),
socket*(0))`"]
async_result["`async_result
<CompleteToken, ...Signatures,
Initiate, ...Args>::
initiate(initiate,
token,
args...)`"]
async_accept --> async_initiate
async_initiate --> async_result
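Following the call path, the initiation arguments are forwarded to async_result::initiate(). This finally calls initiate_async_move_accept(this)(handler, impl_.get_executor(), nullptr, nullptr), where handler is the completion handler produced from token.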
initiate_async_move_accept()
basic_socket_acceptor.hpp
class initiate_async_move_accept {
public:
typedef Executor executor_type;
explicit initiate_async_move_accept(basic_socket_acceptor *self)
: self_(self) {}
const executor_type &get_executor() const noexcept {
return self_->get_executor();
}
template <typename MoveAcceptHandler, typename Executor1, typename Socket>
void operator()(MoveAcceptHandler &&handler, const Executor1 &peer_ex,
endpoint_type *peer_endpoint, Socket *) const {
detail::non_const_lvalue<MoveAcceptHandler> handler2(handler);
self_->impl_.get_service().async_move_accept(
self_->impl_.get_implementation(), peer_ex, peer_endpoint,
handler2.value, self_->impl_.get_executor());
}
private:
basic_socket_acceptor *self_;
};
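self_ points to the acceptor, whose impl was created by the reactive_socket_service. The service's async_move_accept() then performs the asynchronous accept, which eventually initializes the impl of the accepted peer socket.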
basic_stream_socket
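Constructing a socket object likewise ends up in open() on the associated reactive_socket_service; see the source code for details.
Strictly speaking, accept does not create a socket through socket(). The kernel returns a new file descriptor for an already-established client connection, and that descriptor is handed over to the user program. So the socket construction path is not used. Instead, the descriptor is transferred to the peer socket object via assign(), a flow similar to open.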
async_read_some()
basic_stream_socket.hpp
template <typename MutableBufferSequence,
BOOST_ASIO_COMPLETION_TOKEN_FOR(void(boost::system::error_code,
std::size_t))
ReadToken = default_completion_token_t<executor_type>>
auto async_read_some(
const MutableBufferSequence &buffers,
ReadToken &&token = default_completion_token_t<executor_type>())
-> decltype(async_initiate<ReadToken, void(boost::system::error_code,
std::size_t)>(
declval<initiate_async_receive>(), token, buffers,
socket_base::message_flags(0))) {
return async_initiate<ReadToken,
void(boost::system::error_code, std::size_t)>(
initiate_async_receive(this), token, buffers,
socket_base::message_flags(0));
}
flowchart TD
async_read_some["`async_read_some
<Buffer, ReadToken>
(buffers,
token)`"]
async_initiate["`async_initiate
<ReadToken, void(error_code, size_t)>
(initiate_async_receive(this),
token,
buffers, message_flags)`"]
async_result["`async_result
<CompleteToken, ...Signatures,
Initiate, ...Args>::
initiate(initiate,
token,
args...)`"]
async_read_some --> async_initiate
async_initiate --> async_result
Same as above; the call ends in initiate_async_receive(this)(handler, buffers, message_flags).
initiate_async_receive()
basic_stream_socket.hpp
class initiate_async_receive {
public:
typedef Executor executor_type;
explicit initiate_async_receive(basic_stream_socket * self) : self_(self) {}
const executor_type &get_executor() const noexcept {
return self_->get_executor();
}
template <typename ReadHandler, typename MutableBufferSequence>
void operator()(ReadHandler &&handler, const MutableBufferSequence &buffers,
socket_base::message_flags flags) const {
detail::non_const_lvalue<ReadHandler> handler2(handler);
self_->impl_.get_service().async_receive(self_->impl_.get_implementation(),
buffers, flags, handler2.value,
self_->impl_.get_executor());
}
private:
basic_stream_socket *self_;
};
Same as above.
io_object_impl
detail/io_object_impl.hpp
template <typename IoObjectService,
typename Executor = io_context::executor_type>
class io_object_impl {
io_object_impl(int, const executor_type &ex)
: service_(&boost::asio::use_service<IoObjectService>(
io_object_impl::get_context(ex))),
executor_(ex) {
service_->construct(implementation_);
}
};
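io_object_impl is the generic class that obtains a service and uses it to construct and operate on the impl.
service_registry is Asio's mechanism for lazy, on-demand service initialization. Shared facilities are attached to an execution_context, so they are never created twice and never created when unneeded. use_service<Service>() returns the existing instance, constructing it first if none is registered. make_service<Service>(args...) constructs one with forwarded arguments and throws service_already_exists if that service is already registered.
See detail::service_registry for the implementation.
Using Linux epoll as an analogy, the service corresponds to the kernel's eventpoll object, and an impl corresponds to an epitem event node inside eventpoll.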
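The lazy lookup behind use_service/make_service can be sketched as a per-context map keyed by service type. This is a simplified model, not detail::service_registry itself. context_model, reactor_model and socket_service_model are hypothetical. A std::map guarded by a recursive mutex replaces Asio's intrusive list, and make_service throws std::logic_error where Asio throws service_already_exists. The recursive mutex lets one service obtain another from its constructor, as reactive_socket_service_base does with the reactor. Asio achieves the same thing by releasing its mutex while a new service is being constructed.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <typeindex>
#include <typeinfo>
#include <utility>

// Hypothetical, simplified model of an execution context's service registry.
class context_model {
 public:
  struct service {
    virtual ~service() = default;
  };

  // Returns the existing Service, or lazily constructs Service(*this).
  template <typename Service>
  Service& use_service() {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    auto& slot = services_[std::type_index(typeid(Service))];
    if (!slot) slot = std::make_unique<Service>(*this);  // may nest use_service
    return static_cast<Service&>(*slot);
  }

  // Constructs Service with forwarded arguments; refuses to replace one.
  template <typename Service, typename... Args>
  Service& make_service(Args&&... args) {
    std::lock_guard<std::recursive_mutex> lock(mutex_);
    auto& slot = services_[std::type_index(typeid(Service))];
    if (slot) throw std::logic_error("service already exists");
    slot = std::make_unique<Service>(*this, std::forward<Args>(args)...);
    return static_cast<Service&>(*slot);
  }

 private:
  std::recursive_mutex mutex_;
  std::map<std::type_index, std::unique_ptr<service>> services_;
};

// Hypothetical services mirroring reactive_socket_service_base -> epoll_reactor.
struct reactor_model : context_model::service {
  explicit reactor_model(context_model&) { ++instances; }
  static inline int instances = 0;
};

struct socket_service_model : context_model::service {
  explicit socket_service_model(context_model& ctx)
      : reactor(ctx.use_service<reactor_model>()) {}
  reactor_model& reactor;
};
```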
reactive_socket_service
typedef epoll_reactor reactor;
template <typename Protocol>
class reactive_socket_service :
public execution_context_service_base<reactive_socket_service<Protocol>>,
public reactive_socket_service_base {
struct implementation_type
: reactive_socket_service_base::base_implementation_type {
implementation_type() : protocol_(endpoint_type().protocol()) {}
// Protocol type
protocol_type protocol_;
};
// Create a socket and store it in impl
boost::system::error_code open(implementation_type &impl,
const protocol_type &protocol,
boost::system::error_code &ec) {
if (!do_open(impl, protocol.family(),
protocol.type(), protocol.protocol(), ec))
impl.protocol_ = protocol;
return ec;
}
// Transfer an existing native socket into impl
boost::system::error_code assign(implementation_type &impl,
const protocol_type &protocol,
const native_handle_type &native_socket,
boost::system::error_code &ec) {
if (!do_assign(impl, protocol.type(), native_socket, ec))
impl.protocol_ = protocol;
return ec;
}
};
The service class that manages socket operations.
reactive_socket_service_base
detail/reactive_socket_service_base.hpp
class reactive_socket_service_base {
struct base_implementation_type {
// Native socket
socket_type socket_;
// State flags
socket_ops::state_type state_;
// Per-descriptor reactor state (descriptor_state) holding the pending async ops
descriptor_state *reactor_data_;
};
// Obtain (creating on first use) the epoll_reactor service
reactive_socket_service_base(execution_context &context)
: reactor_(use_service<reactor>(context)) {}
boost::system::error_code
do_open(reactive_socket_service_base::base_implementation_type &impl, int af,
int type, int protocol, boost::system::error_code &ec) {
if (is_open(impl)) {
ec = boost::asio::error::already_open;
return ec;
}
// Create the server socket
socket_holder sock(socket_ops::socket(af, type, protocol, ec));
if (sock.get() == invalid_socket)
return ec;
// Register this server socket's I/O events with epoll_reactor;
// this initializes impl.reactor_data_, which becomes epoll_event.data.ptr for the fd
if (int err =
reactor_.register_descriptor(sock.get(), impl.reactor_data_)) {
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
return ec;
}
impl.socket_ = sock.release();
switch (type) {
case SOCK_STREAM:
impl.state_ = socket_ops::stream_oriented | extra_state_;
break;
case SOCK_DGRAM:
impl.state_ = socket_ops::datagram_oriented | extra_state_;
break;
default:
impl.state_ = 0;
break;
}
ec = boost::system::error_code();
return ec;
}
boost::system::error_code reactive_socket_service_base::do_assign(
reactive_socket_service_base::base_implementation_type &impl, int type,
const reactive_socket_service_base::native_handle_type &native_socket,
boost::system::error_code &ec) {
if (is_open(impl)) {
ec = boost::asio::error::already_open;
return ec;
}
// Register this socket's I/O events with epoll_reactor
if (int err =
reactor_.register_descriptor(native_socket, impl.reactor_data_)) {
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
return ec;
}
// Take ownership of the native socket
impl.socket_ = native_socket;
switch (type) {
case SOCK_STREAM:
impl.state_ = socket_ops::stream_oriented | extra_state_;
break;
case SOCK_DGRAM:
impl.state_ = socket_ops::datagram_oriented | extra_state_;
break;
default:
impl.state_ = 0;
break;
}
impl.state_ |= socket_ops::possible_dup;
ec = boost::system::error_code();
return ec;
}
void construct(base_implementation_type& impl);
reactor& reactor_;
};
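Asio moves protocol-independent code into the reactive_socket_service_base base class and leaves only protocol-specific parts to the derived class. This reduces the duplicated binary code that template instantiation would otherwise produce. The per-fd state that holds the async ops is descriptor_state.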
async_move_accept()
detail/reactive_socket_service.hpp
template <typename PeerIoExecutor, typename Handler, typename IoExecutor>
void async_move_accept(implementation_type &impl,
const PeerIoExecutor &peer_io_ex,
endpoint_type *peer_endpoint, Handler &handler,
const IoExecutor &io_ex) {
bool is_continuation =
BOOST_ASIO_VERSIONED_NAME(handler_cont_helpers)::is_continuation(handler);
associated_cancellation_slot_t<Handler> slot =
boost::asio::get_associated_cancellation_slot(handler);
typedef reactive_socket_move_accept_op<Protocol, PeerIoExecutor, Handler,
IoExecutor>
op;
typename op::ptr p = {boost::asio::detail::addressof(handler),
op::ptr::allocate(handler), 0};
p.p = new (p.v) op(success_ec_, peer_io_ex, impl.socket_, impl.state_,
impl.protocol_, peer_endpoint, handler, io_ex);
if (slot.is_connected()) {
p.p->cancellation_key_ = &slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}
start_accept_op(impl, p.p, is_continuation, false, &io_ex, 0);
p.v = p.p = 0;
}
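The asynchronous accept and its completion handler are wrapped into a reactive_socket_move_accept_op. start_accept_op() then attaches it to the corresponding descriptor_state (the per-fd state) inside reactor_, where it waits for the event to arrive. reactor_ is the epoll_reactor service, created through use_service when reactive_socket_service_base is constructed.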
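The allocate / placement-new / release sequence in async_move_accept() is an exception-safety idiom: p.v, then p.p, then p.v = p.p = 0. If anything throws before the op is handed off, the ptr object frees it. A minimal model follows. op_model and op_ptr_model are hypothetical, and plain ::operator new stands in for the handler-associated allocator that op::ptr::allocate(handler) uses.

```cpp
#include <cassert>
#include <new>

// Hypothetical op type standing in for reactive_socket_move_accept_op.
struct op_model {
  explicit op_model(int v) : value(v) { ++live; }
  ~op_model() { --live; }
  int value;
  static inline int live = 0;  // number of constructed, not yet destroyed ops
};

// Simplified model of the ptr struct from BOOST_ASIO_DEFINE_HANDLER_PTR:
// v is the raw storage, p the constructed object; reset() cleans up whatever is set.
struct op_ptr_model {
  void* v = nullptr;
  op_model* p = nullptr;

  static void* allocate() { return ::operator new(sizeof(op_model)); }

  void reset() {
    if (p) { p->~op_model(); p = nullptr; }
    if (v) { ::operator delete(v); v = nullptr; }
  }
  ~op_ptr_model() { reset(); }
};

// Mirrors async_move_accept(): allocate, construct, hand off, then release.
// If start() throws, the destructor of p destroys and frees the op.
template <typename Start>
void start_with_op(int value, Start&& start) {
  op_ptr_model p;
  p.v = op_ptr_model::allocate();
  p.p = new (p.v) op_model(value);
  start(p.p);      // ownership passes to the callee (e.g. the reactor queue)
  p.v = nullptr;   // "p.v = p.p = 0": release without destroying
  p.p = nullptr;
}
```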
flowchart LR
start_accept_op --> do_start_accept_op --> do_start_op --> reactor_.start_op
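In do_start_accept_op(), accept uses the op type reactor::read_op. Once a connection's three-way handshake completes, the connection enters the accept (completed-connection) queue and the kernel marks the listening fd as readable.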
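The claim that accept readiness is read readiness on the listening socket can be checked directly with POSIX sockets. This Linux/POSIX sketch is not Asio code. It shows two things. First, a non-blocking accept() fails with EAGAIN while the accept queue is empty (the not_done case of do_perform()). Second, after a client connects, the listener reports POLLIN, the poll() equivalent of EPOLLIN. Error checking is omitted for brevity, and the sketch assumes loopback TCP is available.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>

struct accept_demo_result {
  bool would_block_before = false;  // accept() before any client -> EAGAIN
  bool readable_after = false;      // POLLIN on the listener after connect()
  bool accepted = false;            // accept() then returns a new fd
};

accept_demo_result run_accept_demo() {
  accept_demo_result r;
  int listener = ::socket(AF_INET, SOCK_STREAM, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // let the kernel pick a free port
  ::bind(listener, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
  ::listen(listener, 8);
  ::fcntl(listener, F_SETFL, ::fcntl(listener, F_GETFL) | O_NONBLOCK);

  // Like do_perform(): a non-blocking accept that would block means "not_done".
  int fd = ::accept(listener, nullptr, nullptr);
  r.would_block_before = (fd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK));

  socklen_t len = sizeof(addr);
  ::getsockname(listener, reinterpret_cast<sockaddr*>(&addr), &len);
  int client = ::socket(AF_INET, SOCK_STREAM, 0);
  ::connect(client, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));  // blocking

  // The readiness that epoll would report as EPOLLIN for a read_op.
  pollfd pfd{listener, POLLIN, 0};
  r.readable_after = ::poll(&pfd, 1, 1000) == 1 && (pfd.revents & POLLIN);

  int peer = ::accept(listener, nullptr, nullptr);
  r.accepted = peer >= 0;
  if (peer >= 0) ::close(peer);
  ::close(client);
  ::close(listener);
  return r;
}
```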
reactive_socket_move_accept_op
detail/reactive_socket_accept_op.hpp
template <typename Protocol, typename PeerIoExecutor, typename Handler,
typename IoExecutor>
class reactive_socket_move_accept_op
: private Protocol::socket::template rebind_executor<PeerIoExecutor>::other,
public reactive_socket_accept_op_base<
typename Protocol::socket::template rebind_executor<
PeerIoExecutor>::other,
Protocol> {
public:
typedef Handler handler_type;
typedef IoExecutor io_executor_type;
// RAII pointer that manages the op's memory
BOOST_ASIO_DEFINE_HANDLER_PTR(reactive_socket_move_accept_op);
reactive_socket_move_accept_op(const boost::system::error_code &success_ec,
const PeerIoExecutor &peer_io_ex,
socket_type socket,
socket_ops::state_type state,
const Protocol &protocol,
typename Protocol::endpoint *peer_endpoint,
Handler &handler, const IoExecutor &io_ex)
: peer_socket_type(peer_io_ex),
reactive_socket_accept_op_base<peer_socket_type, Protocol>(
success_ec, socket, state, *this, protocol, peer_endpoint,
&reactive_socket_move_accept_op::do_complete),
handler_(static_cast<Handler &&>(handler)), work_(handler_, io_ex) {}
// Completion function called when the op is ready
static void do_complete(void *owner, operation *base,
const boost::system::error_code & /*ec*/,
std::size_t /*bytes_transferred*/) {
reactive_socket_move_accept_op* o(
static_cast<reactive_socket_move_accept_op*>(base));
ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
// Transfer the socket from new_socket_ into this object (the op is also the peer socket)
if (owner)
o->do_assign();
handler_work<Handler, IoExecutor> w(
static_cast<handler_work<Handler, IoExecutor>&&>(
o->work_));
// Bind the I/O results (error_code and the peer socket) to the handler
detail::move_binder2<Handler,
boost::system::error_code, peer_socket_type>
handler(0, static_cast<Handler&&>(o->handler_), o->ec_,
static_cast<peer_socket_type&&>(*o));
p.h = boost::asio::detail::addressof(handler.handler_);
p.reset();
if (owner) {
// Memory barrier
fenced_block b(fenced_block::half);
// The actual notification: run the handler or submit it to the executor associated with handler.handler_
w.complete(handler, handler.handler_);
}
}
private:
typedef typename Protocol::socket::template
rebind_executor<PeerIoExecutor>::other peer_socket_type;
Handler handler_;
handler_work<Handler, IoExecutor> work_;
};
template <typename Socket, typename Protocol>
class reactive_socket_accept_op_base : public reactor_op {
public:
reactive_socket_accept_op_base(const boost::system::error_code &success_ec,
socket_type socket,
socket_ops::state_type state, Socket &peer,
const Protocol &protocol,
typename Protocol::endpoint *peer_endpoint,
func_type complete_func)
: reactor_op(success_ec, &reactive_socket_accept_op_base::do_perform,
complete_func),
socket_(socket), state_(state), peer_(peer), protocol_(protocol),
peer_endpoint_(peer_endpoint),
addrlen_(peer_endpoint ? peer_endpoint->capacity() : 0) {}
// Runs when the event arrives:
// performs the accept system call and stores the resulting socket in new_socket_
static status do_perform(reactor_op *base) {
BOOST_ASIO_ASSUME(base != 0);
reactive_socket_accept_op_base* o(
static_cast<reactive_socket_accept_op_base*>(base));
socket_type new_socket = invalid_socket;
// Non-blocking accept system call
status result = socket_ops::non_blocking_accept(o->socket_,
o->state_, o->peer_endpoint_ ? o->peer_endpoint_->data() : 0,
o->peer_endpoint_ ? &o->addrlen_ : 0, o->ec_, new_socket)
? done : not_done;
o->new_socket_.reset(new_socket);
return result;
}
// Transfer new_socket_ into the peer socket
void do_assign() {
if (new_socket_.get() != invalid_socket) {
if (peer_endpoint_)
peer_endpoint_->resize(addrlen_);
peer_.assign(protocol_, new_socket_.get(), ec_);
if (!ec_)
new_socket_.release();
}
}
private:
socket_type socket_;
socket_ops::state_type state_;
socket_holder new_socket_;
Socket& peer_;
Protocol protocol_;
typename Protocol::endpoint* peer_endpoint_;
std::size_t addrlen_;
};
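When the event arrives, reactor_op::perform() calls do_perform(). do_perform() takes the socket returned by the accept system call (socket_ops::non_blocking_accept) and stores it in the op's new_socket_. A reactor_op is attached to an epoll_reactor::descriptor_state and represents one asynchronous operation on that fd.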
detail/handler_work.hpp
template <typename Handler, typename IoExecutor, typename = void>
class handler_work
: handler_work_base<IoExecutor>,
handler_work_base<associated_executor_t<Handler, IoExecutor>,
IoExecutor> {
public:
typedef handler_work_base<IoExecutor> base1_type;
typedef handler_work_base<associated_executor_t<Handler, IoExecutor>,
IoExecutor> base2_type;
handler_work(Handler &handler, const IoExecutor &io_ex) noexcept
: base1_type(0, 0, io_ex),
base2_type(base1_type::owns_work(),
boost::asio::get_associated_executor(handler, io_ex),
io_ex) {}
template <typename Function>
void complete(Function &function, Handler &handler) {
if (!base1_type::owns_work() && !base2_type::owns_work()) {
// When using a native implementation, I/O completion handlers are
// already dispatched according to the execution context's executor's
// rules. We can call the function directly.
static_cast<Function&&>(function)();
} else {
base2_type::dispatch(function, handler);
}
}
};
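When the operation completes, complete() either runs the function in place or dispatches it. Suppose neither the I/O executor nor the handler's associated executor tracks outstanding work, for example because both are the io_context's own executor. In that case the handler is already being run by the io_context event loop, so it is called directly without going through dispatch. Otherwise it is passed to base2_type::dispatch().
What dispatch() does depends on the executor in use.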
async_receive()
detail/reactive_socket_service_base.hpp
template <typename MutableBufferSequence,
typename Handler, typename IoExecutor>
void async_receive(base_implementation_type& impl,
const MutableBufferSequence& buffers, socket_base::message_flags flags,
Handler& handler, const IoExecutor& io_ex) {
bool is_continuation =
BOOST_ASIO_VERSIONED_NAME(handler_cont_helpers)::is_continuation(handler);
associated_cancellation_slot_t<Handler> slot =
boost::asio::get_associated_cancellation_slot(handler);
typedef reactive_socket_recv_op<MutableBufferSequence, Handler, IoExecutor>
op;
typename op::ptr p = {boost::asio::detail::addressof(handler),
op::ptr::allocate(handler), 0};
p.p = new (p.v) op(success_ec_, impl.socket_, impl.state_, buffers, flags,
handler, io_ex);
if (slot.is_connected()) {
p.p->cancellation_key_ = &slot.template emplace<reactor_op_cancellation>(
&reactor_, &impl.reactor_data_, impl.socket_, reactor::read_op);
}
start_op(impl,
(flags & socket_base::message_out_of_band) ? reactor::except_op
: reactor::read_op,
p.p, is_continuation,
(flags & socket_base::message_out_of_band) == 0,
((impl.state_ & socket_ops::stream_oriented) &&
buffer_sequence_adapter<boost::asio::mutable_buffer,
MutableBufferSequence>::all_empty(buffers)),
true, &io_ex, 0);
p.v = p.p = 0;
}
Same as above: the asynchronous read and its completion handler are wrapped into a reactive_socket_recv_op and attached with start_op().
epoll_reactor
descriptor_state
detail/epoll_reactor.hpp
// One descriptor_state per fd;
// it holds every async op waiting on that fd
struct descriptor_state : operation {
descriptor_state *next_;
descriptor_state *prev_;
mutex mutex_;
epoll_reactor *reactor_;
int descriptor_;
uint32_t registered_events_;
op_queue<reactor_op> op_queue_[max_ops];
bool try_speculative_[max_ops];
bool shutdown_;
BOOST_ASIO_DECL descriptor_state(bool locking, int spin_count);
void set_ready_events(uint32_t events) { task_result_ = events; }
void add_ready_events(uint32_t events) { task_result_ |= events; }
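// Check the event types returned by epoll_wait (read_op, write_op/connect_op, except_op),
// run the ops in each activated op_queue_[j] and collect the finished ones into ops_,
// return the first op so that it is completed directly in do_complete,
// and hand the rest to the scheduler's op_queue_ (via io_cleanup) for the event loop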
operation *perform_io(uint32_t events) {
mutex_.lock();
perform_io_cleanup_on_block_exit io_cleanup(reactor_);
mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);
static const int flag[max_ops] = {EPOLLIN, EPOLLOUT, EPOLLPRI};
for (int j = max_ops - 1; j >= 0; --j) {
if (events & (flag[j] | EPOLLERR | EPOLLHUP)) {
try_speculative_[j] = true;
while (reactor_op *op = op_queue_[j].front()) {
if (reactor_op::status status = op->perform()) {
op_queue_[j].pop();
io_cleanup.ops_.push(op);
if (status == reactor_op::done_and_exhausted) {
try_speculative_[j] = false;
break;
}
} else
break;
}
}
}
io_cleanup.first_op_ = io_cleanup.ops_.front();
io_cleanup.ops_.pop();
return io_cleanup.first_op_;
}
// Completion hook for a ready descriptor,
// invoked via o->complete(this, ec, task_result_) in the event loop
static void do_complete(void *owner, operation *base,
const boost::system::error_code &ec,
std::size_t bytes_transferred) {
if (owner) {
// owner is non-null on completion and null on destroy(); passing void* through a plain function pointer avoids virtual calls
descriptor_state *descriptor_data = static_cast<descriptor_state *>(base);
uint32_t events = static_cast<uint32_t>(bytes_transferred);
if (operation *op = descriptor_data->perform_io(events)) {
op->complete(owner, ec, 0);
}
}
}
// Derived from operation: &do_complete is stored in the base's func_ and later invoked via complete()
descriptor_state(bool locking, int spin_count)
: operation(&epoll_reactor::descriptor_state::do_complete),
mutex_(locking, spin_count) {}
};
descriptor_state is the per-fd context.
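The draining loop in perform_io() can be modelled without any real I/O. In this hypothetical sketch each op's perform() returns true for done and false for would-block. An op that would block stays at the front of its queue and stops that queue; completed ops are collected. The EPOLL* constants come from <sys/epoll.h>, so the sketch is Linux-only. Locking and the first_op_/io_cleanup split are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <sys/epoll.h>
#include <vector>

// Hypothetical, simplified model of descriptor_state::perform_io().
struct op_model {
  int id;
  std::function<bool()> perform;  // true = done, false = would block (not_done)
};

struct descriptor_state_model {
  static constexpr int max_ops = 3;  // read_op = 0, write_op = 1, except_op = 2
  std::deque<op_model> op_queue[max_ops];

  // Returns the ids of ops completed for these epoll events. In Asio the first
  // one is completed directly and the rest go to the scheduler's queue.
  std::vector<int> perform_io(std::uint32_t events) {
    static const std::uint32_t flag[max_ops] = {EPOLLIN, EPOLLOUT, EPOLLPRI};
    std::vector<int> completed;
    for (int j = max_ops - 1; j >= 0; --j) {
      if (events & (flag[j] | EPOLLERR | EPOLLHUP)) {
        while (!op_queue[j].empty()) {
          if (!op_queue[j].front().perform()) break;  // keep it queued, stop here
          completed.push_back(op_queue[j].front().id);
          op_queue[j].pop_front();
        }
      }
    }
    return completed;
  }
};
```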
start_op()
detail/impl/epoll_reactor.ipp
void epoll_reactor::start_op(int op_type, socket_type descriptor,
epoll_reactor::per_descriptor_data &descriptor_data,
reactor_op *op, bool is_continuation,
bool allow_speculative,
void (*on_immediate)(operation *, bool, const void *),
const void *immediate_arg) {
if (!descriptor_data) {
op->ec_ = boost::asio::error::bad_descriptor;
on_immediate(op, is_continuation, immediate_arg);
return;
}
mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
if (descriptor_data->shutdown_) {
on_immediate(op, is_continuation, immediate_arg);
return;
}
if (descriptor_data->op_queue_[op_type].empty()) {
if (allow_speculative &&
(op_type != read_op || descriptor_data->op_queue_[except_op].empty())) {
if (descriptor_data->try_speculative_[op_type]) {
if (reactor_op::status status = op->perform()) {
if (status == reactor_op::done_and_exhausted)
if (descriptor_data->registered_events_ != 0)
descriptor_data->try_speculative_[op_type] = false;
descriptor_lock.unlock();
on_immediate(op, is_continuation, immediate_arg);
return;
}
}
if (descriptor_data->registered_events_ == 0) {
op->ec_ = boost::asio::error::operation_not_supported;
on_immediate(op, is_continuation, immediate_arg);
return;
}
if (op_type == write_op) {
if ((descriptor_data->registered_events_ & EPOLLOUT) == 0) {
epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_ | EPOLLOUT;
ev.data.ptr = descriptor_data;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0) {
descriptor_data->registered_events_ |= ev.events;
} else {
op->ec_ = boost::system::error_code(
errno, boost::asio::error::get_system_category());
on_immediate(op, is_continuation, immediate_arg);
return;
}
}
}
} else if (descriptor_data->registered_events_ == 0) {
op->ec_ = boost::asio::error::operation_not_supported;
on_immediate(op, is_continuation, immediate_arg);
return;
} else {
if (op_type == write_op) {
descriptor_data->registered_events_ |= EPOLLOUT;
}
epoll_event ev = {0, {0}};
ev.events = descriptor_data->registered_events_;
ev.data.ptr = descriptor_data;
epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
}
}
descriptor_data->op_queue_[op_type].push(op);
scheduler_.work_started();
}
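Most of this function is state checks, including the speculative path that calls op->perform() right away when no op of the same type is queued. The line that matters is the last one, descriptor_data->op_queue_[op_type].push(op);. It attaches the op to the fd's op_queue_ for op_type, where it waits for the corresponding event to fire.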
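The speculative path in start_op() lets an operation whose data is already available finish without a round trip through epoll_wait. The following is a hypothetical, lock-free model of just that decision. It tries perform() only when no op of the same type is queued, so a new op cannot overtake an older one; otherwise it appends to the queue. The names pending_op and fd_queue_model are illustrative. In Asio the immediate completion goes through on_immediate rather than a direct call.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

struct pending_op {
  std::function<bool()> perform;      // true = done, false = would block
  std::function<void()> on_complete;  // stands in for on_immediate / completion
};

struct fd_queue_model {
  std::deque<pending_op> queue;
  bool try_speculative = true;

  // Returns true if the op completed immediately, false if it was queued.
  bool start_op(pending_op op, bool allow_speculative) {
    if (queue.empty() && allow_speculative && try_speculative && op.perform()) {
      op.on_complete();  // finished without waiting for epoll_wait
      return true;
    }
    queue.push_back(std::move(op));  // wait for readiness, keeping FIFO order
    return false;
  }
};
```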
run()
detail/impl/epoll_reactor.ipp
void epoll_reactor::run(long usec, op_queue<operation> &ops) {
// Calculate timeout. Check the timer queues only if timerfd is not in use.
int timeout;
if (usec == 0)
timeout = 0;
else {
timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
if (timer_fd_ == -1) {
mutex::scoped_lock lock(mutex_);
timeout = get_timeout(timeout);
}
}
// Collect ready events with epoll_wait
epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout);
for (int i = 0; i < num_events; ++i) {
// Recover the per-fd context
void* ptr = events[i].data.ptr;
descriptor_state *descriptor_data = static_cast<descriptor_state *>(ptr);
if (!ops.is_enqueued(descriptor_data)) {
descriptor_data->set_ready_events(events[i].events);
ops.push(descriptor_data);
} else {
descriptor_data->add_ready_events(events[i].events);
}
}
// timer_fd handling omitted
}
epoll_reactor::run() wraps the epoll_wait event loop. It is called as task_->run() when the io_context event loop dequeues task_operation_.
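run() relies on a pointer round trip: data.ptr is set at registration and read back after epoll_wait. That round trip can be demonstrated with a plain eventfd. This is a Linux-only sketch, not Asio code. fd_context is a hypothetical stand-in for descriptor_state. The sketch registers a level-triggered EPOLLIN only, whereas the real reactor registers more flags (including edge-triggered mode).

```cpp
#include <cassert>
#include <cstdint>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>

struct fd_context {  // hypothetical stand-in for descriptor_state
  int fd;
  std::uint32_t ready_events = 0;
};

// Returns the context recovered from epoll_wait, or nullptr on timeout/error.
fd_context* wait_once(int epfd, int timeout_ms) {
  epoll_event events[16];
  int n = ::epoll_wait(epfd, events, 16, timeout_ms);
  fd_context* last = nullptr;
  for (int i = 0; i < n; ++i) {
    auto* ctx = static_cast<fd_context*>(events[i].data.ptr);
    ctx->ready_events |= events[i].events;  // like add_ready_events()
    last = ctx;
  }
  return last;
}

// Registers an eventfd with data.ptr = &ctx, signals it, and checks that
// epoll_wait hands back exactly the same pointer.
bool demo_roundtrip() {
  int epfd = ::epoll_create1(0);
  int efd = ::eventfd(0, EFD_NONBLOCK);
  fd_context ctx{efd};
  epoll_event ev{};
  ev.events = EPOLLIN;
  ev.data.ptr = &ctx;  // what register_descriptor() stores for each fd
  bool ok = ::epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &ev) == 0;
  ok = ok && wait_once(epfd, 0) == nullptr;  // nothing readable yet
  std::uint64_t one = 1;
  ok = ok && ::write(efd, &one, sizeof(one)) == static_cast<ssize_t>(sizeof(one));
  ok = ok && wait_once(epfd, 1000) == &ctx && (ctx.ready_events & EPOLLIN);
  ::close(efd);
  ::close(epfd);
  return ok;
}
```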
Event loop
io_context
---
title: io_context relationship
---
classDiagram
class i__basic_executor_type["io_context::basic_executor_type"] {
+context* target_
+basic_executor_type require(PropXXX)
+PropXXX query(TagXXX)
}
class e__service["execution_context::service"] {
+execution_context& owner_
+service* next_
+func* destory_
}
class execution_context_service_base~Type~ {
+static service_id~Type~ id
}
e__service <|-- execution_context_service_base
class d__service_registry["detail::service_registry"] {
+mutex mutex_
+execution_context& owner_
+execution_context::service* first_service_
+execution_context::service* create~Service,Owner~(execution_context, owner, Args... args)
+Service& use_service~Service~()
+Service& make_service~Service~()
}
class execution_context {
+auto_allocator_ptr allocator_
+detail::service_registry service_registry_
+Service& use_service~Service~()
+Service& make_service~Service~()
}
execution_context *-- d__service_registry
class scheduler {
+op_queue~operation~ op_queue_
+thread thread_
+size_t run(error_code)
+size_t do_run_one(lock, thread_info, error_code)
}
execution_context_service_base <|-- scheduler
class io_context {
+scheduler impl_
+basic_executor_type get_executor()
+size_t run()
}
io_context *-- scheduler
execution_context <|-- io_context
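io_context is Asio's bridge to asynchronous networking system calls. Those system calls are registered and issued through it, and it schedules the resulting asynchronous completion handlers.
Event loop and dispatch hub
io_context manages and schedules all asynchronous I/O operations.
Every I/O object (tcp::socket, steady_timer, …) must be bound to an io_context instance.
io_context dispatches each completion handler to run on the thread(s) of the executor bound to that handler.
Cross-platform support
- Linux: epoll (default), io_uring
- Windows: IOCP
- macOS: kqueue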
#if defined(BOOST_ASIO_HAS_IOCP)
typedef win_iocp_io_context io_context_impl;
class win_iocp_overlapped_ptr;
#else
typedef scheduler io_context_impl;
#endif
io_context_impl is the platform-specific scheduler implementation behind io_context.
run()
impl/io_context.ipp
io_context::count_type io_context::run() {
boost::system::error_code ec;
count_type s = impl_.run(ec);
boost::asio::detail::throw_error(ec);
return s;
}
run() forwards to the actual implementation, io_context_impl::run().
scheduler
Taking the scheduler implementation used on Linux as an example:
flowchart LR
run["run()"]-->thread_info["thread_info ths;"]
thread_info-->do_run_one1["do_run_one(&ths)"]
do_run_one1-- while loop -->do_run_one1
flowchart TD
do_run_one(["do_run_one()"])-->op_empty_switch{"op_queue_.empty()?"}
op_empty_switch-- false -->get_op{"get op and switch"}
op_empty_switch-- true -->sleep(["sleep & wait for signal"])
get_op-- task_operation_ -->task_run["`task_.run
(task_usec_,
ths.private_op_queue)`"]
get_op-- others -->complete["op.complete()"]
task_run-->collect["`op_queue_ +=
ths.private_op_queue`"]
collect & complete-->finish(["finish"])
thread_info
detail/scheduler_thread_info.hpp
struct scheduler_thread_info : public thread_info_base
{
op_queue<scheduler_operation> private_op_queue;
long private_outstanding_work;
};
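scheduler_thread_info is a per-thread (thread-local) storage block. Its main content is a private queue of ready operations. Operations are collected there and then moved into the scheduler's op_queue_ in one batch.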
run()
detail/impl/scheduler.ipp
std::size_t scheduler::run(boost::system::error_code &ec) {
thread_info this_thread; // Per-thread storage block for the current thread
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_); // Acquire the lock protecting op_queue_
std::size_t n = 0;
for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
}
A thread that calls io_context.run() joins the scheduler and repeatedly calls do_run_one(), which drives the scheduler's asynchronous event loop.
do_run_one()
detail/impl/scheduler.ipp
std::size_t scheduler::do_run_one(mutex::scoped_lock &lock,
scheduler::thread_info &this_thread,
const boost::system::error_code &ec) {
while (!stopped_) {
if (!op_queue_.empty()) {
// Take the next ready op
operation *o = op_queue_.front();
op_queue_.pop();
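if (o == &task_operation_) { // Sentinel: time to run epoll_wait
// lock.unlock
// wake other event-loop threads
// ...
// Keeps at most one thread inside epoll_wait;
// on_exit's destructor re-queues task_operation_ into op_queue_
task_cleanup on_exit = {this, &lock, &this_thread};
// Wait on epoll and collect ready ops into this thread's private_op_queue;
// on_exit's destructor moves them into op_queue_
task_->run(more_handlers ? 0 : task_usec_,
this_thread.private_op_queue);
// Reduces lock contention: on_exit's destructor re-acquires the lock
// and updates op_queue_, so unlike running a user op this branch
// does not return; it loops back to fetch the next op
} else { // Run a ready op
// lock.unlock
// wake other event-loop threads
// ...
work_cleanup on_exit = { this, &lock, &this_thread };
// The op may produce immediately-ready ops, collected in private_op_queue;
// on_exit's destructor moves them into op_queue_
o->complete(this, ec, o->task_result_);
return 1; // counts one executed handler
}
} else {
// Idle optimization: wait for a wake-up signal instead of spinning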
// ...
}
}
return 0;
}
do_run_one() is one complete iteration of the event loop: it takes a ready async op from the global ready queue and runs it.
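The interplay of op_queue_, the task_operation_ sentinel and private_op_queue can be condensed into a single-threaded model. scheduler_model is hypothetical. An empty std::function plays the role of &task_operation_, and the reactor task is a callback that appends ready ops. The model returns 0 when the task finds nothing; at that point the real scheduler would instead block in epoll_wait, or stop once outstanding work reaches zero.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

class scheduler_model {
 public:
  using op = std::function<void()>;

  // task(out): models the reactor's run(); pushes ready ops into `out`.
  explicit scheduler_model(std::function<void(std::vector<op>&)> task)
      : task_(std::move(task)) {
    queue_.push_back(nullptr);  // empty function = the task_operation_ sentinel
  }

  void post(op o) { queue_.push_back(std::move(o)); }

  // Returns 1 if a user op ran, 0 if the task found nothing left to do.
  int do_run_one() {
    while (!queue_.empty()) {
      op o = std::move(queue_.front());
      queue_.pop_front();
      if (!o) {  // the sentinel: run the reactor task
        std::vector<op> private_queue;
        task_(private_queue);
        bool idle = private_queue.empty() && queue_.empty();
        for (op& ready : private_queue) queue_.push_back(std::move(ready));
        queue_.push_back(nullptr);  // re-queue the sentinel, as task_cleanup does
        if (idle) return 0;         // model only: Asio would block in epoll_wait
        continue;                   // do not return; fetch the next op
      }
      o();  // a user op: run it and report one unit of work
      return 1;
    }
    return 0;
  }

  int run() {
    int n = 0;
    while (do_run_one()) ++n;
    return n;
  }

 private:
  std::function<void(std::vector<op>&)> task_;
  std::deque<op> queue_;
};
```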
scheduler_task
detail/scheduler_task.hpp
class scheduler_task {
public:
virtual void run(long usec, op_queue<scheduler_operation>& ops) = 0;
virtual void interrupt() = 0;
};
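scheduler_task is the task the scheduler runs as part of its loop. It is normally the asynchronous network I/O component, built on a system facility such as epoll, io_uring or kqueue.
On Linux, epoll_reactor is used by default.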
operation
#if defined(BOOST_ASIO_HAS_IOCP)
typedef win_iocp_operation operation;
#else
typedef scheduler_operation operation;
#endif
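The scheduler schedules work in units of operation; op_queue_ is the scheduler's list of ready async ops.
To keep scheduling uniform, Asio represents "wait for network I/O events" with a single marker operation, task_operation_.
The thread that dequeues task_operation_ runs epoll_wait and collects the ready operations into op_queue_.
| Object type | Source | Description |
|---|---|---|
| task_operation | Internal task scheduling | Sentinel object; runs epoll_wait and collects ready system-call results |
| descriptor_state | File descriptor state | Socket readable/writable notification; holds reactor_ops |
| reactor_op | I/O reactor | The different kinds of socket op (read, write, connect, exception) |
| wait_op | Timer expiry | Registered by deadline_timer_service, steady_timer_service |
| user handler_op | User submission | Function objects submitted via io_context::post() |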
task_operation
The current thread waits in epoll_wait and collects the descriptor_state/wait_op of ready sockets/timers into private_op_queue. private_op_queue is then merged into op_queue_ as a whole for the next round of scheduling.
others
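Runs op->complete() on the ready op. Depending on the bound executor, the completion handler either runs immediately or is submitted to the ready queue to be scheduled later.
Idle-wait optimization: io_context.run() may be started before any asynchronous I/O has been registered. To avoid spinning and wasting CPU, the thread then blocks on wakeup_event_ (pthread_cond_wait underneath) until newly registered asynchronous I/O signals it.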
scheduler_operation
detail/scheduler_operation.hpp
class scheduler_operation {
public:
typedef scheduler_operation operation_type;
void complete(void *owner, const boost::system::error_code &ec,
std::size_t bytes_transferred) {
func_(owner, this, ec, bytes_transferred);
}
void destroy() { func_(0, this, boost::system::error_code(), 0); }
protected:
typedef void (*func_type)(void *, scheduler_operation *,
const boost::system::error_code &, std::size_t);
scheduler_operation(func_type func)
: next_(0), func_(func), task_result_(0) {}
~scheduler_operation() {}
private:
friend class op_queue_access;
scheduler_operation *next_;
func_type func_;
protected:
friend class scheduler;
unsigned int task_result_; // Passed as bytes_transferred; descriptor_state stores the ready epoll events here
};
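The base class of every operation. On completion it is notified through complete(owner, error_code, bytes_transferred); destroy() calls the same function with a null owner.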
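scheduler_operation avoids virtual functions. Each concrete op passes a pointer to its own static do_complete, and a null owner distinguishes destroy from complete. A reduced model follows, using hypothetical names (operation_model, handler_op_model, op_queue_model, make_handler_op). It uses std::error_code in place of boost::system::error_code, and new/delete in place of Asio's recycling allocators. Like Asio's ops, do_complete moves the handler out and frees the op before invoking the handler.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <system_error>
#include <utility>

class operation_model {
 public:
  void complete(void* owner, const std::error_code& ec, std::size_t n) {
    func_(owner, this, ec, n);
  }
  void destroy() { func_(nullptr, this, std::error_code(), 0); }
  operation_model* next_ = nullptr;  // intrusive link used by the queue

 protected:
  using func_type = void (*)(void*, operation_model*, const std::error_code&, std::size_t);
  explicit operation_model(func_type f) : func_(f) {}
  ~operation_model() = default;  // non-virtual: do_complete knows the real type

 private:
  func_type func_;
};

template <typename Handler>
class handler_op_model : public operation_model {
 public:
  explicit handler_op_model(Handler h)
      : operation_model(&do_complete), handler_(std::move(h)) {}

 private:
  static void do_complete(void* owner, operation_model* base,
                          const std::error_code& ec, std::size_t n) {
    auto* self = static_cast<handler_op_model*>(base);
    Handler handler(std::move(self->handler_));  // move out before freeing
    delete self;                                 // release the op's memory first
    if (owner) handler(ec, n);                   // run only on completion
  }
  Handler handler_;
};

template <typename Handler>
operation_model* make_handler_op(Handler h) {
  return new handler_op_model<Handler>(std::move(h));
}

// Minimal intrusive FIFO in the spirit of op_queue: no allocation per push.
struct op_queue_model {
  operation_model* front = nullptr;
  operation_model* back = nullptr;
  void push(operation_model* op) {
    op->next_ = nullptr;
    if (back) back->next_ = op; else front = op;
    back = op;
  }
  operation_model* pop() {
    operation_model* op = front;
    if (op) { front = op->next_; if (!front) back = nullptr; op->next_ = nullptr; }
    return op;
  }
};
```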
executor
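When an asynchronous task completes, some mechanism must notify the party waiting for it; the executor implements that mechanism.
An executor is the entity that actually runs tasks. Asio wraps every kind of task scheduler in an executor, so that each can accept tasks submitted from other threads at any time.
Examples include io_context::executor_type, thread_pool::executor_type, and strand<Executor>, which runs the tasks of another executor serially.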
io_context.hpp
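// The executor refers to its io_context through the pointer stored in target_.
// Because of C++ object alignment, the lowest two bits of that pointer are always 0,
// so Asio reuses them as runtime flags: blocking.never and relationship.continuation
// (outstanding_work_tracked is a compile-time flag carried in the Bits template argument)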
struct io_context_bits {
static constexpr uintptr_t blocking_never = 1;
static constexpr uintptr_t relationship_continuation = 2;
static constexpr uintptr_t outstanding_work_tracked = 4;
static constexpr uintptr_t runtime_bits = 3;
};
template <typename Allocator, uintptr_t Bits>
class io_context::basic_executor_type : detail::io_context_bits, Allocator {
public:
/// Copy constructor.
basic_executor_type(const basic_executor_type &other) noexcept
: Allocator(static_cast<const Allocator &>(other)),
target_(other.target_) {}
// Sets the runtime bits, which only execute() consults
constexpr basic_executor_type
require(execution::blocking_t::possibly_t) const {
return basic_executor_type(context_ptr(),
*this, bits() & ~blocking_never);
}
// Query the runtime bits
constexpr execution::blocking_t query(execution::blocking_t) const noexcept {
return (bits() & blocking_never)
? execution::blocking_t(execution::blocking.never)
: execution::blocking_t(execution::blocking.possibly);
}
// Standard-style executor interface
template <typename Function> void execute(Function &&f) const;
template <typename Function, typename OtherAllocator>
void dispatch(Function &&f, const OtherAllocator &a) const;
template <typename Function, typename OtherAllocator>
void post(Function &&f, const OtherAllocator &a) const;
template <typename Function, typename OtherAllocator>
void defer(Function &&f, const OtherAllocator &a) const;
private:
io_context *context_ptr() const noexcept {
return reinterpret_cast<io_context *>(target_ & ~runtime_bits);
}
uintptr_t bits() const noexcept { return target_ & runtime_bits; }
uintptr_t target_;
};
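The low-bit tagging in target_ can be reproduced in a few lines. The sketch is hypothetical: context_stub and tagged_executor are not Asio types. It relies only on the object's alignment being at least 4, which the static_assert checks. The constant names mirror io_context_bits.

```cpp
#include <cassert>
#include <cstdint>

struct context_stub { long dummy; };  // hypothetical stand-in for io_context
static_assert(alignof(context_stub) >= 4, "need two spare low bits");

constexpr std::uintptr_t blocking_never = 1;
constexpr std::uintptr_t relationship_continuation = 2;
constexpr std::uintptr_t runtime_bits = 3;

struct tagged_executor {
  std::uintptr_t target_;  // context pointer with flags packed into the low bits

  tagged_executor(context_stub* ctx, std::uintptr_t bits)
      : target_(reinterpret_cast<std::uintptr_t>(ctx) | (bits & runtime_bits)) {}

  context_stub* context_ptr() const {
    return reinterpret_cast<context_stub*>(target_ & ~runtime_bits);
  }
  std::uintptr_t bits() const { return target_ & runtime_bits; }

  // Like require(blocking.possibly): same context, blocking_never cleared.
  tagged_executor require_possibly() const {
    return tagged_executor(context_ptr(), bits() & ~blocking_never);
  }
};
```

The design choice is that the flags cost no extra storage, so the executor stays a single word and remains cheap to copy.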
execute()
impl/io_context.hpp
template <typename Allocator, uintptr_t Bits>
template <typename Function>
void io_context::basic_executor_type<Allocator, Bits>::execute(
Function &&f) const {
typedef decay_t<Function> function_type;
// If blocking.never is not set and the calling thread is running this io_context, invoke f inline
if ((bits() & blocking_never) == 0 && context_ptr()->impl_.can_dispatch()) {
function_type tmp(static_cast<Function&&>(f));
detail::fenced_block b(detail::fenced_block::full);
static_cast<function_type &&>(tmp)();
return;
}
// Otherwise wrap f in an operation and queue it for scheduling
typedef detail::executor_op<function_type, Allocator, detail::operation> op;
typename op::ptr p = {
detail::addressof(static_cast<const Allocator &>(*this)),
op::ptr::allocate(static_cast<const Allocator &>(*this)), 0};
p.p = new (p.v) op(static_cast<Function&&>(f),
static_cast<const Allocator&>(*this));
BOOST_ASIO_HANDLER_CREATION((*context_ptr(), *p.p,
"io_context", context_ptr(), 0, "execute"));
context_ptr()->impl_.post_immediate_completion(p.p,
(bits() & relationship_continuation) != 0);
p.v = p.p = 0;
}
post_immediate_completion
void scheduler::post_immediate_completion(
scheduler::operation* op, bool is_continuation) {
work_started();
mutex::scoped_lock lock(mutex_);
op_queue_.push(op);
wake_one_thread_and_unlock(lock);
}
It simply pushes the op onto the ready queue and wakes one thread.
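post_immediate_completion(), together with the idle wait in do_run_one(), amounts to a mutex-protected queue with a condition variable. The producer pushes and wakes one waiter; the consumer runs the op outside the lock. ready_queue_model is a hypothetical simplification that omits work counting, the reactor task and stop handling.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

class ready_queue_model {
 public:
  void post(std::function<void()> op) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(op));
    }
    wakeup_.notify_one();  // like wake_one_thread_and_unlock()
  }

  // Blocks until an op is available, then runs it outside the lock.
  void run_one() {
    std::function<void()> op;
    {
      std::unique_lock<std::mutex> lock(mutex_);
      wakeup_.wait(lock, [this] { return !queue_.empty(); });
      op = std::move(queue_.front());
      queue_.pop_front();
    }
    op();  // user code never runs while the queue lock is held
  }

 private:
  std::mutex mutex_;
  std::condition_variable wakeup_;
  std::deque<std::function<void()>> queue_;
};
```

A worker thread that calls run_one() on an empty queue blocks in wait() until another thread calls post().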
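Choosing when to run
An executor offers three policies for the submitter to choose from: dispatch, post, and defer.
| Function | Behavior |
|---|---|
| dispatch | Runs immediately when conditions allow (e.g. the caller is already inside the executor) |
| post | Always asynchronous: the function is queued and never run inline |
| defer | Deferred like post, but allows optimizations |
The code is in impl/io_context.hpp (the flow is similar to execute()) and is not repeated here.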
can_dispatch()
Checks whether the current thread is running inside the corresponding io_context loop.
detail/impl/scheduler.ipp
bool scheduler::can_dispatch() {
return thread_call_stack::contains(this) != 0;
}
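thread_call_stack is a thread_local record of the io_context (scheduler) loops the current thread is running inside. It is built on the generic call_stack<thread_context, thread_info_base>.
scheduler::run() constructs a thread_call_stack::context on entry, and its destructor pops the record on exit, so nested io_context run loops are tracked as a chain.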
detail/thread_context.hpp
class thread_context {
public:
BOOST_ASIO_DECL static thread_info_base* top_of_thread_call_stack();
protected:
typedef call_stack<thread_context, thread_info_base> thread_call_stack;
};
detail/call_stack.hpp
template <typename Key, typename Value = unsigned char> class call_stack {
public:
class context : private noncopyable {
public:
explicit context(Key *k) : key_(k), next_(call_stack<Key, Value>::top_) {
value_ = reinterpret_cast<unsigned char*>(this);
call_stack<Key, Value>::top_ = this;
}
context(Key *k, Value &v)
: key_(k), value_(&v), next_(call_stack<Key, Value>::top_) {
call_stack<Key, Value>::top_ = this;
}
~context() { call_stack<Key, Value>::top_ = next_; }
Value *next_by_key() const {
context* elem = next_;
while (elem) {
if (elem->key_ == key_)
return elem->value_;
elem = elem->next_;
}
return 0;
}
private:
friend class call_stack<Key, Value>;
Key* key_;
Value* value_;
context* next_;
};
friend class context;
static Value *contains(Key *k) {
context* elem = top_;
while (elem) {
if (elem->key_ == k)
return elem->value_;
elem = elem->next_;
}
return 0;
}
static Value *top() {
context* elem = top_;
return elem ? elem->value_ : 0;
}
private:
static tss_ptr<context> top_;
};
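can_dispatch() and the dispatch/post distinction can be modelled together. A thread_local stack of RAII context records marks which loops the current thread is inside, and dispatch() runs inline only when the target loop is on that stack. call_stack_model and loop_model are hypothetical, single-threaded simplifications of call_stack and the io_context executor.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

template <typename Key>
class call_stack_model {
 public:
  // Pushes itself on construction and pops itself on destruction.
  class context {
   public:
    explicit context(Key* k) : key_(k), next_(top_) { top_ = this; }
    ~context() { top_ = next_; }
    context(const context&) = delete;
    context& operator=(const context&) = delete;

   private:
    friend class call_stack_model;
    Key* key_;
    context* next_;
  };

  static bool contains(Key* k) {
    for (context* e = top_; e; e = e->next_)
      if (e->key_ == k) return true;
    return false;
  }

 private:
  static inline thread_local context* top_ = nullptr;
};

class loop_model {
 public:
  // dispatch: run inline if this thread is already inside run(); else queue.
  void dispatch(std::function<void()> f) {
    if (call_stack_model<loop_model>::contains(this)) f();
    else post(std::move(f));
  }
  // post: always queue, even when called from inside run().
  void post(std::function<void()> f) { queue_.push_back(std::move(f)); }

  void run() {
    call_stack_model<loop_model>::context ctx(this);  // like scheduler::run()
    while (!queue_.empty()) {
      auto f = std::move(queue_.front());
      queue_.pop_front();
      f();
    }
  }

 private:
  std::deque<std::function<void()>> queue_;
};
```

In the sequence below, the function dispatched from outside run() is queued. The function dispatched from inside run() executes immediately, ahead of the earlier post().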
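Note that asio::thread_pool has a similar can_dispatch mechanism. It checks whether the caller is running inside the current thread_pool, so that the call can be optimized into inline execution. Like io_context, it uses scheduler as the underlying scheduler implementation.
The only difference is:
io_context: when the first socket-type I/O object is created, reactive_socket_service_base obtains the epoll_reactor service. That in turn calls init_task() on the scheduler to install the scheduler_task.
thread_pool: it never calls init_task() on its own, so its task_ member stays a null pointer as long as no socket-type I/O object is bound to the pool. A thread pool only has to run user-submitted tasks, and it has no io_context-style lazy initialization of a reactor.
Coroutine adaptation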
TODO