libs/capy/include/boost/capy/io/any_read_stream.hpp

87.4% Lines (83/95) 81.6% Functions (31/38) 73.9% Branches (17/23)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Immediate Completion
55 When the underlying stream's awaitable reports ready immediately
56 (e.g. buffered data already available), the wrapper skips
57 coroutine suspension entirely and returns the result inline.
58
59 @par Thread Safety
60 Not thread-safe. Concurrent operations on the same wrapper
61 are undefined behavior.
62
63 @par Example
64 @code
65 // Owning - takes ownership of the stream
66 any_read_stream stream(socket{ioc});
67
68 // Reference - wraps without ownership
69 socket sock(ioc);
70 any_read_stream stream(&sock);
71
72 mutable_buffer buf(data, size);
73 auto [ec, n] = co_await stream.read_some(buf);
74 @endcode
75
76 @see any_write_stream, any_stream, ReadStream
77 */
78 class any_read_stream
79 {
80 struct vtable;
81
82 template<ReadStream S>
83 struct vtable_for_impl;
84
85 // ordered for cache line coherence
86 void* stream_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 bool awaitable_active_ = false;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned stream (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_read_stream();
99
100 /** Default constructor.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 1 any_read_stream() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_read_stream(any_read_stream const&) = delete;
112 any_read_stream& operator=(any_read_stream const&) = delete;
113
114 /** Move constructor.
115
116 Transfers ownership of the wrapped stream (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 2 any_read_stream(any_read_stream&& other) noexcept
123 2 : stream_(std::exchange(other.stream_, nullptr))
124 2 , vt_(std::exchange(other.vt_, nullptr))
125 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2 , storage_(std::exchange(other.storage_, nullptr))
127 2 , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 {
129 2 }
130
131 /** Move assignment operator.
132
133 Destroys any owned stream and releases existing resources,
134 then transfers ownership from `other`.
135
136 @param other The wrapper to move from.
137 @return Reference to this wrapper.
138 */
139 any_read_stream&
140 operator=(any_read_stream&& other) noexcept;
141
142 /** Construct by taking ownership of a ReadStream.
143
144 Allocates storage and moves the stream into this wrapper.
145 The wrapper owns the stream and will destroy it.
146
147 @param s The stream to take ownership of.
148 */
149 template<ReadStream S>
150 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151 any_read_stream(S s);
152
153 /** Construct by wrapping a ReadStream without ownership.
154
155 Wraps the given stream by pointer. The stream must remain
156 valid for the lifetime of this wrapper.
157
158 @param s Pointer to the stream to wrap.
159 */
160 template<ReadStream S>
161 any_read_stream(S* s);
162
163 /** Check if the wrapper contains a valid stream.
164
165 @return `true` if wrapping a stream, `false` if default-constructed
166 or moved-from.
167 */
168 bool
169 25 has_value() const noexcept
170 {
171 25 return stream_ != nullptr;
172 }
173
174 /** Check if the wrapper contains a valid stream.
175
176 @return `true` if wrapping a stream, `false` if default-constructed
177 or moved-from.
178 */
179 explicit
180 3 operator bool() const noexcept
181 {
182 3 return has_value();
183 }
184
185 /** Initiate an asynchronous read operation.
186
187 Reads data into the provided buffer sequence. The operation
188 completes when at least one byte has been read, or an error
189 occurs.
190
191 @param buffers The buffer sequence to read into. Passed by
192 value to ensure the sequence lives in the coroutine frame
193 across suspension points.
194
195 @return An awaitable yielding `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when the underlying stream's
200 awaitable reports immediate readiness via `await_ready`.
201
202 @note This is a partial operation and may not process the
203 entire buffer sequence. Use the composed @ref read algorithm
204 for guaranteed complete transfer.
205
206 @par Preconditions
207 The wrapper must contain a valid stream (`has_value() == true`).
208 The caller must not call this function again after a prior
209 call returned an error (including EOF).
210 */
211 template<MutableBufferSequence MB>
212 auto
213 read_some(MB buffers);
214
215 protected:
216 /** Rebind to a new stream after move.
217
218 Updates the internal pointer to reference a new stream object.
219 Used by owning wrappers after move assignment when the owned
220 object has moved to a new location.
221
222 @param new_stream The new stream to bind to. Must be the same
223 type as the original stream.
224
225 @note Terminates if called with a stream of different type
226 than the original.
227 */
228 template<ReadStream S>
229 void
230 rebind(S& new_stream) noexcept
231 {
232 if(vt_ != &vtable_for_impl<S>::value)
233 std::terminate();
234 stream_ = &new_stream;
235 }
236 };
237
238 //----------------------------------------------------------
239
240 struct any_read_stream::vtable
241 {
242 // ordered by call frequency for cache line coherence
243 void (*construct_awaitable)(
244 void* stream,
245 void* storage,
246 std::span<mutable_buffer const> buffers);
247 bool (*await_ready)(void*);
248 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
249 io_result<std::size_t> (*await_resume)(void*);
250 void (*destroy_awaitable)(void*) noexcept;
251 std::size_t awaitable_size;
252 std::size_t awaitable_align;
253 void (*destroy)(void*) noexcept;
254 };
255
256 template<ReadStream S>
257 struct any_read_stream::vtable_for_impl
258 {
259 using Awaitable = decltype(std::declval<S&>().read_some(
260 std::span<mutable_buffer const>{}));
261
262 static void
263 1 do_destroy_impl(void* stream) noexcept
264 {
265 1 static_cast<S*>(stream)->~S();
266 1 }
267
268 static void
269 91 construct_awaitable_impl(
270 void* stream,
271 void* storage,
272 std::span<mutable_buffer const> buffers)
273 {
274 91 auto& s = *static_cast<S*>(stream);
275 91 ::new(storage) Awaitable(s.read_some(buffers));
276 91 }
277
278 static constexpr vtable value = {
279 &construct_awaitable_impl,
280 91 +[](void* p) {
281 91 return static_cast<Awaitable*>(p)->await_ready();
282 },
283 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
284 return detail::call_await_suspend(
285 static_cast<Awaitable*>(p), h, ex, token);
286 },
287 89 +[](void* p) {
288 89 return static_cast<Awaitable*>(p)->await_resume();
289 },
290 93 +[](void* p) noexcept {
291 16 static_cast<Awaitable*>(p)->~Awaitable();
292 },
293 sizeof(Awaitable),
294 alignof(Awaitable),
295 &do_destroy_impl
296 };
297 };
298
299 //----------------------------------------------------------
300
301 inline
302 101 any_read_stream::~any_read_stream()
303 {
304
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 100 times.
101 if(storage_)
305 {
306 1 vt_->destroy(stream_);
307 1 ::operator delete(storage_);
308 }
309
2/2
✓ Branch 0 taken 91 times.
✓ Branch 1 taken 10 times.
101 if(cached_awaitable_)
310 {
311
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 90 times.
91 if(awaitable_active_)
312 1 vt_->destroy_awaitable(cached_awaitable_);
313 91 ::operator delete(cached_awaitable_);
314 }
315 101 }
316
317 inline any_read_stream&
318 5 any_read_stream::operator=(any_read_stream&& other) noexcept
319 {
320
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if(this != &other)
321 {
322
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if(storage_)
323 {
324 vt_->destroy(stream_);
325 ::operator delete(storage_);
326 }
327
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if(cached_awaitable_)
328 {
329
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(awaitable_active_)
330 1 vt_->destroy_awaitable(cached_awaitable_);
331 2 ::operator delete(cached_awaitable_);
332 }
333 5 stream_ = std::exchange(other.stream_, nullptr);
334 5 vt_ = std::exchange(other.vt_, nullptr);
335 5 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336 5 storage_ = std::exchange(other.storage_, nullptr);
337 5 awaitable_active_ = std::exchange(other.awaitable_active_, false);
338 }
339 5 return *this;
340 }
341
342 template<ReadStream S>
343 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
344 1 any_read_stream::any_read_stream(S s)
345 1 : vt_(&vtable_for_impl<S>::value)
346 {
347 struct guard {
348 any_read_stream* self;
349 bool committed = false;
350 1 ~guard() {
351
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if(!committed && self->storage_) {
352 self->vt_->destroy(self->stream_);
353 ::operator delete(self->storage_);
354 self->storage_ = nullptr;
355 self->stream_ = nullptr;
356 }
357 1 }
358 1 } g{this};
359
360
1/1
✓ Branch 1 taken 1 time.
1 storage_ = ::operator new(sizeof(S));
361 1 stream_ = ::new(storage_) S(std::move(s));
362
363 // Preallocate the awaitable storage
364
1/1
✓ Branch 1 taken 1 time.
1 cached_awaitable_ = ::operator new(vt_->awaitable_size);
365
366 1 g.committed = true;
367 1 }
368
369 template<ReadStream S>
370 92 any_read_stream::any_read_stream(S* s)
371 92 : stream_(s)
372 92 , vt_(&vtable_for_impl<S>::value)
373 {
374 // Preallocate the awaitable storage
375 92 cached_awaitable_ = ::operator new(vt_->awaitable_size);
376 92 }
377
378 //----------------------------------------------------------
379
380 template<MutableBufferSequence MB>
381 auto
382 91 any_read_stream::read_some(MB buffers)
383 {
384 // VFALCO in theory, we could use if constexpr to detect a
385 // span and then pass that through to read_some without the array
386 struct awaitable
387 {
388 any_read_stream* self_;
389 mutable_buffer_array<detail::max_iovec_> ba_;
390
391 bool
392 14 await_ready()
393 {
394 14 self_->vt_->construct_awaitable(
395 14 self_->stream_,
396
1/1
✓ Branch 1 taken 14 times.
14 self_->cached_awaitable_,
397 14 ba_.to_span());
398 14 self_->awaitable_active_ = true;
399
400 28 return self_->vt_->await_ready(
401 14 self_->cached_awaitable_);
402 }
403
404 coro
405 await_suspend(coro h, executor_ref ex, std::stop_token token)
406 {
407 return self_->vt_->await_suspend(
408 self_->cached_awaitable_, h, ex, token);
409 }
410
411 io_result<std::size_t>
412 14 await_resume()
413 {
414 struct guard {
415 any_read_stream* self;
416 14 ~guard() {
417 14 self->vt_->destroy_awaitable(self->cached_awaitable_);
418 14 self->awaitable_active_ = false;
419 14 }
420 14 } g{self_};
421 14 return self_->vt_->await_resume(
422
1/1
✓ Branch 1 taken 10 times.
24 self_->cached_awaitable_);
423 14 }
424 };
425 return awaitable{this,
426 91 mutable_buffer_array<detail::max_iovec_>(buffers)};
427 91 }
428
429 } // namespace capy
430 } // namespace boost
431
432 #endif
433