1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11  
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/buffer_copy.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
18  
#include <boost/capy/concept/buffer_sink.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
19  
#include <boost/capy/concept/io_awaitable.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
20  
#include <boost/capy/concept/write_sink.hpp>
21  
#include <boost/capy/coro.hpp>
21  
#include <boost/capy/coro.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <boost/capy/io_result.hpp>
23  
#include <boost/capy/io_result.hpp>
24 -
#include <boost/capy/task.hpp>
24 +
#include <boost/capy/io_task.hpp>
25  

25  

26  
#include <concepts>
26  
#include <concepts>
27  
#include <coroutine>
27  
#include <coroutine>
28  
#include <cstddef>
28  
#include <cstddef>
29  
#include <exception>
29  
#include <exception>
30  
#include <new>
30  
#include <new>
 
31 +
#include <span>
31  
#include <stop_token>
32  
#include <stop_token>
32  
#include <system_error>
33  
#include <system_error>
33  
#include <utility>
34  
#include <utility>
34  

35  

35  
namespace boost {
36  
namespace boost {
36  
namespace capy {
37  
namespace capy {
37  

38  

38  
/** Type-erased wrapper for any BufferSink.
39  
/** Type-erased wrapper for any BufferSink.
39  

40  

40  
    This class provides type erasure for any type satisfying the
41  
    This class provides type erasure for any type satisfying the
41  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    @ref BufferSink concept, enabling runtime polymorphism for
42  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    buffer sink operations. It uses cached awaitable storage to achieve
43  
    zero steady-state allocation after construction.
44  
    zero steady-state allocation after construction.
44  

45  

45 -
    The wrapper also satisfies @ref WriteSink through templated
46 +
    The wrapper exposes two interfaces for producing data:
46 -
    @ref write methods. These methods copy data from the caller's
47 +
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 -
    buffers into the sink's internal storage, incurring one extra
48 +
    and the @ref WriteSink interface (`write_some`, `write`,
48 -
    buffer copy compared to using @ref prepare and @ref commit
49 +
    `write_eof`). Choose the interface that matches how your data
49 -
    directly.
50 +
    is produced:
 
51 +

 
52 +
    @par Choosing an Interface
 
53 +

 
54 +
    Use the **BufferSink** interface when you are a generator that
 
55 +
    produces data into externally-provided buffers. The sink owns
 
56 +
    the memory; you call @ref prepare to obtain writable buffers,
 
57 +
    fill them, then call @ref commit or @ref commit_eof.
 
58 +

 
59 +
    Use the **WriteSink** interface when you already have buffers
 
60 +
    containing the data to write:
 
61 +
    - If the entire body is available up front, call
 
62 +
      @ref write_eof(buffers) to send everything atomically.
 
63 +
    - If data arrives incrementally, call @ref write or
 
64 +
      @ref write_some in a loop, then @ref write_eof() when done.
 
65 +
      Prefer `write` (complete) unless your streaming pattern
 
66 +
      benefits from partial writes via `write_some`.
 
67 +

 
68 +
    If the wrapped type only satisfies @ref BufferSink, the
 
69 +
    @ref WriteSink operations are provided automatically.
 
70 +

 
71 +
    @par Construction Modes
50 -
    The wrapper supports two construction modes:
 
51  

72  

52  
    - **Owning**: Pass by value to transfer ownership. The wrapper
73  
    - **Owning**: Pass by value to transfer ownership. The wrapper
53  
      allocates storage and owns the sink.
74  
      allocates storage and owns the sink.
54  
    - **Reference**: Pass a pointer to wrap without ownership. The
75  
    - **Reference**: Pass a pointer to wrap without ownership. The
55  
      pointed-to sink must outlive this wrapper.
76  
      pointed-to sink must outlive this wrapper.
56  

77  

57  
    @par Awaitable Preallocation
78  
    @par Awaitable Preallocation
58  
    The constructor preallocates storage for the type-erased awaitable.
79  
    The constructor preallocates storage for the type-erased awaitable.
59  
    This reserves all virtual address space at server startup
80  
    This reserves all virtual address space at server startup
60  
    so memory usage can be measured up front, rather than
81  
    so memory usage can be measured up front, rather than
61  
    allocating piecemeal as traffic arrives.
82  
    allocating piecemeal as traffic arrives.
62  

83  

63  
    @par Thread Safety
84  
    @par Thread Safety
64  
    Not thread-safe. Concurrent operations on the same wrapper
85  
    Not thread-safe. Concurrent operations on the same wrapper
65  
    are undefined behavior.
86  
    are undefined behavior.
66  

87  

67  
    @par Example
88  
    @par Example
68  
    @code
89  
    @code
69  
    // Owning - takes ownership of the sink
90  
    // Owning - takes ownership of the sink
70  
    any_buffer_sink abs(some_buffer_sink{args...});
91  
    any_buffer_sink abs(some_buffer_sink{args...});
71  

92  

72  
    // Reference - wraps without ownership
93  
    // Reference - wraps without ownership
73  
    some_buffer_sink sink;
94  
    some_buffer_sink sink;
74  
    any_buffer_sink abs(&sink);
95  
    any_buffer_sink abs(&sink);
75  

96  

 
97 +
    // BufferSink interface: generate into callee-owned buffers
76  
    mutable_buffer arr[16];
98  
    mutable_buffer arr[16];
77  
    auto bufs = abs.prepare(arr);
99  
    auto bufs = abs.prepare(arr);
78  
    // Write data into bufs[0..bufs.size())
100  
    // Write data into bufs[0..bufs.size())
79  
    auto [ec] = co_await abs.commit(bytes_written);
101  
    auto [ec] = co_await abs.commit(bytes_written);
80 -
    auto [ec2] = co_await abs.commit_eof();
102 +
    auto [ec2] = co_await abs.commit_eof(0);
 
103 +

 
104 +
    // WriteSink interface: send caller-owned buffers
 
105 +
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
 
106 +
    auto [ec4] = co_await abs.write_eof();
 
107 +

 
108 +
    // Or send everything at once
 
109 +
    auto [ec5, n2] = co_await abs.write_eof(
 
110 +
        make_buffer(body_data));
81  
    @endcode
111  
    @endcode
82  

112  

83  
    @see any_buffer_source, BufferSink, WriteSink
113  
    @see any_buffer_source, BufferSink, WriteSink
84  
*/
114  
*/
85  
class any_buffer_sink
115  
class any_buffer_sink
86  
{
116  
{
87  
    struct vtable;
117  
    struct vtable;
88  
    struct awaitable_ops;
118  
    struct awaitable_ops;
 
119 +
    struct write_awaitable_ops;
89  

120  

90  
    template<BufferSink S>
121  
    template<BufferSink S>
91  
    struct vtable_for_impl;
122  
    struct vtable_for_impl;
92  

123  

 
124 +
    // hot-path members first for cache locality
93  
    void* sink_ = nullptr;
125  
    void* sink_ = nullptr;
94  
    vtable const* vt_ = nullptr;
126  
    vtable const* vt_ = nullptr;
95 -
    void* storage_ = nullptr;
 
96  
    void* cached_awaitable_ = nullptr;
127  
    void* cached_awaitable_ = nullptr;
97  
    awaitable_ops const* active_ops_ = nullptr;
128  
    awaitable_ops const* active_ops_ = nullptr;
 
129 +
    write_awaitable_ops const* active_write_ops_ = nullptr;
 
130 +
    void* storage_ = nullptr;
98  

131  

99  
public:
132  
public:
100  
    /** Destructor.
133  
    /** Destructor.
101  

134  

102  
        Destroys the owned sink (if any) and releases the cached
135  
        Destroys the owned sink (if any) and releases the cached
103  
        awaitable storage.
136  
        awaitable storage.
104  
    */
137  
    */
105  
    ~any_buffer_sink();
138  
    ~any_buffer_sink();
106  

139  

107  
    /** Default constructor.
140  
    /** Default constructor.
108  

141  

109  
        Constructs an empty wrapper. Operations on a default-constructed
142  
        Constructs an empty wrapper. Operations on a default-constructed
110  
        wrapper result in undefined behavior.
143  
        wrapper result in undefined behavior.
111  
    */
144  
    */
112  
    any_buffer_sink() = default;
145  
    any_buffer_sink() = default;
113  

146  

114  
    /** Non-copyable.
147  
    /** Non-copyable.
115  

148  

116  
        The awaitable cache is per-instance and cannot be shared.
149  
        The awaitable cache is per-instance and cannot be shared.
117  
    */
150  
    */
118  
    any_buffer_sink(any_buffer_sink const&) = delete;
151  
    any_buffer_sink(any_buffer_sink const&) = delete;
119  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152  
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;
120  

153  

121  
    /** Move constructor.
154  
    /** Move constructor.
122  

155  

123  
        Transfers ownership of the wrapped sink (if owned) and
156  
        Transfers ownership of the wrapped sink (if owned) and
124  
        cached awaitable storage from `other`. After the move, `other` is
157  
        cached awaitable storage from `other`. After the move, `other` is
125  
        in a default-constructed state.
158  
        in a default-constructed state.
126  

159  

127  
        @param other The wrapper to move from.
160  
        @param other The wrapper to move from.
128  
    */
161  
    */
129  
    any_buffer_sink(any_buffer_sink&& other) noexcept
162  
    any_buffer_sink(any_buffer_sink&& other) noexcept
130  
        : sink_(std::exchange(other.sink_, nullptr))
163  
        : sink_(std::exchange(other.sink_, nullptr))
131  
        , vt_(std::exchange(other.vt_, nullptr))
164  
        , vt_(std::exchange(other.vt_, nullptr))
132 -
        , storage_(std::exchange(other.storage_, nullptr))
 
133  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
134  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
166  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
 
167 +
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
 
168 +
        , storage_(std::exchange(other.storage_, nullptr))
135  
    {
169  
    {
136  
    }
170  
    }
137  

171  

138  
    /** Move assignment operator.
172  
    /** Move assignment operator.
139  

173  

140  
        Destroys any owned sink and releases existing resources,
174  
        Destroys any owned sink and releases existing resources,
141  
        then transfers ownership from `other`.
175  
        then transfers ownership from `other`.
142  

176  

143  
        @param other The wrapper to move from.
177  
        @param other The wrapper to move from.
144  
        @return Reference to this wrapper.
178  
        @return Reference to this wrapper.
145  
    */
179  
    */
146  
    any_buffer_sink&
180  
    any_buffer_sink&
147  
    operator=(any_buffer_sink&& other) noexcept;
181  
    operator=(any_buffer_sink&& other) noexcept;
148  

182  

149  
    /** Construct by taking ownership of a BufferSink.
183  
    /** Construct by taking ownership of a BufferSink.
150  

184  

151  
        Allocates storage and moves the sink into this wrapper.
185  
        Allocates storage and moves the sink into this wrapper.
152 -
        The wrapper owns the sink and will destroy it.
186 +
        The wrapper owns the sink and will destroy it. If `S` also
 
187 +
        satisfies @ref WriteSink, native write operations are
 
188 +
        forwarded through the virtual boundary.
153  

189  

154  
        @param s The sink to take ownership of.
190  
        @param s The sink to take ownership of.
155  
    */
191  
    */
156  
    template<BufferSink S>
192  
    template<BufferSink S>
157  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193  
        requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
158  
    any_buffer_sink(S s);
194  
    any_buffer_sink(S s);
159  

195  

160  
    /** Construct by wrapping a BufferSink without ownership.
196  
    /** Construct by wrapping a BufferSink without ownership.
161  

197  

162  
        Wraps the given sink by pointer. The sink must remain
198  
        Wraps the given sink by pointer. The sink must remain
163 -
        valid for the lifetime of this wrapper.
199 +
        valid for the lifetime of this wrapper. If `S` also
 
200 +
        satisfies @ref WriteSink, native write operations are
 
201 +
        forwarded through the virtual boundary.
164  

202  

165  
        @param s Pointer to the sink to wrap.
203  
        @param s Pointer to the sink to wrap.
166  
    */
204  
    */
167  
    template<BufferSink S>
205  
    template<BufferSink S>
168  
    any_buffer_sink(S* s);
206  
    any_buffer_sink(S* s);
169  

207  

170  
    /** Check if the wrapper contains a valid sink.
208  
    /** Check if the wrapper contains a valid sink.
171  

209  

172  
        @return `true` if wrapping a sink, `false` if default-constructed
210  
        @return `true` if wrapping a sink, `false` if default-constructed
173  
            or moved-from.
211  
            or moved-from.
174  
    */
212  
    */
175  
    bool
213  
    bool
176  
    has_value() const noexcept
214  
    has_value() const noexcept
177  
    {
215  
    {
178  
        return sink_ != nullptr;
216  
        return sink_ != nullptr;
179  
    }
217  
    }
180  

218  

181  
    /** Check if the wrapper contains a valid sink.
219  
    /** Check if the wrapper contains a valid sink.
182  

220  

183  
        @return `true` if wrapping a sink, `false` if default-constructed
221  
        @return `true` if wrapping a sink, `false` if default-constructed
184  
            or moved-from.
222  
            or moved-from.
185  
    */
223  
    */
186  
    explicit
224  
    explicit
187  
    operator bool() const noexcept
225  
    operator bool() const noexcept
188  
    {
226  
    {
189  
        return has_value();
227  
        return has_value();
190  
    }
228  
    }
191  

229  

192  
    /** Prepare writable buffers.
230  
    /** Prepare writable buffers.
193  

231  

194  
        Fills the provided span with mutable buffer descriptors
232  
        Fills the provided span with mutable buffer descriptors
195  
        pointing to the underlying sink's internal storage. This
233  
        pointing to the underlying sink's internal storage. This
196  
        operation is synchronous.
234  
        operation is synchronous.
197  

235  

198  
        @param dest Span of mutable_buffer to fill.
236  
        @param dest Span of mutable_buffer to fill.
199  

237  

200  
        @return A span of filled buffers.
238  
        @return A span of filled buffers.
201  

239  

202  
        @par Preconditions
240  
        @par Preconditions
203  
        The wrapper must contain a valid sink (`has_value() == true`).
241  
        The wrapper must contain a valid sink (`has_value() == true`).
204  
    */
242  
    */
205  
    std::span<mutable_buffer>
243  
    std::span<mutable_buffer>
206  
    prepare(std::span<mutable_buffer> dest);
244  
    prepare(std::span<mutable_buffer> dest);
207  

245  

208  
    /** Commit bytes written to the prepared buffers.
246  
    /** Commit bytes written to the prepared buffers.
209  

247  

210  
        Commits `n` bytes written to the buffers returned by the
248  
        Commits `n` bytes written to the buffers returned by the
211  
        most recent call to @ref prepare. The operation may trigger
249  
        most recent call to @ref prepare. The operation may trigger
212  
        underlying I/O.
250  
        underlying I/O.
213  

251  

214  
        @param n The number of bytes to commit.
252  
        @param n The number of bytes to commit.
215  

253  

216  
        @return An awaitable yielding `(error_code)`.
254  
        @return An awaitable yielding `(error_code)`.
217  

255  

218  
        @par Preconditions
256  
        @par Preconditions
219  
        The wrapper must contain a valid sink (`has_value() == true`).
257  
        The wrapper must contain a valid sink (`has_value() == true`).
220  
    */
258  
    */
221  
    auto
259  
    auto
222  
    commit(std::size_t n);
260  
    commit(std::size_t n);
223  

261  

224 -
    /** Commit bytes written with optional end-of-stream.
262 +
    /** Commit final bytes and signal end-of-stream.
225  

263  

226  
        Commits `n` bytes written to the buffers returned by the
264  
        Commits `n` bytes written to the buffers returned by the
227 -
        most recent call to @ref prepare. If `eof` is true, also
265 +
        most recent call to @ref prepare and finalizes the sink.
228 -
        signals end-of-stream.
266 +
        After success, no further operations are permitted.
229  

267  

230 -
        @param eof If true, signals end-of-stream after committing.
 
231  
        @param n The number of bytes to commit.
268  
        @param n The number of bytes to commit.
232  

269  

233  
        @return An awaitable yielding `(error_code)`.
270  
        @return An awaitable yielding `(error_code)`.
234  

271  

235  
        @par Preconditions
272  
        @par Preconditions
236  
        The wrapper must contain a valid sink (`has_value() == true`).
273  
        The wrapper must contain a valid sink (`has_value() == true`).
237  
    */
274  
    */
238  
    auto
275  
    auto
239 -
    commit(std::size_t n, bool eof);
276 +
    commit_eof(std::size_t n);
240  

277  

241 -
    /** Signal end-of-stream.
278 +
    /** Write some data from a buffer sequence.
242  

279  

243 -
        Indicates that no more data will be written to the sink.
280 +
        Writes one or more bytes from the buffer sequence to the
244 -
        The operation completes when the sink is finalized, or
281 +
        underlying sink. May consume less than the full sequence.
245 -
        an error occurs.
 
246  

282  

247 -
        @return An awaitable yielding `(error_code)`.
283 +
        When the wrapped type provides native @ref WriteSink support,
 
284 +
        the operation forwards directly. Otherwise it is synthesized
 
285 +
        from @ref prepare and @ref commit with a buffer copy.
 
286 +

 
287 +
        @param buffers The buffer sequence to write.
 
288 +

 
289 +
        @return An awaitable yielding `(error_code,std::size_t)`.
248  

290  

249  
        @par Preconditions
291  
        @par Preconditions
250  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
        The wrapper must contain a valid sink (`has_value() == true`).
251  
    */
293  
    */
252 -
    auto
294 +
    template<ConstBufferSequence CB>
253 -
    commit_eof();
295 +
    io_task<std::size_t>
 
296 +
    write_some(CB buffers);
254  

297  

255 -
    /** Write data from a buffer sequence.
298 +
    /** Write all data from a buffer sequence.
256  

299  

257  
        Writes all data from the buffer sequence to the underlying
300  
        Writes all data from the buffer sequence to the underlying
258  
        sink. This method satisfies the @ref WriteSink concept.
301  
        sink. This method satisfies the @ref WriteSink concept.
259  

302  

260 -
        @note This operation copies data from the caller's buffers
303 +
        When the wrapped type provides native @ref WriteSink support,
261 -
        into the sink's internal buffers. For zero-copy writes,
304 +
        each window is forwarded directly. Otherwise the data is
262 -
        use @ref prepare and @ref commit directly.
305 +
        copied into the sink via @ref prepare and @ref commit.
263  

306  

264  
        @param buffers The buffer sequence to write.
307  
        @param buffers The buffer sequence to write.
265  

308  

266  
        @return An awaitable yielding `(error_code,std::size_t)`.
309  
        @return An awaitable yielding `(error_code,std::size_t)`.
267  

310  

268  
        @par Preconditions
311  
        @par Preconditions
269  
        The wrapper must contain a valid sink (`has_value() == true`).
312  
        The wrapper must contain a valid sink (`has_value() == true`).
270  
    */
313  
    */
271  
    template<ConstBufferSequence CB>
314  
    template<ConstBufferSequence CB>
272 -
    task<io_result<std::size_t>>
315 +
    io_task<std::size_t>
273  
    write(CB buffers);
316  
    write(CB buffers);
274  

317  

275 -
    /** Write data with optional end-of-stream.
318 +
    /** Atomically write data and signal end-of-stream.
276  

319  

277  
        Writes all data from the buffer sequence to the underlying
320  
        Writes all data from the buffer sequence to the underlying
278 -
        sink, optionally finalizing it afterwards. This method
321 +
        sink and then signals end-of-stream.
279 -
        satisfies the @ref WriteSink concept.
 
280  

322  

281 -
        @note This operation copies data from the caller's buffers
323 +
        When the wrapped type provides native @ref WriteSink support,
282 -
        into the sink's internal buffers. For zero-copy writes,
324 +
        the final window is sent atomically via the underlying
283 -
        use @ref prepare and @ref commit directly.
325 +
        `write_eof(buffers)`. Otherwise the data is synthesized
 
326 +
        through @ref prepare, @ref commit, and @ref commit_eof.
284  

327  

285 -
        @param eof If true, finalize the sink after writing.
 
286  
        @param buffers The buffer sequence to write.
328  
        @param buffers The buffer sequence to write.
287  

329  

288  
        @return An awaitable yielding `(error_code,std::size_t)`.
330  
        @return An awaitable yielding `(error_code,std::size_t)`.
289  

331  

290  
        @par Preconditions
332  
        @par Preconditions
291  
        The wrapper must contain a valid sink (`has_value() == true`).
333  
        The wrapper must contain a valid sink (`has_value() == true`).
292  
    */
334  
    */
293  
    template<ConstBufferSequence CB>
335  
    template<ConstBufferSequence CB>
294 -
    task<io_result<std::size_t>>
336 +
    io_task<std::size_t>
295 -
    write(CB buffers, bool eof);
337 +
    write_eof(CB buffers);
296  

338  

297  
    /** Signal end-of-stream.
339  
    /** Signal end-of-stream.
298  

340  

299  
        Indicates that no more data will be written to the sink.
341  
        Indicates that no more data will be written to the sink.
300  
        This method satisfies the @ref WriteSink concept.
342  
        This method satisfies the @ref WriteSink concept.
301  

343  

 
344 +
        When the wrapped type provides native @ref WriteSink support,
 
345 +
        the underlying `write_eof()` is called. Otherwise the
 
346 +
        operation is implemented as `commit_eof(0)`.
 
347 +

302  
        @return An awaitable yielding `(error_code)`.
348  
        @return An awaitable yielding `(error_code)`.
303  

349  

304  
        @par Preconditions
350  
        @par Preconditions
305  
        The wrapper must contain a valid sink (`has_value() == true`).
351  
        The wrapper must contain a valid sink (`has_value() == true`).
306  
    */
352  
    */
307  
    auto
353  
    auto
308  
    write_eof();
354  
    write_eof();
309  

355  

310  
protected:
356  
protected:
311  
    /** Rebind to a new sink after move.
357  
    /** Rebind to a new sink after move.
312  

358  

313  
        Updates the internal pointer to reference a new sink object.
359  
        Updates the internal pointer to reference a new sink object.
314  
        Used by owning wrappers after move assignment when the owned
360  
        Used by owning wrappers after move assignment when the owned
315  
        object has moved to a new location.
361  
        object has moved to a new location.
316  

362  

317  
        @param new_sink The new sink to bind to. Must be the same
363  
        @param new_sink The new sink to bind to. Must be the same
318  
            type as the original sink.
364  
            type as the original sink.
319  

365  

320  
        @note Terminates if called with a sink of different type
366  
        @note Terminates if called with a sink of different type
321  
            than the original.
367  
            than the original.
322  
    */
368  
    */
323  
    template<BufferSink S>
369  
    template<BufferSink S>
324  
    void
370  
    void
325  
    rebind(S& new_sink) noexcept
371  
    rebind(S& new_sink) noexcept
326  
    {
372  
    {
327  
        if(vt_ != &vtable_for_impl<S>::value)
373  
        if(vt_ != &vtable_for_impl<S>::value)
328  
            std::terminate();
374  
            std::terminate();
329  
        sink_ = &new_sink;
375  
        sink_ = &new_sink;
330  
    }
376  
    }
 
377 +

 
378 +
private:
 
379 +
    /** Forward a partial write through the vtable.
 
380 +

 
381 +
        Constructs the underlying `write_some` awaitable in
 
382 +
        cached storage and returns a type-erased awaitable.
 
383 +
    */
 
384 +
    auto
 
385 +
    write_some_(std::span<const_buffer const> buffers);
 
386 +

 
387 +
    /** Forward a complete write through the vtable.
 
388 +

 
389 +
        Constructs the underlying `write` awaitable in
 
390 +
        cached storage and returns a type-erased awaitable.
 
391 +
    */
 
392 +
    auto
 
393 +
    write_(std::span<const_buffer const> buffers);
 
394 +

 
395 +
    /** Forward an atomic write-with-EOF through the vtable.
 
396 +

 
397 +
        Constructs the underlying `write_eof(buffers)` awaitable
 
398 +
        in cached storage and returns a type-erased awaitable.
 
399 +
    */
 
400 +
    auto
 
401 +
    write_eof_buffers_(std::span<const_buffer const> buffers);
331  
};
402  
};
332  

403  

333  
//----------------------------------------------------------
404  
//----------------------------------------------------------
334  

405  

 
406 +
/** Type-erased ops for awaitables yielding `io_result<>`. */
335  
struct any_buffer_sink::awaitable_ops
407  
struct any_buffer_sink::awaitable_ops
336  
{
408  
{
337  
    bool (*await_ready)(void*);
409  
    bool (*await_ready)(void*);
338  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
410  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
339  
    io_result<> (*await_resume)(void*);
411  
    io_result<> (*await_resume)(void*);
340  
    void (*destroy)(void*) noexcept;
412  
    void (*destroy)(void*) noexcept;
341  
};
413  
};
342  

414  

 
415 +
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
 
416 +
struct any_buffer_sink::write_awaitable_ops
 
417 +
{
 
418 +
    bool (*await_ready)(void*);
 
419 +
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
 
420 +
    io_result<std::size_t> (*await_resume)(void*);
 
421 +
    void (*destroy)(void*) noexcept;
 
422 +
};
 
423 +

343  
struct any_buffer_sink::vtable
424  
struct any_buffer_sink::vtable
344  
{
425  
{
345  
    void (*destroy)(void*) noexcept;
426  
    void (*destroy)(void*) noexcept;
346  
    std::span<mutable_buffer> (*do_prepare)(
427  
    std::span<mutable_buffer> (*do_prepare)(
347  
        void* sink,
428  
        void* sink,
348  
        std::span<mutable_buffer> dest);
429  
        std::span<mutable_buffer> dest);
349  
    std::size_t awaitable_size;
430  
    std::size_t awaitable_size;
350  
    std::size_t awaitable_align;
431  
    std::size_t awaitable_align;
351  
    awaitable_ops const* (*construct_commit_awaitable)(
432  
    awaitable_ops const* (*construct_commit_awaitable)(
352  
        void* sink,
433  
        void* sink,
353  
        void* storage,
434  
        void* storage,
354 -
        std::size_t n,
435 +
        std::size_t n);
355 -
        bool eof);
436 +
    awaitable_ops const* (*construct_commit_eof_awaitable)(
356 -
    awaitable_ops const* (*construct_eof_awaitable)(
437 +
        void* sink,
 
438 +
        void* storage,
 
439 +
        std::size_t n);
 
440 +

 
441 +
    // WriteSink forwarding (null when wrapped type is BufferSink-only)
 
442 +
    write_awaitable_ops const* (*construct_write_some_awaitable)(
 
443 +
        void* sink,
 
444 +
        void* storage,
 
445 +
        std::span<const_buffer const> buffers);
 
446 +
    write_awaitable_ops const* (*construct_write_awaitable)(
 
447 +
        void* sink,
 
448 +
        void* storage,
 
449 +
        std::span<const_buffer const> buffers);
 
450 +
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
 
451 +
        void* sink,
 
452 +
        void* storage,
 
453 +
        std::span<const_buffer const> buffers);
 
454 +
    awaitable_ops const* (*construct_write_eof_awaitable)(
357  
        void* sink,
455  
        void* sink,
358  
        void* storage);
456  
        void* storage);
359  
};
457  
};
360  

458  

361  
template<BufferSink S>
459  
template<BufferSink S>
362  
struct any_buffer_sink::vtable_for_impl
460  
struct any_buffer_sink::vtable_for_impl
363  
{
461  
{
364  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
462  
    using CommitAwaitable = decltype(std::declval<S&>().commit(
365 -
        std::size_t{}, false));
463 +
        std::size_t{}));
366 -
    using EofAwaitable = decltype(std::declval<S&>().commit_eof());
464 +
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
 
465 +
        std::size_t{}));
367  

466  

368  
    static void
467  
    static void
369  
    do_destroy_impl(void* sink) noexcept
468  
    do_destroy_impl(void* sink) noexcept
370  
    {
469  
    {
371  
        static_cast<S*>(sink)->~S();
470  
        static_cast<S*>(sink)->~S();
372  
    }
471  
    }
373  

472  

374  
    static std::span<mutable_buffer>
473  
    static std::span<mutable_buffer>
375  
    do_prepare_impl(
474  
    do_prepare_impl(
376  
        void* sink,
475  
        void* sink,
377  
        std::span<mutable_buffer> dest)
476  
        std::span<mutable_buffer> dest)
378  
    {
477  
    {
379  
        auto& s = *static_cast<S*>(sink);
478  
        auto& s = *static_cast<S*>(sink);
380  
        return s.prepare(dest);
479  
        return s.prepare(dest);
381  
    }
480  
    }
382  

481  

383  
    static awaitable_ops const*
482  
    static awaitable_ops const*
384  
    construct_commit_awaitable_impl(
483  
    construct_commit_awaitable_impl(
385  
        void* sink,
484  
        void* sink,
386  
        void* storage,
485  
        void* storage,
387 -
        std::size_t n,
486 +
        std::size_t n)
388 -
        bool eof)
 
389  
    {
487  
    {
390  
        auto& s = *static_cast<S*>(sink);
488  
        auto& s = *static_cast<S*>(sink);
391 -
        ::new(storage) CommitAwaitable(s.commit(n, eof));
489 +
        ::new(storage) CommitAwaitable(s.commit(n));
392  

490  

393  
        static constexpr awaitable_ops ops = {
491  
        static constexpr awaitable_ops ops = {
394  
            +[](void* p) {
492  
            +[](void* p) {
395  
                return static_cast<CommitAwaitable*>(p)->await_ready();
493  
                return static_cast<CommitAwaitable*>(p)->await_ready();
396  
            },
494  
            },
397  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
495  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
398  
                return detail::call_await_suspend(
496  
                return detail::call_await_suspend(
399  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
497  
                    static_cast<CommitAwaitable*>(p), h, ex, token);
400  
            },
498  
            },
401  
            +[](void* p) {
499  
            +[](void* p) {
402  
                return static_cast<CommitAwaitable*>(p)->await_resume();
500  
                return static_cast<CommitAwaitable*>(p)->await_resume();
403  
            },
501  
            },
404  
            +[](void* p) noexcept {
502  
            +[](void* p) noexcept {
405  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
503  
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
406  
            }
504  
            }
407  
        };
505  
        };
408  
        return &ops;
506  
        return &ops;
409  
    }
507  
    }
