Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/io_task.hpp>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <exception>
30 : #include <new>
31 : #include <span>
32 : #include <stop_token>
33 : #include <system_error>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : /** Type-erased wrapper for any BufferSink.
40 :
41 : This class provides type erasure for any type satisfying the
42 : @ref BufferSink concept, enabling runtime polymorphism for
43 : buffer sink operations. It uses cached awaitable storage to achieve
44 : zero steady-state allocation after construction.
45 :
46 : The wrapper exposes two interfaces for producing data:
47 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
48 : and the @ref WriteSink interface (`write_some`, `write`,
49 : `write_eof`). Choose the interface that matches how your data
50 : is produced:
51 :
52 : @par Choosing an Interface
53 :
54 : Use the **BufferSink** interface when you are a generator that
55 : produces data into externally-provided buffers. The sink owns
56 : the memory; you call @ref prepare to obtain writable buffers,
57 : fill them, then call @ref commit or @ref commit_eof.
58 :
59 : Use the **WriteSink** interface when you already have buffers
60 : containing the data to write:
61 : - If the entire body is available up front, call
62 : @ref write_eof(buffers) to send everything atomically.
63 : - If data arrives incrementally, call @ref write or
64 : @ref write_some in a loop, then @ref write_eof() when done.
65 : Prefer `write` (complete) unless your streaming pattern
66 : benefits from partial writes via `write_some`.
67 :
68 : If the wrapped type only satisfies @ref BufferSink, the
69 : @ref WriteSink operations are provided automatically.
70 :
71 : @par Construction Modes
72 :
73 : - **Owning**: Pass by value to transfer ownership. The wrapper
74 : allocates storage and owns the sink.
75 : - **Reference**: Pass a pointer to wrap without ownership. The
76 : pointed-to sink must outlive this wrapper.
77 :
78 : @par Awaitable Preallocation
79 : The constructor preallocates storage for the type-erased awaitable.
80 : This reserves all virtual address space at server startup
81 : so memory usage can be measured up front, rather than
82 : allocating piecemeal as traffic arrives.
83 :
84 : @par Thread Safety
85 : Not thread-safe. Concurrent operations on the same wrapper
86 : are undefined behavior.
87 :
88 : @par Example
89 : @code
90 : // Owning - takes ownership of the sink
91 : any_buffer_sink abs(some_buffer_sink{args...});
92 :
93 : // Reference - wraps without ownership
94 : some_buffer_sink sink;
95 : any_buffer_sink abs(&sink);
96 :
97 : // BufferSink interface: generate into callee-owned buffers
98 : mutable_buffer arr[16];
99 : auto bufs = abs.prepare(arr);
100 : // Write data into bufs[0..bufs.size())
101 : auto [ec] = co_await abs.commit(bytes_written);
102 : auto [ec2] = co_await abs.commit_eof(0);
103 :
104 : // WriteSink interface: send caller-owned buffers
105 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
106 : auto [ec4] = co_await abs.write_eof();
107 :
108 : // Or send everything at once
109 : auto [ec5, n2] = co_await abs.write_eof(
110 : make_buffer(body_data));
111 : @endcode
112 :
113 : @see any_buffer_source, BufferSink, WriteSink
114 : */
115 : class any_buffer_sink
116 : {
117 : struct vtable;
118 : struct awaitable_ops;
119 : struct write_awaitable_ops;
120 :
121 : template<BufferSink S>
122 : struct vtable_for_impl;
123 :
124 : // hot-path members first for cache locality
125 : void* sink_ = nullptr; // wrapped sink, owned or borrowed; null when empty
126 : vtable const* vt_ = nullptr; // operations for the concrete sink type
127 : void* cached_awaitable_ = nullptr; // preallocated storage the underlying awaitable is constructed in
128 : awaitable_ops const* active_ops_ = nullptr; // ops of the live `io_result<>` awaitable, else null
129 : write_awaitable_ops const* active_write_ops_ = nullptr; // ops of the live `io_result<std::size_t>` awaitable, else null
130 : void* storage_ = nullptr; // heap block holding an owned sink; stays null in reference mode
131 :
132 : public:
133 : /** Destructor.
134 :
135 : Destroys the owned sink (if any) and releases the cached
136 : awaitable storage.
137 : */
138 : ~any_buffer_sink();
139 :
140 : /** Default constructor.
141 :
142 : Constructs an empty wrapper. Operations on a default-constructed
143 : wrapper result in undefined behavior.
144 : */
145 : any_buffer_sink() = default;
146 :
147 : /** Non-copyable.
148 :
149 : The awaitable cache is per-instance and cannot be shared.
150 : */
151 : any_buffer_sink(any_buffer_sink const&) = delete;
152 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
153 :
154 : /** Move constructor.
155 :
156 : Transfers ownership of the wrapped sink (if owned) and
157 : cached awaitable storage from `other`. After the move, `other` is
158 : in a default-constructed state.
159 :
160 : @param other The wrapper to move from.
161 : */
162 2 : any_buffer_sink(any_buffer_sink&& other) noexcept
163 2 : : sink_(std::exchange(other.sink_, nullptr))
164 2 : , vt_(std::exchange(other.vt_, nullptr))
165 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
166 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
167 2 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
168 2 : , storage_(std::exchange(other.storage_, nullptr))
169 : {
170 2 : }
171 :
172 : /** Move assignment operator.
173 :
174 : Destroys any owned sink and releases existing resources,
175 : then transfers ownership from `other`. Self-assignment is a no-op.
176 :
177 : @param other The wrapper to move from.
178 : @return Reference to this wrapper.
179 : */
180 : any_buffer_sink&
181 : operator=(any_buffer_sink&& other) noexcept;
182 :
183 : /** Construct by taking ownership of a BufferSink.
184 :
185 : Allocates storage and moves the sink into this wrapper.
186 : The wrapper owns the sink and will destroy it. If `S` also
187 : satisfies @ref WriteSink, native write operations are
188 : forwarded through the virtual boundary.
189 :
190 : @param s The sink to take ownership of.
191 : */
192 : template<BufferSink S>
193 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
194 : any_buffer_sink(S s);
195 :
196 : /** Construct by wrapping a BufferSink without ownership.
197 :
198 : Wraps the given sink by pointer. The sink must remain
199 : valid for the lifetime of this wrapper. If `S` also
200 : satisfies @ref WriteSink, native write operations are
201 : forwarded through the virtual boundary.
202 :
203 : @param s Pointer to the sink to wrap.
204 : */
205 : template<BufferSink S>
206 : any_buffer_sink(S* s);
207 :
208 : /** Check if the wrapper contains a valid sink.
209 :
210 : @return `true` if wrapping a sink, `false` if default-constructed
211 : or moved-from (also `false` when constructed from a null pointer).
212 : */
213 : bool
214 26 : has_value() const noexcept
215 : {
216 26 : return sink_ != nullptr;
217 : }
218 :
219 : /** Check if the wrapper contains a valid sink.
220 :
221 : Equivalent to @ref has_value.
222 :
223 : @return `true` if wrapping a sink, `false` if default-constructed
224 : or moved-from.
225 : */
224 : explicit
225 3 : operator bool() const noexcept
226 : {
227 3 : return has_value();
228 : }
229 :
230 : /** Prepare writable buffers.
231 :
232 : Fills the provided span with mutable buffer descriptors
233 : pointing to the underlying sink's internal storage. This
234 : operation is synchronous.
235 :
236 : @param dest Span of mutable_buffer to fill.
237 :
238 : @return A span of filled buffers.
239 :
240 : @par Preconditions
241 : The wrapper must contain a valid sink (`has_value() == true`).
242 : */
243 : std::span<mutable_buffer>
244 : prepare(std::span<mutable_buffer> dest);
245 :
246 : /** Commit bytes written to the prepared buffers.
247 :
248 : Commits `n` bytes written to the buffers returned by the
249 : most recent call to @ref prepare. The operation may trigger
250 : underlying I/O.
251 :
252 : @param n The number of bytes to commit.
253 :
254 : @return An awaitable yielding `(error_code)`.
255 :
256 : @par Preconditions
257 : The wrapper must contain a valid sink (`has_value() == true`).
258 : */
259 : auto
260 : commit(std::size_t n);
261 :
262 : /** Commit final bytes and signal end-of-stream.
263 :
264 : Commits `n` bytes written to the buffers returned by the
265 : most recent call to @ref prepare and finalizes the sink.
266 : After success, no further operations are permitted.
267 :
268 : @param n The number of bytes to commit.
269 :
270 : @return An awaitable yielding `(error_code)`.
271 :
272 : @par Preconditions
273 : The wrapper must contain a valid sink (`has_value() == true`).
274 : */
275 : auto
276 : commit_eof(std::size_t n);
277 :
278 : /** Write some data from a buffer sequence.
279 :
280 : Writes one or more bytes from the buffer sequence to the
281 : underlying sink. May consume less than the full sequence.
282 :
283 : When the wrapped type provides native @ref WriteSink support,
284 : the operation forwards directly. Otherwise it is synthesized
285 : from @ref prepare and @ref commit with a buffer copy.
286 :
287 : @param buffers The buffer sequence to write.
288 :
289 : @return An awaitable yielding `(error_code,std::size_t)`.
290 :
291 : @par Preconditions
292 : The wrapper must contain a valid sink (`has_value() == true`).
293 : */
294 : template<ConstBufferSequence CB>
295 : io_task<std::size_t>
296 : write_some(CB buffers);
297 :
298 : /** Write all data from a buffer sequence.
299 :
300 : Writes all data from the buffer sequence to the underlying
301 : sink. This method satisfies the @ref WriteSink concept.
302 :
303 : When the wrapped type provides native @ref WriteSink support,
304 : each window is forwarded directly. Otherwise the data is
305 : copied into the sink via @ref prepare and @ref commit.
306 :
307 : @param buffers The buffer sequence to write.
308 :
309 : @return An awaitable yielding `(error_code,std::size_t)`.
310 :
311 : @par Preconditions
312 : The wrapper must contain a valid sink (`has_value() == true`).
313 : */
314 : template<ConstBufferSequence CB>
315 : io_task<std::size_t>
316 : write(CB buffers);
317 :
318 : /** Atomically write data and signal end-of-stream.
319 :
320 : Writes all data from the buffer sequence to the underlying
321 : sink and then signals end-of-stream.
322 :
323 : When the wrapped type provides native @ref WriteSink support,
324 : the final window is sent atomically via the underlying
325 : `write_eof(buffers)`. Otherwise the data is synthesized
326 : through @ref prepare, @ref commit, and @ref commit_eof.
327 :
328 : @param buffers The buffer sequence to write.
329 :
330 : @return An awaitable yielding `(error_code,std::size_t)`.
331 :
332 : @par Preconditions
333 : The wrapper must contain a valid sink (`has_value() == true`).
334 : */
335 : template<ConstBufferSequence CB>
336 : io_task<std::size_t>
337 : write_eof(CB buffers);
338 :
339 : /** Signal end-of-stream.
340 :
341 : Indicates that no more data will be written to the sink.
342 : This method satisfies the @ref WriteSink concept.
343 :
344 : When the wrapped type provides native @ref WriteSink support,
345 : the underlying `write_eof()` is called. Otherwise the
346 : operation is implemented as `commit_eof(0)`.
347 :
348 : @return An awaitable yielding `(error_code)`.
349 :
350 : @par Preconditions
351 : The wrapper must contain a valid sink (`has_value() == true`).
352 : */
353 : auto
354 : write_eof();
355 :
356 : protected:
357 : /** Rebind to a new sink after move.
358 :
359 : Updates the internal pointer to reference a new sink object.
360 : Used by owning wrappers after move assignment when the owned
361 : object has moved to a new location.
362 :
363 : @param new_sink The new sink to bind to. Must be the same
364 : type as the original sink.
365 :
366 : @note Terminates if called with a sink of different type
367 : than the original.
368 : */
369 : template<BufferSink S>
370 : void
371 : rebind(S& new_sink) noexcept
372 : {
373 : if(vt_ != &vtable_for_impl<S>::value) // vtable address identifies the wrapped type
374 : std::terminate();
375 : sink_ = &new_sink; // only the sink pointer changes; storage_ and the cache are untouched
376 : }
377 :
378 : private:
379 : /** Forward a partial write through the vtable.
380 :
381 : Constructs the underlying `write_some` awaitable in
382 : cached storage and returns a type-erased awaitable.
383 : */
384 : auto
385 : write_some_(std::span<const_buffer const> buffers);
386 :
387 : /** Forward a complete write through the vtable.
388 :
389 : Constructs the underlying `write` awaitable in
390 : cached storage and returns a type-erased awaitable.
391 : */
392 : auto
393 : write_(std::span<const_buffer const> buffers);
394 :
395 : /** Forward an atomic write-with-EOF through the vtable.
396 :
397 : Constructs the underlying `write_eof(buffers)` awaitable
398 : in cached storage and returns a type-erased awaitable.
399 : */
400 : auto
401 : write_eof_buffers_(std::span<const_buffer const> buffers);
402 : };
403 :
404 : //----------------------------------------------------------
405 :
406 : /** Type-erased ops for awaitables yielding `io_result<>`. Each entry receives a pointer to the awaitable object; field order must match the aggregate initializers of the ops tables. */
407 : struct any_buffer_sink::awaitable_ops
408 : {
409 : bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
410 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token); // forwards via detail::call_await_suspend
411 : io_result<> (*await_resume)(void*); // forwards to the awaitable's await_resume()
412 : void (*destroy)(void*) noexcept; // runs the awaitable's destructor in place; does not free the storage
413 : };
414 :
415 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. Each entry receives a pointer to the awaitable object; field order must match the aggregate initializers of the ops tables. */
416 : struct any_buffer_sink::write_awaitable_ops
417 : {
418 : bool (*await_ready)(void*); // forwards to the awaitable's await_ready()
419 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token); // forwards via detail::call_await_suspend
420 : io_result<std::size_t> (*await_resume)(void*); // forwards to the awaitable's await_resume()
421 : void (*destroy)(void*) noexcept; // runs the awaitable's destructor in place; does not free the storage
422 : };
423 :
424 : struct any_buffer_sink::vtable // per-sink-type table of operations; one constant instance per wrapped type S
425 : {
426 : void (*destroy)(void*) noexcept; // runs ~S() on the sink; does not free its storage
427 : std::span<mutable_buffer> (*do_prepare)( // forwards to S::prepare(dest)
428 : void* sink,
429 : std::span<mutable_buffer> dest);
430 : std::size_t awaitable_size; // largest sizeof among the awaitables S can produce
431 : std::size_t awaitable_align; // strictest alignof among the awaitables S can produce
432 : awaitable_ops const* (*construct_commit_awaitable)( // placement-constructs S::commit(n) in `storage`, returns its ops
433 : void* sink,
434 : void* storage,
435 : std::size_t n);
436 : awaitable_ops const* (*construct_commit_eof_awaitable)( // placement-constructs S::commit_eof(n) in `storage`, returns its ops
437 : void* sink,
438 : void* storage,
439 : std::size_t n);
440 :
441 : // WriteSink forwarding (null when wrapped type is BufferSink-only)
442 : write_awaitable_ops const* (*construct_write_some_awaitable)( // S::write_some(buffers)
443 : void* sink,
444 : void* storage,
445 : std::span<const_buffer const> buffers);
446 : write_awaitable_ops const* (*construct_write_awaitable)( // S::write(buffers)
447 : void* sink,
448 : void* storage,
449 : std::span<const_buffer const> buffers);
450 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)( // S::write_eof(buffers)
451 : void* sink,
452 : void* storage,
453 : std::span<const_buffer const> buffers);
454 : awaitable_ops const* (*construct_write_eof_awaitable)( // S::write_eof()
455 : void* sink,
456 : void* storage);
457 : };
458 :
459 : template<BufferSink S>
460 : struct any_buffer_sink::vtable_for_impl
461 : {
462 : using CommitAwaitable = decltype(std::declval<S&>().commit(
463 : std::size_t{}));
464 : using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
465 : std::size_t{}));
466 :
467 : static void
468 18 : do_destroy_impl(void* sink) noexcept
469 : {
470 18 : static_cast<S*>(sink)->~S();
471 18 : }
472 :
473 : static std::span<mutable_buffer>
474 126 : do_prepare_impl(
475 : void* sink,
476 : std::span<mutable_buffer> dest)
477 : {
478 126 : auto& s = *static_cast<S*>(sink);
479 126 : return s.prepare(dest);
480 : }
481 :
482 : static awaitable_ops const*
483 96 : construct_commit_awaitable_impl(
484 : void* sink,
485 : void* storage,
486 : std::size_t n)
487 : {
488 96 : auto& s = *static_cast<S*>(sink);
489 96 : ::new(storage) CommitAwaitable(s.commit(n));
490 :
491 : static constexpr awaitable_ops ops = {
492 96 : +[](void* p) {
493 96 : return static_cast<CommitAwaitable*>(p)->await_ready();
494 : },
495 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
496 0 : return detail::call_await_suspend(
497 0 : static_cast<CommitAwaitable*>(p), h, ex, token);
498 : },
499 96 : +[](void* p) {
500 96 : return static_cast<CommitAwaitable*>(p)->await_resume();
501 : },
502 96 : +[](void* p) noexcept {
503 96 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
504 : }
505 : };
506 96 : return &ops;
507 : }
508 :
509 : static awaitable_ops const*
510 70 : construct_commit_eof_awaitable_impl(
511 : void* sink,
512 : void* storage,
513 : std::size_t n)
514 : {
515 70 : auto& s = *static_cast<S*>(sink);
516 70 : ::new(storage) CommitEofAwaitable(s.commit_eof(n));
517 :
518 : static constexpr awaitable_ops ops = {
519 70 : +[](void* p) {
520 70 : return static_cast<CommitEofAwaitable*>(p)->await_ready();
521 : },
522 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
523 0 : return detail::call_await_suspend(
524 0 : static_cast<CommitEofAwaitable*>(p), h, ex, token);
525 : },
526 70 : +[](void* p) {
527 70 : return static_cast<CommitEofAwaitable*>(p)->await_resume();
528 : },
529 70 : +[](void* p) noexcept {
530 70 : static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
531 : }
532 : };
533 70 : return &ops;
534 : }
535 :
536 : //------------------------------------------------------
537 : // WriteSink forwarding (only instantiated when WriteSink<S>)
538 :
539 : static write_awaitable_ops const*
540 6 : construct_write_some_awaitable_impl(
541 : void* sink,
542 : void* storage,
543 : std::span<const_buffer const> buffers)
544 : requires WriteSink<S>
545 : {
546 : using Aw = decltype(std::declval<S&>().write_some(
547 : std::span<const_buffer const>{}));
548 6 : auto& s = *static_cast<S*>(sink);
549 6 : ::new(storage) Aw(s.write_some(buffers));
550 :
551 : static constexpr write_awaitable_ops ops = {
552 6 : +[](void* p) {
553 6 : return static_cast<Aw*>(p)->await_ready();
554 : },
555 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
556 0 : return detail::call_await_suspend(
557 0 : static_cast<Aw*>(p), h, ex, token);
558 : },
559 6 : +[](void* p) {
560 6 : return static_cast<Aw*>(p)->await_resume();
561 : },
562 6 : +[](void* p) noexcept {
563 6 : static_cast<Aw*>(p)->~Aw();
564 : }
565 : };
566 6 : return &ops;
567 : }
568 :
569 : static write_awaitable_ops const*
570 14 : construct_write_awaitable_impl(
571 : void* sink,
572 : void* storage,
573 : std::span<const_buffer const> buffers)
574 : requires WriteSink<S>
575 : {
576 : using Aw = decltype(std::declval<S&>().write(
577 : std::span<const_buffer const>{}));
578 14 : auto& s = *static_cast<S*>(sink);
579 14 : ::new(storage) Aw(s.write(buffers));
580 :
581 : static constexpr write_awaitable_ops ops = {
582 14 : +[](void* p) {
583 14 : return static_cast<Aw*>(p)->await_ready();
584 : },
585 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
586 0 : return detail::call_await_suspend(
587 0 : static_cast<Aw*>(p), h, ex, token);
588 : },
589 14 : +[](void* p) {
590 14 : return static_cast<Aw*>(p)->await_resume();
591 : },
592 14 : +[](void* p) noexcept {
593 14 : static_cast<Aw*>(p)->~Aw();
594 : }
595 : };
596 14 : return &ops;
597 : }
598 :
599 : static write_awaitable_ops const*
600 12 : construct_write_eof_buffers_awaitable_impl(
601 : void* sink,
602 : void* storage,
603 : std::span<const_buffer const> buffers)
604 : requires WriteSink<S>
605 : {
606 : using Aw = decltype(std::declval<S&>().write_eof(
607 : std::span<const_buffer const>{}));
608 12 : auto& s = *static_cast<S*>(sink);
609 12 : ::new(storage) Aw(s.write_eof(buffers));
610 :
611 : static constexpr write_awaitable_ops ops = {
612 12 : +[](void* p) {
613 12 : return static_cast<Aw*>(p)->await_ready();
614 : },
615 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
616 0 : return detail::call_await_suspend(
617 0 : static_cast<Aw*>(p), h, ex, token);
618 : },
619 12 : +[](void* p) {
620 12 : return static_cast<Aw*>(p)->await_resume();
621 : },
622 12 : +[](void* p) noexcept {
623 12 : static_cast<Aw*>(p)->~Aw();
624 : }
625 : };
626 12 : return &ops;
627 : }
628 :
629 : static awaitable_ops const*
630 16 : construct_write_eof_awaitable_impl(
631 : void* sink,
632 : void* storage)
633 : requires WriteSink<S>
634 : {
635 : using Aw = decltype(std::declval<S&>().write_eof());
636 16 : auto& s = *static_cast<S*>(sink);
637 16 : ::new(storage) Aw(s.write_eof());
638 :
639 : static constexpr awaitable_ops ops = {
640 16 : +[](void* p) {
641 16 : return static_cast<Aw*>(p)->await_ready();
642 : },
643 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
644 0 : return detail::call_await_suspend(
645 0 : static_cast<Aw*>(p), h, ex, token);
646 : },
647 16 : +[](void* p) {
648 16 : return static_cast<Aw*>(p)->await_resume();
649 : },
650 16 : +[](void* p) noexcept {
651 16 : static_cast<Aw*>(p)->~Aw();
652 : }
653 : };
654 16 : return &ops;
655 : }
656 :
657 : //------------------------------------------------------
658 :
659 : static consteval std::size_t
660 : compute_max_size() noexcept
661 : {
662 : std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
663 : ? sizeof(CommitAwaitable)
664 : : sizeof(CommitEofAwaitable);
665 : if constexpr (WriteSink<S>)
666 : {
667 : using WS = decltype(std::declval<S&>().write_some(
668 : std::span<const_buffer const>{}));
669 : using W = decltype(std::declval<S&>().write(
670 : std::span<const_buffer const>{}));
671 : using WEB = decltype(std::declval<S&>().write_eof(
672 : std::span<const_buffer const>{}));
673 : using WE = decltype(std::declval<S&>().write_eof());
674 :
675 : if(sizeof(WS) > s) s = sizeof(WS);
676 : if(sizeof(W) > s) s = sizeof(W);
677 : if(sizeof(WEB) > s) s = sizeof(WEB);
678 : if(sizeof(WE) > s) s = sizeof(WE);
679 : }
680 : return s;
681 : }
682 :
683 : static consteval std::size_t
684 : compute_max_align() noexcept
685 : {
686 : std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
687 : ? alignof(CommitAwaitable)
688 : : alignof(CommitEofAwaitable);
689 : if constexpr (WriteSink<S>)
690 : {
691 : using WS = decltype(std::declval<S&>().write_some(
692 : std::span<const_buffer const>{}));
693 : using W = decltype(std::declval<S&>().write(
694 : std::span<const_buffer const>{}));
695 : using WEB = decltype(std::declval<S&>().write_eof(
696 : std::span<const_buffer const>{}));
697 : using WE = decltype(std::declval<S&>().write_eof());
698 :
699 : if(alignof(WS) > a) a = alignof(WS);
700 : if(alignof(W) > a) a = alignof(W);
701 : if(alignof(WEB) > a) a = alignof(WEB);
702 : if(alignof(WE) > a) a = alignof(WE);
703 : }
704 : return a;
705 : }
706 :
707 : static consteval vtable
708 : make_vtable() noexcept
709 : {
710 : vtable v{};
711 : v.destroy = &do_destroy_impl;
712 : v.do_prepare = &do_prepare_impl;
713 : v.awaitable_size = compute_max_size();
714 : v.awaitable_align = compute_max_align();
715 : v.construct_commit_awaitable = &construct_commit_awaitable_impl;
716 : v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
717 : v.construct_write_some_awaitable = nullptr;
718 : v.construct_write_awaitable = nullptr;
719 : v.construct_write_eof_buffers_awaitable = nullptr;
720 : v.construct_write_eof_awaitable = nullptr;
721 :
722 : if constexpr (WriteSink<S>)
723 : {
724 : v.construct_write_some_awaitable =
725 : &construct_write_some_awaitable_impl;
726 : v.construct_write_awaitable =
727 : &construct_write_awaitable_impl;
728 : v.construct_write_eof_buffers_awaitable =
729 : &construct_write_eof_buffers_awaitable_impl;
730 : v.construct_write_eof_awaitable =
731 : &construct_write_eof_awaitable_impl;
732 : }
733 : return v;
734 : }
735 :
736 : static constexpr vtable value = make_vtable();
737 : };
738 :
739 : //----------------------------------------------------------
740 :
741 : inline
742 215 : any_buffer_sink::~any_buffer_sink()
743 : {
744 215 : if(storage_)
745 : {
746 17 : vt_->destroy(sink_);
747 17 : ::operator delete(storage_);
748 : }
749 215 : if(cached_awaitable_)
750 208 : ::operator delete(cached_awaitable_);
751 215 : }
752 :
753 : inline any_buffer_sink&
754 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
755 : {
756 5 : if(this != &other)
757 : {
758 4 : if(storage_)
759 : {
760 1 : vt_->destroy(sink_);
761 1 : ::operator delete(storage_);
762 : }
763 4 : if(cached_awaitable_)
764 2 : ::operator delete(cached_awaitable_);
765 4 : sink_ = std::exchange(other.sink_, nullptr);
766 4 : vt_ = std::exchange(other.vt_, nullptr);
767 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
768 4 : storage_ = std::exchange(other.storage_, nullptr);
769 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
770 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
771 : }
772 5 : return *this;
773 : }
774 :
775 : template<BufferSink S>
776 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
777 18 : any_buffer_sink::any_buffer_sink(S s)
778 18 : : vt_(&vtable_for_impl<S>::value)
779 : {
780 : struct guard {
781 : any_buffer_sink* self;
782 : bool committed = false;
783 18 : ~guard() {
784 18 : if(!committed && self->storage_) {
785 0 : self->vt_->destroy(self->sink_);
786 0 : ::operator delete(self->storage_);
787 0 : self->storage_ = nullptr;
788 0 : self->sink_ = nullptr;
789 : }
790 18 : }
791 18 : } g{this};
792 :
793 18 : storage_ = ::operator new(sizeof(S));
794 18 : sink_ = ::new(storage_) S(std::move(s));
795 :
796 18 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
797 :
798 18 : g.committed = true;
799 18 : }
800 :
801 : template<BufferSink S>
802 192 : any_buffer_sink::any_buffer_sink(S* s)
803 192 : : sink_(s)
804 192 : , vt_(&vtable_for_impl<S>::value)
805 : {
806 192 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
807 192 : }
808 :
809 : //----------------------------------------------------------
810 :
811 : inline std::span<mutable_buffer>
812 126 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
813 : {
814 126 : return vt_->do_prepare(sink_, dest);
815 : }
816 :
817 : inline auto
818 96 : any_buffer_sink::commit(std::size_t n)
819 : {
820 : struct awaitable
821 : {
822 : any_buffer_sink* self_;
823 : std::size_t n_;
824 :
825 : bool
826 96 : await_ready()
827 : {
828 192 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
829 96 : self_->sink_,
830 96 : self_->cached_awaitable_,
831 : n_);
832 96 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
833 : }
834 :
835 : coro
836 0 : await_suspend(coro h, executor_ref ex, std::stop_token token)
837 : {
838 0 : return self_->active_ops_->await_suspend(
839 0 : self_->cached_awaitable_, h, ex, token);
840 : }
841 :
842 : io_result<>
843 96 : await_resume()
844 : {
845 : struct guard {
846 : any_buffer_sink* self;
847 96 : ~guard() {
848 96 : self->active_ops_->destroy(self->cached_awaitable_);
849 96 : self->active_ops_ = nullptr;
850 96 : }
851 96 : } g{self_};
852 96 : return self_->active_ops_->await_resume(
853 166 : self_->cached_awaitable_);
854 96 : }
855 : };
856 96 : return awaitable{this, n};
857 : }
858 :
859 : inline auto
860 54 : any_buffer_sink::commit_eof(std::size_t n)
861 : {
862 : struct awaitable
863 : {
864 : any_buffer_sink* self_;
865 : std::size_t n_;
866 :
867 : bool
868 54 : await_ready()
869 : {
870 108 : self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
871 54 : self_->sink_,
872 54 : self_->cached_awaitable_,
873 : n_);
874 54 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
875 : }
876 :
877 : coro
878 0 : await_suspend(coro h, executor_ref ex, std::stop_token token)
879 : {
880 0 : return self_->active_ops_->await_suspend(
881 0 : self_->cached_awaitable_, h, ex, token);
882 : }
883 :
884 : io_result<>
885 54 : await_resume()
886 : {
887 : struct guard {
888 : any_buffer_sink* self;
889 54 : ~guard() {
890 54 : self->active_ops_->destroy(self->cached_awaitable_);
891 54 : self->active_ops_ = nullptr;
892 54 : }
893 54 : } g{self_};
894 54 : return self_->active_ops_->await_resume(
895 92 : self_->cached_awaitable_);
896 54 : }
897 : };
898 54 : return awaitable{this, n};
899 : }
900 :
901 : //----------------------------------------------------------
902 : // Private helpers for native WriteSink forwarding
903 :
904 : inline auto
905 6 : any_buffer_sink::write_some_(
906 : std::span<const_buffer const> buffers)
907 : {
908 : struct awaitable
909 : {
910 : any_buffer_sink* self_;
911 : std::span<const_buffer const> buffers_;
912 :
913 : bool
914 6 : await_ready() const noexcept
915 : {
916 6 : return false;
917 : }
918 :
919 : coro
920 6 : await_suspend(coro h, executor_ref ex, std::stop_token token)
921 : {
922 12 : self_->active_write_ops_ =
923 12 : self_->vt_->construct_write_some_awaitable(
924 6 : self_->sink_,
925 6 : self_->cached_awaitable_,
926 : buffers_);
927 :
928 6 : if(self_->active_write_ops_->await_ready(
929 6 : self_->cached_awaitable_))
930 6 : return h;
931 :
932 0 : return self_->active_write_ops_->await_suspend(
933 0 : self_->cached_awaitable_, h, ex, token);
934 : }
935 :
936 : io_result<std::size_t>
937 6 : await_resume()
938 : {
939 : struct guard {
940 : any_buffer_sink* self;
941 6 : ~guard() {
942 6 : self->active_write_ops_->destroy(
943 6 : self->cached_awaitable_);
944 6 : self->active_write_ops_ = nullptr;
945 6 : }
946 6 : } g{self_};
947 6 : return self_->active_write_ops_->await_resume(
948 10 : self_->cached_awaitable_);
949 6 : }
950 : };
951 6 : return awaitable{this, buffers};
952 : }
953 :
954 : inline auto
955 14 : any_buffer_sink::write_(
956 : std::span<const_buffer const> buffers)
957 : {
958 : struct awaitable
959 : {
960 : any_buffer_sink* self_;
961 : std::span<const_buffer const> buffers_;
962 :
963 : bool
964 14 : await_ready() const noexcept
965 : {
966 14 : return false;
967 : }
968 :
969 : coro
970 14 : await_suspend(coro h, executor_ref ex, std::stop_token token)
971 : {
972 28 : self_->active_write_ops_ =
973 28 : self_->vt_->construct_write_awaitable(
974 14 : self_->sink_,
975 14 : self_->cached_awaitable_,
976 : buffers_);
977 :
978 14 : if(self_->active_write_ops_->await_ready(
979 14 : self_->cached_awaitable_))
980 14 : return h;
981 :
982 0 : return self_->active_write_ops_->await_suspend(
983 0 : self_->cached_awaitable_, h, ex, token);
984 : }
985 :
986 : io_result<std::size_t>
987 14 : await_resume()
988 : {
989 : struct guard {
990 : any_buffer_sink* self;
991 14 : ~guard() {
992 14 : self->active_write_ops_->destroy(
993 14 : self->cached_awaitable_);
994 14 : self->active_write_ops_ = nullptr;
995 14 : }
996 14 : } g{self_};
997 14 : return self_->active_write_ops_->await_resume(
998 24 : self_->cached_awaitable_);
999 14 : }
1000 : };
1001 14 : return awaitable{this, buffers};
1002 : }
1003 :
1004 : inline auto
1005 12 : any_buffer_sink::write_eof_buffers_(
1006 : std::span<const_buffer const> buffers)
1007 : {
1008 : struct awaitable
1009 : {
1010 : any_buffer_sink* self_;
1011 : std::span<const_buffer const> buffers_;
1012 :
1013 : bool
1014 12 : await_ready() const noexcept
1015 : {
1016 12 : return false;
1017 : }
1018 :
1019 : coro
1020 12 : await_suspend(coro h, executor_ref ex, std::stop_token token)
1021 : {
1022 24 : self_->active_write_ops_ =
1023 24 : self_->vt_->construct_write_eof_buffers_awaitable(
1024 12 : self_->sink_,
1025 12 : self_->cached_awaitable_,
1026 : buffers_);
1027 :
1028 12 : if(self_->active_write_ops_->await_ready(
1029 12 : self_->cached_awaitable_))
1030 12 : return h;
1031 :
1032 0 : return self_->active_write_ops_->await_suspend(
1033 0 : self_->cached_awaitable_, h, ex, token);
1034 : }
1035 :
1036 : io_result<std::size_t>
1037 12 : await_resume()
1038 : {
1039 : struct guard {
1040 : any_buffer_sink* self;
1041 12 : ~guard() {
1042 12 : self->active_write_ops_->destroy(
1043 12 : self->cached_awaitable_);
1044 12 : self->active_write_ops_ = nullptr;
1045 12 : }
1046 12 : } g{self_};
1047 12 : return self_->active_write_ops_->await_resume(
1048 20 : self_->cached_awaitable_);
1049 12 : }
1050 : };
1051 12 : return awaitable{this, buffers};
1052 : }
1053 :
1054 : //----------------------------------------------------------
1055 : // Public WriteSink methods
1056 :
1057 : template<ConstBufferSequence CB>
1058 : io_task<std::size_t>
1059 22 : any_buffer_sink::write_some(CB buffers)
1060 : {
1061 : buffer_param<CB> bp(buffers);
1062 : auto src = bp.data();
1063 : if(src.empty())
1064 : co_return {{}, 0};
1065 :
1066 : // Native WriteSink path
1067 : if(vt_->construct_write_some_awaitable)
1068 : co_return co_await write_some_(src);
1069 :
1070 : // Synthesized path: prepare + buffer_copy + commit
1071 : mutable_buffer arr[detail::max_iovec_];
1072 : auto dst_bufs = prepare(arr);
1073 : if(dst_bufs.empty())
1074 : {
1075 : auto [ec] = co_await commit(0);
1076 : if(ec)
1077 : co_return {ec, 0};
1078 : dst_bufs = prepare(arr);
1079 : if(dst_bufs.empty())
1080 : co_return {{}, 0};
1081 : }
1082 :
1083 : auto n = buffer_copy(dst_bufs, src);
1084 : auto [ec] = co_await commit(n);
1085 : if(ec)
1086 : co_return {ec, 0};
1087 : co_return {{}, n};
1088 44 : }
1089 :
1090 : template<ConstBufferSequence CB>
1091 : io_task<std::size_t>
1092 38 : any_buffer_sink::write(CB buffers)
1093 : {
1094 : buffer_param<CB> bp(buffers);
1095 : std::size_t total = 0;
1096 :
1097 : // Native WriteSink path
1098 : if(vt_->construct_write_awaitable)
1099 : {
1100 : for(;;)
1101 : {
1102 : auto bufs = bp.data();
1103 : if(bufs.empty())
1104 : break;
1105 :
1106 : auto [ec, n] = co_await write_(bufs);
1107 : total += n;
1108 : if(ec)
1109 : co_return {ec, total};
1110 : bp.consume(n);
1111 : }
1112 : co_return {{}, total};
1113 : }
1114 :
1115 : // Synthesized path: prepare + buffer_copy + commit
1116 : for(;;)
1117 : {
1118 : auto src = bp.data();
1119 : if(src.empty())
1120 : break;
1121 :
1122 : mutable_buffer arr[detail::max_iovec_];
1123 : auto dst_bufs = prepare(arr);
1124 : if(dst_bufs.empty())
1125 : {
1126 : auto [ec] = co_await commit(0);
1127 : if(ec)
1128 : co_return {ec, total};
1129 : continue;
1130 : }
1131 :
1132 : auto n = buffer_copy(dst_bufs, src);
1133 : auto [ec] = co_await commit(n);
1134 : if(ec)
1135 : co_return {ec, total};
1136 : bp.consume(n);
1137 : total += n;
1138 : }
1139 :
1140 : co_return {{}, total};
1141 76 : }
1142 :
1143 : inline auto
1144 32 : any_buffer_sink::write_eof()
1145 : {
1146 : struct awaitable
1147 : {
1148 : any_buffer_sink* self_;
1149 :
1150 : bool
1151 32 : await_ready()
1152 : {
1153 32 : if(self_->vt_->construct_write_eof_awaitable)
1154 : {
1155 : // Native WriteSink: forward to underlying write_eof()
1156 32 : self_->active_ops_ =
1157 16 : self_->vt_->construct_write_eof_awaitable(
1158 16 : self_->sink_,
1159 16 : self_->cached_awaitable_);
1160 : }
1161 : else
1162 : {
1163 : // Synthesized: commit_eof(0)
1164 32 : self_->active_ops_ =
1165 16 : self_->vt_->construct_commit_eof_awaitable(
1166 16 : self_->sink_,
1167 16 : self_->cached_awaitable_,
1168 : 0);
1169 : }
1170 64 : return self_->active_ops_->await_ready(
1171 32 : self_->cached_awaitable_);
1172 : }
1173 :
1174 : coro
1175 0 : await_suspend(coro h, executor_ref ex, std::stop_token token)
1176 : {
1177 0 : return self_->active_ops_->await_suspend(
1178 0 : self_->cached_awaitable_, h, ex, token);
1179 : }
1180 :
1181 : io_result<>
1182 32 : await_resume()
1183 : {
1184 : struct guard {
1185 : any_buffer_sink* self;
1186 32 : ~guard() {
1187 32 : self->active_ops_->destroy(self->cached_awaitable_);
1188 32 : self->active_ops_ = nullptr;
1189 32 : }
1190 32 : } g{self_};
1191 32 : return self_->active_ops_->await_resume(
1192 54 : self_->cached_awaitable_);
1193 32 : }
1194 : };
1195 32 : return awaitable{this};
1196 : }
1197 :
1198 : template<ConstBufferSequence CB>
1199 : io_task<std::size_t>
1200 40 : any_buffer_sink::write_eof(CB buffers)
1201 : {
1202 : // Native WriteSink path
1203 : if(vt_->construct_write_eof_buffers_awaitable)
1204 : {
1205 : const_buffer_param<CB> bp(buffers);
1206 : std::size_t total = 0;
1207 :
1208 : for(;;)
1209 : {
1210 : auto bufs = bp.data();
1211 : if(bufs.empty())
1212 : {
1213 : auto [ec] = co_await write_eof();
1214 : co_return {ec, total};
1215 : }
1216 :
1217 : if(!bp.more())
1218 : {
1219 : // Last window: send atomically with EOF
1220 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1221 : total += n;
1222 : co_return {ec, total};
1223 : }
1224 :
1225 : auto [ec, n] = co_await write_(bufs);
1226 : total += n;
1227 : if(ec)
1228 : co_return {ec, total};
1229 : bp.consume(n);
1230 : }
1231 : }
1232 :
1233 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1234 : buffer_param<CB> bp(buffers);
1235 : std::size_t total = 0;
1236 :
1237 : for(;;)
1238 : {
1239 : auto src = bp.data();
1240 : if(src.empty())
1241 : break;
1242 :
1243 : mutable_buffer arr[detail::max_iovec_];
1244 : auto dst_bufs = prepare(arr);
1245 : if(dst_bufs.empty())
1246 : {
1247 : auto [ec] = co_await commit(0);
1248 : if(ec)
1249 : co_return {ec, total};
1250 : continue;
1251 : }
1252 :
1253 : auto n = buffer_copy(dst_bufs, src);
1254 : auto [ec] = co_await commit(n);
1255 : if(ec)
1256 : co_return {ec, total};
1257 : bp.consume(n);
1258 : total += n;
1259 : }
1260 :
1261 : auto [ec] = co_await commit_eof(0);
1262 : if(ec)
1263 : co_return {ec, total};
1264 :
1265 : co_return {{}, total};
1266 80 : }
1267 :
1268 : //----------------------------------------------------------
1269 :
1270 : static_assert(BufferSink<any_buffer_sink>);
1271 : static_assert(WriteSink<any_buffer_sink>);
1272 :
1273 : } // namespace capy
1274 : } // namespace boost
1275 :
1276 : #endif
|