libs/capy/include/boost/capy/io/any_buffer_source.hpp

87.7% Lines (143/163) 90.7% Functions (39/43) 60.7% Branches (17/28)
libs/capy/include/boost/capy/io/any_buffer_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/buffers/slice.hpp>
19 #include <boost/capy/concept/buffer_source.hpp>
20 #include <boost/capy/concept/io_awaitable.hpp>
21 #include <boost/capy/concept/read_source.hpp>
22 #include <boost/capy/coro.hpp>
23 #include <boost/capy/error.hpp>
24 #include <boost/capy/ex/executor_ref.hpp>
25 #include <boost/capy/io_result.hpp>
26 #include <boost/capy/io_task.hpp>
27
28 #include <concepts>
29 #include <coroutine>
30 #include <cstddef>
31 #include <exception>
32 #include <new>
33 #include <span>
34 #include <stop_token>
35 #include <system_error>
36 #include <utility>
37
38 namespace boost {
39 namespace capy {
40
41 /** Type-erased wrapper for any BufferSource.
42
43 This class provides type erasure for any type satisfying the
44 @ref BufferSource concept, enabling runtime polymorphism for
45 buffer pull operations. It uses cached awaitable storage to achieve
46 zero steady-state allocation after construction.
47
48 The wrapper also satisfies @ref ReadSource. When the wrapped type
49 satisfies only @ref BufferSource, the read operations are
50 synthesized using @ref pull and @ref consume with an extra
51 buffer copy. When the wrapped type satisfies both @ref BufferSource
52 and @ref ReadSource, the native read operations are forwarded
53 directly across the virtual boundary, avoiding the copy.
54
55 The wrapper supports two construction modes:
56 - **Owning**: Pass by value to transfer ownership. The wrapper
57 allocates storage and owns the source.
58 - **Reference**: Pass a pointer to wrap without ownership. The
59 pointed-to source must outlive this wrapper.
60
61 Within each mode, the vtable is populated at compile time based
62 on whether the wrapped type also satisfies @ref ReadSource:
63 - **BufferSource only**: @ref read_some and @ref read are
64 synthesized from @ref pull and @ref consume, incurring one
65 buffer copy per operation.
66 - **BufferSource + ReadSource**: All read operations are
67 forwarded natively through the type-erased boundary with
68 no extra copy.
69
70 @par Awaitable Preallocation
71 The constructor preallocates storage for the type-erased awaitable.
72 This reserves all virtual address space at server startup
73 so memory usage can be measured up front, rather than
74 allocating piecemeal as traffic arrives.
75
76 @par Thread Safety
77 Not thread-safe. Concurrent operations on the same wrapper
78 are undefined behavior.
79
80 @par Example
81 @code
82 // Owning - takes ownership of the source
83 any_buffer_source abs(some_buffer_source{args...});
84
85 // Reference - wraps without ownership
86 some_buffer_source src;
87 any_buffer_source abs(&src);
88
89 const_buffer arr[16];
90 auto [ec, bufs] = co_await abs.pull(arr);
91
92 // ReadSource interface also available
93 char buf[64];
94 auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
95 @endcode
96
97 @see any_buffer_sink, BufferSource, ReadSource
98 */
class any_buffer_source
{
    struct vtable;
    struct awaitable_ops;
    struct read_awaitable_ops;

    template<BufferSource S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* source_ = nullptr;                       // wrapped source (owned iff storage_ != nullptr)
    vtable const* vt_ = nullptr;                   // per-type operations table
    void* cached_awaitable_ = nullptr;             // preallocated slot for the in-flight awaitable
    awaitable_ops const* active_ops_ = nullptr;    // non-null while a pull awaitable is in flight
    read_awaitable_ops const* active_read_ops_ = nullptr; // non-null while a read awaitable is in flight
    void* storage_ = nullptr;                      // owning-mode storage for the source object

public:
    /** Destructor.

        Destroys the owned source (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_source();

    /** Default constructor.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_source() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_source(any_buffer_source const&) = delete;
    any_buffer_source& operator=(any_buffer_source const&) = delete;

    /** Move constructor.

        Transfers ownership of the wrapped source (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_source(any_buffer_source&& other) noexcept
        : source_(std::exchange(other.source_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Move assignment operator.

        Destroys any owned source and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_source&
    operator=(any_buffer_source&& other) noexcept;

    /** Construct by taking ownership of a BufferSource.

        Allocates storage and moves the source into this wrapper.
        The wrapper owns the source and will destroy it. If `S` also
        satisfies @ref ReadSource, native read operations are
        forwarded through the virtual boundary.

        @param s The source to take ownership of.
    */
    template<BufferSource S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
    any_buffer_source(S s);

    /** Construct by wrapping a BufferSource without ownership.

        Wraps the given source by pointer. The source must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref ReadSource, native read operations are
        forwarded through the virtual boundary.

        @param s Pointer to the source to wrap.
    */
    template<BufferSource S>
    any_buffer_source(S* s);

    /** Check if the wrapper contains a valid source.

        @return `true` if wrapping a source, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return source_ != nullptr;
    }

    /** Check if the wrapper contains a valid source.

        @return `true` if wrapping a source, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Consume bytes from the source.

        Advances the internal read position of the underlying source
        by the specified number of bytes. The next call to @ref pull
        returns data starting after the consumed bytes.

        @param n The number of bytes to consume. Must not exceed the
        total size of buffers returned by the previous @ref pull.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
    */
    void
    consume(std::size_t n) noexcept;

    /** Pull buffer data from the source.

        Fills the provided span with buffer descriptors from the
        underlying source. The operation completes when data is
        available, the source is exhausted, or an error occurs.

        @param dest Span of const_buffer to fill.

        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
        On success with data, a non-empty span of filled buffers.
        On EOF, `ec == cond::eof` and span is empty.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error.
    */
    auto
    pull(std::span<const_buffer> dest);

    /** Read some data into a mutable buffer sequence.

        Reads one or more bytes into the caller's buffers. May fill
        less than the full sequence.

        When the wrapped type provides native @ref ReadSource support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref pull, @ref buffer_copy, and @ref consume.

        @param buffers The buffer sequence to fill.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error (including EOF).

        @see pull, consume
    */
    template<MutableBufferSequence MB>
    io_task<std::size_t>
    read_some(MB buffers);

    /** Read data into a mutable buffer sequence.

        Fills the provided buffer sequence completely. When the
        wrapped type provides native @ref ReadSource support, each
        window is forwarded directly. Otherwise the data is
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.

        @param buffers The buffer sequence to fill.

        @return An awaitable yielding `(error_code,std::size_t)`.
        On success, `n == buffer_size(buffers)`.
        On EOF, `ec == error::eof` and `n` is bytes transferred.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error (including EOF).

        @see pull, consume
    */
    template<MutableBufferSequence MB>
    io_task<std::size_t>
    read(MB buffers);

protected:
    /** Rebind to a new source after move.

        Updates the internal pointer to reference a new source object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_source The new source to bind to. Must be the same
        type as the original source.

        @note Terminates if called with a source of different type
        than the original.
    */
    template<BufferSource S>
    void
    rebind(S& new_source) noexcept
    {
        // The vtable pointer uniquely identifies the wrapped type, so a
        // mismatch means the caller passed a source of the wrong type.
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        source_ = &new_source;
    }

private:
    /** Forward a partial read through the vtable.

        Constructs the underlying `read_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    read_some_(std::span<mutable_buffer const> buffers);

    /** Forward a complete read through the vtable.

        Constructs the underlying `read` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    read_(std::span<mutable_buffer const> buffers);
};
336
337 //----------------------------------------------------------
338
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
struct any_buffer_source::awaitable_ops
{
    bool (*await_ready)(void*);                                   // forwards to the concrete awaitable
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
    io_result<std::span<const_buffer>> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;                              // run the concrete awaitable's destructor
};
347
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
struct any_buffer_source::read_awaitable_ops
{
    bool (*await_ready)(void*);                                   // forwards to the concrete awaitable
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;                              // run the concrete awaitable's destructor
};
356
struct any_buffer_source::vtable
{
    // BufferSource ops (always populated)
    void (*destroy)(void*) noexcept;                      // destroy an owned source in place
    void (*do_consume)(void* source, std::size_t n) noexcept;
    std::size_t awaitable_size;   // max sizeof over all of S's awaitable types
    // NOTE(review): awaitable_align is recorded but the cached-awaitable
    // allocations use plain ::operator new — assumes no awaitable is
    // over-aligned beyond the default new alignment; confirm.
    std::size_t awaitable_align;  // max alignof over all of S's awaitable types
    // Placement-construct S's pull awaitable into `storage`.
    awaitable_ops const* (*construct_awaitable)(
        void* source,
        void* storage,
        std::span<const_buffer> dest);

    // ReadSource forwarding (null when wrapped type is BufferSource-only)
    read_awaitable_ops const* (*construct_read_some_awaitable)(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers);
    read_awaitable_ops const* (*construct_read_awaitable)(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers);
};
379
template<BufferSource S>
struct any_buffer_source::vtable_for_impl
{
    // Awaitable type produced by S::pull().
    using PullAwaitable = decltype(std::declval<S&>().pull(
        std::declval<std::span<const_buffer>>()));

    // Destroy an owned source in place; the raw storage is freed by
    // the wrapper (owning mode only).
    static void
    do_destroy_impl(void* source) noexcept
    {
        static_cast<S*>(source)->~S();
    }

    static void
    do_consume_impl(void* source, std::size_t n) noexcept
    {
        static_cast<S*>(source)->consume(n);
    }

    // Placement-construct S's pull awaitable into `storage` and return
    // the ops table that drives it across the type-erased boundary.
    // The ops table is a function-local constexpr object, so its
    // address is stable for the program's lifetime.
    static awaitable_ops const*
    construct_awaitable_impl(
        void* source,
        void* storage,
        std::span<const_buffer> dest)
    {
        auto& s = *static_cast<S*>(source);
        ::new(storage) PullAwaitable(s.pull(dest));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<PullAwaitable*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<PullAwaitable*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<PullAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
            }
        };
        return &ops;
    }

    //------------------------------------------------------
    // ReadSource forwarding (only instantiated when ReadSource<S>)

    // Placement-construct S's read_some awaitable into `storage`.
    static read_awaitable_ops const*
    construct_read_some_awaitable_impl(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers)
        requires ReadSource<S>
    {
        using Aw = decltype(std::declval<S&>().read_some(
            std::span<mutable_buffer const>{}));
        auto& s = *static_cast<S*>(source);
        ::new(storage) Aw(s.read_some(buffers));

        static constexpr read_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Placement-construct S's read awaitable into `storage`.
    static read_awaitable_ops const*
    construct_read_awaitable_impl(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers)
        requires ReadSource<S>
    {
        using Aw = decltype(std::declval<S&>().read(
            std::span<mutable_buffer const>{}));
        auto& s = *static_cast<S*>(source);
        ::new(storage) Aw(s.read(buffers));

        static constexpr read_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    //------------------------------------------------------

    // Largest awaitable S can produce; sizes the shared cached slot.
    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(PullAwaitable);
        if constexpr (ReadSource<S>)
        {
            using RS = decltype(std::declval<S&>().read_some(
                std::span<mutable_buffer const>{}));
            using R = decltype(std::declval<S&>().read(
                std::span<mutable_buffer const>{}));

            if(sizeof(RS) > s) s = sizeof(RS);
            if(sizeof(R) > s) s = sizeof(R);
        }
        return s;
    }

    // Strictest alignment over all of S's awaitable types.
    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(PullAwaitable);
        if constexpr (ReadSource<S>)
        {
            using RS = decltype(std::declval<S&>().read_some(
                std::span<mutable_buffer const>{}));
            using R = decltype(std::declval<S&>().read(
                std::span<mutable_buffer const>{}));

            if(alignof(RS) > a) a = alignof(RS);
            if(alignof(R) > a) a = alignof(R);
        }
        return a;
    }

    // Build the per-type vtable at compile time; the read entries stay
    // null unless S also satisfies ReadSource, which read_some()/read()
    // use to select the native vs. synthesized path.
    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_consume = &do_consume_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_awaitable = &construct_awaitable_impl;
        v.construct_read_some_awaitable = nullptr;
        v.construct_read_awaitable = nullptr;

        if constexpr (ReadSource<S>)
        {
            v.construct_read_some_awaitable =
                &construct_read_some_awaitable_impl;
            v.construct_read_awaitable =
                &construct_read_awaitable_impl;
        }
        return v;
    }

    static constexpr vtable value = make_vtable();
};
548
549 //----------------------------------------------------------
550
inline
any_buffer_source::~any_buffer_source()
{
    // Owning mode only: storage_ holds the source constructed in place.
    // Destroy the source through the vtable, then free the raw storage.
    if(storage_)
    {
        vt_->destroy(source_);
        ::operator delete(storage_);
    }
    // Allocated in both owning and reference modes. Any in-flight
    // awaitable has already been destroyed by its await_resume guard.
    // NOTE(review): freed with plain ::operator delete to match the
    // plain ::operator new in the constructors; assumes no awaitable
    // requires over-alignment — confirm against vtable::awaitable_align.
    if(cached_awaitable_)
        ::operator delete(cached_awaitable_);
}
562
563 inline any_buffer_source&
564 2 any_buffer_source::operator=(any_buffer_source&& other) noexcept
565 {
566
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if(this != &other)
567 {
568
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if(storage_)
569 {
570 vt_->destroy(source_);
571 ::operator delete(storage_);
572 }
573
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if(cached_awaitable_)
574 ::operator delete(cached_awaitable_);
575 2 source_ = std::exchange(other.source_, nullptr);
576 2 vt_ = std::exchange(other.vt_, nullptr);
577 2 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
578 2 storage_ = std::exchange(other.storage_, nullptr);
579 2 active_ops_ = std::exchange(other.active_ops_, nullptr);
580 2 active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
581 }
582 2 return *this;
583 }
584
585 template<BufferSource S>
586 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
587 7 any_buffer_source::any_buffer_source(S s)
588 7 : vt_(&vtable_for_impl<S>::value)
589 {
590 struct guard {
591 any_buffer_source* self;
592 bool committed = false;
593 7 ~guard() {
594
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 7 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
7 if(!committed && self->storage_) {
595 self->vt_->destroy(self->source_);
596 ::operator delete(self->storage_);
597 self->storage_ = nullptr;
598 self->source_ = nullptr;
599 }
600 7 }
601 7 } g{this};
602
603
1/1
✓ Branch 1 taken 7 times.
7 storage_ = ::operator new(sizeof(S));
604 7 source_ = ::new(storage_) S(std::move(s));
605
606
1/1
✓ Branch 1 taken 7 times.
7 cached_awaitable_ = ::operator new(vt_->awaitable_size);
607
608 7 g.committed = true;
609 7 }
610
611 template<BufferSource S>
612 112 any_buffer_source::any_buffer_source(S* s)
613 112 : source_(s)
614 112 , vt_(&vtable_for_impl<S>::value)
615 {
616 112 cached_awaitable_ = ::operator new(vt_->awaitable_size);
617 112 }
618
619 //----------------------------------------------------------
620
inline void
any_buffer_source::consume(std::size_t n) noexcept
{
    // Forward straight through the vtable; precondition: has_value().
    vt_->do_consume(source_, n);
}
626
inline auto
any_buffer_source::pull(std::span<const_buffer> dest)
{
    // Returns a lightweight proxy awaitable. The wrapped source's own
    // pull awaitable is placement-constructed into cached_awaitable_
    // on first use, so steady state performs no allocation.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<const_buffer> dest_;

        bool
        await_ready()
        {
            // Construct the underlying awaitable now so its own
            // await_ready result can be forwarded directly.
            self_->active_ops_ = self_->vt_->construct_awaitable(
                self_->source_,
                self_->cached_awaitable_,
                dest_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<std::span<const_buffer>>
        await_resume()
        {
            // Guard destroys the cached awaitable and clears
            // active_ops_ even if await_resume throws.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, dest};
}
668
669 //----------------------------------------------------------
670 // Private helpers for native ReadSource forwarding
671
inline auto
any_buffer_source::read_some_(
    std::span<mutable_buffer const> buffers)
{
    // Proxy awaitable forwarding a native read_some through the vtable.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<mutable_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            // Always take the suspend path: the underlying awaitable is
            // built in await_suspend, which can still resume immediately
            // by returning the caller's handle.
            return false;
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            self_->active_read_ops_ =
                self_->vt_->construct_read_some_awaitable(
                    self_->source_,
                    self_->cached_awaitable_,
                    buffers_);

            // If the underlying awaitable is already ready, resume the
            // caller at once via symmetric transfer.
            if(self_->active_read_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_read_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Guard destroys the cached awaitable and clears
            // active_read_ops_ even if await_resume throws.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_read_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_read_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_read_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
721
inline auto
any_buffer_source::read_(
    std::span<mutable_buffer const> buffers)
{
    // Proxy awaitable forwarding a native read through the vtable.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<mutable_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            // Always take the suspend path: the underlying awaitable is
            // built in await_suspend, which can still resume immediately
            // by returning the caller's handle.
            return false;
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            self_->active_read_ops_ =
                self_->vt_->construct_read_awaitable(
                    self_->source_,
                    self_->cached_awaitable_,
                    buffers_);

            // If the underlying awaitable is already ready, resume the
            // caller at once via symmetric transfer.
            if(self_->active_read_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_read_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Guard destroys the cached awaitable and clears
            // active_read_ops_ even if await_resume throws.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_read_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_read_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_read_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
771
772 //----------------------------------------------------------
773 // Public ReadSource methods
774
775 template<MutableBufferSequence MB>
776 io_task<std::size_t>
777
1/1
✓ Branch 1 taken 58 times.
58 any_buffer_source::read_some(MB buffers)
778 {
779 buffer_param<MB> bp(buffers);
780 auto dest = bp.data();
781 if(dest.empty())
782 co_return {{}, 0};
783
784 // Native ReadSource path
785 if(vt_->construct_read_some_awaitable)
786 co_return co_await read_some_(dest);
787
788 // Synthesized path: pull + buffer_copy + consume
789 const_buffer arr[detail::max_iovec_];
790 auto [ec, bufs] = co_await pull(arr);
791 if(ec)
792 co_return {ec, 0};
793
794 auto n = buffer_copy(dest, bufs);
795 consume(n);
796 co_return {{}, n};
797 116 }
798
799 template<MutableBufferSequence MB>
800 io_task<std::size_t>
801
1/1
✓ Branch 1 taken 24 times.
24 any_buffer_source::read(MB buffers)
802 {
803 buffer_param<MB> bp(buffers);
804 std::size_t total = 0;
805
806 // Native ReadSource path
807 if(vt_->construct_read_awaitable)
808 {
809 for(;;)
810 {
811 auto dest = bp.data();
812 if(dest.empty())
813 break;
814
815 auto [ec, n] = co_await read_(dest);
816 total += n;
817 if(ec)
818 co_return {ec, total};
819 bp.consume(n);
820 }
821 co_return {{}, total};
822 }
823
824 // Synthesized path: pull + buffer_copy + consume
825 for(;;)
826 {
827 auto dest = bp.data();
828 if(dest.empty())
829 break;
830
831 const_buffer arr[detail::max_iovec_];
832 auto [ec, bufs] = co_await pull(arr);
833
834 if(ec)
835 co_return {ec, total};
836
837 auto n = buffer_copy(dest, bufs);
838 consume(n);
839 total += n;
840 bp.consume(n);
841 }
842
843 co_return {{}, total};
844 48 }
845
846 //----------------------------------------------------------
847
848 static_assert(BufferSource<any_buffer_source>);
849 static_assert(ReadSource<any_buffer_source>);
850
851 } // namespace capy
852 } // namespace boost
853
854 #endif
855