410  

508  

411  
    static awaitable_ops const*
509  
    static awaitable_ops const*
412 -
    construct_eof_awaitable_impl(
510 +
    construct_commit_eof_awaitable_impl(
 
511 +
        void* sink,
 
512 +
        void* storage,
 
513 +
        std::size_t n)
 
514 +
    {
 
515 +
        auto& s = *static_cast<S*>(sink);
 
516 +
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));
 
517 +

 
518 +
        static constexpr awaitable_ops ops = {
 
519 +
            +[](void* p) {
 
520 +
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
 
521 +
            },
 
522 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
523 +
                return detail::call_await_suspend(
 
524 +
                    static_cast<CommitEofAwaitable*>(p), h, ex, token);
 
525 +
            },
 
526 +
            +[](void* p) {
 
527 +
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
 
528 +
            },
 
529 +
            +[](void* p) noexcept {
 
530 +
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
 
531 +
            }
 
532 +
        };
 
533 +
        return &ops;
 
534 +
    }
 
535 +

 
536 +
    //------------------------------------------------------
 
537 +
    // WriteSink forwarding (only instantiated when WriteSink<S>)
 
538 +

 
539 +
    static write_awaitable_ops const*
 
540 +
    construct_write_some_awaitable_impl(
 
541 +
        void* sink,
 
542 +
        void* storage,
 
543 +
        std::span<const_buffer const> buffers)
 
