1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
 
16 +
#include <boost/capy/buffers/buffer_array.hpp>
16  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/buffers/buffer_param.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/concept/read_source.hpp>
19  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/io_result.hpp>
22  
#include <boost/capy/io_result.hpp>
22 -
#include <boost/capy/task.hpp>
23 +
#include <boost/capy/io_task.hpp>
23  

24  

24  
#include <concepts>
25  
#include <concepts>
25  
#include <coroutine>
26  
#include <coroutine>
26  
#include <cstddef>
27  
#include <cstddef>
27  
#include <new>
28  
#include <new>
28  
#include <span>
29  
#include <span>
29  
#include <stop_token>
30  
#include <stop_token>
30  
#include <system_error>
31  
#include <system_error>
31  
#include <utility>
32  
#include <utility>
32  

33  

33  
namespace boost {
34  
namespace boost {
34  
namespace capy {
35  
namespace capy {
35  

36  

36  
/** Type-erased wrapper for any ReadSource.
37  
/** Type-erased wrapper for any ReadSource.
37  

38  

38  
    This class provides type erasure for any type satisfying the
39  
    This class provides type erasure for any type satisfying the
39  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    @ref ReadSource concept, enabling runtime polymorphism for
40  
    source read operations. It uses cached awaitable storage to achieve
41  
    source read operations. It uses cached awaitable storage to achieve
41  
    zero steady-state allocation after construction.
42  
    zero steady-state allocation after construction.
42  

43  

43  
    The wrapper supports two construction modes:
44  
    The wrapper supports two construction modes:
44  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
    - **Owning**: Pass by value to transfer ownership. The wrapper
45  
      allocates storage and owns the source.
46  
      allocates storage and owns the source.
46  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
    - **Reference**: Pass a pointer to wrap without ownership. The
47  
      pointed-to source must outlive this wrapper.
48  
      pointed-to source must outlive this wrapper.
48  

49  

49  
    @par Awaitable Preallocation
50  
    @par Awaitable Preallocation
50  
    The constructor preallocates storage for the type-erased awaitable.
51  
    The constructor preallocates storage for the type-erased awaitable.
51  
    This reserves all virtual address space at server startup
52  
    This reserves all virtual address space at server startup
52  
    so memory usage can be measured up front, rather than
53  
    so memory usage can be measured up front, rather than
53  
    allocating piecemeal as traffic arrives.
54  
    allocating piecemeal as traffic arrives.
54  

55  

 
56 +
    @par Immediate Completion
 
57 +
    Operations complete immediately without suspending when the
 
58 +
    buffer sequence is empty, or when the underlying source's
 
59 +
    awaitable reports readiness via `await_ready`.
 
60 +

55  
    @par Thread Safety
61  
    @par Thread Safety
56  
    Not thread-safe. Concurrent operations on the same wrapper
62  
    Not thread-safe. Concurrent operations on the same wrapper
57  
    are undefined behavior.
63  
    are undefined behavior.
58  

64  

59  
    @par Example
65  
    @par Example
60  
    @code
66  
    @code
61  
    // Owning - takes ownership of the source
67  
    // Owning - takes ownership of the source
62  
    any_read_source rs(some_source{args...});
68  
    any_read_source rs(some_source{args...});
63  

69  

64  
    // Reference - wraps without ownership
70  
    // Reference - wraps without ownership
65  
    some_source source;
71  
    some_source source;
66  
    any_read_source rs(&source);
72  
    any_read_source rs(&source);
67  

73  

68  
    mutable_buffer buf(data, size);
74  
    mutable_buffer buf(data, size);
69  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
75  
    auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70  
    @endcode
76  
    @endcode
71  

77  

72  
    @see any_read_stream, ReadSource
78  
    @see any_read_stream, ReadSource
73  
*/
79  
*/
74  
class any_read_source
80  
class any_read_source
75  
{
81  
{
76  
    struct vtable;
82  
    struct vtable;
77  
    struct awaitable_ops;
83  
    struct awaitable_ops;
78  

84  

79  
    template<ReadSource S>
85  
    template<ReadSource S>
80  
    struct vtable_for_impl;
86  
    struct vtable_for_impl;
81  

87  

82  
    void* source_ = nullptr;
88  
    void* source_ = nullptr;
83  
    vtable const* vt_ = nullptr;
89  
    vtable const* vt_ = nullptr;
84  
    void* cached_awaitable_ = nullptr;
90  
    void* cached_awaitable_ = nullptr;
85  
    void* storage_ = nullptr;
91  
    void* storage_ = nullptr;
86  
    awaitable_ops const* active_ops_ = nullptr;
92  
    awaitable_ops const* active_ops_ = nullptr;
87  

93  

88  
public:
94  
public:
89  
    /** Destructor.
95  
    /** Destructor.
90  

96  

91  
        Destroys the owned source (if any) and releases the cached
97  
        Destroys the owned source (if any) and releases the cached
92  
        awaitable storage.
98  
        awaitable storage.
93  
    */
99  
    */
94  
    ~any_read_source();
100  
    ~any_read_source();
95  

101  

96  
    /** Default constructor.
102  
    /** Default constructor.
97  

103  

98  
        Constructs an empty wrapper. Operations on a default-constructed
104  
        Constructs an empty wrapper. Operations on a default-constructed
99  
        wrapper result in undefined behavior.
105  
        wrapper result in undefined behavior.
100  
    */
106  
    */
101  
    any_read_source() = default;
107  
    any_read_source() = default;
102  

108  

103  
    /** Non-copyable.
109  
    /** Non-copyable.
104  

110  

105  
        The awaitable cache is per-instance and cannot be shared.
111  
        The awaitable cache is per-instance and cannot be shared.
106  
    */
112  
    */
107  
    any_read_source(any_read_source const&) = delete;
113  
    any_read_source(any_read_source const&) = delete;
108  
    any_read_source& operator=(any_read_source const&) = delete;
114  
    any_read_source& operator=(any_read_source const&) = delete;
109  

115  

110  
    /** Move constructor.
116  
    /** Move constructor.
111  

117  

112  
        Transfers ownership of the wrapped source (if owned) and
118  
        Transfers ownership of the wrapped source (if owned) and
113  
        cached awaitable storage from `other`. After the move, `other` is
119  
        cached awaitable storage from `other`. After the move, `other` is
114  
        in a default-constructed state.
120  
        in a default-constructed state.
115  

121  

116  
        @param other The wrapper to move from.
122  
        @param other The wrapper to move from.
117  
    */
123  
    */
118  
    any_read_source(any_read_source&& other) noexcept
124  
    any_read_source(any_read_source&& other) noexcept
119  
        : source_(std::exchange(other.source_, nullptr))
125  
        : source_(std::exchange(other.source_, nullptr))
120  
        , vt_(std::exchange(other.vt_, nullptr))
126  
        , vt_(std::exchange(other.vt_, nullptr))
121  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122  
        , storage_(std::exchange(other.storage_, nullptr))
128  
        , storage_(std::exchange(other.storage_, nullptr))
123  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
129  
        , active_ops_(std::exchange(other.active_ops_, nullptr))
124  
    {
130  
    {
125  
    }
131  
    }
126  

132  

127  
    /** Move assignment operator.
133  
    /** Move assignment operator.
128  

134  

129  
        Destroys any owned source and releases existing resources,
135  
        Destroys any owned source and releases existing resources,
130  
        then transfers ownership from `other`.
136  
        then transfers ownership from `other`.
131  

137  

132  
        @param other The wrapper to move from.
138  
        @param other The wrapper to move from.
133  
        @return Reference to this wrapper.
139  
        @return Reference to this wrapper.
134  
    */
140  
    */
135  
    any_read_source&
141  
    any_read_source&
136  
    operator=(any_read_source&& other) noexcept;
142  
    operator=(any_read_source&& other) noexcept;
137  

143  

138  
    /** Construct by taking ownership of a ReadSource.
144  
    /** Construct by taking ownership of a ReadSource.
139  

145  

140  
        Allocates storage and moves the source into this wrapper.
146  
        Allocates storage and moves the source into this wrapper.
141  
        The wrapper owns the source and will destroy it.
147  
        The wrapper owns the source and will destroy it.
142  

148  

143  
        @param s The source to take ownership of.
149  
        @param s The source to take ownership of.
144  
    */
150  
    */
145  
    template<ReadSource S>
151  
    template<ReadSource S>
146  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
152  
        requires (!std::same_as<std::decay_t<S>, any_read_source>)
147  
    any_read_source(S s);
153  
    any_read_source(S s);
148  

154  

149  
    /** Construct by wrapping a ReadSource without ownership.
155  
    /** Construct by wrapping a ReadSource without ownership.
150  

156  

151  
        Wraps the given source by pointer. The source must remain
157  
        Wraps the given source by pointer. The source must remain
152  
        valid for the lifetime of this wrapper.
158  
        valid for the lifetime of this wrapper.
153  

159  

154  
        @param s Pointer to the source to wrap.
160  
        @param s Pointer to the source to wrap.
155  
    */
161  
    */
156  
    template<ReadSource S>
162  
    template<ReadSource S>
157  
    any_read_source(S* s);
163  
    any_read_source(S* s);
158  

164  

159  
    /** Check if the wrapper contains a valid source.
165  
    /** Check if the wrapper contains a valid source.
160  

166  

161  
        @return `true` if wrapping a source, `false` if default-constructed
167  
        @return `true` if wrapping a source, `false` if default-constructed
162  
            or moved-from.
168  
            or moved-from.
163  
    */
169  
    */
164  
    bool
170  
    bool
165  
    has_value() const noexcept
171  
    has_value() const noexcept
166  
    {
172  
    {
167  
        return source_ != nullptr;
173  
        return source_ != nullptr;
168  
    }
174  
    }
169  

175  

170  
    /** Check if the wrapper contains a valid source.
176  
    /** Check if the wrapper contains a valid source.
171  

177  

172  
        @return `true` if wrapping a source, `false` if default-constructed
178  
        @return `true` if wrapping a source, `false` if default-constructed
173  
            or moved-from.
179  
            or moved-from.
174  
    */
180  
    */
175  
    explicit
181  
    explicit
176  
    operator bool() const noexcept
182  
    operator bool() const noexcept
177  
    {
183  
    {
178  
        return has_value();
184  
        return has_value();
179  
    }
185  
    }
180  

186  

181 -
    /** Initiate an asynchronous read operation.
187 +
    /** Initiate a partial read operation.
182  

188  

183 -
        Reads data into the provided buffer sequence. The operation
189 +
        Reads one or more bytes into the provided buffer sequence.
184 -
        completes when the entire buffer sequence is filled, end-of-file
190 +
        May fill less than the full sequence.
185 -
        is reached, or an error occurs.
 
186  

191  

187 -
        @param buffers The buffer sequence to read into. Passed by
192 +
        @param buffers The buffer sequence to read into.
188 -
            value to ensure the sequence lives in the coroutine frame
 
189 -
            across suspension points.
 
190  

193  

191  
        @return An awaitable yielding `(error_code,std::size_t)`.
194  
        @return An awaitable yielding `(error_code,std::size_t)`.
192  

195  

 
196 +
        @par Immediate Completion
 
197 +
        The operation completes immediately without suspending
 
198 +
        the calling coroutine when:
 
199 +
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
 
200 +
        @li The underlying source's awaitable reports immediate
 
201 +
            readiness via `await_ready`.
 
202 +

 
203 +
        @note This is a partial operation and may not process the
 
204 +
        entire buffer sequence. Use @ref read for guaranteed
 
205 +
        complete transfer.
 
206 +

 
207 +
        @par Preconditions
 
208 +
        The wrapper must contain a valid source (`has_value() == true`).
 
209 +
        The caller must not call this function again after a prior
 
210 +
        call returned an error (including EOF).
 
211 +
    */
 
212 +
    template<MutableBufferSequence MB>
 
213 +
    auto
 
214 +
    read_some(MB buffers);
 
215 +

 
216 +
    /** Initiate a complete read operation.
 
217 +

 
218 +
        Reads data into the provided buffer sequence by forwarding
 
219 +
        to the underlying source's `read` operation. Large buffer
 
220 +
        sequences are processed in windows, with each window
 
221 +
        forwarded as a separate `read` call to the underlying source.
 
222 +
        The operation completes when the entire buffer sequence is
 
223 +
        filled, end-of-file is reached, or an error occurs.
 
224 +

 
225 +
        @param buffers The buffer sequence to read into.
 
226 +

 
227 +
        @return An awaitable yielding `(error_code,std::size_t)`.
 
228 +

 
229 +
        @par Immediate Completion
 
230 +
        The operation completes immediately without suspending
 
231 +
        the calling coroutine when:
 
232 +
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
 
233 +
        @li The underlying source's `read` awaitable reports
 
234 +
            immediate readiness via `await_ready`.
 
235 +

193  
        @par Postconditions
236  
        @par Postconditions
194  
        Exactly one of the following is true on return:
237  
        Exactly one of the following is true on return:
195  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
238  
        @li **Success**: `!ec` and `n == buffer_size(buffers)`.
196  
            The entire buffer was filled.
239  
            The entire buffer was filled.
197  
        @li **End-of-stream or Error**: `ec` and `n` indicates
240  
        @li **End-of-stream or Error**: `ec` and `n` indicates
198  
            the number of bytes transferred before the failure.
241  
            the number of bytes transferred before the failure.
199  

242  

200  
        @par Preconditions
243  
        @par Preconditions
201  
        The wrapper must contain a valid source (`has_value() == true`).
244  
        The wrapper must contain a valid source (`has_value() == true`).
 
245 +
        The caller must not call this function again after a prior
 
246 +
        call returned an error (including EOF).
202  
    */
247  
    */
203  
    template<MutableBufferSequence MB>
248  
    template<MutableBufferSequence MB>
204 -
    task<io_result<std::size_t>>
249 +
    io_task<std::size_t>
205  
    read(MB buffers);
250  
    read(MB buffers);
206  

251  

207  
protected:
252  
protected:
208  
    /** Rebind to a new source after move.
253  
    /** Rebind to a new source after move.
209  

254  

210  
        Updates the internal pointer to reference a new source object.
255  
        Updates the internal pointer to reference a new source object.
211  
        Used by owning wrappers after move assignment when the owned
256  
        Used by owning wrappers after move assignment when the owned
212  
        object has moved to a new location.
257  
        object has moved to a new location.
213  

258  

214  
        @param new_source The new source to bind to. Must be the same
259  
        @param new_source The new source to bind to. Must be the same
215  
            type as the original source.
260  
            type as the original source.
216  

261  

217  
        @note Terminates if called with a source of different type
262  
        @note Terminates if called with a source of different type
218  
            than the original.
263  
            than the original.
219  
    */
264  
    */
220  
    template<ReadSource S>
265  
    template<ReadSource S>
221  
    void
266  
    void
222  
    rebind(S& new_source) noexcept
267  
    rebind(S& new_source) noexcept
223  
    {
268  
    {
224  
        if(vt_ != &vtable_for_impl<S>::value)
269  
        if(vt_ != &vtable_for_impl<S>::value)
225  
            std::terminate();
270  
            std::terminate();
226  
        source_ = &new_source;
271  
        source_ = &new_source;
227  
    }
272  
    }
228  

273  

229  
private:
274  
private:
230  
    auto
275  
    auto
231 -
    read_some_(std::span<mutable_buffer const> buffers);
276 +
    read_(std::span<mutable_buffer const> buffers);
232  
};
277  
};
233  

278  

234  
//----------------------------------------------------------
279  
//----------------------------------------------------------
235  

280  

 
281 +
// ordered by call sequence for cache line coherence
236  
struct any_read_source::awaitable_ops
282  
struct any_read_source::awaitable_ops
237  
{
283  
{
238  
    bool (*await_ready)(void*);
284  
    bool (*await_ready)(void*);
239  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
285  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
240  
    io_result<std::size_t> (*await_resume)(void*);
286  
    io_result<std::size_t> (*await_resume)(void*);
241  
    void (*destroy)(void*) noexcept;
287  
    void (*destroy)(void*) noexcept;
242  
};
288  
};
243  

289  

 
290 +
// ordered by call frequency for cache line coherence
244  
struct any_read_source::vtable
291  
struct any_read_source::vtable
245  
{
292  
{
246 -
    void (*destroy)(void*) noexcept;
293 +
    awaitable_ops const* (*construct_read_some_awaitable)(
247 -
    std::size_t awaitable_size;
294 +
        void* source,
248 -
    std::size_t awaitable_align;
295 +
        void* storage,
249 -
    awaitable_ops const* (*construct_awaitable)(
296 +
        std::span<mutable_buffer const> buffers);
 
297 +
    awaitable_ops const* (*construct_read_awaitable)(
250  
        void* source,
298  
        void* source,
251  
        void* storage,
299  
        void* storage,
252  
        std::span<mutable_buffer const> buffers);
300  
        std::span<mutable_buffer const> buffers);
 
301 +
    std::size_t awaitable_size;
 
302 +
    std::size_t awaitable_align;
 
303 +
    void (*destroy)(void*) noexcept;
253  
};
304  
};
254  

305  

255  
template<ReadSource S>
306  
template<ReadSource S>
256  
struct any_read_source::vtable_for_impl
307  
struct any_read_source::vtable_for_impl
257  
{
308  
{
258 -
    using Awaitable = decltype(std::declval<S&>().read(
309 +
    using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
 
310 +
        std::span<mutable_buffer const>{}));
 
311 +
    using ReadAwaitable = decltype(std::declval<S&>().read(
259  
        std::span<mutable_buffer const>{}));
312  
        std::span<mutable_buffer const>{}));
260  

313  

261  
    static void
314  
    static void
262  
    do_destroy_impl(void* source) noexcept
315  
    do_destroy_impl(void* source) noexcept
263  
    {
316  
    {
264  
        static_cast<S*>(source)->~S();
317  
        static_cast<S*>(source)->~S();
265  
    }
318  
    }
266  

319  

267  
    static awaitable_ops const*
320  
    static awaitable_ops const*
268 -
    construct_awaitable_impl(
321 +
    construct_read_some_awaitable_impl(
269  
        void* source,
322  
        void* source,
270  
        void* storage,
323  
        void* storage,
271  
        std::span<mutable_buffer const> buffers)
324  
        std::span<mutable_buffer const> buffers)
272  
    {
325  
    {
273  
        auto& s = *static_cast<S*>(source);
326  
        auto& s = *static_cast<S*>(source);
274 -
        ::new(storage) Awaitable(s.read(buffers));
327 +
        ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
275  

328  

276  
        static constexpr awaitable_ops ops = {
329  
        static constexpr awaitable_ops ops = {
277  
            +[](void* p) {
330  
            +[](void* p) {
278 -
                return static_cast<Awaitable*>(p)->await_ready();
331 +
                return static_cast<ReadSomeAwaitable*>(p)->await_ready();
279  
            },
332  
            },
280  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
333  
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
281  
                return detail::call_await_suspend(
334  
                return detail::call_await_suspend(
282 -
                    static_cast<Awaitable*>(p), h, ex, token);
335 +
                    static_cast<ReadSomeAwaitable*>(p), h, ex, token);
283  
            },
336  
            },
284  
            +[](void* p) {
337  
            +[](void* p) {
285 -
                return static_cast<Awaitable*>(p)->await_resume();
338 +
                return static_cast<ReadSomeAwaitable*>(p)->await_resume();
286  
            },
339  
            },
287  
            +[](void* p) noexcept {
340  
            +[](void* p) noexcept {
288 -
                static_cast<Awaitable*>(p)->~Awaitable();
341 +
                static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
289  
            }
342  
            }
290  
        };
343  
        };
291  
        return &ops;
344  
        return &ops;
292  
    }
345  
    }
293  

346  

 
347 +
    static awaitable_ops const*
 
348 +
    construct_read_awaitable_impl(
 
349 +
        void* source,
 
350 +
        void* storage,
 
351 +
        std::span<mutable_buffer const> buffers)
 
352 +
    {
 
353 +
        auto& s = *static_cast<S*>(source);
 
354 +
        ::new(storage) ReadAwaitable(s.read(buffers));
 
355 +

 
356 +
        static constexpr awaitable_ops ops = {
 
357 +
            +[](void* p) {
 
358 +
                return static_cast<ReadAwaitable*>(p)->await_ready();
 
359 +
            },
 
360 +
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
361 +
                return detail::call_await_suspend(
 
362 +
                    static_cast<ReadAwaitable*>(p), h, ex, token);
 
363 +
            },
 
364 +
            +[](void* p) {
 
365 +
                return static_cast<ReadAwaitable*>(p)->await_resume();
 
366 +
            },
 
367 +
            +[](void* p) noexcept {
 
368 +
                static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
 
369 +
            }
 
370 +
        };
 
371 +
        return &ops;
 
372 +
    }
 
373 +

 
374 +
    static constexpr std::size_t max_awaitable_size =
 
375 +
        sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
 
376 +
            ? sizeof(ReadSomeAwaitable)
 
377 +
            : sizeof(ReadAwaitable);
 
378 +
    static constexpr std::size_t max_awaitable_align =
 
379 +
        alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
 
380 +
            ? alignof(ReadSomeAwaitable)
 
381 +
            : alignof(ReadAwaitable);
 
382 +

294  
    static constexpr vtable value = {
383  
    static constexpr vtable value = {
295 -
        &do_destroy_impl,
384 +
        &construct_read_some_awaitable_impl,
296 -
        sizeof(Awaitable),
385 +
        &construct_read_awaitable_impl,
297 -
        alignof(Awaitable),
386 +
        max_awaitable_size,
298 -
        &construct_awaitable_impl
387 +
        max_awaitable_align,
 
388 +
        &do_destroy_impl
299  
    };
389  
    };
300  
};
390  
};
301  

