//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12  

12  

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_array.hpp>
#include <boost/capy/buffers/buffer_param.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_sink.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>
#include <boost/capy/task.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <span>
#include <stop_token>
#include <system_error>
#include <utility>
33  

34  

34  
namespace boost {
35  
namespace boost {
35  
namespace capy {
36  
namespace capy {
36  

37  

37  
/** Type-erased wrapper for any WriteSink.
38  
/** Type-erased wrapper for any WriteSink.
38  

39  

39  
    This class provides type erasure for any type satisfying the
40  
    This class provides type erasure for any type satisfying the
40  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    @ref WriteSink concept, enabling runtime polymorphism for
41  
    sink write operations. It uses cached awaitable storage to achieve
42  
    sink write operations. It uses cached awaitable storage to achieve
42  
    zero steady-state allocation after construction.
43  
    zero steady-state allocation after construction.
43  

44  

44  
    The wrapper supports two construction modes:
45  
    The wrapper supports two construction modes:
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
    - **Owning**: Pass by value to transfer ownership. The wrapper
46  
      allocates storage and owns the sink.
47  
      allocates storage and owns the sink.
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
    - **Reference**: Pass a pointer to wrap without ownership. The
48  
      pointed-to sink must outlive this wrapper.
49  
      pointed-to sink must outlive this wrapper.
49  

50  

50  
    @par Awaitable Preallocation
51  
    @par Awaitable Preallocation
51  
    The constructor preallocates storage for the type-erased awaitable.
52  
    The constructor preallocates storage for the type-erased awaitable.
52  
    This reserves all virtual address space at server startup
53  
    This reserves all virtual address space at server startup
53  
    so memory usage can be measured up front, rather than
54  
    so memory usage can be measured up front, rather than
54  
    allocating piecemeal as traffic arrives.
55  
    allocating piecemeal as traffic arrives.
55  

56  

 
57 +
    @par Immediate Completion
 
58 +
    Operations complete immediately without suspending when the
 
59 +
    buffer sequence is empty, or when the underlying sink's
 
60 +
    awaitable reports readiness via `await_ready`.
 
61 +

56  
    @par Thread Safety
62  
    @par Thread Safety
57  
    Not thread-safe. Concurrent operations on the same wrapper
63  
    Not thread-safe. Concurrent operations on the same wrapper
58  
    are undefined behavior.
64  
    are undefined behavior.
59  

65  

60  
    @par Example
66  
    @par Example
61  
    @code
67  
    @code
62  
    // Owning - takes ownership of the sink
68  
    // Owning - takes ownership of the sink
63  
    any_write_sink ws(some_sink{args...});
69  
    any_write_sink ws(some_sink{args...});
64  

70  

65  
    // Reference - wraps without ownership
71  
    // Reference - wraps without ownership
66  
    some_sink sink;
72  
    some_sink sink;
67  
    any_write_sink ws(&sink);
73  
    any_write_sink ws(&sink);
68  

74  

69  
    const_buffer buf(data, size);
75  
    const_buffer buf(data, size);
70  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
76  
    auto [ec, n] = co_await ws.write(std::span(&buf, 1));
71  
    auto [ec2] = co_await ws.write_eof();
77  
    auto [ec2] = co_await ws.write_eof();
72  
    @endcode
78  
    @endcode
73  

79  

74  
    @see any_write_stream, WriteSink
80  
    @see any_write_stream, WriteSink
75  
*/
81  
*/
76  
class any_write_sink
82  
class any_write_sink
77  
{
83  
{
78  
    struct vtable;
84  
    struct vtable;
79  
    struct write_awaitable_ops;
85  
    struct write_awaitable_ops;
80  
    struct eof_awaitable_ops;
86  
    struct eof_awaitable_ops;
81  

87  

82  
    template<WriteSink S>
88  
    template<WriteSink S>
83  
    struct vtable_for_impl;
89  
    struct vtable_for_impl;
84  

90  

85  
    void* sink_ = nullptr;
91  
    void* sink_ = nullptr;
86  
    vtable const* vt_ = nullptr;
92  
    vtable const* vt_ = nullptr;
87  
    void* cached_awaitable_ = nullptr;
93  
    void* cached_awaitable_ = nullptr;
88  
    void* storage_ = nullptr;
94  
    void* storage_ = nullptr;
89  
    write_awaitable_ops const* active_write_ops_ = nullptr;
95  
    write_awaitable_ops const* active_write_ops_ = nullptr;
90  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
96  
    eof_awaitable_ops const* active_eof_ops_ = nullptr;
91  

97  

92  
public:
98  
public:
93  
    /** Destructor.
99  
    /** Destructor.
94  

100  

95  
        Destroys the owned sink (if any) and releases the cached
101  
        Destroys the owned sink (if any) and releases the cached
96  
        awaitable storage.
102  
        awaitable storage.
97  
    */
103  
    */
98  
    ~any_write_sink();
104  
    ~any_write_sink();
99  

105  

100  
    /** Default constructor.
106  
    /** Default constructor.
101  

107  

102  
        Constructs an empty wrapper. Operations on a default-constructed
108  
        Constructs an empty wrapper. Operations on a default-constructed
103  
        wrapper result in undefined behavior.
109  
        wrapper result in undefined behavior.
104  
    */
110  
    */
105  
    any_write_sink() = default;
111  
    any_write_sink() = default;
106  

112  

107  
    /** Non-copyable.
113  
    /** Non-copyable.
108  

114  

109  
        The awaitable cache is per-instance and cannot be shared.
115  
        The awaitable cache is per-instance and cannot be shared.
110  
    */
116  
    */
111  
    any_write_sink(any_write_sink const&) = delete;
117  
    any_write_sink(any_write_sink const&) = delete;
112  
    any_write_sink& operator=(any_write_sink const&) = delete;
118  
    any_write_sink& operator=(any_write_sink const&) = delete;
113  

119  

114  
    /** Move constructor.
120  
    /** Move constructor.
115  

121  

116  
        Transfers ownership of the wrapped sink (if owned) and
122  
        Transfers ownership of the wrapped sink (if owned) and
117  
        cached awaitable storage from `other`. After the move, `other` is
123  
        cached awaitable storage from `other`. After the move, `other` is
118  
        in a default-constructed state.
124  
        in a default-constructed state.
119  

125  

120  
        @param other The wrapper to move from.
126  
        @param other The wrapper to move from.
121  
    */
127  
    */
122  
    any_write_sink(any_write_sink&& other) noexcept
128  
    any_write_sink(any_write_sink&& other) noexcept
123  
        : sink_(std::exchange(other.sink_, nullptr))
129  
        : sink_(std::exchange(other.sink_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
130  
        , vt_(std::exchange(other.vt_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
131  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
132  
        , storage_(std::exchange(other.storage_, nullptr))
127  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
133  
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
128  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
134  
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
129  
    {
135  
    {
130  
    }
136  
    }
131  

137  

132  
    /** Move assignment operator.
138  
    /** Move assignment operator.
133  

139  

134  
        Destroys any owned sink and releases existing resources,
140  
        Destroys any owned sink and releases existing resources,
135  
        then transfers ownership from `other`.
141  
        then transfers ownership from `other`.
136  

142  

137  
        @param other The wrapper to move from.
143  
        @param other The wrapper to move from.
138  
        @return Reference to this wrapper.
144  
        @return Reference to this wrapper.
139  
    */
145  
    */
140  
    any_write_sink&
146  
    any_write_sink&
141  
    operator=(any_write_sink&& other) noexcept;
147  
    operator=(any_write_sink&& other) noexcept;
142  

148  

143  
    /** Construct by taking ownership of a WriteSink.
149  
    /** Construct by taking ownership of a WriteSink.
144  

150  

145  
        Allocates storage and moves the sink into this wrapper.
151  
        Allocates storage and moves the sink into this wrapper.
146  
        The wrapper owns the sink and will destroy it.
152  
        The wrapper owns the sink and will destroy it.
147  

153  

148  
        @param s The sink to take ownership of.
154  
        @param s The sink to take ownership of.
149  
    */
155  
    */
150  
    template<WriteSink S>
156  
    template<WriteSink S>
151  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
157  
        requires (!std::same_as<std::decay_t<S>, any_write_sink>)
152  
    any_write_sink(S s);
158  
    any_write_sink(S s);
153  

159  

154  
    /** Construct by wrapping a WriteSink without ownership.
160  
    /** Construct by wrapping a WriteSink without ownership.
155  

161  

156  
        Wraps the given sink by pointer. The sink must remain
162  
        Wraps the given sink by pointer. The sink must remain
157  
        valid for the lifetime of this wrapper.
163  
        valid for the lifetime of this wrapper.
158  

164  

159  
        @param s Pointer to the sink to wrap.
165  
        @param s Pointer to the sink to wrap.
160  
    */
166  
    */
161  
    template<WriteSink S>
167  
    template<WriteSink S>
162  
    any_write_sink(S* s);
168  
    any_write_sink(S* s);
163  

169  

164  
    /** Check if the wrapper contains a valid sink.
170  
    /** Check if the wrapper contains a valid sink.
165  

171  

166  
        @return `true` if wrapping a sink, `false` if default-constructed
172  
        @return `true` if wrapping a sink, `false` if default-constructed
167  
            or moved-from.
173  
            or moved-from.
168  
    */
174  
    */
169  
    bool
175  
    bool
170  
    has_value() const noexcept
176  
    has_value() const noexcept
171  
    {
177  
    {
172  
        return sink_ != nullptr;
178  
        return sink_ != nullptr;
173  
    }
179  
    }
174  

180  

175  
    /** Check if the wrapper contains a valid sink.
181  
    /** Check if the wrapper contains a valid sink.
176  

182  

177  
        @return `true` if wrapping a sink, `false` if default-constructed
183  
        @return `true` if wrapping a sink, `false` if default-constructed
178  
            or moved-from.
184  
            or moved-from.
179  
    */
185  
    */
180  
    explicit
186  
    explicit
181  
    operator bool() const noexcept
187  
    operator bool() const noexcept
182  
    {
188  
    {
183  
        return has_value();
189  
        return has_value();
184  
    }
190  
    }
185  

191  

186 -
    /** Initiate an asynchronous write operation.
192 +
    /** Initiate a partial write operation.
 
193 +

 
194 +
        Writes one or more bytes from the provided buffer sequence.
 
195 +
        May consume less than the full sequence.
 
196 +

 
197 +
        @param buffers The buffer sequence containing data to write.
 
198 +

 
199 +
        @return An awaitable yielding `(error_code,std::size_t)`.
 
200 +

 
201 +
        @par Immediate Completion
 
202 +
        The operation completes immediately without suspending
 
203 +
        the calling coroutine when:
 
204 +
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
 
205 +
        @li The underlying sink's awaitable reports immediate
 
206 +
            readiness via `await_ready`.
 
207 +

 
208 +
        @note This is a partial operation and may not process the
 
209 +
        entire buffer sequence. Use @ref write for guaranteed
 
210 +
        complete transfer.
 
211 +

 
212 +
        @par Preconditions
 
213 +
        The wrapper must contain a valid sink (`has_value() == true`).
 
214 +
    */
 
215 +
    template<ConstBufferSequence CB>
 
216 +
    auto
 
217 +
    write_some(CB buffers);
 
218 +

 
219 +
    /** Initiate a complete write operation.
187  

220  

188  
        Writes data from the provided buffer sequence. The operation
221  
        Writes data from the provided buffer sequence. The operation
189  
        completes when all bytes have been consumed, or an error
222  
        completes when all bytes have been consumed, or an error
190 -
        occurs.
223 +
        occurs. Forwards to the underlying sink's `write` operation,
 
224 +
        windowed through @ref buffer_param when the sequence exceeds
 
225 +
        the per-call buffer limit.
191  

226  

192 -
            Passed by value to ensure the sequence lives in the
 
193 -
            coroutine frame across suspension points.
 
194  
        @param buffers The buffer sequence containing data to write.
227  
        @param buffers The buffer sequence containing data to write.
195  

228  

196  
        @return An awaitable yielding `(error_code,std::size_t)`.
229  
        @return An awaitable yielding `(error_code,std::size_t)`.
197  

230  

 
231 +
        @par Immediate Completion
 
232 +
        The operation completes immediately without suspending
 
233 +
        the calling coroutine when:
 
234 +
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
 
235 +
        @li Every underlying `write` call completes
 
236 +
            immediately (the wrapped sink reports readiness
 
237 +
            via `await_ready` on each iteration).
 
238 +

198  
        @par Preconditions
239  
        @par Preconditions
199  
        The wrapper must contain a valid sink (`has_value() == true`).
240  
        The wrapper must contain a valid sink (`has_value() == true`).
200  
    */
241  
    */
201  
    template<ConstBufferSequence CB>
242  
    template<ConstBufferSequence CB>
202 -
    task<io_result<std::size_t>>
243 +
    io_task<std::size_t>
203  
    write(CB buffers);
244  
    write(CB buffers);
204  

245  

205 -
    /** Initiate an asynchronous write operation with optional EOF.
246 +
    /** Atomically write data and signal end-of-stream.
206  

247  

207 -
        Writes data from the provided buffer sequence, optionally
248 +
        Writes all data from the buffer sequence and then signals
208 -
        finalizing the sink afterwards. The operation completes when
249 +
        end-of-stream. The implementation decides how to partition
209 -
        all bytes have been consumed and (if eof is true) the sink
250 +
        the data across calls to the underlying sink's @ref write
210 -
        is finalized, or an error occurs.
251 +
        and `write_eof`. When the caller's buffer sequence is
 
252 +
        non-empty, the final call to the underlying sink is always
 
253 +
        `write_eof` with a non-empty buffer sequence. When the
 
254 +
        caller's buffer sequence is empty, only `write_eof()` with
 
255 +
        no data is called.
211  

256  

212 -
            Passed by value to ensure the sequence lives in the
 
213 -
            coroutine frame across suspension points.
 
214 -

 
215 -
        @param eof If `true`, the sink is finalized after writing
 
216 -
            the data.
 
217  
        @param buffers The buffer sequence containing data to write.
257  
        @param buffers The buffer sequence containing data to write.
218  

258  

219  
        @return An awaitable yielding `(error_code,std::size_t)`.
259  
        @return An awaitable yielding `(error_code,std::size_t)`.
220  

260  

 
261 +
        @par Immediate Completion
 
262 +
        The operation completes immediately without suspending
 
263 +
        the calling coroutine when:
 
264 +
        @li The buffer sequence is empty. Only the @ref write_eof()
 
265 +
            call is performed.
 
266 +
        @li All underlying operations complete immediately (the
 
267 +
            wrapped sink reports readiness via `await_ready`).
 
268 +

221  
        @par Preconditions
269  
        @par Preconditions
222  
        The wrapper must contain a valid sink (`has_value() == true`).
270  
        The wrapper must contain a valid sink (`has_value() == true`).
223  
    */
271  
    */
224  
    template<ConstBufferSequence CB>
272  
    template<ConstBufferSequence CB>
225 -
    task<io_result<std::size_t>>
273 +
    io_task<std::size_t>
226 -
    write(CB buffers, bool eof);
274 +
    write_eof(CB buffers);
227  

275  

228  
    /** Signal end of data.
276  
    /** Signal end of data.
229  

277  

230  
        Indicates that no more data will be written to the sink.
278  
        Indicates that no more data will be written to the sink.
231  
        The operation completes when the sink is finalized, or
279  
        The operation completes when the sink is finalized, or
232  
        an error occurs.
280  
        an error occurs.
233  

281  

234  
        @return An awaitable yielding `(error_code)`.
282  
        @return An awaitable yielding `(error_code)`.
235  

283  

 
284 +
        @par Immediate Completion
 
285 +
        The operation completes immediately without suspending
 
286 +
        the calling coroutine when the underlying sink's awaitable
 
287 +
        reports immediate readiness via `await_ready`.
 
288 +

236  
        @par Preconditions
289  
        @par Preconditions
237  
        The wrapper must contain a valid sink (`has_value() == true`).
290  
        The wrapper must contain a valid sink (`has_value() == true`).
238  
    */
291  
    */
239  
    auto
292  
    auto
240  
    write_eof();
293  
    write_eof();
241  

294  

242  
protected:
295  
protected:
243  
    /** Rebind to a new sink after move.
296  
    /** Rebind to a new sink after move.
244  

297  

245  
        Updates the internal pointer to reference a new sink object.
298  
        Updates the internal pointer to reference a new sink object.
246  
        Used by owning wrappers after move assignment when the owned
299  
        Used by owning wrappers after move assignment when the owned
247  
        object has moved to a new location.
300  
        object has moved to a new location.
248  

301  

249  
        @param new_sink The new sink to bind to. Must be the same
302  
        @param new_sink The new sink to bind to. Must be the same
250  
            type as the original sink.
303  
            type as the original sink.
251  

304  

252  
        @note Terminates if called with a sink of different type
305  
        @note Terminates if called with a sink of different type
253  
            than the original.
306  
            than the original.
254  
    */
307  
    */
255  
    template<WriteSink S>
308  
    template<WriteSink S>
256  
    void
309  
    void
257  
    rebind(S& new_sink) noexcept
310  
    rebind(S& new_sink) noexcept
258  
    {
311  
    {
259  
        if(vt_ != &vtable_for_impl<S>::value)
312  
        if(vt_ != &vtable_for_impl<S>::value)
260  
            std::terminate();
313  
            std::terminate();
261  
        sink_ = &new_sink;
314  
        sink_ = &new_sink;
262  
    }
315  
    }
263  

316  

264  
private:
317  
private:
265  
    auto
318  
    auto
266 -
    write_some_(std::span<const_buffer const> buffers, bool eof);
319 +
    write_some_(std::span<const_buffer const> buffers);
 
320 +

 
321 +
    auto
 
322 +
    write_(std::span<const_buffer const> buffers);
 
323 +

 
324 +
    auto
 
325 +
    write_eof_buffers_(std::span<const_buffer const> buffers);
267  
};
326  
};
268  

327  

269  
//----------------------------------------------------------
328  
//----------------------------------------------------------
270  

329  

271  
struct any_write_sink::write_awaitable_ops
330  
struct any_write_sink::write_awaitable_ops
272  
{
331  
{
273  
    bool (*await_ready)(void*);
332  
    bool (*await_ready)(void*);
274  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
333  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
275  
    io_result<std::size_t> (*await_resume)(void*);
334  
    io_result<std::size_t> (*await_resume)(void*);
276  
    void (*destroy)(void*) noexcept;
335  
    void (*destroy)(void*) noexcept;
277  
};
336  
};
278  

337  

279  
struct any_write_sink::eof_awaitable_ops
338  
struct any_write_sink::eof_awaitable_ops
280  
{
339  
{
281  
    bool (*await_ready)(void*);
340  
    bool (*await_ready)(void*);
282  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
341  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
283  
    io_result<> (*await_resume)(void*);
342  
    io_result<> (*await_resume)(void*);
284  
    void (*destroy)(void*) noexcept;
343  
    void (*destroy)(void*) noexcept;
285  
};
344  
};
286  

345  

287  
struct any_write_sink::vtable
346  
struct any_write_sink::vtable
288  
{
347  
{
289 -
    void (*destroy)(void*) noexcept;
348 +
    write_awaitable_ops const* (*construct_write_some_awaitable)(
290 -
    std::size_t awaitable_size;
349 +
        void* sink,
291 -
    std::size_t awaitable_align;
350 +
        void* storage,
 
351 +
        std::span<const_buffer const> buffers);
292  
    write_awaitable_ops const* (*construct_write_awaitable)(
352  
    write_awaitable_ops const* (*construct_write_awaitable)(
293  
        void* sink,
353  
        void* sink,
294  
        void* storage,
354  
        void* storage,
295 -
        std::span<const_buffer const> buffers,
355 +
        std::span<const_buffer const> buffers);
296 -
        bool eof);
356 +
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
 
357 +
        void* sink,
 
358 +
        void* storage,
 
359 +
        std::span<const_buffer const> buffers);
297  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
360  
    eof_awaitable_ops const* (*construct_eof_awaitable)(
298  
        void* sink,
361  
        void* sink,
299  
        void* storage);
362  
        void* storage);
 
363 +
    std::size_t awaitable_size;
 
364 +
    std::size_t awaitable_align;
 
365 +
    void (*destroy)(void*) noexcept;
300  
};
366  
};
301  

367  

302  
template<WriteSink S>
368  
template<WriteSink S>
303  
struct any_write_sink::vtable_for_impl
369  
struct any_write_sink::vtable_for_impl
304  
{
370  
{
 
371 +
    using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
 
372 +
        std::span<const_buffer const>{}));
305  
    using WriteAwaitable = decltype(std::declval<S&>().write(
373  
    using WriteAwaitable = decltype(std::declval<S&>().write(
306 -
        std::span<const_buffer const>{}, false));
374 +
        std::span<const_buffer const>{}));
 
375 +
    using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
 
376 +
        std::span<const_buffer const>{}));
307  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
377  
    using EofAwaitable = decltype(std::declval<S&>().write_eof());
308  

378  

309  
    static void
379  
    static void
310  
    do_destroy_impl(void* sink) noexcept
380  
    do_destroy_impl(void* sink) noexcept
311  
    {
381  
    {
312  
        static_cast<S*>(sink)->~S();
382  
        static_cast<S*>(sink)->~S();
313  
    }
383  
    }
314  

384  

315  
    static write_awaitable_ops const*
385  
    static write_awaitable_ops const*
 
386 +
    construct_write_some_awaitable_impl(
 
387 +
        void* sink,
 
388 +
        void* storage,
 
389 +
        std::span<const_buffer const> buffers)
 
390 +
    {
 
391 +
        auto& s = *static_cast<S*>(sink);
 
392 +
        ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
 
393 +

 
394 +
        static constexpr write_awaitable_ops ops = {
 
395 +
            +[](void* p) {
 
396 +
                return static_cast<WriteSomeAwaitable*>(p)->await_ready();
 
397 +
            },
 
398 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
399 +
                return detail::call_await_suspend(
 
400 +
                    static_cast<WriteSomeAwaitable*>(p), h, ex, token);
 
401 +
            },
 
402 +
            +[](void* p) {
 
403 +
                return static_cast<WriteSomeAwaitable*>(p)->await_resume();
 
404 +
            },
 
405 +
            +[](void* p) noexcept {
 
406 +
                static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
 
407 +
            }
 
408 +
        };
 
409 +
        return &ops;
 
410 +
    }
 
411 +

 
412 +
    static write_awaitable_ops const*
316  
    construct_write_awaitable_impl(
413  
    construct_write_awaitable_impl(
317  
        void* sink,
414  
        void* sink,
318  
        void* storage,
415  
        void* storage,
319 -
        std::span<const_buffer const> buffers,
416 +
        std::span<const_buffer const> buffers)
320 -
        bool eof)
 
321  
    {
417  
    {
322  
        auto& s = *static_cast<S*>(sink);
418  
        auto& s = *static_cast<S*>(sink);
323 -
        ::new(storage) WriteAwaitable(s.write(buffers, eof));
419 +
        ::new(storage) WriteAwaitable(s.write(buffers));
324  

420  

325  
        static constexpr write_awaitable_ops ops = {
421  
        static constexpr write_awaitable_ops ops = {
326  
            +[](void* p) {
422  
            +[](void* p) {
327  
                return static_cast<WriteAwaitable*>(p)->await_ready();
423  
                return static_cast<WriteAwaitable*>(p)->await_ready();
328  
            },
424  
            },
329  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
425  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
330  
                return detail::call_await_suspend(
426  
                return detail::call_await_suspend(
331  
                    static_cast<WriteAwaitable*>(p), h, ex, token);
427  
                    static_cast<WriteAwaitable*>(p), h, ex, token);
332  
            },
428  
            },
333  
            +[](void* p) {
429  
            +[](void* p) {
334  
                return static_cast<WriteAwaitable*>(p)->await_resume();
430  
                return static_cast<WriteAwaitable*>(p)->await_resume();
335  
            },
431  
            },
336  
            +[](void* p) noexcept {
432  
            +[](void* p) noexcept {
337  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
433  
                static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
338  
            }
434  
            }
339  
        };
435  
        };
340  
        return &ops;
436  
        return &ops;
341  
    }
437  
    }
342  

438  

 
439 +
    static write_awaitable_ops const*
 
440 +
    construct_write_eof_buffers_awaitable_impl(
 
441 +
        void* sink,
 
442 +
        void* storage,
 
443 +
        std::span<const_buffer const> buffers)
 
444 +
    {
 
445 +
        auto& s = *static_cast<S*>(sink);
 
446 +
        ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
 
447 +

 
448 +
        static constexpr write_awaitable_ops ops = {
 
449 +
            +[](void* p) {
 
450 +
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
 
451 +
            },
 
452 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
453 +
                return detail::call_await_suspend(
 
454 +
                    static_cast<WriteEofBuffersAwaitable*>(p), h, ex, token);
 
455 +
            },
 
456 +
            +[](void* p) {
 
457 +
                return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
 
458 +
            },
 
459 +
            +[](void* p) noexcept {
 
460 +
                static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
 
461 +
            }
 
462 +
        };
 
463 +
        return &ops;
 
464 +
    }
 
