Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/buffers/slice.hpp>
19 : #include <boost/capy/concept/buffer_source.hpp>
20 : #include <boost/capy/concept/io_awaitable.hpp>
21 : #include <boost/capy/concept/read_source.hpp>
22 : #include <boost/capy/coro.hpp>
23 : #include <boost/capy/error.hpp>
24 : #include <boost/capy/ex/executor_ref.hpp>
25 : #include <boost/capy/io_result.hpp>
26 : #include <boost/capy/io_task.hpp>
27 :
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <exception>
32 : #include <new>
33 : #include <span>
34 : #include <stop_token>
35 : #include <system_error>
36 : #include <utility>
37 :
38 : namespace boost {
39 : namespace capy {
40 :
41 : /** Type-erased wrapper for any BufferSource.
42 :
43 : This class provides type erasure for any type satisfying the
44 : @ref BufferSource concept, enabling runtime polymorphism for
45 : buffer pull operations. It uses cached awaitable storage to achieve
46 : zero steady-state allocation after construction.
47 :
48 : The wrapper also satisfies @ref ReadSource. When the wrapped type
49 : satisfies only @ref BufferSource, the read operations are
50 : synthesized using @ref pull and @ref consume with an extra
51 : buffer copy. When the wrapped type satisfies both @ref BufferSource
52 : and @ref ReadSource, the native read operations are forwarded
53 : directly across the virtual boundary, avoiding the copy.
54 :
55 : The wrapper supports two construction modes:
56 : - **Owning**: Pass by value to transfer ownership. The wrapper
57 : allocates storage and owns the source.
58 : - **Reference**: Pass a pointer to wrap without ownership. The
59 : pointed-to source must outlive this wrapper.
60 :
61 : Within each mode, the vtable is populated at compile time based
62 : on whether the wrapped type also satisfies @ref ReadSource:
63 : - **BufferSource only**: @ref read_some and @ref read are
64 : synthesized from @ref pull and @ref consume, incurring one
65 : buffer copy per operation.
66 : - **BufferSource + ReadSource**: All read operations are
67 : forwarded natively through the type-erased boundary with
68 : no extra copy.
69 :
70 : @par Awaitable Preallocation
71 : The constructor preallocates storage for the type-erased awaitable.
72 : This reserves all virtual address space at server startup
73 : so memory usage can be measured up front, rather than
74 : allocating piecemeal as traffic arrives.
75 :
76 : @par Thread Safety
77 : Not thread-safe. Concurrent operations on the same wrapper
78 : are undefined behavior.
79 :
80 : @par Example
81 : @code
82 : // Owning - takes ownership of the source
83 : any_buffer_source abs(some_buffer_source{args...});
84 :
85 : // Reference - wraps without ownership
86 : some_buffer_source src;
87 : any_buffer_source abs(&src);
88 :
89 : const_buffer arr[16];
90 : auto [ec, bufs] = co_await abs.pull(arr);
91 :
92 : // ReadSource interface also available
93 : char buf[64];
94 : auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
95 : @endcode
96 :
97 : @see any_buffer_sink, BufferSource, ReadSource
98 : */
99 : class any_buffer_source
100 : {
101 : struct vtable;
102 : struct awaitable_ops;
103 : struct read_awaitable_ops;
104 :
105 : template<BufferSource S>
106 : struct vtable_for_impl;
107 :
108 : // hot-path members first for cache locality
109 : void* source_ = nullptr;
110 : vtable const* vt_ = nullptr;
111 : void* cached_awaitable_ = nullptr;
112 : awaitable_ops const* active_ops_ = nullptr;
113 : read_awaitable_ops const* active_read_ops_ = nullptr;
114 : void* storage_ = nullptr;
115 :
116 : public:
117 : /** Destructor.
118 :
119 : Destroys the owned source (if any) and releases the cached
120 : awaitable storage.
121 : */
122 : ~any_buffer_source();
123 :
124 : /** Default constructor.
125 :
126 : Constructs an empty wrapper. Operations on a default-constructed
127 : wrapper result in undefined behavior.
128 : */
129 : any_buffer_source() = default;
130 :
131 : /** Non-copyable.
132 :
133 : The awaitable cache is per-instance and cannot be shared.
134 : */
135 : any_buffer_source(any_buffer_source const&) = delete;
136 : any_buffer_source& operator=(any_buffer_source const&) = delete;
137 :
138 : /** Move constructor.
139 :
140 : Transfers ownership of the wrapped source (if owned) and
141 : cached awaitable storage from `other`. After the move, `other` is
142 : in a default-constructed state.
143 :
144 : @param other The wrapper to move from.
145 : */
146 2 : any_buffer_source(any_buffer_source&& other) noexcept
147 2 : : source_(std::exchange(other.source_, nullptr))
148 2 : , vt_(std::exchange(other.vt_, nullptr))
149 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
150 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
151 2 : , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
152 2 : , storage_(std::exchange(other.storage_, nullptr))
153 : {
154 2 : }
155 :
156 : /** Move assignment operator.
157 :
158 : Destroys any owned source and releases existing resources,
159 : then transfers ownership from `other`.
160 :
161 : @param other The wrapper to move from.
162 : @return Reference to this wrapper.
163 : */
164 : any_buffer_source&
165 : operator=(any_buffer_source&& other) noexcept;
166 :
167 : /** Construct by taking ownership of a BufferSource.
168 :
169 : Allocates storage and moves the source into this wrapper.
170 : The wrapper owns the source and will destroy it. If `S` also
171 : satisfies @ref ReadSource, native read operations are
172 : forwarded through the virtual boundary.
173 :
174 : @param s The source to take ownership of.
175 : */
176 : template<BufferSource S>
177 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
178 : any_buffer_source(S s);
179 :
180 : /** Construct by wrapping a BufferSource without ownership.
181 :
182 : Wraps the given source by pointer. The source must remain
183 : valid for the lifetime of this wrapper. If `S` also
184 : satisfies @ref ReadSource, native read operations are
185 : forwarded through the virtual boundary.
186 :
187 : @param s Pointer to the source to wrap.
188 : */
189 : template<BufferSource S>
190 : any_buffer_source(S* s);
191 :
192 : /** Check if the wrapper contains a valid source.
193 :
194 : @return `true` if wrapping a source, `false` if default-constructed
195 : or moved-from.
196 : */
197 : bool
198 16 : has_value() const noexcept
199 : {
200 16 : return source_ != nullptr;
201 : }
202 :
203 : /** Check if the wrapper contains a valid source.
204 :
205 : @return `true` if wrapping a source, `false` if default-constructed
206 : or moved-from.
207 : */
208 : explicit
209 2 : operator bool() const noexcept
210 : {
211 2 : return has_value();
212 : }
213 :
214 : /** Consume bytes from the source.
215 :
216 : Advances the internal read position of the underlying source
217 : by the specified number of bytes. The next call to @ref pull
218 : returns data starting after the consumed bytes.
219 :
220 : @param n The number of bytes to consume. Must not exceed the
221 : total size of buffers returned by the previous @ref pull.
222 :
223 : @par Preconditions
224 : The wrapper must contain a valid source (`has_value() == true`).
225 : */
226 : void
227 : consume(std::size_t n) noexcept;
228 :
229 : /** Pull buffer data from the source.
230 :
231 : Fills the provided span with buffer descriptors from the
232 : underlying source. The operation completes when data is
233 : available, the source is exhausted, or an error occurs.
234 :
235 : @param dest Span of const_buffer to fill.
236 :
237 : @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
238 : On success with data, a non-empty span of filled buffers.
239 : On EOF, `ec == cond::eof` and span is empty.
240 :
241 : @par Preconditions
242 : The wrapper must contain a valid source (`has_value() == true`).
243 : The caller must not call this function again after a prior
244 : call returned an error.
245 : */
246 : auto
247 : pull(std::span<const_buffer> dest);
248 :
249 : /** Read some data into a mutable buffer sequence.
250 :
251 : Reads one or more bytes into the caller's buffers. May fill
252 : less than the full sequence.
253 :
254 : When the wrapped type provides native @ref ReadSource support,
255 : the operation forwards directly. Otherwise it is synthesized
256 : from @ref pull, @ref buffer_copy, and @ref consume.
257 :
258 : @param buffers The buffer sequence to fill.
259 :
260 : @return An awaitable yielding `(error_code,std::size_t)`.
261 :
262 : @par Preconditions
263 : The wrapper must contain a valid source (`has_value() == true`).
264 : The caller must not call this function again after a prior
265 : call returned an error (including EOF).
266 :
267 : @see pull, consume
268 : */
269 : template<MutableBufferSequence MB>
270 : io_task<std::size_t>
271 : read_some(MB buffers);
272 :
273 : /** Read data into a mutable buffer sequence.
274 :
275 : Fills the provided buffer sequence completely. When the
276 : wrapped type provides native @ref ReadSource support, each
277 : window is forwarded directly. Otherwise the data is
278 : synthesized from @ref pull, @ref buffer_copy, and @ref consume.
279 :
280 : @param buffers The buffer sequence to fill.
281 :
282 : @return An awaitable yielding `(error_code,std::size_t)`.
283 : On success, `n == buffer_size(buffers)`.
284 : On EOF, `ec == error::eof` and `n` is bytes transferred.
285 :
286 : @par Preconditions
287 : The wrapper must contain a valid source (`has_value() == true`).
288 : The caller must not call this function again after a prior
289 : call returned an error (including EOF).
290 :
291 : @see pull, consume
292 : */
293 : template<MutableBufferSequence MB>
294 : io_task<std::size_t>
295 : read(MB buffers);
296 :
297 : protected:
298 : /** Rebind to a new source after move.
299 :
300 : Updates the internal pointer to reference a new source object.
301 : Used by owning wrappers after move assignment when the owned
302 : object has moved to a new location.
303 :
304 : @param new_source The new source to bind to. Must be the same
305 : type as the original source.
306 :
307 : @note Terminates if called with a source of different type
308 : than the original.
309 : */
310 : template<BufferSource S>
311 : void
312 : rebind(S& new_source) noexcept
313 : {
314 : if(vt_ != &vtable_for_impl<S>::value)
315 : std::terminate();
316 : source_ = &new_source;
317 : }
318 :
319 : private:
320 : /** Forward a partial read through the vtable.
321 :
322 : Constructs the underlying `read_some` awaitable in
323 : cached storage and returns a type-erased awaitable.
324 : */
325 : auto
326 : read_some_(std::span<mutable_buffer const> buffers);
327 :
328 : /** Forward a complete read through the vtable.
329 :
330 : Constructs the underlying `read` awaitable in
331 : cached storage and returns a type-erased awaitable.
332 : */
333 : auto
334 : read_(std::span<mutable_buffer const> buffers);
335 : };
336 :
337 : //----------------------------------------------------------
338 :
339 : /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
340 : struct any_buffer_source::awaitable_ops
341 : {
342 : bool (*await_ready)(void*);
343 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
344 : io_result<std::span<const_buffer>> (*await_resume)(void*);
345 : void (*destroy)(void*) noexcept;
346 : };
347 :
348 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
349 : struct any_buffer_source::read_awaitable_ops
350 : {
351 : bool (*await_ready)(void*);
352 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
353 : io_result<std::size_t> (*await_resume)(void*);
354 : void (*destroy)(void*) noexcept;
355 : };
356 :
357 : struct any_buffer_source::vtable
358 : {
359 : // BufferSource ops (always populated)
360 : void (*destroy)(void*) noexcept;
361 : void (*do_consume)(void* source, std::size_t n) noexcept;
362 : std::size_t awaitable_size;
363 : std::size_t awaitable_align;
364 : awaitable_ops const* (*construct_awaitable)(
365 : void* source,
366 : void* storage,
367 : std::span<const_buffer> dest);
368 :
369 : // ReadSource forwarding (null when wrapped type is BufferSource-only)
370 : read_awaitable_ops const* (*construct_read_some_awaitable)(
371 : void* source,
372 : void* storage,
373 : std::span<mutable_buffer const> buffers);
374 : read_awaitable_ops const* (*construct_read_awaitable)(
375 : void* source,
376 : void* storage,
377 : std::span<mutable_buffer const> buffers);
378 : };
379 :
380 : template<BufferSource S>
381 : struct any_buffer_source::vtable_for_impl
382 : {
383 : using PullAwaitable = decltype(std::declval<S&>().pull(
384 : std::declval<std::span<const_buffer>>()));
385 :
386 : static void
387 7 : do_destroy_impl(void* source) noexcept
388 : {
389 7 : static_cast<S*>(source)->~S();
390 7 : }
391 :
392 : static void
393 45 : do_consume_impl(void* source, std::size_t n) noexcept
394 : {
395 45 : static_cast<S*>(source)->consume(n);
396 45 : }
397 :
398 : static awaitable_ops const*
399 110 : construct_awaitable_impl(
400 : void* source,
401 : void* storage,
402 : std::span<const_buffer> dest)
403 : {
404 110 : auto& s = *static_cast<S*>(source);
405 110 : ::new(storage) PullAwaitable(s.pull(dest));
406 :
407 : static constexpr awaitable_ops ops = {
408 110 : +[](void* p) {
409 110 : return static_cast<PullAwaitable*>(p)->await_ready();
410 : },
411 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
412 0 : return detail::call_await_suspend(
413 0 : static_cast<PullAwaitable*>(p), h, ex, token);
414 : },
415 110 : +[](void* p) {
416 110 : return static_cast<PullAwaitable*>(p)->await_resume();
417 : },
418 110 : +[](void* p) noexcept {
419 110 : static_cast<PullAwaitable*>(p)->~PullAwaitable();
420 : }
421 : };
422 110 : return &ops;
423 : }
424 :
425 : //------------------------------------------------------
426 : // ReadSource forwarding (only instantiated when ReadSource<S>)
427 :
428 : static read_awaitable_ops const*
429 48 : construct_read_some_awaitable_impl(
430 : void* source,
431 : void* storage,
432 : std::span<mutable_buffer const> buffers)
433 : requires ReadSource<S>
434 : {
435 : using Aw = decltype(std::declval<S&>().read_some(
436 : std::span<mutable_buffer const>{}));
437 48 : auto& s = *static_cast<S*>(source);
438 48 : ::new(storage) Aw(s.read_some(buffers));
439 :
440 : static constexpr read_awaitable_ops ops = {
441 48 : +[](void* p) {
442 48 : return static_cast<Aw*>(p)->await_ready();
443 : },
444 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
445 0 : return detail::call_await_suspend(
446 0 : static_cast<Aw*>(p), h, ex, token);
447 : },
448 48 : +[](void* p) {
449 48 : return static_cast<Aw*>(p)->await_resume();
450 : },
451 48 : +[](void* p) noexcept {
452 48 : static_cast<Aw*>(p)->~Aw();
453 : }
454 : };
455 48 : return &ops;
456 : }
457 :
458 : static read_awaitable_ops const*
459 18 : construct_read_awaitable_impl(
460 : void* source,
461 : void* storage,
462 : std::span<mutable_buffer const> buffers)
463 : requires ReadSource<S>
464 : {
465 : using Aw = decltype(std::declval<S&>().read(
466 : std::span<mutable_buffer const>{}));
467 18 : auto& s = *static_cast<S*>(source);
468 18 : ::new(storage) Aw(s.read(buffers));
469 :
470 : static constexpr read_awaitable_ops ops = {
471 18 : +[](void* p) {
472 18 : return static_cast<Aw*>(p)->await_ready();
473 : },
474 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
475 0 : return detail::call_await_suspend(
476 0 : static_cast<Aw*>(p), h, ex, token);
477 : },
478 18 : +[](void* p) {
479 18 : return static_cast<Aw*>(p)->await_resume();
480 : },
481 18 : +[](void* p) noexcept {
482 18 : static_cast<Aw*>(p)->~Aw();
483 : }
484 : };
485 18 : return &ops;
486 : }
487 :
488 : //------------------------------------------------------
489 :
490 : static consteval std::size_t
491 : compute_max_size() noexcept
492 : {
493 : std::size_t s = sizeof(PullAwaitable);
494 : if constexpr (ReadSource<S>)
495 : {
496 : using RS = decltype(std::declval<S&>().read_some(
497 : std::span<mutable_buffer const>{}));
498 : using R = decltype(std::declval<S&>().read(
499 : std::span<mutable_buffer const>{}));
500 :
501 : if(sizeof(RS) > s) s = sizeof(RS);
502 : if(sizeof(R) > s) s = sizeof(R);
503 : }
504 : return s;
505 : }
506 :
507 : static consteval std::size_t
508 : compute_max_align() noexcept
509 : {
510 : std::size_t a = alignof(PullAwaitable);
511 : if constexpr (ReadSource<S>)
512 : {
513 : using RS = decltype(std::declval<S&>().read_some(
514 : std::span<mutable_buffer const>{}));
515 : using R = decltype(std::declval<S&>().read(
516 : std::span<mutable_buffer const>{}));
517 :
518 : if(alignof(RS) > a) a = alignof(RS);
519 : if(alignof(R) > a) a = alignof(R);
520 : }
521 : return a;
522 : }
523 :
524 : static consteval vtable
525 : make_vtable() noexcept
526 : {
527 : vtable v{};
528 : v.destroy = &do_destroy_impl;
529 : v.do_consume = &do_consume_impl;
530 : v.awaitable_size = compute_max_size();
531 : v.awaitable_align = compute_max_align();
532 : v.construct_awaitable = &construct_awaitable_impl;
533 : v.construct_read_some_awaitable = nullptr;
534 : v.construct_read_awaitable = nullptr;
535 :
536 : if constexpr (ReadSource<S>)
537 : {
538 : v.construct_read_some_awaitable =
539 : &construct_read_some_awaitable_impl;
540 : v.construct_read_awaitable =
541 : &construct_read_awaitable_impl;
542 : }
543 : return v;
544 : }
545 :
546 : static constexpr vtable value = make_vtable();
547 : };
548 :
549 : //----------------------------------------------------------
550 :
551 : inline
552 124 : any_buffer_source::~any_buffer_source()
553 : {
554 124 : if(storage_)
555 : {
556 7 : vt_->destroy(source_);
557 7 : ::operator delete(storage_);
558 : }
559 124 : if(cached_awaitable_)
560 119 : ::operator delete(cached_awaitable_);
561 124 : }
562 :
563 : inline any_buffer_source&
564 2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
565 : {
566 2 : if(this != &other)
567 : {
568 2 : if(storage_)
569 : {
570 0 : vt_->destroy(source_);
571 0 : ::operator delete(storage_);
572 : }
573 2 : if(cached_awaitable_)
574 0 : ::operator delete(cached_awaitable_);
575 2 : source_ = std::exchange(other.source_, nullptr);
576 2 : vt_ = std::exchange(other.vt_, nullptr);
577 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
578 2 : storage_ = std::exchange(other.storage_, nullptr);
579 2 : active_ops_ = std::exchange(other.active_ops_, nullptr);
580 2 : active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
581 : }
582 2 : return *this;
583 : }
584 :
585 : template<BufferSource S>
586 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
587 7 : any_buffer_source::any_buffer_source(S s)
588 7 : : vt_(&vtable_for_impl<S>::value)
589 : {
590 : struct guard {
591 : any_buffer_source* self;
592 : bool committed = false;
593 7 : ~guard() {
594 7 : if(!committed && self->storage_) {
595 0 : self->vt_->destroy(self->source_);
596 0 : ::operator delete(self->storage_);
597 0 : self->storage_ = nullptr;
598 0 : self->source_ = nullptr;
599 : }
600 7 : }
601 7 : } g{this};
602 :
603 7 : storage_ = ::operator new(sizeof(S));
604 7 : source_ = ::new(storage_) S(std::move(s));
605 :
606 7 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
607 :
608 7 : g.committed = true;
609 7 : }
610 :
611 : template<BufferSource S>
612 112 : any_buffer_source::any_buffer_source(S* s)
613 112 : : source_(s)
614 112 : , vt_(&vtable_for_impl<S>::value)
615 : {
616 112 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
617 112 : }
618 :
619 : //----------------------------------------------------------
620 :
621 : inline void
622 45 : any_buffer_source::consume(std::size_t n) noexcept
623 : {
624 45 : vt_->do_consume(source_, n);
625 45 : }
626 :
627 : inline auto
628 110 : any_buffer_source::pull(std::span<const_buffer> dest)
629 : {
630 : struct awaitable
631 : {
632 : any_buffer_source* self_;
633 : std::span<const_buffer> dest_;
634 :
635 : bool
636 110 : await_ready()
637 : {
638 220 : self_->active_ops_ = self_->vt_->construct_awaitable(
639 110 : self_->source_,
640 110 : self_->cached_awaitable_,
641 : dest_);
642 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
643 : }
644 :
645 : coro
646 0 : await_suspend(coro h, executor_ref ex, std::stop_token token)
647 : {
648 0 : return self_->active_ops_->await_suspend(
649 0 : self_->cached_awaitable_, h, ex, token);
650 : }
651 :
652 : io_result<std::span<const_buffer>>
653 110 : await_resume()
654 : {
655 : struct guard {
656 : any_buffer_source* self;
657 110 : ~guard() {
658 110 : self->active_ops_->destroy(self->cached_awaitable_);
659 110 : self->active_ops_ = nullptr;
660 110 : }
661 110 : } g{self_};
662 110 : return self_->active_ops_->await_resume(
663 195 : self_->cached_awaitable_);
664 110 : }
665 : };
666 110 : return awaitable{this, dest};
667 : }
668 :
669 : //----------------------------------------------------------
670 : // Private helpers for native ReadSource forwarding
671 :
672 : inline auto
673 48 : any_buffer_source::read_some_(
674 : std::span<mutable_buffer const> buffers)
675 : {
676 : struct awaitable
677 : {
678 : any_buffer_source* self_;
679 : std::span<mutable_buffer const> buffers_;
680 :
681 : bool
682 48 : await_ready() const noexcept
683 : {
684 48 : return false;
685 : }
686 :
687 : coro
688 48 : await_suspend(coro h, executor_ref ex, std::stop_token token)
689 : {
690 96 : self_->active_read_ops_ =
691 96 : self_->vt_->construct_read_some_awaitable(
692 48 : self_->source_,
693 48 : self_->cached_awaitable_,
694 : buffers_);
695 :
696 48 : if(self_->active_read_ops_->await_ready(
697 48 : self_->cached_awaitable_))
698 48 : return h;
699 :
700 0 : return self_->active_read_ops_->await_suspend(
701 0 : self_->cached_awaitable_, h, ex, token);
702 : }
703 :
704 : io_result<std::size_t>
705 48 : await_resume()
706 : {
707 : struct guard {
708 : any_buffer_source* self;
709 48 : ~guard() {
710 48 : self->active_read_ops_->destroy(
711 48 : self->cached_awaitable_);
712 48 : self->active_read_ops_ = nullptr;
713 48 : }
714 48 : } g{self_};
715 48 : return self_->active_read_ops_->await_resume(
716 88 : self_->cached_awaitable_);
717 48 : }
718 : };
719 48 : return awaitable{this, buffers};
720 : }
721 :
722 : inline auto
723 18 : any_buffer_source::read_(
724 : std::span<mutable_buffer const> buffers)
725 : {
726 : struct awaitable
727 : {
728 : any_buffer_source* self_;
729 : std::span<mutable_buffer const> buffers_;
730 :
731 : bool
732 18 : await_ready() const noexcept
733 : {
734 18 : return false;
735 : }
736 :
737 : coro
738 18 : await_suspend(coro h, executor_ref ex, std::stop_token token)
739 : {
740 36 : self_->active_read_ops_ =
741 36 : self_->vt_->construct_read_awaitable(
742 18 : self_->source_,
743 18 : self_->cached_awaitable_,
744 : buffers_);
745 :
746 18 : if(self_->active_read_ops_->await_ready(
747 18 : self_->cached_awaitable_))
748 18 : return h;
749 :
750 0 : return self_->active_read_ops_->await_suspend(
751 0 : self_->cached_awaitable_, h, ex, token);
752 : }
753 :
754 : io_result<std::size_t>
755 18 : await_resume()
756 : {
757 : struct guard {
758 : any_buffer_source* self;
759 18 : ~guard() {
760 18 : self->active_read_ops_->destroy(
761 18 : self->cached_awaitable_);
762 18 : self->active_read_ops_ = nullptr;
763 18 : }
764 18 : } g{self_};
765 18 : return self_->active_read_ops_->await_resume(
766 30 : self_->cached_awaitable_);
767 18 : }
768 : };
769 18 : return awaitable{this, buffers};
770 : }
771 :
772 : //----------------------------------------------------------
773 : // Public ReadSource methods
774 :
775 : template<MutableBufferSequence MB>
776 : io_task<std::size_t>
777 58 : any_buffer_source::read_some(MB buffers)
778 : {
779 : buffer_param<MB> bp(buffers);
780 : auto dest = bp.data();
781 : if(dest.empty())
782 : co_return {{}, 0};
783 :
784 : // Native ReadSource path
785 : if(vt_->construct_read_some_awaitable)
786 : co_return co_await read_some_(dest);
787 :
788 : // Synthesized path: pull + buffer_copy + consume
789 : const_buffer arr[detail::max_iovec_];
790 : auto [ec, bufs] = co_await pull(arr);
791 : if(ec)
792 : co_return {ec, 0};
793 :
794 : auto n = buffer_copy(dest, bufs);
795 : consume(n);
796 : co_return {{}, n};
797 116 : }
798 :
799 : template<MutableBufferSequence MB>
800 : io_task<std::size_t>
801 24 : any_buffer_source::read(MB buffers)
802 : {
803 : buffer_param<MB> bp(buffers);
804 : std::size_t total = 0;
805 :
806 : // Native ReadSource path
807 : if(vt_->construct_read_awaitable)
808 : {
809 : for(;;)
810 : {
811 : auto dest = bp.data();
812 : if(dest.empty())
813 : break;
814 :
815 : auto [ec, n] = co_await read_(dest);
816 : total += n;
817 : if(ec)
818 : co_return {ec, total};
819 : bp.consume(n);
820 : }
821 : co_return {{}, total};
822 : }
823 :
824 : // Synthesized path: pull + buffer_copy + consume
825 : for(;;)
826 : {
827 : auto dest = bp.data();
828 : if(dest.empty())
829 : break;
830 :
831 : const_buffer arr[detail::max_iovec_];
832 : auto [ec, bufs] = co_await pull(arr);
833 :
834 : if(ec)
835 : co_return {ec, total};
836 :
837 : auto n = buffer_copy(dest, bufs);
838 : consume(n);
839 : total += n;
840 : bp.consume(n);
841 : }
842 :
843 : co_return {{}, total};
844 48 : }
845 :
846 : //----------------------------------------------------------
847 :
848 : static_assert(BufferSource<any_buffer_source>);
849 : static_assert(ReadSource<any_buffer_source>);
850 :
851 : } // namespace capy
852 : } // namespace boost
853 :
854 : #endif
|