1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
10  
#ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11  
#define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
14  
#include <boost/capy/detail/await_suspend_helper.hpp>
15  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers.hpp>
16 -
#include <boost/capy/buffers/buffer_param.hpp>
16 +
#include <boost/capy/buffers/buffer_array.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
18  
#include <boost/capy/concept/read_stream.hpp>
19  
#include <boost/capy/coro.hpp>
19  
#include <boost/capy/coro.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
20  
#include <boost/capy/ex/executor_ref.hpp>
21  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/io_result.hpp>
22  

22  

23  
#include <concepts>
23  
#include <concepts>
24  
#include <coroutine>
24  
#include <coroutine>
25  
#include <cstddef>
25  
#include <cstddef>
26  
#include <new>
26  
#include <new>
27  
#include <span>
27  
#include <span>
28  
#include <stop_token>
28  
#include <stop_token>
29  
#include <system_error>
29  
#include <system_error>
30  
#include <utility>
30  
#include <utility>
31  

31  

32  
namespace boost {
32  
namespace boost {
33  
namespace capy {
33  
namespace capy {
34  

34  

35  
/** Type-erased wrapper for any ReadStream.
35  
/** Type-erased wrapper for any ReadStream.
36  

36  

37  
    This class provides type erasure for any type satisfying the
37  
    This class provides type erasure for any type satisfying the
38  
    @ref ReadStream concept, enabling runtime polymorphism for
38  
    @ref ReadStream concept, enabling runtime polymorphism for
39  
    read operations. It uses cached awaitable storage to achieve
39  
    read operations. It uses cached awaitable storage to achieve
40  
    zero steady-state allocation after construction.
40  
    zero steady-state allocation after construction.
41  

41  

42  
    The wrapper supports two construction modes:
42  
    The wrapper supports two construction modes:
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
43  
    - **Owning**: Pass by value to transfer ownership. The wrapper
44  
      allocates storage and owns the stream.
44  
      allocates storage and owns the stream.
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
45  
    - **Reference**: Pass a pointer to wrap without ownership. The
46  
      pointed-to stream must outlive this wrapper.
46  
      pointed-to stream must outlive this wrapper.
47  

47  

48  
    @par Awaitable Preallocation
48  
    @par Awaitable Preallocation
49  
    The constructor preallocates storage for the type-erased awaitable.
49  
    The constructor preallocates storage for the type-erased awaitable.
50  
    This reserves all virtual address space at server startup
50  
    This reserves all virtual address space at server startup
51  
    so memory usage can be measured up front, rather than
51  
    so memory usage can be measured up front, rather than
52  
    allocating piecemeal as traffic arrives.
52  
    allocating piecemeal as traffic arrives.
53  

53  

 
54 +
    @par Immediate Completion
 
55 +
    When the underlying stream's awaitable reports ready immediately
 
56 +
    (e.g. buffered data already available), the wrapper skips
 
57 +
    coroutine suspension entirely and returns the result inline.
 
58 +

54  
    @par Thread Safety
59  
    @par Thread Safety
55  
    Not thread-safe. Concurrent operations on the same wrapper
60  
    Not thread-safe. Concurrent operations on the same wrapper
56  
    are undefined behavior.
61  
    are undefined behavior.
57  

62  

58  
    @par Example
63  
    @par Example
59  
    @code
64  
    @code
60  
    // Owning - takes ownership of the stream
65  
    // Owning - takes ownership of the stream
61  
    any_read_stream stream(socket{ioc});
66  
    any_read_stream stream(socket{ioc});
62  

67  

63  
    // Reference - wraps without ownership
68  
    // Reference - wraps without ownership
64  
    socket sock(ioc);
69  
    socket sock(ioc);
65  
    any_read_stream stream(&sock);
70  
    any_read_stream stream(&sock);
66  

71  

67  
    mutable_buffer buf(data, size);
72  
    mutable_buffer buf(data, size);
68 -
    auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
73 +
    auto [ec, n] = co_await stream.read_some(buf);
69  
    @endcode
74  
    @endcode
70  

75  

71  
    @see any_write_stream, any_stream, ReadStream
76  
    @see any_write_stream, any_stream, ReadStream
72  
*/
77  
*/
73  
class any_read_stream
78  
class any_read_stream
74  
{
79  
{
75 -
    struct awaitable_ops;
 
76  
    struct vtable;
80  
    struct vtable;
77  

81  

78  
    template<ReadStream S>
82  
    template<ReadStream S>
79  
    struct vtable_for_impl;
83  
    struct vtable_for_impl;
80  

84  

 
85 +
    // ordered for cache line coherence
81  
    void* stream_ = nullptr;
86  
    void* stream_ = nullptr;
82  
    vtable const* vt_ = nullptr;
87  
    vtable const* vt_ = nullptr;
83  
    void* cached_awaitable_ = nullptr;
88  
    void* cached_awaitable_ = nullptr;
84  
    void* storage_ = nullptr;
89  
    void* storage_ = nullptr;
85 -
    awaitable_ops const* active_ops_ = nullptr;
90 +
    bool awaitable_active_ = false;
86  

91  

87  
public:
92  
public:
88  
    /** Destructor.
93  
    /** Destructor.
89  

94  

90  
        Destroys the owned stream (if any) and releases the cached
95  
        Destroys the owned stream (if any) and releases the cached
91  
        awaitable storage.
96  
        awaitable storage.
92  
    */
97  
    */
93  
    ~any_read_stream();
98  
    ~any_read_stream();
94  

99  

95  
    /** Default constructor.
100  
    /** Default constructor.
96  

101  

97  
        Constructs an empty wrapper. Operations on a default-constructed
102  
        Constructs an empty wrapper. Operations on a default-constructed
98  
        wrapper result in undefined behavior.
103  
        wrapper result in undefined behavior.
99  
    */
104  
    */
100  
    any_read_stream() = default;
105  
    any_read_stream() = default;
101  

106  

102  
    /** Non-copyable.
107  
    /** Non-copyable.
103  

108  

104  
        The awaitable cache is per-instance and cannot be shared.
109  
        The awaitable cache is per-instance and cannot be shared.
105  
    */
110  
    */
106  
    any_read_stream(any_read_stream const&) = delete;
111  
    any_read_stream(any_read_stream const&) = delete;
107  
    any_read_stream& operator=(any_read_stream const&) = delete;
112  
    any_read_stream& operator=(any_read_stream const&) = delete;
108  

113  

109  
    /** Move constructor.
114  
    /** Move constructor.
110  

115  

111  
        Transfers ownership of the wrapped stream (if owned) and
116  
        Transfers ownership of the wrapped stream (if owned) and
112  
        cached awaitable storage from `other`. After the move, `other` is
117  
        cached awaitable storage from `other`. After the move, `other` is
113  
        in a default-constructed state.
118  
        in a default-constructed state.
114  

119  

115  
        @param other The wrapper to move from.
120  
        @param other The wrapper to move from.
116  
    */
121  
    */
117  
    any_read_stream(any_read_stream&& other) noexcept
122  
    any_read_stream(any_read_stream&& other) noexcept
118  
        : stream_(std::exchange(other.stream_, nullptr))
123  
        : stream_(std::exchange(other.stream_, nullptr))
119  
        , vt_(std::exchange(other.vt_, nullptr))
124  
        , vt_(std::exchange(other.vt_, nullptr))
120  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
125  
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121  
        , storage_(std::exchange(other.storage_, nullptr))
126  
        , storage_(std::exchange(other.storage_, nullptr))
122 -
        , active_ops_(std::exchange(other.active_ops_, nullptr))
127 +
        , awaitable_active_(std::exchange(other.awaitable_active_, false))
123  
    {
128  
    {
124  
    }
129  
    }
125  

130  

126  
    /** Move assignment operator.
131  
    /** Move assignment operator.
127  

132  

128  
        Destroys any owned stream and releases existing resources,
133  
        Destroys any owned stream and releases existing resources,
129  
        then transfers ownership from `other`.
134  
        then transfers ownership from `other`.
130  

135  

131  
        @param other The wrapper to move from.
136  
        @param other The wrapper to move from.
132  
        @return Reference to this wrapper.
137  
        @return Reference to this wrapper.
133  
    */
138  
    */
134  
    any_read_stream&
139  
    any_read_stream&
135  
    operator=(any_read_stream&& other) noexcept;
140  
    operator=(any_read_stream&& other) noexcept;
136  

141  

137  
    /** Construct by taking ownership of a ReadStream.
142  
    /** Construct by taking ownership of a ReadStream.
138  

143  

139  
        Allocates storage and moves the stream into this wrapper.
144  
        Allocates storage and moves the stream into this wrapper.
140  
        The wrapper owns the stream and will destroy it.
145  
        The wrapper owns the stream and will destroy it.
141  

146  

142  
        @param s The stream to take ownership of.
147  
        @param s The stream to take ownership of.
143  
    */
148  
    */
144  
    template<ReadStream S>
149  
    template<ReadStream S>
145  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
150  
        requires (!std::same_as<std::decay_t<S>, any_read_stream>)
146  
    any_read_stream(S s);
151  
    any_read_stream(S s);
147  

152  

148  
    /** Construct by wrapping a ReadStream without ownership.
153  
    /** Construct by wrapping a ReadStream without ownership.
149  

154  

150  
        Wraps the given stream by pointer. The stream must remain
155  
        Wraps the given stream by pointer. The stream must remain
151  
        valid for the lifetime of this wrapper.
156  
        valid for the lifetime of this wrapper.
152  

157  

153  
        @param s Pointer to the stream to wrap.
158  
        @param s Pointer to the stream to wrap.
154  
    */
159  
    */
155  
    template<ReadStream S>
160  
    template<ReadStream S>
156  
    any_read_stream(S* s);
161  
    any_read_stream(S* s);
157  

162  

158  
    /** Check if the wrapper contains a valid stream.
163  
    /** Check if the wrapper contains a valid stream.
159  

164  

160  
        @return `true` if wrapping a stream, `false` if default-constructed
165  
        @return `true` if wrapping a stream, `false` if default-constructed
161  
            or moved-from.
166  
            or moved-from.
162  
    */
167  
    */
163  
    bool
168  
    bool
164  
    has_value() const noexcept
169  
    has_value() const noexcept
165  
    {
170  
    {
166  
        return stream_ != nullptr;
171  
        return stream_ != nullptr;
167  
    }
172  
    }
168  

173  

169  
    /** Check if the wrapper contains a valid stream.
174  
    /** Check if the wrapper contains a valid stream.
170  

175  

171  
        @return `true` if wrapping a stream, `false` if default-constructed
176  
        @return `true` if wrapping a stream, `false` if default-constructed
172  
            or moved-from.
177  
            or moved-from.
173  
    */
178  
    */
174  
    explicit
179  
    explicit
175  
    operator bool() const noexcept
180  
    operator bool() const noexcept
176  
    {
181  
    {
177  
        return has_value();
182  
        return has_value();
178  
    }
183  
    }
179  

184  

180  
    /** Initiate an asynchronous read operation.
185  
    /** Initiate an asynchronous read operation.
181  

186  

182  
        Reads data into the provided buffer sequence. The operation
187  
        Reads data into the provided buffer sequence. The operation
183  
        completes when at least one byte has been read, or an error
188  
        completes when at least one byte has been read, or an error
184  
        occurs.
189  
        occurs.
185  

190  

186  
        @param buffers The buffer sequence to read into. Passed by
191  
        @param buffers The buffer sequence to read into. Passed by
187  
            value to ensure the sequence lives in the coroutine frame
192  
            value to ensure the sequence lives in the coroutine frame
188  
            across suspension points.
193  
            across suspension points.
189  

194  

190  
        @return An awaitable yielding `(error_code,std::size_t)`.
195  
        @return An awaitable yielding `(error_code,std::size_t)`.
191  

196  

 
197 +
        @par Immediate Completion
 
198 +
        The operation completes immediately without suspending
 
199 +
        the calling coroutine when the underlying stream's
 
200 +
        awaitable reports immediate readiness via `await_ready`.
 
201 +

 
202 +
        @note This is a partial operation and may not process the
 
203 +
        entire buffer sequence. Use the composed @ref read algorithm
 
204 +
        for guaranteed complete transfer.
 
205 +

192  
        @par Preconditions
206  
        @par Preconditions
193  
        The wrapper must contain a valid stream (`has_value() == true`).
207  
        The wrapper must contain a valid stream (`has_value() == true`).
 
208 +
        The caller must not call this function again after a prior
 
209 +
        call returned an error (including EOF).
194  
    */
210  
    */
195  
    template<MutableBufferSequence MB>
211  
    template<MutableBufferSequence MB>
196  
    auto
212  
    auto
197  
    read_some(MB buffers);
213  
    read_some(MB buffers);
198  

214  

199  
protected:
215  
protected:
200  
    /** Rebind to a new stream after move.
216  
    /** Rebind to a new stream after move.
201  

217  

202  
        Updates the internal pointer to reference a new stream object.
218  
        Updates the internal pointer to reference a new stream object.
203  
        Used by owning wrappers after move assignment when the owned
219  
        Used by owning wrappers after move assignment when the owned
204  
        object has moved to a new location.
220  
        object has moved to a new location.
205  

221  

206  
        @param new_stream The new stream to bind to. Must be the same
222  
        @param new_stream The new stream to bind to. Must be the same
207  
            type as the original stream.
223  
            type as the original stream.
208  

224  

209  
        @note Terminates if called with a stream of different type
225  
        @note Terminates if called with a stream of different type
210  
            than the original.
226  
            than the original.
211  
    */
227  
    */
212  
    template<ReadStream S>
228  
    template<ReadStream S>
213  
    void
229  
    void
214  
    rebind(S& new_stream) noexcept
230  
    rebind(S& new_stream) noexcept
215  
    {
231  
    {
216  
        if(vt_ != &vtable_for_impl<S>::value)
232  
        if(vt_ != &vtable_for_impl<S>::value)
217  
            std::terminate();
233  
            std::terminate();
218  
        stream_ = &new_stream;
234  
        stream_ = &new_stream;
219  
    }
235  
    }
220  
};
236  
};
221  

237  

222  
//----------------------------------------------------------
238  
//----------------------------------------------------------
223  

239  

224 -
struct any_read_stream::awaitable_ops
240 +
struct any_read_stream::vtable
225  
{
241  
{
 
242 +
    // ordered by call frequency for cache line coherence
 
243 +
    void (*construct_awaitable)(
 
244 +
        void* stream,
 
245 +
        void* storage,
 
246 +
        std::span<mutable_buffer const> buffers);
226  
    bool (*await_ready)(void*);
247  
    bool (*await_ready)(void*);
227  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
248  
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228  
    io_result<std::size_t> (*await_resume)(void*);
249  
    io_result<std::size_t> (*await_resume)(void*);
229 -
    void (*destroy)(void*) noexcept;
250 +
    void (*destroy_awaitable)(void*) noexcept;
230 -
};
 
231 -

 
232 -
struct any_read_stream::vtable
 
233 -
{
 
234 -
    void (*destroy)(void*) noexcept;
 
235  
    std::size_t awaitable_size;
251  
    std::size_t awaitable_size;
236  
    std::size_t awaitable_align;
252  
    std::size_t awaitable_align;
237 -
    awaitable_ops const* (*construct_awaitable)(
253 +
    void (*destroy)(void*) noexcept;
238 -
        void* stream,
 
239 -
        void* storage,
 
240 -
        std::span<mutable_buffer const> buffers);
 
241  
};
254  
};
242  

255  

243  
template<ReadStream S>
256  
template<ReadStream S>
244  
struct any_read_stream::vtable_for_impl
257  
struct any_read_stream::vtable_for_impl
245  
{
258  
{
246  
    using Awaitable = decltype(std::declval<S&>().read_some(
259  
    using Awaitable = decltype(std::declval<S&>().read_some(
247  
        std::span<mutable_buffer const>{}));
260  
        std::span<mutable_buffer const>{}));
248  

261  

249  
    static void
262  
    static void
250  
    do_destroy_impl(void* stream) noexcept
263  
    do_destroy_impl(void* stream) noexcept
251  
    {
264  
    {
252  
        static_cast<S*>(stream)->~S();
265  
        static_cast<S*>(stream)->~S();
253  
    }
266  
    }
254  

267  

255 -
    static awaitable_ops const*
268 +
    static void
256  
    construct_awaitable_impl(
269  
    construct_awaitable_impl(
257  
        void* stream,
270  
        void* stream,
258  
        void* storage,
271  
        void* storage,
259  
        std::span<mutable_buffer const> buffers)
272  
        std::span<mutable_buffer const> buffers)
260  
    {
273  
    {
261  
        auto& s = *static_cast<S*>(stream);
274  
        auto& s = *static_cast<S*>(stream);
262 -

 
263 -
        static constexpr awaitable_ops ops = {
 
264 -
            +[](void* p) {
 
265 -
                return static_cast<Awaitable*>(p)->await_ready();
 
266 -
            },
 
267 -
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
268 -
                return detail::call_await_suspend(
 
269 -
                    static_cast<Awaitable*>(p), h, ex, token);
 
270 -
            },
 
271 -
            +[](void* p) {
 
272 -
                return static_cast<Awaitable*>(p)->await_resume();
 
273 -
            },
 
274 -
            +[](void* p) noexcept {
 
275 -
                static_cast<Awaitable*>(p)->~Awaitable();
 
276 -
            }
 
277 -
        };
 
278 -
        return &ops;
 
279  
        ::new(storage) Awaitable(s.read_some(buffers));
275  
        ::new(storage) Awaitable(s.read_some(buffers));
280  
    }
276  
    }
281  

277  

282  
    static constexpr vtable value = {
278  
    static constexpr vtable value = {
283 -
        &do_destroy_impl,
279 +
        &construct_awaitable_impl,
 
280 +
        +[](void* p) {
 
281 +
            return static_cast<Awaitable*>(p)->await_ready();
 
282 +
        },
 
283 +
        +[](void* p, coro h, executor_ref ex, std::stop_token token) {
 
284 +
            return detail::call_await_suspend(
 
285 +
                static_cast<Awaitable*>(p), h, ex, token);
 
286 +
        },
 
287 +
        +[](void* p) {
 
288 +
            return static_cast<Awaitable*>(p)->await_resume();
 
289 +
        },
 
290 +
        +[](void* p) noexcept {
 
291 +
            static_cast<Awaitable*>(p)->~Awaitable();
 
292 +
        },
284  
        sizeof(Awaitable),
293  
        sizeof(Awaitable),
285  
        alignof(Awaitable),
294  
        alignof(Awaitable),
286 -
        &construct_awaitable_impl
295 +
        &do_destroy_impl
287  
    };
296  
    };
288  
};
297  
};
289  

298  

290  
//----------------------------------------------------------
299  
//----------------------------------------------------------
291  

300  

292  
inline
301  
inline
293  
any_read_stream::~any_read_stream()
302  
any_read_stream::~any_read_stream()
294  
{
303  
{
295  
    if(storage_)
304  
    if(storage_)
296  
    {
305  
    {
297  
        vt_->destroy(stream_);
306  
        vt_->destroy(stream_);
298  
        ::operator delete(storage_);
307  
        ::operator delete(storage_);
299  
    }
308  
    }
300  
    if(cached_awaitable_)
309  
    if(cached_awaitable_)
301  
    {
310  
    {
302 -
        if(active_ops_)
311 +
        if(awaitable_active_)
303 -
            active_ops_->destroy(cached_awaitable_);
312 +
            vt_->destroy_awaitable(cached_awaitable_);
304  
        ::operator delete(cached_awaitable_);
313  
        ::operator delete(cached_awaitable_);
305  
    }
314  
    }
306  
}
315  
}
307  

316  

308  
inline any_read_stream&
317  
inline any_read_stream&
309  
any_read_stream::operator=(any_read_stream&& other) noexcept
318  
any_read_stream::operator=(any_read_stream&& other) noexcept
310  
{
319  
{
311  
    if(this != &other)
320  
    if(this != &other)
312  
    {
321  
    {
313  
        if(storage_)
322  
        if(storage_)
314  
        {
323  
        {
315  
            vt_->destroy(stream_);
324  
            vt_->destroy(stream_);
316  
            ::operator delete(storage_);
325  
            ::operator delete(storage_);
317  
        }
326  
        }
318  
        if(cached_awaitable_)
327  
        if(cached_awaitable_)
319  
        {
328  
        {
320 -
            if(active_ops_)
329 +
            if(awaitable_active_)
321 -
                active_ops_->destroy(cached_awaitable_);
330 +
                vt_->destroy_awaitable(cached_awaitable_);
322  
            ::operator delete(cached_awaitable_);
331  
            ::operator delete(cached_awaitable_);
323  
        }
332  
        }
324  
        stream_ = std::exchange(other.stream_, nullptr);
333  
        stream_ = std::exchange(other.stream_, nullptr);
325  
        vt_ = std::exchange(other.vt_, nullptr);
334  
        vt_ = std::exchange(other.vt_, nullptr);
326  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
335  
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
327  
        storage_ = std::exchange(other.storage_, nullptr);
336  
        storage_ = std::exchange(other.storage_, nullptr);
328 -
        active_ops_ = std::exchange(other.active_ops_, nullptr);
337 +
        awaitable_active_ = std::exchange(other.awaitable_active_, false);
329  
    }
338  
    }
330  
    return *this;
339  
    return *this;
331  
}
340  
}
332  

341  

333  
template<ReadStream S>
342  
template<ReadStream S>
334  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
343  
    requires (!std::same_as<std::decay_t<S>, any_read_stream>)
335  
any_read_stream::any_read_stream(S s)
344  
any_read_stream::any_read_stream(S s)
336  
    : vt_(&vtable_for_impl<S>::value)
345  
    : vt_(&vtable_for_impl<S>::value)
337  
{
346  
{
338  
    struct guard {
347  
    struct guard {
339  
        any_read_stream* self;
348  
        any_read_stream* self;
340  
        bool committed = false;
349  
        bool committed = false;
341  
        ~guard() {
350  
        ~guard() {
342  
            if(!committed && self->storage_) {
351  
            if(!committed && self->storage_) {
343  
                self->vt_->destroy(self->stream_);
352  
                self->vt_->destroy(self->stream_);
344  
                ::operator delete(self->storage_);
353  
                ::operator delete(self->storage_);
345  
                self->storage_ = nullptr;
354  
                self->storage_ = nullptr;
346  
                self->stream_ = nullptr;
355  
                self->stream_ = nullptr;
347  
            }
356  
            }
348  
        }
357  
        }
349  
    } g{this};
358  
    } g{this};
350  

359  

351  
    storage_ = ::operator new(sizeof(S));
360  
    storage_ = ::operator new(sizeof(S));
352  
    stream_ = ::new(storage_) S(std::move(s));
361  
    stream_ = ::new(storage_) S(std::move(s));
353  

362  

354  
    // Preallocate the awaitable storage
363  
    // Preallocate the awaitable storage
355  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
364  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
356  

365  

357  
    g.committed = true;
366  
    g.committed = true;
358  
}
367  
}
359  

368  

360  
template<ReadStream S>
369  
template<ReadStream S>
361  
any_read_stream::any_read_stream(S* s)
370  
any_read_stream::any_read_stream(S* s)
362  
    : stream_(s)
371  
    : stream_(s)
363  
    , vt_(&vtable_for_impl<S>::value)
372  
    , vt_(&vtable_for_impl<S>::value)
364  
{
373  
{
365  
    // Preallocate the awaitable storage
374  
    // Preallocate the awaitable storage
366  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
375  
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
367  
}
376  
}
368  

377  

369  
//----------------------------------------------------------
378  
//----------------------------------------------------------
370  

379  

371  
template<MutableBufferSequence MB>
380  
template<MutableBufferSequence MB>
372  
auto
381  
auto
373  
any_read_stream::read_some(MB buffers)
382  
any_read_stream::read_some(MB buffers)
374  
{
383  
{
 
384 +
    // VFALCO in theory, we could use if constexpr to detect a
 
385 +
    // span and then pass that through to read_some without the array
375  
    struct awaitable
386  
    struct awaitable
376  
    {
387  
    {
377  
        any_read_stream* self_;
388  
        any_read_stream* self_;
378 -
        buffer_param<MB> bp_;
389 +
        mutable_buffer_array<detail::max_iovec_> ba_;
379  

390  

380  
        bool
391  
        bool
381 -
        await_ready() const noexcept
392 +
        await_ready()
382  
        {
393  
        {
383 -
            return false;
394 +
            self_->vt_->construct_awaitable(
 
395 +
                self_->stream_,
 
396 +
                self_->cached_awaitable_,
 
397 +
                ba_.to_span());
 
398 +
            self_->awaitable_active_ = true;
 
399 +

 
400 +
            return self_->vt_->await_ready(
 
401 +
                self_->cached_awaitable_);
384  
        }
402  
        }
385  

403  

386  
        coro
404  
        coro
387  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
405  
        await_suspend(coro h, executor_ref ex, std::stop_token token)
388  
        {
406  
        {
389 -
            // Construct the underlying awaitable into cached storage
407 +
            return self_->vt_->await_suspend(
390 -
            self_->active_ops_ = self_->vt_->construct_awaitable(
 
391 -
                self_->stream_,
 
392 -
                self_->cached_awaitable_,
 
393 -
                bp_.data());
 
394 -

 
395 -
            // Check if underlying is immediately ready
 
396 -
            if(self_->active_ops_->await_ready(self_->cached_awaitable_))
 
397 -
                return h;
 
398 -

 
399 -
            // Forward to underlying awaitable
 
400 -
            return self_->active_ops_->await_suspend(
 
401  
                self_->cached_awaitable_, h, ex, token);
408  
                self_->cached_awaitable_, h, ex, token);
402  
        }
409  
        }
403  

410  

404  
        io_result<std::size_t>
411  
        io_result<std::size_t>
405  
        await_resume()
412  
        await_resume()
406  
        {
413  
        {
407  
            struct guard {
414  
            struct guard {
408  
                any_read_stream* self;
415  
                any_read_stream* self;
409  
                ~guard() {
416  
                ~guard() {
410 -
                    self->active_ops_->destroy(self->cached_awaitable_);
417 +
                    self->vt_->destroy_awaitable(self->cached_awaitable_);
411 -
                    self->active_ops_ = nullptr;
418 +
                    self->awaitable_active_ = false;
412  
                }
419  
                }
413  
            } g{self_};
420  
            } g{self_};
414 -
            return self_->active_ops_->await_resume(
421 +
            return self_->vt_->await_resume(
415  
                self_->cached_awaitable_);
422  
                self_->cached_awaitable_);
416  
        }
423  
        }
417  
    };
424  
    };
418 -
    return awaitable{this, buffer_param<MB>(buffers)};
425 +
    return awaitable{this,
 
426 +
        mutable_buffer_array<detail::max_iovec_>(buffers)};
419  
}
427  
}
420  

428  

421  
} // namespace capy
429  
} // namespace capy
422  
} // namespace boost
430  
} // namespace boost
423  

431  

424  
#endif
432  
#endif