465 +

343  
    static eof_awaitable_ops const*
466  
    static eof_awaitable_ops const*
344  
    construct_eof_awaitable_impl(
467  
    construct_eof_awaitable_impl(
345  
        void* sink,
468  
        void* sink,
346  
        void* storage)
469  
        void* storage)
347  
    {
470  
    {
348  
        auto& s = *static_cast<S*>(sink);
471  
        auto& s = *static_cast<S*>(sink);
349  
        ::new(storage) EofAwaitable(s.write_eof());
472  
        ::new(storage) EofAwaitable(s.write_eof());
350  

473  

351  
        static constexpr eof_awaitable_ops ops = {
474  
        static constexpr eof_awaitable_ops ops = {
352  
            +[](void* p) {
475  
            +[](void* p) {
353  
                return static_cast<EofAwaitable*>(p)->await_ready();
476  
                return static_cast<EofAwaitable*>(p)->await_ready();
354  
            },
477  
            },
355  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
478  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
356  
                return detail::call_await_suspend(
479  
                return detail::call_await_suspend(
357  
                    static_cast<EofAwaitable*>(p), h, ex, token);
480  
                    static_cast<EofAwaitable*>(p), h, ex, token);
358  
            },
481  
            },
359  
            +[](void* p) {
482  
            +[](void* p) {
360  
                return static_cast<EofAwaitable*>(p)->await_resume();
483  
                return static_cast<EofAwaitable*>(p)->await_resume();
361  
            },
484  
            },
362  
            +[](void* p) noexcept {
485  
            +[](void* p) noexcept {
363  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
486  
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
364  
            }
487  
            }
365  
        };
488  
        };