544 +
        requires WriteSink<S>
 
545 +
    {
 
546 +
        using Aw = decltype(std::declval<S&>().write_some(
 
547 +
            std::span<const_buffer const>{}));
 
548 +
        auto& s = *static_cast<S*>(sink);
 
549 +
        ::new(storage) Aw(s.write_some(buffers));
 
550 +

 
551 +
        static constexpr write_awaitable_ops ops = {
 
552 +
            +[](void* p) {
 
553 +
                return static_cast<Aw*>(p)->await_ready();
 
554 +
            },
 
555 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
556 +
                return detail::call_await_suspend(
 
557 +
                    static_cast<Aw*>(p), h, ex, token);
 
558 +
            },
 
559 +
            +[](void* p) {
 
560 +
                return static_cast<Aw*>(p)->await_resume();
 
561 +
            },
 
562 +
            +[](void* p) noexcept {
 
563 +
                static_cast<Aw*>(p)->~Aw();
 
564 +
            }
 
565 +
        };
 
566 +
        return &ops;
 
567 +
    }
 
568 +

 
569 +
    static write_awaitable_ops const*
 
570 +
    construct_write_awaitable_impl(
 
571 +
        void* sink,
 
572 +
        void* storage,
 
573 +
        std::span<const_buffer const> buffers)
 
