libs/capy/include/boost/capy/io/any_write_stream.hpp

94.1% Lines (95/101) 85.7% Functions (30/35) 70.4% Branches (19/27)
libs/capy/include/boost/capy/io/any_write_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any WriteStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref WriteStream concept, enabling runtime polymorphism for
39 write operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Immediate Completion
55 Operations complete immediately without suspending when the
56 buffer sequence is empty, or when the underlying stream's
57 awaitable reports readiness via `await_ready`.
58
59 @par Thread Safety
60 Not thread-safe. Concurrent operations on the same wrapper
61 are undefined behavior.
62
63 @par Example
64 @code
65 // Owning - takes ownership of the stream
66 any_write_stream stream(socket{ioc});
67
68 // Reference - wraps without ownership
69 socket sock(ioc);
70 any_write_stream stream(&sock);
71
72 const_buffer buf(data, size);
73 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
74 @endcode
75
76 @see any_read_stream, any_stream, WriteStream
77 */
78 class any_write_stream
79 {
80 struct vtable;
81
82 template<WriteStream S>
83 struct vtable_for_impl;
84
85 // ordered for cache line coherence
86 void* stream_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 bool awaitable_active_ = false;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned stream (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_write_stream();
99
100 /** Default constructor.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 1 any_write_stream() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_write_stream(any_write_stream const&) = delete;
112 any_write_stream& operator=(any_write_stream const&) = delete;
113
114 /** Move constructor.
115
116 Transfers ownership of the wrapped stream (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 2 any_write_stream(any_write_stream&& other) noexcept
123 2 : stream_(std::exchange(other.stream_, nullptr))
124 2 , vt_(std::exchange(other.vt_, nullptr))
125 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2 , storage_(std::exchange(other.storage_, nullptr))
127 2 , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 {
129 2 }
130
131 /** Move assignment operator.
132
133 Destroys any owned stream and releases existing resources,
134 then transfers ownership from `other`.
135
136 @param other The wrapper to move from.
137 @return Reference to this wrapper.
138 */
139 any_write_stream&
140 operator=(any_write_stream&& other) noexcept;
141
142 /** Construct by taking ownership of a WriteStream.
143
144 Allocates storage and moves the stream into this wrapper.
145 The wrapper owns the stream and will destroy it.
146
147 @param s The stream to take ownership of.
148 */
149 template<WriteStream S>
150 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
151 any_write_stream(S s);
152
153 /** Construct by wrapping a WriteStream without ownership.
154
155 Wraps the given stream by pointer. The stream must remain
156 valid for the lifetime of this wrapper.
157
158 @param s Pointer to the stream to wrap.
159 */
160 template<WriteStream S>
161 any_write_stream(S* s);
162
163 /** Check if the wrapper contains a valid stream.
164
165 @return `true` if wrapping a stream, `false` if default-constructed
166 or moved-from.
167 */
168 bool
169 21 has_value() const noexcept
170 {
171 21 return stream_ != nullptr;
172 }
173
174 /** Check if the wrapper contains a valid stream.
175
176 @return `true` if wrapping a stream, `false` if default-constructed
177 or moved-from.
178 */
179 explicit
180 3 operator bool() const noexcept
181 {
182 3 return has_value();
183 }
184
185 /** Initiate an asynchronous write operation.
186
187 Writes data from the provided buffer sequence. The operation
188 completes when at least one byte has been written, or an error
189 occurs.
190
191 @param buffers The buffer sequence containing data to write.
192 Passed by value to ensure the sequence lives in the
193 coroutine frame across suspension points.
194
195 @return An awaitable yielding `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when:
200 @li The buffer sequence is empty, returning `{error_code{}, 0}`.
201 @li The underlying stream's awaitable reports immediate
202 readiness via `await_ready`.
203
204 @note This is a partial operation and may not process the
205 entire buffer sequence. Use the composed @ref write algorithm
206 for guaranteed complete transfer.
207
208 @par Preconditions
209 The wrapper must contain a valid stream (`has_value() == true`).
210 */
211 template<ConstBufferSequence CB>
212 auto
213 write_some(CB buffers);
214
215 protected:
216 /** Rebind to a new stream after move.
217
218 Updates the internal pointer to reference a new stream object.
219 Used by owning wrappers after move assignment when the owned
220 object has moved to a new location.
221
222 @param new_stream The new stream to bind to. Must be the same
223 type as the original stream.
224
225 @note Terminates if called with a stream of different type
226 than the original.
227 */
228 template<WriteStream S>
229 void
230 rebind(S& new_stream) noexcept
231 {
232 if(vt_ != &vtable_for_impl<S>::value)
233 std::terminate();
234 stream_ = &new_stream;
235 }
236 };
237
238 //----------------------------------------------------------
239
240 struct any_write_stream::vtable
241 {
242 // ordered by call frequency for cache line coherence
243 void (*construct_awaitable)(
244 void* stream,
245 void* storage,
246 std::span<const_buffer const> buffers);
247 bool (*await_ready)(void*);
248 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
249 io_result<std::size_t> (*await_resume)(void*);
250 void (*destroy_awaitable)(void*) noexcept;
251 std::size_t awaitable_size;
252 std::size_t awaitable_align;
253 void (*destroy)(void*) noexcept;
254 };
255
256 template<WriteStream S>
257 struct any_write_stream::vtable_for_impl
258 {
259 using Awaitable = decltype(std::declval<S&>().write_some(
260 std::span<const_buffer const>{}));
261
262 static void
263 1 do_destroy_impl(void* stream) noexcept
264 {
265 1 static_cast<S*>(stream)->~S();
266 1 }
267
268 static void
269 75 construct_awaitable_impl(
270 void* stream,
271 void* storage,
272 std::span<const_buffer const> buffers)
273 {
274 75 auto& s = *static_cast<S*>(stream);
275 75 ::new(storage) Awaitable(s.write_some(buffers));
276 75 }
277
278 static constexpr vtable value = {
279 &construct_awaitable_impl,
280 75 +[](void* p) {
281 75 return static_cast<Awaitable*>(p)->await_ready();
282 },
283 2 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
284 4 return detail::call_await_suspend(
285 2 static_cast<Awaitable*>(p), h, ex, token);
286 },
287 73 +[](void* p) {
288 73 return static_cast<Awaitable*>(p)->await_resume();
289 },
290 77 +[](void* p) noexcept {
291 12 static_cast<Awaitable*>(p)->~Awaitable();
292 },
293 sizeof(Awaitable),
294 alignof(Awaitable),
295 &do_destroy_impl
296 };
297 };
298
299 //----------------------------------------------------------
300
301 inline
302 95 any_write_stream::~any_write_stream()
303 {
304
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 94 times.
95 if(storage_)
305 {
306 1 vt_->destroy(stream_);
307 1 ::operator delete(storage_);
308 }
309
2/2
✓ Branch 0 taken 85 times.
✓ Branch 1 taken 10 times.
95 if(cached_awaitable_)
310 {
311
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 84 times.
85 if(awaitable_active_)
312 1 vt_->destroy_awaitable(cached_awaitable_);
313 85 ::operator delete(cached_awaitable_);
314 }
315 95 }
316
317 inline any_write_stream&
318 5 any_write_stream::operator=(any_write_stream&& other) noexcept
319 {
320
1/2
✓ Branch 0 taken 5 times.
✗ Branch 1 not taken.
5 if(this != &other)
321 {
322
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5 times.
5 if(storage_)
323 {
324 vt_->destroy(stream_);
325 ::operator delete(storage_);
326 }
327
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3 times.
5 if(cached_awaitable_)
328 {
329
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if(awaitable_active_)
330 1 vt_->destroy_awaitable(cached_awaitable_);
331 2 ::operator delete(cached_awaitable_);
332 }
333 5 stream_ = std::exchange(other.stream_, nullptr);
334 5 vt_ = std::exchange(other.vt_, nullptr);
335 5 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
336 5 storage_ = std::exchange(other.storage_, nullptr);
337 5 awaitable_active_ = std::exchange(other.awaitable_active_, false);
338 }
339 5 return *this;
340 }
341
342 template<WriteStream S>
343 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
344 1 any_write_stream::any_write_stream(S s)
345 1 : vt_(&vtable_for_impl<S>::value)
346 {
347 struct guard {
348 any_write_stream* self;
349 bool committed = false;
350 1 ~guard() {
351
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
1 if(!committed && self->storage_) {
352 self->vt_->destroy(self->stream_);
353 ::operator delete(self->storage_);
354 self->storage_ = nullptr;
355 self->stream_ = nullptr;
356 }
357 1 }
358 1 } g{this};
359
360
1/1
✓ Branch 1 taken 1 time.
1 storage_ = ::operator new(sizeof(S));
361 1 stream_ = ::new(storage_) S(std::move(s));
362
363 // Preallocate the awaitable storage
364
1/1
✓ Branch 1 taken 1 time.
1 cached_awaitable_ = ::operator new(vt_->awaitable_size);
365
366 1 g.committed = true;
367 1 }
368
369 template<WriteStream S>
370 86 any_write_stream::any_write_stream(S* s)
371 86 : stream_(s)
372 86 , vt_(&vtable_for_impl<S>::value)
373 {
374 // Preallocate the awaitable storage
375 86 cached_awaitable_ = ::operator new(vt_->awaitable_size);
376 86 }
377
378 //----------------------------------------------------------
379
380 template<ConstBufferSequence CB>
381 auto
382 79 any_write_stream::write_some(CB buffers)
383 {
384 struct awaitable
385 {
386 any_write_stream* self_;
387 const_buffer_array<detail::max_iovec_> ba_;
388
389 79 awaitable(
390 any_write_stream* self,
391 CB const& buffers) noexcept
392 79 : self_(self)
393 79 , ba_(buffers)
394 {
395 79 }
396
397 bool
398 79 await_ready() const noexcept
399 {
400 79 return ba_.to_span().empty();
401 }
402
403 coro
404 75 await_suspend(coro h, executor_ref ex, std::stop_token token)
405 {
406 75 self_->vt_->construct_awaitable(
407 75 self_->stream_,
408
1/1
✓ Branch 1 taken 10 times.
75 self_->cached_awaitable_,
409 75 ba_.to_span());
410 75 self_->awaitable_active_ = true;
411
412
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
75 if(self_->vt_->await_ready(self_->cached_awaitable_))
413 73 return h;
414
415 4 return self_->vt_->await_suspend(
416
0/1
✗ Branch 1 not taken.
2 self_->cached_awaitable_, h, ex, token);
417 }
418
419 io_result<std::size_t>
420 77 await_resume()
421 {
422
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 10 times.
77 if(!self_->awaitable_active_)
423 4 return {{}, 0};
424 struct guard {
425 any_write_stream* self;
426 73 ~guard() {
427 73 self->vt_->destroy_awaitable(self->cached_awaitable_);
428 73 self->awaitable_active_ = false;
429 73 }
430 73 } g{self_};
431 73 return self_->vt_->await_resume(
432
1/1
✓ Branch 1 taken 7 times.
73 self_->cached_awaitable_);
433 73 }
434 };
435 79 return awaitable{this, buffers};
436 }
437
438 } // namespace capy
439 } // namespace boost
440
441 #endif
442