366  
        return &ops;
489  
        return &ops;
367  
    }
490  
    }
368  

491  

 
492 +
    static constexpr std::size_t max4(
 
493 +
        std::size_t a, std::size_t b,
 
494 +
        std::size_t c, std::size_t d) noexcept
 
495 +
    {
 
496 +
        std::size_t ab = a > b ? a : b;
 
497 +
        std::size_t cd = c > d ? c : d;
 
498 +
        return ab > cd ? ab : cd;
 
499 +
    }
 
500 +

369  
    static constexpr std::size_t max_awaitable_size =
501  
    static constexpr std::size_t max_awaitable_size =
370 -
        sizeof(WriteAwaitable) > sizeof(EofAwaitable)
502 +
        max4(sizeof(WriteSomeAwaitable),
371 -
            ? sizeof(WriteAwaitable)
503 +
             sizeof(WriteAwaitable),
372 -
            : sizeof(EofAwaitable);
504 +
             sizeof(WriteEofBuffersAwaitable),
 
505 +
             sizeof(EofAwaitable));
373  

506  

374  
    static constexpr std::size_t max_awaitable_align =
507  
    static constexpr std::size_t max_awaitable_align =
375 -
        alignof(WriteAwaitable) > alignof(EofAwaitable)
508 +
        max4(alignof(WriteSomeAwaitable),
376 -
            ? alignof(WriteAwaitable)
509 +
             alignof(WriteAwaitable),
377 -
            : alignof(EofAwaitable);
510 +
             alignof(WriteEofBuffersAwaitable),
 
511 +
             alignof(EofAwaitable));