574 +
        requires WriteSink<S>
 
575 +
    {
 
576 +
        using Aw = decltype(std::declval<S&>().write(
 
577 +
            std::span<const_buffer const>{}));
 
578 +
        auto& s = *static_cast<S*>(sink);
 
579 +
        ::new(storage) Aw(s.write(buffers));
 
580 +

 
581 +
        static constexpr write_awaitable_ops ops = {
 
582 +
            +[](void* p) {
 
583 +
                return static_cast<Aw*>(p)->await_ready();
 
584 +
            },
 
585 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
586 +
                return detail::call_await_suspend(
 
587 +
                    static_cast<Aw*>(p), h, ex, token);
 
588 +
            },
 
589 +
            +[](void* p) {
 
590 +
                return static_cast<Aw*>(p)->await_resume();
 
591 +
            },
 
592 +
            +[](void* p) noexcept {
 
593 +
                static_cast<Aw*>(p)->~Aw();
 
594 +
            }
 
595 +
        };
 
596 +
        return &ops;
 
597 +
    }
 
598 +

 
599 +
    static write_awaitable_ops const*
 
600 +
    construct_write_eof_buffers_awaitable_impl(
 
601 +
        void* sink,
 
602 +
        void* storage,
 
603 +
        std::span<const_buffer const> buffers)
 
