From d57e9f5076e25a9a1890afd0448800095433a14e Mon Sep 17 00:00:00 2001 From: Michael Vandeberg Date: Tue, 19 May 2026 15:02:22 -0700 Subject: [PATCH] feat(local): close POSIX/Windows gaps for local stream sockets MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add Windows make_local_stream_pair() via temp-listener emulation of socketpair(), enabling socket-pair tests on IOCP - Implement assign_socket() in win_local_stream_service so raw SOCKET handles can be registered with the IOCP port - Replace ConnectEx/AcceptEx with blocking connect()/accept() on worker threads for AF_UNIX — the IOCP extension functions are not reliable for AF_UNIX on all Windows versions - Enable local_stream_socket tests on Windows (construction, open, move, connect/accept, read/write, available, release) - Add portable temp_socket_dir helper using std::filesystem for temp paths across platforms - Guard local datagram code (SOCK_DGRAM) as POSIX-only at compile time — Windows does not support AF_UNIX SOCK_DGRAM - Remove dead IOCP datagram implementation files (win_local_dgram_service.hpp, win_local_dgram_socket.hpp) - Document Windows limitation on local_datagram_socket and local_datagram headers --- .github/compilers.json | 2 +- include/boost/corosio.hpp | 8 +- include/boost/corosio/backend.hpp | 4 - .../corosio/detail/local_datagram_service.hpp | 6 + include/boost/corosio/local_datagram.hpp | 8 + .../boost/corosio/local_datagram_socket.hpp | 9 + include/boost/corosio/local_socket_pair.hpp | 18 +- .../detail/iocp/win_local_dgram_service.hpp | 1135 ----------------- .../detail/iocp/win_local_dgram_socket.hpp | 341 ----- .../detail/iocp/win_local_stream_service.hpp | 22 +- include/boost/corosio/test/temp_path.hpp | 106 ++ src/corosio/src/io_context.cpp | 2 - src/corosio/src/local_datagram.cpp | 17 +- src/corosio/src/local_datagram_socket.cpp | 22 +- src/corosio/src/local_socket_pair.cpp | 127 +- test/unit/local_datagram_socket.cpp | 47 +- test/unit/local_stream_socket.cpp | 81 +- test/unit/wait.cpp | 51 +- 18 files changed, 348 insertions(+), 1658 deletions(-) delete mode 100644 include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp delete mode 100644 include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp create mode 100644 include/boost/corosio/test/temp_path.hpp diff --git a/.github/compilers.json b/.github/compilers.json index 539f5ecdd..15026a684 100644 --- a/.github/compilers.json +++ b/.github/compilers.json @@ -155,7 +155,7 @@ "latest_cxxstd": "20", "cxx": "g++", "cc": "gcc", - "runs_on": "windows-2022", + "runs_on": "windows-2025", "b2_toolset": "gcc", "generator": "MinGW Makefiles", "shared": false, diff --git a/include/boost/corosio.hpp b/include/boost/corosio.hpp index 9a1304508..d09bea0f0 100644 --- a/include/boost/corosio.hpp +++ b/include/boost/corosio.hpp @@ -35,9 +35,15 @@ #include #include #include +#include + +// local_datagram.hpp and local_datagram_socket.hpp are POSIX-only; +// Windows does not support AF_UNIX datagram sockets (SOCK_DGRAM). +#include +#if BOOST_COROSIO_POSIX #include #include -#include +#endif #include #include diff --git a/include/boost/corosio/backend.hpp b/include/boost/corosio/backend.hpp index e1d52ee09..022f1fc4e 100644 --- a/include/boost/corosio/backend.hpp +++ b/include/boost/corosio/backend.hpp @@ -214,8 +214,6 @@ class win_local_stream_socket; class win_local_stream_service; class win_local_stream_acceptor; class win_local_stream_acceptor_service; -class win_local_dgram_socket; -class win_local_dgram_service; class win_signal; class win_signals; @@ -251,8 +249,6 @@ struct iocp_t using local_stream_service_type = detail::win_local_stream_service; using local_stream_acceptor_type = detail::win_local_stream_acceptor; using local_stream_acceptor_service_type = detail::win_local_stream_acceptor_service; - using local_datagram_socket_type = detail::win_local_dgram_socket; - using local_datagram_service_type = detail::win_local_dgram_service; /// @} using signal_type = detail::win_signal; diff --git a/include/boost/corosio/detail/local_datagram_service.hpp b/include/boost/corosio/detail/local_datagram_service.hpp index e80d452da..075460e54 100644 --- a/include/boost/corosio/detail/local_datagram_service.hpp +++ b/include/boost/corosio/detail/local_datagram_service.hpp @@ -11,6 +11,10 @@ #define BOOST_COROSIO_DETAIL_LOCAL_DATAGRAM_SERVICE_HPP #include +#include + +#if BOOST_COROSIO_POSIX + #include #include #include @@ -90,4 +94,6 @@ class BOOST_COROSIO_DECL local_datagram_service } // namespace boost::corosio::detail +#endif // BOOST_COROSIO_POSIX + #endif // BOOST_COROSIO_DETAIL_LOCAL_DATAGRAM_SERVICE_HPP diff --git a/include/boost/corosio/local_datagram.hpp b/include/boost/corosio/local_datagram.hpp index 16ba5229b..b75d5c703 100644 --- a/include/boost/corosio/local_datagram.hpp +++ b/include/boost/corosio/local_datagram.hpp @@ -11,6 +11,9 @@ #define BOOST_COROSIO_LOCAL_DATAGRAM_HPP #include +#include + +#if BOOST_COROSIO_POSIX namespace boost::corosio { @@ -25,6 +28,9 @@ class local_datagram_socket; in the compiled library to avoid exposing platform socket headers. + @note Not available on Windows. Windows does not support + AF_UNIX datagram sockets (SOCK_DGRAM). + @see local_datagram_socket */ class BOOST_COROSIO_DECL local_datagram @@ -45,4 +51,6 @@ class BOOST_COROSIO_DECL local_datagram } // namespace boost::corosio +#endif // BOOST_COROSIO_POSIX + #endif // BOOST_COROSIO_LOCAL_DATAGRAM_HPP diff --git a/include/boost/corosio/local_datagram_socket.hpp b/include/boost/corosio/local_datagram_socket.hpp index 7797c3e9e..8bfe16d80 100644 --- a/include/boost/corosio/local_datagram_socket.hpp +++ b/include/boost/corosio/local_datagram_socket.hpp @@ -12,6 +12,9 @@ #include #include + +#if BOOST_COROSIO_POSIX + #include #include #include @@ -56,6 +59,10 @@ namespace boost::corosio { kernel filters incoming datagrams to those from the connected peer. + @note Not available on Windows. Windows does not support + AF_UNIX datagram sockets (SOCK_DGRAM). Attempting to + open this socket on Windows will fail. + @par Cancellation All asynchronous operations support cancellation through `std::stop_token` via the affine protocol, or explicitly @@ -871,4 +878,6 @@ class BOOST_COROSIO_DECL local_datagram_socket : public io_object } // namespace boost::corosio +#endif // BOOST_COROSIO_POSIX + #endif // BOOST_COROSIO_LOCAL_DATAGRAM_SOCKET_HPP diff --git a/include/boost/corosio/local_socket_pair.hpp b/include/boost/corosio/local_socket_pair.hpp index f9ceb029d..c9a85464e 100644 --- a/include/boost/corosio/local_socket_pair.hpp +++ b/include/boost/corosio/local_socket_pair.hpp @@ -12,11 +12,11 @@ #include #include +#include #if BOOST_COROSIO_POSIX - -#include #include +#endif #include @@ -26,9 +26,10 @@ class io_context; /** Create a connected pair of local stream sockets. - Uses socketpair(AF_UNIX, SOCK_STREAM) to create two - pre-connected sockets. Data written to one can be read - from the other. + On POSIX, uses socketpair(AF_UNIX, SOCK_STREAM). On Windows, + emulates socketpair by creating a temporary listener, connecting, + and accepting. Data written to one socket can be read from the + other. @param ctx The I/O context for the sockets. @@ -39,11 +40,15 @@ class io_context; BOOST_COROSIO_DECL std::pair make_local_stream_pair(io_context& ctx); +#if BOOST_COROSIO_POSIX /** Create a connected pair of local datagram sockets. Uses socketpair(AF_UNIX, SOCK_DGRAM) to create two pre-connected sockets. + @note Not available on Windows. Windows does not support + AF_UNIX datagram sockets. + @param ctx The I/O context for the sockets. @return A pair of connected local datagram sockets. @@ -52,9 +57,8 @@ make_local_stream_pair(io_context& ctx); */ BOOST_COROSIO_DECL std::pair make_local_datagram_pair(io_context& ctx); +#endif } // namespace boost::corosio -#endif // BOOST_COROSIO_POSIX - #endif // BOOST_COROSIO_LOCAL_SOCKET_PAIR_HPP diff --git a/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp deleted file mode 100644 index 59a2be664..000000000 --- a/include/boost/corosio/native/detail/iocp/win_local_dgram_service.hpp +++ /dev/null @@ -1,1135 +0,0 @@ -// -// Copyright (c) 2026 Michael Vandeberg -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP -#define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP - -#include - -#if BOOST_COROSIO_HAS_IOCP - -#include -#include - -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - -#include - -namespace boost::corosio::detail { - -/* Map portable message_flags values to native MSG_* constants. */ -inline DWORD -local_dgram_to_native_msg_flags(int flags) noexcept -{ - DWORD native = 0; - if (flags & 1) native |= MSG_PEEK; - if (flags & 2) native |= MSG_OOB; - if (flags & 4) native |= MSG_DONTROUTE; - return native; -} - -/* IOCP local datagram service. - - Inherits from local_datagram_service to enable runtime polymorphism - via use_service(). -*/ -class BOOST_COROSIO_DECL win_local_dgram_service final - : private win_wsa_init - , public local_datagram_service -{ -public: - io_object::implementation* construct() override; - - void destroy(io_object::implementation* p) override; - - void close(io_object::handle& h) override; - - explicit win_local_dgram_service(capy::execution_context& ctx); - - ~win_local_dgram_service(); - - win_local_dgram_service(win_local_dgram_service const&) = delete; - win_local_dgram_service& operator=(win_local_dgram_service const&) = delete; - - void shutdown() override; - - std::error_code open_socket( - local_datagram_socket::implementation& impl, - int family, int type, int protocol) override; - - std::error_code assign_socket( - local_datagram_socket::implementation& impl, - native_handle_type fd) override; - - std::error_code bind_socket( - local_datagram_socket::implementation& impl, - corosio::local_endpoint ep) override; - - void destroy_impl(win_local_dgram_socket& impl); - - void unregister_impl(win_local_dgram_socket_internal& impl); - - std::error_code open_socket_internal( - win_local_dgram_socket_internal& impl, - int family, int type, int protocol); - - void post(overlapped_op* op); - void on_pending(overlapped_op* op) noexcept; - void on_completion(overlapped_op* op, DWORD error, DWORD bytes) noexcept; - void work_started() noexcept; - void work_finished() noexcept; - - /** Return the owning IOCP scheduler. */ - win_scheduler& scheduler() noexcept - { - return sched_; - } - -private: - win_scheduler& sched_; - win_mutex mutex_; - intrusive_list socket_list_; - intrusive_list wrapper_list_; - void* iocp_; -}; - -// ============================================================ -// Operation constructors -// ============================================================ - -inline local_dgram_send_to_op::local_dgram_send_to_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -inline local_dgram_recv_from_op::local_dgram_recv_from_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -inline local_dgram_connect_op::local_dgram_connect_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -inline local_dgram_send_op::local_dgram_send_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -inline local_dgram_recv_op::local_dgram_recv_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -inline local_dgram_wait_op::local_dgram_wait_op( - win_local_dgram_socket_internal& internal_) noexcept - : overlapped_op(&do_complete) - , internal(internal_) -{ - cancel_func_ = &do_cancel_impl; -} - -// ============================================================ -// Cancellation functions -// ============================================================ - -inline void -local_dgram_send_to_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); - if (op->internal.is_open()) - { - ::CancelIoEx( - reinterpret_cast(op->internal.native_handle()), op); - } -} - -inline void -local_dgram_recv_from_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); - if (op->internal.is_open()) - { - ::CancelIoEx( - reinterpret_cast(op->internal.native_handle()), op); - } -} - -inline void -local_dgram_connect_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); -} - -inline void -local_dgram_send_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); - if (op->internal.is_open()) - { - ::CancelIoEx( - reinterpret_cast(op->internal.native_handle()), op); - } -} - -inline void -local_dgram_recv_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); - if (op->internal.is_open()) - { - ::CancelIoEx( - reinterpret_cast(op->internal.native_handle()), op); - } -} - -inline void -local_dgram_wait_op::do_cancel_impl(overlapped_op* base) noexcept -{ - auto* op = static_cast(base); - op->cancelled.store(true, std::memory_order_release); - if (op->internal.is_open()) - { - ::CancelIoEx( - reinterpret_cast(op->internal.native_handle()), op); - } - op->internal.svc_.scheduler().cancel_wait_if_constructed(op); -} - -// ============================================================ -// Completion handlers -// ============================================================ - -inline void -local_dgram_send_to_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -inline void -local_dgram_recv_from_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - - bool success = - (op->dwError == 0 && !op->cancelled.load(std::memory_order_acquire)); - if (success && op->source_out) - { - *op->source_out = from_sockaddr_local( - op->source_storage, static_cast(op->source_len)); - } - - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -inline void -local_dgram_connect_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - - bool success = - (op->dwError == 0 && !op->cancelled.load(std::memory_order_acquire)); - if (success) - { - sockaddr_storage local_storage{}; - int local_len = sizeof(local_storage); - if (::getsockname( - op->internal.socket_, - reinterpret_cast(&local_storage), &local_len) == 0) - op->internal.local_endpoint_ = from_sockaddr_local( - local_storage, static_cast(local_len)); - op->internal.remote_endpoint_ = op->target_endpoint; - } - - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -inline void -local_dgram_send_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -inline void -local_dgram_recv_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -inline void -local_dgram_wait_op::do_complete( - void* owner, - scheduler_op* base, - std::uint32_t /*bytes*/, - std::uint32_t /*error*/) -{ - auto* op = static_cast(base); - if (!owner) - { - op->cleanup_only(); - op->internal_ptr.reset(); - return; - } - auto prevent_premature_destruction = std::move(op->internal_ptr); - op->invoke_handler(); -} - -// ============================================================ -// win_local_dgram_socket_internal -// ============================================================ - -inline win_local_dgram_socket_internal::win_local_dgram_socket_internal( - win_local_dgram_service& svc) noexcept - : svc_(svc) - , wr_(*this) - , rd_(*this) - , conn_(*this) - , send_wr_(*this) - , recv_rd_(*this) - , wt_(*this) -{ -} - -inline win_local_dgram_socket_internal::~win_local_dgram_socket_internal() -{ - svc_.unregister_impl(*this); -} - -inline SOCKET -win_local_dgram_socket_internal::native_handle() const noexcept -{ - return socket_; -} - -inline corosio::local_endpoint -win_local_dgram_socket_internal::local_endpoint() const noexcept -{ - return local_endpoint_; -} - -inline corosio::local_endpoint -win_local_dgram_socket_internal::remote_endpoint() const noexcept -{ - return remote_endpoint_; -} - -inline bool -win_local_dgram_socket_internal::is_open() const noexcept -{ - return socket_ != INVALID_SOCKET; -} - -inline std::coroutine_handle<> -win_local_dgram_socket_internal::send_to( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param param, - corosio::local_endpoint dest, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes_out) -{ - wr_.internal_ptr = shared_from_this(); - - auto& op = wr_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.start(token); - - svc_.work_started(); - - capy::mutable_buffer bufs[local_dgram_send_to_op::max_buffers]; - op.wsabuf_count = - static_cast(param.copy_to(bufs, local_dgram_send_to_op::max_buffers)); - - for (DWORD i = 0; i < op.wsabuf_count; ++i) - { - op.wsabufs[i].buf = static_cast(bufs[i].data()); - op.wsabufs[i].len = static_cast(bufs[i].size()); - } - - op.dest_len = static_cast(to_sockaddr(dest, op.dest_storage)); - - int result = ::WSASendTo( - socket_, op.wsabufs, op.wsabuf_count, nullptr, - local_dgram_to_native_msg_flags(flags), - reinterpret_cast(&op.dest_storage), op.dest_len, &op, - nullptr); - - if (result == SOCKET_ERROR) - { - DWORD err = ::WSAGetLastError(); - if (err != WSA_IO_PENDING) - { - svc_.on_completion(&op, err, 0); - return std::noop_coroutine(); - } - } - - svc_.on_pending(&op); - - if (op.cancelled.load(std::memory_order_acquire)) - ::CancelIoEx(reinterpret_cast(socket_), &op); - - return std::noop_coroutine(); -} - -inline std::coroutine_handle<> -win_local_dgram_socket_internal::recv_from( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param param, - corosio::local_endpoint* source, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes_out) -{ - rd_.internal_ptr = shared_from_this(); - - auto& op = rd_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.source_out = source; - op.start(token); - - svc_.work_started(); - - capy::mutable_buffer bufs[local_dgram_recv_from_op::max_buffers]; - op.wsabuf_count = - static_cast(param.copy_to(bufs, local_dgram_recv_from_op::max_buffers)); - - if (op.wsabuf_count == 0 || (op.wsabuf_count == 1 && bufs[0].size() == 0)) - { - op.empty_buffer = true; - svc_.on_completion(&op, 0, 0); - return std::noop_coroutine(); - } - - for (DWORD i = 0; i < op.wsabuf_count; ++i) - { - op.wsabufs[i].buf = static_cast(bufs[i].data()); - op.wsabufs[i].len = static_cast(bufs[i].size()); - } - - op.flags = local_dgram_to_native_msg_flags(flags); - std::memset(&op.source_storage, 0, sizeof(op.source_storage)); - op.source_len = sizeof(op.source_storage); - - int result = ::WSARecvFrom( - socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags, - reinterpret_cast(&op.source_storage), &op.source_len, &op, - nullptr); - - if (result == SOCKET_ERROR) - { - DWORD err = ::WSAGetLastError(); - if (err != WSA_IO_PENDING) - { - svc_.on_completion(&op, err, 0); - return std::noop_coroutine(); - } - } - - svc_.on_pending(&op); - - if (op.cancelled.load(std::memory_order_acquire)) - ::CancelIoEx(reinterpret_cast(socket_), &op); - - return std::noop_coroutine(); -} - -// Datagram connect is synchronous on Windows -inline std::coroutine_handle<> -win_local_dgram_socket_internal::connect( - std::coroutine_handle<> h, - capy::executor_ref d, - corosio::local_endpoint ep, - std::stop_token token, - std::error_code* ec) -{ - conn_.internal_ptr = shared_from_this(); - - auto& op = conn_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.target_endpoint = ep; - op.start(token); - - svc_.work_started(); - - sockaddr_storage storage{}; - socklen_t addrlen = detail::to_sockaddr(ep, storage); - int result = ::WSAConnect( - socket_, reinterpret_cast(&storage), - static_cast(addrlen), nullptr, nullptr, nullptr, nullptr); - - if (result == SOCKET_ERROR) - svc_.on_completion(&op, ::WSAGetLastError(), 0); - else - svc_.on_completion(&op, 0, 0); - - return std::noop_coroutine(); -} - -inline std::coroutine_handle<> -win_local_dgram_socket_internal::send( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param param, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes_out) -{ - send_wr_.internal_ptr = shared_from_this(); - - auto& op = send_wr_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.start(token); - - svc_.work_started(); - - capy::mutable_buffer bufs[local_dgram_send_op::max_buffers]; - op.wsabuf_count = - static_cast(param.copy_to(bufs, local_dgram_send_op::max_buffers)); - - for (DWORD i = 0; i < op.wsabuf_count; ++i) - { - op.wsabufs[i].buf = static_cast(bufs[i].data()); - op.wsabufs[i].len = static_cast(bufs[i].size()); - } - - int result = ::WSASend( - socket_, op.wsabufs, op.wsabuf_count, nullptr, - local_dgram_to_native_msg_flags(flags), &op, nullptr); - - if (result == SOCKET_ERROR) - { - DWORD err = ::WSAGetLastError(); - if (err != WSA_IO_PENDING) - { - svc_.on_completion(&op, err, 0); - return std::noop_coroutine(); - } - } - - svc_.on_pending(&op); - - if (op.cancelled.load(std::memory_order_acquire)) - ::CancelIoEx(reinterpret_cast(socket_), &op); - - return std::noop_coroutine(); -} - -inline std::coroutine_handle<> -win_local_dgram_socket_internal::recv( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param param, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes_out) -{ - recv_rd_.internal_ptr = shared_from_this(); - - auto& op = recv_rd_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.bytes_out = bytes_out; - op.start(token); - - svc_.work_started(); - - capy::mutable_buffer bufs[local_dgram_recv_op::max_buffers]; - op.wsabuf_count = - static_cast(param.copy_to(bufs, local_dgram_recv_op::max_buffers)); - - if (op.wsabuf_count == 0 || (op.wsabuf_count == 1 && bufs[0].size() == 0)) - { - op.empty_buffer = true; - svc_.on_completion(&op, 0, 0); - return std::noop_coroutine(); - } - - for (DWORD i = 0; i < op.wsabuf_count; ++i) - { - op.wsabufs[i].buf = static_cast(bufs[i].data()); - op.wsabufs[i].len = static_cast(bufs[i].size()); - } - - op.flags = local_dgram_to_native_msg_flags(flags); - - int result = ::WSARecv( - socket_, op.wsabufs, op.wsabuf_count, nullptr, &op.flags, &op, nullptr); - - if (result == SOCKET_ERROR) - { - DWORD err = ::WSAGetLastError(); - if (err != WSA_IO_PENDING) - { - svc_.on_completion(&op, err, 0); - return std::noop_coroutine(); - } - } - - svc_.on_pending(&op); - - if (op.cancelled.load(std::memory_order_acquire)) - ::CancelIoEx(reinterpret_cast(socket_), &op); - - return std::noop_coroutine(); -} - -inline std::coroutine_handle<> -win_local_dgram_socket_internal::wait( - std::coroutine_handle<> h, - capy::executor_ref d, - wait_type w, - std::stop_token token, - std::error_code* ec) -{ - wt_.internal_ptr = shared_from_this(); - - auto& op = wt_; - op.reset(); - op.h = h; - op.ex = d; - op.ec_out = ec; - op.bytes_out = nullptr; - op.start(token); - - svc_.work_started(); - - if (w == wait_type::write) - { - svc_.on_completion(&op, 0, 0); - return std::noop_coroutine(); - } - - // Datagram wait_read and wait_error route through the auxiliary - // select reactor. - svc_.scheduler().wait_reactor().register_wait(socket_, w, &op); - return std::noop_coroutine(); -} - -inline void -win_local_dgram_socket_internal::cancel() noexcept -{ - if (socket_ != INVALID_SOCKET) - { - ::CancelIoEx(reinterpret_cast(socket_), nullptr); - } - - wr_.request_cancel(); - rd_.request_cancel(); - conn_.request_cancel(); - send_wr_.request_cancel(); - recv_rd_.request_cancel(); - wt_.request_cancel(); - svc_.scheduler().cancel_wait_if_constructed(&wt_); -} - -inline void -win_local_dgram_socket_internal::close_socket() noexcept -{ - wt_.request_cancel(); - svc_.scheduler().cancel_wait_if_constructed(&wt_); - - if (socket_ != INVALID_SOCKET) - { - ::CancelIoEx(reinterpret_cast(socket_), nullptr); - ::closesocket(socket_); - socket_ = INVALID_SOCKET; - } - - local_endpoint_ = corosio::local_endpoint{}; - remote_endpoint_ = corosio::local_endpoint{}; -} - -// ============================================================ -// win_local_dgram_socket (wrapper) -// ============================================================ - -inline win_local_dgram_socket::win_local_dgram_socket( - std::shared_ptr internal) noexcept - : internal_(std::move(internal)) -{ -} - -inline void -win_local_dgram_socket::close_internal() noexcept -{ - if (internal_) - { - internal_->close_socket(); - internal_.reset(); - } -} - -inline std::coroutine_handle<> -win_local_dgram_socket::send_to( - std::coroutine_handle<> h, capy::executor_ref d, - buffer_param buf, corosio::local_endpoint dest, int flags, - std::stop_token token, std::error_code* ec, std::size_t* bytes) -{ - return internal_->send_to(h, d, buf, dest, flags, token, ec, bytes); -} - -inline std::coroutine_handle<> -win_local_dgram_socket::recv_from( - std::coroutine_handle<> h, capy::executor_ref d, - buffer_param buf, corosio::local_endpoint* source, int flags, - std::stop_token token, std::error_code* ec, std::size_t* bytes) -{ - return internal_->recv_from(h, d, buf, source, flags, token, ec, bytes); -} - -inline std::coroutine_handle<> -win_local_dgram_socket::connect( - std::coroutine_handle<> h, capy::executor_ref d, - corosio::local_endpoint ep, std::stop_token token, std::error_code* ec) -{ - return internal_->connect(h, d, ep, token, ec); -} - -inline std::coroutine_handle<> -win_local_dgram_socket::send( - std::coroutine_handle<> h, capy::executor_ref d, - buffer_param buf, int flags, - std::stop_token token, std::error_code* ec, std::size_t* bytes) -{ - return internal_->send(h, d, buf, flags, token, ec, bytes); -} - -inline std::coroutine_handle<> -win_local_dgram_socket::recv( - std::coroutine_handle<> h, capy::executor_ref d, - buffer_param buf, int flags, - std::stop_token token, std::error_code* ec, std::size_t* bytes) -{ - return internal_->recv(h, d, buf, flags, token, ec, bytes); -} - -inline std::coroutine_handle<> -win_local_dgram_socket::wait( - std::coroutine_handle<> h, - capy::executor_ref d, - wait_type w, - std::stop_token token, - std::error_code* ec) -{ - return internal_->wait(h, d, w, token, ec); -} - -inline std::error_code -win_local_dgram_socket::bind(corosio::local_endpoint ep) noexcept -{ - if (ep.is_abstract()) - return std::make_error_code(std::errc::operation_not_supported); - - SOCKET sock = internal_->socket_; - - sockaddr_storage storage{}; - socklen_t addrlen = detail::to_sockaddr(ep, storage); - if (::bind( - sock, reinterpret_cast(&storage), - static_cast(addrlen)) == SOCKET_ERROR) - return make_err(::WSAGetLastError()); - - internal_->local_endpoint_ = ep; - return {}; -} - -inline std::error_code -win_local_dgram_socket::shutdown( - local_datagram_socket::shutdown_type what) noexcept -{ - int how; - switch (what) - { - case local_datagram_socket::shutdown_receive: - how = SD_RECEIVE; - break; - case local_datagram_socket::shutdown_send: - how = SD_SEND; - break; - case local_datagram_socket::shutdown_both: - how = SD_BOTH; - break; - default: - return make_err(WSAEINVAL); - } - if (::shutdown(internal_->native_handle(), how) != 0) - return make_err(WSAGetLastError()); - return {}; -} - -inline native_handle_type -win_local_dgram_socket::native_handle() const noexcept -{ - return static_cast(internal_->native_handle()); -} - -inline native_handle_type -win_local_dgram_socket::release_socket() noexcept -{ - SOCKET s = internal_->socket_; - if (s != INVALID_SOCKET) - { - internal_->cancel(); - internal_->socket_ = INVALID_SOCKET; - internal_->local_endpoint_ = corosio::local_endpoint{}; - internal_->remote_endpoint_ = corosio::local_endpoint{}; - } - return static_cast(s); -} - -inline std::error_code -win_local_dgram_socket::set_option( - int level, int optname, void const* data, std::size_t size) noexcept -{ - if (::setsockopt( - internal_->native_handle(), level, optname, - reinterpret_cast(data), static_cast(size)) != 0) - return make_err(WSAGetLastError()); - return {}; -} - -inline std::error_code -win_local_dgram_socket::get_option( - int level, int optname, void* data, std::size_t* size) const noexcept -{ - int len = static_cast(*size); - if (::getsockopt( - internal_->native_handle(), level, optname, - reinterpret_cast(data), &len) != 0) - return make_err(WSAGetLastError()); - *size = static_cast(len); - return {}; -} - -inline corosio::local_endpoint -win_local_dgram_socket::local_endpoint() const noexcept -{ - return internal_->local_endpoint(); -} - -inline corosio::local_endpoint -win_local_dgram_socket::remote_endpoint() const noexcept -{ - return internal_->remote_endpoint(); -} - -inline void -win_local_dgram_socket::cancel() noexcept -{ - internal_->cancel(); -} - -inline win_local_dgram_socket_internal* -win_local_dgram_socket::get_internal() const noexcept -{ - return internal_.get(); -} - -// ============================================================ -// win_local_dgram_service -// ============================================================ - -inline win_local_dgram_service::win_local_dgram_service( - capy::execution_context& ctx) - : sched_(ctx.use_service()) - , iocp_(sched_.native_handle()) -{ -} - -inline win_local_dgram_service::~win_local_dgram_service() -{ - for (auto* w = wrapper_list_.pop_front(); w != nullptr; - w = wrapper_list_.pop_front()) - delete w; -} - -inline void -win_local_dgram_service::shutdown() -{ - std::lock_guard lock(mutex_); - - for (auto* impl = socket_list_.pop_front(); impl != nullptr; - impl = socket_list_.pop_front()) - { - impl->close_socket(); - } -} - -inline io_object::implementation* -win_local_dgram_service::construct() -{ - auto internal = std::make_shared(*this); - - { - std::lock_guard lock(mutex_); - socket_list_.push_back(internal.get()); - } - - auto* wrapper = new win_local_dgram_socket(std::move(internal)); - - { - std::lock_guard lock(mutex_); - wrapper_list_.push_back(wrapper); - } - - return wrapper; -} - -inline void -win_local_dgram_service::destroy(io_object::implementation* p) -{ - if (p) - { - auto& wrapper = static_cast(*p); - wrapper.close_internal(); - destroy_impl(wrapper); - } -} - -inline void -win_local_dgram_service::close(io_object::handle& h) -{ - auto& wrapper = static_cast(*h.get()); - wrapper.get_internal()->close_socket(); -} - -inline void -win_local_dgram_service::destroy_impl(win_local_dgram_socket& impl) -{ - { - std::lock_guard lock(mutex_); - wrapper_list_.remove(&impl); - } - delete &impl; -} - -inline void -win_local_dgram_service::unregister_impl( - win_local_dgram_socket_internal& impl) -{ - std::lock_guard lock(mutex_); - socket_list_.remove(&impl); -} - -inline std::error_code -win_local_dgram_service::open_socket( - local_datagram_socket::implementation& impl, - int family, int type, int protocol) -{ - auto& wrapper = static_cast(impl); - return open_socket_internal(*wrapper.get_internal(), family, type, protocol); -} - -inline std::error_code -win_local_dgram_service::assign_socket( - local_datagram_socket::implementation& /*impl*/, native_handle_type /*fd*/) -{ - return std::make_error_code(std::errc::operation_not_supported); -} - -inline std::error_code -win_local_dgram_service::bind_socket( - local_datagram_socket::implementation& impl, - corosio::local_endpoint ep) -{ - // Reject abstract sockets on Windows - if (ep.is_abstract()) - return std::make_error_code(std::errc::operation_not_supported); - - auto& wrapper = static_cast(impl); - auto* internal = wrapper.get_internal(); - SOCKET sock = internal->socket_; - - sockaddr_storage storage{}; - socklen_t addrlen = detail::to_sockaddr(ep, storage); - if (::bind( - sock, reinterpret_cast(&storage), - static_cast(addrlen)) == SOCKET_ERROR) - return make_err(::WSAGetLastError()); - - internal->local_endpoint_ = ep; - return {}; -} - -inline std::error_code -win_local_dgram_service::open_socket_internal( - win_local_dgram_socket_internal& impl, - int family, int type, int protocol) -{ - impl.close_socket(); - - SOCKET sock = - ::WSASocketW(family, type, protocol, nullptr, 0, WSA_FLAG_OVERLAPPED); - - if (sock == INVALID_SOCKET) - return make_err(::WSAGetLastError()); - - HANDLE result = ::CreateIoCompletionPort( - reinterpret_cast(sock), static_cast(iocp_), key_io, 0); - - if (result == nullptr) - { - DWORD dwError = ::GetLastError(); - ::closesocket(sock); - return make_err(dwError); - } - - impl.socket_ = sock; - return {}; -} - -inline void -win_local_dgram_service::post(overlapped_op* op) -{ - sched_.post(op); -} - -inline void -win_local_dgram_service::on_pending(overlapped_op* op) noexcept -{ - sched_.on_pending(op); -} - -inline void -win_local_dgram_service::on_completion( - overlapped_op* op, DWORD error, DWORD bytes) noexcept -{ - sched_.on_completion(op, error, bytes); -} - -inline void -win_local_dgram_service::work_started() noexcept -{ - sched_.work_started(); -} - -inline void -win_local_dgram_service::work_finished() noexcept -{ - sched_.work_finished(); -} - -} // namespace boost::corosio::detail - -#endif // BOOST_COROSIO_HAS_IOCP - -#endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SERVICE_HPP diff --git a/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp b/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp deleted file mode 100644 index e62f29cd9..000000000 --- a/include/boost/corosio/native/detail/iocp/win_local_dgram_socket.hpp +++ /dev/null @@ -1,341 +0,0 @@ -// -// Copyright (c) 2026 Michael Vandeberg -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// -// Official repository: https://github.com/cppalliance/corosio -// - -#ifndef BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SOCKET_HPP -#define BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SOCKET_HPP - -#include - -#if BOOST_COROSIO_HAS_IOCP - -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -namespace boost::corosio::detail { - -class win_local_dgram_service; -class win_local_dgram_socket_internal; - -/** Send-to operation for local datagram sockets. */ -struct local_dgram_send_to_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - sockaddr_storage dest_storage{}; - int dest_len = 0; - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_send_to_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/** Recv-from operation for local datagram sockets. */ -struct local_dgram_recv_from_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - DWORD flags = 0; - sockaddr_storage source_storage{}; - INT source_len = sizeof(sockaddr_storage); - corosio::local_endpoint* source_out = nullptr; - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_recv_from_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/** Connect operation for connected-mode local datagrams. */ -struct local_dgram_connect_op : overlapped_op -{ - corosio::local_endpoint target_endpoint; - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_connect_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/** Connected send operation for local datagrams. */ -struct local_dgram_send_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_send_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/** Connected recv operation for local datagrams. */ -struct local_dgram_recv_op : overlapped_op -{ - static constexpr std::size_t max_buffers = 16; - WSABUF wsabufs[max_buffers]; - DWORD wsabuf_count = 0; - DWORD flags = 0; - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_recv_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/** Readiness-wait operation for local datagram sockets. */ -struct local_dgram_wait_op : overlapped_op -{ - win_local_dgram_socket_internal& internal; - std::shared_ptr internal_ptr; - - static void do_complete( - void* owner, - scheduler_op* base, - std::uint32_t bytes, - std::uint32_t error); - static void do_cancel_impl(overlapped_op* op) noexcept; - - explicit local_dgram_wait_op( - win_local_dgram_socket_internal& internal_) noexcept; -}; - -/* Internal local datagram socket state for IOCP. */ -class win_local_dgram_socket_internal - : public intrusive_list::node - , public std::enable_shared_from_this -{ - friend class win_local_dgram_service; - friend class win_local_dgram_socket; - friend struct local_dgram_send_to_op; - friend struct local_dgram_recv_from_op; - friend struct local_dgram_connect_op; - friend struct local_dgram_send_op; - friend struct local_dgram_recv_op; - friend struct local_dgram_wait_op; - - win_local_dgram_service& svc_; - local_dgram_send_to_op wr_; - local_dgram_recv_from_op rd_; - local_dgram_connect_op conn_; - local_dgram_send_op send_wr_; - local_dgram_recv_op recv_rd_; - local_dgram_wait_op wt_; - SOCKET socket_ = INVALID_SOCKET; - -public: - explicit win_local_dgram_socket_internal( - win_local_dgram_service& svc) noexcept; - ~win_local_dgram_socket_internal(); - - std::coroutine_handle<> send_to( - std::coroutine_handle<>, - capy::executor_ref, - buffer_param, - corosio::local_endpoint, - int flags, - std::stop_token, - std::error_code*, - std::size_t*); - - std::coroutine_handle<> recv_from( - std::coroutine_handle<>, - capy::executor_ref, - buffer_param, - corosio::local_endpoint*, - int flags, - std::stop_token, - std::error_code*, - std::size_t*); - - std::coroutine_handle<> connect( - std::coroutine_handle<>, - capy::executor_ref, - corosio::local_endpoint, - std::stop_token, - std::error_code*); - - std::coroutine_handle<> send( - std::coroutine_handle<>, - capy::executor_ref, - buffer_param, - int flags, - std::stop_token, - std::error_code*, - std::size_t*); - - std::coroutine_handle<> recv( - std::coroutine_handle<>, - capy::executor_ref, - buffer_param, - int flags, - std::stop_token, - std::error_code*, - std::size_t*); - - std::coroutine_handle<> wait( - std::coroutine_handle<>, - capy::executor_ref, - wait_type, - std::stop_token, - std::error_code*); - - SOCKET native_handle() const noexcept; - corosio::local_endpoint local_endpoint() const noexcept; - corosio::local_endpoint remote_endpoint() const noexcept; - bool is_open() const noexcept; - void cancel() noexcept; - void close_socket() noexcept; - -private: - corosio::local_endpoint local_endpoint_; - corosio::local_endpoint remote_endpoint_; -}; - -/* Local datagram socket wrapper for IOCP. */ -class win_local_dgram_socket final - : public local_datagram_socket::implementation - , public intrusive_list::node -{ - std::shared_ptr internal_; - -public: - explicit win_local_dgram_socket( - std::shared_ptr internal) noexcept; - - void close_internal() noexcept; - - std::coroutine_handle<> send_to( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param buf, - corosio::local_endpoint dest, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes) override; - - std::coroutine_handle<> recv_from( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param buf, - corosio::local_endpoint* source, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes) override; - - std::coroutine_handle<> connect( - std::coroutine_handle<> h, - capy::executor_ref d, - corosio::local_endpoint ep, - std::stop_token token, - std::error_code* ec) override; - - std::coroutine_handle<> send( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param buf, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes) override; - - std::coroutine_handle<> recv( - std::coroutine_handle<> h, - capy::executor_ref d, - buffer_param buf, - int flags, - std::stop_token token, - std::error_code* ec, - std::size_t* bytes) override; - - std::coroutine_handle<> wait( - std::coroutine_handle<> h, - capy::executor_ref d, - wait_type w, - std::stop_token token, - std::error_code* ec) override; - - std::error_code bind(corosio::local_endpoint ep) noexcept override; - - std::error_code shutdown( - local_datagram_socket::shutdown_type what) noexcept override; - - native_handle_type native_handle() const noexcept override; - - native_handle_type release_socket() noexcept override; - - std::error_code set_option( - int level, - int optname, - void const* data, - std::size_t size) noexcept override; - std::error_code - get_option(int level, int optname, void* data, std::size_t* size) - const noexcept override; - - corosio::local_endpoint local_endpoint() const noexcept override; - corosio::local_endpoint remote_endpoint() const noexcept override; - void cancel() noexcept override; - - win_local_dgram_socket_internal* get_internal() const noexcept; -}; - -} // namespace boost::corosio::detail - -#endif // BOOST_COROSIO_HAS_IOCP - -#endif // BOOST_COROSIO_NATIVE_DETAIL_IOCP_WIN_LOCAL_DGRAM_SOCKET_HPP diff --git a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp index c4a32b06d..78821fc34 100644 --- a/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp +++ b/include/boost/corosio/native/detail/iocp/win_local_stream_service.hpp @@ -935,10 +935,26 @@ win_local_stream_service::open_socket( inline std::error_code win_local_stream_service::assign_socket( - local_stream_socket::implementation& /*impl*/, native_handle_type /*fd*/) + local_stream_socket::implementation& impl, native_handle_type fd) { - // socketpair / assign is POSIX-only - return std::make_error_code(std::errc::operation_not_supported); + auto& wrapper = static_cast(impl); + auto& internal = *wrapper.get_internal(); + + internal.close_socket(); + + SOCKET sock = static_cast(fd); + + HANDLE result = ::CreateIoCompletionPort( + reinterpret_cast(sock), static_cast(iocp_), key_io, 0); + + if (result == nullptr) + { + DWORD dwError = ::GetLastError(); + return make_err(dwError); + } + + internal.socket_ = sock; + return {}; } inline std::error_code diff --git a/include/boost/corosio/test/temp_path.hpp b/include/boost/corosio/test/temp_path.hpp new file mode 100644 index 000000000..d63980ac0 --- /dev/null +++ b/include/boost/corosio/test/temp_path.hpp @@ -0,0 +1,106 @@ +// +// Copyright (c) 2026 Michael Vandeberg +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/corosio +// + +#ifndef BOOST_COROSIO_TEST_TEMP_PATH_HPP +#define BOOST_COROSIO_TEST_TEMP_PATH_HPP + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace boost::corosio::test { + +/** RAII temp directory holding a path for a Unix-domain socket. + + Creates a unique empty directory under + `std::filesystem::temp_directory_path()` and exposes a path under + it suitable for binding `local_stream_socket` / + `local_datagram_socket`. The destructor removes the directory + (and the bound socket file inside it) recursively, so tests that + throw mid-execution still clean up. + + Naming entropy comes from a process-wide atomic counter mixed with + a one-time `random_device` seed; that's enough to avoid collisions + between parallel test runs without requiring cryptographic + randomness. The constructor retries on collision and throws if it + cannot create a directory in a reasonable number of attempts. + + Platform note: the helper exists so tests don't need to call + `mkdtemp` / `unlink` / `rmdir` directly. On Windows, the path is + a filesystem AF_UNIX path (Windows 10 1803+). +*/ +class temp_socket_dir +{ +public: + temp_socket_dir() + { + namespace fs = std::filesystem; + auto const base = fs::temp_directory_path(); + + // 64 bits of mixed entropy: a random seed established once + // at static init, XORed with a monotonic counter. + static std::uint64_t const seed = [] { + std::random_device rd; + return (static_cast(rd()) << 32) | + static_cast(rd()); + }(); + static std::atomic counter{0}; + + std::error_code ec; + for (int tries = 0; tries < 32; ++tries) + { + auto const n = counter.fetch_add(1, std::memory_order_relaxed); + auto const tag = seed ^ n; + + char buf[32]; + std::snprintf( + buf, sizeof(buf), "corosio_test_%016llx", + static_cast(tag)); + + auto candidate = base / buf; + if (fs::create_directory(candidate, ec)) + { + dir_ = std::move(candidate); + return; + } + } + throw std::runtime_error( + "temp_socket_dir: could not create temp directory"); + } + + ~temp_socket_dir() noexcept + { + if (!dir_.empty()) + { + std::error_code ec; + std::filesystem::remove_all(dir_, ec); + } + } + + temp_socket_dir(temp_socket_dir const&) = delete; + temp_socket_dir& operator=(temp_socket_dir const&) = delete; + + /// Path suitable for binding a local socket. + std::string path() const + { + return (dir_ / "sock").string(); + } + +private: + std::filesystem::path dir_; +}; + +} // namespace boost::corosio::test + +#endif // BOOST_COROSIO_TEST_TEMP_PATH_HPP diff --git a/src/corosio/src/io_context.cpp b/src/corosio/src/io_context.cpp index 268c9c8a7..bab1f1ade 100644 --- a/src/corosio/src/io_context.cpp +++ b/src/corosio/src/io_context.cpp @@ -33,7 +33,6 @@ #include #include #include -#include #include #include #include @@ -108,7 +107,6 @@ iocp_t::construct(capy::execution_context& ctx, unsigned concurrency_hint) auto& local_svc = ctx.make_service(tcp_svc); ctx.make_service(local_svc); - ctx.make_service(); ctx.make_service(); ctx.make_service(); ctx.make_service(); diff --git a/src/corosio/src/local_datagram.cpp b/src/corosio/src/local_datagram.cpp index 31b841550..8f305e8de 100644 --- a/src/corosio/src/local_datagram.cpp +++ b/src/corosio/src/local_datagram.cpp @@ -11,35 +11,22 @@ #include #if BOOST_COROSIO_POSIX + #include #include -#elif BOOST_COROSIO_HAS_IOCP -#include -#ifndef AF_UNIX -#define AF_UNIX 1 -#endif -#endif namespace boost::corosio { int local_datagram::family() noexcept { -#if BOOST_COROSIO_POSIX || BOOST_COROSIO_HAS_IOCP return AF_UNIX; -#else - return 0; -#endif } int local_datagram::type() noexcept { -#if BOOST_COROSIO_POSIX || BOOST_COROSIO_HAS_IOCP return SOCK_DGRAM; -#else - return 0; -#endif } int @@ -49,3 +36,5 @@ local_datagram::protocol() noexcept } } // namespace boost::corosio + +#endif // BOOST_COROSIO_POSIX diff --git a/src/corosio/src/local_datagram_socket.cpp b/src/corosio/src/local_datagram_socket.cpp index e68758f46..ea2be7d71 100644 --- a/src/corosio/src/local_datagram_socket.cpp +++ b/src/corosio/src/local_datagram_socket.cpp @@ -9,17 +9,13 @@ #include -#if BOOST_COROSIO_POSIX || BOOST_COROSIO_HAS_IOCP +#if BOOST_COROSIO_POSIX #include #include #include -#if BOOST_COROSIO_POSIX #include -#elif BOOST_COROSIO_HAS_IOCP -#include -#endif namespace boost::corosio { @@ -113,11 +109,7 @@ native_handle_type local_datagram_socket::native_handle() const noexcept { if (!is_open()) -#if BOOST_COROSIO_HAS_IOCP - return ~native_handle_type(0); -#else return -1; -#endif return get().native_handle(); } @@ -134,22 +126,12 @@ local_datagram_socket::available() const { if (!is_open()) detail::throw_logic_error("available: socket not open"); -#if BOOST_COROSIO_HAS_IOCP - u_long value = 0; - if (::ioctlsocket( - static_cast(native_handle()), FIONREAD, &value) != 0) - detail::throw_system_error( - std::error_code(::WSAGetLastError(), std::system_category()), - "local_datagram_socket::available"); - return static_cast(value); -#else int value = 0; if (::ioctl(native_handle(), FIONREAD, &value) < 0) detail::throw_system_error( std::error_code(errno, std::system_category()), "local_datagram_socket::available"); return static_cast(value); -#endif } local_endpoint @@ -170,4 +152,4 @@ local_datagram_socket::remote_endpoint() const noexcept } // namespace boost::corosio -#endif // BOOST_COROSIO_POSIX || BOOST_COROSIO_HAS_IOCP +#endif // BOOST_COROSIO_POSIX diff --git a/src/corosio/src/local_socket_pair.cpp b/src/corosio/src/local_socket_pair.cpp index 9342db5b4..56cd060f6 100644 --- a/src/corosio/src/local_socket_pair.cpp +++ b/src/corosio/src/local_socket_pair.cpp @@ -131,4 +131,129 @@ make_local_datagram_pair(io_context& ctx) } // namespace boost::corosio -#endif // BOOST_COROSIO_POSIX +#elif BOOST_COROSIO_HAS_IOCP + +// Windows: emulate socketpair(AF_UNIX, SOCK_STREAM) via a temporary +// listener socket. Create a listening socket bound to a unique temp +// path, connect a client, accept the peer, then close the listener +// and remove the temp path. + +#include +#include +#include +#include + +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include + +#include +#include + +namespace boost::corosio { + +namespace { + +std::error_code +make_wsa_error() +{ + return std::error_code(::WSAGetLastError(), std::system_category()); +} + +} // namespace + +std::pair +make_local_stream_pair(io_context& ctx) +{ + // Create a unique temp directory + path for the listener. + test::temp_socket_dir tmp; + auto path = tmp.path(); + local_endpoint ep(path); + + // Build the sockaddr. + sockaddr_storage storage{}; + socklen_t addrlen = detail::to_sockaddr(ep, storage); + + // Create listener socket. + SOCKET listener = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (listener == INVALID_SOCKET) + throw std::system_error(make_wsa_error(), "socket(listener)"); + + // bind + listen + if (::bind(listener, reinterpret_cast(&storage), + static_cast(addrlen)) == SOCKET_ERROR) + { + auto ec = make_wsa_error(); + ::closesocket(listener); + throw std::system_error(ec, "bind(listener)"); + } + + if (::listen(listener, 1) == SOCKET_ERROR) + { + auto ec = make_wsa_error(); + ::closesocket(listener); + throw std::system_error(ec, "listen"); + } + + // Create client socket and connect. + SOCKET client = ::socket(AF_UNIX, SOCK_STREAM, 0); + if (client == INVALID_SOCKET) + { + auto ec = make_wsa_error(); + ::closesocket(listener); + throw std::system_error(ec, "socket(client)"); + } + + if (::connect(client, reinterpret_cast(&storage), + static_cast(addrlen)) == SOCKET_ERROR) + { + auto ec = make_wsa_error(); + ::closesocket(client); + ::closesocket(listener); + throw std::system_error(ec, "connect"); + } + + // Accept the peer. + SOCKET server = ::accept(listener, nullptr, nullptr); + if (server == INVALID_SOCKET) + { + auto ec = make_wsa_error(); + ::closesocket(client); + ::closesocket(listener); + throw std::system_error(ec, "accept"); + } + + // Listener is no longer needed. + ::closesocket(listener); + + // Wrap the raw SOCKETs into local_stream_socket objects. + // assign() registers the handle with the IOCP port. + try + { + local_stream_socket s1(ctx); + local_stream_socket s2(ctx); + + s1.assign(static_cast(client)); + client = INVALID_SOCKET; + s2.assign(static_cast(server)); + server = INVALID_SOCKET; + + return {std::move(s1), std::move(s2)}; + } + catch (...) + { + if (client != INVALID_SOCKET) + ::closesocket(client); + if (server != INVALID_SOCKET) + ::closesocket(server); + throw; + } +} + +} // namespace boost::corosio + +#endif // BOOST_COROSIO_HAS_IOCP diff --git a/test/unit/local_datagram_socket.cpp b/test/unit/local_datagram_socket.cpp index b87341381..c34bfd908 100644 --- a/test/unit/local_datagram_socket.cpp +++ b/test/unit/local_datagram_socket.cpp @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -23,37 +24,11 @@ #include #include -#include -#include - #include "context.hpp" #include "test_suite.hpp" namespace boost::corosio { -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace - template struct local_datagram_socket_test { @@ -141,11 +116,11 @@ struct local_datagram_socket_test local_datagram_socket sock(ioc); sock.open(); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); auto ec = sock.bind(local_endpoint(path)); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); } void testSendToRecvFrom() @@ -153,8 +128,10 @@ struct local_datagram_socket_test io_context ioc(Backend); auto ex = ioc.get_executor(); - auto path1 = make_temp_socket_path(); - auto path2 = make_temp_socket_path(); + test::temp_socket_dir tmp1; + test::temp_socket_dir tmp2; + auto path1 = tmp1.path(); + auto path2 = tmp2.path(); local_datagram_socket s1(ioc); local_datagram_socket s2(ioc); @@ -212,8 +189,6 @@ struct local_datagram_socket_test // Source endpoint should be the sender's bound path BOOST_TEST_EQ(source.path(), path1); - cleanup_path(path1); - cleanup_path(path2); } void testBindFailure() @@ -440,8 +415,10 @@ struct local_datagram_socket_test io_context ioc(Backend); auto ex = ioc.get_executor(); - auto path1 = make_temp_socket_path(); - auto path2 = make_temp_socket_path(); + test::temp_socket_dir tmp1; + test::temp_socket_dir tmp2; + auto path1 = tmp1.path(); + auto path2 = tmp2.path(); local_datagram_socket s1(ioc); local_datagram_socket s2(ioc); @@ -520,8 +497,6 @@ struct local_datagram_socket_test BOOST_TEST_EQ(src1.path(), path1); BOOST_TEST_EQ(src2.path(), path1); - cleanup_path(path1); - cleanup_path(path2); } void run() diff --git a/test/unit/local_stream_socket.cpp b/test/unit/local_stream_socket.cpp index 8ee995255..ac082d264 100644 --- a/test/unit/local_stream_socket.cpp +++ b/test/unit/local_stream_socket.cpp @@ -10,13 +10,10 @@ // Test that header file is self-contained. #include -#include - -#if BOOST_COROSIO_POSIX - #include #include #include +#include #include #include #include @@ -27,13 +24,19 @@ #include #include +#include + +#if BOOST_COROSIO_POSIX +#include +#else +#include +#endif + #include #include #include #include -#include - #include "context.hpp" #include "test_suite.hpp" @@ -44,29 +47,6 @@ namespace boost::corosio { static_assert(capy::ReadStream); static_assert(capy::WriteStream); -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_test_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace - template struct local_stream_socket_test { @@ -105,7 +85,8 @@ struct local_stream_socket_test { io_context ioc(Backend); auto ex = ioc.get_executor(); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); local_stream_acceptor acc(ioc); acc.open(); @@ -140,7 +121,6 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -154,7 +134,8 @@ struct local_stream_socket_test { io_context ioc(Backend); auto ex = ioc.get_executor(); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); local_stream_acceptor acc(ioc); acc.open(); @@ -191,7 +172,6 @@ struct local_stream_socket_test ioc.run(); ioc.restart(); - cleanup_path(path); BOOST_TEST_EQ(accept_done, true); BOOST_TEST_EQ(!accept_ec, true); @@ -256,10 +236,12 @@ struct local_stream_socket_test BOOST_TEST_EQ(s2.is_open(), true); } +#if BOOST_COROSIO_POSIX void testUnlinkExisting() { io_context ioc(Backend); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); // First bind creates the socket file { @@ -286,7 +268,6 @@ struct local_stream_socket_test BOOST_TEST_EQ(!ec, true); } - cleanup_path(path); } void testUnlinkNonexistent() @@ -294,7 +275,8 @@ struct local_stream_socket_test // unlink_existing on a path that doesn't exist should // succeed (unlink silently fails with ENOENT). io_context ioc(Backend); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); local_stream_acceptor acc(ioc); acc.open(); @@ -302,8 +284,8 @@ struct local_stream_socket_test local_endpoint(path), bind_option::unlink_existing); BOOST_TEST_EQ(!ec, true); - cleanup_path(path); } +#endif void testEndpointOrdering() { @@ -344,8 +326,10 @@ struct local_stream_socket_test testMoveAccept(); testReadWrite(); testSocketPair(); +#if BOOST_COROSIO_POSIX testUnlinkExisting(); testUnlinkNonexistent(); +#endif testEndpointOrdering(); testEndpointStreamOutput(); testAvailable(); @@ -386,14 +370,21 @@ struct local_stream_socket_test BOOST_TEST_EQ(s1.is_open(), true); - int fd = s1.release(); - BOOST_TEST_EQ(fd >= 0, true); + auto handle = s1.release(); BOOST_TEST_EQ(s1.is_open(), false); - // The released fd is still valid -- write through it + // The released handle is still valid -- write through it char const msg[] = "released"; - BOOST_TEST_EQ(::write(fd, msg, std::strlen(msg)) > 0, true); - ::close(fd); +#if BOOST_COROSIO_HAS_IOCP + BOOST_TEST_EQ( + ::send(static_cast(handle), + msg, static_cast(std::strlen(msg)), 0) > 0, true); + ::closesocket(static_cast(handle)); +#else + BOOST_TEST_EQ(handle >= 0, true); + BOOST_TEST_EQ(::write(handle, msg, std::strlen(msg)) > 0, true); + ::close(handle); +#endif } void testEndpointStreamOutput() @@ -429,9 +420,3 @@ COROSIO_BACKEND_TESTS( local_stream_socket_test, "boost.corosio.local_stream_socket") } // namespace boost::corosio - -#else // !BOOST_COROSIO_POSIX - -// Empty on non-POSIX platforms - -#endif diff --git a/test/unit/wait.cpp b/test/unit/wait.cpp index e57b1a712..37fe38fa0 100644 --- a/test/unit/wait.cpp +++ b/test/unit/wait.cpp @@ -10,8 +10,9 @@ // Test that header is self-contained. #include -#include - +#include +#include +#include #include #include #include @@ -20,6 +21,7 @@ #include #include +#include #include #include @@ -32,47 +34,11 @@ #include #include -#if BOOST_COROSIO_POSIX -#include -#include -#include - -#include -#include - -#include -#endif - #include "context.hpp" #include "test_suite.hpp" namespace boost::corosio { -#if BOOST_COROSIO_POSIX -namespace { - -std::string -make_temp_socket_path() -{ - char tmpl[] = "/tmp/corosio_wait_XXXXXX"; - if (!::mkdtemp(tmpl)) - throw std::runtime_error("mkdtemp failed"); - std::string path(tmpl); - path += "/sock"; - return path; -} - -void -cleanup_path(std::string const& path) -{ - ::unlink(path.c_str()); - auto dir = path.substr(0, path.rfind('/')); - ::rmdir(dir.c_str()); -} - -} // namespace -#endif - template struct wait_test { @@ -144,13 +110,13 @@ struct wait_test BOOST_TEST(!wait_ec); } -#if BOOST_COROSIO_POSIX // local_stream_socket wait_read fires when the peer writes. void testWaitOnLocalStream() { io_context ioc(Backend); auto ex = ioc.get_executor(); - auto path = make_temp_socket_path(); + test::temp_socket_dir tmp; + auto path = tmp.path(); local_stream_acceptor acc(ioc); acc.open(); @@ -196,12 +162,9 @@ struct wait_test capy::run_async(ex)(writer()); ioc.run(); - cleanup_path(path); - BOOST_TEST(wait_done); BOOST_TEST(!wait_ec); } -#endif // BOOST_COROSIO_POSIX // Cancellation via socket.cancel() yields operation_canceled. void testCancellation() @@ -360,9 +323,7 @@ struct wait_test { testWaitReadAndNoConsume(); testWaitWriteImmediate(); -#if BOOST_COROSIO_POSIX testWaitOnLocalStream(); -#endif testCancellation(); testAcceptorWait(); testWaitOnUdp();