378  

512  

379  
    static constexpr vtable value = {
513  
    static constexpr vtable value = {
380 -
        &do_destroy_impl,
514 +
        &construct_write_some_awaitable_impl,
 
515 +
        &construct_write_awaitable_impl,
 
516 +
        &construct_write_eof_buffers_awaitable_impl,
 
517 +
        &construct_eof_awaitable_impl,
381  
        max_awaitable_size,
518  
        max_awaitable_size,
382  
        max_awaitable_align,
519  
        max_awaitable_align,
383 -
        &construct_write_awaitable_impl,
520 +
        &do_destroy_impl
384 -
        &construct_eof_awaitable_impl
 
385  
    };
521  
    };
386  
};
522  
};
387  

523  

388  
//----------------------------------------------------------
524  
//----------------------------------------------------------
389  

525  

390  
inline
526  
inline
391  
any_write_sink::~any_write_sink()
527  
any_write_sink::~any_write_sink()
392  
{
528  
{
393  
    if(storage_)
529  
    if(storage_)
394  
    {
530  
    {
395  
        vt_->destroy(sink_);
531  
        vt_->destroy(sink_);
396  
        ::operator delete(storage_);
532  
        ::operator delete(storage_);
397  
    }
533  
    }
398  
    if(cached_awaitable_)
534  
    if(cached_awaitable_)
 
535 +
    {
 
536 +
        if(active_write_ops_)
 
537 +
            active_write_ops_->destroy(cached_awaitable_);
 
538 +
        else if(active_eof_ops_)
 
539 +
            active_eof_ops_->destroy(cached_awaitable_);
399  
        ::operator delete(cached_awaitable_);
540  
        ::operator delete(cached_awaitable_);
 
541 +
    }
400  
}
542  
}
401  