391  

302  
//----------------------------------------------------------
392  
//----------------------------------------------------------
303  

393  

304  
inline
394  
inline
305  
any_read_source::~any_read_source()
395  
any_read_source::~any_read_source()
306  
{
396  
{
307  
    if(storage_)
397  
    if(storage_)
308  
    {
398  
    {
309  
        vt_->destroy(source_);
399  
        vt_->destroy(source_);
310  
        ::operator delete(storage_);
400  
        ::operator delete(storage_);
311  
    }
401  
    }
312  
    if(cached_awaitable_)
402  
    if(cached_awaitable_)
 
403 +
    {
 
404 +
        if(active_ops_)
 
405 +
            active_ops_->destroy(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
406  
        ::operator delete(cached_awaitable_);
 
407 +
    }
314  
}
408  
}
315  

409  

316  
inline any_read_source&
410  
inline any_read_source&
317  
any_read_source::operator=(any_read_source&& other) noexcept
411  
any_read_source::operator=(any_read_source&& other) noexcept
318  
{
412  
{
319  
    if(this != &other)
413  
    if(this != &other)
320  
    {
414  
    {
321  
        if(storage_)
415  
        if(storage_)
322  
        {
416  
        {
323  
            vt_->destroy(source_);
417  
            vt_->destroy(source_);
324  
            ::operator delete(storage_);
418  
            ::operator delete(storage_);
325  
        }
419  
        }
326  
        if(cached_awaitable_)
420  
        if(cached_awaitable_)
 
421 +
        {
 
422 +
            if(active_ops_)
 
423 +
                active_ops_->destroy(cached_awaitable_);
327  
            ::operator delete(cached_awaitable_);
424  
            ::operator delete(cached_awaitable_);
 
425 +
        }
328  
        source_ = std::exchange(other.source_, nullptr);
426  
        source_ = std::exchange(other.source_, nullptr);
329  
        vt_ = std::exchange(other.vt_, nullptr);
427  
        vt_ = std::exchange(other.vt_, nullptr);
330  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
428  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
331  
        storage_ = std::exchange(other.storage_, nullptr);
429  
        storage_ = std::exchange(other.storage_, nullptr);
332  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
430  
        active_ops_ = std::exchange(other.active_ops_, nullptr);
333  
    }
431  
    }
334  
    return *this;
432  
    return *this;
335  
}
433  
}
336  

434  

337  
template<ReadSource S>
435  
template<ReadSource S>
338  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
436  
    requires (!std::same_as<std::decay_t<S>, any_read_source>)
339  
any_read_source::any_read_source(S s)
437  
any_read_source::any_read_source(S s)
340  
    : vt_(&vtable_for_impl<S>::value)
438  
    : vt_(&vtable_for_impl<S>::value)
341  
{
439  
{
342  
    struct guard {
440  
    struct guard {
343  
        any_read_source* self;
441  
        any_read_source* self;
344  
        bool committed = false;
442  
        bool committed = false;
345  
        ~guard() {
443  
        ~guard() {
346  
            if(!committed && self->storage_) {
444  
            if(!committed && self->storage_) {
347  
                self->vt_->destroy(self->source_);
445  
                self->vt_->destroy(self->source_);
348  
                ::operator delete(self->storage_);
446  
                ::operator delete(self->storage_);
349  
                self->storage_ = nullptr;
447  
                self->storage_ = nullptr;
350  
                self->source_ = nullptr;
448  
                self->source_ = nullptr;
351  
            }
449  
            }
352  
        }
450  
        }
353  
    } g{this};
451  
    } g{this};
354  

452  

355  
    storage_ = ::operator new(sizeof(S));
453  
    storage_ = ::operator new(sizeof(S));
356  
    source_ = ::new(storage_) S(std::move(s));
454  
    source_ = ::new(storage_) S(std::move(s));
357  

455  

358  
    // Preallocate the awaitable storage
456  
    // Preallocate the awaitable storage
359  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
457  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
360  

458  

361  
    g.committed = true;
459  
    g.committed = true;
362  
}
460  
}
363  

461  

364  
template<ReadSource S>
462  
template<ReadSource S>
365  
any_read_source::any_read_source(S* s)
463  
any_read_source::any_read_source(S* s)
366  
    : source_(s)
464  
    : source_(s)
367  
    , vt_(&vtable_for_impl<S>::value)
465  
    , vt_(&vtable_for_impl<S>::value)
368  
{
466  
{
369  
    // Preallocate the awaitable storage
467  
    // Preallocate the awaitable storage
370  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
468  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
371  
}
469  
}
372  

470  

373  
//----------------------------------------------------------
471  
//----------------------------------------------------------
374  

472  

 
473 +
template<MutableBufferSequence MB>
 
474 +
auto
 
475 +
any_read_source::read_some(MB buffers)
 
476 +
{
 
477 +
    struct awaitable
 
478 +
    {
 
479 +
        any_read_source* self_;
 
480 +
        mutable_buffer_array<detail::max_iovec_> ba_;
 
481 +

 
482 +
        awaitable(any_read_source* self, MB const& buffers)
 
483 +
            : self_(self)
 
484 +
            , ba_(buffers)
 
485 +
        {
 
486 +
        }
 
487 +

 
488 +
        bool
 
489 +
        await_ready() const noexcept
 
490 +
        {
 
491 +
            return ba_.to_span().empty();
 
492 +
        }
 
493 +

 
494 +
        coro
 
495 +
        await_suspend(coro h, executor_ref ex, std::stop_token token)
 
496 +
        {
 
497 +
            self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
 
498 +
                self_->source_,
 
499 +
                self_->cached_awaitable_,
 
500 +
                ba_.to_span());
 
501 +

 
502 +
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
 
503 +
                return h;
 
504 +

 
505 +
            return self_->active_ops_->await_suspend(
 
506 +
                self_->cached_awaitable_, h, ex, token);
 
507 +
        }
 
508 +

 
509 +
        io_result<std::size_t>
 
510 +
        await_resume()
 
511 +
        {
 
512 +
            if(ba_.to_span().empty())
 
513 +
                return {{}, 0};
 
514 +

 
515 +
            struct guard {
 
516 +
                any_read_source* self;
 
517 +
                ~guard() {
 
518 +
                    self->active_ops_->destroy(self->cached_awaitable_);
 
519 +
                    self->active_ops_ = nullptr;
 
520 +
                }
 
521 +
            } g{self_};
 
522 +
            return self_->active_ops_->await_resume(
 
523 +
                self_->cached_awaitable_);
 
524 +
        }
 
525 +
    };
 
526 +
    return awaitable(this, buffers);
 
527 +
}
 
