Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/write_sink.hpp>
20 : #include <boost/capy/coro.hpp>
21 : #include <boost/capy/ex/executor_ref.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any WriteSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref WriteSink concept, enabling runtime polymorphism for
42 : sink write operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper supports two construction modes:
46 : - **Owning**: Pass by value to transfer ownership. The wrapper
47 : allocates storage and owns the sink.
48 : - **Reference**: Pass a pointer to wrap without ownership. The
49 : pointed-to sink must outlive this wrapper.
50 :
51 : @par Awaitable Preallocation
52 : The constructor preallocates storage for the type-erased awaitable.
53 : This reserves all virtual address space at server startup
54 : so memory usage can be measured up front, rather than
55 : allocating piecemeal as traffic arrives.
56 :
57 : @par Immediate Completion
58 : Operations complete immediately without suspending when the
59 : buffer sequence is empty, or when the underlying sink's
60 : awaitable reports readiness via `await_ready`.
61 :
62 : @par Thread Safety
63 : Not thread-safe. Concurrent operations on the same wrapper
64 : are undefined behavior.
65 :
66 : @par Example
67 : @code
68 : // Owning - takes ownership of the sink
69 : any_write_sink ws(some_sink{args...});
70 :
71 : // Reference - wraps without ownership
72 : some_sink sink;
73 : any_write_sink ws(&sink);
74 :
75 : const_buffer buf(data, size);
76 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 : auto [ec2] = co_await ws.write_eof();
78 : @endcode
79 :
80 : @see any_write_stream, WriteSink
81 : */
82 : class any_write_sink
83 : {
84 : struct vtable;
85 : struct write_awaitable_ops;
86 : struct eof_awaitable_ops;
87 :
88 : template<WriteSink S>
89 : struct vtable_for_impl;
90 :
91 : void* sink_ = nullptr;
92 : vtable const* vt_ = nullptr;
93 : void* cached_awaitable_ = nullptr;
94 : void* storage_ = nullptr;
95 : write_awaitable_ops const* active_write_ops_ = nullptr;
96 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
97 :
98 : public:
99 : /** Destructor.
100 :
101 : Destroys the owned sink (if any) and releases the cached
102 : awaitable storage.
103 : */
104 : ~any_write_sink();
105 :
106 : /** Default constructor.
107 :
108 : Constructs an empty wrapper. Operations on a default-constructed
109 : wrapper result in undefined behavior.
110 : */
111 : any_write_sink() = default;
112 :
113 : /** Non-copyable.
114 :
115 : The awaitable cache is per-instance and cannot be shared.
116 : */
117 : any_write_sink(any_write_sink const&) = delete;
118 : any_write_sink& operator=(any_write_sink const&) = delete;
119 :
120 : /** Move constructor.
121 :
122 : Transfers ownership of the wrapped sink (if owned) and
123 : cached awaitable storage from `other`. After the move, `other` is
124 : in a default-constructed state.
125 :
126 : @param other The wrapper to move from.
127 : */
128 1 : any_write_sink(any_write_sink&& other) noexcept
129 1 : : sink_(std::exchange(other.sink_, nullptr))
130 1 : , vt_(std::exchange(other.vt_, nullptr))
131 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1 : , storage_(std::exchange(other.storage_, nullptr))
133 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Move assignment operator.
139 :
140 : Destroys any owned sink and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_write_sink&
147 : operator=(any_write_sink&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a WriteSink.
150 :
151 : Allocates storage and moves the sink into this wrapper.
152 : The wrapper owns the sink and will destroy it.
153 :
154 : @param s The sink to take ownership of.
155 : */
156 : template<WriteSink S>
157 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 : any_write_sink(S s);
159 :
160 : /** Construct by wrapping a WriteSink without ownership.
161 :
162 : Wraps the given sink by pointer. The sink must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the sink to wrap.
166 : */
167 : template<WriteSink S>
168 : any_write_sink(S* s);
169 :
170 : /** Check if the wrapper contains a valid sink.
171 :
172 : @return `true` if wrapping a sink, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 15 : has_value() const noexcept
177 : {
178 15 : return sink_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid sink.
182 :
183 : @return `true` if wrapping a sink, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Initiate a partial write operation.
193 :
194 : Writes one or more bytes from the provided buffer sequence.
195 : May consume less than the full sequence.
196 :
197 : @param buffers The buffer sequence containing data to write.
198 :
199 : @return An awaitable yielding `(error_code,std::size_t)`.
200 :
201 : @par Immediate Completion
202 : The operation completes immediately without suspending
203 : the calling coroutine when:
204 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205 : @li The underlying sink's awaitable reports immediate
206 : readiness via `await_ready`.
207 :
208 : @note This is a partial operation and may not process the
209 : entire buffer sequence. Use @ref write for guaranteed
210 : complete transfer.
211 :
212 : @par Preconditions
213 : The wrapper must contain a valid sink (`has_value() == true`).
214 : */
215 : template<ConstBufferSequence CB>
216 : auto
217 : write_some(CB buffers);
218 :
219 : /** Initiate a complete write operation.
220 :
221 : Writes data from the provided buffer sequence. The operation
222 : completes when all bytes have been consumed, or an error
223 : occurs. Forwards to the underlying sink's `write` operation,
224 : windowed through @ref buffer_param when the sequence exceeds
225 : the per-call buffer limit.
226 :
227 : @param buffers The buffer sequence containing data to write.
228 :
229 : @return An awaitable yielding `(error_code,std::size_t)`.
230 :
231 : @par Immediate Completion
232 : The operation completes immediately without suspending
233 : the calling coroutine when:
234 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235 : @li Every underlying `write` call completes
236 : immediately (the wrapped sink reports readiness
237 : via `await_ready` on each iteration).
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : template<ConstBufferSequence CB>
243 : io_task<std::size_t>
244 : write(CB buffers);
245 :
246 : /** Atomically write data and signal end-of-stream.
247 :
248 : Writes all data from the buffer sequence and then signals
249 : end-of-stream. The implementation decides how to partition
250 : the data across calls to the underlying sink's @ref write
251 : and `write_eof`. When the caller's buffer sequence is
252 : non-empty, the final call to the underlying sink is always
253 : `write_eof` with a non-empty buffer sequence. When the
254 : caller's buffer sequence is empty, only `write_eof()` with
255 : no data is called.
256 :
257 : @param buffers The buffer sequence containing data to write.
258 :
259 : @return An awaitable yielding `(error_code,std::size_t)`.
260 :
261 : @par Immediate Completion
262 : The operation completes immediately without suspending
263 : the calling coroutine when:
264 : @li The buffer sequence is empty. Only the @ref write_eof()
265 : call is performed.
266 : @li All underlying operations complete immediately (the
267 : wrapped sink reports readiness via `await_ready`).
268 :
269 : @par Preconditions
270 : The wrapper must contain a valid sink (`has_value() == true`).
271 : */
272 : template<ConstBufferSequence CB>
273 : io_task<std::size_t>
274 : write_eof(CB buffers);
275 :
276 : /** Signal end of data.
277 :
278 : Indicates that no more data will be written to the sink.
279 : The operation completes when the sink is finalized, or
280 : an error occurs.
281 :
282 : @return An awaitable yielding `(error_code)`.
283 :
284 : @par Immediate Completion
285 : The operation completes immediately without suspending
286 : the calling coroutine when the underlying sink's awaitable
287 : reports immediate readiness via `await_ready`.
288 :
289 : @par Preconditions
290 : The wrapper must contain a valid sink (`has_value() == true`).
291 : */
292 : auto
293 : write_eof();
294 :
295 : protected:
296 : /** Rebind to a new sink after move.
297 :
298 : Updates the internal pointer to reference a new sink object.
299 : Used by owning wrappers after move assignment when the owned
300 : object has moved to a new location.
301 :
302 : @param new_sink The new sink to bind to. Must be the same
303 : type as the original sink.
304 :
305 : @note Terminates if called with a sink of different type
306 : than the original.
307 : */
308 : template<WriteSink S>
309 : void
310 : rebind(S& new_sink) noexcept
311 : {
312 : if(vt_ != &vtable_for_impl<S>::value)
313 : std::terminate();
314 : sink_ = &new_sink;
315 : }
316 :
317 : private:
318 : auto
319 : write_some_(std::span<const_buffer const> buffers);
320 :
321 : auto
322 : write_(std::span<const_buffer const> buffers);
323 :
324 : auto
325 : write_eof_buffers_(std::span<const_buffer const> buffers);
326 : };
327 :
328 : //----------------------------------------------------------
329 :
330 : struct any_write_sink::write_awaitable_ops
331 : {
332 : bool (*await_ready)(void*);
333 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
334 : io_result<std::size_t> (*await_resume)(void*);
335 : void (*destroy)(void*) noexcept;
336 : };
337 :
338 : struct any_write_sink::eof_awaitable_ops
339 : {
340 : bool (*await_ready)(void*);
341 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
342 : io_result<> (*await_resume)(void*);
343 : void (*destroy)(void*) noexcept;
344 : };
345 :
346 : struct any_write_sink::vtable
347 : {
348 : write_awaitable_ops const* (*construct_write_some_awaitable)(
349 : void* sink,
350 : void* storage,
351 : std::span<const_buffer const> buffers);
352 : write_awaitable_ops const* (*construct_write_awaitable)(
353 : void* sink,
354 : void* storage,
355 : std::span<const_buffer const> buffers);
356 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
357 : void* sink,
358 : void* storage,
359 : std::span<const_buffer const> buffers);
360 : eof_awaitable_ops const* (*construct_eof_awaitable)(
361 : void* sink,
362 : void* storage);
363 : std::size_t awaitable_size;
364 : std::size_t awaitable_align;
365 : void (*destroy)(void*) noexcept;
366 : };
367 :
368 : template<WriteSink S>
369 : struct any_write_sink::vtable_for_impl
370 : {
371 : using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
372 : std::span<const_buffer const>{}));
373 : using WriteAwaitable = decltype(std::declval<S&>().write(
374 : std::span<const_buffer const>{}));
375 : using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
376 : std::span<const_buffer const>{}));
377 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
378 :
379 : static void
380 6 : do_destroy_impl(void* sink) noexcept
381 : {
382 6 : static_cast<S*>(sink)->~S();
383 6 : }
384 :
385 : static write_awaitable_ops const*
386 40 : construct_write_some_awaitable_impl(
387 : void* sink,
388 : void* storage,
389 : std::span<const_buffer const> buffers)
390 : {
391 40 : auto& s = *static_cast<S*>(sink);
392 40 : ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
393 :
394 : static constexpr write_awaitable_ops ops = {
395 40 : +[](void* p) {
396 40 : return static_cast<WriteSomeAwaitable*>(p)->await_ready();
397 : },
398 2 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
399 4 : return detail::call_await_suspend(
400 2 : static_cast<WriteSomeAwaitable*>(p), h, ex, token);
401 : },
402 38 : +[](void* p) {
403 38 : return static_cast<WriteSomeAwaitable*>(p)->await_resume();
404 : },
405 42 : +[](void* p) noexcept {
406 2 : static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
407 : }
408 : };
409 40 : return &ops;
410 : }
411 :
412 : static write_awaitable_ops const*
413 78 : construct_write_awaitable_impl(
414 : void* sink,
415 : void* storage,
416 : std::span<const_buffer const> buffers)
417 : {
418 78 : auto& s = *static_cast<S*>(sink);
419 78 : ::new(storage) WriteAwaitable(s.write(buffers));
420 :
421 : static constexpr write_awaitable_ops ops = {
422 78 : +[](void* p) {
423 78 : return static_cast<WriteAwaitable*>(p)->await_ready();
424 : },
425 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
426 0 : return detail::call_await_suspend(
427 0 : static_cast<WriteAwaitable*>(p), h, ex, token);
428 : },
429 78 : +[](void* p) {
430 78 : return static_cast<WriteAwaitable*>(p)->await_resume();
431 : },
432 78 : +[](void* p) noexcept {
433 0 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
434 : }
435 : };
436 78 : return &ops;
437 : }
438 :
439 : static write_awaitable_ops const*
440 16 : construct_write_eof_buffers_awaitable_impl(
441 : void* sink,
442 : void* storage,
443 : std::span<const_buffer const> buffers)
444 : {
445 16 : auto& s = *static_cast<S*>(sink);
446 16 : ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
447 :
448 : static constexpr write_awaitable_ops ops = {
449 16 : +[](void* p) {
450 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
451 : },
452 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
453 0 : return detail::call_await_suspend(
454 0 : static_cast<WriteEofBuffersAwaitable*>(p), h, ex, token);
455 : },
456 16 : +[](void* p) {
457 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
458 : },
459 16 : +[](void* p) noexcept {
460 0 : static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
461 : }
462 : };
463 16 : return &ops;
464 : }
465 :
466 : static eof_awaitable_ops const*
467 17 : construct_eof_awaitable_impl(
468 : void* sink,
469 : void* storage)
470 : {
471 17 : auto& s = *static_cast<S*>(sink);
472 17 : ::new(storage) EofAwaitable(s.write_eof());
473 :
474 : static constexpr eof_awaitable_ops ops = {
475 17 : +[](void* p) {
476 17 : return static_cast<EofAwaitable*>(p)->await_ready();
477 : },
478 1 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
479 2 : return detail::call_await_suspend(
480 1 : static_cast<EofAwaitable*>(p), h, ex, token);
481 : },
482 16 : +[](void* p) {
483 16 : return static_cast<EofAwaitable*>(p)->await_resume();
484 : },
485 18 : +[](void* p) noexcept {
486 1 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
487 : }
488 : };
489 17 : return &ops;
490 : }
491 :
492 : static constexpr std::size_t max4(
493 : std::size_t a, std::size_t b,
494 : std::size_t c, std::size_t d) noexcept
495 : {
496 : std::size_t ab = a > b ? a : b;
497 : std::size_t cd = c > d ? c : d;
498 : return ab > cd ? ab : cd;
499 : }
500 :
501 : static constexpr std::size_t max_awaitable_size =
502 : max4(sizeof(WriteSomeAwaitable),
503 : sizeof(WriteAwaitable),
504 : sizeof(WriteEofBuffersAwaitable),
505 : sizeof(EofAwaitable));
506 :
507 : static constexpr std::size_t max_awaitable_align =
508 : max4(alignof(WriteSomeAwaitable),
509 : alignof(WriteAwaitable),
510 : alignof(WriteEofBuffersAwaitable),
511 : alignof(EofAwaitable));
512 :
513 : static constexpr vtable value = {
514 : &construct_write_some_awaitable_impl,
515 : &construct_write_awaitable_impl,
516 : &construct_write_eof_buffers_awaitable_impl,
517 : &construct_eof_awaitable_impl,
518 : max_awaitable_size,
519 : max_awaitable_align,
520 : &do_destroy_impl
521 : };
522 : };
523 :
524 : //----------------------------------------------------------
525 :
526 : inline
527 129 : any_write_sink::~any_write_sink()
528 : {
529 129 : if(storage_)
530 : {
531 6 : vt_->destroy(sink_);
532 6 : ::operator delete(storage_);
533 : }
534 129 : if(cached_awaitable_)
535 : {
536 124 : if(active_write_ops_)
537 1 : active_write_ops_->destroy(cached_awaitable_);
538 123 : else if(active_eof_ops_)
539 1 : active_eof_ops_->destroy(cached_awaitable_);
540 124 : ::operator delete(cached_awaitable_);
541 : }
542 129 : }
543 :
544 : inline any_write_sink&
545 2 : any_write_sink::operator=(any_write_sink&& other) noexcept
546 : {
547 2 : if(this != &other)
548 : {
549 2 : if(storage_)
550 : {
551 0 : vt_->destroy(sink_);
552 0 : ::operator delete(storage_);
553 : }
554 2 : if(cached_awaitable_)
555 : {
556 1 : if(active_write_ops_)
557 1 : active_write_ops_->destroy(cached_awaitable_);
558 0 : else if(active_eof_ops_)
559 0 : active_eof_ops_->destroy(cached_awaitable_);
560 1 : ::operator delete(cached_awaitable_);
561 : }
562 2 : sink_ = std::exchange(other.sink_, nullptr);
563 2 : vt_ = std::exchange(other.vt_, nullptr);
564 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
565 2 : storage_ = std::exchange(other.storage_, nullptr);
566 2 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
567 2 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
568 : }
569 2 : return *this;
570 : }
571 :
572 : template<WriteSink S>
573 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
574 6 : any_write_sink::any_write_sink(S s)
575 6 : : vt_(&vtable_for_impl<S>::value)
576 : {
577 : struct guard {
578 : any_write_sink* self;
579 : bool committed = false;
580 6 : ~guard() {
581 6 : if(!committed && self->storage_) {
582 0 : self->vt_->destroy(self->sink_);
583 0 : ::operator delete(self->storage_);
584 0 : self->storage_ = nullptr;
585 0 : self->sink_ = nullptr;
586 : }
587 6 : }
588 6 : } g{this};
589 :
590 6 : storage_ = ::operator new(sizeof(S));
591 6 : sink_ = ::new(storage_) S(std::move(s));
592 :
593 : // Preallocate the awaitable storage (sized for max of write/eof)
594 6 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
595 :
596 6 : g.committed = true;
597 6 : }
598 :
599 : template<WriteSink S>
600 119 : any_write_sink::any_write_sink(S* s)
601 119 : : sink_(s)
602 119 : , vt_(&vtable_for_impl<S>::value)
603 : {
604 : // Preallocate the awaitable storage (sized for max of write/eof)
605 119 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
606 119 : }
607 :
608 : //----------------------------------------------------------
609 :
610 : inline auto
611 : any_write_sink::write_some_(
612 : std::span<const_buffer const> buffers)
613 : {
614 : struct awaitable
615 : {
616 : any_write_sink* self_;
617 : std::span<const_buffer const> buffers_;
618 :
619 : bool
620 : await_ready() const noexcept
621 : {
622 : return false;
623 : }
624 :
625 : coro
626 : await_suspend(coro h, executor_ref ex, std::stop_token token)
627 : {
628 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
629 : self_->sink_,
630 : self_->cached_awaitable_,
631 : buffers_);
632 :
633 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
634 : return h;
635 :
636 : return self_->active_write_ops_->await_suspend(
637 : self_->cached_awaitable_, h, ex, token);
638 : }
639 :
640 : io_result<std::size_t>
641 : await_resume()
642 : {
643 : struct guard {
644 : any_write_sink* self;
645 : ~guard() {
646 : self->active_write_ops_->destroy(self->cached_awaitable_);
647 : self->active_write_ops_ = nullptr;
648 : }
649 : } g{self_};
650 : return self_->active_write_ops_->await_resume(
651 : self_->cached_awaitable_);
652 : }
653 : };
654 : return awaitable{this, buffers};
655 : }
656 :
657 : inline auto
658 78 : any_write_sink::write_(
659 : std::span<const_buffer const> buffers)
660 : {
661 : struct awaitable
662 : {
663 : any_write_sink* self_;
664 : std::span<const_buffer const> buffers_;
665 :
666 : bool
667 78 : await_ready() const noexcept
668 : {
669 78 : return false;
670 : }
671 :
672 : coro
673 78 : await_suspend(coro h, executor_ref ex, std::stop_token token)
674 : {
675 156 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
676 78 : self_->sink_,
677 78 : self_->cached_awaitable_,
678 : buffers_);
679 :
680 78 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
681 78 : return h;
682 :
683 0 : return self_->active_write_ops_->await_suspend(
684 0 : self_->cached_awaitable_, h, ex, token);
685 : }
686 :
687 : io_result<std::size_t>
688 78 : await_resume()
689 : {
690 : struct guard {
691 : any_write_sink* self;
692 78 : ~guard() {
693 78 : self->active_write_ops_->destroy(self->cached_awaitable_);
694 78 : self->active_write_ops_ = nullptr;
695 78 : }
696 78 : } g{self_};
697 78 : return self_->active_write_ops_->await_resume(
698 135 : self_->cached_awaitable_);
699 78 : }
700 : };
701 78 : return awaitable{this, buffers};
702 : }
703 :
704 : inline auto
705 17 : any_write_sink::write_eof()
706 : {
707 : struct awaitable
708 : {
709 : any_write_sink* self_;
710 :
711 : bool
712 17 : await_ready() const noexcept
713 : {
714 17 : return false;
715 : }
716 :
717 : coro
718 17 : await_suspend(coro h, executor_ref ex, std::stop_token token)
719 : {
720 : // Construct the underlying awaitable into cached storage
721 34 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
722 17 : self_->sink_,
723 17 : self_->cached_awaitable_);
724 :
725 : // Check if underlying is immediately ready
726 17 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
727 16 : return h;
728 :
729 : // Forward to underlying awaitable
730 2 : return self_->active_eof_ops_->await_suspend(
731 1 : self_->cached_awaitable_, h, ex, token);
732 : }
733 :
734 : io_result<>
735 16 : await_resume()
736 : {
737 : struct guard {
738 : any_write_sink* self;
739 16 : ~guard() {
740 16 : self->active_eof_ops_->destroy(self->cached_awaitable_);
741 16 : self->active_eof_ops_ = nullptr;
742 16 : }
743 16 : } g{self_};
744 16 : return self_->active_eof_ops_->await_resume(
745 27 : self_->cached_awaitable_);
746 16 : }
747 : };
748 17 : return awaitable{this};
749 : }
750 :
751 : inline auto
752 16 : any_write_sink::write_eof_buffers_(
753 : std::span<const_buffer const> buffers)
754 : {
755 : struct awaitable
756 : {
757 : any_write_sink* self_;
758 : std::span<const_buffer const> buffers_;
759 :
760 : bool
761 16 : await_ready() const noexcept
762 : {
763 16 : return false;
764 : }
765 :
766 : coro
767 16 : await_suspend(coro h, executor_ref ex, std::stop_token token)
768 : {
769 32 : self_->active_write_ops_ =
770 32 : self_->vt_->construct_write_eof_buffers_awaitable(
771 16 : self_->sink_,
772 16 : self_->cached_awaitable_,
773 : buffers_);
774 :
775 16 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
776 16 : return h;
777 :
778 0 : return self_->active_write_ops_->await_suspend(
779 0 : self_->cached_awaitable_, h, ex, token);
780 : }
781 :
782 : io_result<std::size_t>
783 16 : await_resume()
784 : {
785 : struct guard {
786 : any_write_sink* self;
787 16 : ~guard() {
788 16 : self->active_write_ops_->destroy(self->cached_awaitable_);
789 16 : self->active_write_ops_ = nullptr;
790 16 : }
791 16 : } g{self_};
792 16 : return self_->active_write_ops_->await_resume(
793 27 : self_->cached_awaitable_);
794 16 : }
795 : };
796 16 : return awaitable{this, buffers};
797 : }
798 :
799 : template<ConstBufferSequence CB>
800 : auto
801 42 : any_write_sink::write_some(CB buffers)
802 : {
803 : struct awaitable
804 : {
805 : any_write_sink* self_;
806 : const_buffer_array<detail::max_iovec_> ba_;
807 :
808 42 : awaitable(
809 : any_write_sink* self,
810 : CB const& buffers)
811 42 : : self_(self)
812 42 : , ba_(buffers)
813 : {
814 42 : }
815 :
816 : bool
817 42 : await_ready() const noexcept
818 : {
819 42 : return ba_.to_span().empty();
820 : }
821 :
822 : coro
823 40 : await_suspend(coro h, executor_ref ex, std::stop_token token)
824 : {
825 40 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
826 40 : self_->sink_,
827 40 : self_->cached_awaitable_,
828 40 : ba_.to_span());
829 :
830 40 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
831 38 : return h;
832 :
833 4 : return self_->active_write_ops_->await_suspend(
834 2 : self_->cached_awaitable_, h, ex, token);
835 : }
836 :
837 : io_result<std::size_t>
838 40 : await_resume()
839 : {
840 40 : if(ba_.to_span().empty())
841 2 : return {{}, 0};
842 :
843 : struct guard {
844 : any_write_sink* self;
845 38 : ~guard() {
846 38 : self->active_write_ops_->destroy(self->cached_awaitable_);
847 38 : self->active_write_ops_ = nullptr;
848 38 : }
849 38 : } g{self_};
850 38 : return self_->active_write_ops_->await_resume(
851 38 : self_->cached_awaitable_);
852 38 : }
853 : };
854 42 : return awaitable{this, buffers};
855 : }
856 :
857 : template<ConstBufferSequence CB>
858 : io_task<std::size_t>
859 68 : any_write_sink::write(CB buffers)
860 : {
861 : buffer_param<CB> bp(buffers);
862 : std::size_t total = 0;
863 :
864 : for(;;)
865 : {
866 : auto bufs = bp.data();
867 : if(bufs.empty())
868 : break;
869 :
870 : auto [ec, n] = co_await write_(bufs);
871 : total += n;
872 : if(ec)
873 : co_return {ec, total};
874 : bp.consume(n);
875 : }
876 :
877 : co_return {{}, total};
878 136 : }
879 :
880 : template<ConstBufferSequence CB>
881 : io_task<std::size_t>
882 26 : any_write_sink::write_eof(CB buffers)
883 : {
884 : const_buffer_param<CB> bp(buffers);
885 : std::size_t total = 0;
886 :
887 : for(;;)
888 : {
889 : auto bufs = bp.data();
890 : if(bufs.empty())
891 : {
892 : auto [ec] = co_await write_eof();
893 : co_return {ec, total};
894 : }
895 :
896 : if(! bp.more())
897 : {
898 : // Last window — send atomically with EOF
899 : auto [ec, n] = co_await write_eof_buffers_(bufs);
900 : total += n;
901 : co_return {ec, total};
902 : }
903 :
904 : auto [ec, n] = co_await write_(bufs);
905 : total += n;
906 : if(ec)
907 : co_return {ec, total};
908 : bp.consume(n);
909 : }
910 52 : }
911 :
912 : } // namespace capy
913 : } // namespace boost
914 :
915 : #endif
|