543  

402  
// Move assignment.
//
// Releases whatever this wrapper currently holds, then takes over every
// member of `other`, leaving `other` empty (all pointers null).
// Self-assignment is a no-op.
inline any_write_sink&
any_write_sink::operator=(any_write_sink&& other) noexcept
{
    if(this == &other)
        return *this;

    // Release current state. A live awaitable was created from the sink
    // and may refer to it, so it goes first.
    if(cached_awaitable_)
    {
        if(active_write_ops_)
            active_write_ops_->destroy(cached_awaitable_);
        else if(active_eof_ops_)
            active_eof_ops_->destroy(cached_awaitable_);
        ::operator delete(cached_awaitable_);
    }
    if(storage_)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }

    // Steal other's state
    sink_ = std::exchange(other.sink_, nullptr);
    vt_ = std::exchange(other.vt_, nullptr);
    cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
    storage_ = std::exchange(other.storage_, nullptr);
    active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
    active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
    return *this;
}
423  

571  

424  
template<WriteSink S>
572  
template<WriteSink S>
425  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
573  
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
426  
any_write_sink::any_write_sink(S s)
574  
any_write_sink::any_write_sink(S s)
427  
    : vt_(&vtable_for_impl<S>::value)
575  
    : vt_(&vtable_for_impl<S>::value)