604 +
        requires WriteSink<S>
 
605 +
    {
 
606 +
        using Aw = decltype(std::declval<S&>().write_eof(
 
607 +
            std::span<const_buffer const>{}));
 
608 +
        auto& s = *static_cast<S*>(sink);
 
609 +
        ::new(storage) Aw(s.write_eof(buffers));
 
610 +

 
611 +
        static constexpr write_awaitable_ops ops = {
 
612 +
            +[](void* p) {
 
613 +
                return static_cast<Aw*>(p)->await_ready();
 
614 +
            },
 
615 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
616 +
                return detail::call_await_suspend(
 
617 +
                    static_cast<Aw*>(p), h, ex, token);
 
618 +
            },
 
619 +
            +[](void* p) {
 
620 +
                return static_cast<Aw*>(p)->await_resume();
 
621 +
            },
 
622 +
            +[](void* p) noexcept {
 
623 +
                static_cast<Aw*>(p)->~Aw();
 
624 +
            }
 
625 +
        };
 
626 +
        return &ops;
 
627 +
    }
 
628 +

 
629 +
    static awaitable_ops const*
 
630 +
    construct_write_eof_awaitable_impl(
413  
        void* sink,
631  
        void* sink,
414  
        void* storage)
632  
        void* storage)
 
633 +
        requires WriteSink<S>
415  
    {
634  
    {
 
635 +
        using Aw = decltype(std::declval<S&>().write_eof());
416  
        auto& s = *static_cast<S*>(sink);
636  
        auto& s = *static_cast<S*>(sink);
417 -
        ::new(storage) EofAwaitable(s.commit_eof());
637 +
        ::new(storage) Aw(s.write_eof());
418  

638  

419  
        static constexpr awaitable_ops ops = {
639  
        static constexpr awaitable_ops ops = {
420  
            +[](void* p) {
640  
            +[](void* p) {
421 -
                return static_cast<EofAwaitable*>(p)->await_ready();
641 +
                return static_cast<Aw*>(p)->await_ready();
422  
            },
642  
            },
423  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
643  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
424  
                return detail::call_await_suspend(
644  
                return detail::call_await_suspend(
425 -
                    static_cast<EofAwaitable*>(p), h, ex, token);
645 +
                    static_cast<Aw*>(p), h, ex, token);
426  
            },
646  
            },
427  
            +[](void* p) {
647  
            +[](void* p) {
428 -
                return static_cast<EofAwaitable*>(p)->await_resume();
648 +
                return static_cast<Aw*>(p)->await_resume();
429  
            },
649  
            },
430  
            +[](void* p) noexcept {
650  
            +[](void* p) noexcept {
431 -
                static_cast<EofAwaitable*>(p)->~EofAwaitable();
651 +
                static_cast<Aw*>(p)->~Aw();
432  
            }
652  
            }
433  
        };
653  
        };
434  
        return &ops;
654  
        return &ops;
435  
    }
655  
    }
436  

656  

437 -
    static constexpr std::size_t max_awaitable_size =
657 +
    //------------------------------------------------------
438 -
        sizeof(CommitAwaitable) > sizeof(EofAwaitable)
658 +

 
659 +
    static consteval std::size_t
 
660 +
    compute_max_size() noexcept
 
661 +
    {
 
662 +
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
439  
            ? sizeof(CommitAwaitable)
663  
            ? sizeof(CommitAwaitable)
440 -
            : sizeof(EofAwaitable);
664 +
            : sizeof(CommitEofAwaitable);
 
665 +
        if constexpr (WriteSink<S>)
 
666 +
        {
 
667 +
            using WS = decltype(std::declval<S&>().write_some(
 
668 +
                std::span<const_buffer const>{}));
 
669 +
            using W = decltype(std::declval<S&>().write(
 
670 +
                std::span<const_buffer const>{}));
 
671 +
            using WEB = decltype(std::declval<S&>().write_eof(
 
672 +
                std::span<const_buffer const>{}));
 
673 +
            using WE = decltype(std::declval<S&>().write_eof());
441  

674  

442 -
    static constexpr std::size_t max_awaitable_align =
675 +
            if(sizeof(WS) > s) s = sizeof(WS);
443 -
        alignof(CommitAwaitable) > alignof(EofAwaitable)
676 +
            if(sizeof(W) > s) s = sizeof(W);
 
677 +
            if(sizeof(WEB) > s) s = sizeof(WEB);
 
678 +
            if(sizeof(WE) > s) s = sizeof(WE);
 
679 +
        }
 
680 +
        return s;
 
681 +
    }
 
682 +

 
683 +
    static consteval std::size_t
 
684 +
    compute_max_align() noexcept
 
685 +
    {
 
686 +
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
444  
            ? alignof(CommitAwaitable)
687  
            ? alignof(CommitAwaitable)
445 -
            : alignof(EofAwaitable);
688 +
            : alignof(CommitEofAwaitable);
 
689 +
        if constexpr (WriteSink<S>)
 
690 +
        {
 
691 +
            using WS = decltype(std::declval<S&>().write_some(
 
692 +
                std::span<const_buffer const>{}));
 
693 +
            using W = decltype(std::declval<S&>().write(
 
694 +
                std::span<const_buffer const>{}));
 
695 +
            using WEB = decltype(std::declval<S&>().write_eof(
 
696 +
                std::span<const_buffer const>{}));
 
697 +
            using WE = decltype(std::declval<S&>().write_eof());
446  

698  

447 -
    static constexpr vtable value = {
699 +
            if(alignof(WS) > a) a = alignof(WS);
448 -
        &do_destroy_impl,
700 +
            if(alignof(W) > a) a = alignof(W);
449 -
        &do_prepare_impl,
701 +
            if(alignof(WEB) > a) a = alignof(WEB);
450 -
        max_awaitable_size,
702 +
            if(alignof(WE) > a) a = alignof(WE);
451 -
        max_awaitable_align,
703 +
        }
452 -
        &construct_commit_awaitable_impl,
704 +
        return a;
453 -
        &construct_eof_awaitable_impl
705 +
    }
454 -
    };
706 +

 
707 +
    static consteval vtable
 
708 +
    make_vtable() noexcept
 
