Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/read_stream.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 :
23 : #include <concepts>
24 : #include <coroutine>
25 : #include <cstddef>
26 : #include <new>
27 : #include <span>
28 : #include <stop_token>
29 : #include <system_error>
30 : #include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : /** Type-erased wrapper for any ReadStream.
36 :
37 : This class provides type erasure for any type satisfying the
38 : @ref ReadStream concept, enabling runtime polymorphism for
39 : read operations. It uses cached awaitable storage to achieve
40 : zero steady-state allocation after construction.
41 :
42 : The wrapper supports two construction modes:
43 : - **Owning**: Pass by value to transfer ownership. The wrapper
44 : allocates storage and owns the stream.
45 : - **Reference**: Pass a pointer to wrap without ownership. The
46 : pointed-to stream must outlive this wrapper.
47 :
48 : @par Awaitable Preallocation
49 : The constructor preallocates storage for the type-erased awaitable.
50 : This reserves all virtual address space at server startup
51 : so memory usage can be measured up front, rather than
52 : allocating piecemeal as traffic arrives.
53 :
54 : @par Immediate Completion
55 : When the underlying stream's awaitable reports ready immediately
56 : (e.g. buffered data already available), the wrapper skips
57 : coroutine suspension entirely and returns the result inline.
58 :
59 : @par Thread Safety
60 : Not thread-safe. Concurrent operations on the same wrapper
61 : are undefined behavior.
62 :
63 : @par Example
64 : @code
65 : // Owning - takes ownership of the stream
66 : any_read_stream stream(socket{ioc});
67 :
68 : // Reference - wraps without ownership
69 : socket sock(ioc);
70 : any_read_stream stream(&sock);
71 :
72 : mutable_buffer buf(data, size);
73 : auto [ec, n] = co_await stream.read_some(buf);
74 : @endcode
75 :
76 : @see any_write_stream, any_stream, ReadStream
77 : */
78 : class any_read_stream
79 : {
80 : struct vtable;
81 :
82 : template<ReadStream S>
83 : struct vtable_for_impl;
84 :
85 : // ordered for cache line coherence
86 : void* stream_ = nullptr;
87 : vtable const* vt_ = nullptr;
88 : void* cached_awaitable_ = nullptr;
89 : void* storage_ = nullptr;
90 : bool awaitable_active_ = false;
91 :
92 : public:
93 : /** Destructor.
94 :
95 : Destroys the owned stream (if any) and releases the cached
96 : awaitable storage.
97 : */
98 : ~any_read_stream();
99 :
100 : /** Default constructor.
101 :
102 : Constructs an empty wrapper. Operations on a default-constructed
103 : wrapper result in undefined behavior.
104 : */
105 1 : any_read_stream() = default;
106 :
107 : /** Non-copyable.
108 :
109 : The awaitable cache is per-instance and cannot be shared.
110 : */
111 : any_read_stream(any_read_stream const&) = delete;
112 : any_read_stream& operator=(any_read_stream const&) = delete;
113 :
114 : /** Move constructor.
115 :
116 : Transfers ownership of the wrapped stream (if owned) and
117 : cached awaitable storage from `other`. After the move, `other` is
118 : in a default-constructed state.
119 :
120 : @param other The wrapper to move from.
121 : */
122 2 : any_read_stream(any_read_stream&& other) noexcept
123 2 : : stream_(std::exchange(other.stream_, nullptr))
124 2 : , vt_(std::exchange(other.vt_, nullptr))
125 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2 : , storage_(std::exchange(other.storage_, nullptr))
127 2 : , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 : {
129 2 : }
130 :
131 : /** Move assignment operator.
132 :
133 : Destroys any owned stream and releases existing resources,
134 : then transfers ownership from `other`.
135 :
136 : @param other The wrapper to move from.
137 : @return Reference to this wrapper.
138 : */
139 : any_read_stream&
140 : operator=(any_read_stream&& other) noexcept;
141 :
142 : /** Construct by taking ownership of a ReadStream.
143 :
144 : Allocates storage and moves the stream into this wrapper.
145 : The wrapper owns the stream and will destroy it.
146 :
147 : @param s The stream to take ownership of.
148 : */
149 : template<ReadStream S>
150 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151 : any_read_stream(S s);
152 :
153 : /** Construct by wrapping a ReadStream without ownership.
154 :
155 : Wraps the given stream by pointer. The stream must remain
156 : valid for the lifetime of this wrapper.
157 :
158 : @param s Pointer to the stream to wrap.
159 : */
160 : template<ReadStream S>
161 : any_read_stream(S* s);
162 :
163 : /** Check if the wrapper contains a valid stream.
164 :
165 : @return `true` if wrapping a stream, `false` if default-constructed
166 : or moved-from.
167 : */
168 : bool
169 25 : has_value() const noexcept
170 : {
171 25 : return stream_ != nullptr;
172 : }
173 :
174 : /** Check if the wrapper contains a valid stream.
175 :
176 : @return `true` if wrapping a stream, `false` if default-constructed
177 : or moved-from.
178 : */
179 : explicit
180 3 : operator bool() const noexcept
181 : {
182 3 : return has_value();
183 : }
184 :
185 : /** Initiate an asynchronous read operation.
186 :
187 : Reads data into the provided buffer sequence. The operation
188 : completes when at least one byte has been read, or an error
189 : occurs.
190 :
191 : @param buffers The buffer sequence to read into. Passed by
192 : value to ensure the sequence lives in the coroutine frame
193 : across suspension points.
194 :
195 : @return An awaitable yielding `(error_code,std::size_t)`.
196 :
197 : @par Immediate Completion
198 : The operation completes immediately without suspending
199 : the calling coroutine when the underlying stream's
200 : awaitable reports immediate readiness via `await_ready`.
201 :
202 : @note This is a partial operation and may not process the
203 : entire buffer sequence. Use the composed @ref read algorithm
204 : for guaranteed complete transfer.
205 :
206 : @par Preconditions
207 : The wrapper must contain a valid stream (`has_value() == true`).
208 : The caller must not call this function again after a prior
209 : call returned an error (including EOF).
210 : */
211 : template<MutableBufferSequence MB>
212 : auto
213 : read_some(MB buffers);
214 :
215 : protected:
216 : /** Rebind to a new stream after move.
217 :
218 : Updates the internal pointer to reference a new stream object.
219 : Used by owning wrappers after move assignment when the owned
220 : object has moved to a new location.
221 :
222 : @param new_stream The new stream to bind to. Must be the same
223 : type as the original stream.
224 :
225 : @note Terminates if called with a stream of different type
226 : than the original.
227 : */
228 : template<ReadStream S>
229 : void
230 : rebind(S& new_stream) noexcept
231 : {
232 : if(vt_ != &vtable_for_impl<S>::value)
233 : std::terminate();
234 : stream_ = &new_stream;
235 : }
236 : };
237 :
238 : //----------------------------------------------------------
239 :
240 : struct any_read_stream::vtable
241 : {
242 : // ordered by call frequency for cache line coherence
243 : void (*construct_awaitable)(
244 : void* stream,
245 : void* storage,
246 : std::span<mutable_buffer const> buffers);
247 : bool (*await_ready)(void*);
248 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
249 : io_result<std::size_t> (*await_resume)(void*);
250 : void (*destroy_awaitable)(void*) noexcept;
251 : std::size_t awaitable_size;
252 : std::size_t awaitable_align;
253 : void (*destroy)(void*) noexcept;
254 : };
255 :
256 : template<ReadStream S>
257 : struct any_read_stream::vtable_for_impl
258 : {
259 : using Awaitable = decltype(std::declval<S&>().read_some(
260 : std::span<mutable_buffer const>{}));
261 :
262 : static void
263 1 : do_destroy_impl(void* stream) noexcept
264 : {
265 1 : static_cast<S*>(stream)->~S();
266 1 : }
267 :
268 : static void
269 91 : construct_awaitable_impl(
270 : void* stream,
271 : void* storage,
272 : std::span<mutable_buffer const> buffers)
273 : {
274 91 : auto& s = *static_cast<S*>(stream);
275 91 : ::new(storage) Awaitable(s.read_some(buffers));
276 91 : }
277 :
278 : static constexpr vtable value = {
279 : &construct_awaitable_impl,
280 91 : +[](void* p) {
281 91 : return static_cast<Awaitable*>(p)->await_ready();
282 : },
283 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
284 0 : return detail::call_await_suspend(
285 0 : static_cast<Awaitable*>(p), h, ex, token);
286 : },
287 89 : +[](void* p) {
288 89 : return static_cast<Awaitable*>(p)->await_resume();
289 : },
290 93 : +[](void* p) noexcept {
291 16 : static_cast<Awaitable*>(p)->~Awaitable();
292 : },
293 : sizeof(Awaitable),
294 : alignof(Awaitable),
295 : &do_destroy_impl
296 : };
297 : };
298 :
299 : //----------------------------------------------------------
300 :
301 : inline
302 101 : any_read_stream::~any_read_stream()
303 : {
304 101 : if(storage_)
305 : {
306 1 : vt_->destroy(stream_);
307 1 : ::operator delete(storage_);
308 : }
309 101 : if(cached_awaitable_)
310 : {
311 91 : if(awaitable_active_)
312 1 : vt_->destroy_awaitable(cached_awaitable_);
313 91 : ::operator delete(cached_awaitable_);
314 : }
315 101 : }
316 :
317 : inline any_read_stream&
318 5 : any_read_stream::operator=(any_read_stream&& other) noexcept
319 : {
320 5 : if(this != &other)
321 : {
322 5 : if(storage_)
323 : {
324 0 : vt_->destroy(stream_);
325 0 : ::operator delete(storage_);
326 : }
327 5 : if(cached_awaitable_)
328 : {
329 2 : if(awaitable_active_)
330 1 : vt_->destroy_awaitable(cached_awaitable_);
331 2 : ::operator delete(cached_awaitable_);
332 : }
333 5 : stream_ = std::exchange(other.stream_, nullptr);
334 5 : vt_ = std::exchange(other.vt_, nullptr);
335 5 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336 5 : storage_ = std::exchange(other.storage_, nullptr);
337 5 : awaitable_active_ = std::exchange(other.awaitable_active_, false);
338 : }
339 5 : return *this;
340 : }
341 :
342 : template<ReadStream S>
343 : requires (!std::same_as<std::decay_t<S>, any_read_stream>)
344 1 : any_read_stream::any_read_stream(S s)
345 1 : : vt_(&vtable_for_impl<S>::value)
346 : {
347 : struct guard {
348 : any_read_stream* self;
349 : bool committed = false;
350 1 : ~guard() {
351 1 : if(!committed && self->storage_) {
352 0 : self->vt_->destroy(self->stream_);
353 0 : ::operator delete(self->storage_);
354 0 : self->storage_ = nullptr;
355 0 : self->stream_ = nullptr;
356 : }
357 1 : }
358 1 : } g{this};
359 :
360 1 : storage_ = ::operator new(sizeof(S));
361 1 : stream_ = ::new(storage_) S(std::move(s));
362 :
363 : // Preallocate the awaitable storage
364 1 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
365 :
366 1 : g.committed = true;
367 1 : }
368 :
369 : template<ReadStream S>
370 92 : any_read_stream::any_read_stream(S* s)
371 92 : : stream_(s)
372 92 : , vt_(&vtable_for_impl<S>::value)
373 : {
374 : // Preallocate the awaitable storage
375 92 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
376 92 : }
377 :
378 : //----------------------------------------------------------
379 :
380 : template<MutableBufferSequence MB>
381 : auto
382 91 : any_read_stream::read_some(MB buffers)
383 : {
384 : // VFALCO in theory, we could use if constexpr to detect a
385 : // span and then pass that through to read_some without the array
386 : struct awaitable
387 : {
388 : any_read_stream* self_;
389 : mutable_buffer_array<detail::max_iovec_> ba_;
390 :
391 : bool
392 91 : await_ready()
393 : {
394 91 : self_->vt_->construct_awaitable(
395 91 : self_->stream_,
396 91 : self_->cached_awaitable_,
397 91 : ba_.to_span());
398 91 : self_->awaitable_active_ = true;
399 :
400 182 : return self_->vt_->await_ready(
401 91 : self_->cached_awaitable_);
402 : }
403 :
404 : coro
405 0 : await_suspend(coro h, executor_ref ex, std::stop_token token)
406 : {
407 0 : return self_->vt_->await_suspend(
408 0 : self_->cached_awaitable_, h, ex, token);
409 : }
410 :
411 : io_result<std::size_t>
412 89 : await_resume()
413 : {
414 : struct guard {
415 : any_read_stream* self;
416 89 : ~guard() {
417 89 : self->vt_->destroy_awaitable(self->cached_awaitable_);
418 89 : self->awaitable_active_ = false;
419 89 : }
420 89 : } g{self_};
421 89 : return self_->vt_->await_resume(
422 154 : self_->cached_awaitable_);
423 89 : }
424 : };
425 : return awaitable{this,
426 91 : mutable_buffer_array<detail::max_iovec_>(buffers)};
427 91 : }
428 :
429 : } // namespace capy
430 : } // namespace boost
431 :
432 : #endif
|