428  
{
576  
{
429  
    struct guard {
577  
    struct guard {
430  
        any_write_sink* self;
578  
        any_write_sink* self;
431  
        bool committed = false;
579  
        bool committed = false;
432  
        ~guard() {
580  
        ~guard() {
433  
            if(!committed && self->storage_) {
581  
            if(!committed && self->storage_) {
434  
                self->vt_->destroy(self->sink_);
582  
                self->vt_->destroy(self->sink_);
435  
                ::operator delete(self->storage_);
583  
                ::operator delete(self->storage_);
436  
                self->storage_ = nullptr;
584  
                self->storage_ = nullptr;
437  
                self->sink_ = nullptr;
585  
                self->sink_ = nullptr;
438  
            }
586  
            }
439  
        }
587  
        }
440  
    } g{this};
588  
    } g{this};
441  

589  

442  
    storage_ = ::operator new(sizeof(S));
590  
    storage_ = ::operator new(sizeof(S));
443  
    sink_ = ::new(storage_) S(std::move(s));
591  
    sink_ = ::new(storage_) S(std::move(s));
444  

592  

445  
    // Preallocate the awaitable storage (sized for max of write/eof)
593  
    // Preallocate the awaitable storage (sized for max of write/eof)
446  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
594  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
447  

595  

448  
    g.committed = true;
596  
    g.committed = true;
449  
}
597  
}
450  

598  

451  
template<WriteSink S>
599  
template<WriteSink S>
452  
any_write_sink::any_write_sink(S* s)
600  
any_write_sink::any_write_sink(S* s)
453  
    : sink_(s)
601  
    : sink_(s)
454  
    , vt_(&vtable_for_impl<S>::value)
602  
    , vt_(&vtable_for_impl<S>::value)
455  
{
603  
{
456  
    // Preallocate the awaitable storage (sized for max of write/eof)
604  
    // Preallocate the awaitable storage (sized for max of write/eof)
457  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
605  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
458  
}
606  
}
459  

607  

460  
//----------------------------------------------------------
608  
//----------------------------------------------------------
461  

609  

462  
inline auto
610  
inline auto
463  
any_write_sink::write_some_(
611  
any_write_sink::write_some_(
464 -
    std::span<const_buffer const> buffers,
612 +
    std::span<const_buffer const> buffers)
465 -
    bool eof)
613 +
{
 
614 +
    struct awaitable
 
615 +
    {
 
616 +
        any_write_sink* self_;
 
617 +
        std::span<const_buffer const> buffers_;
 
618 +

 
619 +
        bool
 
620 +
        await_ready() const noexcept
 
621 +
        {
 
622 +
            return false;
 
623 +
        }
 
624 +

 
625 +
        coro
 
626 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
627 +
        {
 
628 +
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
 
629 +
                self_->sink_,
 
630 +
                self_->cached_awaitable_,
 
631 +
                buffers_);
 
632 +

 
633 +
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
 
634 +
                return h;
 
635 +

 
636 +
            return self_->active_write_ops_->await_suspend(
 
637 +
                self_->cached_awaitable_, h, ex, token);
 
638 +
        }
 
639 +

 
640 +
        io_result<std::size_t>
 
641 +
        await_resume()
 
642 +
        {
 
643 +
            struct guard {
 
644 +
                any_write_sink* self;
 
645 +
                ~guard() {
 
646 +
                    self->active_write_ops_->destroy(self->cached_awaitable_);
 
647 +
                    self->active_write_ops_ = nullptr;
 
648 +
                }
 
649 +
            } g{self_};
 
650 +
            return self_->active_write_ops_->await_resume(
 
651 +
                self_->cached_awaitable_);
 
652 +
        }
 
653 +
    };
 
