//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9 +

 
10 +
#ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
 
11 +
#define BOOST_CAPY_IO_WRITE_NOW_HPP
 
12 +

 
#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/consuming_buffers.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_stream.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>

#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <stop_token>
#include <type_traits>
#include <utility>
28 +

 
// BOOST_CAPY_WRITE_NOW_WORKAROUND selects which implementation of
// write_now::operator() is compiled and how its coroutine starts.
//
//   1  The coroutine starts suspended and uses a single co_await loop.
//      This is the default on GCC (but not Clang), which hits an
//      internal compiler error when consuming_buffers spans a co_yield.
//   0  The coroutine starts eagerly and only suspends (via co_yield)
//      once a write_some would not complete immediately.
//
// Define the macro before including this header to override the default.
#ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
# if defined(__GNUC__) && !defined(__clang__)
#  define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
# else
#  define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
# endif
#endif
36 +

 
37 +
namespace boost {
 
38 +
namespace capy {
 
39 +

 
40 +
/** Eagerly writes complete buffer sequences with frame caching.
 
41 +

 
42 +
    This class wraps a @ref WriteStream and provides an `operator()`
 
43 +
    that writes an entire buffer sequence, attempting to complete
 
44 +
    synchronously. If every `write_some` completes without suspending,
 
45 +
    the entire operation finishes in `await_ready` with no coroutine
 
46 +
    suspension.
 
47 +

 
48 +
    The class maintains a one-element coroutine frame cache. After
 
49 +
    the first call, subsequent calls reuse the cached frame memory,
 
50 +
    avoiding repeated allocation for the internal coroutine.
 
51 +

 
52 +
    @tparam Stream The stream type, must satisfy @ref WriteStream.
 
53 +

 
54 +
    @par Thread Safety
 
55 +
    Distinct objects: Safe.
 
56 +
    Shared objects: Unsafe.
 
57 +

 
58 +
    @par Preconditions
 
59 +
    Only one operation may be outstanding at a time. A new call to
 
60 +
    `operator()` must not be made until the previous operation has
 
61 +
    completed (i.e., the returned awaitable has been fully consumed).
 
62 +

 
63 +
    @par Example
 
64 +

 
65 +
    @code
 
66 +
    template< WriteStream Stream >
 
67 +
    task<> send_messages( Stream& stream )
 
68 +
    {
 
69 +
        write_now wn( stream );
 
70 +
        auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
 
71 +
        if( ec1 )
 
72 +
            detail::throw_system_error( ec1 );
 
73 +
        auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
 
74 +
        if( ec2 )
 
75 +
            detail::throw_system_error( ec2 );
 
76 +
    }
 
77 +
    @endcode
 
78 +

 
79 +
    @see write, write_some, WriteStream, ConstBufferSequence
 
80 +
*/
 
81 +
template<class Stream>
 
82 +
    requires WriteStream<Stream>
 
83 +
class write_now
 
84 +
{
 
85 +
    Stream& stream_;
 
86 +
    void* cached_frame_ = nullptr;
 
87 +
    std::size_t cached_size_ = 0;
 
88 +

 
89 +
    struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
 
90 +
        op_type
 
91 +
    {
 
92 +
        struct promise_type
 
93 +
        {
 
94 +
            io_result<std::size_t> result_;
 
95 +
            std::exception_ptr ep_;
 
96 +
            coro cont_{nullptr};
 
97 +
            executor_ref ex_;
 
98 +
            std::stop_token token_;
 
99 +
            bool done_ = false;
 
100 +

 
101 +
            op_type get_return_object()
 
102 +
            {
 
103 +
                return op_type{
 
104 +
                    std::coroutine_handle<
 
105 +
                        promise_type>::from_promise(*this)};
 
106 +
            }
 
107 +

 
108 +
            auto initial_suspend() noexcept
 
109 +
            {
 
110 +
#if BOOST_CAPY_WRITE_NOW_WORKAROUND
 
111 +
                return std::suspend_always{};
 
112 +
#else
 
113 +
                return std::suspend_never{};
 
114 +
#endif
 
115 +
            }
 
116 +

 
117 +
            auto final_suspend() noexcept
 
118 +
            {
 
119 +
                struct awaiter
 
120 +
                {
 
121 +
                    promise_type* p_;
 
122 +

 
123 +
                    bool await_ready() const noexcept
 
124 +
                    {
 
125 +
                        return false;
 
126 +
                    }
 
127 +

 
128 +
                    coro await_suspend(coro) const noexcept
 
129 +
                    {
 
130 +
                        p_->done_ = true;
 
131 +
                        if(!p_->cont_)
 
132 +
                            return std::noop_coroutine();
 
133 +
                        return p_->cont_;
 
134 +
                    }
 
135 +

 
136 +
                    void await_resume() const noexcept
 
137 +
                    {
 
138 +
                    }
 
139 +
                };
 
140 +
                return awaiter{this};
 
141 +
            }
 
142 +

 
143 +
            void return_value(
 
144 +
                io_result<std::size_t> r) noexcept
 
145 +
            {
 
146 +
                result_ = r;
 
147 +
            }
 
148 +

 
149 +
            void unhandled_exception()
 
150 +
            {
 
151 +
                ep_ = std::current_exception();
 
152 +
            }
 
153 +

 
154 +
            std::suspend_always yield_value(int) noexcept
 
155 +
            {
 
156 +
                return {};
 
157 +
            }
 
158 +

 
159 +
            template<class A>
 
160 +
            auto await_transform(A&& a)
 
161 +
            {
 
162 +
                using decayed = std::decay_t<A>;
 
163 +
                if constexpr (IoAwaitable<decayed>)
 
164 +
                {
 
165 +
                    struct wrapper
 
166 +
                    {
 
167 +
                        decayed inner_;
 
168 +
                        promise_type* p_;
 
169 +

 
170 +
                        bool await_ready()
 
171 +
                        {
 
172 +
                            return inner_.await_ready();
 
173 +
                        }
 
174 +

 
175 +
                        coro await_suspend(coro h)
 
176 +
                        {
 
177 +
                            return detail::call_await_suspend(
 
178 +
                                &inner_, h,
 
179 +
                                p_->ex_, p_->token_);
 
180 +
                        }
 
181 +

 
182 +
                        decltype(auto) await_resume()
 
183 +
                        {
 
184 +
                            return inner_.await_resume();
 
185 +
                        }
 
186 +
                    };
 
187 +
                    return wrapper{
 
188 +
                        std::forward<A>(a), this};
 
189 +
                }
 
190 +
                else
 
191 +
                {
 
192 +
                    return std::forward<A>(a);
 
193 +
                }
 
194 +
            }
 
195 +

 
196 +
            static void*
 
197 +
            operator new(
 
198 +
                std::size_t size,
 
199 +
                write_now& self,
 
200 +
                auto&)
 
201 +
            {
 
202 +
                if(self.cached_frame_ &&
 
203 +
                    self.cached_size_ >= size)
 
204 +
                    return self.cached_frame_;
 
205 +
                void* p = ::operator new(size);
 
206 +
                if(self.cached_frame_)
 
207 +
                    ::operator delete(self.cached_frame_);
 
208 +
                self.cached_frame_ = p;
 
209 +
                self.cached_size_ = size;
 
210 +
                return p;
 
211 +
            }
 
212 +

 
213 +
            static void
 
214 +
            operator delete(void*, std::size_t) noexcept
 
215 +
            {
 
216 +
            }
 
217 +
        };
 
218 +

 
219 +
        std::coroutine_handle<promise_type> h_;
 
220 +

 
221 +
        ~op_type()
 
222 +
        {
 
223 +
            if(h_)
 
224 +
                h_.destroy();
 
225 +
        }
 
226 +

 
227 +
        op_type(op_type const&) = delete;
 
228 +
        op_type& operator=(op_type const&) = delete;
 
229 +

 
230 +
        op_type(op_type&& other) noexcept
 
231 +
            : h_(std::exchange(other.h_, nullptr))
 
232 +
        {
 
233 +
        }
 
234 +

 
235 +
        op_type& operator=(op_type&&) = delete;
 
236 +

 
237 +
        bool await_ready() const noexcept
 
238 +
        {
 
239 +
            return h_.promise().done_;
 
240 +
        }
 
241 +

 
242 +
        coro await_suspend(
 
243 +
            coro cont,
 
244 +
            executor_ref ex,
 
245 +
            std::stop_token token)
 
246 +
        {
 
247 +
            auto& p = h_.promise();
 
248 +
            p.cont_ = cont;
 
249 +
            p.ex_ = ex;
 
250 +
            p.token_ = token;
 
251 +
            return h_;
 
252 +
        }
 
253 +

 
254 +
        io_result<std::size_t> await_resume()
 
255 +
        {
 
256 +
            auto& p = h_.promise();
 
257 +
            if(p.ep_)
 
258 +
                std::rethrow_exception(p.ep_);
 
259 +
            return p.result_;
 
260 +
        }
 
261 +

 
262 +
    private:
 
263 +
        explicit op_type(
 
264 +
            std::coroutine_handle<promise_type> h)
 
265 +
            : h_(h)
 
266 +
        {
 
267 +
        }
 
268 +
    };
 
269 +

 
270 +
public:
 
271 +
    /** Destructor. Frees the cached coroutine frame. */
 
272 +
    ~write_now()
 
273 +
    {
 
274 +
        if(cached_frame_)
 
275 +
            ::operator delete(cached_frame_);
 
276 +
    }
 
277 +

 
278 +
    /** Construct from a stream reference.
 
279 +

 
280 +
        @param s The stream to write to. Must outlive this object.
 
281 +
    */
 
282 +
    explicit
 
283 +
    write_now(Stream& s) noexcept
 
284 +
        : stream_(s)
 
285 +
    {
 
286 +
    }
 
287 +

 
288 +
    write_now(write_now const&) = delete;
 
289 +
    write_now& operator=(write_now const&) = delete;
 
290 +

 
291 +
    /** Eagerly write the entire buffer sequence.
 
292 +

 
293 +
        Writes data to the stream by calling `write_some` repeatedly
 
294 +
        until the entire buffer sequence is written or an error
 
295 +
        occurs. The operation attempts to complete synchronously:
 
296 +
        if every `write_some` completes without suspending, the
 
297 +
        entire operation finishes in `await_ready`.
 
298 +

 
299 +
        When the fast path cannot complete, the coroutine suspends
 
300 +
        and continues asynchronously. The internal coroutine frame
 
301 +
        is cached and reused across calls.
 
302 +

 
303 +
        @param buffers The buffer sequence to write. Passed by
 
304 +
            value to ensure the sequence lives in the coroutine
 
305 +
            frame across suspension points.
 
306 +

 
307 +
        @return An awaitable yielding `(error_code,std::size_t)`.
 
308 +
            On success, `n` equals `buffer_size(buffers)`. On
 
309 +
            error, `n` is the number of bytes written before the
 
310 +
            error. Compare error codes to conditions:
 
311 +
            @li `cond::canceled` - Operation was cancelled
 
312 +
            @li `std::errc::broken_pipe` - Peer closed connection
 
313 +

 
314 +
        @par Example
 
315 +

 
316 +
        @code
 
317 +
        write_now wn( stream );
 
318 +
        auto [ec, n] = co_await wn( make_buffer( body ) );
 
319 +
        if( ec )
 
320 +
            detail::throw_system_error( ec );
 
321 +
        @endcode
 
322 +

 
323 +
        @see write, write_some, WriteStream
 
324 +
    */
 
325 +
// GCC falsely warns that the coroutine promise's
 
326 +
// placement operator new(size_t, write_now&, auto&)
 
327 +
// mismatches operator delete(void*, size_t). Per the
 
328 +
// standard, coroutine deallocation lookup is separate.
 
329 +
#if defined(__GNUC__) && !defined(__clang__)
 
330 +
#pragma GCC diagnostic push
 
331 +
#pragma GCC diagnostic ignored "-Wmismatched-new-delete"
 
332 +
#endif
 
333 +

 
334 +
#if BOOST_CAPY_WRITE_NOW_WORKAROUND
 
335 +
    template<ConstBufferSequence Buffers>
 
336 +
    op_type
 
337 +
    operator()(Buffers buffers)
 
338 +
    {
 
339 +
        std::size_t const total_size = buffer_size(buffers);
 
340 +
        std::size_t total_written = 0;
 
341 +
        consuming_buffers cb(buffers);
 
342 +
        while(total_written < total_size)
 
343 +
        {
 
344 +
            auto r =
 
345 +
                co_await stream_.write_some(cb);
 
346 +
            if(r.ec)
 
347 +
                co_return io_result<std::size_t>{
 
348 +
                    r.ec, total_written};
 
349 +
            cb.consume(r.t1);
 
350 +
            total_written += r.t1;
 
351 +
        }
 
352 +
        co_return io_result<std::size_t>{
 
353 +
            {}, total_written};
 
354 +
    }
 
355 +
#else
 
356 +
    template<ConstBufferSequence Buffers>
 
357 +
    op_type
 
358 +
    operator()(Buffers buffers)
 
359 +
    {
 
360 +
        std::size_t const total_size = buffer_size(buffers);
 
361 +
        std::size_t total_written = 0;
 
362 +

 
363 +
        // GCC ICE in expand_expr_real_1 (expr.cc:11376)
 
364 +
        // when consuming_buffers spans a co_yield, so
 
365 +
        // the GCC path uses a separate simple coroutine.
 
366 +
        consuming_buffers cb(buffers);
 
367 +
        while(total_written < total_size)
 
368 +
        {
 
369 +
            auto inner = stream_.write_some(cb);
 
370 +
            if(!inner.await_ready())
 
371 +
                break;
 
372 +
            auto r = inner.await_resume();
 
373 +
            if(r.ec)
 
374 +
                co_return io_result<std::size_t>{
 
375 +
                    r.ec, total_written};
 
376 +
            cb.consume(r.t1);
 
377 +
            total_written += r.t1;
 
378 +
        }
 
379 +

 
380 +
        if(total_written >= total_size)
 
381 +
            co_return io_result<std::size_t>{
 
382 +
                {}, total_written};
 
383 +

 
384 +
        co_yield 0;
 
385 +

 
386 +
        while(total_written < total_size)
 
387 +
        {
 
388 +
            auto r =
 
389 +
                co_await stream_.write_some(cb);
 
390 +
            if(r.ec)
 
391 +
                co_return io_result<std::size_t>{
 
392 +
                    r.ec, total_written};
 
393 +
            cb.consume(r.t1);
 
394 +
            total_written += r.t1;
 
395 +
        }
 
396 +
        co_return io_result<std::size_t>{
 
397 +
            {}, total_written};
 
398 +
    }
 
399 +
#endif
 
400 +

 
401 +
#if defined(__GNUC__) && !defined(__clang__)
 
402 +
#pragma GCC diagnostic pop
 
403 +
#endif
 
404 +
};
 
405 +

 
406 +
template<WriteStream S>
 
407 +
write_now(S&) -> write_now<S>;
 
408 +

 
409 +
} // namespace capy
 
410 +
} // namespace boost
 
411 +

 
412 +
#endif