LCOV - code coverage report
Current view: top level - capy/io - any_read_source.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 91.1 % 135 123
Test Date: 2026-02-07 18:59:16 Functions: 82.4 % 51 42

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_array.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/concept/io_awaitable.hpp>
      19              : #include <boost/capy/concept/read_source.hpp>
      20              : #include <boost/capy/coro.hpp>
      21              : #include <boost/capy/ex/executor_ref.hpp>
      22              : #include <boost/capy/io_result.hpp>
      23              : #include <boost/capy/io_task.hpp>
      24              : 
      25              : #include <concepts>
      26              : #include <coroutine>
      27              : #include <cstddef>
      28              : #include <new>
      29              : #include <span>
      30              : #include <stop_token>
      31              : #include <system_error>
      32              : #include <utility>
      33              : 
      34              : namespace boost {
      35              : namespace capy {
      36              : 
      37              : /** Type-erased wrapper for any ReadSource.
      38              : 
      39              :     This class provides type erasure for any type satisfying the
      40              :     @ref ReadSource concept, enabling runtime polymorphism for
      41              :     source read operations. It uses cached awaitable storage to achieve
      42              :     zero steady-state allocation after construction.
      43              : 
      44              :     The wrapper supports two construction modes:
      45              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      46              :       allocates storage and owns the source.
      47              :     - **Reference**: Pass a pointer to wrap without ownership. The
      48              :       pointed-to source must outlive this wrapper.
      49              : 
      50              :     @par Awaitable Preallocation
      51              :     The constructor preallocates storage for the type-erased awaitable.
      52              :     This reserves all virtual address space at server startup
      53              :     so memory usage can be measured up front, rather than
      54              :     allocating piecemeal as traffic arrives.
      55              : 
      56              :     @par Immediate Completion
      57              :     Operations complete immediately without suspending when the
      58              :     buffer sequence is empty, or when the underlying source's
      59              :     awaitable reports readiness via `await_ready`.
      60              : 
      61              :     @par Thread Safety
      62              :     Not thread-safe. Concurrent operations on the same wrapper
      63              :     are undefined behavior.
      64              : 
      65              :     @par Example
      66              :     @code
      67              :     // Owning - takes ownership of the source
      68              :     any_read_source rs(some_source{args...});
      69              : 
      70              :     // Reference - wraps without ownership
      71              :     some_source source;
      72              :     any_read_source rs(&source);
      73              : 
      74              :     mutable_buffer buf(data, size);
      75              :     auto [ec, n] = co_await rs.read(std::span(&buf, 1));
      76              :     @endcode
      77              : 
      78              :     @see any_read_stream, ReadSource
      79              : */
      80              : class any_read_source
      81              : {
      82              :     struct vtable;
      83              :     struct awaitable_ops;
      84              : 
      85              :     template<ReadSource S>
      86              :     struct vtable_for_impl;
      87              : 
      88              :     void* source_ = nullptr;
      89              :     vtable const* vt_ = nullptr;
      90              :     void* cached_awaitable_ = nullptr;
      91              :     void* storage_ = nullptr;
      92              :     awaitable_ops const* active_ops_ = nullptr;
      93              : 
      94              : public:
      95              :     /** Destructor.
      96              : 
      97              :         Destroys the owned source (if any) and releases the cached
      98              :         awaitable storage.
      99              :     */
     100              :     ~any_read_source();
     101              : 
     102              :     /** Default constructor.
     103              : 
     104              :         Constructs an empty wrapper. Operations on a default-constructed
     105              :         wrapper result in undefined behavior.
     106              :     */
     107              :     any_read_source() = default;
     108              : 
     109              :     /** Non-copyable.
     110              : 
     111              :         The awaitable cache is per-instance and cannot be shared.
     112              :     */
     113              :     any_read_source(any_read_source const&) = delete;
     114              :     any_read_source& operator=(any_read_source const&) = delete;
     115              : 
     116              :     /** Move constructor.
     117              : 
     118              :         Transfers ownership of the wrapped source (if owned) and
     119              :         cached awaitable storage from `other`. After the move, `other` is
     120              :         in a default-constructed state.
     121              : 
     122              :         @param other The wrapper to move from.
     123              :     */
     124            1 :     any_read_source(any_read_source&& other) noexcept
     125            1 :         : source_(std::exchange(other.source_, nullptr))
     126            1 :         , vt_(std::exchange(other.vt_, nullptr))
     127            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     128            1 :         , storage_(std::exchange(other.storage_, nullptr))
     129            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     130              :     {
     131            1 :     }
     132              : 
     133              :     /** Move assignment operator.
     134              : 
     135              :         Destroys any owned source and releases existing resources,
     136              :         then transfers ownership from `other`.
     137              : 
     138              :         @param other The wrapper to move from.
     139              :         @return Reference to this wrapper.
     140              :     */
     141              :     any_read_source&
     142              :     operator=(any_read_source&& other) noexcept;
     143              : 
     144              :     /** Construct by taking ownership of a ReadSource.
     145              : 
     146              :         Allocates storage and moves the source into this wrapper.
     147              :         The wrapper owns the source and will destroy it.
     148              : 
     149              :         @param s The source to take ownership of.
     150              :     */
     151              :     template<ReadSource S>
     152              :         requires (!std::same_as<std::decay_t<S>, any_read_source>)
     153              :     any_read_source(S s);
     154              : 
     155              :     /** Construct by wrapping a ReadSource without ownership.
     156              : 
     157              :         Wraps the given source by pointer. The source must remain
     158              :         valid for the lifetime of this wrapper.
     159              : 
     160              :         @param s Pointer to the source to wrap.
     161              :     */
     162              :     template<ReadSource S>
     163              :     any_read_source(S* s);
     164              : 
     165              :     /** Check if the wrapper contains a valid source.
     166              : 
     167              :         @return `true` if wrapping a source, `false` if default-constructed
     168              :             or moved-from.
     169              :     */
     170              :     bool
     171           27 :     has_value() const noexcept
     172              :     {
     173           27 :         return source_ != nullptr;
     174              :     }
     175              : 
     176              :     /** Check if the wrapper contains a valid source.
     177              : 
     178              :         @return `true` if wrapping a source, `false` if default-constructed
     179              :             or moved-from.
     180              :     */
     181              :     explicit
     182            8 :     operator bool() const noexcept
     183              :     {
     184            8 :         return has_value();
     185              :     }
     186              : 
     187              :     /** Initiate a partial read operation.
     188              : 
     189              :         Reads one or more bytes into the provided buffer sequence.
     190              :         May fill less than the full sequence.
     191              : 
     192              :         @param buffers The buffer sequence to read into.
     193              : 
     194              :         @return An awaitable yielding `(error_code,std::size_t)`.
     195              : 
     196              :         @par Immediate Completion
     197              :         The operation completes immediately without suspending
     198              :         the calling coroutine when:
     199              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     200              :         @li The underlying source's awaitable reports immediate
     201              :             readiness via `await_ready`.
     202              : 
     203              :         @note This is a partial operation and may not process the
     204              :         entire buffer sequence. Use @ref read for guaranteed
     205              :         complete transfer.
     206              : 
     207              :         @par Preconditions
     208              :         The wrapper must contain a valid source (`has_value() == true`).
     209              :         The caller must not call this function again after a prior
     210              :         call returned an error (including EOF).
     211              :     */
     212              :     template<MutableBufferSequence MB>
     213              :     auto
     214              :     read_some(MB buffers);
     215              : 
     216              :     /** Initiate a complete read operation.
     217              : 
     218              :         Reads data into the provided buffer sequence by forwarding
     219              :         to the underlying source's `read` operation. Large buffer
     220              :         sequences are processed in windows, with each window
     221              :         forwarded as a separate `read` call to the underlying source.
     222              :         The operation completes when the entire buffer sequence is
     223              :         filled, end-of-file is reached, or an error occurs.
     224              : 
     225              :         @param buffers The buffer sequence to read into.
     226              : 
     227              :         @return An awaitable yielding `(error_code,std::size_t)`.
     228              : 
     229              :         @par Immediate Completion
     230              :         The operation completes immediately without suspending
     231              :         the calling coroutine when:
     232              :         @li The buffer sequence is empty, returning `{error_code{}, 0}`.
     233              :         @li The underlying source's `read` awaitable reports
     234              :             immediate readiness via `await_ready`.
     235              : 
     236              :         @par Postconditions
     237              :         Exactly one of the following is true on return:
     238              :         @li **Success**: `!ec` and `n == buffer_size(buffers)`.
     239              :             The entire buffer was filled.
     240              :         @li **End-of-stream or Error**: `ec` and `n` indicates
     241              :             the number of bytes transferred before the failure.
     242              : 
     243              :         @par Preconditions
     244              :         The wrapper must contain a valid source (`has_value() == true`).
     245              :         The caller must not call this function again after a prior
     246              :         call returned an error (including EOF).
     247              :     */
     248              :     template<MutableBufferSequence MB>
     249              :     io_task<std::size_t>
     250              :     read(MB buffers);
     251              : 
     252              : protected:
     253              :     /** Rebind to a new source after move.
     254              : 
     255              :         Updates the internal pointer to reference a new source object.
     256              :         Used by owning wrappers after move assignment when the owned
     257              :         object has moved to a new location.
     258              : 
     259              :         @param new_source The new source to bind to. Must be the same
     260              :             type as the original source.
     261              : 
     262              :         @note Terminates if called with a source of different type
     263              :             than the original.
     264              :     */
     265              :     template<ReadSource S>
     266              :     void
     267              :     rebind(S& new_source) noexcept
     268              :     {
     269              :         if(vt_ != &vtable_for_impl<S>::value)
     270              :             std::terminate();
     271              :         source_ = &new_source;
     272              :     }
     273              : 
     274              : private:
     275              :     auto
     276              :     read_(std::span<mutable_buffer const> buffers);
     277              : };
     278              : 
     279              : //----------------------------------------------------------
     280              : 
     281              : // ordered by call sequence for cache line coherence
     282              : struct any_read_source::awaitable_ops
     283              : {
     284              :     bool (*await_ready)(void*);
     285              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     286              :     io_result<std::size_t> (*await_resume)(void*);
     287              :     void (*destroy)(void*) noexcept;
     288              : };
     289              : 
     290              : // ordered by call frequency for cache line coherence
     291              : struct any_read_source::vtable
     292              : {
     293              :     awaitable_ops const* (*construct_read_some_awaitable)(
     294              :         void* source,
     295              :         void* storage,
     296              :         std::span<mutable_buffer const> buffers);
     297              :     awaitable_ops const* (*construct_read_awaitable)(
     298              :         void* source,
     299              :         void* storage,
     300              :         std::span<mutable_buffer const> buffers);
     301              :     std::size_t awaitable_size;
     302              :     std::size_t awaitable_align;
     303              :     void (*destroy)(void*) noexcept;
     304              : };
     305              : 
     306              : template<ReadSource S>
     307              : struct any_read_source::vtable_for_impl
     308              : {
     309              :     using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
     310              :         std::span<mutable_buffer const>{}));
     311              :     using ReadAwaitable = decltype(std::declval<S&>().read(
     312              :         std::span<mutable_buffer const>{}));
     313              : 
     314              :     static void
     315            6 :     do_destroy_impl(void* source) noexcept
     316              :     {
     317            6 :         static_cast<S*>(source)->~S();
     318            6 :     }
     319              : 
     320              :     static awaitable_ops const*
     321           52 :     construct_read_some_awaitable_impl(
     322              :         void* source,
     323              :         void* storage,
     324              :         std::span<mutable_buffer const> buffers)
     325              :     {
     326           52 :         auto& s = *static_cast<S*>(source);
     327           52 :         ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
     328              : 
     329              :         static constexpr awaitable_ops ops = {
     330           52 :             +[](void* p) {
     331           52 :                 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
     332              :             },
     333            2 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     334            4 :                 return detail::call_await_suspend(
     335            2 :                     static_cast<ReadSomeAwaitable*>(p), h, ex, token);
     336              :             },
     337           50 :             +[](void* p) {
     338           50 :                 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
     339              :             },
     340           54 :             +[](void* p) noexcept {
     341            2 :                 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
     342              :             }
     343              :         };
     344           52 :         return &ops;
     345              :     }
     346              : 
     347              :     static awaitable_ops const*
     348          116 :     construct_read_awaitable_impl(
     349              :         void* source,
     350              :         void* storage,
     351              :         std::span<mutable_buffer const> buffers)
     352              :     {
     353          116 :         auto& s = *static_cast<S*>(source);
     354          116 :         ::new(storage) ReadAwaitable(s.read(buffers));
     355              : 
     356              :         static constexpr awaitable_ops ops = {
     357          116 :             +[](void* p) {
     358          116 :                 return static_cast<ReadAwaitable*>(p)->await_ready();
     359              :             },
     360            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     361            0 :                 return detail::call_await_suspend(
     362            0 :                     static_cast<ReadAwaitable*>(p), h, ex, token);
     363              :             },
     364          116 :             +[](void* p) {
     365          116 :                 return static_cast<ReadAwaitable*>(p)->await_resume();
     366              :             },
     367          116 :             +[](void* p) noexcept {
     368            0 :                 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
     369              :             }
     370              :         };
     371          116 :         return &ops;
     372              :     }
     373              : 
     374              :     static constexpr std::size_t max_awaitable_size =
     375              :         sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
     376              :             ? sizeof(ReadSomeAwaitable)
     377              :             : sizeof(ReadAwaitable);
     378              :     static constexpr std::size_t max_awaitable_align =
     379              :         alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
     380              :             ? alignof(ReadSomeAwaitable)
     381              :             : alignof(ReadAwaitable);
     382              : 
     383              :     static constexpr vtable value = {
     384              :         &construct_read_some_awaitable_impl,
     385              :         &construct_read_awaitable_impl,
     386              :         max_awaitable_size,
     387              :         max_awaitable_align,
     388              :         &do_destroy_impl
     389              :     };
     390              : };
     391              : 
     392              : //----------------------------------------------------------
     393              : 
     394              : inline
     395          145 : any_read_source::~any_read_source()
     396              : {
     397          145 :     if(storage_)
     398              :     {
     399            6 :         vt_->destroy(source_);
     400            6 :         ::operator delete(storage_);
     401              :     }
     402          145 :     if(cached_awaitable_)
     403              :     {
     404          139 :         if(active_ops_)
     405            1 :             active_ops_->destroy(cached_awaitable_);
     406          139 :         ::operator delete(cached_awaitable_);
     407              :     }
     408          145 : }
     409              : 
     410              : inline any_read_source&
     411            4 : any_read_source::operator=(any_read_source&& other) noexcept
     412              : {
     413            4 :     if(this != &other)
     414              :     {
     415            3 :         if(storage_)
     416              :         {
     417            0 :             vt_->destroy(source_);
     418            0 :             ::operator delete(storage_);
     419              :         }
     420            3 :         if(cached_awaitable_)
     421              :         {
     422            2 :             if(active_ops_)
     423            1 :                 active_ops_->destroy(cached_awaitable_);
     424            2 :             ::operator delete(cached_awaitable_);
     425              :         }
     426            3 :         source_ = std::exchange(other.source_, nullptr);
     427            3 :         vt_ = std::exchange(other.vt_, nullptr);
     428            3 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     429            3 :         storage_ = std::exchange(other.storage_, nullptr);
     430            3 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     431              :     }
     432            4 :     return *this;
     433              : }
     434              : 
     435              : template<ReadSource S>
     436              :     requires (!std::same_as<std::decay_t<S>, any_read_source>)
     437            6 : any_read_source::any_read_source(S s)
     438            6 :     : vt_(&vtable_for_impl<S>::value)
     439              : {
     440              :     struct guard {
     441              :         any_read_source* self;
     442              :         bool committed = false;
     443            6 :         ~guard() {
     444            6 :             if(!committed && self->storage_) {
     445            0 :                 self->vt_->destroy(self->source_);
     446            0 :                 ::operator delete(self->storage_);
     447            0 :                 self->storage_ = nullptr;
     448            0 :                 self->source_ = nullptr;
     449              :             }
     450            6 :         }
     451            6 :     } g{this};
     452              : 
     453            6 :     storage_ = ::operator new(sizeof(S));
     454            6 :     source_ = ::new(storage_) S(std::move(s));
     455              : 
     456              :     // Preallocate the awaitable storage
     457            6 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     458              : 
     459            6 :     g.committed = true;
     460            6 : }
     461              : 
     462              : template<ReadSource S>
     463          135 : any_read_source::any_read_source(S* s)
     464          135 :     : source_(s)
     465          135 :     , vt_(&vtable_for_impl<S>::value)
     466              : {
     467              :     // Preallocate the awaitable storage
     468          135 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     469          135 : }
     470              : 
     471              : //----------------------------------------------------------
     472              : 
     473              : template<MutableBufferSequence MB>
     474              : auto
     475           54 : any_read_source::read_some(MB buffers)
     476              : {
     477              :     struct awaitable
     478              :     {
     479              :         any_read_source* self_;
     480              :         mutable_buffer_array<detail::max_iovec_> ba_;
     481              : 
     482           54 :         awaitable(any_read_source* self, MB const& buffers)
     483           54 :             : self_(self)
     484           54 :             , ba_(buffers)
     485              :         {
     486           54 :         }
     487              : 
     488              :         bool
     489           54 :         await_ready() const noexcept
     490              :         {
     491           54 :             return ba_.to_span().empty();
     492              :         }
     493              : 
     494              :         coro
     495           52 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     496              :         {
     497           52 :             self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
     498           52 :                 self_->source_,
     499           52 :                 self_->cached_awaitable_,
     500           52 :                 ba_.to_span());
     501              : 
     502           52 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     503           50 :                 return h;
     504              : 
     505            4 :             return self_->active_ops_->await_suspend(
     506            2 :                 self_->cached_awaitable_, h, ex, token);
     507              :         }
     508              : 
     509              :         io_result<std::size_t>
     510           52 :         await_resume()
     511              :         {
     512           52 :             if(ba_.to_span().empty())
     513            2 :                 return {{}, 0};
     514              : 
     515              :             struct guard {
     516              :                 any_read_source* self;
     517           50 :                 ~guard() {
     518           50 :                     self->active_ops_->destroy(self->cached_awaitable_);
     519           50 :                     self->active_ops_ = nullptr;
     520           50 :                 }
     521           50 :             } g{self_};
     522           50 :             return self_->active_ops_->await_resume(
     523           50 :                 self_->cached_awaitable_);
     524           50 :         }
     525              :     };
     526           54 :     return awaitable(this, buffers);
     527              : }
     528              : 
     529              : inline auto
     530          116 : any_read_source::read_(std::span<mutable_buffer const> buffers)
     531              : {
     532              :     struct awaitable
     533              :     {
     534              :         any_read_source* self_;
     535              :         std::span<mutable_buffer const> buffers_;
     536              : 
     537              :         bool
     538          116 :         await_ready() const noexcept
     539              :         {
     540          116 :             return false;
     541              :         }
     542              : 
     543              :         coro
     544          116 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     545              :         {
     546          232 :             self_->active_ops_ = self_->vt_->construct_read_awaitable(
     547          116 :                 self_->source_,
     548          116 :                 self_->cached_awaitable_,
     549              :                 buffers_);
     550              : 
     551          116 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     552          116 :                 return h;
     553              : 
     554            0 :             return self_->active_ops_->await_suspend(
     555            0 :                 self_->cached_awaitable_, h, ex, token);
     556              :         }
     557              : 
     558              :         io_result<std::size_t>
     559          116 :         await_resume()
     560              :         {
     561              :             struct guard {
     562              :                 any_read_source* self;
     563          116 :                 ~guard() {
     564          116 :                     self->active_ops_->destroy(self->cached_awaitable_);
     565          116 :                     self->active_ops_ = nullptr;
     566          116 :                 }
     567          116 :             } g{self_};
     568          116 :             return self_->active_ops_->await_resume(
     569          200 :                 self_->cached_awaitable_);
     570          116 :         }
     571              :     };
     572          116 :     return awaitable{this, buffers};
     573              : }
     574              : 
     575              : template<MutableBufferSequence MB>
     576              : io_task<std::size_t>
     577          110 : any_read_source::read(MB buffers)
     578              : {
     579              :     buffer_param bp(buffers);
     580              :     std::size_t total = 0;
     581              : 
     582              :     for(;;)
     583              :     {
     584              :         auto bufs = bp.data();
     585              :         if(bufs.empty())
     586              :             break;
     587              : 
     588              :         auto [ec, n] = co_await read_(bufs);
     589              :         total += n;
     590              :         if(ec)
     591              :             co_return {ec, total};
     592              :         bp.consume(n);
     593              :     }
     594              : 
     595              :     co_return {{}, total};
     596          220 : }
     597              : 
     598              : } // namespace capy
     599              : } // namespace boost
     600              : 
     601              : #endif
        

Generated by: LCOV version 2.3