654 +
    return awaitable{this, buffers};
 
655 +
}
 
656 +

 
657 +
inline auto
 
658 +
any_write_sink::write_(
 
659 +
    std::span<const_buffer const> buffers)
466  
{
660  
{
467  
    struct awaitable
661  
    struct awaitable
468  
    {
662  
    {
469  
        any_write_sink* self_;
663  
        any_write_sink* self_;
470 -
        bool eof_;
 
471  
        std::span<const_buffer const> buffers_;
664  
        std::span<const_buffer const> buffers_;
472  

665  

473  
        bool
666  
        bool
474  
        await_ready() const noexcept
667  
        await_ready() const noexcept
475  
        {
668  
        {
476  
            return false;
669  
            return false;
477  
        }
670  
        }
478  

671  

479  
        coro
672  
        coro
480  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
673  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
481 -
            // Construct the underlying awaitable into cached storage
 
482  
        {
674  
        {
483  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
675  
            self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
484  
                self_->sink_,
676  
                self_->sink_,
485  
                self_->cached_awaitable_,
677  
                self_->cached_awaitable_,
486 -
                buffers_,
678 +
                buffers_);
487 -
                eof_);
 
488 -
            // Check if underlying is immediately ready
 
489  

679  

490  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
680  
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
491  
                return h;
681  
                return h;
492 -
            // Forward to underlying awaitable
 
493  

682  

494  
            return self_->active_write_ops_->await_suspend(
683  
            return self_->active_write_ops_->await_suspend(
495  
                self_->cached_awaitable_, h, ex, token);
684  
                self_->cached_awaitable_, h, ex, token);
496  
        }
685  
        }
497  

686  

498  
        io_result<std::size_t>
687  
        io_result<std::size_t>
499  
        await_resume()
688  
        await_resume()
500  
        {
689  
        {
501  
            struct guard {
690  
            struct guard {
502  
                any_write_sink* self;
691  
                any_write_sink* self;
503  
                ~guard() {
692  
                ~guard() {
504  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
693  
                    self->active_write_ops_->destroy(self->cached_awaitable_);
505  
                    self->active_write_ops_ = nullptr;
694  
                    self->active_write_ops_ = nullptr;
506  
                }
695  
                }
507  
            } g{self_};
696  
            } g{self_};
508  
            return self_->active_write_ops_->await_resume(
697  
            return self_->active_write_ops_->await_resume(
509  
                self_->cached_awaitable_);
698  
                self_->cached_awaitable_);
510  
        }
699  
        }
511  
    };
700  
    };
512 -
    return awaitable{this, buffers, eof};
701 +
    return awaitable{this, buffers};
513  
}
702  
}
514  

703  

515  
inline auto
704  
inline auto
516  
any_write_sink::write_eof()
705  
any_write_sink::write_eof()
517  
{
706  
{
518  
    struct awaitable
707  
    struct awaitable
519  
    {
708  
    {
520  
        any_write_sink* self_;
709  
        any_write_sink* self_;
521  

710  

522  
        bool
711  
        bool
523  
        await_ready() const noexcept
712  
        await_ready() const noexcept
524  
        {
713  
        {
525  
            return false;
714  
            return false;
526  
        }
715  
        }
527  

716  

528  
        coro
717  
        coro
529  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
718  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
530  
        {
719  
        {
531  
            // Construct the underlying awaitable into cached storage
720  
            // Construct the underlying awaitable into cached storage
532  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
721  
            self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
533  
                self_->sink_,
722  
                self_->sink_,
534  
                self_->cached_awaitable_);
723  
                self_->cached_awaitable_);
535  

724  

536  
            // Check if underlying is immediately ready
725  
            // Check if underlying is immediately ready
537  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
726  
            if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
538  
                return h;
727  
                return h;
539  

728  

540  
            // Forward to underlying awaitable
729  
            // Forward to underlying awaitable
541  
            return self_->active_eof_ops_->await_suspend(
730  
            return self_->active_eof_ops_->await_suspend(
542  
                self_->cached_awaitable_, h, ex, token);
731  
                self_->cached_awaitable_, h, ex, token);
543  
        }
732  
        }
544  

733  

545  
        io_result<>
734  
        io_result<>
546  
        await_resume()
735  
        await_resume()
547  
        {
736  
        {
548  
            struct guard {
737  
            struct guard {
549  
                any_write_sink* self;
738  
                any_write_sink* self;
550  
                ~guard() {
739  
                ~guard() {
551  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
740  
                    self->active_eof_ops_->destroy(self->cached_awaitable_);
552  
                    self->active_eof_ops_ = nullptr;
741  
                    self->active_eof_ops_ = nullptr;
553  
                }
742  
                }
554  
            } g{self_};
743  
            } g{self_};
555  
            return self_->active_eof_ops_->await_resume(
744  
            return self_->active_eof_ops_->await_resume(
556  
                self_->cached_awaitable_);
745  
                self_->cached_awaitable_);
557  
        }
746  
        }
558  
    };
747  
    };
559  
    return awaitable{this};
748  
    return awaitable{this};
560  
}
749  
}
561  

750  

 
751 +
inline auto
 
752 +
any_write_sink::write_eof_buffers_(
 
753 +
    std::span<const_buffer const> buffers)
 
