LCOV - code coverage report
Current view: top level - capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 90.6 % 212 192
Test Date: 2026-02-07 18:59:16 Functions: 77.9 % 77 60

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_array.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/concept/io_awaitable.hpp>
      19              : #include <boost/capy/concept/write_sink.hpp>
      20              : #include <boost/capy/coro.hpp>
      21              : #include <boost/capy/ex/executor_ref.hpp>
      22              : #include <boost/capy/io_result.hpp>
      23              : #include <boost/capy/io_task.hpp>
      24              : 
      25              : #include <concepts>
      26              : #include <coroutine>
      27              : #include <cstddef>
      28              : #include <exception>
      29              : #include <new>
      30              : #include <span>
      31              : #include <stop_token>
      32              : #include <system_error>
      33              : #include <utility>
      34              : 
      35              : namespace boost {
      36              : namespace capy {
      37              : 
      38              : /** Type-erased wrapper for any WriteSink.
      39              : 
      40              :     This class provides type erasure for any type satisfying the
      41              :     @ref WriteSink concept, enabling runtime polymorphism for
      42              :     sink write operations. It uses cached awaitable storage to achieve
      43              :     zero steady-state allocation after construction.
      44              : 
      45              :     The wrapper supports two construction modes:
      46              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      47              :       allocates storage and owns the sink.
      48              :     - **Reference**: Pass a pointer to wrap without ownership. The
      49              :       pointed-to sink must outlive this wrapper.
      50              : 
      51              :     @par Awaitable Preallocation
      52              :     The constructor preallocates storage for the type-erased awaitable.
      53              :     This reserves all virtual address space at server startup
      54              :     so memory usage can be measured up front, rather than
      55              :     allocating piecemeal as traffic arrives.
      56              : 
      57              :     @par Immediate Completion
      58              :     Operations complete immediately without suspending when the
      59              :     buffer sequence is empty, or when the underlying sink's
      60              :     awaitable reports readiness via `await_ready`.
      61              : 
      62              :     @par Thread Safety
      63              :     Not thread-safe. Concurrent operations on the same wrapper
      64              :     are undefined behavior.
      65              : 
      66              :     @par Example
      67              :     @code
      68              :     // Owning - takes ownership of the sink
      69              :     any_write_sink ws(some_sink{args...});
      70              : 
      71              :     // Reference - wraps without ownership
      72              :     some_sink sink;
      73              :     any_write_sink ws(&sink);
      74              : 
      75              :     const_buffer buf(data, size);
      76              :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      77              :     auto [ec2] = co_await ws.write_eof();
      78              :     @endcode
      79              : 
      80              :     @see any_write_stream, WriteSink
      81              : */
      82              : class any_write_sink
      83              : {
      84              :     struct vtable;
      85              :     struct write_awaitable_ops;
      86              :     struct eof_awaitable_ops;
      87              : 
      88              :     template<WriteSink S>
      89              :     struct vtable_for_impl;
      90              : 
      91              :     void* sink_ = nullptr;
      92              :     vtable const* vt_ = nullptr;
      93              :     void* cached_awaitable_ = nullptr;
      94              :     void* storage_ = nullptr;
      95              :     write_awaitable_ops const* active_write_ops_ = nullptr;
      96              :     eof_awaitable_ops const* active_eof_ops_ = nullptr;
      97              : 
      98              : public:
      99              :     /** Destructor.
     100              : 
     101              :         Destroys the owned sink (if any) and releases the cached
     102              :         awaitable storage.
     103              :     */
     104              :     ~any_write_sink();
     105              : 
     106              :     /** Default constructor.
     107              : 
     108              :         Constructs an empty wrapper. Operations on a default-constructed
     109              :         wrapper result in undefined behavior.
     110              :     */
     111              :     any_write_sink() = default;
     112              : 
     113              :     /** Non-copyable.
     114              : 
     115              :         The awaitable cache is per-instance and cannot be shared.
     116              :     */
     117              :     any_write_sink(any_write_sink const&) = delete;
     118              :     any_write_sink& operator=(any_write_sink const&) = delete;
     119              : 
     120              :     /** Move constructor.
     121              : 
     122              :         Transfers ownership of the wrapped sink (if owned) and
     123              :         cached awaitable storage from `other`. After the move, `other` is
     124              :         in a default-constructed state.
     125              : 
     126              :         @param other The wrapper to move from.
     127              :     */
     128            1 :     any_write_sink(any_write_sink&& other) noexcept
     129            1 :         : sink_(std::exchange(other.sink_, nullptr))
     130            1 :         , vt_(std::exchange(other.vt_, nullptr))
     131            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     132            1 :         , storage_(std::exchange(other.storage_, nullptr))
     133            1 :         , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
     134            1 :         , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
     135              :     {
     136            1 :     }
     137              : 
     138              :     /** Move assignment operator.
     139              : 
     140              :         Destroys any owned sink and releases existing resources,
     141              :         then transfers ownership from `other`.
     142              : 
     143              :         @param other The wrapper to move from.
     144              :         @return Reference to this wrapper.
     145              :     */
     146              :     any_write_sink&
     147              :     operator=(any_write_sink&& other) noexcept;
     148              : 
     149              :     /** Construct by taking ownership of a WriteSink.
     150              : 
     151              :         Allocates storage and moves the sink into this wrapper.
     152              :         The wrapper owns the sink and will destroy it.
     153              : 
     154              :         @param s The sink to take ownership of.
     155              :     */
     156              :     template<WriteSink S>
     157              :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     158              :     any_write_sink(S s);
     159              : 
     160              :     /** Construct by wrapping a WriteSink without ownership.
     161              : 
     162              :         Wraps the given sink by pointer. The sink must remain
     163              :         valid for the lifetime of this wrapper.
     164              : 
     165              :         @param s Pointer to the sink to wrap.
     166              :     */
     167              :     template<WriteSink S>
     168              :     any_write_sink(S* s);
     169              : 
     170              :     /** Check if the wrapper contains a valid sink.
     171              : 
     172              :         @return `true` if wrapping a sink, `false` if default-constructed
     173              :             or moved-from.
     174              :     */
     175              :     bool
     176           15 :     has_value() const noexcept
     177              :     {
     178           15 :         return sink_ != nullptr;
     179              :     }
     180              : 
     181              :     /** Check if the wrapper contains a valid sink.
     182              : 
     183              :         @return `true` if wrapping a sink, `false` if default-constructed
     184              :             or moved-from.
     185              :     */
     186              :     explicit
     187            2 :     operator bool() const noexcept
     188              :     {
     189            2 :         return has_value();
     190              :     }
     191              : 
     192              :     /** Initiate a partial write operation.
     193              : 
     194              :         Writes one or more bytes from the provided buffer sequence.
     195              :         May consume less than the full sequence.
     196              : 
     197              :         @param buffers The buffer sequence containing data to write.
     198              : 
     199              :         @return An awaitable yielding `(error_code,std::size_t)`.
     200              : 
     201              :         @par Immediate Completion
     202              :         The operation completes immediately without suspending
     203              :         the calling coroutine when:
     204              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     205              :         @li The underlying sink's awaitable reports immediate
     206              :             readiness via `await_ready`.
     207              : 
     208              :         @note This is a partial operation and may not process the
     209              :         entire buffer sequence. Use @ref write for guaranteed
     210              :         complete transfer.
     211              : 
     212              :         @par Preconditions
     213              :         The wrapper must contain a valid sink (`has_value() == true`).
     214              :     */
     215              :     template<ConstBufferSequence CB>
     216              :     auto
     217              :     write_some(CB buffers);
     218              : 
     219              :     /** Initiate a complete write operation.
     220              : 
     221              :         Writes data from the provided buffer sequence. The operation
     222              :         completes when all bytes have been consumed, or an error
     223              :         occurs. Forwards to the underlying sink's `write` operation,
     224              :         windowed through @ref buffer_param when the sequence exceeds
     225              :         the per-call buffer limit.
     226              : 
     227              :         @param buffers The buffer sequence containing data to write.
     228              : 
     229              :         @return An awaitable yielding `(error_code,std::size_t)`.
     230              : 
     231              :         @par Immediate Completion
     232              :         The operation completes immediately without suspending
     233              :         the calling coroutine when:
     234              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     235              :         @li Every underlying `write` call completes
     236              :             immediately (the wrapped sink reports readiness
     237              :             via `await_ready` on each iteration).
     238              : 
     239              :         @par Preconditions
     240              :         The wrapper must contain a valid sink (`has_value() == true`).
     241              :     */
     242              :     template<ConstBufferSequence CB>
     243              :     io_task<std::size_t>
     244              :     write(CB buffers);
     245              : 
     246              :     /** Atomically write data and signal end-of-stream.
     247              : 
     248              :         Writes all data from the buffer sequence and then signals
     249              :         end-of-stream. The implementation decides how to partition
     250              :         the data across calls to the underlying sink's @ref write
     251              :         and `write_eof`. When the caller's buffer sequence is
     252              :         non-empty, the final call to the underlying sink is always
     253              :         `write_eof` with a non-empty buffer sequence. When the
     254              :         caller's buffer sequence is empty, only `write_eof()` with
     255              :         no data is called.
     256              : 
     257              :         @param buffers The buffer sequence containing data to write.
     258              : 
     259              :         @return An awaitable yielding `(error_code,std::size_t)`.
     260              : 
     261              :         @par Immediate Completion
     262              :         The operation completes immediately without suspending
     263              :         the calling coroutine when:
     264              :         @li The buffer sequence is empty. Only the @ref write_eof()
     265              :             call is performed.
     266              :         @li All underlying operations complete immediately (the
     267              :             wrapped sink reports readiness via `await_ready`).
     268              : 
     269              :         @par Preconditions
     270              :         The wrapper must contain a valid sink (`has_value() == true`).
     271              :     */
     272              :     template<ConstBufferSequence CB>
     273              :     io_task<std::size_t>
     274              :     write_eof(CB buffers);
     275              : 
     276              :     /** Signal end of data.
     277              : 
     278              :         Indicates that no more data will be written to the sink.
     279              :         The operation completes when the sink is finalized, or
     280              :         an error occurs.
     281              : 
     282              :         @return An awaitable yielding `(error_code)`.
     283              : 
     284              :         @par Immediate Completion
     285              :         The operation completes immediately without suspending
     286              :         the calling coroutine when the underlying sink's awaitable
     287              :         reports immediate readiness via `await_ready`.
     288              : 
     289              :         @par Preconditions
     290              :         The wrapper must contain a valid sink (`has_value() == true`).
     291              :     */
     292              :     auto
     293              :     write_eof();
     294              : 
     295              : protected:
     296              :     /** Rebind to a new sink after move.
     297              : 
     298              :         Updates the internal pointer to reference a new sink object.
     299              :         Used by owning wrappers after move assignment when the owned
     300              :         object has moved to a new location.
     301              : 
     302              :         @param new_sink The new sink to bind to. Must be the same
     303              :             type as the original sink.
     304              : 
     305              :         @note Terminates if called with a sink of different type
     306              :             than the original.
     307              :     */
     308              :     template<WriteSink S>
     309              :     void
     310              :     rebind(S& new_sink) noexcept
     311              :     {
     312              :         if(vt_ != &vtable_for_impl<S>::value)
     313              :             std::terminate();
     314              :         sink_ = &new_sink;
     315              :     }
     316              : 
     317              : private:
     318              :     auto
     319              :     write_some_(std::span<const_buffer const> buffers);
     320              : 
     321              :     auto
     322              :     write_(std::span<const_buffer const> buffers);
     323              : 
     324              :     auto
     325              :     write_eof_buffers_(std::span<const_buffer const> buffers);
     326              : };
     327              : 
     328              : //----------------------------------------------------------
     329              : 
     330              : struct any_write_sink::write_awaitable_ops
     331              : {
     332              :     bool (*await_ready)(void*);
     333              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     334              :     io_result<std::size_t> (*await_resume)(void*);
     335              :     void (*destroy)(void*) noexcept;
     336              : };
     337              : 
     338              : struct any_write_sink::eof_awaitable_ops
     339              : {
     340              :     bool (*await_ready)(void*);
     341              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     342              :     io_result<> (*await_resume)(void*);
     343              :     void (*destroy)(void*) noexcept;
     344              : };
     345              : 
     346              : struct any_write_sink::vtable
     347              : {
     348              :     write_awaitable_ops const* (*construct_write_some_awaitable)(
     349              :         void* sink,
     350              :         void* storage,
     351              :         std::span<const_buffer const> buffers);
     352              :     write_awaitable_ops const* (*construct_write_awaitable)(
     353              :         void* sink,
     354              :         void* storage,
     355              :         std::span<const_buffer const> buffers);
     356              :     write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
     357              :         void* sink,
     358              :         void* storage,
     359              :         std::span<const_buffer const> buffers);
     360              :     eof_awaitable_ops const* (*construct_eof_awaitable)(
     361              :         void* sink,
     362              :         void* storage);
     363              :     std::size_t awaitable_size;
     364              :     std::size_t awaitable_align;
     365              :     void (*destroy)(void*) noexcept;
     366              : };
     367              : 
     368              : template<WriteSink S>
     369              : struct any_write_sink::vtable_for_impl
     370              : {
     371              :     using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
     372              :         std::span<const_buffer const>{}));
     373              :     using WriteAwaitable = decltype(std::declval<S&>().write(
     374              :         std::span<const_buffer const>{}));
     375              :     using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
     376              :         std::span<const_buffer const>{}));
     377              :     using EofAwaitable = decltype(std::declval<S&>().write_eof());
     378              : 
     379              :     static void
     380            6 :     do_destroy_impl(void* sink) noexcept
     381              :     {
     382            6 :         static_cast<S*>(sink)->~S();
     383            6 :     }
     384              : 
     385              :     static write_awaitable_ops const*
     386           40 :     construct_write_some_awaitable_impl(
     387              :         void* sink,
     388              :         void* storage,
     389              :         std::span<const_buffer const> buffers)
     390              :     {
     391           40 :         auto& s = *static_cast<S*>(sink);
     392           40 :         ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
     393              : 
     394              :         static constexpr write_awaitable_ops ops = {
     395           40 :             +[](void* p) {
     396           40 :                 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
     397              :             },
     398            2 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     399            4 :                 return detail::call_await_suspend(
     400            2 :                     static_cast<WriteSomeAwaitable*>(p), h, ex, token);
     401              :             },
     402           38 :             +[](void* p) {
     403           38 :                 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
     404              :             },
     405           42 :             +[](void* p) noexcept {
     406            2 :                 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
     407              :             }
     408              :         };
     409           40 :         return &ops;
     410              :     }
     411              : 
     412              :     static write_awaitable_ops const*
     413           78 :     construct_write_awaitable_impl(
     414              :         void* sink,
     415              :         void* storage,
     416              :         std::span<const_buffer const> buffers)
     417              :     {
     418           78 :         auto& s = *static_cast<S*>(sink);
     419           78 :         ::new(storage) WriteAwaitable(s.write(buffers));
     420              : 
     421              :         static constexpr write_awaitable_ops ops = {
     422           78 :             +[](void* p) {
     423           78 :                 return static_cast<WriteAwaitable*>(p)->await_ready();
     424              :             },
     425            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     426            0 :                 return detail::call_await_suspend(
     427            0 :                     static_cast<WriteAwaitable*>(p), h, ex, token);
     428              :             },
     429           78 :             +[](void* p) {
     430           78 :                 return static_cast<WriteAwaitable*>(p)->await_resume();
     431              :             },
     432           78 :             +[](void* p) noexcept {
     433            0 :                 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
     434              :             }
     435              :         };
     436           78 :         return &ops;
     437              :     }
     438              : 
     439              :     static write_awaitable_ops const*
     440           16 :     construct_write_eof_buffers_awaitable_impl(
     441              :         void* sink,
     442              :         void* storage,
     443              :         std::span<const_buffer const> buffers)
     444              :     {
     445           16 :         auto& s = *static_cast<S*>(sink);
     446           16 :         ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
     447              : 
     448              :         static constexpr write_awaitable_ops ops = {
     449           16 :             +[](void* p) {
     450           16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
     451              :             },
     452            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     453            0 :                 return detail::call_await_suspend(
     454            0 :                     static_cast<WriteEofBuffersAwaitable*>(p), h, ex, token);
     455              :             },
     456           16 :             +[](void* p) {
     457           16 :                 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
     458              :             },
     459           16 :             +[](void* p) noexcept {
     460            0 :                 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
     461              :             }
     462              :         };
     463           16 :         return &ops;
     464              :     }
     465              : 
     466              :     static eof_awaitable_ops const*
     467           17 :     construct_eof_awaitable_impl(
     468              :         void* sink,
     469              :         void* storage)
     470              :     {
     471           17 :         auto& s = *static_cast<S*>(sink);
     472           17 :         ::new(storage) EofAwaitable(s.write_eof());
     473              : 
     474              :         static constexpr eof_awaitable_ops ops = {
     475           17 :             +[](void* p) {
     476           17 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     477              :             },
     478            1 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     479            2 :                 return detail::call_await_suspend(
     480            1 :                     static_cast<EofAwaitable*>(p), h, ex, token);
     481              :             },
     482           16 :             +[](void* p) {
     483           16 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     484              :             },
     485           18 :             +[](void* p) noexcept {
     486            1 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     487              :             }
     488              :         };
     489           17 :         return &ops;
     490              :     }
     491              : 
     492              :     static constexpr std::size_t max4(
     493              :         std::size_t a, std::size_t b,
     494              :         std::size_t c, std::size_t d) noexcept
     495              :     {
     496              :         std::size_t ab = a > b ? a : b;
     497              :         std::size_t cd = c > d ? c : d;
     498              :         return ab > cd ? ab : cd;
     499              :     }
     500              : 
     501              :     static constexpr std::size_t max_awaitable_size =
     502              :         max4(sizeof(WriteSomeAwaitable),
     503              :              sizeof(WriteAwaitable),
     504              :              sizeof(WriteEofBuffersAwaitable),
     505              :              sizeof(EofAwaitable));
     506              : 
     507              :     static constexpr std::size_t max_awaitable_align =
     508              :         max4(alignof(WriteSomeAwaitable),
     509              :              alignof(WriteAwaitable),
     510              :              alignof(WriteEofBuffersAwaitable),
     511              :              alignof(EofAwaitable));
     512              : 
     513              :     static constexpr vtable value = {
     514              :         &construct_write_some_awaitable_impl,
     515              :         &construct_write_awaitable_impl,
     516              :         &construct_write_eof_buffers_awaitable_impl,
     517              :         &construct_eof_awaitable_impl,
     518              :         max_awaitable_size,
     519              :         max_awaitable_align,
     520              :         &do_destroy_impl
     521              :     };
     522              : };
     523              : 
     524              : //----------------------------------------------------------
     525              : 
     526              : inline
     527          129 : any_write_sink::~any_write_sink()
     528              : {
     529          129 :     if(storage_)
     530              :     {
     531            6 :         vt_->destroy(sink_);
     532            6 :         ::operator delete(storage_);
     533              :     }
     534          129 :     if(cached_awaitable_)
     535              :     {
     536          124 :         if(active_write_ops_)
     537            1 :             active_write_ops_->destroy(cached_awaitable_);
     538          123 :         else if(active_eof_ops_)
     539            1 :             active_eof_ops_->destroy(cached_awaitable_);
     540          124 :         ::operator delete(cached_awaitable_);
     541              :     }
     542          129 : }
     543              : 
     544              : inline any_write_sink&
     545            2 : any_write_sink::operator=(any_write_sink&& other) noexcept
     546              : {
     547            2 :     if(this != &other)
     548              :     {
     549            2 :         if(storage_)
     550              :         {
     551            0 :             vt_->destroy(sink_);
     552            0 :             ::operator delete(storage_);
     553              :         }
     554            2 :         if(cached_awaitable_)
     555              :         {
     556            1 :             if(active_write_ops_)
     557            1 :                 active_write_ops_->destroy(cached_awaitable_);
     558            0 :             else if(active_eof_ops_)
     559            0 :                 active_eof_ops_->destroy(cached_awaitable_);
     560            1 :             ::operator delete(cached_awaitable_);
     561              :         }
     562            2 :         sink_ = std::exchange(other.sink_, nullptr);
     563            2 :         vt_ = std::exchange(other.vt_, nullptr);
     564            2 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     565            2 :         storage_ = std::exchange(other.storage_, nullptr);
     566            2 :         active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
     567            2 :         active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
     568              :     }
     569            2 :     return *this;
     570              : }
     571              : 
     572              : template<WriteSink S>
     573              :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     574            6 : any_write_sink::any_write_sink(S s)
     575            6 :     : vt_(&vtable_for_impl<S>::value)
     576              : {
     577              :     struct guard {
     578              :         any_write_sink* self;
     579              :         bool committed = false;
     580            6 :         ~guard() {
     581            6 :             if(!committed && self->storage_) {
     582            0 :                 self->vt_->destroy(self->sink_);
     583            0 :                 ::operator delete(self->storage_);
     584            0 :                 self->storage_ = nullptr;
     585            0 :                 self->sink_ = nullptr;
     586              :             }
     587            6 :         }
     588            6 :     } g{this};
     589              : 
     590            6 :     storage_ = ::operator new(sizeof(S));
     591            6 :     sink_ = ::new(storage_) S(std::move(s));
     592              : 
     593              :     // Preallocate the awaitable storage (sized for max of write/eof)
     594            6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     595              : 
     596            6 :     g.committed = true;
     597            6 : }
     598              : 
     599              : template<WriteSink S>
     600          119 : any_write_sink::any_write_sink(S* s)
     601          119 :     : sink_(s)
     602          119 :     , vt_(&vtable_for_impl<S>::value)
     603              : {
     604              :     // Preallocate the awaitable storage (sized for max of write/eof)
     605          119 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     606          119 : }
     607              : 
     608              : //----------------------------------------------------------
     609              : 
     610              : inline auto
     611              : any_write_sink::write_some_(
     612              :     std::span<const_buffer const> buffers)
     613              : {
     614              :     struct awaitable
     615              :     {
     616              :         any_write_sink* self_;
     617              :         std::span<const_buffer const> buffers_;
     618              : 
     619              :         bool
     620              :         await_ready() const noexcept
     621              :         {
     622              :             return false;
     623              :         }
     624              : 
     625              :         coro
     626              :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     627              :         {
     628              :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     629              :                 self_->sink_,
     630              :                 self_->cached_awaitable_,
     631              :                 buffers_);
     632              : 
     633              :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     634              :                 return h;
     635              : 
     636              :             return self_->active_write_ops_->await_suspend(
     637              :                 self_->cached_awaitable_, h, ex, token);
     638              :         }
     639              : 
     640              :         io_result<std::size_t>
     641              :         await_resume()
     642              :         {
     643              :             struct guard {
     644              :                 any_write_sink* self;
     645              :                 ~guard() {
     646              :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     647              :                     self->active_write_ops_ = nullptr;
     648              :                 }
     649              :             } g{self_};
     650              :             return self_->active_write_ops_->await_resume(
     651              :                 self_->cached_awaitable_);
     652              :         }
     653              :     };
     654              :     return awaitable{this, buffers};
     655              : }
     656              : 
     657              : inline auto
     658           78 : any_write_sink::write_(
     659              :     std::span<const_buffer const> buffers)
     660              : {
     661              :     struct awaitable
     662              :     {
     663              :         any_write_sink* self_;
     664              :         std::span<const_buffer const> buffers_;
     665              : 
     666              :         bool
     667           78 :         await_ready() const noexcept
     668              :         {
     669           78 :             return false;
     670              :         }
     671              : 
     672              :         coro
     673           78 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     674              :         {
     675          156 :             self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
     676           78 :                 self_->sink_,
     677           78 :                 self_->cached_awaitable_,
     678              :                 buffers_);
     679              : 
     680           78 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     681           78 :                 return h;
     682              : 
     683            0 :             return self_->active_write_ops_->await_suspend(
     684            0 :                 self_->cached_awaitable_, h, ex, token);
     685              :         }
     686              : 
     687              :         io_result<std::size_t>
     688           78 :         await_resume()
     689              :         {
     690              :             struct guard {
     691              :                 any_write_sink* self;
     692           78 :                 ~guard() {
     693           78 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     694           78 :                     self->active_write_ops_ = nullptr;
     695           78 :                 }
     696           78 :             } g{self_};
     697           78 :             return self_->active_write_ops_->await_resume(
     698          135 :                 self_->cached_awaitable_);
     699           78 :         }
     700              :     };
     701           78 :     return awaitable{this, buffers};
     702              : }
     703              : 
     704              : inline auto
     705           17 : any_write_sink::write_eof()
     706              : {
     707              :     struct awaitable
     708              :     {
     709              :         any_write_sink* self_;
     710              : 
     711              :         bool
     712           17 :         await_ready() const noexcept
     713              :         {
     714           17 :             return false;
     715              :         }
     716              : 
     717              :         coro
     718           17 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     719              :         {
     720              :             // Construct the underlying awaitable into cached storage
     721           34 :             self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
     722           17 :                 self_->sink_,
     723           17 :                 self_->cached_awaitable_);
     724              : 
     725              :             // Check if underlying is immediately ready
     726           17 :             if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
     727           16 :                 return h;
     728              : 
     729              :             // Forward to underlying awaitable
     730            2 :             return self_->active_eof_ops_->await_suspend(
     731            1 :                 self_->cached_awaitable_, h, ex, token);
     732              :         }
     733              : 
     734              :         io_result<>
     735           16 :         await_resume()
     736              :         {
     737              :             struct guard {
     738              :                 any_write_sink* self;
     739           16 :                 ~guard() {
     740           16 :                     self->active_eof_ops_->destroy(self->cached_awaitable_);
     741           16 :                     self->active_eof_ops_ = nullptr;
     742           16 :                 }
     743           16 :             } g{self_};
     744           16 :             return self_->active_eof_ops_->await_resume(
     745           27 :                 self_->cached_awaitable_);
     746           16 :         }
     747              :     };
     748           17 :     return awaitable{this};
     749              : }
     750              : 
     751              : inline auto
     752           16 : any_write_sink::write_eof_buffers_(
     753              :     std::span<const_buffer const> buffers)
     754              : {
     755              :     struct awaitable
     756              :     {
     757              :         any_write_sink* self_;
     758              :         std::span<const_buffer const> buffers_;
     759              : 
     760              :         bool
     761           16 :         await_ready() const noexcept
     762              :         {
     763           16 :             return false;
     764              :         }
     765              : 
     766              :         coro
     767           16 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     768              :         {
     769           32 :             self_->active_write_ops_ =
     770           32 :                 self_->vt_->construct_write_eof_buffers_awaitable(
     771           16 :                     self_->sink_,
     772           16 :                     self_->cached_awaitable_,
     773              :                     buffers_);
     774              : 
     775           16 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     776           16 :                 return h;
     777              : 
     778            0 :             return self_->active_write_ops_->await_suspend(
     779            0 :                 self_->cached_awaitable_, h, ex, token);
     780              :         }
     781              : 
     782              :         io_result<std::size_t>
     783           16 :         await_resume()
     784              :         {
     785              :             struct guard {
     786              :                 any_write_sink* self;
     787           16 :                 ~guard() {
     788           16 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     789           16 :                     self->active_write_ops_ = nullptr;
     790           16 :                 }
     791           16 :             } g{self_};
     792           16 :             return self_->active_write_ops_->await_resume(
     793           27 :                 self_->cached_awaitable_);
     794           16 :         }
     795              :     };
     796           16 :     return awaitable{this, buffers};
     797              : }
     798              : 
     799              : template<ConstBufferSequence CB>
     800              : auto
     801           42 : any_write_sink::write_some(CB buffers)
     802              : {
     803              :     struct awaitable
     804              :     {
     805              :         any_write_sink* self_;
     806              :         const_buffer_array<detail::max_iovec_> ba_;
     807              : 
     808           42 :         awaitable(
     809              :             any_write_sink* self,
     810              :             CB const& buffers)
     811           42 :             : self_(self)
     812           42 :             , ba_(buffers)
     813              :         {
     814           42 :         }
     815              : 
     816              :         bool
     817           42 :         await_ready() const noexcept
     818              :         {
     819           42 :             return ba_.to_span().empty();
     820              :         }
     821              : 
     822              :         coro
     823           40 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     824              :         {
     825           40 :             self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
     826           40 :                 self_->sink_,
     827           40 :                 self_->cached_awaitable_,
     828           40 :                 ba_.to_span());
     829              : 
     830           40 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     831           38 :                 return h;
     832              : 
     833            4 :             return self_->active_write_ops_->await_suspend(
     834            2 :                 self_->cached_awaitable_, h, ex, token);
     835              :         }
     836              : 
     837              :         io_result<std::size_t>
     838           40 :         await_resume()
     839              :         {
     840           40 :             if(ba_.to_span().empty())
     841            2 :                 return {{}, 0};
     842              : 
     843              :             struct guard {
     844              :                 any_write_sink* self;
     845           38 :                 ~guard() {
     846           38 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     847           38 :                     self->active_write_ops_ = nullptr;
     848           38 :                 }
     849           38 :             } g{self_};
     850           38 :             return self_->active_write_ops_->await_resume(
     851           38 :                 self_->cached_awaitable_);
     852           38 :         }
     853              :     };
     854           42 :     return awaitable{this, buffers};
     855              : }
     856              : 
     857              : template<ConstBufferSequence CB>
     858              : io_task<std::size_t>
     859           68 : any_write_sink::write(CB buffers)
     860              : {
     861              :     buffer_param<CB> bp(buffers);
     862              :     std::size_t total = 0;
     863              : 
     864              :     for(;;)
     865              :     {
     866              :         auto bufs = bp.data();
     867              :         if(bufs.empty())
     868              :             break;
     869              : 
     870              :         auto [ec, n] = co_await write_(bufs);
     871              :         total += n;
     872              :         if(ec)
     873              :             co_return {ec, total};
     874              :         bp.consume(n);
     875              :     }
     876              : 
     877              :     co_return {{}, total};
     878          136 : }
     879              : 
     880              : template<ConstBufferSequence CB>
     881              : io_task<std::size_t>
     882           26 : any_write_sink::write_eof(CB buffers)
     883              : {
     884              :     const_buffer_param<CB> bp(buffers);
     885              :     std::size_t total = 0;
     886              : 
     887              :     for(;;)
     888              :     {
     889              :         auto bufs = bp.data();
     890              :         if(bufs.empty())
     891              :         {
     892              :             auto [ec] = co_await write_eof();
     893              :             co_return {ec, total};
     894              :         }
     895              : 
     896              :         if(! bp.more())
     897              :         {
     898              :             // Last window — send atomically with EOF
     899              :             auto [ec, n] = co_await write_eof_buffers_(bufs);
     900              :             total += n;
     901              :             co_return {ec, total};
     902              :         }
     903              : 
     904              :         auto [ec, n] = co_await write_(bufs);
     905              :         total += n;
     906              :         if(ec)
     907              :             co_return {ec, total};
     908              :         bp.consume(n);
     909              :     }
     910           52 : }
     911              : 
     912              : } // namespace capy
     913              : } // namespace boost
     914              : 
     915              : #endif
        

Generated by: LCOV version 2.3