//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
#define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_copy.hpp>
#include <boost/capy/buffers/buffer_param.hpp>
#include <boost/capy/concept/buffer_sink.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_sink.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/io_task.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <span>
#include <stop_token>
#include <system_error>
#include <utility>

namespace boost {
namespace capy {

/** Type-erased wrapper for any BufferSink.

    This class provides type erasure for any type satisfying the
    @ref BufferSink concept, enabling runtime polymorphism for
    buffer sink operations. It uses cached awaitable storage to achieve
    zero steady-state allocation after construction.

    The wrapper exposes two interfaces for producing data:
    the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
    and the @ref WriteSink interface (`write_some`, `write`,
    `write_eof`). Choose the interface that matches how your data
    is produced:

    @par Choosing an Interface

    Use the **BufferSink** interface when you are a generator that
    produces data into externally-provided buffers. The sink owns
    the memory; you call @ref prepare to obtain writable buffers,
    fill them, then call @ref commit or @ref commit_eof.

    Use the **WriteSink** interface when you already have buffers
    containing the data to write:
    - If the entire body is available up front, call
      @ref write_eof(buffers) to send everything atomically.
    - If data arrives incrementally, call @ref write or
      @ref write_some in a loop, then @ref write_eof() when done.
      Prefer `write` (complete) unless your streaming pattern
      benefits from partial writes via `write_some`.

    If the wrapped type only satisfies @ref BufferSink, the
    @ref WriteSink operations are provided automatically.

    @par Construction Modes

    - **Owning**: Pass by value to transfer ownership. The wrapper
      allocates storage and owns the sink.
    - **Reference**: Pass a pointer to wrap without ownership. The
      pointed-to sink must outlive this wrapper.
    @par Awaitable Preallocation
    The constructor preallocates storage for the type-erased awaitable.
    All allocation therefore happens up front (typically at server
    startup), so memory usage can be measured in advance rather than
    growing piecemeal as traffic arrives.
    @par Thread Safety
    Not thread-safe. Concurrent operations on the same wrapper
    result in undefined behavior.
    @par Example
    @code
    // Owning - takes ownership of the sink
    any_buffer_sink abs(some_buffer_sink{args...});

    // Reference - wraps without ownership
    some_buffer_sink sink;
    any_buffer_sink abs(&sink);

    // BufferSink interface: generate into callee-owned buffers
    mutable_buffer arr[16];
    auto bufs = abs.prepare(arr);
    // Write data into bufs[0..bufs.size())
    auto [ec] = co_await abs.commit(bytes_written);
    auto [ec2] = co_await abs.commit_eof(0);

    // WriteSink interface: send caller-owned buffers
    auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
    auto [ec4] = co_await abs.write_eof();

    // Or send everything at once
    auto [ec5, n2] = co_await abs.write_eof(
        make_buffer(body_data));
    @endcode

    @see any_buffer_source, BufferSink, WriteSink
*/
class any_buffer_sink
{
    struct vtable;
    struct awaitable_ops;
    struct write_awaitable_ops;

    template<BufferSink S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* sink_ = nullptr;
    vtable const* vt_ = nullptr;
    void* cached_awaitable_ = nullptr;
    awaitable_ops const* active_ops_ = nullptr;
    write_awaitable_ops const* active_write_ops_ = nullptr;
    void* storage_ = nullptr;
public:
    /** Destructor.

        Destroys the owned sink (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_sink();

    /** Default constructor.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_sink() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_sink(any_buffer_sink const&) = delete;
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;

    /** Move constructor.

        Transfers ownership of the wrapped sink (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_sink(any_buffer_sink&& other) noexcept
        : sink_(std::exchange(other.sink_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Move assignment operator.

        Destroys any owned sink and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_sink&
    operator=(any_buffer_sink&& other) noexcept;

    /** Construct by taking ownership of a BufferSink.

        Allocates storage and moves the sink into this wrapper.
        The wrapper owns the sink and will destroy it. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s The sink to take ownership of.
    */
    template<BufferSink S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
    any_buffer_sink(S s);

    /** Construct by wrapping a BufferSink without ownership.

        Wraps the given sink by pointer. The sink must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s Pointer to the sink to wrap.
    */
    template<BufferSink S>
    any_buffer_sink(S* s);

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return sink_ != nullptr;
    }

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Prepare writable buffers.

        Fills the provided span with mutable buffer descriptors
        pointing to the underlying sink's internal storage. This
        operation is synchronous.

        @param dest Span of mutable_buffer to fill.

        @return A span of filled buffers.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    std::span<mutable_buffer>
    prepare(std::span<mutable_buffer> dest);

    /** Commit bytes written to the prepared buffers.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare. The operation may trigger
        underlying I/O.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
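
        @par Example
        A minimal sketch of a generator loop, given
        `any_buffer_sink abs` and hypothetical caller-side helpers
        `have_more_data` and `fill_buffers`:
        @code
        mutable_buffer arr[16];
        while(have_more_data()) // hypothetical
        {
            auto bufs = abs.prepare(arr);
            auto n = fill_buffers(bufs); // hypothetical, returns bytes
            auto [ec] = co_await abs.commit(n);
            if(ec)
                co_return;
        }
        auto [ec] = co_await abs.commit_eof(0);
        @endcode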
    */
    auto
    commit(std::size_t n);

    /** Commit final bytes and signal end-of-stream.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare and finalizes the sink.
        After success, no further operations are permitted.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    commit_eof(std::size_t n);

    /** Write some data from a buffer sequence.

        Writes one or more bytes from the buffer sequence to the
        underlying sink. May consume less than the full sequence.

        When the wrapped type provides native @ref WriteSink support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref prepare and @ref commit with a buffer copy.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
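
        @par Example
        A minimal sketch of incremental writing, given
        `any_buffer_sink abs` and a hypothetical `next_chunk`
        yielding caller-owned data:
        @code
        while(auto chunk = next_chunk()) // hypothetical
        {
            const_buffer b = *chunk;
            while(b.size() > 0)
            {
                auto [ec, n] = co_await abs.write_some(b);
                if(ec)
                    co_return;
                b += n; // advance past the bytes consumed
            }
        }
        auto [ec] = co_await abs.write_eof();
        @endcode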
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_some(CB buffers);

    /** Write all data from a buffer sequence.

        Writes all data from the buffer sequence to the underlying
        sink. This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        each window is forwarded directly. Otherwise the data is
        copied into the sink via @ref prepare and @ref commit.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write(CB buffers);

    /** Atomically write data and signal end-of-stream.

        Writes all data from the buffer sequence to the underlying
        sink and then signals end-of-stream.

        When the wrapped type provides native @ref WriteSink support,
        the final window is sent atomically via the underlying
        `write_eof(buffers)`. Otherwise the data is synthesized
        through @ref prepare, @ref commit, and @ref commit_eof.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_eof(CB buffers);

    /** Signal end-of-stream.

        Indicates that no more data will be written to the sink.
        This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        the underlying `write_eof()` is called. Otherwise the
        operation is implemented as `commit_eof(0)`.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    write_eof();

protected:
    /** Rebind to a new sink after move.

        Updates the internal pointer to reference a new sink object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_sink The new sink to bind to. Must be the same
        type as the original sink.

        @note Terminates if called with a sink of different type
        than the original.
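
        @par Example
        A minimal sketch of a derived owning wrapper
        (`my_sink` is a hypothetical sink type):
        @code
        struct owning_wrapper : any_buffer_sink
        {
            my_sink s_; // hypothetical BufferSink

            owning_wrapper()
                : any_buffer_sink(&s_)
            {
            }

            owning_wrapper&
            operator=(owning_wrapper&& other)
            {
                any_buffer_sink::operator=(std::move(other));
                s_ = std::move(other.s_);
                rebind(s_); // point the base at this object's member
                return *this;
            }
        };
        @endcode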
    */
    template<BufferSink S>
    void
    rebind(S& new_sink) noexcept
    {
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        sink_ = &new_sink;
    }

private:
    /** Forward a partial write through the vtable.

        Constructs the underlying `write_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_some_(std::span<const_buffer const> buffers);

    /** Forward a complete write through the vtable.

        Constructs the underlying `write` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_(std::span<const_buffer const> buffers);

    /** Forward an atomic write-with-EOF through the vtable.

        Constructs the underlying `write_eof(buffers)` awaitable
        in cached storage and returns a type-erased awaitable.
    */
    auto
    write_eof_buffers_(std::span<const_buffer const> buffers);
};

//----------------------------------------------------------

/** Type-erased ops for awaitables yielding `io_result<>`. */
struct any_buffer_sink::awaitable_ops
{
    bool (*await_ready)(void*);
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
    io_result<> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;
};

/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
struct any_buffer_sink::write_awaitable_ops
{
    bool (*await_ready)(void*);
    coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;
};
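
// How the ops tables are used: each construct_* function placement-
// constructs the underlying awaitable in the preallocated
// cached_awaitable_ storage and returns the matching ops table. The
// caller then drives it roughly like this (illustrative sketch, not a
// public interface):
//
//     void* st = cached_awaitable_;
//     auto const* ops = vt_->construct_commit_awaitable(sink_, st, n);
//     if(!ops->await_ready(st))
//         ops->await_suspend(st, h, ex, token); // h resumes later
//     auto result = ops->await_resume(st);
//     ops->destroy(st);                         // always runs, via a guard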

struct any_buffer_sink::vtable
{
    void (*destroy)(void*) noexcept;
    std::span<mutable_buffer> (*do_prepare)(
        void* sink,
        std::span<mutable_buffer> dest);
    std::size_t awaitable_size;
    std::size_t awaitable_align;
    awaitable_ops const* (*construct_commit_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);
    awaitable_ops const* (*construct_commit_eof_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);

    // WriteSink forwarding (null when wrapped type is BufferSink-only)
    write_awaitable_ops const* (*construct_write_some_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    awaitable_ops const* (*construct_write_eof_awaitable)(
        void* sink,
        void* storage);
};

template<BufferSink S>
struct any_buffer_sink::vtable_for_impl
{
    using CommitAwaitable = decltype(std::declval<S&>().commit(
        std::size_t{}));
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
        std::size_t{}));

    static void
    do_destroy_impl(void* sink) noexcept
    {
        static_cast<S*>(sink)->~S();
    }

    static std::span<mutable_buffer>
    do_prepare_impl(
        void* sink,
        std::span<mutable_buffer> dest)
    {
        auto& s = *static_cast<S*>(sink);
        return s.prepare(dest);
    }

    static awaitable_ops const*
    construct_commit_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitAwaitable(s.commit(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<CommitAwaitable*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_commit_eof_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<CommitEofAwaitable*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
            }
        };
        return &ops;
    }

    //------------------------------------------------------
    // WriteSink forwarding (only instantiated when WriteSink<S>)

    static write_awaitable_ops const*
    construct_write_some_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_some(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_some(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_eof_buffers_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_write_eof_awaitable_impl(
        void* sink,
        void* storage)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof());
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof());

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, coro h, executor_ref ex, std::stop_token token) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, ex, token);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    //------------------------------------------------------

    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
            ? sizeof(CommitAwaitable)
            : sizeof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(sizeof(WS) > s) s = sizeof(WS);
            if(sizeof(W) > s) s = sizeof(W);
            if(sizeof(WEB) > s) s = sizeof(WEB);
            if(sizeof(WE) > s) s = sizeof(WE);
        }
        return s;
    }

    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
            ? alignof(CommitAwaitable)
            : alignof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(alignof(WS) > a) a = alignof(WS);
            if(alignof(W) > a) a = alignof(W);
            if(alignof(WEB) > a) a = alignof(WEB);
            if(alignof(WE) > a) a = alignof(WE);
        }
        return a;
    }

    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_prepare = &do_prepare_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
        v.construct_write_some_awaitable = nullptr;
        v.construct_write_awaitable = nullptr;
        v.construct_write_eof_buffers_awaitable = nullptr;
        v.construct_write_eof_awaitable = nullptr;

        if constexpr (WriteSink<S>)
        {
            v.construct_write_some_awaitable =
                &construct_write_some_awaitable_impl;
            v.construct_write_awaitable =
                &construct_write_awaitable_impl;
            v.construct_write_eof_buffers_awaitable =
                &construct_write_eof_buffers_awaitable_impl;
            v.construct_write_eof_awaitable =
                &construct_write_eof_awaitable_impl;
        }
        return v;
    }

    static constexpr vtable value = make_vtable();
};

//----------------------------------------------------------

inline
any_buffer_sink::~any_buffer_sink()
{
    if(storage_)
    {
        vt_->destroy(sink_);
        ::operator delete(storage_);
    }
    if(cached_awaitable_)
        ::operator delete(cached_awaitable_);
}

inline any_buffer_sink&
any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
{
    if(this != &other)
    {
        if(storage_)
        {
            vt_->destroy(sink_);
            ::operator delete(storage_);
        }
        if(cached_awaitable_)
            ::operator delete(cached_awaitable_);
        sink_ = std::exchange(other.sink_, nullptr);
        vt_ = std::exchange(other.vt_, nullptr);
        cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
        storage_ = std::exchange(other.storage_, nullptr);
        active_ops_ = std::exchange(other.active_ops_, nullptr);
        active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
    }
    return *this;
}

template<BufferSink S>
requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
any_buffer_sink::any_buffer_sink(S s)
    : vt_(&vtable_for_impl<S>::value)
{
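    // If anything below throws, this guard destroys whatever was
    // constructed so far and leaves the wrapper empty.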
    struct guard {
        any_buffer_sink* self;
        bool committed = false;
        ~guard() {
            if(!committed && self->storage_) {
                // The sink is destroyed only if its construction
                // completed; sink_ is still null if placement new threw.
                if(self->sink_)
                    self->vt_->destroy(self->sink_);
                ::operator delete(self->storage_);
                self->storage_ = nullptr;
                self->sink_ = nullptr;
            }
        }
    } g{this};

    storage_ = ::operator new(sizeof(S));
    sink_ = ::new(storage_) S(std::move(s));

    cached_awaitable_ = ::operator new(vt_->awaitable_size);

    g.committed = true;
}

template<BufferSink S>
any_buffer_sink::any_buffer_sink(S* s)
    : sink_(s)
    , vt_(&vtable_for_impl<S>::value)
{
    cached_awaitable_ = ::operator new(vt_->awaitable_size);
}

//----------------------------------------------------------

inline std::span<mutable_buffer>
any_buffer_sink::prepare(std::span<mutable_buffer> dest)
{
    return vt_->do_prepare(sink_, dest);
}

inline auto
any_buffer_sink::commit(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}

inline auto
any_buffer_sink::commit_eof(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}

//----------------------------------------------------------
// Private helpers for native WriteSink forwarding

inline auto
any_buffer_sink::write_some_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

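        // Always report not-ready: the underlying awaitable is
        // constructed in await_suspend. If it is already ready there,
        // returning h resumes the coroutine immediately.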
        bool
        await_ready() const noexcept
        {
            return false;
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_some_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<std::size_t>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
954 inline auto
955 14 any_buffer_sink::write_(
956 std::span<const_buffer const> buffers)
957 {
958 struct awaitable
959 {
960 any_buffer_sink* self_;
961 std::span<const_buffer const> buffers_;
962
963 bool
964 14 await_ready() const noexcept
965 {
966 14 return false;
967 }
968
969 coro
970 14 await_suspend(coro h, executor_ref ex, std::stop_token token)
971 {
972 28 self_->active_write_ops_ =
973 28 self_->vt_->construct_write_awaitable(
974 14 self_->sink_,
975 14 self_->cached_awaitable_,
976 buffers_);
977
978
1/2
✓ Branch 0 taken 14 times.
✗ Branch 1 not taken.
14 if(self_->active_write_ops_->await_ready(
979 14 self_->cached_awaitable_))
980 14 return h;
981
982 return self_->active_write_ops_->await_suspend(
983 self_->cached_awaitable_, h, ex, token);
984 }
985
986 io_result<std::size_t>
987 14 await_resume()
988 {
989 struct guard {
990 any_buffer_sink* self;
991 14 ~guard() {
992 14 self->active_write_ops_->destroy(
993 14 self->cached_awaitable_);
994 14 self->active_write_ops_ = nullptr;
995 14 }
996 14 } g{self_};
997 14 return self_->active_write_ops_->await_resume(
998
1/1
✓ Branch 1 taken 10 times.
24 self_->cached_awaitable_);
999 14 }
1000 };
1001 14 return awaitable{this, buffers};
1002 }

inline auto
any_buffer_sink::write_eof_buffers_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

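        // Same deferred-construction pattern as in write_some_.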
        bool
        await_ready() const noexcept
        {
            return false;
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_eof_buffers_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<std::size_t>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}

//----------------------------------------------------------
// Public WriteSink methods

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write_some(CB buffers)
{
    buffer_param<CB> bp(buffers);
    auto src = bp.data();
    if(src.empty())
        co_return {{}, 0};

    // Native WriteSink path
    if(vt_->construct_write_some_awaitable)
        co_return co_await write_some_(src);

    // Synthesized path: prepare + buffer_copy + commit
    mutable_buffer arr[detail::max_iovec_];
    auto dst_bufs = prepare(arr);
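    // An empty window means the sink has no room right now; committing
    // zero bytes gives it a chance to drain, then we retry once.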
    if(dst_bufs.empty())
    {
        auto [ec] = co_await commit(0);
        if(ec)
            co_return {ec, 0};
        dst_bufs = prepare(arr);
        if(dst_bufs.empty())
            co_return {{}, 0};
    }

    auto n = buffer_copy(dst_bufs, src);
    auto [ec] = co_await commit(n);
    if(ec)
        co_return {ec, 0};
    co_return {{}, n};
}

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write(CB buffers)
{
    buffer_param<CB> bp(buffers);
    std::size_t total = 0;

    // Native WriteSink path
    if(vt_->construct_write_awaitable)
    {
        for(;;)
        {
            auto bufs = bp.data();
            if(bufs.empty())
                break;

            auto [ec, n] = co_await write_(bufs);
            total += n;
            if(ec)
                co_return {ec, total};
            bp.consume(n);
        }
        co_return {{}, total};
    }

    // Synthesized path: prepare + buffer_copy + commit
    for(;;)
    {
        auto src = bp.data();
        if(src.empty())
            break;

        mutable_buffer arr[detail::max_iovec_];
        auto dst_bufs = prepare(arr);
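        // Empty window: commit zero bytes to let the sink drain,
        // then loop and prepare again.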
        if(dst_bufs.empty())
        {
            auto [ec] = co_await commit(0);
            if(ec)
                co_return {ec, total};
            continue;
        }

        auto n = buffer_copy(dst_bufs, src);
        auto [ec] = co_await commit(n);
        if(ec)
            co_return {ec, total};
        bp.consume(n);
        total += n;
    }

    co_return {{}, total};
}

inline auto
any_buffer_sink::write_eof()
{
    struct awaitable
    {
        any_buffer_sink* self_;

        bool
        await_ready()
        {
            if(self_->vt_->construct_write_eof_awaitable)
            {
                // Native WriteSink: forward to underlying write_eof()
                self_->active_ops_ =
                    self_->vt_->construct_write_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_);
            }
            else
            {
                // Synthesized: commit_eof(0)
                self_->active_ops_ =
                    self_->vt_->construct_commit_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_,
                        0);
            }
            return self_->active_ops_->await_ready(
                self_->cached_awaitable_);
        }

        coro
        await_suspend(coro h, executor_ref ex, std::stop_token token)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, ex, token);
        }

        io_result<>
        await_resume()
        {
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this};
}

template<ConstBufferSequence CB>
io_task<std::size_t>
any_buffer_sink::write_eof(CB buffers)
{
    // Native WriteSink path
    if(vt_->construct_write_eof_buffers_awaitable)
    {
        buffer_param<CB> bp(buffers);
        std::size_t total = 0;

        for(;;)
        {
            auto bufs = bp.data();
            if(bufs.empty())
            {
                auto [ec] = co_await write_eof();
                co_return {ec, total};
            }

            if(!bp.more())
            {
                // Last window: send atomically with EOF
                auto [ec, n] = co_await write_eof_buffers_(bufs);
                total += n;
                co_return {ec, total};
            }

            auto [ec, n] = co_await write_(bufs);
            total += n;
            if(ec)
                co_return {ec, total};
            bp.consume(n);
        }
    }

    // Synthesized path: prepare + buffer_copy + commit + commit_eof
    buffer_param<CB> bp(buffers);
    std::size_t total = 0;

    for(;;)
    {
        auto src = bp.data();
        if(src.empty())
            break;

        mutable_buffer arr[detail::max_iovec_];
        auto dst_bufs = prepare(arr);
        if(dst_bufs.empty())
        {
            auto [ec] = co_await commit(0);
            if(ec)
                co_return {ec, total};
            continue;
        }

        auto n = buffer_copy(dst_bufs, src);
        auto [ec] = co_await commit(n);
        if(ec)
            co_return {ec, total};
        bp.consume(n);
        total += n;
    }

    auto [ec] = co_await commit_eof(0);
    if(ec)
        co_return {ec, total};

    co_return {{}, total};
}

//----------------------------------------------------------

static_assert(BufferSink<any_buffer_sink>);
static_assert(WriteSink<any_buffer_sink>);

} // namespace capy
} // namespace boost

#endif