754 +
{
 
755 +
    struct awaitable
 
756 +
    {
 
757 +
        any_write_sink* self_;
 
758 +
        std::span<const_buffer const> buffers_;
 
759 +

 
760 +
        bool
 
761 +
        await_ready() const noexcept
 
762 +
        {
 
763 +
            return false;
 
764 +
        }
 
765 +

 
766 +
        coro
 
767 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
768 +
        {
 
769 +
            self_->active_write_ops_ =
 
770 +
                self_->vt_->construct_write_eof_buffers_awaitable(
 
771 +
                    self_->sink_,
 
772 +
                    self_->cached_awaitable_,
 
773 +
                    buffers_);
 
774 +

 
775 +
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
 
776 +
                return h;
 
777 +

 
778 +
            return self_->active_write_ops_->await_suspend(
 
779 +
                self_->cached_awaitable_, h, ex, token);
 
780 +
        }
 
781 +

 
782 +
        io_result<std::size_t>
 
783 +
        await_resume()
 
784 +
        {
 
785 +
            struct guard {
 
786 +
                any_write_sink* self;
 
787 +
                ~guard() {
 
788 +
                    self->active_write_ops_->destroy(self->cached_awaitable_);
 
789 +
                    self->active_write_ops_ = nullptr;
 
790 +
                }
 
791 +
            } g{self_};
 
792 +
            return self_->active_write_ops_->await_resume(
 
793 +
                self_->cached_awaitable_);
 
794 +
        }
 
795 +
    };
 
796 +
    return awaitable{this, buffers};
 
797 +
}
 
798 +

562  
template<ConstBufferSequence CB>
799  
template<ConstBufferSequence CB>
563 -
task<io_result<std::size_t>>
800 +
auto
564 -
any_write_sink::write(CB buffers)
801 +
any_write_sink::write_some(CB buffers)
565  
{
802  
{
566 -
    return write(buffers, false);
803 +
    struct awaitable
 
804 +
    {
 
805 +
        any_write_sink* self_;
 
806 +
        const_buffer_array<detail::max_iovec_> ba_;
 
807 +

 
808 +
        awaitable(
 
809 +
            any_write_sink* self,
 
810 +
            CB const& buffers)
 
811 +
            : self_(self)
 
812 +
            , ba_(buffers)
 
813 +
        {
 
814 +
        }
 
815 +

 
816 +
        bool
 
817 +
        await_ready() const noexcept
 
818 +
        {
 
819 +
            return ba_.to_span().empty();
 
820 +
        }
 
821 +

 
822 +
        coro
 
823 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
824 +
        {
 
825 +
            self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
 
826 +
                self_->sink_,
 
827 +
                self_->cached_awaitable_,
 
828 +
                ba_.to_span());
 
829 +

 
830 +
            if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
 
831 +
                return h;
 
832 +

 
833 +
            return self_->active_write_ops_->await_suspend(
 
834 +
                self_->cached_awaitable_, h, ex, token);
 
835 +
        }
 
836 +

 
837 +
        io_result<std::size_t>
 
838 +
        await_resume()
 
839 +
        {
 
840 +
            if(ba_.to_span().empty())
 
841 +
                return {{}, 0};
 
842 +

 
843 +
            struct guard {
 
844 +
                any_write_sink* self;
 
845 +
                ~guard() {
 
846 +
                    self->active_write_ops_->destroy(self->cached_awaitable_);
 
847 +
                    self->active_write_ops_ = nullptr;
 
848 +
                }
 
849 +
            } g{self_};
 
850 +
            return self_->active_write_ops_->await_resume(
 
851 +
                self_->cached_awaitable_);
 
852 +
        }
 
853 +
    };
 
854 +
    return awaitable{this, buffers};
567  
}
855  
}
568  

856  

569  
template<ConstBufferSequence CB>
857  
template<ConstBufferSequence CB>
570 -
task<io_result<std::size_t>>
858 +
io_task<std::size_t>
571 -
any_write_sink::write(CB buffers, bool eof)
859 +
any_write_sink::write(CB buffers)
572  
{
860  
{
573  
    buffer_param<CB> bp(buffers);
861  
    buffer_param<CB> bp(buffers);
574  
    std::size_t total = 0;
862  
    std::size_t total = 0;
575  

863  

576  
    for(;;)
864  
    for(;;)
577  
    {
865  
    {
578  
        auto bufs = bp.data();
866  
        auto bufs = bp.data();
579  
        if(bufs.empty())
867  
        if(bufs.empty())
580  
            break;
868  
            break;
581  

869  

582 -
        auto [ec, n] = co_await write_some_(bufs, false);
870 +
        auto [ec, n] = co_await write_(bufs);
 
871 +
        total += n;
583  
        if(ec)
872  
        if(ec)
584 -
            co_return {ec, total + n};
873 +
            co_return {ec, total};
585 -
        total += n;
 
586  
        bp.consume(n);
874  
        bp.consume(n);
587  
    }
875  
    }
588  

876  

589 -
    if(eof)
877 +
    co_return {{}, total};
 
878 +
}
 
879 +

 
880 +
template<ConstBufferSequence CB>
 
881 +
io_task<std::size_t>
 
882 +
any_write_sink::write_eof(CB buffers)
 
883 +
{
 
884 +
    const_buffer_param<CB> bp(buffers);
 
885 +
    std::size_t total = 0;
 
886 +

 
887 +
    for(;;)
590  
    {
888  
    {
591 -
        auto [ec] = co_await write_eof();
889 +
        auto bufs = bp.data();
 
890 +
        if(bufs.empty())
 
891 +
        {
 
892 +
            auto [ec] = co_await write_eof();
 
893 +
            co_return {ec, total};
 
894 +
        }
 
895 +

 
896 +
        if(! bp.more())
 
897 +
        {
 
898 +
            // Last window — send atomically with EOF
 
899 +
            auto [ec, n] = co_await write_eof_buffers_(bufs);
 
900 +
            total += n;
 
901 +
            co_return {ec, total};
 
902 +
        }
 
903 +

 
904 +
        auto [ec, n] = co_await write_(bufs);
 
905 +
        total += n;
592  
        if(ec)
906  
        if(ec)
593  
            co_return {ec, total};
907  
            co_return {ec, total};
 
908 +
        bp.consume(n);
594 -

 
595 -
    co_return {{}, total};
 
596  
    }
909  
    }
597  
}
910  
}
598  

911  

599  
} // namespace capy
912  
} // namespace capy
600  
} // namespace boost
913  
} // namespace boost
601  

914  

602  
#endif
915  
#endif