libs/capy/include/boost/capy/io/any_read_source.hpp

91.2% Lines (83/91) 92.0% Functions (23/25) 76.0% Branches (19/25)
libs/capy/include/boost/capy/io/any_read_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/read_source.hpp>
20 #include <boost/capy/coro.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <system_error>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any ReadSource.
38
39 This class provides type erasure for any type satisfying the
40 @ref ReadSource concept, enabling runtime polymorphism for
41 source read operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the source.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to source must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Immediate Completion
57 Operations complete immediately without suspending when the
58 buffer sequence is empty, or when the underlying source's
59 awaitable reports readiness via `await_ready`.
60
61 @par Thread Safety
62 Not thread-safe. Concurrent operations on the same wrapper
63 are undefined behavior.
64
65 @par Example
66 @code
67 // Owning - takes ownership of the source
68 any_read_source rs(some_source{args...});
69
70 // Reference - wraps without ownership
71 some_source source;
72 any_read_source rs(&source);
73
74 mutable_buffer buf(data, size);
75 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
76 @endcode
77
78 @see any_read_stream, ReadSource
79 */
80 class any_read_source
81 {
82 struct vtable;
83 struct awaitable_ops;
84
85 template<ReadSource S>
86 struct vtable_for_impl;
87
88 void* source_ = nullptr;
89 vtable const* vt_ = nullptr;
90 void* cached_awaitable_ = nullptr;
91 void* storage_ = nullptr;
92 awaitable_ops const* active_ops_ = nullptr;
93
94 public:
95 /** Destructor.
96
97 Destroys the owned source (if any) and releases the cached
98 awaitable storage.
99 */
100 ~any_read_source();
101
102 /** Default constructor.
103
104 Constructs an empty wrapper. Operations on a default-constructed
105 wrapper result in undefined behavior.
106 */
107 any_read_source() = default;
108
109 /** Non-copyable.
110
111 The awaitable cache is per-instance and cannot be shared.
112 */
113 any_read_source(any_read_source const&) = delete;
114 any_read_source& operator=(any_read_source const&) = delete;
115
116 /** Move constructor.
117
118 Transfers ownership of the wrapped source (if owned) and
119 cached awaitable storage from `other`. After the move, `other` is
120 in a default-constructed state.
121
122 @param other The wrapper to move from.
123 */
124 1 any_read_source(any_read_source&& other) noexcept
125 1 : source_(std::exchange(other.source_, nullptr))
126 1 , vt_(std::exchange(other.vt_, nullptr))
127 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
128 1 , storage_(std::exchange(other.storage_, nullptr))
129 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
130 {
131 1 }
132
133 /** Move assignment operator.
134
135 Destroys any owned source and releases existing resources,
136 then transfers ownership from `other`.
137
138 @param other The wrapper to move from.
139 @return Reference to this wrapper.
140 */
141 any_read_source&
142 operator=(any_read_source&& other) noexcept;
143
144 /** Construct by taking ownership of a ReadSource.
145
146 Allocates storage and moves the source into this wrapper.
147 The wrapper owns the source and will destroy it.
148
149 @param s The source to take ownership of.
150 */
151 template<ReadSource S>
152 requires (!std::same_as<std::decay_t<S>, any_read_source>)
153 any_read_source(S s);
154
155 /** Construct by wrapping a ReadSource without ownership.
156
157 Wraps the given source by pointer. The source must remain
158 valid for the lifetime of this wrapper.
159
160 @param s Pointer to the source to wrap.
161 */
162 template<ReadSource S>
163 any_read_source(S* s);
164
165 /** Check if the wrapper contains a valid source.
166
167 @return `true` if wrapping a source, `false` if default-constructed
168 or moved-from.
169 */
170 bool
171 27 has_value() const noexcept
172 {
173 27 return source_ != nullptr;
174 }
175
176 /** Check if the wrapper contains a valid source.
177
178 @return `true` if wrapping a source, `false` if default-constructed
179 or moved-from.
180 */
181 explicit
182 8 operator bool() const noexcept
183 {
184 8 return has_value();
185 }
186
187 /** Initiate a partial read operation.
188
189 Reads one or more bytes into the provided buffer sequence.
190 May fill less than the full sequence.
191
192 @param buffers The buffer sequence to read into.
193
194 @return An awaitable yielding `(error_code,std::size_t)`.
195
196 @par Immediate Completion
197 The operation completes immediately without suspending
198 the calling coroutine when:
199 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
200 @li The underlying source's awaitable reports immediate
201 readiness via `await_ready`.
202
203 @note This is a partial operation and may not process the
204 entire buffer sequence. Use @ref read for guaranteed
205 complete transfer.
206
207 @par Preconditions
208 The wrapper must contain a valid source (`has_value() == true`).
209 The caller must not call this function again after a prior
210 call returned an error (including EOF).
211 */
212 template<MutableBufferSequence MB>
213 auto
214 read_some(MB buffers);
215
216 /** Initiate a complete read operation.
217
218 Reads data into the provided buffer sequence by forwarding
219 to the underlying source's `read` operation. Large buffer
220 sequences are processed in windows, with each window
221 forwarded as a separate `read` call to the underlying source.
222 The operation completes when the entire buffer sequence is
223 filled, end-of-file is reached, or an error occurs.
224
225 @param buffers The buffer sequence to read into.
226
227 @return An awaitable yielding `(error_code,std::size_t)`.
228
229 @par Immediate Completion
230 The operation completes immediately without suspending
231 the calling coroutine when:
232 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
233 @li The underlying source's `read` awaitable reports
234 immediate readiness via `await_ready`.
235
236 @par Postconditions
237 Exactly one of the following is true on return:
238 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
239 The entire buffer was filled.
240 @li **End-of-stream or Error**: `ec` and `n` indicates
241 the number of bytes transferred before the failure.
242
243 @par Preconditions
244 The wrapper must contain a valid source (`has_value() == true`).
245 The caller must not call this function again after a prior
246 call returned an error (including EOF).
247 */
248 template<MutableBufferSequence MB>
249 io_task<std::size_t>
250 read(MB buffers);
251
252 protected:
253 /** Rebind to a new source after move.
254
255 Updates the internal pointer to reference a new source object.
256 Used by owning wrappers after move assignment when the owned
257 object has moved to a new location.
258
259 @param new_source The new source to bind to. Must be the same
260 type as the original source.
261
262 @note Terminates if called with a source of different type
263 than the original.
264 */
265 template<ReadSource S>
266 void
267 rebind(S& new_source) noexcept
268 {
269 if(vt_ != &vtable_for_impl<S>::value)
270 std::terminate();
271 source_ = &new_source;
272 }
273
274 private:
275 auto
276 read_(std::span<mutable_buffer const> buffers);
277 };
278
279 //----------------------------------------------------------
280
281 // ordered by call sequence for cache line coherence
282 struct any_read_source::awaitable_ops
283 {
284 bool (*await_ready)(void*);
285 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
286 io_result<std::size_t> (*await_resume)(void*);
287 void (*destroy)(void*) noexcept;
288 };
289
290 // ordered by call frequency for cache line coherence
291 struct any_read_source::vtable
292 {
293 awaitable_ops const* (*construct_read_some_awaitable)(
294 void* source,
295 void* storage,
296 std::span<mutable_buffer const> buffers);
297 awaitable_ops const* (*construct_read_awaitable)(
298 void* source,
299 void* storage,
300 std::span<mutable_buffer const> buffers);
301 std::size_t awaitable_size;
302 std::size_t awaitable_align;
303 void (*destroy)(void*) noexcept;
304 };
305
306 template<ReadSource S>
307 struct any_read_source::vtable_for_impl
308 {
309 using ReadSomeAwaitable = decltype(std::declval<S&>().read_some(
310 std::span<mutable_buffer const>{}));
311 using ReadAwaitable = decltype(std::declval<S&>().read(
312 std::span<mutable_buffer const>{}));
313
314 static void
315 6 do_destroy_impl(void* source) noexcept
316 {
317 6 static_cast<S*>(source)->~S();
318 6 }
319
320 static awaitable_ops const*
321 52 construct_read_some_awaitable_impl(
322 void* source,
323 void* storage,
324 std::span<mutable_buffer const> buffers)
325 {
326 52 auto& s = *static_cast<S*>(source);
327 52 ::new(storage) ReadSomeAwaitable(s.read_some(buffers));
328
329 static constexpr awaitable_ops ops = {
330 +[](void* p) {
331 return static_cast<ReadSomeAwaitable*>(p)->await_ready();
332 },
333 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
334 return detail::call_await_suspend(
335 static_cast<ReadSomeAwaitable*>(p), h, ex, token);
336 },
337 +[](void* p) {
338 return static_cast<ReadSomeAwaitable*>(p)->await_resume();
339 },
340 +[](void* p) noexcept {
341 static_cast<ReadSomeAwaitable*>(p)->~ReadSomeAwaitable();
342 }
343 };
344 52 return &ops;
345 }
346
347 static awaitable_ops const*
348 116 construct_read_awaitable_impl(
349 void* source,
350 void* storage,
351 std::span<mutable_buffer const> buffers)
352 {
353 116 auto& s = *static_cast<S*>(source);
354 116 ::new(storage) ReadAwaitable(s.read(buffers));
355
356 static constexpr awaitable_ops ops = {
357 +[](void* p) {
358 return static_cast<ReadAwaitable*>(p)->await_ready();
359 },
360 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
361 return detail::call_await_suspend(
362 static_cast<ReadAwaitable*>(p), h, ex, token);
363 },
364 +[](void* p) {
365 return static_cast<ReadAwaitable*>(p)->await_resume();
366 },
367 +[](void* p) noexcept {
368 static_cast<ReadAwaitable*>(p)->~ReadAwaitable();
369 }
370 };
371 116 return &ops;
372 }
373
374 static constexpr std::size_t max_awaitable_size =
375 sizeof(ReadSomeAwaitable) > sizeof(ReadAwaitable)
376 ? sizeof(ReadSomeAwaitable)
377 : sizeof(ReadAwaitable);
378 static constexpr std::size_t max_awaitable_align =
379 alignof(ReadSomeAwaitable) > alignof(ReadAwaitable)
380 ? alignof(ReadSomeAwaitable)
381 : alignof(ReadAwaitable);
382
383 static constexpr vtable value = {
384 &construct_read_some_awaitable_impl,
385 &construct_read_awaitable_impl,
386 max_awaitable_size,
387 max_awaitable_align,
388 &do_destroy_impl
389 };
390 };
391
392 //----------------------------------------------------------
393
394 inline
395 145 any_read_source::~any_read_source()
396 {
397
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 139 times.
145 if(storage_)
398 {
399 6 vt_->destroy(source_);
400 6 ::operator delete(storage_);
401 }
402
2/2
✓ Branch 0 taken 139 times.
✓ Branch 1 taken 6 times.
145 if(cached_awaitable_)
403 {
404
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 138 times.
139 if(active_ops_)
405 1 active_ops_->destroy(cached_awaitable_);
406 139 ::operator delete(cached_awaitable_);
407 }
408 145 }
409
410 inline any_read_source&
411 4 any_read_source::operator=(any_read_source&& other) noexcept
412 {
413
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 if(this != &other)
414 {
415
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
416 {
417 vt_->destroy(source_);
418 ::operator delete(storage_);
419 }
420
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 if(cached_awaitable_)
421 {
422
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(active_ops_)
423 1 active_ops_->destroy(cached_awaitable_);
424 2 ::operator delete(cached_awaitable_);
425 }
426 3 source_ = std::exchange(other.source_, nullptr);
427 3 vt_ = std::exchange(other.vt_, nullptr);
428 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
429 3 storage_ = std::exchange(other.storage_, nullptr);
430 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
431 }
432 4 return *this;
433 }
434
435 template<ReadSource S>
436 requires (!std::same_as<std::decay_t<S>, any_read_source>)
437 6 any_read_source::any_read_source(S s)
438 6 : vt_(&vtable_for_impl<S>::value)
439 {
440 struct guard {
441 any_read_source* self;
442 bool committed = false;
443 6 ~guard() {
444
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if(!committed && self->storage_) {
445 self->vt_->destroy(self->source_);
446 ::operator delete(self->storage_);
447 self->storage_ = nullptr;
448 self->source_ = nullptr;
449 }
450 6 }
451 6 } g{this};
452
453
1/1
✓ Branch 1 taken 6 times.
6 storage_ = ::operator new(sizeof(S));
454 6 source_ = ::new(storage_) S(std::move(s));
455
456 // Preallocate the awaitable storage
457
1/1
✓ Branch 1 taken 6 times.
6 cached_awaitable_ = ::operator new(vt_->awaitable_size);
458
459 6 g.committed = true;
460 6 }
461
462 template<ReadSource S>
463 135 any_read_source::any_read_source(S* s)
464 135 : source_(s)
465 135 , vt_(&vtable_for_impl<S>::value)
466 {
467 // Preallocate the awaitable storage
468 135 cached_awaitable_ = ::operator new(vt_->awaitable_size);
469 135 }
470
471 //----------------------------------------------------------
472
473 template<MutableBufferSequence MB>
474 auto
475 54 any_read_source::read_some(MB buffers)
476 {
477 struct awaitable
478 {
479 any_read_source* self_;
480 mutable_buffer_array<detail::max_iovec_> ba_;
481
482 awaitable(any_read_source* self, MB const& buffers)
483 : self_(self)
484 , ba_(buffers)
485 {
486 }
487
488 bool
489 await_ready() const noexcept
490 {
491 return ba_.to_span().empty();
492 }
493
494 coro
495 await_suspend(coro h, executor_ref ex, std::stop_token token)
496 {
497 self_->active_ops_ = self_->vt_->construct_read_some_awaitable(
498 self_->source_,
499 self_->cached_awaitable_,
500 ba_.to_span());
501
502 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
503 return h;
504
505 return self_->active_ops_->await_suspend(
506 self_->cached_awaitable_, h, ex, token);
507 }
508
509 io_result<std::size_t>
510 await_resume()
511 {
512 if(ba_.to_span().empty())
513 return {{}, 0};
514
515 struct guard {
516 any_read_source* self;
517 ~guard() {
518 self->active_ops_->destroy(self->cached_awaitable_);
519 self->active_ops_ = nullptr;
520 }
521 } g{self_};
522 return self_->active_ops_->await_resume(
523 self_->cached_awaitable_);
524 }
525 };
526 54 return awaitable(this, buffers);
527 }
528
529 inline auto
530 116 any_read_source::read_(std::span<mutable_buffer const> buffers)
531 {
532 struct awaitable
533 {
534 any_read_source* self_;
535 std::span<mutable_buffer const> buffers_;
536
537 bool
538 116 await_ready() const noexcept
539 {
540 116 return false;
541 }
542
543 coro
544 116 await_suspend(coro h, executor_ref ex, std::stop_token token)
545 {
546 232 self_->active_ops_ = self_->vt_->construct_read_awaitable(
547 116 self_->source_,
548 116 self_->cached_awaitable_,
549 buffers_);
550
551
1/2
✓ Branch 1 taken 116 times.
✗ Branch 2 not taken.
116 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
552 116 return h;
553
554 return self_->active_ops_->await_suspend(
555 self_->cached_awaitable_, h, ex, token);
556 }
557
558 io_result<std::size_t>
559 116 await_resume()
560 {
561 struct guard {
562 any_read_source* self;
563 116 ~guard() {
564 116 self->active_ops_->destroy(self->cached_awaitable_);
565 116 self->active_ops_ = nullptr;
566 116 }
567 116 } g{self_};
568 116 return self_->active_ops_->await_resume(
569
1/1
✓ Branch 1 taken 84 times.
200 self_->cached_awaitable_);
570 116 }
571 };
572 116 return awaitable{this, buffers};
573 }
574
575 template<MutableBufferSequence MB>
576 io_task<std::size_t>
577
1/1
✓ Branch 1 taken 110 times.
110 any_read_source::read(MB buffers)
578 {
579 buffer_param bp(buffers);
580 std::size_t total = 0;
581
582 for(;;)
583 {
584 auto bufs = bp.data();
585 if(bufs.empty())
586 break;
587
588 auto [ec, n] = co_await read_(bufs);
589 total += n;
590 if(ec)
591 co_return {ec, total};
592 bp.consume(n);
593 }
594
595 co_return {{}, total};
596 220 }
597
598 } // namespace capy
599 } // namespace boost
600
601 #endif
602