528 +

375  
inline auto
529  
inline auto
376 -
any_read_source::read_some_(std::span<mutable_buffer const> buffers)
530 +
any_read_source::read_(std::span<mutable_buffer const> buffers)
377  
{
531  
{
378  
    struct awaitable
532  
    struct awaitable
379  
    {
533  
    {
380  
        any_read_source* self_;
534  
        any_read_source* self_;
381  
        std::span<mutable_buffer const> buffers_;
535  
        std::span<mutable_buffer const> buffers_;
382  

536  

383  
        bool
537  
        bool
384  
        await_ready() const noexcept
538  
        await_ready() const noexcept
385  
        {
539  
        {
386  
            return false;
540  
            return false;
387  
        }
541  
        }
388  

542  

389  
        coro
543  
        coro
390  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
544  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
391  
        {
545  
        {
392 -
            // Construct the underlying awaitable into cached storage
546 +
            self_->active_ops_ = self_->vt_->construct_read_awaitable(
393 -
            self_->active_ops_ = self_->vt_->construct_awaitable(
 
394  
                self_->source_,
547  
                self_->source_,
395  
                self_->cached_awaitable_,
548  
                self_->cached_awaitable_,
396  
                buffers_);
549  
                buffers_);
397 -
            // Check if underlying is immediately ready
 
398  

550  

399  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
551  
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
400  
                return h;
552  
                return h;
401 -
            // Forward to underlying awaitable
 
402  

553  

403  
            return self_->active_ops_->await_suspend(
554  
            return self_->active_ops_->await_suspend(
404  
                self_->cached_awaitable_, h, ex, token);
555  
                self_->cached_awaitable_, h, ex, token);
405  
        }
556  
        }
406  

557  

407  
        io_result<std::size_t>
558  
        io_result<std::size_t>
408  
        await_resume()
559  
        await_resume()
409  
        {
560  
        {
410  
            struct guard {
561  
            struct guard {
411  
                any_read_source* self;
562  
                any_read_source* self;
412  
                ~guard() {
563  
                ~guard() {
413  
                    self->active_ops_->destroy(self->cached_awaitable_);
564  
                    self->active_ops_->destroy(self->cached_awaitable_);
414  
                    self->active_ops_ = nullptr;
565  
                    self->active_ops_ = nullptr;
415  
                }
566  
                }
416  
            } g{self_};
567  
            } g{self_};
417  
            return self_->active_ops_->await_resume(
568  
            return self_->active_ops_->await_resume(
418  
                self_->cached_awaitable_);
569  
                self_->cached_awaitable_);
419  
        }
570  
        }
420  
    };
571  
    };
421  
    return awaitable{this, buffers};
572  
    return awaitable{this, buffers};
422  
}
573  
}
423  