709 +
    {
 
710 +
        vtable v{};
 
711 +
        v.destroy = &do_destroy_impl;
 
712 +
        v.do_prepare = &do_prepare_impl;
 
713 +
        v.awaitable_size = compute_max_size();
 
714 +
        v.awaitable_align = compute_max_align();
 
715 +
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
 
716 +
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
 
717 +
        v.construct_write_some_awaitable = nullptr;
 
718 +
        v.construct_write_awaitable = nullptr;
 
719 +
        v.construct_write_eof_buffers_awaitable = nullptr;
 
720 +
        v.construct_write_eof_awaitable = nullptr;
 
721 +

 
722 +
        if constexpr (WriteSink<S>)
 
723 +
        {
 
724 +
            v.construct_write_some_awaitable =
 
725 +
                &construct_write_some_awaitable_impl;
 
726 +
            v.construct_write_awaitable =
 
727 +
                &construct_write_awaitable_impl;
 
728 +
            v.construct_write_eof_buffers_awaitable =
 
729 +
                &construct_write_eof_buffers_awaitable_impl;
 
730 +
            v.construct_write_eof_awaitable =
 
731 +
                &construct_write_eof_awaitable_impl;
 
732 +
        }
 
733 +
        return v;
 
734 +
    }
 
735 +

 
736 +
    static constexpr vtable value = make_vtable();
455  
};
737  
};
456  

738  

457  
//----------------------------------------------------------
739  
//----------------------------------------------------------
458  

740  

459  
inline
741  
inline
460  
any_buffer_sink::~any_buffer_sink()
742  
any_buffer_sink::~any_buffer_sink()
461  
{
743  
{
462  
    if(storage_)
744  
    if(storage_)
463  
    {
745  
    {
464  
        vt_->destroy(sink_);
746  
        vt_->destroy(sink_);
465  
        ::operator delete(storage_);
747  
        ::operator delete(storage_);
466  
    }
748  
    }
467  
    if(cached_awaitable_)
749  
    if(cached_awaitable_)
468  
        ::operator delete(cached_awaitable_);
750  
        ::operator delete(cached_awaitable_);
469  
}
751  
}
470  

752  

471  
inline any_buffer_sink&
753  
inline any_buffer_sink&
472  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
754  
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
473  
{
755  
{
474  
    if(this != &other)
756  
    if(this != &other)
475  
    {
757  
    {
476  
        if(storage_)
758  
        if(storage_)
477  
        {
759  
        {
478  
            vt_->destroy(sink_);
760  
            vt_->destroy(sink_);
479  
            ::operator delete(storage_);
761  
            ::operator delete(storage_);
480  
        }
762  
        }
481  
        if(cached_awaitable_)
763  
        if(cached_awaitable_)
482  
            ::operator delete(cached_awaitable_);
764  
            ::operator delete(cached_awaitable_);
483  
        sink_ = std::exchange(other.sink_, nullptr);
765  
        sink_ = std::exchange(other.sink_, nullptr);
484  
        vt_ = std::exchange(other.vt_, nullptr);
766  
        vt_ = std::exchange(other.vt_, nullptr);
485  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
767  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
486  
        storage_ = std::exchange(other.storage_, nullptr);
768  
        storage_ = std::exchange(other.storage_, nullptr);
487  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
769  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
 
770 +
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
488  
    }
771  
    }
489  
    return *this;
772  
    return *this;
490  
}
773  
}
491  

774  

492  
template<BufferSink S>
775  
template<BufferSink S>
493  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
776  
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
494  
any_buffer_sink::any_buffer_sink(S s)
777  
any_buffer_sink::any_buffer_sink(S s)
495  
    : vt_(&vtable_for_impl<S>::value)
778  
    : vt_(&vtable_for_impl<S>::value)
496  
{
779  
{
497  
    struct guard {
780  
    struct guard {
498  
        any_buffer_sink* self;
781  
        any_buffer_sink* self;
499  
        bool committed = false;
782  
        bool committed = false;
500  
        ~guard() {
783  
        ~guard() {
501  
            if(!committed && self->storage_) {
784  
            if(!committed && self->storage_) {
502  
                self->vt_->destroy(self->sink_);
785  
                self->vt_->destroy(self->sink_);
503  
                ::operator delete(self->storage_);
786  
                ::operator delete(self->storage_);
504  
                self->storage_ = nullptr;
787  
                self->storage_ = nullptr;
505  
                self->sink_ = nullptr;
788  
                self->sink_ = nullptr;
506  
            }
789  
            }
507  
        }
790  
        }
508  
    } g{this};
791  
    } g{this};
509  

792  

510  
    storage_ = ::operator new(sizeof(S));
793  
    storage_ = ::operator new(sizeof(S));
511  
    sink_ = ::new(storage_) S(std::move(s));
794  
    sink_ = ::new(storage_) S(std::move(s));
512 -
    // Preallocate the awaitable storage (sized for max of commit/eof)
 
513  

795  

514  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
796  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
515  

797  

516  
    g.committed = true;
798  
    g.committed = true;
517  
}
799  
}
518  

800  

519  
template<BufferSink S>
801  
template<BufferSink S>
520  
any_buffer_sink::any_buffer_sink(S* s)
802  
any_buffer_sink::any_buffer_sink(S* s)
521  
    : sink_(s)
803  
    : sink_(s)
522  
    , vt_(&vtable_for_impl<S>::value)
804  
    , vt_(&vtable_for_impl<S>::value)
523 -
    // Preallocate the awaitable storage (sized for max of commit/eof)
 
524  
{
805  
{
525  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
806  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
526  
}
807  
}
527  

808  

528  
//----------------------------------------------------------
809  
//----------------------------------------------------------
529  

810  

530  
inline std::span<mutable_buffer>
811  
inline std::span<mutable_buffer>
531  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
812  
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
532  
{
813  
{
533  
    return vt_->do_prepare(sink_, dest);
814  
    return vt_->do_prepare(sink_, dest);
534  
}
815  
}
535  

816  

536  
inline auto
817  
inline auto
537 -
any_buffer_sink::commit(std::size_t n, bool eof)
818 +
any_buffer_sink::commit(std::size_t n)
538  
{
819  
{
539  
    struct awaitable
820  
    struct awaitable
540  
    {
821  
    {
541  
        any_buffer_sink* self_;
822  
        any_buffer_sink* self_;
542 -
        bool eof_;
 
543  
        std::size_t n_;
823  
        std::size_t n_;
544  

824  

545  
        bool
825  
        bool
546 -
        await_ready() const noexcept
826 +
        await_ready()
547  
        {
827  
        {
548 -
            return false;
828 +
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
 
829 +
                self_->sink_,
 
830 +
                self_->cached_awaitable_,
 
831 +
                n_);
 
832 +
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
549  
        }
833  
        }
550  

834  

551  
        coro
835  
        coro
552  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
836  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
553  
        {
837  
        {
554 -
            // Construct the underlying awaitable into cached storage
838 +
            return self_->active_ops_->await_suspend(
555 -
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
839 +
                self_->cached_awaitable_, h, ex, token);
 
840 +
        }
 
841 +

 
842 +
        io_result<>
 
843 +
        await_resume()
 
844 +
        {
 
845 +
            struct guard {
 
846 +
                any_buffer_sink* self;
 
847 +
                ~guard() {
 
848 +
                    self->active_ops_->destroy(self->cached_awaitable_);
 
849 +
                    self->active_ops_ = nullptr;
 
850 +
                }
 
851 +
            } g{self_};
 
852 +
            return self_->active_ops_->await_resume(
 
853 +
                self_->cached_awaitable_);
 
854 +
        }
 
855 +
    };
 
856 +
    return awaitable{this, n};
 
857 +
}
 
858 +

 
859 +
inline auto
 
860 +
any_buffer_sink::commit_eof(std::size_t n)
 
861 +
{
 
862 +
    struct awaitable
 
863 +
    {
 
864 +
        any_buffer_sink* self_;
 
865 +
        std::size_t n_;
 
866 +

 
867 +
        bool
 
868 +
        await_ready()
 
869 +
        {
 
870 +
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
556  
                self_->sink_,
871  
                self_->sink_,
557  
                self_->cached_awaitable_,
872  
                self_->cached_awaitable_,
558 -
                n_,
873 +
                n_);
559 -
                eof_);
874 +
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
560 -

875 +
        }
561 -
            // Check if underlying is immediately ready
 
562 -
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
 
563 -
                return h;
 
564  

876  

565 -
            // Forward to underlying awaitable
877 +
        coro
 
878 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
879 +
        {
566  
            return self_->active_ops_->await_suspend(
880  
            return self_->active_ops_->await_suspend(
567  
                self_->cached_awaitable_, h, ex, token);
881  
                self_->cached_awaitable_, h, ex, token);
568  
        }
882  
        }
569  

883  

570  
        io_result<>
884  
        io_result<>
571  
        await_resume()
885  
        await_resume()
572  
        {
886  
        {
573  
            struct guard {
887  
            struct guard {
574  
                any_buffer_sink* self;
888  
                any_buffer_sink* self;
575  
                ~guard() {
889  
                ~guard() {
576  
                    self->active_ops_->destroy(self->cached_awaitable_);
890  
                    self->active_ops_->destroy(self->cached_awaitable_);
577  
                    self->active_ops_ = nullptr;
891  
                    self->active_ops_ = nullptr;
578  
                }
892  
                }
579  
            } g{self_};
893  
            } g{self_};
580  
            return self_->active_ops_->await_resume(
894  
            return self_->active_ops_->await_resume(
581  
                self_->cached_awaitable_);
895  
                self_->cached_awaitable_);
582  
        }
896  
        }
583  
    };
