libs/capy/include/boost/capy/io/any_write_sink.hpp

93.2% Lines (164/176) 93.3% Functions (42/45) 73.3% Branches (33/45)
libs/capy/include/boost/capy/io/any_write_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/write_sink.hpp>
20 #include <boost/capy/coro.hpp>
21 #include <boost/capy/ex/executor_ref.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any WriteSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref WriteSink concept, enabling runtime polymorphism for
42 sink write operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper supports two construction modes:
46 - **Owning**: Pass by value to transfer ownership. The wrapper
47 allocates storage and owns the sink.
48 - **Reference**: Pass a pointer to wrap without ownership. The
49 pointed-to sink must outlive this wrapper.
50
51 @par Awaitable Preallocation
52 The constructor preallocates storage for the type-erased awaitable.
53 This reserves all virtual address space at server startup
54 so memory usage can be measured up front, rather than
55 allocating piecemeal as traffic arrives.
56
57 @par Immediate Completion
58 Operations complete immediately without suspending when the
59 buffer sequence is empty, or when the underlying sink's
60 awaitable reports readiness via `await_ready`.
61
62 @par Thread Safety
63 Not thread-safe. Concurrent operations on the same wrapper
64 are undefined behavior.
65
66 @par Example
67 @code
68 // Owning - takes ownership of the sink
69 any_write_sink ws(some_sink{args...});
70
71 // Reference - wraps without ownership
72 some_sink sink;
73 any_write_sink ws(&sink);
74
75 const_buffer buf(data, size);
76 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 auto [ec2] = co_await ws.write_eof();
78 @endcode
79
80 @see any_write_stream, WriteSink
81 */
82 class any_write_sink
83 {
84 struct vtable;
85 struct write_awaitable_ops;
86 struct eof_awaitable_ops;
87
88 template<WriteSink S>
89 struct vtable_for_impl;
90
91 void* sink_ = nullptr;
92 vtable const* vt_ = nullptr;
93 void* cached_awaitable_ = nullptr;
94 void* storage_ = nullptr;
95 write_awaitable_ops const* active_write_ops_ = nullptr;
96 eof_awaitable_ops const* active_eof_ops_ = nullptr;
97
98 public:
99 /** Destructor.
100
101 Destroys the owned sink (if any) and releases the cached
102 awaitable storage.
103 */
104 ~any_write_sink();
105
106 /** Default constructor.
107
108 Constructs an empty wrapper. Operations on a default-constructed
109 wrapper result in undefined behavior.
110 */
111 any_write_sink() = default;
112
113 /** Non-copyable.
114
115 The awaitable cache is per-instance and cannot be shared.
116 */
117 any_write_sink(any_write_sink const&) = delete;
118 any_write_sink& operator=(any_write_sink const&) = delete;
119
120 /** Move constructor.
121
122 Transfers ownership of the wrapped sink (if owned) and
123 cached awaitable storage from `other`. After the move, `other` is
124 in a default-constructed state.
125
126 @param other The wrapper to move from.
127 */
128 1 any_write_sink(any_write_sink&& other) noexcept
129 1 : sink_(std::exchange(other.sink_, nullptr))
130 1 , vt_(std::exchange(other.vt_, nullptr))
131 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1 , storage_(std::exchange(other.storage_, nullptr))
133 1 , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1 , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 {
136 1 }
137
138 /** Move assignment operator.
139
140 Destroys any owned sink and releases existing resources,
141 then transfers ownership from `other`.
142
143 @param other The wrapper to move from.
144 @return Reference to this wrapper.
145 */
146 any_write_sink&
147 operator=(any_write_sink&& other) noexcept;
148
149 /** Construct by taking ownership of a WriteSink.
150
151 Allocates storage and moves the sink into this wrapper.
152 The wrapper owns the sink and will destroy it.
153
154 @param s The sink to take ownership of.
155 */
156 template<WriteSink S>
157 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 any_write_sink(S s);
159
160 /** Construct by wrapping a WriteSink without ownership.
161
162 Wraps the given sink by pointer. The sink must remain
163 valid for the lifetime of this wrapper.
164
165 @param s Pointer to the sink to wrap.
166 */
167 template<WriteSink S>
168 any_write_sink(S* s);
169
170 /** Check if the wrapper contains a valid sink.
171
172 @return `true` if wrapping a sink, `false` if default-constructed
173 or moved-from.
174 */
175 bool
176 15 has_value() const noexcept
177 {
178 15 return sink_ != nullptr;
179 }
180
181 /** Check if the wrapper contains a valid sink.
182
183 @return `true` if wrapping a sink, `false` if default-constructed
184 or moved-from.
185 */
186 explicit
187 2 operator bool() const noexcept
188 {
189 2 return has_value();
190 }
191
192 /** Initiate a partial write operation.
193
194 Writes one or more bytes from the provided buffer sequence.
195 May consume less than the full sequence.
196
197 @param buffers The buffer sequence containing data to write.
198
199 @return An awaitable yielding `(error_code,std::size_t)`.
200
201 @par Immediate Completion
202 The operation completes immediately without suspending
203 the calling coroutine when:
204 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205 @li The underlying sink's awaitable reports immediate
206 readiness via `await_ready`.
207
208 @note This is a partial operation and may not process the
209 entire buffer sequence. Use @ref write for guaranteed
210 complete transfer.
211
212 @par Preconditions
213 The wrapper must contain a valid sink (`has_value() == true`).
214 */
215 template<ConstBufferSequence CB>
216 auto
217 write_some(CB buffers);
218
219 /** Initiate a complete write operation.
220
221 Writes data from the provided buffer sequence. The operation
222 completes when all bytes have been consumed, or an error
223 occurs. Forwards to the underlying sink's `write` operation,
224 windowed through @ref buffer_param when the sequence exceeds
225 the per-call buffer limit.
226
227 @param buffers The buffer sequence containing data to write.
228
229 @return An awaitable yielding `(error_code,std::size_t)`.
230
231 @par Immediate Completion
232 The operation completes immediately without suspending
233 the calling coroutine when:
234 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235 @li Every underlying `write` call completes
236 immediately (the wrapped sink reports readiness
237 via `await_ready` on each iteration).
238
239 @par Preconditions
240 The wrapper must contain a valid sink (`has_value() == true`).
241 */
242 template<ConstBufferSequence CB>
243 io_task<std::size_t>
244 write(CB buffers);
245
246 /** Atomically write data and signal end-of-stream.
247
248 Writes all data from the buffer sequence and then signals
249 end-of-stream. The implementation decides how to partition
250 the data across calls to the underlying sink's @ref write
251 and `write_eof`. When the caller's buffer sequence is
252 non-empty, the final call to the underlying sink is always
253 `write_eof` with a non-empty buffer sequence. When the
254 caller's buffer sequence is empty, only `write_eof()` with
255 no data is called.
256
257 @param buffers The buffer sequence containing data to write.
258
259 @return An awaitable yielding `(error_code,std::size_t)`.
260
261 @par Immediate Completion
262 The operation completes immediately without suspending
263 the calling coroutine when:
264 @li The buffer sequence is empty. Only the @ref write_eof()
265 call is performed.
266 @li All underlying operations complete immediately (the
267 wrapped sink reports readiness via `await_ready`).
268
269 @par Preconditions
270 The wrapper must contain a valid sink (`has_value() == true`).
271 */
272 template<ConstBufferSequence CB>
273 io_task<std::size_t>
274 write_eof(CB buffers);
275
276 /** Signal end of data.
277
278 Indicates that no more data will be written to the sink.
279 The operation completes when the sink is finalized, or
280 an error occurs.
281
282 @return An awaitable yielding `(error_code)`.
283
284 @par Immediate Completion
285 The operation completes immediately without suspending
286 the calling coroutine when the underlying sink's awaitable
287 reports immediate readiness via `await_ready`.
288
289 @par Preconditions
290 The wrapper must contain a valid sink (`has_value() == true`).
291 */
292 auto
293 write_eof();
294
295 protected:
296 /** Rebind to a new sink after move.
297
298 Updates the internal pointer to reference a new sink object.
299 Used by owning wrappers after move assignment when the owned
300 object has moved to a new location.
301
302 @param new_sink The new sink to bind to. Must be the same
303 type as the original sink.
304
305 @note Terminates if called with a sink of different type
306 than the original.
307 */
308 template<WriteSink S>
309 void
310 rebind(S& new_sink) noexcept
311 {
312 if(vt_ != &vtable_for_impl<S>::value)
313 std::terminate();
314 sink_ = &new_sink;
315 }
316
317 private:
318 auto
319 write_some_(std::span<const_buffer const> buffers);
320
321 auto
322 write_(std::span<const_buffer const> buffers);
323
324 auto
325 write_eof_buffers_(std::span<const_buffer const> buffers);
326 };
327
328 //----------------------------------------------------------
329
330 struct any_write_sink::write_awaitable_ops
331 {
332 bool (*await_ready)(void*);
333 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
334 io_result<std::size_t> (*await_resume)(void*);
335 void (*destroy)(void*) noexcept;
336 };
337
338 struct any_write_sink::eof_awaitable_ops
339 {
340 bool (*await_ready)(void*);
341 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
342 io_result<> (*await_resume)(void*);
343 void (*destroy)(void*) noexcept;
344 };
345
346 struct any_write_sink::vtable
347 {
348 write_awaitable_ops const* (*construct_write_some_awaitable)(
349 void* sink,
350 void* storage,
351 std::span<const_buffer const> buffers);
352 write_awaitable_ops const* (*construct_write_awaitable)(
353 void* sink,
354 void* storage,
355 std::span<const_buffer const> buffers);
356 write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
357 void* sink,
358 void* storage,
359 std::span<const_buffer const> buffers);
360 eof_awaitable_ops const* (*construct_eof_awaitable)(
361 void* sink,
362 void* storage);
363 std::size_t awaitable_size;
364 std::size_t awaitable_align;
365 void (*destroy)(void*) noexcept;
366 };
367
368 template<WriteSink S>
369 struct any_write_sink::vtable_for_impl
370 {
371 using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
372 std::span<const_buffer const>{}));
373 using WriteAwaitable = decltype(std::declval<S&>().write(
374 std::span<const_buffer const>{}));
375 using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
376 std::span<const_buffer const>{}));
377 using EofAwaitable = decltype(std::declval<S&>().write_eof());
378
379 static void
380 6 do_destroy_impl(void* sink) noexcept
381 {
382 6 static_cast<S*>(sink)->~S();
383 6 }
384
385 static write_awaitable_ops const*
386 40 construct_write_some_awaitable_impl(
387 void* sink,
388 void* storage,
389 std::span<const_buffer const> buffers)
390 {
391 40 auto& s = *static_cast<S*>(sink);
392 40 ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
393
394 static constexpr write_awaitable_ops ops = {
395 +[](void* p) {
396 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
397 },
398 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
399 return detail::call_await_suspend(
400 static_cast<WriteSomeAwaitable*>(p), h, ex, token);
401 },
402 +[](void* p) {
403 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
404 },
405 +[](void* p) noexcept {
406 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
407 }
408 };
409 40 return &ops;
410 }
411
412 static write_awaitable_ops const*
413 78 construct_write_awaitable_impl(
414 void* sink,
415 void* storage,
416 std::span<const_buffer const> buffers)
417 {
418 78 auto& s = *static_cast<S*>(sink);
419 78 ::new(storage) WriteAwaitable(s.write(buffers));
420
421 static constexpr write_awaitable_ops ops = {
422 +[](void* p) {
423 return static_cast<WriteAwaitable*>(p)->await_ready();
424 },
425 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
426 return detail::call_await_suspend(
427 static_cast<WriteAwaitable*>(p), h, ex, token);
428 },
429 +[](void* p) {
430 return static_cast<WriteAwaitable*>(p)->await_resume();
431 },
432 +[](void* p) noexcept {
433 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
434 }
435 };
436 78 return &ops;
437 }
438
439 static write_awaitable_ops const*
440 16 construct_write_eof_buffers_awaitable_impl(
441 void* sink,
442 void* storage,
443 std::span<const_buffer const> buffers)
444 {
445 16 auto& s = *static_cast<S*>(sink);
446 16 ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
447
448 static constexpr write_awaitable_ops ops = {
449 +[](void* p) {
450 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
451 },
452 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
453 return detail::call_await_suspend(
454 static_cast<WriteEofBuffersAwaitable*>(p), h, ex, token);
455 },
456 +[](void* p) {
457 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
458 },
459 +[](void* p) noexcept {
460 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
461 }
462 };
463 16 return &ops;
464 }
465
466 static eof_awaitable_ops const*
467 17 construct_eof_awaitable_impl(
468 void* sink,
469 void* storage)
470 {
471 17 auto& s = *static_cast<S*>(sink);
472 17 ::new(storage) EofAwaitable(s.write_eof());
473
474 static constexpr eof_awaitable_ops ops = {
475 +[](void* p) {
476 return static_cast<EofAwaitable*>(p)->await_ready();
477 },
478 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
479 return detail::call_await_suspend(
480 static_cast<EofAwaitable*>(p), h, ex, token);
481 },
482 +[](void* p) {
483 return static_cast<EofAwaitable*>(p)->await_resume();
484 },
485 +[](void* p) noexcept {
486 static_cast<EofAwaitable*>(p)->~EofAwaitable();
487 }
488 };
489 17 return &ops;
490 }
491
492 static constexpr std::size_t max4(
493 std::size_t a, std::size_t b,
494 std::size_t c, std::size_t d) noexcept
495 {
496 std::size_t ab = a > b ? a : b;
497 std::size_t cd = c > d ? c : d;
498 return ab > cd ? ab : cd;
499 }
500
501 static constexpr std::size_t max_awaitable_size =
502 max4(sizeof(WriteSomeAwaitable),
503 sizeof(WriteAwaitable),
504 sizeof(WriteEofBuffersAwaitable),
505 sizeof(EofAwaitable));
506
507 static constexpr std::size_t max_awaitable_align =
508 max4(alignof(WriteSomeAwaitable),
509 alignof(WriteAwaitable),
510 alignof(WriteEofBuffersAwaitable),
511 alignof(EofAwaitable));
512
513 static constexpr vtable value = {
514 &construct_write_some_awaitable_impl,
515 &construct_write_awaitable_impl,
516 &construct_write_eof_buffers_awaitable_impl,
517 &construct_eof_awaitable_impl,
518 max_awaitable_size,
519 max_awaitable_align,
520 &do_destroy_impl
521 };
522 };
523
524 //----------------------------------------------------------
525
526 inline
527 129 any_write_sink::~any_write_sink()
528 {
529
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 123 times.
129 if(storage_)
530 {
531 6 vt_->destroy(sink_);
532 6 ::operator delete(storage_);
533 }
534
2/2
✓ Branch 0 taken 124 times.
✓ Branch 1 taken 5 times.
129 if(cached_awaitable_)
535 {
536
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 123 times.
124 if(active_write_ops_)
537 1 active_write_ops_->destroy(cached_awaitable_);
538
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 122 times.
123 else if(active_eof_ops_)
539 1 active_eof_ops_->destroy(cached_awaitable_);
540 124 ::operator delete(cached_awaitable_);
541 }
542 129 }
543
544 inline any_write_sink&
545 2 any_write_sink::operator=(any_write_sink&& other) noexcept
546 {
547
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if(this != &other)
548 {
549
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if(storage_)
550 {
551 vt_->destroy(sink_);
552 ::operator delete(storage_);
553 }
554
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(cached_awaitable_)
555 {
556
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(active_write_ops_)
557 1 active_write_ops_->destroy(cached_awaitable_);
558 else if(active_eof_ops_)
559 active_eof_ops_->destroy(cached_awaitable_);
560 1 ::operator delete(cached_awaitable_);
561 }
562 2 sink_ = std::exchange(other.sink_, nullptr);
563 2 vt_ = std::exchange(other.vt_, nullptr);
564 2 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
565 2 storage_ = std::exchange(other.storage_, nullptr);
566 2 active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
567 2 active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
568 }
569 2 return *this;
570 }
571
572 template<WriteSink S>
573 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
574 6 any_write_sink::any_write_sink(S s)
575 6 : vt_(&vtable_for_impl<S>::value)
576 {
577 struct guard {
578 any_write_sink* self;
579 bool committed = false;
580 6 ~guard() {
581
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 6 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
6 if(!committed && self->storage_) {
582 self->vt_->destroy(self->sink_);
583 ::operator delete(self->storage_);
584 self->storage_ = nullptr;
585 self->sink_ = nullptr;
586 }
587 6 }
588 6 } g{this};
589
590
1/1
✓ Branch 1 taken 6 times.
6 storage_ = ::operator new(sizeof(S));
591 6 sink_ = ::new(storage_) S(std::move(s));
592
593 // Preallocate the awaitable storage (sized for max of write/eof)
594
1/1
✓ Branch 1 taken 6 times.
6 cached_awaitable_ = ::operator new(vt_->awaitable_size);
595
596 6 g.committed = true;
597 6 }
598
599 template<WriteSink S>
600 119 any_write_sink::any_write_sink(S* s)
601 119 : sink_(s)
602 119 , vt_(&vtable_for_impl<S>::value)
603 {
604 // Preallocate the awaitable storage (sized for max of write/eof)
605 119 cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 119 }
607
608 //----------------------------------------------------------
609
610 inline auto
611 any_write_sink::write_some_(
612 std::span<const_buffer const> buffers)
613 {
614 struct awaitable
615 {
616 any_write_sink* self_;
617 std::span<const_buffer const> buffers_;
618
619 bool
620 await_ready() const noexcept
621 {
622 return false;
623 }
624
625 coro
626 await_suspend(coro h, executor_ref ex, std::stop_token token)
627 {
628 self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
629 self_->sink_,
630 self_->cached_awaitable_,
631 buffers_);
632
633 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
634 return h;
635
636 return self_->active_write_ops_->await_suspend(
637 self_->cached_awaitable_, h, ex, token);
638 }
639
640 io_result<std::size_t>
641 await_resume()
642 {
643 struct guard {
644 any_write_sink* self;
645 ~guard() {
646 self->active_write_ops_->destroy(self->cached_awaitable_);
647 self->active_write_ops_ = nullptr;
648 }
649 } g{self_};
650 return self_->active_write_ops_->await_resume(
651 self_->cached_awaitable_);
652 }
653 };
654 return awaitable{this, buffers};
655 }
656
657 inline auto
658 78 any_write_sink::write_(
659 std::span<const_buffer const> buffers)
660 {
661 struct awaitable
662 {
663 any_write_sink* self_;
664 std::span<const_buffer const> buffers_;
665
666 bool
667 78 await_ready() const noexcept
668 {
669 78 return false;
670 }
671
672 coro
673 78 await_suspend(coro h, executor_ref ex, std::stop_token token)
674 {
675 156 self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
676 78 self_->sink_,
677 78 self_->cached_awaitable_,
678 buffers_);
679
680
1/2
✓ Branch 1 taken 78 times.
✗ Branch 2 not taken.
78 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
681 78 return h;
682
683 return self_->active_write_ops_->await_suspend(
684 self_->cached_awaitable_, h, ex, token);
685 }
686
687 io_result<std::size_t>
688 78 await_resume()
689 {
690 struct guard {
691 any_write_sink* self;
692 78 ~guard() {
693 78 self->active_write_ops_->destroy(self->cached_awaitable_);
694 78 self->active_write_ops_ = nullptr;
695 78 }
696 78 } g{self_};
697 78 return self_->active_write_ops_->await_resume(
698
1/1
✓ Branch 1 taken 57 times.
135 self_->cached_awaitable_);
699 78 }
700 };
701 78 return awaitable{this, buffers};
702 }
703
704 inline auto
705 17 any_write_sink::write_eof()
706 {
707 struct awaitable
708 {
709 any_write_sink* self_;
710
711 bool
712 17 await_ready() const noexcept
713 {
714 17 return false;
715 }
716
717 coro
718 17 await_suspend(coro h, executor_ref ex, std::stop_token token)
719 {
720 // Construct the underlying awaitable into cached storage
721 34 self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
722 17 self_->sink_,
723 17 self_->cached_awaitable_);
724
725 // Check if underlying is immediately ready
726
2/2
✓ Branch 1 taken 16 times.
✓ Branch 2 taken 1 time.
17 if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
727 16 return h;
728
729 // Forward to underlying awaitable
730 2 return self_->active_eof_ops_->await_suspend(
731
1/1
✓ Branch 1 taken 1 time.
1 self_->cached_awaitable_, h, ex, token);
732 }
733
734 io_result<>
735 16 await_resume()
736 {
737 struct guard {
738 any_write_sink* self;
739 16 ~guard() {
740 16 self->active_eof_ops_->destroy(self->cached_awaitable_);
741 16 self->active_eof_ops_ = nullptr;
742 16 }
743 16 } g{self_};
744 16 return self_->active_eof_ops_->await_resume(
745
1/1
✓ Branch 1 taken 11 times.
27 self_->cached_awaitable_);
746 16 }
747 };
748 17 return awaitable{this};
749 }
750
751 inline auto
752 16 any_write_sink::write_eof_buffers_(
753 std::span<const_buffer const> buffers)
754 {
755 struct awaitable
756 {
757 any_write_sink* self_;
758 std::span<const_buffer const> buffers_;
759
760 bool
761 16 await_ready() const noexcept
762 {
763 16 return false;
764 }
765
766 coro
767 16 await_suspend(coro h, executor_ref ex, std::stop_token token)
768 {
769 32 self_->active_write_ops_ =
770 32 self_->vt_->construct_write_eof_buffers_awaitable(
771 16 self_->sink_,
772 16 self_->cached_awaitable_,
773 buffers_);
774
775
1/2
✓ Branch 1 taken 16 times.
✗ Branch 2 not taken.
16 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
776 16 return h;
777
778 return self_->active_write_ops_->await_suspend(
779 self_->cached_awaitable_, h, ex, token);
780 }
781
782 io_result<std::size_t>
783 16 await_resume()
784 {
785 struct guard {
786 any_write_sink* self;
787 16 ~guard() {
788 16 self->active_write_ops_->destroy(self->cached_awaitable_);
789 16 self->active_write_ops_ = nullptr;
790 16 }
791 16 } g{self_};
792 16 return self_->active_write_ops_->await_resume(
793
1/1
✓ Branch 1 taken 11 times.
27 self_->cached_awaitable_);
794 16 }
795 };
796 16 return awaitable{this, buffers};
797 }
798
799 template<ConstBufferSequence CB>
800 auto
801 42 any_write_sink::write_some(CB buffers)
802 {
803 struct awaitable
804 {
805 any_write_sink* self_;
806 const_buffer_array<detail::max_iovec_> ba_;
807
808 42 awaitable(
809 any_write_sink* self,
810 CB const& buffers)
811 42 : self_(self)
812 42 , ba_(buffers)
813 {
814 42 }
815
816 bool
817 42 await_ready() const noexcept
818 {
819 42 return ba_.to_span().empty();
820 }
821
822 coro
823 40 await_suspend(coro h, executor_ref ex, std::stop_token token)
824 {
825 40 self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
826 40 self_->sink_,
827
1/1
✓ Branch 1 taken 40 times.
40 self_->cached_awaitable_,
828 40 ba_.to_span());
829
830
2/2
✓ Branch 1 taken 38 times.
✓ Branch 2 taken 2 times.
40 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
831 38 return h;
832
833 4 return self_->active_write_ops_->await_suspend(
834
1/1
✓ Branch 1 taken 2 times.
2 self_->cached_awaitable_, h, ex, token);
835 }
836
837 io_result<std::size_t>
838 40 await_resume()
839 {
840
2/2
✓ Branch 2 taken 2 times.
✓ Branch 3 taken 38 times.
40 if(ba_.to_span().empty())
841 2 return {{}, 0};
842
843 struct guard {
844 any_write_sink* self;
845 38 ~guard() {
846 38 self->active_write_ops_->destroy(self->cached_awaitable_);
847 38 self->active_write_ops_ = nullptr;
848 38 }
849 38 } g{self_};
850 38 return self_->active_write_ops_->await_resume(
851
1/1
✓ Branch 1 taken 28 times.
38 self_->cached_awaitable_);
852 38 }
853 };
854 42 return awaitable{this, buffers};
855 }
856
857 template<ConstBufferSequence CB>
858 io_task<std::size_t>
859
1/1
✓ Branch 1 taken 68 times.
68 any_write_sink::write(CB buffers)
860 {
861 buffer_param<CB> bp(buffers);
862 std::size_t total = 0;
863
864 for(;;)
865 {
866 auto bufs = bp.data();
867 if(bufs.empty())
868 break;
869
870 auto [ec, n] = co_await write_(bufs);
871 total += n;
872 if(ec)
873 co_return {ec, total};
874 bp.consume(n);
875 }
876
877 co_return {{}, total};
878 136 }
879
880 template<ConstBufferSequence CB>
881 io_task<std::size_t>
882
1/1
✓ Branch 1 taken 26 times.
26 any_write_sink::write_eof(CB buffers)
883 {
884 const_buffer_param<CB> bp(buffers);
885 std::size_t total = 0;
886
887 for(;;)
888 {
889 auto bufs = bp.data();
890 if(bufs.empty())
891 {
892 auto [ec] = co_await write_eof();
893 co_return {ec, total};
894 }
895
896 if(! bp.more())
897 {
898 // Last window — send atomically with EOF
899 auto [ec, n] = co_await write_eof_buffers_(bufs);
900 total += n;
901 co_return {ec, total};
902 }
903
904 auto [ec, n] = co_await write_(bufs);
905 total += n;
906 if(ec)
907 co_return {ec, total};
908 bp.consume(n);
909 }
910 52 }
911
912 } // namespace capy
913 } // namespace boost
914
915 #endif
916