diff --git a/doc/modules/ROOT/nav.adoc b/doc/modules/ROOT/nav.adoc index 02bcc816c..6edaf54aa 100644 --- a/doc/modules/ROOT/nav.adoc +++ b/doc/modules/ROOT/nav.adoc @@ -38,6 +38,7 @@ ** xref:4.guide/4o.file-io.adoc[File I/O] ** xref:4.guide/4p.unix-sockets.adoc[Unix Domain Sockets] ** xref:4.guide/4q.udp.adoc[UDP Sockets] +** xref:4.guide/4r.wait.adoc[Readiness Wait] * xref:5.testing/5.intro.adoc[Testing] ** xref:5.testing/5a.mocket.adoc[Mock Sockets] ** xref:5.testing/5b.socket-pair.adoc[Socket Pairs] diff --git a/doc/modules/ROOT/pages/4.guide/4r.wait.adoc b/doc/modules/ROOT/pages/4.guide/4r.wait.adoc new file mode 100644 index 000000000..81f535b11 --- /dev/null +++ b/doc/modules/ROOT/pages/4.guide/4r.wait.adoc @@ -0,0 +1,148 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + += Readiness Wait + +The `wait()` method on every socket and acceptor suspends until the +underlying file descriptor becomes ready in a chosen direction, without +transferring any bytes. Use it to integrate with C libraries that own +the I/O on a nonblocking file descriptor and only need notification +that data is available or that the descriptor is writable. + +[NOTE] +==== +Code snippets assume: +[source,cpp] +---- +#include +#include + +namespace corosio = boost::corosio; +---- +==== + +== Overview + +Three directions are exposed via the `wait_type` enum: + +[source,cpp] +---- +enum class wait_type { read, write, error }; +---- + +The awaitable yields an `error_code` with no `bytes_transferred`. On +success the socket is observed to be ready; no data has been consumed +from it. + +[source,cpp] +---- +auto [ec] = co_await sock.wait(corosio::wait_type::read); +if (!ec) { + // sock is readable: a subsequent read_some will return data + // without blocking. +} +---- + +== Wrapping a Nonblocking C API + +The original motivation is libraries such as libssh and libpq that +manage their own buffers on an `O_NONBLOCK` fd. They need a "tell me +when the fd is ready" primitive that does not steal bytes from the +stream. + +The typical pattern: + +[source,cpp] +---- +// pq is some PG connection holding a nonblocking socket fd. +corosio::tcp_socket sock = adopt_fd(ioc, PQsocket(pq)); + +while (PQisBusy(pq)) { + auto [ec] = co_await sock.wait(corosio::wait_type::read); + if (ec) co_return ec; + if (PQconsumeInput(pq) == 0) + co_return last_pq_error(pq); +} +---- + +Because `wait()` does not call `recv()`, the C library's next +`PQconsumeInput` (or equivalent) sees all the data the kernel has +delivered. + +== Acceptors + +`tcp_acceptor` and `local_stream_acceptor` expose the same `wait()`. +For `wait_type::read`, completion signals that a connection is pending +on the listen socket. A subsequent `accept()` will succeed without +blocking: + +[source,cpp] +---- +auto [wec] = co_await acceptor.wait(corosio::wait_type::read); +if (wec) co_return; + +corosio::tcp_socket peer(ioc); +auto [aec] = co_await acceptor.accept(peer); +---- + +This is useful when application-level conditions must be checked +before consuming the next connection (rate limiting, backpressure +signaling) without holding an `accept()` call open. + +== Cancellation + +`wait()` honors the stop token of its `co_await` environment and the +`socket.cancel()` / `acceptor.cancel()` non-virtuals, completing with +`capy::cond::canceled`: + +[source,cpp] +---- +auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await sock.wait(corosio::wait_type::read); + // ec == capy::cond::canceled if sock.cancel() was invoked +}; +---- + +`cancel_after()` composes with `wait()` the same way it does with the +other socket operations. + +== `wait_type::write` Semantics + +`wait(wait_type::write)` always completes immediately with success on +a connected socket. This matches asio's behavior on the IOCP backend +and gives a consistent contract across all corosio backends. The +intended use is: "I want to know I can write now," not "I want to +park until the send buffer drains after backpressure." + +Backpressure on the send path is already surfaced by `write_some()` +returning fewer bytes than requested (or `EAGAIN`-equivalent +behavior); use that signal rather than `wait(wait_type::write)` to +react to a full send buffer. + +== Backend Notes + +On Linux (epoll) and BSD/macOS (kqueue) the read and error waits +register interest in the fd's read or error event without performing +any I/O syscall. On the select backend the same registration +semantics apply through the select-loop's fd sets. Write waits +short-circuit and never enter the reactor (see above). + +On Windows (IOCP), stream-socket `wait_read` uses a zero-byte +`WSARecv`: the kernel signals completion when data is available +without consuming bytes. All other waits (datagram-read, +acceptor-read, error-wait) route through an auxiliary `WSAPoll`-based +reactor that runs on a dedicated thread and bridges into the IOCP +via `PostQueuedCompletionStatus`. The public API is uniform across +platforms. + +== See Also + +* xref:4d.sockets.adoc[Sockets] +* xref:4e.tcp-acceptor.adoc[Acceptors] +* xref:4q.udp.adoc[UDP Sockets] diff --git a/include/boost/corosio/local_datagram_socket.hpp b/include/boost/corosio/local_datagram_socket.hpp index fa509cc26..7797c3e9e 100644 --- a/include/boost/corosio/local_datagram_socket.hpp +++ b/include/boost/corosio/local_datagram_socket.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -204,6 +205,27 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object std::error_code* ec, std::size_t* bytes_out) = 0; + /** Initiate an asynchronous wait for socket readiness. + + Completes when the socket becomes ready for the + specified direction, or an error condition is + reported. No bytes are transferred. + + @param h Coroutine handle to resume on completion. + @param ex Executor for dispatching the completion. + @param w The direction to wait on. + @param token Stop token for cancellation. + @param ec Output error code. + + @return Coroutine handle to resume immediately. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; + /// Shut down part or all of the socket. virtual std::error_code shutdown(shutdown_type what) noexcept = 0; @@ -346,6 +368,23 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object } }; + /// Represent the awaitable returned by @ref wait. + struct wait_awaitable + : detail::void_op_base + { + local_datagram_socket& s_; + wait_type w_; + + wait_awaitable(local_datagram_socket& s, wait_type w) noexcept + : s_(s), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return s_.get().wait(h, ex, w_, token_, &ec_); + } + }; + /** Represent the awaitable returned by @ref send. Captures the buffer, then dispatches to the backend @@ -528,6 +567,25 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object return connect_awaitable(*this, ep); } + /** Wait for the socket to become ready in a given direction. + + Suspends until the socket is ready for the requested + direction, or an error condition is reported. No bytes + are transferred. + + @param w The wait direction (read, write, or error). + + @return An awaitable that completes with `io_result<>`. + + @par Preconditions + The socket must be open. This socket must outlive the + returned awaitable. + */ + [[nodiscard]] auto wait(wait_type w) + { + return wait_awaitable(*this, w); + } + /** Send a datagram to the specified destination. Completes when the entire datagram has been accepted diff --git a/include/boost/corosio/local_stream_acceptor.hpp b/include/boost/corosio/local_stream_acceptor.hpp index 19eaa739a..e7b5247f5 100644 --- a/include/boost/corosio/local_stream_acceptor.hpp +++ b/include/boost/corosio/local_stream_acceptor.hpp @@ -12,6 +12,8 @@ #include #include +#include +#include #include #include #include @@ -75,6 +77,22 @@ enum class bind_option */ class BOOST_COROSIO_DECL local_stream_acceptor : public io_object { + struct wait_awaitable + : detail::void_op_base + { + local_stream_acceptor& acc_; + wait_type w_; + + wait_awaitable(local_stream_acceptor& acc, wait_type w) noexcept + : acc_(acc), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return acc_.get().wait(h, ex, w_, token_, &ec_); + } + }; + struct move_accept_awaitable { local_stream_acceptor& acc_; @@ -301,6 +319,27 @@ class BOOST_COROSIO_DECL local_stream_acceptor : public io_object return accept_awaitable(*this, peer); } + /** Wait for an incoming connection or readiness condition. + + Suspends until the listen socket is ready in the + requested direction. For `wait_type::read`, completion + signals that a subsequent @ref accept will succeed + without blocking. No connection is consumed. + + @param w The wait direction. + + @return An awaitable that completes with `io_result<>`. + + @par Preconditions + The acceptor must be listening. + */ + [[nodiscard]] auto wait(wait_type w) + { + if (!is_open()) + detail::throw_logic_error("wait: acceptor not listening"); + return wait_awaitable(*this, w); + } + /** Initiate an asynchronous accept, returning the socket. Completes when a new connection is available. Only one @@ -433,6 +472,18 @@ class BOOST_COROSIO_DECL local_stream_acceptor : public io_object std::error_code*, io_object::implementation**) = 0; + /** Initiate an asynchronous wait for acceptor readiness. + + Completes when the listen socket becomes ready for + the specified direction. No connection is consumed. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; + /// Return the cached local endpoint. virtual corosio::local_endpoint local_endpoint() const noexcept = 0; diff --git a/include/boost/corosio/local_stream_socket.hpp b/include/boost/corosio/local_stream_socket.hpp index 864714fce..e9daedcdc 100644 --- a/include/boost/corosio/local_stream_socket.hpp +++ b/include/boost/corosio/local_stream_socket.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -107,6 +108,27 @@ class BOOST_COROSIO_DECL local_stream_socket : public io_stream std::stop_token token, std::error_code* ec) = 0; + /** Initiate an asynchronous wait for socket readiness. + + Completes when the socket becomes ready for the + specified direction, or an error condition is + reported. No bytes are transferred. + + @param h Coroutine handle to resume on completion. + @param ex Executor for dispatching the completion. + @param w The direction to wait on. + @param token Stop token for cancellation. + @param ec Output error code. + + @return Coroutine handle to resume immediately. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; + /** Shut down the socket for the given direction(s). @param what The shutdown direction. @@ -186,6 +208,23 @@ class BOOST_COROSIO_DECL local_stream_socket : public io_stream } }; + /// Represent the awaitable returned by @ref wait. + struct wait_awaitable + : detail::void_op_base + { + local_stream_socket& s_; + wait_type w_; + + wait_awaitable(local_stream_socket& s, wait_type w) noexcept + : s_(s), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return s_.get().wait(h, ex, w_, token_, &ec_); + } + }; + public: /** Destructor. @@ -302,6 +341,25 @@ class BOOST_COROSIO_DECL local_stream_socket : public io_stream return connect_awaitable(*this, ep); } + /** Wait for the socket to become ready in a given direction. + + Suspends until the socket is ready for the requested + direction, or an error condition is reported. No bytes + are transferred. + + @param w The wait direction (read, write, or error). + + @return An awaitable that completes with `io_result<>`. + + @par Preconditions + The socket must be open. This socket must outlive the + returned awaitable. + */ + [[nodiscard]] auto wait(wait_type w) + { + return wait_awaitable(*this, w); + } + /** Cancel any pending asynchronous operations. All outstanding operations complete with `errc::operation_canceled`. diff --git a/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp index b8bcece0a..59a2be664 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp @@ -95,6 +95,12 @@ class BOOST_COROSIO_DECL win_local_dgram_service final void work_started() noexcept; void work_finished() noexcept; + /** Return the owning IOCP scheduler. */ + win_scheduler& scheduler() noexcept + { + return sched_; + } + private: win_scheduler& sched_; win_mutex mutex_; @@ -147,6 +153,14 @@ inline local_dgram_recv_op::local_dgram_recv_op( cancel_func_ = &do_cancel_impl; } +inline local_dgram_wait_op::local_dgram_wait_op( + win_local_dgram_socket_internal& internal_) noexcept + : overlapped_op(&do_complete) + , internal(internal_) +{ + cancel_func_ = &do_cancel_impl; +} + // ============================================================ // Cancellation functions // ============================================================ @@ -206,6 +220,19 @@ local_dgram_recv_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline void +local_dgram_wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + if (op->internal.is_open()) + { + ::CancelIoEx( + reinterpret_cast(op->internal.native_handle()), op); + } + op->internal.svc_.scheduler().cancel_wait_if_constructed(op); +} + // ============================================================ // Completion handlers // ============================================================ @@ -324,6 +351,24 @@ local_dgram_recv_op::do_complete( op->invoke_handler(); } +inline void +local_dgram_wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + if (!owner) + { + op->cleanup_only(); + op->internal_ptr.reset(); + return; + } + auto prevent_premature_destruction = std::move(op->internal_ptr); + op->invoke_handler(); +} + // ============================================================ // win_local_dgram_socket_internal // ============================================================ @@ -336,6 +381,7 @@ inline win_local_dgram_socket_internal::win_local_dgram_socket_internal( , conn_(*this) , send_wr_(*this) , recv_rd_(*this) + , wt_(*this) { } @@ -646,6 +692,38 @@ win_local_dgram_socket_internal::recv( return std::noop_coroutine(); } +inline std::coroutine_handle<> +win_local_dgram_socket_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.internal_ptr = shared_from_this(); + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + // Datagram wait_read and wait_error route through the auxiliary + // select reactor. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); +} + inline void win_local_dgram_socket_internal::cancel() noexcept { @@ -659,11 +737,16 @@ win_local_dgram_socket_internal::cancel() noexcept conn_.request_cancel(); send_wr_.request_cancel(); recv_rd_.request_cancel(); + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); } inline void win_local_dgram_socket_internal::close_socket() noexcept { + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -739,6 +822,17 @@ win_local_dgram_socket::recv( return internal_->recv(h, d, buf, flags, token, ec, bytes); } +inline std::coroutine_handle<> +win_local_dgram_socket::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline std::error_code win_local_dgram_socket::bind(corosio::local_endpoint ep) noexcept { diff --git a/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp b/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp index b652b96a2..e62f29cd9 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -134,6 +135,23 @@ struct local_dgram_recv_op : overlapped_op win_local_dgram_socket_internal& internal_) noexcept; }; +/** Readiness-wait operation for local datagram sockets. */ +struct local_dgram_wait_op : overlapped_op +{ + win_local_dgram_socket_internal& internal; + std::shared_ptr internal_ptr; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + explicit local_dgram_wait_op( + win_local_dgram_socket_internal& internal_) noexcept; +}; + /* Internal local datagram socket state for IOCP. */ class win_local_dgram_socket_internal : public intrusive_list::node @@ -146,6 +164,7 @@ class win_local_dgram_socket_internal friend struct local_dgram_connect_op; friend struct local_dgram_send_op; friend struct local_dgram_recv_op; + friend struct local_dgram_wait_op; win_local_dgram_service& svc_; local_dgram_send_to_op wr_; @@ -153,6 +172,7 @@ class win_local_dgram_socket_internal local_dgram_connect_op conn_; local_dgram_send_op send_wr_; local_dgram_recv_op recv_rd_; + local_dgram_wait_op wt_; SOCKET socket_ = INVALID_SOCKET; public: @@ -205,6 +225,13 @@ class win_local_dgram_socket_internal std::error_code*, std::size_t*); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; corosio::local_endpoint local_endpoint() const noexcept; corosio::local_endpoint remote_endpoint() const noexcept; @@ -275,6 +302,13 @@ class win_local_dgram_socket final std::error_code* ec, std::size_t* bytes) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + std::error_code bind(corosio::local_endpoint ep) noexcept override; std::error_code shutdown( diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor.hpp index 75f0fbb6f..d7ced2ea9 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor.hpp @@ -15,6 +15,7 @@ #if BOOST_COROSIO_HAS_IOCP #include +#include #include #include #include @@ -58,6 +59,22 @@ struct local_stream_accept_op : overlapped_op local_stream_accept_op() noexcept; }; +/** Readiness-wait operation state for a local stream acceptor. */ +struct local_stream_acceptor_wait_op : overlapped_op +{ + std::shared_ptr acceptor_ptr; + SOCKET listen_socket = INVALID_SOCKET; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + local_stream_acceptor_wait_op() noexcept; +}; + /* Internal acceptor state for IOCP local stream I/O. */ class win_local_stream_acceptor_internal : public intrusive_list::node @@ -80,6 +97,13 @@ class win_local_stream_acceptor_internal std::error_code*, io_object::implementation**); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; corosio::local_endpoint local_endpoint() const noexcept; bool is_open() const noexcept; @@ -88,6 +112,7 @@ class win_local_stream_acceptor_internal void set_local_endpoint(corosio::local_endpoint ep) noexcept; local_stream_accept_op acc_; + local_stream_acceptor_wait_op wt_; private: win_local_stream_service& svc_; @@ -115,6 +140,13 @@ class win_local_stream_acceptor final std::error_code* ec, io_object::implementation** impl_out) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + corosio::local_endpoint local_endpoint() const noexcept override; bool is_open() const noexcept override; void cancel() noexcept override; diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp index 2288e17af..4f7da1965 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_acceptor_service.hpp @@ -90,6 +90,28 @@ local_stream_accept_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline local_stream_acceptor_wait_op::local_stream_acceptor_wait_op() noexcept + : overlapped_op(&do_complete) +{ + cancel_func_ = &do_cancel_impl; +} + +inline void +local_stream_acceptor_wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + if (op->listen_socket != INVALID_SOCKET) + { + ::CancelIoEx(reinterpret_cast(op->listen_socket), op); + } + if (op->acceptor_ptr) + { + op->acceptor_ptr->socket_service().scheduler() + .cancel_wait_if_constructed(op); + } +} + // accept_op completion handler inline void @@ -191,6 +213,26 @@ local_stream_accept_op::do_complete( dispatch_coro(saved_ex, op->cont_op.cont).resume(); } +inline void +local_stream_acceptor_wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + + if (!owner) + { + op->cleanup_only(); + op->acceptor_ptr.reset(); + return; + } + + auto prevent_premature_destruction = std::move(op->acceptor_ptr); + op->invoke_handler(); +} + // ============================================================ // win_local_stream_acceptor_internal // ============================================================ @@ -243,11 +285,49 @@ win_local_stream_acceptor_internal::cancel() noexcept if (socket_ != INVALID_SOCKET) ::CancelIoEx(reinterpret_cast(socket_), nullptr); acc_.request_cancel(); + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); +} + +inline std::coroutine_handle<> +win_local_stream_acceptor_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.acceptor_ptr = shared_from_this(); + wt_.listen_socket = socket_; + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + // wait_type::read and wait_type::error route through the auxiliary + // select reactor. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); } inline void win_local_stream_acceptor_internal::close_socket() noexcept { + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -387,6 +467,17 @@ win_local_stream_acceptor::accept( return internal_->accept(h, d, token, ec, impl_out); } +inline std::coroutine_handle<> +win_local_stream_acceptor::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline corosio::local_endpoint win_local_stream_acceptor::local_endpoint() const noexcept { diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp index 25af5c1ca..c4a32b06d 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp @@ -104,6 +104,12 @@ class BOOST_COROSIO_DECL win_local_stream_service final void work_started() noexcept; void work_finished() noexcept; + /** Return the owning IOCP scheduler. */ + win_scheduler& scheduler() noexcept + { + return sched_; + } + private: friend class win_local_stream_acceptor_service; @@ -145,6 +151,14 @@ inline local_stream_write_op::local_stream_write_op( cancel_func_ = &do_cancel_impl; } +inline local_stream_wait_op::local_stream_wait_op( + win_local_stream_socket_internal& internal_) noexcept + : overlapped_op(&do_complete) + , internal(internal_) +{ + cancel_func_ = &do_cancel_impl; +} + // ============================================================ // Cancellation functions // ============================================================ @@ -185,6 +199,19 @@ local_stream_write_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline void +local_stream_wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + if (op->internal.is_open()) + { + ::CancelIoEx( + reinterpret_cast(op->internal.native_handle()), op); + } + op->internal.svc_.scheduler().cancel_wait_if_constructed(op); +} + // ============================================================ // connect_op completion handler // ============================================================ @@ -277,6 +304,30 @@ local_stream_write_op::do_complete( op->invoke_handler(); } +// ============================================================ +// wait_op completion handler +// ============================================================ + +inline void +local_stream_wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + + if (!owner) + { + op->cleanup_only(); + op->internal_ptr.reset(); + return; + } + + auto prevent_premature_destruction = std::move(op->internal_ptr); + op->invoke_handler(); +} + // ============================================================ // win_local_stream_socket_internal // ============================================================ @@ -287,6 +338,7 @@ inline win_local_stream_socket_internal::win_local_stream_socket_internal( , conn_(*this) , rd_(*this) , wr_(*this) + , wt_(*this) { } @@ -525,6 +577,66 @@ win_local_stream_socket_internal::write_some( return std::noop_coroutine(); } +inline std::coroutine_handle<> +win_local_stream_socket_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.internal_ptr = shared_from_this(); + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.empty_buffer = true; + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + if (w == wait_type::read) + { + // Zero-byte WSARecv — completes when data is available + // without consuming any bytes. + op.wsabuf = WSABUF{0, nullptr}; + op.flags = 0; + + int result = ::WSARecv( + socket_, &op.wsabuf, 1, nullptr, &op.flags, &op, nullptr); + + if (result == SOCKET_ERROR) + { + DWORD err = ::WSAGetLastError(); + if (err != WSA_IO_PENDING) + { + svc_.on_completion(&op, err, 0); + return std::noop_coroutine(); + } + } + + svc_.on_pending(&op); + + if (op.cancelled.load(std::memory_order_acquire)) + ::CancelIoEx(reinterpret_cast(socket_), &op); + + return std::noop_coroutine(); + } + + // wait_type::error: route through the auxiliary select reactor. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); +} + inline void win_local_stream_socket_internal::cancel() noexcept { @@ -536,11 +648,16 @@ win_local_stream_socket_internal::cancel() noexcept conn_.request_cancel(); rd_.request_cancel(); wr_.request_cancel(); + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); } inline void win_local_stream_socket_internal::close_socket() noexcept { + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -607,6 +724,17 @@ win_local_stream_socket::write_some( return internal_->write_some(h, d, buf, token, ec, bytes); } +inline std::coroutine_handle<> +win_local_stream_socket::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline std::error_code win_local_stream_socket::shutdown( local_stream_socket::shutdown_type what) noexcept diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_socket.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_socket.hpp index d3b22ff6f..0a91eefb5 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_socket.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_socket.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -90,6 +91,25 @@ struct local_stream_write_op : overlapped_op win_local_stream_socket_internal& internal_) noexcept; }; +/** Readiness-wait operation state for local stream sockets. */ +struct local_stream_wait_op : overlapped_op +{ + WSABUF wsabuf{}; + DWORD flags = 0; + win_local_stream_socket_internal& internal; + std::shared_ptr internal_ptr; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + explicit local_stream_wait_op( + win_local_stream_socket_internal& internal_) noexcept; +}; + /* Internal socket state for IOCP local stream I/O. Holds the native SOCKET handle, cached endpoints, and @@ -106,11 +126,13 @@ class win_local_stream_socket_internal friend struct local_stream_read_op; friend struct local_stream_write_op; friend struct local_stream_connect_op; + friend struct local_stream_wait_op; win_local_stream_service& svc_; local_stream_connect_op conn_; local_stream_read_op rd_; local_stream_write_op wr_; + local_stream_wait_op wt_; SOCKET socket_ = INVALID_SOCKET; public: @@ -141,6 +163,13 @@ class win_local_stream_socket_internal std::error_code*, std::size_t*); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; corosio::local_endpoint local_endpoint() const noexcept; corosio::local_endpoint remote_endpoint() const noexcept; @@ -198,6 +227,13 @@ class win_local_stream_socket final std::error_code* ec, std::size_t* bytes) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + std::error_code shutdown( local_stream_socket::shutdown_type what) noexcept override; diff --git a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp index 059a75330..2ad9222fc 100644 --- a/include/boost/corosio/native/detail/iocp/win_scheduler.hpp +++ b/include/boost/corosio/native/detail/iocp/win_scheduler.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #include @@ -46,6 +47,7 @@ namespace boost::corosio::detail { // Forward declarations struct overlapped_op; class win_timers; +class win_wait_reactor; class BOOST_COROSIO_DECL win_scheduler final : public scheduler @@ -129,6 +131,31 @@ class BOOST_COROSIO_DECL win_scheduler final mutable win_mutex dispatch_mutex_; mutable op_queue completed_ops_; std::unique_ptr timers_; + std::unique_ptr wait_reactor_; + std::once_flag wait_reactor_once_; + std::atomic wait_reactor_ready_{false}; + +public: + /** Auxiliary select-based reactor for IOCP wait operations. + + Lazily created on first access; lives for the lifetime of the + scheduler and is stopped+joined in ~win_scheduler. Used by + socket and acceptor wait() implementations whose readiness + cannot be expressed natively in IOCP (datagram-read, + acceptor-read, error-wait). + */ + win_wait_reactor& wait_reactor(); + + /** Cancel a parked wait op only if the reactor exists. + + Safe to call from any thread. If no wait op has ever been + registered, the reactor was never constructed, so there is + nothing to cancel and we avoid spinning up a thread + wakeup + socketpair on the cancel path. Acquire/release pairs with the + store in wait_reactor() so reads see a fully-constructed + reactor when the flag is true. + */ + void cancel_wait_if_constructed(overlapped_op* op) noexcept; }; /* @@ -206,67 +233,9 @@ inline win_scheduler::win_scheduler( ctx.make_service(*this); } -inline win_scheduler::~win_scheduler() -{ - if (iocp_ != nullptr) - ::CloseHandle(iocp_); -} - -inline void -win_scheduler::shutdown() -{ - if (timers_) - timers_->stop(); - - // Drain timer heap before the work-counting loop. The timer_service - // was registered after this scheduler (nested make_service from our - // constructor), so execution_context::shutdown() calls us first. - // Asio avoids this by owning timer queues directly inside the - // scheduler; we bridge the gap by shutting down the timer service - // early. The subsequent call from execution_context is a no-op. - if (timer_svc_) - timer_svc_->shutdown(); - - while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0) - { - op_queue ops; - { - std::lock_guard lock(dispatch_mutex_); - ops.splice(completed_ops_); - } - - if (!ops.empty()) - { - while (auto* h = ops.pop()) - { - ::InterlockedDecrement(&outstanding_work_); - h->destroy(); - } - } - else - { - DWORD bytes; - ULONG_PTR key; - LPOVERLAPPED overlapped; - ::GetQueuedCompletionStatus( - iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_); - if (overlapped) - { - ::InterlockedDecrement(&outstanding_work_); - if (key == key_posted) - { - auto* op = reinterpret_cast(overlapped); - op->destroy(); - } - else - { - auto* op = overlapped_to_op(overlapped); - op->destroy(); - } - } - } - } -} +// ~win_scheduler() and shutdown() are defined at the bottom of this +// header so the unique_ptr's deleter and +// wait_reactor_->stop() see the type complete. inline void win_scheduler::post(std::coroutine_handle<> h) const @@ -692,6 +661,114 @@ win_scheduler::update_timeout() } // namespace boost::corosio::detail +// Defer including the auxiliary wait reactor until the scheduler is +// fully defined, since the reactor's inline methods call back into +// win_scheduler. This also gives the dtor and wait_reactor() below a +// complete win_wait_reactor type for unique_ptr destruction and +// lazy construction. +// +// The macro lets win_wait_reactor.hpp diagnose direct inclusion +// (which would land it here with win_scheduler still incomplete). +#define BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE +#include + +namespace boost::corosio::detail { + +inline void +win_scheduler::shutdown() +{ + if (timers_) + timers_->stop(); + + // Drain timer heap before the work-counting loop. The timer_service + // was registered after this scheduler (nested make_service from our + // constructor), so execution_context::shutdown() calls us first. + // Asio avoids this by owning timer queues directly inside the + // scheduler; we bridge the gap by shutting down the timer service + // early. The subsequent call from execution_context is a no-op. + if (timer_svc_) + timer_svc_->shutdown(); + + // Same problem for the auxiliary wait reactor: ops parked in it + // hold work_started credit. Stop the reactor early so its loop + // drains them as cancelled and the work counter can reach zero. + if (wait_reactor_ready_.load(std::memory_order_acquire)) + wait_reactor_->stop(); + + while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0) + { + op_queue ops; + { + std::lock_guard lock(dispatch_mutex_); + ops.splice(completed_ops_); + } + + if (!ops.empty()) + { + while (auto* h = ops.pop()) + { + ::InterlockedDecrement(&outstanding_work_); + h->destroy(); + } + } + else + { + DWORD bytes; + ULONG_PTR key; + LPOVERLAPPED overlapped; + ::GetQueuedCompletionStatus( + iocp_, &bytes, &key, &overlapped, gqcs_timeout_ms_); + if (overlapped) + { + ::InterlockedDecrement(&outstanding_work_); + if (key == key_posted) + { + auto* op = reinterpret_cast(overlapped); + op->destroy(); + } + else + { + auto* op = overlapped_to_op(overlapped); + op->destroy(); + } + } + } + } +} + +inline win_scheduler::~win_scheduler() +{ + if (wait_reactor_) + wait_reactor_->stop(); + wait_reactor_.reset(); + + if (iocp_ != nullptr) + ::CloseHandle(iocp_); +} + +inline win_wait_reactor& +win_scheduler::wait_reactor() +{ + // Lazy thread-safe init: multiple IOCP workers may race the first + // wait() call. wait_reactor_ready_ is set with release ordering + // after construction so cancel_wait_if_constructed can safely + // observe the reactor without forcing construction itself. + std::call_once(wait_reactor_once_, [this] { + wait_reactor_ = std::make_unique(*this); + wait_reactor_ready_.store(true, std::memory_order_release); + }); + return *wait_reactor_; +} + +inline void +win_scheduler::cancel_wait_if_constructed(overlapped_op* op) noexcept +{ + if (wait_reactor_ready_.load(std::memory_order_acquire)) + wait_reactor_->cancel_wait(op); +} + +} // namespace boost::corosio::detail + #endif // BOOST_COROSIO_HAS_IOCP #endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_SCHEDULER_HPP diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor.hpp index 96dbe4f9f..ff8162095 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor.hpp @@ -16,6 +16,7 @@ #if BOOST_COROSIO_HAS_IOCP #include +#include #include #include #include @@ -54,6 +55,22 @@ struct accept_op : overlapped_op accept_op() noexcept; }; +/** Readiness-wait operation state for an acceptor. */ +struct acceptor_wait_op : overlapped_op +{ + std::shared_ptr acceptor_ptr; + SOCKET listen_socket = INVALID_SOCKET; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + acceptor_wait_op() noexcept; +}; + /** Internal acceptor state for IOCP-based I/O. This class contains the actual state for a listening socket, including @@ -82,6 +99,13 @@ class win_tcp_acceptor_internal std::error_code*, io_object::implementation**); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; endpoint local_endpoint() const noexcept; bool is_open() const noexcept; @@ -90,6 +114,7 @@ class win_tcp_acceptor_internal void set_local_endpoint(endpoint ep) noexcept; accept_op acc_; + acceptor_wait_op wt_; private: win_tcp_service& svc_; @@ -123,6 +148,13 @@ class win_tcp_acceptor final std::error_code* ec, io_object::implementation** impl_out) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + endpoint local_endpoint() const noexcept override; bool is_open() const noexcept override; void cancel() noexcept override; diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp index 1117b7d6a..ff76ea075 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_acceptor_service.hpp @@ -94,11 +94,24 @@ inline write_op::write_op(win_tcp_socket_internal& internal_) noexcept cancel_func_ = &do_cancel_impl; } +inline wait_op::wait_op(win_tcp_socket_internal& internal_) noexcept + : overlapped_op(&do_complete) + , internal(internal_) +{ + cancel_func_ = &do_cancel_impl; +} + inline accept_op::accept_op() noexcept : overlapped_op(&do_complete) { cancel_func_ = &do_cancel_impl; } +inline acceptor_wait_op::acceptor_wait_op() noexcept + : overlapped_op(&do_complete) +{ + cancel_func_ = &do_cancel_impl; +} + // Cancellation functions inline void @@ -136,6 +149,26 @@ write_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline void +wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + // Best-effort cancel of any pending zero-byte WSARecv issued for + // wait_type::read. ERROR_NOT_FOUND when nothing overlapped is + // pending is harmless. + if (op->internal.is_open()) + { + ::CancelIoEx( + reinterpret_cast(op->internal.native_handle()), op); + } + // wait_type::error parks the op in the auxiliary select reactor; + // wake it so the reactor can post a cancelled completion. No-op + // if the reactor was never constructed (e.g. zero-byte WSARecv + // path was the only thing this socket ever did). + op->internal.svc_.scheduler().cancel_wait_if_constructed(op); +} + inline void accept_op::do_cancel_impl(overlapped_op* base) noexcept { @@ -146,6 +179,22 @@ accept_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline void +acceptor_wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + if (op->listen_socket != INVALID_SOCKET) + { + ::CancelIoEx(reinterpret_cast(op->listen_socket), op); + } + if (op->acceptor_ptr) + { + op->acceptor_ptr->socket_service().scheduler() + .cancel_wait_if_constructed(op); + } +} + // accept_op completion handler inline void @@ -245,6 +294,28 @@ accept_op::do_complete( dispatch_coro(saved_ex, op->cont_op.cont).resume(); } +// acceptor_wait_op completion handler + +inline void +acceptor_wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + + if (!owner) + { + op->cleanup_only(); + op->acceptor_ptr.reset(); + return; + } + + auto prevent_premature_destruction = std::move(op->acceptor_ptr); + op->invoke_handler(); +} + // connect_op completion handler inline void @@ -330,6 +401,28 @@ write_op::do_complete( op->invoke_handler(); } +// wait_op completion handler + +inline void +wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + + if (!owner) + { + op->cleanup_only(); + op->internal_ptr.reset(); + return; + } + + auto prevent_premature_destruction = std::move(op->internal_ptr); + op->invoke_handler(); +} + // win_tcp_socket_internal inline win_tcp_socket_internal::win_tcp_socket_internal(win_tcp_service& svc) noexcept @@ -337,6 +430,7 @@ inline win_tcp_socket_internal::win_tcp_socket_internal(win_tcp_service& svc) no , conn_(*this) , rd_(*this) , wr_(*this) + , wt_(*this) { } @@ -593,6 +687,71 @@ win_tcp_socket_internal::write_some( return std::noop_coroutine(); } +inline std::coroutine_handle<> +win_tcp_socket_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.internal_ptr = shared_from_this(); + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.empty_buffer = true; // skip EOF translation in invoke_handler + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + // Match asio's IOCP behavior and corosio's reactor contract: + // wait_type::write completes immediately on a connected socket. + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + if (w == wait_type::read) + { + // Zero-byte WSARecv: kernel signals completion when data is + // available without consuming any bytes from the stream. This + // is the documented Winsock pattern for "is the socket + // readable" notifications. + op.wsabuf = WSABUF{0, nullptr}; + op.flags = 0; + + int result = ::WSARecv( + socket_, &op.wsabuf, 1, nullptr, &op.flags, &op, nullptr); + + if (result == SOCKET_ERROR) + { + DWORD err = ::WSAGetLastError(); + if (err != WSA_IO_PENDING) + { + svc_.on_completion(&op, err, 0); + return std::noop_coroutine(); + } + } + + svc_.on_pending(&op); + + // Re-check cancellation after I/O is pending. + if (op.cancelled.load(std::memory_order_acquire)) + ::CancelIoEx(reinterpret_cast(socket_), &op); + + return std::noop_coroutine(); + } + + // wait_type::error: route through the auxiliary select reactor. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); +} + inline void win_tcp_socket_internal::cancel() noexcept { @@ -604,11 +763,25 @@ win_tcp_socket_internal::cancel() noexcept conn_.request_cancel(); rd_.request_cancel(); wr_.request_cancel(); + wt_.request_cancel(); + // CancelIoEx covers overlapped I/O on the socket but cannot reach + // a wait op parked in the auxiliary reactor (no overlapped is + // outstanding). Route through the reactor explicitly. Safe no-op + // if the reactor was never constructed. + svc_.scheduler().cancel_wait_if_constructed(&wt_); } inline void win_tcp_socket_internal::close_socket() noexcept { + // Tear down any aux-reactor-parked wait op before closing the + // SOCKET handle. Otherwise the reactor would keep polling a + // dangling fd (and on a Winsock SOCKET-id reuse the wrong fd + // could be polled briefly). The cancel happens-before the + // closesocket below. + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -676,6 +849,17 @@ win_tcp_socket::write_some( return internal_->write_some(h, d, buf, token, ec, bytes); } +inline std::coroutine_handle<> +win_tcp_socket::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline std::error_code win_tcp_socket::shutdown(tcp_socket::shutdown_type what) noexcept { @@ -1204,6 +1388,39 @@ win_tcp_acceptor_internal::accept( return std::noop_coroutine(); } +inline std::coroutine_handle<> +win_tcp_acceptor_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.acceptor_ptr = shared_from_this(); + wt_.listen_socket = socket_; + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + // wait_type::read (incoming connection ready) and wait_type::error + // on the listen socket route through the auxiliary select reactor. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); +} + inline void win_tcp_acceptor_internal::cancel() noexcept { @@ -1213,11 +1430,17 @@ win_tcp_acceptor_internal::cancel() noexcept } acc_.request_cancel(); + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); } inline void win_tcp_acceptor_internal::close_socket() noexcept { + // Tear down any aux-reactor-parked wait op first. + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -1258,6 +1481,17 @@ win_tcp_acceptor::accept( return internal_->accept(h, d, token, ec, impl_out); } +inline std::coroutine_handle<> +win_tcp_acceptor::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline endpoint win_tcp_acceptor::local_endpoint() const noexcept { diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp index 674721f3b..15237606c 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_service.hpp @@ -172,6 +172,12 @@ class BOOST_COROSIO_DECL win_tcp_service final /** Notify scheduler that I/O work completed. */ void work_finished() noexcept; + /** Return the owning IOCP scheduler. */ + win_scheduler& scheduler() noexcept + { + return sched_; + } + private: friend class win_tcp_acceptor_service; diff --git a/include/boost/corosio/native/detail/iocp/win_tcp_socket.hpp b/include/boost/corosio/native/detail/iocp/win_tcp_socket.hpp index 5984f0093..b7075036b 100644 --- a/include/boost/corosio/native/detail/iocp/win_tcp_socket.hpp +++ b/include/boost/corosio/native/detail/iocp/win_tcp_socket.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -88,6 +89,32 @@ struct write_op : overlapped_op explicit write_op(win_tcp_socket_internal& internal_) noexcept; }; +/** Readiness-wait operation state. + + Completion conveys an error_code only (no bytes_transferred). + wait_type::read posts a zero-byte WSARecv: the kernel signals + completion when data arrives without consuming it. + wait_type::write short-circuits through the scheduler queue. + wait_type::error parks the op in the auxiliary select reactor + until the kernel reports an error condition. +*/ +struct wait_op : overlapped_op +{ + WSABUF wsabuf{}; + DWORD flags = 0; + win_tcp_socket_internal& internal; + std::shared_ptr internal_ptr; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + explicit wait_op(win_tcp_socket_internal& internal_) noexcept; +}; + /** Internal socket state for IOCP-based I/O. This class contains the actual state for a single socket, including @@ -105,11 +132,13 @@ class win_tcp_socket_internal friend struct read_op; friend struct write_op; friend struct connect_op; + friend struct wait_op; win_tcp_service& svc_; connect_op conn_; read_op rd_; write_op wr_; + wait_op wt_; SOCKET socket_ = INVALID_SOCKET; int family_ = AF_UNSPEC; @@ -140,6 +169,13 @@ class win_tcp_socket_internal std::error_code*, std::size_t*); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; endpoint local_endpoint() const noexcept; endpoint remote_endpoint() const noexcept; @@ -195,6 +231,13 @@ class win_tcp_socket final std::error_code* ec, std::size_t* bytes) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override; native_handle_type native_handle() const noexcept override; diff --git a/include/boost/corosio/native/detail/iocp/win_udp_service.hpp b/include/boost/corosio/native/detail/iocp/win_udp_service.hpp index daa4e2019..644330bc9 100644 --- a/include/boost/corosio/native/detail/iocp/win_udp_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_udp_service.hpp @@ -79,6 +79,12 @@ class BOOST_COROSIO_DECL win_udp_service final void work_started() noexcept; void work_finished() noexcept; + /** Return the owning IOCP scheduler. */ + win_scheduler& scheduler() noexcept + { + return sched_; + } + private: win_scheduler& sched_; win_mutex mutex_; @@ -125,6 +131,13 @@ inline udp_recv_op::udp_recv_op(win_udp_socket_internal& internal_) noexcept cancel_func_ = &do_cancel_impl; } +inline udp_wait_op::udp_wait_op(win_udp_socket_internal& internal_) noexcept + : overlapped_op(&do_complete) + , internal(internal_) +{ + cancel_func_ = &do_cancel_impl; +} + // Cancellation functions inline void @@ -236,6 +249,19 @@ udp_recv_op::do_cancel_impl(overlapped_op* base) noexcept } } +inline void +udp_wait_op::do_cancel_impl(overlapped_op* base) noexcept +{ + auto* op = static_cast(base); + op->cancelled.store(true, std::memory_order_release); + if (op->internal.is_open()) + { + ::CancelIoEx( + reinterpret_cast(op->internal.native_handle()), op); + } + op->internal.svc_.scheduler().cancel_wait_if_constructed(op); +} + // Connected-mode completion handlers inline void @@ -312,6 +338,26 @@ udp_recv_op::do_complete( op->invoke_handler(); } +inline void +udp_wait_op::do_complete( + void* owner, + scheduler_op* base, + std::uint32_t /*bytes*/, + std::uint32_t /*error*/) +{ + auto* op = static_cast(base); + + if (!owner) + { + op->cleanup_only(); + op->internal_ptr.reset(); + return; + } + + auto prevent_premature_destruction = std::move(op->internal_ptr); + op->invoke_handler(); +} + // win_udp_socket_internal inline win_udp_socket_internal::win_udp_socket_internal( @@ -322,6 +368,7 @@ inline win_udp_socket_internal::win_udp_socket_internal( , conn_(*this) , send_wr_(*this) , recv_rd_(*this) + , wt_(*this) { } @@ -640,6 +687,41 @@ win_udp_socket_internal::recv( return std::noop_coroutine(); } +inline std::coroutine_handle<> +win_udp_socket_internal::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + wt_.internal_ptr = shared_from_this(); + + auto& op = wt_; + op.reset(); + op.h = h; + op.ex = d; + op.ec_out = ec; + op.bytes_out = nullptr; + op.start(token); + + svc_.work_started(); + + if (w == wait_type::write) + { + svc_.on_completion(&op, 0, 0); + return std::noop_coroutine(); + } + + // Datagram wait_read and wait_error route through the auxiliary + // select reactor: there's no IOCP-native primitive for "datagram + // readable without dequeuing the message" (zero-byte WSARecvFrom + // would discard the next datagram), and wait_error needs the + // reactor for the kernel-error signal in any case. + svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); + return std::noop_coroutine(); +} + inline void win_udp_socket_internal::cancel() noexcept { @@ -653,11 +735,16 @@ win_udp_socket_internal::cancel() noexcept conn_.request_cancel(); send_wr_.request_cancel(); recv_rd_.request_cancel(); + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); } inline void win_udp_socket_internal::close_socket() noexcept { + wt_.request_cancel(); + svc_.scheduler().cancel_wait_if_constructed(&wt_); + if (socket_ != INVALID_SOCKET) { ::CancelIoEx(reinterpret_cast(socket_), nullptr); @@ -754,6 +841,17 @@ win_udp_socket::recv( return internal_->recv(h, d, buf, flags, token, ec, bytes); } +inline std::coroutine_handle<> +win_udp_socket::wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) +{ + return internal_->wait(h, d, w, token, ec); +} + inline native_handle_type win_udp_socket::native_handle() const noexcept { diff --git a/include/boost/corosio/native/detail/iocp/win_udp_socket.hpp b/include/boost/corosio/native/detail/iocp/win_udp_socket.hpp index 437714416..0c3c8b7c0 100644 --- a/include/boost/corosio/native/detail/iocp/win_udp_socket.hpp +++ b/include/boost/corosio/native/detail/iocp/win_udp_socket.hpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -129,6 +130,22 @@ struct udp_recv_op : overlapped_op explicit udp_recv_op(win_udp_socket_internal& internal_) noexcept; }; +/** Readiness-wait operation (datagram socket). */ +struct udp_wait_op : overlapped_op +{ + win_udp_socket_internal& internal; + std::shared_ptr internal_ptr; + + static void do_complete( + void* owner, + scheduler_op* base, + std::uint32_t bytes, + std::uint32_t error); + static void do_cancel_impl(overlapped_op* op) noexcept; + + explicit udp_wait_op(win_udp_socket_internal& internal_) noexcept; +}; + /** Internal datagram socket state for IOCP-based I/O. This class contains the actual state for a single UDP socket, @@ -149,6 +166,7 @@ class win_udp_socket_internal friend struct udp_connect_op; friend struct udp_send_op; friend struct udp_recv_op; + friend struct udp_wait_op; win_udp_service& svc_; send_to_op wr_; @@ -156,6 +174,7 @@ class win_udp_socket_internal udp_connect_op conn_; udp_send_op send_wr_; udp_recv_op recv_rd_; + udp_wait_op wt_; SOCKET socket_ = INVALID_SOCKET; int family_ = AF_UNSPEC; @@ -208,6 +227,13 @@ class win_udp_socket_internal std::error_code*, std::size_t*); + std::coroutine_handle<> wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token, + std::error_code*); + SOCKET native_handle() const noexcept; endpoint local_endpoint() const noexcept; endpoint remote_endpoint() const noexcept; @@ -285,6 +311,13 @@ class win_udp_socket final std::error_code* ec, std::size_t* bytes) override; + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref d, + wait_type w, + std::stop_token token, + std::error_code* ec) override; + native_handle_type native_handle() const noexcept override; std::error_code set_option( diff --git a/include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp b/include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp new file mode 100644 index 000000000..3617f68a4 --- /dev/null +++ b/include/boost/corosio/native/detail/iocp/win_wait_reactor.hpp @@ -0,0 +1,396 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP +#define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP + +#include + +#if BOOST_COROSIO_HAS_IOCP + +// This header is included from the bottom of win_scheduler.hpp after +// the scheduler class is fully defined. Including it directly would +// circle back into a still-incomplete win_scheduler when the dtor's +// unique_ptr::reset() is parsed. Diagnose that +// rather than emitting a confusing "incomplete type" error far away. +#ifndef BOOST_COROSIO_DETAIL_IOCP_WIN_SCHEDULER_BODY_DONE +#error "Include \ +instead of including this header directly." +#endif + +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::detail { + +/** Auxiliary select-based reactor for IOCP wait operations. + + IOCP has no native primitive for socket readiness without I/O. + For cases where a zero-byte WSARecv won't work (datagram-read, + acceptor-read, error-wait), this reactor runs a dedicated thread + using WSAPoll to detect readiness and posts a synthetic completion + to the owning IOCP scheduler via win_scheduler::on_completion(). + + The same dispatch path used by overlapped I/O then delivers the + completion to the user's coroutine, so the public API is uniform + across backends. + + Per-op lifecycle: + 1. Caller sets up an overlapped_op (h, ex, ec_out, cancelled flag). + 2. Caller calls register_wait(fd, w, op) and returns + std::noop_coroutine. The op is parked in the reactor's table. + 3. Reactor thread polls. When the fd is ready, the op is removed + from the table and posted to the scheduler. The error code + delivered to the completion is: ec={} on success; the SO_ERROR + value if error revents fired and SO_ERROR is set; or + WSAECONNABORTED as a synthesized fallback for wait_type::error + when error revents fired but SO_ERROR returned zero. + 4. On socket cancel(), the user's thread calls cancel_wait(op), + which queues a cancel request. The reactor thread removes the + op from the table and posts a completion; invoke_handler sees + op.cancelled==true and yields capy::cond::canceled. + + Thread-safe: register_wait, cancel_wait, and stop may be called + from any thread. +*/ +class win_wait_reactor +{ +public: + explicit win_wait_reactor(win_scheduler& sched); + ~win_wait_reactor(); + + win_wait_reactor(win_wait_reactor const&) = delete; + win_wait_reactor& operator=(win_wait_reactor const&) = delete; + + /// Park an overlapped_op until @p fd is ready for @p w. + void register_wait(SOCKET fd, wait_type w, overlapped_op* op); + + /// Remove a parked op and post a completion. Idempotent. + void cancel_wait(overlapped_op* op); + + /// Stop the reactor thread and drain remaining ops as cancelled. + void stop(); + +private: + struct entry + { + SOCKET fd = INVALID_SOCKET; + wait_type w = wait_type::read; + overlapped_op* op = nullptr; + }; + + void run(); + void wake_self() noexcept; + void make_wakeup_pair(); + void close_wakeup_pair() noexcept; + + static SHORT events_for_wait(wait_type w) noexcept + { + switch (w) + { + case wait_type::read: return POLLRDNORM; + case wait_type::write: return POLLWRNORM; + default: return POLLPRI; + } + } + + static bool ready_for_wait(wait_type w, SHORT revents) noexcept + { + constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL; + switch (w) + { + case wait_type::read: + return (revents & (POLLRDNORM | POLLRDBAND | err_bits)) != 0; + case wait_type::write: + return (revents & (POLLWRNORM | POLLWRBAND | err_bits)) != 0; + default: + return (revents & (POLLPRI | err_bits)) != 0; + } + } + + win_scheduler& sched_; + + SOCKET wakeup_read_ = INVALID_SOCKET; + SOCKET wakeup_write_ = INVALID_SOCKET; + + std::mutex mutex_; + std::vector pending_register_; + std::vector pending_cancel_; + std::atomic stop_{false}; + std::atomic wake_pending_{false}; + + std::vector registered_; // reactor-thread-only + + std::thread thread_; +}; + +inline win_wait_reactor::win_wait_reactor(win_scheduler& sched) + : sched_(sched) +{ + make_wakeup_pair(); + thread_ = std::thread([this] { run(); }); +} + +inline win_wait_reactor::~win_wait_reactor() +{ + stop(); + close_wakeup_pair(); +} + +inline void +win_wait_reactor::make_wakeup_pair() +{ + // Build a pair of connected loopback sockets to use as a wakeup + // channel. Winsock has no socketpair(2), so we listen on + // 127.0.0.1:0, connect a peer, then accept it. + SOCKET listener = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listener == INVALID_SOCKET) + return; + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ::htonl(INADDR_LOOPBACK); + addr.sin_port = 0; + + int len = sizeof(addr); + if (::bind(listener, reinterpret_cast(&addr), len) == + SOCKET_ERROR || + ::listen(listener, 1) == SOCKET_ERROR || + ::getsockname(listener, reinterpret_cast(&addr), &len) == + SOCKET_ERROR) + { + ::closesocket(listener); + return; + } + + wakeup_write_ = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (wakeup_write_ == INVALID_SOCKET) + { + ::closesocket(listener); + return; + } + + if (::connect( + wakeup_write_, reinterpret_cast(&addr), len) == + SOCKET_ERROR) + { + ::closesocket(wakeup_write_); + wakeup_write_ = INVALID_SOCKET; + ::closesocket(listener); + return; + } + + wakeup_read_ = ::accept(listener, nullptr, nullptr); + ::closesocket(listener); + + if (wakeup_read_ == INVALID_SOCKET) + { + ::closesocket(wakeup_write_); + wakeup_write_ = INVALID_SOCKET; + return; + } + + // The drain loop in run() calls recv() until it returns <= 0. + // With a blocking socket that second recv() would block instead + // of returning WSAEWOULDBLOCK, deadlocking the reactor thread. + u_long non_blocking = 1; + ::ioctlsocket(wakeup_read_, FIONBIO, &non_blocking); +} + +inline void +win_wait_reactor::close_wakeup_pair() noexcept +{ + if (wakeup_read_ != INVALID_SOCKET) + { + ::closesocket(wakeup_read_); + wakeup_read_ = INVALID_SOCKET; + } + if (wakeup_write_ != INVALID_SOCKET) + { + ::closesocket(wakeup_write_); + wakeup_write_ = INVALID_SOCKET; + } +} + +inline void +win_wait_reactor::wake_self() noexcept +{ + // Coalesce wakes: only send a byte if no wake is already pending. + bool expected = false; + if (!wake_pending_.compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) + return; + if (wakeup_write_ != INVALID_SOCKET) + { + char b = 0; + ::send(wakeup_write_, &b, 1, 0); + } +} + +inline void +win_wait_reactor::register_wait( + SOCKET fd, wait_type w, overlapped_op* op) +{ + { + std::lock_guard lock(mutex_); + pending_register_.push_back(entry{fd, w, op}); + } + wake_self(); +} + +inline void +win_wait_reactor::cancel_wait(overlapped_op* op) +{ + { + std::lock_guard lock(mutex_); + pending_cancel_.push_back(op); + } + wake_self(); +} + +inline void +win_wait_reactor::stop() +{ + if (stop_.exchange(true, std::memory_order_acq_rel)) + return; + wake_self(); + if (thread_.joinable()) + thread_.join(); +} + +inline void +win_wait_reactor::run() +{ + std::vector pollfds; + + while (!stop_.load(std::memory_order_acquire)) + { + // Drain pending register/cancel under the lock. + std::vector to_add; + std::vector to_cancel; + { + std::lock_guard lock(mutex_); + to_add.swap(pending_register_); + to_cancel.swap(pending_cancel_); + } + + for (auto& e : to_add) + registered_.push_back(e); + + for (auto* op : to_cancel) + { + auto it = std::find_if( + registered_.begin(), registered_.end(), + [op](entry const& e) { return e.op == op; }); + if (it != registered_.end()) + { + // The op's cancelled flag has already been set by + // request_cancel; invoke_handler will translate it. + sched_.on_completion(op, 0, 0); + registered_.erase(it); + } + // If not in registered_, the op already fired — no-op. + } + + // Build the poll set. Slot 0 is the wakeup socket. + pollfds.clear(); + pollfds.reserve(registered_.size() + 1); + pollfds.push_back({wakeup_read_, POLLRDNORM, 0}); + for (auto& e : registered_) + pollfds.push_back({e.fd, events_for_wait(e.w), 0}); + + int n = ::WSAPoll( + pollfds.data(), + static_cast(pollfds.size()), + -1 /* infinite */); + if (n == SOCKET_ERROR) + break; + + // Drain the wakeup socket so it stops reporting readable. + if (pollfds[0].revents != 0) + { + char buf[64]; + for (;;) + { + int r = ::recv(wakeup_read_, buf, sizeof(buf), 0); + if (r <= 0) + break; + } + wake_pending_.store(false, std::memory_order_release); + } + + // Walk events in reverse so erases don't invalidate later indices. + for (std::size_t i = pollfds.size(); i > 1; --i) + { + auto const& pfd = pollfds[i - 1]; + if (pfd.revents == 0) + continue; + + auto const& e = registered_[i - 2]; + if (!ready_for_wait(e.w, pfd.revents)) + continue; + + DWORD err = 0; + constexpr SHORT err_bits = POLLERR | POLLHUP | POLLNVAL; + if (pfd.revents & err_bits) + { + int so_err = 0; + int sz = sizeof(so_err); + if (::getsockopt( + e.fd, SOL_SOCKET, SO_ERROR, + reinterpret_cast(&so_err), &sz) == 0 && + so_err != 0) + { + err = static_cast(so_err); + } + else if (e.w == wait_type::error) + { + // wait_type::error fires on the error condition; + // the contract is to report a non-zero error_code. + err = WSAECONNABORTED; + } + } + + sched_.on_completion(e.op, err, 0); + registered_.erase(registered_.begin() + (i - 2)); + } + } + + // Drain remaining ops as cancelled on shutdown. This must cover + // both the active set and anything still queued by user threads + // that hasn't been moved into registered_ yet, otherwise those + // ops leak work_started credit and stall scheduler shutdown. + { + std::lock_guard lock(mutex_); + for (auto& e : pending_register_) + registered_.push_back(e); + pending_register_.clear(); + pending_cancel_.clear(); + } + for (auto& e : registered_) + sched_.on_completion(e.op, ERROR_OPERATION_ABORTED, 0); + registered_.clear(); +} + +} // namespace boost::corosio::detail + +#endif // BOOST_COROSIO_HAS_IOCP + +#endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_WAIT_REACTOR_HPP diff --git a/include/boost/corosio/native/detail/reactor/reactor_acceptor.hpp b/include/boost/corosio/native/detail/reactor/reactor_acceptor.hpp index 1c5847da4..cf3a60350 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_acceptor.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_acceptor.hpp @@ -11,8 +11,10 @@ #define BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP #include +#include #include #include +#include #include #include @@ -38,6 +40,7 @@ namespace boost::corosio::detail { @tparam Service The backend's acceptor service type. @tparam Op The backend's base op type. @tparam AcceptOp The backend's accept op type. + @tparam WaitOp The backend's wait op type. @tparam DescState The backend's descriptor_state type. @tparam ImplBase The public vtable base (tcp_acceptor::implementation or @@ -49,6 +52,7 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase = tcp_acceptor::implementation, class Endpoint = endpoint> @@ -72,6 +76,15 @@ class reactor_acceptor /// Pending accept operation slot. AcceptOp acc_; + /// Pending wait-for-read operation slot. + WaitOp wait_rd_; + + /// Pending wait-for-write operation slot. + WaitOp wait_wr_; + + /// Pending wait-for-error operation slot. + WaitOp wait_er_; + /// Per-descriptor state for persistent reactor registration. DescState desc_state_; @@ -133,7 +146,10 @@ class reactor_acceptor desc_state_.fd = fd; { std::lock_guard lock(desc_state_.mutex); - desc_state_.read_op = nullptr; + desc_state_.read_op = nullptr; + desc_state_.wait_read_op = nullptr; + desc_state_.wait_write_op = nullptr; + desc_state_.wait_error_op = nullptr; } } @@ -148,6 +164,30 @@ class reactor_acceptor /// Close the acceptor (non-virtual, called by the service). void close_socket() noexcept { do_close_socket(); } + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) override + { + return do_wait(h, ex, w, token, ec); + } + + /** Wait for readiness on the listen socket. + + Registers a wait op on the matching event slot. For + `wait_type::read`, completion signals that an incoming + connection is pending and a subsequent accept will + succeed without blocking. + */ + std::coroutine_handle<> do_wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token const&, + std::error_code*); + /** Cancel a single pending operation. Claims the operation from the read_op descriptor slot @@ -197,11 +237,12 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> void -reactor_acceptor:: +reactor_acceptor:: cancel_single_op(Op& op) noexcept { auto self = this->weak_from_this().lock(); @@ -213,8 +254,14 @@ reactor_acceptor: reactor_op_base* claimed = nullptr; { std::lock_guard lock(desc_state_.mutex); - if (desc_state_.read_op == &op) - claimed = std::exchange(desc_state_.read_op, nullptr); + auto try_claim = [&](reactor_op_base*& slot) { + if (!claimed && slot == &op) + claimed = std::exchange(slot, nullptr); + }; + try_claim(desc_state_.read_op); + try_claim(desc_state_.wait_read_op); + try_claim(desc_state_.wait_write_op); + try_claim(desc_state_.wait_error_op); } if (claimed) { @@ -229,14 +276,18 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> void -reactor_acceptor:: +reactor_acceptor:: do_cancel() noexcept { cancel_single_op(acc_); + cancel_single_op(wait_rd_); + cancel_single_op(wait_wr_); + cancel_single_op(wait_er_); } template< @@ -244,22 +295,32 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> void -reactor_acceptor:: +reactor_acceptor:: do_close_socket() noexcept { auto self = this->weak_from_this().lock(); if (self) { acc_.request_cancel(); - - reactor_op_base* claimed = nullptr; + wait_rd_.request_cancel(); + wait_wr_.request_cancel(); + wait_er_.request_cancel(); + + reactor_op_base* claimed_acc = nullptr; + reactor_op_base* claimed_wr = nullptr; + reactor_op_base* claimed_ww = nullptr; + reactor_op_base* claimed_we = nullptr; { std::lock_guard lock(desc_state_.mutex); - claimed = std::exchange(desc_state_.read_op, nullptr); + claimed_acc = std::exchange(desc_state_.read_op, nullptr); + claimed_wr = std::exchange(desc_state_.wait_read_op, nullptr); + claimed_ww = std::exchange(desc_state_.wait_write_op, nullptr); + claimed_we = std::exchange(desc_state_.wait_error_op, nullptr); desc_state_.read_ready = false; desc_state_.write_ready = false; @@ -267,12 +328,18 @@ reactor_acceptor: desc_state_.impl_ref_ = self; } - if (claimed) - { - acc_.impl_ptr = self; - svc_.post(&acc_); - svc_.work_finished(); - } + auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) { + if (claimed) + { + op.impl_ptr = self; + svc_.post(&op); + svc_.work_finished(); + } + }; + repost(claimed_acc, acc_); + repost(claimed_wr, wait_rd_); + repost(claimed_ww, wait_wr_); + repost(claimed_we, wait_er_); } if (fd_ >= 0) @@ -294,22 +361,32 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> native_handle_type -reactor_acceptor:: +reactor_acceptor:: do_release_socket() noexcept { auto self = this->weak_from_this().lock(); if (self) { acc_.request_cancel(); - - reactor_op_base* claimed = nullptr; + wait_rd_.request_cancel(); + wait_wr_.request_cancel(); + wait_er_.request_cancel(); + + reactor_op_base* claimed_acc = nullptr; + reactor_op_base* claimed_wr = nullptr; + reactor_op_base* claimed_ww = nullptr; + reactor_op_base* claimed_we = nullptr; { std::lock_guard lock(desc_state_.mutex); - claimed = std::exchange(desc_state_.read_op, nullptr); + claimed_acc = std::exchange(desc_state_.read_op, nullptr); + claimed_wr = std::exchange(desc_state_.wait_read_op, nullptr); + claimed_ww = std::exchange(desc_state_.wait_write_op, nullptr); + claimed_we = std::exchange(desc_state_.wait_error_op, nullptr); desc_state_.read_ready = false; desc_state_.write_ready = false; @@ -317,12 +394,18 @@ reactor_acceptor: desc_state_.impl_ref_ = self; } - if (claimed) - { - acc_.impl_ptr = self; - svc_.post(&acc_); - svc_.work_finished(); - } + auto repost = [&](reactor_op_base* claimed, reactor_op_base& op) { + if (claimed) + { + op.impl_ptr = self; + svc_.post(&op); + svc_.work_finished(); + } + }; + repost(claimed_acc, acc_); + repost(claimed_wr, wait_rd_); + repost(claimed_ww, wait_wr_); + repost(claimed_we, wait_er_); } native_handle_type released = fd_; @@ -347,11 +430,12 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> std::error_code -reactor_acceptor:: +reactor_acceptor:: do_bind(Endpoint const& ep) { sockaddr_storage storage{}; @@ -374,11 +458,12 @@ template< class Service, class Op, class AcceptOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> std::error_code -reactor_acceptor:: +reactor_acceptor:: do_listen(int backlog) { if (::listen(fd_, backlog) < 0) @@ -388,6 +473,83 @@ reactor_acceptor: return {}; } +template< + class Derived, + class Service, + class Op, + class AcceptOp, + class WaitOp, + class DescState, + class ImplBase, + class Endpoint> +std::coroutine_handle<> +reactor_acceptor:: + do_wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token const& token, + std::error_code* ec) +{ + // wait_type::write completes immediately (see reactor_stream_socket::do_wait). + if (w == wait_type::write) + { + auto& op = wait_wr_; + op.reset(); + op.wait_event = reactor_event_write; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + op.complete(0, 0); + svc_.post(&op); + return std::noop_coroutine(); + } + + WaitOp* op_ptr; + reactor_op_base** desc_slot_ptr; + std::uint32_t event; + + if (w == wait_type::read) + { + op_ptr = &wait_rd_; + desc_slot_ptr = &desc_state_.wait_read_op; + event = reactor_event_read; + } + else // wait_type::error + { + op_ptr = &wait_er_; + desc_slot_ptr = &desc_state_.wait_error_op; + event = reactor_event_error; + } + + auto& op = *op_ptr; + op.reset(); + op.wait_event = event; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + + svc_.work_started(); + + std::lock_guard lock(desc_state_.mutex); + if (op.cancelled.load(std::memory_order_acquire)) + { + svc_.post(&op); + svc_.work_finished(); + } + else + { + *desc_slot_ptr = &op; + } + return std::noop_coroutine(); +} + } // namespace boost::corosio::detail #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_ACCEPTOR_HPP diff --git a/include/boost/corosio/native/detail/reactor/reactor_basic_socket.hpp b/include/boost/corosio/native/detail/reactor/reactor_basic_socket.hpp index 4ea2c2d14..089089e9d 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_basic_socket.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_basic_socket.hpp @@ -57,10 +57,10 @@ class reactor_basic_socket { friend Derived; - template + template friend class reactor_stream_socket; - template + template friend class reactor_datagram_socket; explicit reactor_basic_socket(Service& svc) noexcept : svc_(svc) {} @@ -333,8 +333,8 @@ reactor_basic_socket:: reactor_op_base* op = nullptr; reactor_op_base* base = nullptr; }; - // Max 3 ops (conn, rd, wr) - claimed_entry claimed[3]; + // Max 8 ops: conn, rd, wr, wait_rd, wait_wr, wait_er, recv_rd, send_wr + claimed_entry claimed[8]; int count = 0; { @@ -373,7 +373,7 @@ reactor_basic_socket:: { reactor_op_base* base = nullptr; }; - claimed_entry claimed[3]; + claimed_entry claimed[8]; int count = 0; { @@ -389,9 +389,12 @@ reactor_basic_socket:: }); desc_state_.read_ready = false; desc_state_.write_ready = false; - desc_state_.read_cancel_pending = false; - desc_state_.write_cancel_pending = false; - desc_state_.connect_cancel_pending = false; + desc_state_.read_cancel_pending = false; + desc_state_.write_cancel_pending = false; + desc_state_.connect_cancel_pending = false; + desc_state_.wait_read_cancel_pending = false; + desc_state_.wait_write_cancel_pending = false; + desc_state_.wait_error_cancel_pending = false; if (desc_state_.is_enqueued_.load(std::memory_order_acquire)) desc_state_.impl_ref_ = self; @@ -436,7 +439,7 @@ reactor_basic_socket:: { reactor_op_base* base = nullptr; }; - claimed_entry claimed[3]; + claimed_entry claimed[8]; int count = 0; { @@ -452,9 +455,12 @@ reactor_basic_socket:: }); desc_state_.read_ready = false; desc_state_.write_ready = false; - desc_state_.read_cancel_pending = false; - desc_state_.write_cancel_pending = false; - desc_state_.connect_cancel_pending = false; + desc_state_.read_cancel_pending = false; + desc_state_.write_cancel_pending = false; + desc_state_.connect_cancel_pending = false; + desc_state_.wait_read_cancel_pending = false; + desc_state_.wait_write_cancel_pending = false; + desc_state_.wait_error_cancel_pending = false; if (desc_state_.is_enqueued_.load(std::memory_order_acquire)) desc_state_.impl_ref_ = self; diff --git a/include/boost/corosio/native/detail/reactor/reactor_datagram_ops.hpp b/include/boost/corosio/native/detail/reactor/reactor_datagram_ops.hpp index e1909e955..a6ef1b288 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_datagram_ops.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_datagram_ops.hpp @@ -71,6 +71,14 @@ struct reactor_dgram_recv_op final void operator()() override; }; +template +struct reactor_dgram_wait_op final + : reactor_wait_op< + reactor_dgram_base_op> +{ + void operator()() override; +}; + // --- Deferred implementations --- template @@ -112,6 +120,13 @@ reactor_dgram_recv_op::operator()() complete_datagram_op(*this); } +template +void +reactor_dgram_wait_op::operator()() +{ + complete_wait_op(*this); +} + } // namespace boost::corosio::detail #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DATAGRAM_OPS_HPP diff --git a/include/boost/corosio/native/detail/reactor/reactor_datagram_socket.hpp b/include/boost/corosio/native/detail/reactor/reactor_datagram_socket.hpp index fe64df443..caa229ab2 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_datagram_socket.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_datagram_socket.hpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include #include @@ -49,6 +51,7 @@ to_native_msg_flags(int flags) noexcept @tparam RecvFromOp The backend's recv_from op type. @tparam SendOp The backend's connected send op type. @tparam RecvOp The backend's connected recv op type. + @tparam WaitOp The backend's wait op type. @tparam DescState The backend's descriptor_state type. @tparam ImplBase The public vtable base (udp_socket::implementation or @@ -63,6 +66,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase = udp_socket::implementation, class Endpoint = endpoint> @@ -80,6 +84,9 @@ class reactor_datagram_socket Service, DescState, Endpoint>; + using self_type = reactor_datagram_socket< + Derived, Service, ConnOp, SendToOp, RecvFromOp, SendOp, RecvOp, WaitOp, + DescState, ImplBase, Endpoint>; friend base_type; friend Derived; @@ -106,6 +113,15 @@ class reactor_datagram_socket /// Pending connected recv operation slot. RecvOp recv_rd_; + /// Pending wait-for-read operation slot. + WaitOp wait_rd_; + + /// Pending wait-for-write operation slot. + WaitOp wait_wr_; + + /// Pending wait-for-error operation slot. + WaitOp wait_er_; + ~reactor_datagram_socket() override = default; /// Return the cached remote endpoint. @@ -181,6 +197,16 @@ class reactor_datagram_socket this->do_cancel(); } + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) override + { + return do_wait(h, ex, w, token, ec); + } + // --- End virtual overrides --- /// Close the socket (non-virtual, called by the service). @@ -269,6 +295,19 @@ class reactor_datagram_socket std::error_code*, std::size_t*); + /** Shared readiness-wait dispatch. + + Registers a wait op for the requested direction. Does not + perform any I/O syscall — completion is signalled when the + reactor delivers the matching edge event. + */ + std::coroutine_handle<> do_wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token const&, + std::error_code*); + /** Close the socket and cancel pending operations. Extends the base do_close_socket() to also reset @@ -331,6 +370,12 @@ class reactor_datagram_socket return &this->desc_state_.read_op; if (&op == static_cast(&send_wr_)) return &this->desc_state_.write_op; + if (&op == static_cast(&wait_rd_)) + return &this->desc_state_.wait_read_op; + if (&op == static_cast(&wait_wr_)) + return &this->desc_state_.wait_write_op; + if (&op == static_cast(&wait_er_)) + return &this->desc_state_.wait_error_op; return nullptr; } @@ -347,6 +392,12 @@ class reactor_datagram_socket return &this->desc_state_.read_cancel_pending; if (&op == static_cast(&send_wr_)) return &this->desc_state_.write_cancel_pending; + if (&op == static_cast(&wait_rd_)) + return &this->desc_state_.wait_read_cancel_pending; + if (&op == static_cast(&wait_wr_)) + return &this->desc_state_.wait_write_cancel_pending; + if (&op == static_cast(&wait_er_)) + return &this->desc_state_.wait_error_cancel_pending; return nullptr; } @@ -358,6 +409,9 @@ class reactor_datagram_socket fn(wr_); fn(recv_rd_); fn(send_wr_); + fn(wait_rd_); + fn(wait_wr_); + fn(wait_er_); } template @@ -368,6 +422,9 @@ class reactor_datagram_socket fn(wr_, this->desc_state_.write_op); fn(recv_rd_, this->desc_state_.read_op); fn(send_wr_, this->desc_state_.write_op); + fn(wait_rd_, this->desc_state_.wait_read_op); + fn(wait_wr_, this->desc_state_.wait_write_op); + fn(wait_er_, this->desc_state_.wait_error_op); } }; @@ -381,6 +438,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> @@ -393,6 +451,7 @@ reactor_datagram_socket< RecvFromOp, SendOp, RecvOp, + WaitOp, DescState, ImplBase, Endpoint>:: @@ -491,6 +550,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> @@ -503,6 +563,7 @@ reactor_datagram_socket< RecvFromOp, SendOp, RecvOp, + WaitOp, DescState, ImplBase, Endpoint>:: @@ -614,6 +675,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> @@ -626,6 +688,7 @@ reactor_datagram_socket< RecvFromOp, SendOp, RecvOp, + WaitOp, DescState, ImplBase, Endpoint>:: @@ -703,6 +766,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> @@ -715,6 +779,7 @@ reactor_datagram_socket< RecvFromOp, SendOp, RecvOp, + WaitOp, DescState, ImplBase, Endpoint>:: @@ -807,6 +872,7 @@ template< class RecvFromOp, class SendOp, class RecvOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> @@ -819,6 +885,7 @@ reactor_datagram_socket< RecvFromOp, SendOp, RecvOp, + WaitOp, DescState, ImplBase, Endpoint>:: @@ -908,6 +975,104 @@ reactor_datagram_socket< return std::noop_coroutine(); } +// do_wait + +template< + class Derived, + class Service, + class ConnOp, + class SendToOp, + class RecvFromOp, + class SendOp, + class RecvOp, + class WaitOp, + class DescState, + class ImplBase, + class Endpoint> +std::coroutine_handle<> +reactor_datagram_socket< + Derived, + Service, + ConnOp, + SendToOp, + RecvFromOp, + SendOp, + RecvOp, + WaitOp, + DescState, + ImplBase, + Endpoint>:: + do_wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token const& token, + std::error_code* ec) +{ + // wait_type::write completes immediately (see reactor_stream_socket::do_wait). + if (w == wait_type::write) + { + auto& op = wait_wr_; + if (this->svc_.scheduler().try_consume_inline_budget()) + { + *ec = std::error_code{}; + op.cont_op.cont.h = h; + return dispatch_coro(ex, op.cont_op.cont); + } + op.reset(); + op.wait_event = reactor_event_write; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + op.complete(0, 0); + this->svc_.post(&op); + return std::noop_coroutine(); + } + + WaitOp* op_ptr; + reactor_op_base** desc_slot_ptr; + bool* ready_flag_ptr; + bool* cancel_flag_ptr; + std::uint32_t event; + + bool dummy_ready = false; // no cached edge for error waits + + if (w == wait_type::read) + { + op_ptr = &wait_rd_; + desc_slot_ptr = &this->desc_state_.wait_read_op; + ready_flag_ptr = &this->desc_state_.read_ready; + cancel_flag_ptr = &this->desc_state_.wait_read_cancel_pending; + event = reactor_event_read; + } + else // wait_type::error + { + op_ptr = &wait_er_; + desc_slot_ptr = &this->desc_state_.wait_error_op; + ready_flag_ptr = &dummy_ready; + cancel_flag_ptr = &this->desc_state_.wait_error_cancel_pending; + event = reactor_event_error; + } + + auto& op = *op_ptr; + op.reset(); + op.wait_event = event; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + + this->register_op( + op, *desc_slot_ptr, *ready_flag_ptr, *cancel_flag_ptr, + false); + return std::noop_coroutine(); +} + } // namespace boost::corosio::detail #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_DATAGRAM_SOCKET_HPP diff --git a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp index f763eeb21..6ba5731da 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_descriptor_state.hpp @@ -61,6 +61,15 @@ struct reactor_descriptor_state : scheduler_op /// Pending connect operation (guarded by `mutex`). reactor_op_base* connect_op = nullptr; + /// Pending wait-for-read operation (guarded by `mutex`). + reactor_op_base* wait_read_op = nullptr; + + /// Pending wait-for-write operation (guarded by `mutex`). + reactor_op_base* wait_write_op = nullptr; + + /// Pending wait-for-error operation (guarded by `mutex`). + reactor_op_base* wait_error_op = nullptr; + /// True if a read edge event arrived before an op was registered. bool read_ready = false; @@ -76,6 +85,15 @@ struct reactor_descriptor_state : scheduler_op /// Deferred connect cancellation (IOCP-style cancel semantics). bool connect_cancel_pending = false; + /// Deferred wait-read cancellation (IOCP-style cancel semantics). + bool wait_read_cancel_pending = false; + + /// Deferred wait-write cancellation (IOCP-style cancel semantics). + bool wait_write_cancel_pending = false; + + /// Deferred wait-error cancellation (IOCP-style cancel semantics). + bool wait_error_cancel_pending = false; + /// Event mask set during registration (no mutex needed). std::uint32_t registered_events = 0; @@ -185,6 +203,13 @@ reactor_descriptor_state::invoke_deferred_io() { read_ready = true; } + + // Complete any parked wait-for-read regardless of read_op presence. + if (wait_read_op) + { + wait_read_op->complete(err, 0); + local_ops.push(std::exchange(wait_read_op, nullptr)); + } } if (ev & reactor_event_write) { @@ -219,6 +244,22 @@ reactor_descriptor_state::invoke_deferred_io() } if (!had_write_op) write_ready = true; + + // Complete any parked wait-for-write regardless of write_op presence. + if (wait_write_op) + { + wait_write_op->complete(err, 0); + local_ops.push(std::exchange(wait_write_op, nullptr)); + } + } + // Complete a parked wait-for-error on any error condition. + if ((ev & reactor_event_error) || err) + { + if (wait_error_op) + { + wait_error_op->complete(err, 0); + local_ops.push(std::exchange(wait_error_op, nullptr)); + } } if (err) { @@ -237,6 +278,16 @@ reactor_descriptor_state::invoke_deferred_io() connect_op->complete(err, 0); local_ops.push(std::exchange(connect_op, nullptr)); } + if (wait_read_op) + { + wait_read_op->complete(err, 0); + local_ops.push(std::exchange(wait_read_op, nullptr)); + } + if (wait_write_op) + { + wait_write_op->complete(err, 0); + local_ops.push(std::exchange(wait_write_op, nullptr)); + } } } diff --git a/include/boost/corosio/native/detail/reactor/reactor_op.hpp b/include/boost/corosio/native/detail/reactor/reactor_op.hpp index f273bd8b4..ebca79190 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_op.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_op.hpp @@ -173,6 +173,46 @@ struct reactor_connect_op : Base } }; +/** Readiness-only wait operation. + + Does not perform any I/O syscall. Completion is signalled by + the reactor delivering the requested edge event; reactor_descriptor_state + calls complete() directly and never invokes perform_io(). + + @tparam Base The backend's base op type. +*/ +template +struct reactor_wait_op : Base +{ + /* Mirror of reactor_event_read from reactor_descriptor_state.hpp. + Including that header from here would create an include cycle + (descriptor_state -> reactor_op_base; reactor_op -> reactor_op_base), + so we carry the value locally. Both must stay in sync. */ + static constexpr std::uint32_t read_event = 0x001; + + /// Which event bit this wait targets (reactor_event_read/write/error). + std::uint32_t wait_event = 0; + + void reset() noexcept + { + Base::reset(); + wait_event = 0; + } + + bool is_read_operation() const noexcept override + { + return wait_event == read_event; + } + + /* perform_io() should never be called for a wait op — readiness + IS the completion. Overridden here to satisfy the virtual and + produce a safe result if called defensively. */ + void perform_io() noexcept override + { + this->complete(0, 0); + } +}; + /** Shared scatter-read operation. Uses readv() with an EINTR retry loop. diff --git a/include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp b/include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp index 8ab02749a..6fe89a697 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_op_complete.hpp @@ -88,6 +88,38 @@ complete_dgram_recv_op(Op& op) dispatch_coro(saved_ex, op.cont_op.cont).resume(); } +/** Complete a wait operation. + + Wait operations report only an error_code — no bytes_transferred, + no EOF translation. Used for socket and acceptor wait() awaitables; + picks the impl pointer set by start() to reach the scheduler. + + @tparam Op The concrete wait operation type. + @param op The operation to complete. +*/ +template +void +complete_wait_op(Op& op) +{ + op.stop_cb.reset(); + if (op.socket_impl_) + op.socket_impl_->desc_state_.scheduler_->reset_inline_budget(); + else + op.acceptor_impl_->desc_state_.scheduler_->reset_inline_budget(); + + if (op.cancelled.load(std::memory_order_acquire)) + *op.ec_out = capy::error::canceled; + else if (op.errn != 0) + *op.ec_out = make_err(op.errn); + else + *op.ec_out = {}; + + op.cont_op.cont.h = op.h; + capy::executor_ref saved_ex(op.ex); + auto prevent = std::move(op.impl_ptr); + dispatch_coro(saved_ex, op.cont_op.cont).resume(); +} + /** Complete a connect operation with endpoint caching. On success, queries the local endpoint via getsockname and diff --git a/include/boost/corosio/native/detail/reactor/reactor_socket_finals.hpp b/include/boost/corosio/native/detail/reactor/reactor_socket_finals.hpp index da28f6e4c..9ad573a98 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_socket_finals.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_socket_finals.hpp @@ -61,6 +61,7 @@ class reactor_stream_socket_impl reactor_stream_connect_op, reactor_stream_read_op, reactor_stream_write_op, + reactor_stream_wait_op, typename Traits::desc_state_type, ImplBase, Endpoint> @@ -121,6 +122,7 @@ class reactor_dgram_socket_impl reactor_dgram_recv_from_op, reactor_dgram_send_op, reactor_dgram_recv_op, + reactor_dgram_wait_op, typename Traits::desc_state_type, ImplBase, Endpoint> @@ -160,6 +162,7 @@ class reactor_acceptor_impl Service, reactor_stream_base_op, reactor_stream_accept_op, + reactor_stream_wait_op, typename Traits::desc_state_type, AccImplBase, Endpoint> diff --git a/include/boost/corosio/native/detail/reactor/reactor_stream_ops.hpp b/include/boost/corosio/native/detail/reactor/reactor_stream_ops.hpp index f35941652..8d70d0b02 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_stream_ops.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_stream_ops.hpp @@ -71,6 +71,14 @@ struct reactor_stream_accept_op final void operator()() override; }; +template +struct reactor_stream_wait_op final + : reactor_wait_op< + reactor_stream_base_op> +{ + void operator()() override; +}; + // --- Deferred implementations (instantiated when Socket/Acceptor are complete) --- template @@ -106,6 +114,13 @@ reactor_stream_accept_op::operator()() complete_accept_op(*this); } +template +void +reactor_stream_wait_op::operator()() +{ + complete_wait_op(*this); +} + } // namespace boost::corosio::detail #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_OPS_HPP diff --git a/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp b/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp index 406a52c7e..217813299 100644 --- a/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp +++ b/include/boost/corosio/native/detail/reactor/reactor_stream_socket.hpp @@ -12,7 +12,9 @@ #include #include +#include #include +#include #include #include @@ -28,13 +30,14 @@ namespace boost::corosio::detail { Inherits shared data members and cancel/close/register logic from reactor_basic_socket. Adds the stream-specific remote - endpoint, shutdown, and I/O dispatch (connect, read, write). + endpoint, shutdown, and I/O dispatch (connect, read, write, wait). @tparam Derived The concrete socket type (CRTP). @tparam Service The backend's socket service type. @tparam ConnOp The backend's connect op type. @tparam ReadOp The backend's read op type. @tparam WriteOp The backend's write op type. + @tparam WaitOp The backend's wait op type. @tparam DescState The backend's descriptor_state type. @tparam ImplBase The public vtable base (tcp_socket::implementation or @@ -47,6 +50,7 @@ template< class ConnOp, class ReadOp, class WriteOp, + class WaitOp, class DescState, class ImplBase = tcp_socket::implementation, class Endpoint = endpoint> @@ -64,6 +68,9 @@ class reactor_stream_socket Service, DescState, Endpoint>; + using self_type = reactor_stream_socket< + Derived, Service, ConnOp, ReadOp, WriteOp, WaitOp, + DescState, ImplBase, Endpoint>; friend base_type; friend Derived; @@ -84,6 +91,15 @@ class reactor_stream_socket /// Pending write operation slot. WriteOp wr_; + /// Pending wait-for-read operation slot. + WaitOp wait_rd_; + + /// Pending wait-for-write operation slot. + WaitOp wait_wr_; + + /// Pending wait-for-error operation slot. + WaitOp wait_er_; + ~reactor_stream_socket() override = default; /// Return the cached remote endpoint. @@ -126,6 +142,16 @@ class reactor_stream_socket return do_write_some(h, ex, param, token, ec, bytes_out); } + std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) override + { + return do_wait(h, ex, w, token, ec); + } + std::error_code shutdown(corosio::shutdown_type what) noexcept override { @@ -219,6 +245,19 @@ class reactor_stream_socket std::error_code*, std::size_t*); + /** Shared readiness-wait dispatch. + + Registers a wait op for the requested direction. Does not + perform any I/O syscall — completion is signalled when the + reactor delivers the matching edge event. + */ + std::coroutine_handle<> do_wait( + std::coroutine_handle<>, + capy::executor_ref, + wait_type, + std::stop_token const&, + std::error_code*); + /** Close the socket and cancel pending operations. Extends the base do_close_socket() to also reset @@ -242,6 +281,12 @@ class reactor_stream_socket return &this->desc_state_.read_op; if (&op == static_cast(&wr_)) return &this->desc_state_.write_op; + if (&op == static_cast(&wait_rd_)) + return &this->desc_state_.wait_read_op; + if (&op == static_cast(&wait_wr_)) + return &this->desc_state_.wait_write_op; + if (&op == static_cast(&wait_er_)) + return &this->desc_state_.wait_error_op; return nullptr; } @@ -254,6 +299,12 @@ class reactor_stream_socket return &this->desc_state_.read_cancel_pending; if (&op == static_cast(&wr_)) return &this->desc_state_.write_cancel_pending; + if (&op == static_cast(&wait_rd_)) + return &this->desc_state_.wait_read_cancel_pending; + if (&op == static_cast(&wait_wr_)) + return &this->desc_state_.wait_write_cancel_pending; + if (&op == static_cast(&wait_er_)) + return &this->desc_state_.wait_error_cancel_pending; return nullptr; } @@ -263,6 +314,9 @@ class reactor_stream_socket fn(conn_); fn(rd_); fn(wr_); + fn(wait_rd_); + fn(wait_wr_); + fn(wait_er_); } template @@ -271,6 +325,9 @@ class reactor_stream_socket fn(conn_, this->desc_state_.connect_op); fn(rd_, this->desc_state_.read_op); fn(wr_, this->desc_state_.write_op); + fn(wait_rd_, this->desc_state_.wait_read_op); + fn(wait_wr_, this->desc_state_.wait_write_op); + fn(wait_er_, this->desc_state_.wait_error_op); } }; @@ -280,11 +337,12 @@ template< class ConnOp, class ReadOp, class WriteOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> std::coroutine_handle<> -reactor_stream_socket:: +reactor_stream_socket:: do_connect( std::coroutine_handle<> h, capy::executor_ref ex, @@ -355,11 +413,12 @@ template< class ConnOp, class ReadOp, class WriteOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> std::coroutine_handle<> -reactor_stream_socket:: +reactor_stream_socket:: do_read_some( std::coroutine_handle<> h, capy::executor_ref ex, @@ -463,11 +522,12 @@ template< class ConnOp, class ReadOp, class WriteOp, + class WaitOp, class DescState, class ImplBase, class Endpoint> std::coroutine_handle<> -reactor_stream_socket:: +reactor_stream_socket:: do_write_some( std::coroutine_handle<> h, capy::executor_ref ex, @@ -555,6 +615,93 @@ reactor_stream_socket +std::coroutine_handle<> +reactor_stream_socket:: + do_wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token const& token, + std::error_code* ec) +{ + // wait_type::write completes immediately on a connected socket, + // matching asio's behavior on IOCP. Corosio's reactor backends use + // edge-triggered EPOLLOUT, which would never fire on an already- + // writable socket; an immediate completion is also a more useful + // contract than parking until a non-writable -> writable transition. + if (w == wait_type::write) + { + auto& op = wait_wr_; + if (this->svc_.scheduler().try_consume_inline_budget()) + { + *ec = std::error_code{}; + op.cont_op.cont.h = h; + return dispatch_coro(ex, op.cont_op.cont); + } + op.reset(); + op.wait_event = reactor_event_write; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + op.complete(0, 0); + this->svc_.post(&op); + return std::noop_coroutine(); + } + + // Pick refs up-front to avoid duplicating the register_op call. + WaitOp* op_ptr; + reactor_op_base** desc_slot_ptr; + bool* ready_flag_ptr; + bool* cancel_flag_ptr; + std::uint32_t event; + + bool dummy_ready = false; // placeholder for error waits (no cached edge) + + if (w == wait_type::read) + { + op_ptr = &wait_rd_; + desc_slot_ptr = &this->desc_state_.wait_read_op; + ready_flag_ptr = &this->desc_state_.read_ready; + cancel_flag_ptr = &this->desc_state_.wait_read_cancel_pending; + event = reactor_event_read; + } + else // wait_type::error + { + op_ptr = &wait_er_; + desc_slot_ptr = &this->desc_state_.wait_error_op; + ready_flag_ptr = &dummy_ready; + cancel_flag_ptr = &this->desc_state_.wait_error_cancel_pending; + event = reactor_event_error; + } + + auto& op = *op_ptr; + op.reset(); + op.wait_event = event; + op.h = h; + op.ex = ex; + op.ec_out = ec; + op.fd = this->fd_; + op.start(token, static_cast(this)); + op.impl_ptr = this->shared_from_this(); + + this->register_op(op, *desc_slot_ptr, *ready_flag_ptr, *cancel_flag_ptr, + false); + return std::noop_coroutine(); +} + } // namespace boost::corosio::detail #endif // BOOST_COROSIO_NATIVE_DETAIL_REACTOR_REACTOR_STREAM_SOCKET_HPP diff --git a/include/boost/corosio/tcp_acceptor.hpp b/include/boost/corosio/tcp_acceptor.hpp index 04374ece8..b1c93973e 100644 --- a/include/boost/corosio/tcp_acceptor.hpp +++ b/include/boost/corosio/tcp_acceptor.hpp @@ -13,6 +13,8 @@ #include #include +#include +#include #include #include #include @@ -80,6 +82,22 @@ namespace boost::corosio { */ class BOOST_COROSIO_DECL tcp_acceptor : public io_object { + struct wait_awaitable + : detail::void_op_base + { + tcp_acceptor& acc_; + wait_type w_; + + wait_awaitable(tcp_acceptor& acc, wait_type w) noexcept + : acc_(acc), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return acc_.get().wait(h, ex, w_, token_, &ec_); + } + }; + struct accept_awaitable { tcp_acceptor& acc_; @@ -331,6 +349,29 @@ class BOOST_COROSIO_DECL tcp_acceptor : public io_object return accept_awaitable(*this, peer); } + /** Wait for an incoming connection or readiness condition. + + Suspends until the listen socket is ready in the + requested direction, or an error condition is reported. + For `wait_type::read`, completion signals that a + subsequent @ref accept will succeed without blocking. + No connection is consumed. + + @param w The wait direction. + + @return An awaitable that completes with `io_result<>`. + + @par Preconditions + The acceptor must be listening. This acceptor must + outlive the returned awaitable. + */ + [[nodiscard]] auto wait(wait_type w) + { + if (!is_open()) + detail::throw_logic_error("wait: acceptor not listening"); + return wait_awaitable(*this, w); + } + /** Cancel any pending asynchronous operations. All outstanding operations complete with `errc::operation_canceled`. @@ -432,6 +473,20 @@ class BOOST_COROSIO_DECL tcp_acceptor : public io_object std::error_code*, io_object::implementation**) = 0; + /** Initiate an asynchronous wait for acceptor readiness. + + Completes when the listen socket becomes ready for + the specified direction (typically `wait_type::read` + for an incoming connection), or an error condition is + reported. No connection is consumed. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; + /// Returns the cached local endpoint. virtual endpoint local_endpoint() const noexcept = 0; diff --git a/include/boost/corosio/tcp_socket.hpp b/include/boost/corosio/tcp_socket.hpp index 8611d26d7..d5e6d51fc 100644 --- a/include/boost/corosio/tcp_socket.hpp +++ b/include/boost/corosio/tcp_socket.hpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -108,6 +109,27 @@ class BOOST_COROSIO_DECL tcp_socket : public io_stream std::stop_token token, std::error_code* ec) = 0; + /** Initiate an asynchronous wait for socket readiness. + + Completes when the socket becomes ready for the + specified direction, or an error condition is + reported. No bytes are transferred. + + @param h Coroutine handle to resume on completion. + @param ex Executor for dispatching the completion. + @param w The direction to wait on. + @param token Stop token for cancellation. + @param ec Output error code. + + @return Coroutine handle to resume immediately. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; + /** Shut down the socket for the given direction(s). @param what The shutdown direction. @@ -177,6 +199,23 @@ class BOOST_COROSIO_DECL tcp_socket : public io_stream } }; + /// Represent the awaitable returned by @ref wait. + struct wait_awaitable + : detail::void_op_base + { + tcp_socket& s_; + wait_type w_; + + wait_awaitable(tcp_socket& s, wait_type w) noexcept + : s_(s), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return s_.get().wait(h, ex, w_, token_, &ec_); + } + }; + public: /** Destructor. @@ -344,6 +383,35 @@ class BOOST_COROSIO_DECL tcp_socket : public io_stream return connect_awaitable(*this, ep); } + /** Wait for the socket to become ready in a given direction. + + Suspends until the socket is ready for the requested + direction, or an error condition is reported. No bytes + are transferred — useful for integrating with C libraries + that own the I/O on a nonblocking fd and only need + readiness notification (e.g. libpq async, libssh). + + The operation supports cancellation via `std::stop_token` + through the affine awaitable protocol. If the associated + stop token is triggered, the operation completes + immediately with `errc::operation_canceled`. + + @param w The wait direction (read, write, or error). + + @return An awaitable that completes with `io_result<>`. + On success, no bytes have been consumed from the + stream; a subsequent `read_some` (for read waits) + returns the available data. + + @par Preconditions + The socket must be open. This socket must outlive the + returned awaitable. + */ + [[nodiscard]] auto wait(wait_type w) + { + return wait_awaitable(*this, w); + } + /** Cancel any pending asynchronous operations. All outstanding operations complete with `errc::operation_canceled`. diff --git a/include/boost/corosio/udp_socket.hpp b/include/boost/corosio/udp_socket.hpp index 2fe42a8cb..0fd68396a 100644 --- a/include/boost/corosio/udp_socket.hpp +++ b/include/boost/corosio/udp_socket.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -242,6 +243,27 @@ class BOOST_COROSIO_DECL udp_socket : public io_object std::stop_token token, std::error_code* ec, std::size_t* bytes_out) = 0; + + /** Initiate an asynchronous wait for socket readiness. + + Completes when the socket becomes ready for the + specified direction, or an error condition is + reported. No bytes are transferred. + + @param h Coroutine handle to resume on completion. + @param ex Executor for dispatching the completion. + @param w The direction to wait on. + @param token Stop token for cancellation. + @param ec Output error code. + + @return Coroutine handle to resume immediately. + */ + virtual std::coroutine_handle<> wait( + std::coroutine_handle<> h, + capy::executor_ref ex, + wait_type w, + std::stop_token token, + std::error_code* ec) = 0; }; /** Represent the awaitable returned by @ref send_to. @@ -313,6 +335,23 @@ class BOOST_COROSIO_DECL udp_socket : public io_object } }; + /// Represent the awaitable returned by @ref wait. + struct wait_awaitable + : detail::void_op_base + { + udp_socket& s_; + wait_type w_; + + wait_awaitable(udp_socket& s, wait_type w) noexcept + : s_(s), w_(w) {} + + std::coroutine_handle<> dispatch( + std::coroutine_handle<> h, capy::executor_ref ex) const + { + return s_.get().wait(h, ex, w_, token_, &ec_); + } + }; + /// Represent the awaitable returned by @ref send. struct send_awaitable : detail::bytes_op_base @@ -594,6 +633,27 @@ class BOOST_COROSIO_DECL udp_socket : public io_object return connect_awaitable(*this, ep); } + /** Wait for the socket to become ready in a given direction. + + Suspends until the socket is ready for the requested + direction, or an error condition is reported. No bytes + are transferred. + + The operation supports cancellation via `std::stop_token`. + + @param w The wait direction (read, write, or error). + + @return An awaitable that completes with `io_result<>`. + + @par Preconditions + The socket must be open. This socket must outlive the + returned awaitable. + */ + [[nodiscard]] auto wait(wait_type w) + { + return wait_awaitable(*this, w); + } + /** Send a datagram to the connected peer. @param buf The buffer containing data to send. diff --git a/include/boost/corosio/wait_type.hpp b/include/boost/corosio/wait_type.hpp new file mode 100644 index 000000000..e253b4c08 --- /dev/null +++ b/include/boost/corosio/wait_type.hpp @@ -0,0 +1,39 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_WAIT_TYPE_HPP +#define BOOST_COROSIO_WAIT_TYPE_HPP + +namespace boost::corosio { + +/** Direction selector for socket and acceptor wait() operations. + + Passed to socket::wait() and acceptor::wait() to select which + readiness condition to await before returning. +*/ +enum class wait_type +{ + /// Wait until the descriptor is ready for a non-blocking read. + read, + + /// Wait until the descriptor is ready for a non-blocking write. + write, + + /// Wait until an error condition has been reported by the kernel + /// (e.g. SO_ERROR is non-zero or an exceptional event is pending). + /// Error events are not buffered across operations: an error that + /// fires before wait(error) is registered may be lost. Kernel + /// semantics for what counts as an "error condition" vary by + /// platform; treat the contract as best-effort. + error +}; + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_WAIT_TYPE_HPP diff --git a/test/unit/wait.cpp b/test/unit/wait.cpp new file mode 100644 index 000000000..e57b1a712 --- /dev/null +++ b/test/unit/wait.cpp @@ -0,0 +1,375 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +// Test that header is self-contained. +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#if BOOST_COROSIO_POSIX +#include +#include +#include + +#include +#include + +#include +#endif + +#include "context.hpp" +#include "test_suite.hpp" + +namespace boost::corosio { + +#if BOOST_COROSIO_POSIX +namespace { + +std::string +make_temp_socket_path() +{ + char tmpl[] = "/tmp/corosio_wait_XXXXXX"; + if (!::mkdtemp(tmpl)) + throw std::runtime_error("mkdtemp failed"); + std::string path(tmpl); + path += "/sock"; + return path; +} + +void +cleanup_path(std::string const& path) +{ + ::unlink(path.c_str()); + auto dir = path.substr(0, path.rfind('/')); + ::rmdir(dir.c_str()); +} + +} // namespace +#endif + +template +struct wait_test +{ + // wait_read completes when the peer sends data, no bytes consumed. + void testWaitReadAndNoConsume() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto [s1, s2] = test::make_socket_pair(ioc); + + constexpr std::string_view payload = "hello"; + + std::error_code wait_ec; + bool wait_done = false; + std::error_code read_ec; + std::size_t bytes_read = 0; + std::array buf{}; + + auto reader = [&]() -> capy::task<> { + auto [ec1] = co_await s1.wait(wait_type::read); + wait_ec = ec1; + wait_done = true; + if (ec1) + co_return; + auto [ec2, n] = co_await s1.read_some( + capy::mutable_buffer(buf.data(), buf.size())); + read_ec = ec2; + bytes_read = n; + }; + auto writer = [&]() -> capy::task<> { + auto [ec, n] = co_await s2.write_some( + capy::const_buffer(payload.data(), payload.size())); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(reader()); + capy::run_async(ex)(writer()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + BOOST_TEST(!read_ec); + BOOST_TEST_EQ(bytes_read, payload.size()); + } + + // wait_type::write completes immediately on a connected socket. + // Corosio matches asio's IOCP behavior: writability is always + // treated as ready, the wait does not park on a edge transition. + void testWaitWriteImmediate() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto [s1, s2] = test::make_socket_pair(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto writer = [&]() -> capy::task<> { + auto [ec] = co_await s1.wait(wait_type::write); + wait_ec = ec; + wait_done = true; + }; + + capy::run_async(ex)(writer()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + +#if BOOST_COROSIO_POSIX + // local_stream_socket wait_read fires when the peer writes. + void testWaitOnLocalStream() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto path = make_temp_socket_path(); + + local_stream_acceptor acc(ioc); + acc.open(); + auto bec = acc.bind(local_endpoint(path)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + + local_stream_socket server(ioc); + local_stream_socket client(ioc); + client.open(); + + auto accept_task = [&]() -> capy::task<> { + auto [ec] = co_await acc.accept(server); + (void)ec; + }; + auto connect_task = [&]() -> capy::task<> { + auto [ec] = co_await client.connect(local_endpoint(path)); + (void)ec; + }; + capy::run_async(ex)(accept_task()); + capy::run_async(ex)(connect_task()); + ioc.run(); + ioc.restart(); + + constexpr std::string_view payload = "hi"; + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await server.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto writer = [&]() -> capy::task<> { + auto [ec, n] = co_await client.write_some( + capy::const_buffer(payload.data(), payload.size())); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(writer()); + ioc.run(); + + cleanup_path(path); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } +#endif // BOOST_COROSIO_POSIX + + // Cancellation via socket.cancel() yields operation_canceled. + void testCancellation() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + auto [s1, s2] = test::make_socket_pair(ioc); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await s1.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto canceller = [&]() -> capy::task<> { + timer t(ioc); + t.expires_after(std::chrono::milliseconds(20)); + (void)co_await t.wait(); + s1.cancel(); + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(canceller()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(wait_ec == capy::cond::canceled); + } + + // Acceptor wait_read fires when a client connects; accept then succeeds. + void testAcceptorWait() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + tcp_acceptor acc(ioc); + acc.open(); + acc.set_option(socket_option::reuse_address(true)); + auto bec = acc.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto lec = acc.listen(); + BOOST_TEST(!lec); + auto port = acc.local_endpoint().port(); + + std::error_code wait_ec; + bool wait_done = false; + std::error_code accept_ec; + tcp_socket peer(ioc); + tcp_socket client(ioc); + + auto waiter = [&]() -> capy::task<> { + auto [ec1] = co_await acc.wait(wait_type::read); + wait_ec = ec1; + wait_done = true; + if (ec1) + co_return; + auto [ec2] = co_await acc.accept(peer); + accept_ec = ec2; + }; + auto connector = [&]() -> capy::task<> { + auto [ec] = co_await client.connect( + endpoint(ipv4_address::loopback(), port)); + (void)ec; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(connector()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + BOOST_TEST(!accept_ec); + BOOST_TEST(peer.is_open()); + } + + // UDP socket wait_read completes when a datagram arrives. + void testWaitOnUdp() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + udp_socket recv(ioc); + recv.open(udp::v4()); + auto bec = recv.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + auto port = recv.local_endpoint().port(); + + udp_socket send(ioc); + send.open(udp::v4()); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await recv.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto sender = [&]() -> capy::task<> { + char dg[1] = { 'X' }; + auto [ec, n] = co_await send.send_to( + capy::const_buffer(dg, sizeof(dg)), + endpoint(ipv4_address::loopback(), port)); + (void)ec; + (void)n; + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(sender()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(!wait_ec); + } + + // Cancel a UDP wait_read while it's parked. On IOCP this exercises + // the auxiliary WSAPoll reactor's cancel_wait path, where the op + // has no overlapped I/O pending so CancelIoEx is a no-op and the + // cancellation must be delivered through the reactor itself. + void testUdpCancellation() + { + io_context ioc(Backend); + auto ex = ioc.get_executor(); + + udp_socket sock(ioc); + sock.open(udp::v4()); + auto bec = sock.bind(endpoint(ipv4_address::loopback(), 0)); + BOOST_TEST(!bec); + + std::error_code wait_ec; + bool wait_done = false; + + auto waiter = [&]() -> capy::task<> { + auto [ec] = co_await sock.wait(wait_type::read); + wait_ec = ec; + wait_done = true; + }; + auto canceller = [&]() -> capy::task<> { + timer t(ioc); + t.expires_after(std::chrono::milliseconds(20)); + (void)co_await t.wait(); + sock.cancel(); + }; + + capy::run_async(ex)(waiter()); + capy::run_async(ex)(canceller()); + ioc.run(); + + BOOST_TEST(wait_done); + BOOST_TEST(wait_ec == capy::cond::canceled); + } + + void run() + { + testWaitReadAndNoConsume(); + testWaitWriteImmediate(); +#if BOOST_COROSIO_POSIX + testWaitOnLocalStream(); +#endif + testCancellation(); + testAcceptorWait(); + testWaitOnUdp(); + testUdpCancellation(); + } +}; + +COROSIO_BACKEND_TESTS(wait_test, "boost.corosio.wait") + +} // namespace boost::corosio