897  
    };
584 -
    return awaitable{this, n, eof};
898 +
    return awaitable{this, n};
585  
}
899  
}
586  

900  

 
901 +
//----------------------------------------------------------
 
902 +
// Private helpers for native WriteSink forwarding
 
903 +

587  
inline auto
904  
inline auto
588 -
any_buffer_sink::commit(std::size_t n)
905 +
any_buffer_sink::write_some_(
 
906 +
    std::span<const_buffer const> buffers)
589  
{
907  
{
590 -
    return commit(n, false);
908 +
    struct awaitable
 
909 +
    {
 
910 +
        any_buffer_sink* self_;
 
911 +
        std::span<const_buffer const> buffers_;
 
912 +

 
913 +
        bool
 
914 +
        await_ready() const noexcept
 
915 +
        {
 
916 +
            return false;
 
917 +
        }
 
918 +

 
919 +
        coro
 
920 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
921 +
        {
 
922 +
            self_->active_write_ops_ =
 
923 +
                self_->vt_->construct_write_some_awaitable(
 
924 +
                    self_->sink_,
 
925 +
                    self_->cached_awaitable_,
 
926 +
                    buffers_);
 
927 +

 
928 +
            if(self_->active_write_ops_->await_ready(
 
929 +
                self_->cached_awaitable_))
 
930 +
                return h;
 
931 +

 
932 +
            return self_->active_write_ops_->await_suspend(
 
933 +
                self_->cached_awaitable_, h, ex, token);
 
934 +
        }
 
935 +

 
936 +
        io_result<std::size_t>
 
937 +
        await_resume()
 
938 +
        {
 
939 +
            struct guard {
 
940 +
                any_buffer_sink* self;
 
941 +
                ~guard() {
 
942 +
                    self->active_write_ops_->destroy(
 
943 +
                        self->cached_awaitable_);
 
944 +
                    self->active_write_ops_ = nullptr;
 
945 +
                }
 
946 +
            } g{self_};
 
947 +
            return self_->active_write_ops_->await_resume(
 
948 +
                self_->cached_awaitable_);
 
949 +
        }
 
950 +
    };
 
951 +
    return awaitable{this, buffers};
591  
}
952  
}
592  

953  

593  
inline auto
954  
inline auto
594 -
any_buffer_sink::commit_eof()
955 +
any_buffer_sink::write_(
 
956 +
    std::span<const_buffer const> buffers)
595  
{
957  
{
596  
    struct awaitable
958  
    struct awaitable
597  
    {
959  
    {
598  
        any_buffer_sink* self_;
960  
        any_buffer_sink* self_;
 
961 +
        std::span<const_buffer const> buffers_;
599  

962  

600  
        bool
963  
        bool
601  
        await_ready() const noexcept
964  
        await_ready() const noexcept
602  
        {
965  
        {
603  
            return false;
966  
            return false;
604  
        }
967  
        }
605  

968  

606  
        coro
969  
        coro
607  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
970  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
608  
        {
971  
        {
609 -
            // Construct the underlying awaitable into cached storage
972 +
            self_->active_write_ops_ =
610 -
            self_->active_ops_ = self_->vt_->construct_eof_awaitable(
973 +
                self_->vt_->construct_write_awaitable(
611 -
                self_->sink_,
974 +
                    self_->sink_,
 
975 +
                    self_->cached_awaitable_,
 
976 +
                    buffers_);
 
977 +

 
978 +
            if(self_->active_write_ops_->await_ready(
 
979 +
                self_->cached_awaitable_))
 
980 +
                return h;
 
981 +

 
982 +
            return self_->active_write_ops_->await_suspend(
 
983 +
                self_->cached_awaitable_, h, ex, token);
 
984 +
        }
 
985 +

 
986 +
        io_result<std::size_t>
 
987 +
        await_resume()
 
988 +
        {
 
989 +
            struct guard {
 
990 +
                any_buffer_sink* self;
 
991 +
                ~guard() {
 
992 +
                    self->active_write_ops_->destroy(
 
993 +
                        self->cached_awaitable_);
 
994 +
                    self->active_write_ops_ = nullptr;
 
995 +
                }
 
996 +
            } g{self_};
 
997 +
            return self_->active_write_ops_->await_resume(
612  
                self_->cached_awaitable_);
998  
                self_->cached_awaitable_);
 
999 +
        }
 
1000 +
    };
 
1001 +
    return awaitable{this, buffers};
 
1002 +
}
613  

1003  

614 -
            // Check if underlying is immediately ready
1004 +
inline auto
615 -
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
1005 +
any_buffer_sink::write_eof_buffers_(
 
1006 +
    std::span<const_buffer const> buffers)
 
1007 +
{
 
1008 +
    struct awaitable
 
1009 +
    {
 
1010 +
        any_buffer_sink* self_;
 
1011 +
        std::span<const_buffer const> buffers_;
 
1012 +

 
1013 +
        bool
 
1014 +
        await_ready() const noexcept
 
1015 +
        {
 
1016 +
            return false;
 
1017 +
        }
 
1018 +

 
1019 +
        coro
 
1020 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
1021 +
        {
 
1022 +
            self_->active_write_ops_ =
 
1023 +
                self_->vt_->construct_write_eof_buffers_awaitable(
 
1024 +
                    self_->sink_,
 
1025 +
                    self_->cached_awaitable_,
 
1026 +
                    buffers_);
 
1027 +

 
1028 +
            if(self_->active_write_ops_->await_ready(
 
1029 +
                self_->cached_awaitable_))
616  
                return h;
1030  
                return h;
617  

1031  

618 -
            // Forward to underlying awaitable
1032 +
            return self_->active_write_ops_->await_suspend(
619 -
            return self_->active_ops_->await_suspend(
 
620  
                self_->cached_awaitable_, h, ex, token);
1033  
                self_->cached_awaitable_, h, ex, token);
621  
        }
1034  
        }
622  

1035  

623 -
        io_result<>
1036 +
        io_result<std::size_t>
624  
        await_resume()
1037  
        await_resume()
625  
        {
1038  
        {
626  
            struct guard {
1039  
            struct guard {
627  
                any_buffer_sink* self;
1040  
                any_buffer_sink* self;
628  
                ~guard() {
1041  
                ~guard() {
629 -
                    self->active_ops_->destroy(self->cached_awaitable_);
1042 +
                    self->active_write_ops_->destroy(
630 -
                    self->active_ops_ = nullptr;
1043 +
                        self->cached_awaitable_);
 
1044 +
                    self->active_write_ops_ = nullptr;
631  
                }
1045  
                }
632  
            } g{self_};
1046  
            } g{self_};
633 -
            return self_->active_ops_->await_resume(
1047 +
            return self_->active_write_ops_->await_resume(
634  
                self_->cached_awaitable_);
1048  
                self_->cached_awaitable_);
635  
        }
1049  
        }
636  
    };
1050  
    };
637 -
    return awaitable{this};
1051 +
    return awaitable{this, buffers};
638  
}
1052  
}
639  

1053  

640  
//----------------------------------------------------------
1054  
//----------------------------------------------------------
 
1055 +
// Public WriteSink methods
641  

1056  

