LCOV - code coverage report
Current view: top level - capy/io - write_now.hpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 90.5 % 74 67
Test Date: 2026-02-07 18:59:16 Functions: 88.6 % 35 31

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
      11              : #define BOOST_CAPY_IO_WRITE_NOW_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/consuming_buffers.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/write_stream.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : 
      23              : #include <cstddef>
      24              : #include <exception>
      25              : #include <new>
      26              : #include <stop_token>
      27              : #include <utility>
      28              : 
      29              : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND // May be predefined by the build to force either code path.
      30              : # if defined(__GNUC__) && !defined(__clang__) // GCC itself; clang is excluded explicitly.
      31              : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1 // Selects the suspend_always / single-loop coroutine in write_now.
      32              : # else
      33              : #  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0 // Selects the suspend_never / co_yield coroutine in write_now.
      34              : # endif
      35              : #endif
      36              : 
      37              : namespace boost {
      38              : namespace capy {
      39              : 
      40              : /** Eagerly writes complete buffer sequences with frame caching.
      41              : 
      42              :     This class wraps a @ref WriteStream and provides an `operator()`
      43              :     that writes an entire buffer sequence, attempting to complete
      44              :     synchronously. If every `write_some` completes without suspending,
      45              :     the entire operation finishes in `await_ready` with no coroutine
      46              :     suspension.
      47              : 
      48              :     The class maintains a one-element coroutine frame cache. After
      49              :     the first call, subsequent calls reuse the cached frame memory,
      50              :     avoiding repeated allocation for the internal coroutine.
      51              : 
      52              :     @tparam Stream The stream type, must satisfy @ref WriteStream.
      53              : 
      54              :     @par Thread Safety
      55              :     Distinct objects: Safe.
      56              :     Shared objects: Unsafe.
      57              : 
      58              :     @par Preconditions
      59              :     Only one operation may be outstanding at a time. A new call to
      60              :     `operator()` must not be made until the previous operation has
      61              :     completed (i.e., the returned awaitable has been fully consumed).
      62              : 
      63              :     @par Example
      64              : 
      65              :     @code
      66              :     template< WriteStream Stream >
      67              :     task<> send_messages( Stream& stream )
      68              :     {
      69              :         write_now wn( stream );
      70              :         auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
      71              :         if( ec1 )
      72              :             detail::throw_system_error( ec1 );
      73              :         auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
      74              :         if( ec2 )
      75              :             detail::throw_system_error( ec2 );
      76              :     }
      77              :     @endcode
      78              : 
      79              :     @see write, write_some, WriteStream, ConstBufferSequence
      80              : */
      81              : template<class Stream>
      82              :     requires WriteStream<Stream>
      83              : class write_now
      84              : {
      85              :     Stream& stream_; // Stream written to; held by reference, so it must outlive this object.
      86              :     void* cached_frame_ = nullptr; // Coroutine frame storage kept for reuse; freed in ~write_now().
      87              :     std::size_t cached_size_ = 0; // Size in bytes of the allocation held in cached_frame_.
      88              : 
      89              :     struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
      90              :         op_type // Awaitable returned by operator(); owns the coroutine handle h_.
      91              :     {
      92              :         struct promise_type // Promise of the internal write coroutine.
      93              :         {
      94              :             io_result<std::size_t> result_; // Stored by return_value(); returned from op_type::await_resume().
      95              :             std::exception_ptr ep_; // Stored by unhandled_exception(); rethrown from op_type::await_resume().
      96              :             coro cont_{nullptr}; // Awaiting coroutine; set in op_type::await_suspend(), resumed at final suspend.
      97              :             executor_ref ex_; // Executor passed on to inner IoAwaitables by await_transform's wrapper.
      98              :             std::stop_token token_; // Stop token passed on to inner IoAwaitables by await_transform's wrapper.
      99              :             bool done_ = false; // Set to true when the coroutine reaches final suspend.
     100              : 
     101           68 :             op_type get_return_object() // Wraps the handle of this promise in an op_type.
     102              :             {
     103              :                 return op_type{
     104              :                     std::coroutine_handle<
     105           68 :                         promise_type>::from_promise(*this)};
     106              :             }
     107              : 
     108           68 :             auto initial_suspend() noexcept // Workaround build starts suspended; otherwise the body runs immediately.
     109              :             {
     110              : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     111           68 :                 return std::suspend_always{};
     112              : #else
     113              :                 return std::suspend_never{};
     114              : #endif
     115              :             }
     116              : 
     117           68 :             auto final_suspend() noexcept // Always suspends; flags completion and hands control to the continuation.
     118              :             {
     119              :                 struct awaiter
     120              :                 {
     121              :                     promise_type* p_;
     122              : 
     123           68 :                     bool await_ready() const noexcept
     124              :                     {
     125           68 :                         return false; // Never skip the suspend: the frame must stay alive until ~op_type destroys it.
     126              :                     }
     127              : 
     128           68 :                     coro await_suspend(coro) const noexcept
     129              :                     {
     130           68 :                         p_->done_ = true; // Observed by op_type::await_ready().
     131           68 :                         if(!p_->cont_) // No awaiter registered: op_type::await_suspend() has not run.
     132            0 :                             return std::noop_coroutine();
     133           68 :                         return p_->cont_;
     134              :                     }
     135              : 
     136            0 :                     void await_resume() const noexcept
     137              :                     {
     138            0 :                     }
     139              :                 };
     140           68 :                 return awaiter{this};
     141              :             }
     142              : 
     143           46 :             void return_value( // Stores the co_return value for op_type::await_resume().
     144              :                 io_result<std::size_t> r) noexcept
     145              :             {
     146           46 :                 result_ = r;
     147           46 :             }
     148              : 
     149           22 :             void unhandled_exception() // Captures the in-flight exception for op_type::await_resume().
     150              :             {
     151           22 :                 ep_ = std::current_exception();
     152           22 :             }
     153              : 
     154              :             std::suspend_always yield_value(int) noexcept // co_yield always suspends; the yielded int is ignored.
     155              :             {
     156              :                 return {};
     157              :             }
     158              : 
     159              :             template<class A>
     160           84 :             auto await_transform(A&& a) // IoAwaitables are wrapped to receive ex_ and token_; anything else passes through.
     161              :             {
     162              :                 using decayed = std::decay_t<A>;
     163              :                 if constexpr (IoAwaitable<decayed>)
     164              :                 {
     165              :                     struct wrapper
     166              :                     {
     167              :                         decayed inner_; // The awaited object, stored by value (initialized from the forwarded argument).
     168              :                         promise_type* p_; // Source of the executor and stop token used in await_suspend.
     169              : 
     170           84 :                         bool await_ready()
     171              :                         {
     172           84 :                             return inner_.await_ready();
     173              :                         }
     174              : 
     175            0 :                         coro await_suspend(coro h) // Delegates to detail::call_await_suspend with the promise's ex_ and token_.
     176              :                         {
     177            0 :                             return detail::call_await_suspend(
     178              :                                 &inner_, h,
     179            0 :                                 p_->ex_, p_->token_);
     180              :                         }
     181              : 
     182           84 :                         decltype(auto) await_resume()
     183              :                         {
     184           84 :                             return inner_.await_resume();
     185              :                         }
     186              :                     };
     187              :                     return wrapper{
     188           84 :                         std::forward<A>(a), this};
     189              :                 }
     190              :                 else
     191              :                 {
     192              :                     return std::forward<A>(a);
     193              :                 }
     194              :             }
     195              : 
     196              :             static void*
     197           68 :             operator new( // Frame allocation: reuse write_now's cached storage when it is large enough.
     198              :                 std::size_t size,
     199              :                 write_now& self,
     200              :                 auto&) // Unused; presumably binds the buffers argument of operator().
     201              :             {
     202           68 :                 if(self.cached_frame_ &&
     203            4 :                     self.cached_size_ >= size)
     204            4 :                     return self.cached_frame_; // Cache hit: no allocation.
     205           64 :                 void* p = ::operator new(size); // Allocated before the old cache is released, so a throw leaves the cache intact.
     206           64 :                 if(self.cached_frame_) // Cache exists but is too small: free it before replacing.
     207            0 :                     ::operator delete(self.cached_frame_);
     208           64 :                 self.cached_frame_ = p;
     209           64 :                 self.cached_size_ = size;
     210           64 :                 return p;
     211              :             }
     212              : 
     213              :             static void
     214           68 :             operator delete(void*, std::size_t) noexcept // No-op: storage stays cached and is freed by ~write_now() or replaced in operator new.
     215              :             {
     216           68 :             }
     217              :         };
     218              : 
     219              :         std::coroutine_handle<promise_type> h_; // Owned coroutine; null after being moved from.
     220              : 
     221          136 :         ~op_type() // Destroys the coroutine if this object still owns it.
     222              :         {
     223          136 :             if(h_)
     224           68 :                 h_.destroy();
     225          136 :         }
     226              : 
     227              :         op_type(op_type const&) = delete;
     228              :         op_type& operator=(op_type const&) = delete;
     229              : 
     230           68 :         op_type(op_type&& other) noexcept // Transfers ownership of the handle; `other` is left null.
     231           68 :             : h_(std::exchange(other.h_, nullptr))
     232              :         {
     233           68 :         }
     234              : 
     235              :         op_type& operator=(op_type&&) = delete;
     236              : 
     237           68 :         bool await_ready() const noexcept // True when the coroutine has already reached final suspend.
     238              :         {
     239           68 :             return h_.promise().done_;
     240              :         }
     241              : 
     242           68 :         coro await_suspend( // Records continuation, executor and stop token, then returns h_ so the write coroutine is resumed.
     243              :             coro cont,
     244              :             executor_ref ex,
     245              :             std::stop_token token)
     246              :         {
     247           68 :             auto& p = h_.promise();
     248           68 :             p.cont_ = cont;
     249           68 :             p.ex_ = ex;
     250           68 :             p.token_ = token;
     251           68 :             return h_;
     252              :         }
     253              : 
     254           68 :         io_result<std::size_t> await_resume() // Rethrows a captured exception; otherwise returns the stored result.
     255              :         {
     256           68 :             auto& p = h_.promise();
     257           68 :             if(p.ep_)
     258           22 :                 std::rethrow_exception(p.ep_);
     259           46 :             return p.result_;
     260              :         }
     261              : 
     262              :     private:
     263           68 :         explicit op_type( // Private: used by promise_type::get_return_object().
     264              :             std::coroutine_handle<promise_type> h)
     265           68 :             : h_(h)
     266              :         {
     267           68 :         }
     268              :     };
     269              : 
     270              : public:
     271              :     /** Destructor. Frees the cached coroutine frame. */
     272           64 :     ~write_now()
     273              :     {
     274           64 :         if(cached_frame_)
     275           64 :             ::operator delete(cached_frame_);
     276           64 :     }
     277              : 
     278              :     /** Construct from a stream reference.
     279              : 
     280              :         @param s The stream to write to. Must outlive this object.
     281              :     */
     282              :     explicit
     283           64 :     write_now(Stream& s) noexcept
     284           64 :         : stream_(s)
     285              :     {
     286           64 :     }
     287              : 
     288              :     write_now(write_now const&) = delete;
     289              :     write_now& operator=(write_now const&) = delete;
     290              : 
     291              :     /** Eagerly write the entire buffer sequence.
     292              : 
     293              :         Writes data to the stream by calling `write_some` repeatedly
     294              :         until the entire buffer sequence is written or an error
     295              :         occurs. The operation attempts to complete synchronously:
     296              :         if every `write_some` completes without suspending, the
     297              :         entire operation finishes in `await_ready`.
     298              : 
     299              :         When the fast path cannot complete, the coroutine suspends
     300              :         and continues asynchronously. The internal coroutine frame
     301              :         is cached and reused across calls.
     302              : 
     303              :         @param buffers The buffer sequence to write. Passed by
     304              :             value to ensure the sequence lives in the coroutine
     305              :             frame across suspension points.
     306              : 
     307              :         @return An awaitable yielding `(error_code,std::size_t)`.
     308              :             On success, `n` equals `buffer_size(buffers)`. On
     309              :             error, `n` is the number of bytes written before the
     310              :             error. Compare error codes to conditions:
     311              :             @li `cond::canceled` - Operation was cancelled
     312              :             @li `std::errc::broken_pipe` - Peer closed connection
     313              : 
     314              :         @par Example
     315              : 
     316              :         @code
     317              :         write_now wn( stream );
     318              :         auto [ec, n] = co_await wn( make_buffer( body ) );
     319              :         if( ec )
     320              :             detail::throw_system_error( ec );
     321              :         @endcode
     322              : 
     323              :         @see write, write_some, WriteStream
     324              :     */
     325              : // GCC falsely warns that the coroutine promise's
     326              : // placement operator new(size_t, write_now&, auto&)
     327              : // mismatches operator delete(void*, size_t). Per the
     328              : // standard, coroutine deallocation lookup is separate.
     329              : #if defined(__GNUC__) && !defined(__clang__)
     330              : #pragma GCC diagnostic push
     331              : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
     332              : #endif
     333              : 
     334              : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
     335              :     template<ConstBufferSequence Buffers>
     336              :     op_type
     337           68 :     operator()(Buffers buffers) // Workaround build: starts suspended (see initial_suspend) and co_awaits every write_some.
     338              :     {
     339              :         std::size_t const total_size = buffer_size(buffers);
     340              :         std::size_t total_written = 0;
     341              :         consuming_buffers cb(buffers);
     342              :         while(total_written < total_size)
     343              :         {
     344              :             auto r =
     345              :                 co_await stream_.write_some(cb);
     346              :             if(r.ec) // Stop at the first error, reporting the bytes written so far.
     347              :                 co_return io_result<std::size_t>{
     348              :                     r.ec, total_written};
     349              :             cb.consume(r.t1); // r.t1 is treated as the byte count of this write_some call.
     350              :             total_written += r.t1;
     351              :         }
     352              :         co_return io_result<std::size_t>{
     353              :             {}, total_written};
     354          136 :     }
     355              : #else
     356              :     template<ConstBufferSequence Buffers>
     357              :     op_type
     358              :     operator()(Buffers buffers) // Default build: the body starts running inside this call (initial_suspend is suspend_never).
     359              :     {
     360              :         std::size_t const total_size = buffer_size(buffers);
     361              :         std::size_t total_written = 0;
     362              : 
     363              :         // GCC ICE in expand_expr_real_1 (expr.cc:11376)
     364              :         // when consuming_buffers spans a co_yield, so GCC
     365              :         // builds use the co_yield-free #if branch above.
     366              :         consuming_buffers cb(buffers);
     367              :         while(total_written < total_size) // Fast path: drive write_some by hand, without any suspension point.
     368              :         {
     369              :             auto inner = stream_.write_some(cb); // Used directly (no co_await), so await_transform is not involved.
     370              :             if(!inner.await_ready())
     371              :                 break; // Not ready: drop this awaitable and continue on the suspending path below.
     372              :             auto r = inner.await_resume();
     373              :             if(r.ec)
     374              :                 co_return io_result<std::size_t>{
     375              :                     r.ec, total_written};
     376              :             cb.consume(r.t1); // r.t1 is treated as the byte count of this write_some call.
     377              :             total_written += r.t1;
     378              :         }
     379              : 
     380              :         if(total_written >= total_size) // Everything was written on the fast path.
     381              :             co_return io_result<std::size_t>{
     382              :                 {}, total_written};
     383              : 
     384              :         co_yield 0; // Suspends (yield_value returns suspend_always); resumed when op_type::await_suspend returns h_.
     385              : 
     386              :         while(total_written < total_size) // Slow path: the remaining writes are co_awaited and may suspend.
     387              :         {
     388              :             auto r =
     389              :                 co_await stream_.write_some(cb);
     390              :             if(r.ec)
     391              :                 co_return io_result<std::size_t>{
     392              :                     r.ec, total_written};
     393              :             cb.consume(r.t1);
     394              :             total_written += r.t1;
     395              :         }
     396              :         co_return io_result<std::size_t>{
     397              :             {}, total_written};
     398              :     }
     399              : #endif
     400              : 
     401              : #if defined(__GNUC__) && !defined(__clang__)
     402              : #pragma GCC diagnostic pop
     403              : #endif
     404              : };
     405              : 
     406              : template<WriteStream S>
     407              : write_now(S&) -> write_now<S>; // Deduction guide: `write_now wn(stream);` deduces the Stream parameter as S.
     408              : 
     409              : } // namespace capy
     410              : } // namespace boost
     411              : 
     412              : #endif
        

Generated by: LCOV version 2.3