Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_WRITE_NOW_HPP
11 : #define BOOST_CAPY_IO_WRITE_NOW_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/consuming_buffers.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 :
23 : #include <cstddef>
24 : #include <exception>
25 : #include <new>
26 : #include <stop_token>
27 : #include <utility>
28 :
29 : #ifndef BOOST_CAPY_WRITE_NOW_WORKAROUND
30 : # if defined(__GNUC__) && !defined(__clang__)
31 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 1
32 : # else
33 : # define BOOST_CAPY_WRITE_NOW_WORKAROUND 0
34 : # endif
35 : #endif
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : /** Eagerly writes complete buffer sequences with frame caching.
41 :
42 : This class wraps a @ref WriteStream and provides an `operator()`
43 : that writes an entire buffer sequence, attempting to complete
44 : synchronously. If every `write_some` completes without suspending,
45 : the entire operation finishes in `await_ready` with no coroutine
46 : suspension.
47 :
48 : The class maintains a one-element coroutine frame cache. After
49 : the first call, subsequent calls reuse the cached frame memory,
50 : avoiding repeated allocation for the internal coroutine.
51 :
52 : @tparam Stream The stream type, must satisfy @ref WriteStream.
53 :
54 : @par Thread Safety
55 : Distinct objects: Safe.
56 : Shared objects: Unsafe.
57 :
58 : @par Preconditions
59 : Only one operation may be outstanding at a time. A new call to
60 : `operator()` must not be made until the previous operation has
61 : completed (i.e., the returned awaitable has been fully consumed).
62 :
63 : @par Example
64 :
65 : @code
66 : template< WriteStream Stream >
67 : task<> send_messages( Stream& stream )
68 : {
69 : write_now wn( stream );
70 : auto [ec1, n1] = co_await wn( make_buffer( "hello" ) );
71 : if( ec1 )
72 : detail::throw_system_error( ec1 );
73 : auto [ec2, n2] = co_await wn( make_buffer( "world" ) );
74 : if( ec2 )
75 : detail::throw_system_error( ec2 );
76 : }
77 : @endcode
78 :
79 : @see write, write_some, WriteStream, ConstBufferSequence
80 : */
81 : template<class Stream>
82 : requires WriteStream<Stream>
83 : class write_now
84 : {
85 : Stream& stream_;
86 : void* cached_frame_ = nullptr;
87 : std::size_t cached_size_ = 0;
88 :
89 : struct [[nodiscard]] BOOST_CAPY_CORO_AWAIT_ELIDABLE
90 : op_type
91 : {
92 : struct promise_type
93 : {
94 : io_result<std::size_t> result_;
95 : std::exception_ptr ep_;
96 : coro cont_{nullptr};
97 : executor_ref ex_;
98 : std::stop_token token_;
99 : bool done_ = false;
100 :
101 68 : op_type get_return_object()
102 : {
103 : return op_type{
104 : std::coroutine_handle<
105 68 : promise_type>::from_promise(*this)};
106 : }
107 :
108 68 : auto initial_suspend() noexcept
109 : {
110 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
111 68 : return std::suspend_always{};
112 : #else
113 : return std::suspend_never{};
114 : #endif
115 : }
116 :
117 68 : auto final_suspend() noexcept
118 : {
119 : struct awaiter
120 : {
121 : promise_type* p_;
122 :
123 68 : bool await_ready() const noexcept
124 : {
125 68 : return false;
126 : }
127 :
128 68 : coro await_suspend(coro) const noexcept
129 : {
130 68 : p_->done_ = true;
131 68 : if(!p_->cont_)
132 0 : return std::noop_coroutine();
133 68 : return p_->cont_;
134 : }
135 :
136 0 : void await_resume() const noexcept
137 : {
138 0 : }
139 : };
140 68 : return awaiter{this};
141 : }
142 :
143 46 : void return_value(
144 : io_result<std::size_t> r) noexcept
145 : {
146 46 : result_ = r;
147 46 : }
148 :
149 22 : void unhandled_exception()
150 : {
151 22 : ep_ = std::current_exception();
152 22 : }
153 :
154 : std::suspend_always yield_value(int) noexcept
155 : {
156 : return {};
157 : }
158 :
159 : template<class A>
160 84 : auto await_transform(A&& a)
161 : {
162 : using decayed = std::decay_t<A>;
163 : if constexpr (IoAwaitable<decayed>)
164 : {
165 : struct wrapper
166 : {
167 : decayed inner_;
168 : promise_type* p_;
169 :
170 84 : bool await_ready()
171 : {
172 84 : return inner_.await_ready();
173 : }
174 :
175 0 : coro await_suspend(coro h)
176 : {
177 0 : return detail::call_await_suspend(
178 : &inner_, h,
179 0 : p_->ex_, p_->token_);
180 : }
181 :
182 84 : decltype(auto) await_resume()
183 : {
184 84 : return inner_.await_resume();
185 : }
186 : };
187 : return wrapper{
188 84 : std::forward<A>(a), this};
189 : }
190 : else
191 : {
192 : return std::forward<A>(a);
193 : }
194 : }
195 :
196 : static void*
197 68 : operator new(
198 : std::size_t size,
199 : write_now& self,
200 : auto&)
201 : {
202 68 : if(self.cached_frame_ &&
203 4 : self.cached_size_ >= size)
204 4 : return self.cached_frame_;
205 64 : void* p = ::operator new(size);
206 64 : if(self.cached_frame_)
207 0 : ::operator delete(self.cached_frame_);
208 64 : self.cached_frame_ = p;
209 64 : self.cached_size_ = size;
210 64 : return p;
211 : }
212 :
213 : static void
214 68 : operator delete(void*, std::size_t) noexcept
215 : {
216 68 : }
217 : };
218 :
219 : std::coroutine_handle<promise_type> h_;
220 :
221 136 : ~op_type()
222 : {
223 136 : if(h_)
224 68 : h_.destroy();
225 136 : }
226 :
227 : op_type(op_type const&) = delete;
228 : op_type& operator=(op_type const&) = delete;
229 :
230 68 : op_type(op_type&& other) noexcept
231 68 : : h_(std::exchange(other.h_, nullptr))
232 : {
233 68 : }
234 :
235 : op_type& operator=(op_type&&) = delete;
236 :
237 68 : bool await_ready() const noexcept
238 : {
239 68 : return h_.promise().done_;
240 : }
241 :
242 68 : coro await_suspend(
243 : coro cont,
244 : executor_ref ex,
245 : std::stop_token token)
246 : {
247 68 : auto& p = h_.promise();
248 68 : p.cont_ = cont;
249 68 : p.ex_ = ex;
250 68 : p.token_ = token;
251 68 : return h_;
252 : }
253 :
254 68 : io_result<std::size_t> await_resume()
255 : {
256 68 : auto& p = h_.promise();
257 68 : if(p.ep_)
258 22 : std::rethrow_exception(p.ep_);
259 46 : return p.result_;
260 : }
261 :
262 : private:
263 68 : explicit op_type(
264 : std::coroutine_handle<promise_type> h)
265 68 : : h_(h)
266 : {
267 68 : }
268 : };
269 :
270 : public:
271 : /** Destructor. Frees the cached coroutine frame. */
272 64 : ~write_now()
273 : {
274 64 : if(cached_frame_)
275 64 : ::operator delete(cached_frame_);
276 64 : }
277 :
278 : /** Construct from a stream reference.
279 :
280 : @param s The stream to write to. Must outlive this object.
281 : */
282 : explicit
283 64 : write_now(Stream& s) noexcept
284 64 : : stream_(s)
285 : {
286 64 : }
287 :
288 : write_now(write_now const&) = delete;
289 : write_now& operator=(write_now const&) = delete;
290 :
291 : /** Eagerly write the entire buffer sequence.
292 :
293 : Writes data to the stream by calling `write_some` repeatedly
294 : until the entire buffer sequence is written or an error
295 : occurs. The operation attempts to complete synchronously:
296 : if every `write_some` completes without suspending, the
297 : entire operation finishes in `await_ready`.
298 :
299 : When the fast path cannot complete, the coroutine suspends
300 : and continues asynchronously. The internal coroutine frame
301 : is cached and reused across calls.
302 :
303 : @param buffers The buffer sequence to write. Passed by
304 : value to ensure the sequence lives in the coroutine
305 : frame across suspension points.
306 :
307 : @return An awaitable yielding `(error_code,std::size_t)`.
308 : On success, `n` equals `buffer_size(buffers)`. On
309 : error, `n` is the number of bytes written before the
310 : error. Compare error codes to conditions:
311 : @li `cond::canceled` - Operation was cancelled
312 : @li `std::errc::broken_pipe` - Peer closed connection
313 :
314 : @par Example
315 :
316 : @code
317 : write_now wn( stream );
318 : auto [ec, n] = co_await wn( make_buffer( body ) );
319 : if( ec )
320 : detail::throw_system_error( ec );
321 : @endcode
322 :
323 : @see write, write_some, WriteStream
324 : */
325 : // GCC falsely warns that the coroutine promise's
326 : // placement operator new(size_t, write_now&, auto&)
327 : // mismatches operator delete(void*, size_t). Per the
328 : // standard, coroutine deallocation lookup is separate.
329 : #if defined(__GNUC__) && !defined(__clang__)
330 : #pragma GCC diagnostic push
331 : #pragma GCC diagnostic ignored "-Wmismatched-new-delete"
332 : #endif
333 :
334 : #if BOOST_CAPY_WRITE_NOW_WORKAROUND
335 : template<ConstBufferSequence Buffers>
336 : op_type
337 68 : operator()(Buffers buffers)
338 : {
339 : std::size_t const total_size = buffer_size(buffers);
340 : std::size_t total_written = 0;
341 : consuming_buffers cb(buffers);
342 : while(total_written < total_size)
343 : {
344 : auto r =
345 : co_await stream_.write_some(cb);
346 : if(r.ec)
347 : co_return io_result<std::size_t>{
348 : r.ec, total_written};
349 : cb.consume(r.t1);
350 : total_written += r.t1;
351 : }
352 : co_return io_result<std::size_t>{
353 : {}, total_written};
354 136 : }
355 : #else
356 : template<ConstBufferSequence Buffers>
357 : op_type
358 : operator()(Buffers buffers)
359 : {
360 : std::size_t const total_size = buffer_size(buffers);
361 : std::size_t total_written = 0;
362 :
363 : // GCC ICE in expand_expr_real_1 (expr.cc:11376)
364 : // when consuming_buffers spans a co_yield, so
365 : // the GCC path uses a separate simple coroutine.
366 : consuming_buffers cb(buffers);
367 : while(total_written < total_size)
368 : {
369 : auto inner = stream_.write_some(cb);
370 : if(!inner.await_ready())
371 : break;
372 : auto r = inner.await_resume();
373 : if(r.ec)
374 : co_return io_result<std::size_t>{
375 : r.ec, total_written};
376 : cb.consume(r.t1);
377 : total_written += r.t1;
378 : }
379 :
380 : if(total_written >= total_size)
381 : co_return io_result<std::size_t>{
382 : {}, total_written};
383 :
384 : co_yield 0;
385 :
386 : while(total_written < total_size)
387 : {
388 : auto r =
389 : co_await stream_.write_some(cb);
390 : if(r.ec)
391 : co_return io_result<std::size_t>{
392 : r.ec, total_written};
393 : cb.consume(r.t1);
394 : total_written += r.t1;
395 : }
396 : co_return io_result<std::size_t>{
397 : {}, total_written};
398 : }
399 : #endif
400 :
401 : #if defined(__GNUC__) && !defined(__clang__)
402 : #pragma GCC diagnostic pop
403 : #endif
404 : };
405 :
406 : template<WriteStream S>
407 : write_now(S&) -> write_now<S>;
408 :
409 : } // namespace capy
410 : } // namespace boost
411 :
412 : #endif
|