642  
template<ConstBufferSequence CB>
1057  
template<ConstBufferSequence CB>
643 -
task<io_result<std::size_t>>
1058 +
io_task<std::size_t>
644 -
any_buffer_sink::write(CB buffers)
1059 +
any_buffer_sink::write_some(CB buffers)
645  
{
1060  
{
646 -
    return write(buffers, false);
1061 +
    buffer_param<CB> bp(buffers);
 
1062 +
    auto src = bp.data();
 
1063 +
    if(src.empty())
 
1064 +
        co_return {{}, 0};
 
1065 +

 
1066 +
    // Native WriteSink path
 
1067 +
    if(vt_->construct_write_some_awaitable)
 
1068 +
        co_return co_await write_some_(src);
 
1069 +

 
1070 +
    // Synthesized path: prepare + buffer_copy + commit
 
1071 +
    mutable_buffer arr[detail::max_iovec_];
 
1072 +
    auto dst_bufs = prepare(arr);
 
1073 +
    if(dst_bufs.empty())
 
1074 +
    {
 
1075 +
        auto [ec] = co_await commit(0);
 
1076 +
        if(ec)
 
1077 +
            co_return {ec, 0};
 
1078 +
        dst_bufs = prepare(arr);
 
1079 +
        if(dst_bufs.empty())
 
1080 +
            co_return {{}, 0};
 
1081 +
    }
 
1082 +

 
1083 +
    auto n = buffer_copy(dst_bufs, src);
 
1084 +
    auto [ec] = co_await commit(n);
 
1085 +
    if(ec)
 
1086 +
        co_return {ec, 0};
 
1087 +
    co_return {{}, n};
647  
}
1088  
}
648  

1089  

649  
template<ConstBufferSequence CB>
1090  
template<ConstBufferSequence CB>
650 -
task<io_result<std::size_t>>
1091 +
io_task<std::size_t>
651 -
any_buffer_sink::write(CB buffers, bool eof)
1092 +
any_buffer_sink::write(CB buffers)
652  
{
1093  
{
653  
    buffer_param<CB> bp(buffers);
1094  
    buffer_param<CB> bp(buffers);
654  
    std::size_t total = 0;
1095  
    std::size_t total = 0;
655  

1096  

 
1097 +
    // Native WriteSink path
 
1098 +
    if(vt_->construct_write_awaitable)
 
1099 +
    {
 
1100 +
        for(;;)
 
1101 +
        {
 
1102 +
            auto bufs = bp.data();
 
1103 +
            if(bufs.empty())
 
1104 +
                break;
 
1105 +

 
1106 +
            auto [ec, n] = co_await write_(bufs);
 
1107 +
            total += n;
 
1108 +
            if(ec)
 
1109 +
                co_return {ec, total};
 
1110 +
            bp.consume(n);
 
1111 +
        }
 
1112 +
        co_return {{}, total};
 
1113 +
    }
 
1114 +

 
1115 +
    // Synthesized path: prepare + buffer_copy + commit
656  
    for(;;)
1116  
    for(;;)
657  
    {
1117  
    {
658  
        auto src = bp.data();
1118  
        auto src = bp.data();
659  
        if(src.empty())
1119  
        if(src.empty())
660  
            break;
1120  
            break;
661  

1121  

662  
        mutable_buffer arr[detail::max_iovec_];
1122  
        mutable_buffer arr[detail::max_iovec_];
663  
        auto dst_bufs = prepare(arr);
1123  
        auto dst_bufs = prepare(arr);
664  
        if(dst_bufs.empty())
1124  
        if(dst_bufs.empty())
665  
        {
1125  
        {
666  
            auto [ec] = co_await commit(0);
1126  
            auto [ec] = co_await commit(0);
667  
            if(ec)
1127  
            if(ec)
668  
                co_return {ec, total};
1128  
                co_return {ec, total};
669  
            continue;
1129  
            continue;
670  
        }
1130  
        }
671  

1131  

672  
        auto n = buffer_copy(dst_bufs, src);
1132  
        auto n = buffer_copy(dst_bufs, src);
673  
        auto [ec] = co_await commit(n);
1133  
        auto [ec] = co_await commit(n);
674  
        if(ec)
1134  
        if(ec)
675  
            co_return {ec, total};
1135  
            co_return {ec, total};
676  
        bp.consume(n);
1136  
        bp.consume(n);
677  
        total += n;
1137  
        total += n;
678  
    }
1138  
    }
679 -
    if(eof)
 
680 -
    {
 
681 -
        auto [ec] = co_await commit_eof();
 
682 -
        if(ec)
 
683 -
            co_return {ec, total};
 
684 -
    }
 
685 -

 
686  

1139  

687  
    co_return {{}, total};
1140  
    co_return {{}, total};
688  
}
1141  
}
689  

1142  

690  
inline auto
any_buffer_sink::write_eof()
{
    // Returns an awaitable that delivers end-of-stream to the
    // wrapped sink. Dispatch goes through the type-erasure
    // vtable (vt_): a sink that natively implements WriteSink
    // gets its own write_eof(); otherwise end-of-stream is
    // synthesized as a zero-byte commit_eof.
    struct awaitable
    {
        any_buffer_sink* self_;  // non-owning; must outlive the await

        bool
        await_ready()
        {
            // Lazily construct the underlying awaitable into the
            // cached_awaitable_ storage; active_ops_ tracks it so
            // await_resume() can destroy it when the op completes.
            if(self_->vt_->construct_write_eof_awaitable)
            {
                // Native WriteSink: forward to underlying write_eof()
                self_->active_ops_ =
                    self_->vt_->construct_write_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_);
            }
            else
            {
                // Synthesized: commit_eof(0)
                self_->active_ops_ =
                    self_->vt_->construct_commit_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_,
                        0);
            }
            return self_->active_ops_->await_ready(
                self_->cached_awaitable_);
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            // Forward suspension to the underlying awaitable.
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<>
        await_resume()
        {
            // Guard destroys the cached underlying awaitable and
            // clears active_ops_ even if await_resume() throws.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this};
}
 
1197 +

 
1198 +
template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write_eof(CB buffers)
{
    // Write all of `buffers`, then deliver end-of-stream.
    // Returns the error (if any) and total bytes written.

    // Native WriteSink path
    if(vt_->construct_write_eof_buffers_awaitable)
    {
        const_buffer_param<CB> bp(buffers);
        std::size_t total = 0;

        for(;;)
        {
            auto bufs = bp.data();
            if(bufs.empty())
            {
                // Nothing (left) to write: signal EOF by itself.
                auto [ec] = co_await write_eof();
                co_return {ec, total};
            }

            if(!bp.more())
            {
                // Last window: send atomically with EOF
                auto [ec, n] = co_await write_eof_buffers_(bufs);
                total += n;
                co_return {ec, total};
            }

            // More data follows this window: plain write, then
            // advance past the bytes actually transferred.
            auto [ec, n] = co_await write_(bufs);
            total += n;
            if(ec)
                co_return {ec, total};
            bp.consume(n);
        }
    }

    // Synthesized path: prepare + buffer_copy + commit + commit_eof
    buffer_param<CB> bp(buffers);
    std::size_t total = 0;

    for(;;)
    {
        auto src = bp.data();
        if(src.empty())
            break;

        mutable_buffer arr[detail::max_iovec_];
        auto dst_bufs = prepare(arr);
        if(dst_bufs.empty())
        {
            // No sink capacity; presumably a zero-byte commit
            // drains the sink before retrying — TODO confirm
            auto [ec] = co_await commit(0);
            if(ec)
                co_return {ec, total};
            continue;
        }

        auto n = buffer_copy(dst_bufs, src);
        auto [ec] = co_await commit(n);
        if(ec)
            co_return {ec, total};
        bp.consume(n);
        total += n;
    }

    // All data committed; finish with a zero-byte commit_eof.
    auto [ec] = co_await commit_eof(0);
    if(ec)
        co_return {ec, total};

    co_return {{}, total};
}
695  

1267  

696  
//----------------------------------------------------------
1268  
//----------------------------------------------------------
697  

1269  

 
1270 +
static_assert(BufferSink<any_buffer_sink>);
698  
static_assert(WriteSink<any_buffer_sink>);
1271  
static_assert(WriteSink<any_buffer_sink>);
699  

1272  

700  
} // namespace capy
1273  
} // namespace capy
701  
} // namespace boost
1274  
} // namespace boost
702  

1275  

703  
#endif
1276  
#endif