libs/capy/include/boost/capy/io/write_now.hpp

94.0% Lines (63/67) 96.2% Functions (25/26) 73.3% Branches (11/15)
libs/capy/include/boost/capy/io/write_now.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 #define BOOST_CAPY_IO_WRITE_NOW_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/consuming_buffers.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <cstddef>
24 #include <exception>
25 #include <new>
26 #include <stop_token>
27 #include <utility>
28
29 #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
30 # if defined(__GNUC__) && !defined(__clang__)
31 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
32 # else
33 # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
34 # endif
35 #endif
36
37 namespace boost {
38 namespace capy {
39
40 /** Eagerly writes complete buffer sequences with frame caching.
41
42 This class wraps a @ref WriteStream and provides an `operator()`
43 that writes an entire buffer sequence, attempting to complete
44 synchronously. If every `write_some` completes without suspending,
45 the entire operation finishes in `await_ready` with no coroutine
46 suspension.
47
48 The class maintains a one-element coroutine frame cache. After
49 the first call, subsequent calls reuse the cached frame memory,
50 avoiding repeated allocation for the internal coroutine.
51
52 @tparam Stream The stream type, must satisfy @ref WriteStream.
53
54 @par Thread Safety
55 Distinct objects: Safe.
56 Shared objects: Unsafe.
57
58 @par Preconditions
59 Only one operation may be outstanding at a time. A new call to
60 `operator()` must not be made until the previous operation has
61 completed (i.e., the returned awaitable has been fully consumed).
62
63 @par Example
64
65 @code
66 template< WriteStream Stream >
67 task<> send_messages( Stream& stream )
68 {
69 write_now wn( stream );
70 auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
71 if( ec1 )
72 detail::throw_system_error( ec1 );
73 auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
74 if( ec2 )
75 detail::throw_system_error( ec2 );
76 }
77 @endcode
78
79 @see write, write_some, WriteStream, ConstBufferSequence
80 */
81 template<class Stream>
82 requires WriteStream<Stream>
83 class write_now
84 {
85 Stream& stream_;
86 void* cached_frame_ = nullptr;
87 std::size_t cached_size_ = 0;
88
89 struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
90 op_type
91 {
92 struct promise_type
93 {
94 io_result<std::size_t> result_;
95 std::exception_ptr ep_;
96 coro cont_{nullptr};
97 executor_ref ex_;
98 std::stop_token token_;
99 bool done_ = false;
100
101 68 op_type get_return_object()
102 {
103 return op_type{
104 std::coroutine_handle<
105 68 promise_type>::from_promise(*this)};
106 }
107
108 68 auto initial_suspend() noexcept
109 {
110 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 68 return std::suspend_always{};
112 #else
113 return std::suspend_never{};
114 #endif
115 }
116
117 68 auto final_suspend() noexcept
118 {
119 struct awaiter
120 {
121 promise_type* p_;
122
123 68 bool await_ready() const noexcept
124 {
125 68 return false;
126 }
127
128 68 coro await_suspend(coro) const noexcept
129 {
130 68 p_->done_ = true;
131
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 68 times.
68 if(!p_->cont_)
132 return std::noop_coroutine();
133 68 return p_->cont_;
134 }
135
136 void await_resume() const noexcept
137 {
138 }
139 };
140 68 return awaiter{this};
141 }
142
143 46 void return_value(
144 io_result<std::size_t> r) noexcept
145 {
146 46 result_ = r;
147 46 }
148
149 22 void unhandled_exception()
150 {
151 22 ep_ = std::current_exception();
152 22 }
153
154 std::suspend_always yield_value(int) noexcept
155 {
156 return {};
157 }
158
159 template<class A>
160 84 auto await_transform(A&& a)
161 {
162 using decayed = std::decay_t<A>;
163 if constexpr (IoAwaitable<decayed>)
164 {
165 struct wrapper
166 {
167 decayed inner_;
168 promise_type* p_;
169
170 bool await_ready()
171 {
172 return inner_.await_ready();
173 }
174
175 coro await_suspend(coro h)
176 {
177 return detail::call_await_suspend(
178 &inner_, h,
179 p_->ex_, p_->token_);
180 }
181
182 decltype(auto) await_resume()
183 {
184 return inner_.await_resume();
185 }
186 };
187 return wrapper{
188 84 std::forward<A>(a), this};
189 }
190 else
191 {
192 return std::forward<A>(a);
193 }
194 }
195
196 static void*
197 68 operator new(
198 std::size_t size,
199 write_now& self,
200 auto&)
201 {
202
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 64 times.
68 if(self.cached_frame_ &&
203
1/2
✓ Branch 0 taken 4 times.
✗ Branch 1 not taken.
4 self.cached_size_ >= size)
204 4 return self.cached_frame_;
205 64 void* p = ::operator new(size);
206
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if(self.cached_frame_)
207 ::operator delete(self.cached_frame_);
208 64 self.cached_frame_ = p;
209 64 self.cached_size_ = size;
210 64 return p;
211 }
212
213 static void
214 68 operator delete(void*, std::size_t) noexcept
215 {
216 68 }
217 };
218
219 std::coroutine_handle<promise_type> h_;
220
221 136 ~op_type()
222 {
223
2/2
✓ Branch 1 taken 68 times.
✓ Branch 2 taken 68 times.
136 if(h_)
224 68 h_.destroy();
225 136 }
226
227 op_type(op_type const&) = delete;
228 op_type& operator=(op_type const&) = delete;
229
230 68 op_type(op_type&& other) noexcept
231 68 : h_(std::exchange(other.h_, nullptr))
232 {
233 68 }
234
235 op_type& operator=(op_type&&) = delete;
236
237 68 bool await_ready() const noexcept
238 {
239 68 return h_.promise().done_;
240 }
241
242 68 coro await_suspend(
243 coro cont,
244 executor_ref ex,
245 std::stop_token token)
246 {
247 68 auto& p = h_.promise();
248 68 p.cont_ = cont;
249 68 p.ex_ = ex;
250 68 p.token_ = token;
251 68 return h_;
252 }
253
254 68 io_result<std::size_t> await_resume()
255 {
256 68 auto& p = h_.promise();
257
2/2
✓ Branch 1 taken 22 times.
✓ Branch 2 taken 46 times.
68 if(p.ep_)
258 22 std::rethrow_exception(p.ep_);
259 46 return p.result_;
260 }
261
262 private:
263 68 explicit op_type(
264 std::coroutine_handle<promise_type> h)
265 68 : h_(h)
266 {
267 68 }
268 };
269
270 public:
271 /** Destructor. Frees the cached coroutine frame. */
272 64 ~write_now()
273 {
274
1/2
✓ Branch 0 taken 64 times.
✗ Branch 1 not taken.
64 if(cached_frame_)
275 64 ::operator delete(cached_frame_);
276 64 }
277
278 /** Construct from a stream reference.
279
280 @param s The stream to write to. Must outlive this object.
281 */
282 explicit
283 64 write_now(Stream& s) noexcept
284 64 : stream_(s)
285 {
286 64 }
287
288 write_now(write_now const&) = delete;
289 write_now& operator=(write_now const&) = delete;
290
291 /** Eagerly write the entire buffer sequence.
292
293 Writes data to the stream by calling `write_some` repeatedly
294 until the entire buffer sequence is written or an error
295 occurs. The operation attempts to complete synchronously:
296 if every `write_some` completes without suspending, the
297 entire operation finishes in `await_ready`.
298
299 When the fast path cannot complete, the coroutine suspends
300 and continues asynchronously. The internal coroutine frame
301 is cached and reused across calls.
302
303 @param buffers The buffer sequence to write. Passed by
304 value to ensure the sequence lives in the coroutine
305 frame across suspension points.
306
307 @return An awaitable yielding `(error_code,std::size_t)`.
308 On success, `n` equals `buffer_size(buffers)`. On
309 error, `n` is the number of bytes written before the
310 error. Compare error codes to conditions:
311 @li `cond::canceled` - Operation was cancelled
312 @li `std::errc::broken_pipe` - Peer closed connection
313
314 @par Example
315
316 @code
317 write_now wn( stream );
318 auto [ec, n] = co_await wn( make_buffer( body ) );
319 if( ec )
320 detail::throw_system_error( ec );
321 @endcode
322
323 @see write, write_some, WriteStream
324 */
325 // GCC falsely warns that the coroutine promise's
326 // placement operator new(size_t, write_now&, auto&)
327 // mismatches operator delete(void*, size_t). Per the
328 // standard, coroutine deallocation lookup is separate.
329 #if defined(__GNUC__) && !defined(__clang__)
330 #pragma GCC diagnostic push
331 #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
332 #endif
333
334 #if BOOST_CAPY_WRITE_NOW_WORKAROUND
335 template<ConstBufferSequence Buffers>
336 op_type
337
1/1
✓ Branch 1 taken 68 times.
68 operator()(Buffers buffers)
338 {
339 std::size_t const total_size = buffer_size(buffers);
340 std::size_t total_written = 0;
341 consuming_buffers cb(buffers);
342 while(total_written < total_size)
343 {
344 auto r =
345 co_await stream_.write_some(cb);
346 if(r.ec)
347 co_return io_result<std::size_t>{
348 r.ec, total_written};
349 cb.consume(r.t1);
350 total_written += r.t1;
351 }
352 co_return io_result<std::size_t>{
353 {}, total_written};
354 136 }
355 #else
356 template<ConstBufferSequence Buffers>
357 op_type
358 operator()(Buffers buffers)
359 {
360 std::size_t const total_size = buffer_size(buffers);
361 std::size_t total_written = 0;
362
363 // GCC ICE in expand_expr_real_1 (expr.cc:11376)
364 // when consuming_buffers spans a co_yield, so
365 // the GCC path uses a separate simple coroutine.
366 consuming_buffers cb(buffers);
367 while(total_written < total_size)
368 {
369 auto inner = stream_.write_some(cb);
370 if(!inner.await_ready())
371 break;
372 auto r = inner.await_resume();
373 if(r.ec)
374 co_return io_result<std::size_t>{
375 r.ec, total_written};
376 cb.consume(r.t1);
377 total_written += r.t1;
378 }
379
380 if(total_written >= total_size)
381 co_return io_result<std::size_t>{
382 {}, total_written};
383
384 co_yield 0;
385
386 while(total_written < total_size)
387 {
388 auto r =
389 co_await stream_.write_some(cb);
390 if(r.ec)
391 co_return io_result<std::size_t>{
392 r.ec, total_written};
393 cb.consume(r.t1);
394 total_written += r.t1;
395 }
396 co_return io_result<std::size_t>{
397 {}, total_written};
398 }
399 #endif
400
401 #if defined(__GNUC__) && !defined(__clang__)
402 #pragma GCC diagnostic pop
403 #endif
404 };
405
406 template<WriteStream S>
407 write_now(S&) -> write_now<S>;
408
409 } // namespace capy
410 } // namespace boost
411
412 #endif
413