LCOV - code coverage report
Current view: top level - capy/io - any_buffer_source.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 86.6 % 172 149
Test Date: 2026-02-07 18:59:16 Functions: 88.2 % 51 45

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/buffers/slice.hpp>
      19              : #include <boost/capy/concept/buffer_source.hpp>
      20              : #include <boost/capy/concept/io_awaitable.hpp>
      21              : #include <boost/capy/concept/read_source.hpp>
      22              : #include <boost/capy/coro.hpp>
      23              : #include <boost/capy/error.hpp>
      24              : #include <boost/capy/ex/executor_ref.hpp>
      25              : #include <boost/capy/io_result.hpp>
      26              : #include <boost/capy/io_task.hpp>
      27              : 
      28              : #include <concepts>
      29              : #include <coroutine>
      30              : #include <cstddef>
      31              : #include <exception>
      32              : #include <new>
      33              : #include <span>
      34              : #include <stop_token>
      35              : #include <system_error>
      36              : #include <utility>
      37              : 
      38              : namespace boost {
      39              : namespace capy {
      40              : 
      41              : /** Type-erased wrapper for any BufferSource.
      42              : 
      43              :     This class provides type erasure for any type satisfying the
      44              :     @ref BufferSource concept, enabling runtime polymorphism for
      45              :     buffer pull operations. It uses cached awaitable storage to achieve
      46              :     zero steady-state allocation after construction.
      47              : 
      48              :     The wrapper also satisfies @ref ReadSource. When the wrapped type
      49              :     satisfies only @ref BufferSource, the read operations are
      50              :     synthesized using @ref pull and @ref consume with an extra
      51              :     buffer copy. When the wrapped type satisfies both @ref BufferSource
      52              :     and @ref ReadSource, the native read operations are forwarded
      53              :     directly across the virtual boundary, avoiding the copy.
      54              : 
      55              :     The wrapper supports two construction modes:
      56              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      57              :       allocates storage and owns the source.
      58              :     - **Reference**: Pass a pointer to wrap without ownership. The
      59              :       pointed-to source must outlive this wrapper.
      60              : 
      61              :     Within each mode, the vtable is populated at compile time based
      62              :     on whether the wrapped type also satisfies @ref ReadSource:
      63              :     - **BufferSource only**: @ref read_some and @ref read are
      64              :       synthesized from @ref pull and @ref consume, incurring one
      65              :       buffer copy per operation.
      66              :     - **BufferSource + ReadSource**: All read operations are
      67              :       forwarded natively through the type-erased boundary with
      68              :       no extra copy.
      69              : 
      70              :     @par Awaitable Preallocation
      71              :     The constructor preallocates storage for the type-erased awaitable.
      72              :     This reserves all virtual address space at server startup
      73              :     so memory usage can be measured up front, rather than
      74              :     allocating piecemeal as traffic arrives.
      75              : 
      76              :     @par Thread Safety
      77              :     Not thread-safe. Concurrent operations on the same wrapper
      78              :     are undefined behavior.
      79              : 
      80              :     @par Example
      81              :     @code
      82              :     // Owning - takes ownership of the source
      83              :     any_buffer_source abs(some_buffer_source{args...});
      84              : 
      85              :     // Reference - wraps without ownership
      86              :     some_buffer_source src;
      87              :     any_buffer_source abs(&src);
      88              : 
      89              :     const_buffer arr[16];
      90              :     auto [ec, bufs] = co_await abs.pull(arr);
      91              : 
      92              :     // ReadSource interface also available
      93              :     char buf[64];
      94              :     auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
      95              :     @endcode
      96              : 
      97              :     @see any_buffer_sink, BufferSource, ReadSource
      98              : */
      99              : class any_buffer_source
     100              : {
     101              :     struct vtable;
     102              :     struct awaitable_ops;
     103              :     struct read_awaitable_ops;
     104              : 
     105              :     template<BufferSource S>
     106              :     struct vtable_for_impl;
     107              : 
     108              :     // hot-path members first for cache locality
     109              :     void* source_ = nullptr;
     110              :     vtable const* vt_ = nullptr;
     111              :     void* cached_awaitable_ = nullptr;
     112              :     awaitable_ops const* active_ops_ = nullptr;
     113              :     read_awaitable_ops const* active_read_ops_ = nullptr;
     114              :     void* storage_ = nullptr;
     115              : 
     116              : public:
     117              :     /** Destructor.
     118              : 
     119              :         Destroys the owned source (if any) and releases the cached
     120              :         awaitable storage.
     121              :     */
     122              :     ~any_buffer_source();
     123              : 
     124              :     /** Default constructor.
     125              : 
     126              :         Constructs an empty wrapper. Operations on a default-constructed
     127              :         wrapper result in undefined behavior.
     128              :     */
     129              :     any_buffer_source() = default;
     130              : 
     131              :     /** Non-copyable.
     132              : 
     133              :         The awaitable cache is per-instance and cannot be shared.
     134              :     */
     135              :     any_buffer_source(any_buffer_source const&) = delete;
     136              :     any_buffer_source& operator=(any_buffer_source const&) = delete;
     137              : 
     138              :     /** Move constructor.
     139              : 
     140              :         Transfers ownership of the wrapped source (if owned) and
     141              :         cached awaitable storage from `other`. After the move, `other` is
     142              :         in a default-constructed state.
     143              : 
     144              :         @param other The wrapper to move from.
     145              :     */
     146            2 :     any_buffer_source(any_buffer_source&& other) noexcept
     147            2 :         : source_(std::exchange(other.source_, nullptr))
     148            2 :         , vt_(std::exchange(other.vt_, nullptr))
     149            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     150            2 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     151            2 :         , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
     152            2 :         , storage_(std::exchange(other.storage_, nullptr))
     153              :     {
     154            2 :     }
     155              : 
     156              :     /** Move assignment operator.
     157              : 
     158              :         Destroys any owned source and releases existing resources,
     159              :         then transfers ownership from `other`.
     160              : 
     161              :         @param other The wrapper to move from.
     162              :         @return Reference to this wrapper.
     163              :     */
     164              :     any_buffer_source&
     165              :     operator=(any_buffer_source&& other) noexcept;
     166              : 
     167              :     /** Construct by taking ownership of a BufferSource.
     168              : 
     169              :         Allocates storage and moves the source into this wrapper.
     170              :         The wrapper owns the source and will destroy it. If `S` also
     171              :         satisfies @ref ReadSource, native read operations are
     172              :         forwarded through the virtual boundary.
     173              : 
     174              :         @param s The source to take ownership of.
     175              :     */
     176              :     template<BufferSource S>
     177              :         requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     178              :     any_buffer_source(S s);
     179              : 
     180              :     /** Construct by wrapping a BufferSource without ownership.
     181              : 
     182              :         Wraps the given source by pointer. The source must remain
     183              :         valid for the lifetime of this wrapper. If `S` also
     184              :         satisfies @ref ReadSource, native read operations are
     185              :         forwarded through the virtual boundary.
     186              : 
     187              :         @param s Pointer to the source to wrap.
     188              :     */
     189              :     template<BufferSource S>
     190              :     any_buffer_source(S* s);
     191              : 
     192              :     /** Check if the wrapper contains a valid source.
     193              : 
     194              :         @return `true` if wrapping a source, `false` if default-constructed
     195              :             or moved-from.
     196              :     */
     197              :     bool
     198           16 :     has_value() const noexcept
     199              :     {
     200           16 :         return source_ != nullptr;
     201              :     }
     202              : 
     203              :     /** Check if the wrapper contains a valid source.
     204              : 
     205              :         @return `true` if wrapping a source, `false` if default-constructed
     206              :             or moved-from.
     207              :     */
     208              :     explicit
     209            2 :     operator bool() const noexcept
     210              :     {
     211            2 :         return has_value();
     212              :     }
     213              : 
     214              :     /** Consume bytes from the source.
     215              : 
     216              :         Advances the internal read position of the underlying source
     217              :         by the specified number of bytes. The next call to @ref pull
     218              :         returns data starting after the consumed bytes.
     219              : 
     220              :         @param n The number of bytes to consume. Must not exceed the
     221              :         total size of buffers returned by the previous @ref pull.
     222              : 
     223              :         @par Preconditions
     224              :         The wrapper must contain a valid source (`has_value() == true`).
     225              :     */
     226              :     void
     227              :     consume(std::size_t n) noexcept;
     228              : 
     229              :     /** Pull buffer data from the source.
     230              : 
     231              :         Fills the provided span with buffer descriptors from the
     232              :         underlying source. The operation completes when data is
     233              :         available, the source is exhausted, or an error occurs.
     234              : 
     235              :         @param dest Span of const_buffer to fill.
     236              : 
     237              :         @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
     238              :             On success with data, a non-empty span of filled buffers.
     239              :             On EOF, `ec == cond::eof` and span is empty.
     240              : 
     241              :         @par Preconditions
     242              :         The wrapper must contain a valid source (`has_value() == true`).
     243              :         The caller must not call this function again after a prior
     244              :         call returned an error.
     245              :     */
     246              :     auto
     247              :     pull(std::span<const_buffer> dest);
     248              : 
     249              :     /** Read some data into a mutable buffer sequence.
     250              : 
     251              :         Reads one or more bytes into the caller's buffers. May fill
     252              :         less than the full sequence.
     253              : 
     254              :         When the wrapped type provides native @ref ReadSource support,
     255              :         the operation forwards directly. Otherwise it is synthesized
     256              :         from @ref pull, @ref buffer_copy, and @ref consume.
     257              : 
     258              :         @param buffers The buffer sequence to fill.
     259              : 
     260              :         @return An awaitable yielding `(error_code,std::size_t)`.
     261              : 
     262              :         @par Preconditions
     263              :         The wrapper must contain a valid source (`has_value() == true`).
     264              :         The caller must not call this function again after a prior
     265              :         call returned an error (including EOF).
     266              : 
     267              :         @see pull, consume
     268              :     */
     269              :     template<MutableBufferSequence MB>
     270              :     io_task<std::size_t>
     271              :     read_some(MB buffers);
     272              : 
     273              :     /** Read data into a mutable buffer sequence.
     274              : 
     275              :         Fills the provided buffer sequence completely. When the
     276              :         wrapped type provides native @ref ReadSource support, each
     277              :         window is forwarded directly. Otherwise the data is
     278              :         synthesized from @ref pull, @ref buffer_copy, and @ref consume.
     279              : 
     280              :         @param buffers The buffer sequence to fill.
     281              : 
     282              :         @return An awaitable yielding `(error_code,std::size_t)`.
     283              :             On success, `n == buffer_size(buffers)`.
     284              :             On EOF, `ec == error::eof` and `n` is bytes transferred.
     285              : 
     286              :         @par Preconditions
     287              :         The wrapper must contain a valid source (`has_value() == true`).
     288              :         The caller must not call this function again after a prior
     289              :         call returned an error (including EOF).
     290              : 
     291              :         @see pull, consume
     292              :     */
     293              :     template<MutableBufferSequence MB>
     294              :     io_task<std::size_t>
     295              :     read(MB buffers);
     296              : 
     297              : protected:
     298              :     /** Rebind to a new source after move.
     299              : 
     300              :         Updates the internal pointer to reference a new source object.
     301              :         Used by owning wrappers after move assignment when the owned
     302              :         object has moved to a new location.
     303              : 
     304              :         @param new_source The new source to bind to. Must be the same
     305              :             type as the original source.
     306              : 
     307              :         @note Terminates if called with a source of different type
     308              :             than the original.
     309              :     */
     310              :     template<BufferSource S>
     311              :     void
     312              :     rebind(S& new_source) noexcept
     313              :     {
     314              :         if(vt_ != &vtable_for_impl<S>::value)
     315              :             std::terminate();
     316              :         source_ = &new_source;
     317              :     }
     318              : 
     319              : private:
     320              :     /** Forward a partial read through the vtable.
     321              : 
     322              :         Constructs the underlying `read_some` awaitable in
     323              :         cached storage and returns a type-erased awaitable.
     324              :     */
     325              :     auto
     326              :     read_some_(std::span<mutable_buffer const> buffers);
     327              : 
     328              :     /** Forward a complete read through the vtable.
     329              : 
     330              :         Constructs the underlying `read` awaitable in
     331              :         cached storage and returns a type-erased awaitable.
     332              :     */
     333              :     auto
     334              :     read_(std::span<mutable_buffer const> buffers);
     335              : };
     336              : 
     337              : //----------------------------------------------------------
     338              : 
     339              : /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
     340              : struct any_buffer_source::awaitable_ops
     341              : {
     342              :     bool (*await_ready)(void*);
     343              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     344              :     io_result<std::span<const_buffer>> (*await_resume)(void*);
     345              :     void (*destroy)(void*) noexcept;
     346              : };
     347              : 
     348              : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
     349              : struct any_buffer_source::read_awaitable_ops
     350              : {
     351              :     bool (*await_ready)(void*);
     352              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     353              :     io_result<std::size_t> (*await_resume)(void*);
     354              :     void (*destroy)(void*) noexcept;
     355              : };
     356              : 
     357              : struct any_buffer_source::vtable
     358              : {
     359              :     // BufferSource ops (always populated)
     360              :     void (*destroy)(void*) noexcept;
     361              :     void (*do_consume)(void* source, std::size_t n) noexcept;
     362              :     std::size_t awaitable_size;
     363              :     std::size_t awaitable_align;
     364              :     awaitable_ops const* (*construct_awaitable)(
     365              :         void* source,
     366              :         void* storage,
     367              :         std::span<const_buffer> dest);
     368              : 
     369              :     // ReadSource forwarding (null when wrapped type is BufferSource-only)
     370              :     read_awaitable_ops const* (*construct_read_some_awaitable)(
     371              :         void* source,
     372              :         void* storage,
     373              :         std::span<mutable_buffer const> buffers);
     374              :     read_awaitable_ops const* (*construct_read_awaitable)(
     375              :         void* source,
     376              :         void* storage,
     377              :         std::span<mutable_buffer const> buffers);
     378              : };
     379              : 
     380              : template<BufferSource S>
     381              : struct any_buffer_source::vtable_for_impl
     382              : {
     383              :     using PullAwaitable = decltype(std::declval<S&>().pull(
     384              :         std::declval<std::span<const_buffer>>()));
     385              : 
     386              :     static void
     387            7 :     do_destroy_impl(void* source) noexcept
     388              :     {
     389            7 :         static_cast<S*>(source)->~S();
     390            7 :     }
     391              : 
     392              :     static void
     393           45 :     do_consume_impl(void* source, std::size_t n) noexcept
     394              :     {
     395           45 :         static_cast<S*>(source)->consume(n);
     396           45 :     }
     397              : 
     398              :     static awaitable_ops const*
     399          110 :     construct_awaitable_impl(
     400              :         void* source,
     401              :         void* storage,
     402              :         std::span<const_buffer> dest)
     403              :     {
     404          110 :         auto& s = *static_cast<S*>(source);
     405          110 :         ::new(storage) PullAwaitable(s.pull(dest));
     406              : 
     407              :         static constexpr awaitable_ops ops = {
     408          110 :             +[](void* p) {
     409          110 :                 return static_cast<PullAwaitable*>(p)->await_ready();
     410              :             },
     411            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     412            0 :                 return detail::call_await_suspend(
     413            0 :                     static_cast<PullAwaitable*>(p), h, ex, token);
     414              :             },
     415          110 :             +[](void* p) {
     416          110 :                 return static_cast<PullAwaitable*>(p)->await_resume();
     417              :             },
     418          110 :             +[](void* p) noexcept {
     419          110 :                 static_cast<PullAwaitable*>(p)->~PullAwaitable();
     420              :             }
     421              :         };
     422          110 :         return &ops;
     423              :     }
     424              : 
     425              :     //------------------------------------------------------
     426              :     // ReadSource forwarding (only instantiated when ReadSource<S>)
     427              : 
     428              :     static read_awaitable_ops const*
     429           48 :     construct_read_some_awaitable_impl(
     430              :         void* source,
     431              :         void* storage,
     432              :         std::span<mutable_buffer const> buffers)
     433              :         requires ReadSource<S>
     434              :     {
     435              :         using Aw = decltype(std::declval<S&>().read_some(
     436              :             std::span<mutable_buffer const>{}));
     437           48 :         auto& s = *static_cast<S*>(source);
     438           48 :         ::new(storage) Aw(s.read_some(buffers));
     439              : 
     440              :         static constexpr read_awaitable_ops ops = {
     441           48 :             +[](void* p) {
     442           48 :                 return static_cast<Aw*>(p)->await_ready();
     443              :             },
     444            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     445            0 :                 return detail::call_await_suspend(
     446            0 :                     static_cast<Aw*>(p), h, ex, token);
     447              :             },
     448           48 :             +[](void* p) {
     449           48 :                 return static_cast<Aw*>(p)->await_resume();
     450              :             },
     451           48 :             +[](void* p) noexcept {
     452           48 :                 static_cast<Aw*>(p)->~Aw();
     453              :             }
     454              :         };
     455           48 :         return &ops;
     456              :     }
     457              : 
     458              :     static read_awaitable_ops const*
     459           18 :     construct_read_awaitable_impl(
     460              :         void* source,
     461              :         void* storage,
     462              :         std::span<mutable_buffer const> buffers)
     463              :         requires ReadSource<S>
     464              :     {
     465              :         using Aw = decltype(std::declval<S&>().read(
     466              :             std::span<mutable_buffer const>{}));
     467           18 :         auto& s = *static_cast<S*>(source);
     468           18 :         ::new(storage) Aw(s.read(buffers));
     469              : 
     470              :         static constexpr read_awaitable_ops ops = {
     471           18 :             +[](void* p) {
     472           18 :                 return static_cast<Aw*>(p)->await_ready();
     473              :             },
     474            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     475            0 :                 return detail::call_await_suspend(
     476            0 :                     static_cast<Aw*>(p), h, ex, token);
     477              :             },
     478           18 :             +[](void* p) {
     479           18 :                 return static_cast<Aw*>(p)->await_resume();
     480              :             },
     481           18 :             +[](void* p) noexcept {
     482           18 :                 static_cast<Aw*>(p)->~Aw();
     483              :             }
     484              :         };
     485           18 :         return &ops;
     486              :     }
     487              : 
     488              :     //------------------------------------------------------
     489              : 
     490              :     static consteval std::size_t
     491              :     compute_max_size() noexcept
     492              :     {
     493              :         std::size_t s = sizeof(PullAwaitable);
     494              :         if constexpr (ReadSource<S>)
     495              :         {
     496              :             using RS = decltype(std::declval<S&>().read_some(
     497              :                 std::span<mutable_buffer const>{}));
     498              :             using R = decltype(std::declval<S&>().read(
     499              :                 std::span<mutable_buffer const>{}));
     500              : 
     501              :             if(sizeof(RS) > s) s = sizeof(RS);
     502              :             if(sizeof(R) > s) s = sizeof(R);
     503              :         }
     504              :         return s;
     505              :     }
     506              : 
     507              :     static consteval std::size_t
     508              :     compute_max_align() noexcept
     509              :     {
     510              :         std::size_t a = alignof(PullAwaitable);
     511              :         if constexpr (ReadSource<S>)
     512              :         {
     513              :             using RS = decltype(std::declval<S&>().read_some(
     514              :                 std::span<mutable_buffer const>{}));
     515              :             using R = decltype(std::declval<S&>().read(
     516              :                 std::span<mutable_buffer const>{}));
     517              : 
     518              :             if(alignof(RS) > a) a = alignof(RS);
     519              :             if(alignof(R) > a) a = alignof(R);
     520              :         }
     521              :         return a;
     522              :     }
     523              : 
     524              :     static consteval vtable
     525              :     make_vtable() noexcept
     526              :     {
     527              :         vtable v{};
     528              :         v.destroy = &do_destroy_impl;
     529              :         v.do_consume = &do_consume_impl;
     530              :         v.awaitable_size = compute_max_size();
     531              :         v.awaitable_align = compute_max_align();
     532              :         v.construct_awaitable = &construct_awaitable_impl;
     533              :         v.construct_read_some_awaitable = nullptr;
     534              :         v.construct_read_awaitable = nullptr;
     535              : 
     536              :         if constexpr (ReadSource<S>)
     537              :         {
     538              :             v.construct_read_some_awaitable =
     539              :                 &construct_read_some_awaitable_impl;
     540              :             v.construct_read_awaitable =
     541              :                 &construct_read_awaitable_impl;
     542              :         }
     543              :         return v;
     544              :     }
     545              : 
     546              :     static constexpr vtable value = make_vtable();
     547              : };
     548              : 
     549              : //----------------------------------------------------------
     550              : 
     551              : inline
     552          124 : any_buffer_source::~any_buffer_source()
     553              : {
     554          124 :     if(storage_)
     555              :     {
     556            7 :         vt_->destroy(source_);
     557            7 :         ::operator delete(storage_);
     558              :     }
     559          124 :     if(cached_awaitable_)
     560          119 :         ::operator delete(cached_awaitable_);
     561          124 : }
     562              : 
     563              : inline any_buffer_source&
     564            2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
     565              : {
     566            2 :     if(this != &other)
     567              :     {
     568            2 :         if(storage_)
     569              :         {
     570            0 :             vt_->destroy(source_);
     571            0 :             ::operator delete(storage_);
     572              :         }
     573            2 :         if(cached_awaitable_)
     574            0 :             ::operator delete(cached_awaitable_);
     575            2 :         source_ = std::exchange(other.source_, nullptr);
     576            2 :         vt_ = std::exchange(other.vt_, nullptr);
     577            2 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     578            2 :         storage_ = std::exchange(other.storage_, nullptr);
     579            2 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     580            2 :         active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
     581              :     }
     582            2 :     return *this;
     583              : }
     584              : 
     585              : template<BufferSource S>
     586              :     requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     587            7 : any_buffer_source::any_buffer_source(S s)
     588            7 :     : vt_(&vtable_for_impl<S>::value)
     589              : {
     590              :     struct guard {
     591              :         any_buffer_source* self;
     592              :         bool committed = false;
     593            7 :         ~guard() {
     594            7 :             if(!committed && self->storage_) {
     595            0 :                 self->vt_->destroy(self->source_);
     596            0 :                 ::operator delete(self->storage_);
     597            0 :                 self->storage_ = nullptr;
     598            0 :                 self->source_ = nullptr;
     599              :             }
     600            7 :         }
     601            7 :     } g{this};
     602              : 
     603            7 :     storage_ = ::operator new(sizeof(S));
     604            7 :     source_ = ::new(storage_) S(std::move(s));
     605              : 
     606            7 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     607              : 
     608            7 :     g.committed = true;
     609            7 : }
     610              : 
     611              : template<BufferSource S>
     612          112 : any_buffer_source::any_buffer_source(S* s)
     613          112 :     : source_(s)
     614          112 :     , vt_(&vtable_for_impl<S>::value)
     615              : {
     616          112 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     617          112 : }
     618              : 
     619              : //----------------------------------------------------------
     620              : 
     621              : inline void
     622           45 : any_buffer_source::consume(std::size_t n) noexcept
     623              : {
     624           45 :     vt_->do_consume(source_, n);
     625           45 : }
     626              : 
     627              : inline auto
     628          110 : any_buffer_source::pull(std::span<const_buffer> dest)
     629              : {
     630              :     struct awaitable
     631              :     {
     632              :         any_buffer_source* self_;
     633              :         std::span<const_buffer> dest_;
     634              : 
     635              :         bool
     636          110 :         await_ready()
     637              :         {
     638          220 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     639          110 :                 self_->source_,
     640          110 :                 self_->cached_awaitable_,
     641              :                 dest_);
     642          110 :             return self_->active_ops_->await_ready(self_->cached_awaitable_);
     643              :         }
     644              : 
     645              :         coro
     646            0 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     647              :         {
     648            0 :             return self_->active_ops_->await_suspend(
     649            0 :                 self_->cached_awaitable_, h, ex, token);
     650              :         }
     651              : 
     652              :         io_result<std::span<const_buffer>>
     653          110 :         await_resume()
     654              :         {
     655              :             struct guard {
     656              :                 any_buffer_source* self;
     657          110 :                 ~guard() {
     658          110 :                     self->active_ops_->destroy(self->cached_awaitable_);
     659          110 :                     self->active_ops_ = nullptr;
     660          110 :                 }
     661          110 :             } g{self_};
     662          110 :             return self_->active_ops_->await_resume(
     663          195 :                 self_->cached_awaitable_);
     664          110 :         }
     665              :     };
     666          110 :     return awaitable{this, dest};
     667              : }
     668              : 
     669              : //----------------------------------------------------------
     670              : // Private helpers for native ReadSource forwarding
     671              : 
     672              : inline auto
     673           48 : any_buffer_source::read_some_(
     674              :     std::span<mutable_buffer const> buffers)
     675              : {
     676              :     struct awaitable
     677              :     {
     678              :         any_buffer_source* self_;
     679              :         std::span<mutable_buffer const> buffers_;
     680              : 
     681              :         bool
     682           48 :         await_ready() const noexcept
     683              :         {
     684           48 :             return false;
     685              :         }
     686              : 
     687              :         coro
     688           48 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     689              :         {
     690           96 :             self_->active_read_ops_ =
     691           96 :                 self_->vt_->construct_read_some_awaitable(
     692           48 :                     self_->source_,
     693           48 :                     self_->cached_awaitable_,
     694              :                     buffers_);
     695              : 
     696           48 :             if(self_->active_read_ops_->await_ready(
     697           48 :                 self_->cached_awaitable_))
     698           48 :                 return h;
     699              : 
     700            0 :             return self_->active_read_ops_->await_suspend(
     701            0 :                 self_->cached_awaitable_, h, ex, token);
     702              :         }
     703              : 
     704              :         io_result<std::size_t>
     705           48 :         await_resume()
     706              :         {
     707              :             struct guard {
     708              :                 any_buffer_source* self;
     709           48 :                 ~guard() {
     710           48 :                     self->active_read_ops_->destroy(
     711           48 :                         self->cached_awaitable_);
     712           48 :                     self->active_read_ops_ = nullptr;
     713           48 :                 }
     714           48 :             } g{self_};
     715           48 :             return self_->active_read_ops_->await_resume(
     716           88 :                 self_->cached_awaitable_);
     717           48 :         }
     718              :     };
     719           48 :     return awaitable{this, buffers};
     720              : }
     721              : 
     722              : inline auto
     723           18 : any_buffer_source::read_(
     724              :     std::span<mutable_buffer const> buffers)
     725              : {
     726              :     struct awaitable
     727              :     {
     728              :         any_buffer_source* self_;
     729              :         std::span<mutable_buffer const> buffers_;
     730              : 
     731              :         bool
     732           18 :         await_ready() const noexcept
     733              :         {
     734           18 :             return false;
     735              :         }
     736              : 
     737              :         coro
     738           18 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     739              :         {
     740           36 :             self_->active_read_ops_ =
     741           36 :                 self_->vt_->construct_read_awaitable(
     742           18 :                     self_->source_,
     743           18 :                     self_->cached_awaitable_,
     744              :                     buffers_);
     745              : 
     746           18 :             if(self_->active_read_ops_->await_ready(
     747           18 :                 self_->cached_awaitable_))
     748           18 :                 return h;
     749              : 
     750            0 :             return self_->active_read_ops_->await_suspend(
     751            0 :                 self_->cached_awaitable_, h, ex, token);
     752              :         }
     753              : 
     754              :         io_result<std::size_t>
     755           18 :         await_resume()
     756              :         {
     757              :             struct guard {
     758              :                 any_buffer_source* self;
     759           18 :                 ~guard() {
     760           18 :                     self->active_read_ops_->destroy(
     761           18 :                         self->cached_awaitable_);
     762           18 :                     self->active_read_ops_ = nullptr;
     763           18 :                 }
     764           18 :             } g{self_};
     765           18 :             return self_->active_read_ops_->await_resume(
     766           30 :                 self_->cached_awaitable_);
     767           18 :         }
     768              :     };
     769           18 :     return awaitable{this, buffers};
     770              : }
     771              : 
     772              : //----------------------------------------------------------
     773              : // Public ReadSource methods
     774              : 
     775              : template<MutableBufferSequence MB>
     776              : io_task<std::size_t>
     777           58 : any_buffer_source::read_some(MB buffers)
     778              : {
     779              :     buffer_param<MB> bp(buffers);
     780              :     auto dest = bp.data();
     781              :     if(dest.empty())
     782              :         co_return {{}, 0};
     783              : 
     784              :     // Native ReadSource path
     785              :     if(vt_->construct_read_some_awaitable)
     786              :         co_return co_await read_some_(dest);
     787              : 
     788              :     // Synthesized path: pull + buffer_copy + consume
     789              :     const_buffer arr[detail::max_iovec_];
     790              :     auto [ec, bufs] = co_await pull(arr);
     791              :     if(ec)
     792              :         co_return {ec, 0};
     793              : 
     794              :     auto n = buffer_copy(dest, bufs);
     795              :     consume(n);
     796              :     co_return {{}, n};
     797          116 : }
     798              : 
     799              : template<MutableBufferSequence MB>
     800              : io_task<std::size_t>
     801           24 : any_buffer_source::read(MB buffers)
     802              : {
     803              :     buffer_param<MB> bp(buffers);
     804              :     std::size_t total = 0;
     805              : 
     806              :     // Native ReadSource path
     807              :     if(vt_->construct_read_awaitable)
     808              :     {
     809              :         for(;;)
     810              :         {
     811              :             auto dest = bp.data();
     812              :             if(dest.empty())
     813              :                 break;
     814              : 
     815              :             auto [ec, n] = co_await read_(dest);
     816              :             total += n;
     817              :             if(ec)
     818              :                 co_return {ec, total};
     819              :             bp.consume(n);
     820              :         }
     821              :         co_return {{}, total};
     822              :     }
     823              : 
     824              :     // Synthesized path: pull + buffer_copy + consume
     825              :     for(;;)
     826              :     {
     827              :         auto dest = bp.data();
     828              :         if(dest.empty())
     829              :             break;
     830              : 
     831              :         const_buffer arr[detail::max_iovec_];
     832              :         auto [ec, bufs] = co_await pull(arr);
     833              : 
     834              :         if(ec)
     835              :             co_return {ec, total};
     836              : 
     837              :         auto n = buffer_copy(dest, bufs);
     838              :         consume(n);
     839              :         total += n;
     840              :         bp.consume(n);
     841              :     }
     842              : 
     843              :     co_return {{}, total};
     844           48 : }
     845              : 
     846              : //----------------------------------------------------------
     847              : 
     848              : static_assert(BufferSource<any_buffer_source>);
     849              : static_assert(ReadSource<any_buffer_source>);
     850              : 
     851              : } // namespace capy
     852              : } // namespace boost
     853              : 
     854              : #endif
        

Generated by: LCOV version 2.3