LCOV - code coverage report
Current view: top level - capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 94.1 % 101 95
Test Date: 2026-02-07 18:59:16 Functions: 91.5 % 59 54

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_array.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/write_stream.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : 
      23              : #include <concepts>
      24              : #include <coroutine>
      25              : #include <cstddef>
      26              : #include <new>
      27              : #include <span>
      28              : #include <stop_token>
      29              : #include <system_error>
      30              : #include <utility>
      31              : 
      32              : namespace boost {
      33              : namespace capy {
      34              : 
      35              : /** Type-erased wrapper for any WriteStream.
      36              : 
      37              :     This class provides type erasure for any type satisfying the
      38              :     @ref WriteStream concept, enabling runtime polymorphism for
      39              :     write operations. It uses cached awaitable storage to achieve
      40              :     zero steady-state allocation after construction.
      41              : 
      42              :     The wrapper supports two construction modes:
      43              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      44              :       allocates storage and owns the stream.
      45              :     - **Reference**: Pass a pointer to wrap without ownership. The
      46              :       pointed-to stream must outlive this wrapper.
      47              : 
      48              :     @par Awaitable Preallocation
      49              :     The constructor preallocates storage for the type-erased awaitable.
      50              :     This reserves all virtual address space at server startup
      51              :     so memory usage can be measured up front, rather than
      52              :     allocating piecemeal as traffic arrives.
      53              : 
      54              :     @par Immediate Completion
      55              :     Operations complete immediately without suspending when the
      56              :     buffer sequence is empty, or when the underlying stream's
      57              :     awaitable reports readiness via `await_ready`.
      58              : 
      59              :     @par Thread Safety
      60              :     Not thread-safe. Concurrent operations on the same wrapper
      61              :     are undefined behavior.
      62              : 
      63              :     @par Example
      64              :     @code
      65              :     // Owning - takes ownership of the stream
      66              :     any_write_stream stream(socket{ioc});
      67              : 
      68              :     // Reference - wraps without ownership
      69              :     socket sock(ioc);
      70              :     any_write_stream stream(&sock);
      71              : 
      72              :     const_buffer buf(data, size);
      73              :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      74              :     @endcode
      75              : 
      76              :     @see any_read_stream, any_stream, WriteStream
      77              : */
      78              : class any_write_stream
      79              : {
      80              :     struct vtable;
      81              : 
      82              :     template<WriteStream S>
      83              :     struct vtable_for_impl;
      84              : 
      85              :     // ordered for cache line coherence
      86              :     void* stream_ = nullptr;
      87              :     vtable const* vt_ = nullptr;
      88              :     void* cached_awaitable_ = nullptr;
      89              :     void* storage_ = nullptr;
      90              :     bool awaitable_active_ = false;
      91              : 
      92              : public:
      93              :     /** Destructor.
      94              : 
      95              :         Destroys the owned stream (if any) and releases the cached
      96              :         awaitable storage.
      97              :     */
      98              :     ~any_write_stream();
      99              : 
     100              :     /** Default constructor.
     101              : 
     102              :         Constructs an empty wrapper. Operations on a default-constructed
     103              :         wrapper result in undefined behavior.
     104              :     */
     105            1 :     any_write_stream() = default;
     106              : 
     107              :     /** Non-copyable.
     108              : 
     109              :         The awaitable cache is per-instance and cannot be shared.
     110              :     */
     111              :     any_write_stream(any_write_stream const&) = delete;
     112              :     any_write_stream& operator=(any_write_stream const&) = delete;
     113              : 
     114              :     /** Move constructor.
     115              : 
     116              :         Transfers ownership of the wrapped stream (if owned) and
     117              :         cached awaitable storage from `other`. After the move, `other` is
     118              :         in a default-constructed state.
     119              : 
     120              :         @param other The wrapper to move from.
     121              :     */
     122            2 :     any_write_stream(any_write_stream&& other) noexcept
     123            2 :         : stream_(std::exchange(other.stream_, nullptr))
     124            2 :         , vt_(std::exchange(other.vt_, nullptr))
     125            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     126            2 :         , storage_(std::exchange(other.storage_, nullptr))
     127            2 :         , awaitable_active_(std::exchange(other.awaitable_active_, false))
     128              :     {
     129            2 :     }
     130              : 
     131              :     /** Move assignment operator.
     132              : 
     133              :         Destroys any owned stream and releases existing resources,
     134              :         then transfers ownership from `other`.
     135              : 
     136              :         @param other The wrapper to move from.
     137              :         @return Reference to this wrapper.
     138              :     */
     139              :     any_write_stream&
     140              :     operator=(any_write_stream&& other) noexcept;
     141              : 
     142              :     /** Construct by taking ownership of a WriteStream.
     143              : 
     144              :         Allocates storage and moves the stream into this wrapper.
     145              :         The wrapper owns the stream and will destroy it.
     146              : 
     147              :         @param s The stream to take ownership of.
     148              :     */
     149              :     template<WriteStream S>
     150              :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     151              :     any_write_stream(S s);
     152              : 
     153              :     /** Construct by wrapping a WriteStream without ownership.
     154              : 
     155              :         Wraps the given stream by pointer. The stream must remain
     156              :         valid for the lifetime of this wrapper.
     157              : 
     158              :         @param s Pointer to the stream to wrap.
     159              :     */
     160              :     template<WriteStream S>
     161              :     any_write_stream(S* s);
     162              : 
     163              :     /** Check if the wrapper contains a valid stream.
     164              : 
     165              :         @return `true` if wrapping a stream, `false` if default-constructed
     166              :             or moved-from.
     167              :     */
     168              :     bool
     169           21 :     has_value() const noexcept
     170              :     {
     171           21 :         return stream_ != nullptr;
     172              :     }
     173              : 
     174              :     /** Check if the wrapper contains a valid stream.
     175              : 
     176              :         @return `true` if wrapping a stream, `false` if default-constructed
     177              :             or moved-from.
     178              :     */
     179              :     explicit
     180            3 :     operator bool() const noexcept
     181              :     {
     182            3 :         return has_value();
     183              :     }
     184              : 
     185              :     /** Initiate an asynchronous write operation.
     186              : 
     187              :         Writes data from the provided buffer sequence. The operation
     188              :         completes when at least one byte has been written, or an error
     189              :         occurs.
     190              : 
     191              :         @param buffers The buffer sequence containing data to write.
     192              :             Passed by value to ensure the sequence lives in the
     193              :             coroutine frame across suspension points.
     194              : 
     195              :         @return An awaitable yielding `(error_code,std::size_t)`.
     196              : 
     197              :         @par Immediate Completion
     198              :         The operation completes immediately without suspending
     199              :         the calling coroutine when:
     200              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     201              :         @li The underlying stream's awaitable reports immediate
     202              :             readiness via `await_ready`.
     203              : 
     204              :         @note This is a partial operation and may not process the
     205              :         entire buffer sequence. Use the composed @ref write algorithm
     206              :         for guaranteed complete transfer.
     207              : 
     208              :         @par Preconditions
     209              :         The wrapper must contain a valid stream (`has_value() == true`).
     210              :     */
     211              :     template<ConstBufferSequence CB>
     212              :     auto
     213              :     write_some(CB buffers);
     214              : 
     215              : protected:
     216              :     /** Rebind to a new stream after move.
     217              : 
     218              :         Updates the internal pointer to reference a new stream object.
     219              :         Used by owning wrappers after move assignment when the owned
     220              :         object has moved to a new location.
     221              : 
     222              :         @param new_stream The new stream to bind to. Must be the same
     223              :             type as the original stream.
     224              : 
     225              :         @note Terminates if called with a stream of different type
     226              :             than the original.
     227              :     */
     228              :     template<WriteStream S>
     229              :     void
     230              :     rebind(S& new_stream) noexcept
     231              :     {
     232              :         if(vt_ != &vtable_for_impl<S>::value)
     233              :             std::terminate();
     234              :         stream_ = &new_stream;
     235              :     }
     236              : };
     237              : 
     238              : //----------------------------------------------------------
     239              : 
     240              : struct any_write_stream::vtable
     241              : {
     242              :     // ordered by call frequency for cache line coherence
     243              :     void (*construct_awaitable)(
     244              :         void* stream,
     245              :         void* storage,
     246              :         std::span<const_buffer const> buffers);
     247              :     bool (*await_ready)(void*);
     248              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     249              :     io_result<std::size_t> (*await_resume)(void*);
     250              :     void (*destroy_awaitable)(void*) noexcept;
     251              :     std::size_t awaitable_size;
     252              :     std::size_t awaitable_align;
     253              :     void (*destroy)(void*) noexcept;
     254              : };
     255              : 
     256              : template<WriteStream S>
     257              : struct any_write_stream::vtable_for_impl
     258              : {
     259              :     using Awaitable = decltype(std::declval<S&>().write_some(
     260              :         std::span<const_buffer const>{}));
     261              : 
     262              :     static void
     263            1 :     do_destroy_impl(void* stream) noexcept
     264              :     {
     265            1 :         static_cast<S*>(stream)->~S();
     266            1 :     }
     267              : 
     268              :     static void
     269           75 :     construct_awaitable_impl(
     270              :         void* stream,
     271              :         void* storage,
     272              :         std::span<const_buffer const> buffers)
     273              :     {
     274           75 :         auto& s = *static_cast<S*>(stream);
     275           75 :         ::new(storage) Awaitable(s.write_some(buffers));
     276           75 :     }
     277              : 
     278              :     static constexpr vtable value = {
     279              :         &construct_awaitable_impl,
     280           75 :         +[](void* p) {
     281           75 :             return static_cast<Awaitable*>(p)->await_ready();
     282              :         },
     283            2 :         +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     284            4 :             return detail::call_await_suspend(
     285            2 :                 static_cast<Awaitable*>(p), h, ex, token);
     286              :         },
     287           73 :         +[](void* p) {
     288           73 :             return static_cast<Awaitable*>(p)->await_resume();
     289              :         },
     290           77 :         +[](void* p) noexcept {
     291           12 :             static_cast<Awaitable*>(p)->~Awaitable();
     292              :         },
     293              :         sizeof(Awaitable),
     294              :         alignof(Awaitable),
     295              :         &do_destroy_impl
     296              :     };
     297              : };
     298              : 
     299              : //----------------------------------------------------------
     300              : 
     301              : inline
     302           95 : any_write_stream::~any_write_stream()
     303              : {
     304           95 :     if(storage_)
     305              :     {
     306            1 :         vt_->destroy(stream_);
     307            1 :         ::operator delete(storage_);
     308              :     }
     309           95 :     if(cached_awaitable_)
     310              :     {
     311           85 :         if(awaitable_active_)
     312            1 :             vt_->destroy_awaitable(cached_awaitable_);
     313           85 :         ::operator delete(cached_awaitable_);
     314              :     }
     315           95 : }
     316              : 
     317              : inline any_write_stream&
     318            5 : any_write_stream::operator=(any_write_stream&& other) noexcept
     319              : {
     320            5 :     if(this != &other)
     321              :     {
     322            5 :         if(storage_)
     323              :         {
     324            0 :             vt_->destroy(stream_);
     325            0 :             ::operator delete(storage_);
     326              :         }
     327            5 :         if(cached_awaitable_)
     328              :         {
     329            2 :             if(awaitable_active_)
     330            1 :                 vt_->destroy_awaitable(cached_awaitable_);
     331            2 :             ::operator delete(cached_awaitable_);
     332              :         }
     333            5 :         stream_ = std::exchange(other.stream_, nullptr);
     334            5 :         vt_ = std::exchange(other.vt_, nullptr);
     335            5 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     336            5 :         storage_ = std::exchange(other.storage_, nullptr);
     337            5 :         awaitable_active_ = std::exchange(other.awaitable_active_, false);
     338              :     }
     339            5 :     return *this;
     340              : }
     341              : 
     342              : template<WriteStream S>
     343              :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     344            1 : any_write_stream::any_write_stream(S s)
     345            1 :     : vt_(&vtable_for_impl<S>::value)
     346              : {
     347              :     struct guard {
     348              :         any_write_stream* self;
     349              :         bool committed = false;
     350            1 :         ~guard() {
     351            1 :             if(!committed && self->storage_) {
     352            0 :                 self->vt_->destroy(self->stream_);
     353            0 :                 ::operator delete(self->storage_);
     354            0 :                 self->storage_ = nullptr;
     355            0 :                 self->stream_ = nullptr;
     356              :             }
     357            1 :         }
     358            1 :     } g{this};
     359              : 
     360            1 :     storage_ = ::operator new(sizeof(S));
     361            1 :     stream_ = ::new(storage_) S(std::move(s));
     362              : 
     363              :     // Preallocate the awaitable storage
     364            1 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     365              : 
     366            1 :     g.committed = true;
     367            1 : }
     368              : 
     369              : template<WriteStream S>
     370           86 : any_write_stream::any_write_stream(S* s)
     371           86 :     : stream_(s)
     372           86 :     , vt_(&vtable_for_impl<S>::value)
     373              : {
     374              :     // Preallocate the awaitable storage
     375           86 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     376           86 : }
     377              : 
     378              : //----------------------------------------------------------
     379              : 
     380              : template<ConstBufferSequence CB>
     381              : auto
     382           79 : any_write_stream::write_some(CB buffers)
     383              : {
     384              :     struct awaitable
     385              :     {
     386              :         any_write_stream* self_;
     387              :         const_buffer_array<detail::max_iovec_> ba_;
     388              : 
     389           79 :         awaitable(
     390              :             any_write_stream* self,
     391              :             CB const& buffers) noexcept
     392           79 :             : self_(self)
     393           79 :             , ba_(buffers)
     394              :         {
     395           79 :         }
     396              : 
     397              :         bool
     398           79 :         await_ready() const noexcept
     399              :         {
     400           79 :             return ba_.to_span().empty();
     401              :         }
     402              : 
     403              :         coro
     404           75 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     405              :         {
     406           75 :             self_->vt_->construct_awaitable(
     407           75 :                 self_->stream_,
     408           75 :                 self_->cached_awaitable_,
     409           75 :                 ba_.to_span());
     410           75 :             self_->awaitable_active_ = true;
     411              : 
     412           75 :             if(self_->vt_->await_ready(self_->cached_awaitable_))
     413           73 :                 return h;
     414              : 
     415            4 :             return self_->vt_->await_suspend(
     416            2 :                 self_->cached_awaitable_, h, ex, token);
     417              :         }
     418              : 
     419              :         io_result<std::size_t>
     420           77 :         await_resume()
     421              :         {
     422           77 :             if(!self_->awaitable_active_)
     423            4 :                 return {{}, 0};
     424              :             struct guard {
     425              :                 any_write_stream* self;
     426           73 :                 ~guard() {
     427           73 :                     self->vt_->destroy_awaitable(self->cached_awaitable_);
     428           73 :                     self->awaitable_active_ = false;
     429           73 :                 }
     430           73 :             } g{self_};
     431           73 :             return self_->vt_->await_resume(
     432           73 :                 self_->cached_awaitable_);
     433           73 :         }
     434              :     };
     435           79 :     return awaitable{this, buffers};
     436              : }
     437              : 
     438              : } // namespace capy
     439              : } // namespace boost
     440              : 
     441              : #endif
        

Generated by: LCOV version 2.3