574  

424  
template<MutableBufferSequence MB>
575  
template<MutableBufferSequence MB>
425 -
task<io_result<std::size_t>>
576 +
io_task<std::size_t>
426  
any_read_source::read(MB buffers)
577  
any_read_source::read(MB buffers)
427  
{
578  
{
428 -
    buffer_param<MB> bp(std::move(buffers));
579 +
    buffer_param bp(buffers);
429  
    std::size_t total = 0;
580  
    std::size_t total = 0;
430  

581  

431  
    for(;;)
582  
    for(;;)
432  
    {
583  
    {
433  
        auto bufs = bp.data();
584  
        auto bufs = bp.data();
434  
        if(bufs.empty())
585  
        if(bufs.empty())
435  
            break;
586  
            break;
436  

587  

437 -
        auto [ec, n] = co_await read_some_(bufs);
588 +
        auto [ec, n] = co_await read_(bufs);
438  
        total += n;
589  
        total += n;
439  
        if(ec)
590  
        if(ec)
440  
            co_return {ec, total};
591  
            co_return {ec, total};
441  
        bp.consume(n);
592  
        bp.consume(n);
442  
    }
593  
    }
443  

594  

444  
    co_return {{}, total};
595  
    co_return {{}, total};
445  
}
596  
}
446  

597  

447  
} // namespace capy
598  
} // namespace capy
448  
} // namespace boost
599  
} // namespace boost
449  

600  

450  
#endif
601  
#endif