1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
//
3  
//
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
4  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
//
6  
//
7  
// Official repository: https://github.com/cppalliance/capy
7  
// Official repository: https://github.com/cppalliance/capy
8  
//
8  
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
10  
#ifndef BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
11  
#define BOOST_CAPY_TEST_WRITE_SINK_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/coro.hpp>
17  
#include <boost/capy/coro.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
18  
#include <boost/capy/ex/executor_ref.hpp>
19  
#include <boost/capy/io_result.hpp>
19  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/test/fuse.hpp>
21  
#include <boost/capy/test/fuse.hpp>
22  

22  

23  
#include <algorithm>
23  
#include <algorithm>
24  
#include <stop_token>
24  
#include <stop_token>
25  
#include <string>
25  
#include <string>
26  
#include <string_view>
26  
#include <string_view>
27  

27  

28  
namespace boost {
28  
namespace boost {
29  
namespace capy {
29  
namespace capy {
30  
namespace test {
30  
namespace test {
31  

31  

32  
/** A mock sink for testing write operations.
32  
/** A mock sink for testing write operations.
33  

33  

34  
    Use this to verify code that performs complete writes without needing
34  
    Use this to verify code that performs complete writes without needing
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
35  
    real I/O. Call @ref write to write data, then @ref data to retrieve
36  
    what was written. The associated @ref fuse enables error injection
36  
    what was written. The associated @ref fuse enables error injection
37  
    at controlled points.
37  
    at controlled points.
38  

38  

39 -
    Unlike @ref write_stream which provides partial writes via `write_some`,
39 +
    This class satisfies the @ref WriteSink concept by providing partial
40 -
    this class satisfies the @ref WriteSink concept by providing complete
40 +
    writes via `write_some` (satisfying @ref WriteStream), complete
41 -
    writes and EOF signaling.
41 +
    writes via `write`, and EOF signaling via `write_eof`.
42  

42  

43  
    @par Thread Safety
43  
    @par Thread Safety
44  
    Not thread-safe.
44  
    Not thread-safe.
45  

45  

46  
    @par Example
46  
    @par Example
47  
    @code
47  
    @code
48  
    fuse f;
48  
    fuse f;
49  
    write_sink ws( f );
49  
    write_sink ws( f );
50  

50  

51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
51  
    auto r = f.armed( [&]( fuse& ) -> task<void> {
52  
        auto [ec, n] = co_await ws.write(
52  
        auto [ec, n] = co_await ws.write(
53  
            const_buffer( "Hello", 5 ) );
53  
            const_buffer( "Hello", 5 ) );
54  
        if( ec )
54  
        if( ec )
55  
            co_return;
55  
            co_return;
56  
        auto [ec2] = co_await ws.write_eof();
56  
        auto [ec2] = co_await ws.write_eof();
57  
        if( ec2 )
57  
        if( ec2 )
58  
            co_return;
58  
            co_return;
59  
        // ws.data() returns "Hello"
59  
        // ws.data() returns "Hello"
60  
    } );
60  
    } );
61  
    @endcode
61  
    @endcode
62  

62  

63  
    @see fuse, WriteSink
63  
    @see fuse, WriteSink
64  
*/
64  
*/
65  
class write_sink
65  
class write_sink
66  
{
66  
{
67 -
    fuse* f_;
67 +
    fuse f_;
68  
    std::string data_;
68  
    std::string data_;
69  
    std::string expect_;
69  
    std::string expect_;
70  
    std::size_t max_write_size_;
70  
    std::size_t max_write_size_;
71  
    bool eof_called_ = false;
71  
    bool eof_called_ = false;
72  

72  

73  
    std::error_code
73  
    std::error_code
74  
    consume_match_() noexcept
74  
    consume_match_() noexcept
75  
    {
75  
    {
76  
        if(data_.empty() || expect_.empty())
76  
        if(data_.empty() || expect_.empty())
77  
            return {};
77  
            return {};
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
78  
        std::size_t const n = (std::min)(data_.size(), expect_.size());
79  
        if(std::string_view(data_.data(), n) !=
79  
        if(std::string_view(data_.data(), n) !=
80  
            std::string_view(expect_.data(), n))
80  
            std::string_view(expect_.data(), n))
81  
            return error::test_failure;
81  
            return error::test_failure;
82  
        data_.erase(0, n);
82  
        data_.erase(0, n);
83  
        expect_.erase(0, n);
83  
        expect_.erase(0, n);
84  
        return {};
84  
        return {};
85  
    }
85  
    }
86  

86  

87  
public:
87  
public:
88  
    /** Construct a write sink.
88  
    /** Construct a write sink.
89  

89  

90  
        @param f The fuse used to inject errors during writes.
90  
        @param f The fuse used to inject errors during writes.
91  

91  

92  
        @param max_write_size Maximum bytes transferred per write.
92  
        @param max_write_size Maximum bytes transferred per write.
93  
        Use to simulate chunked delivery.
93  
        Use to simulate chunked delivery.
94  
    */
94  
    */
95  
    explicit write_sink(
95  
    explicit write_sink(
96 -
        fuse& f,
96 +
        fuse f = {},
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
97  
        std::size_t max_write_size = std::size_t(-1)) noexcept
98 -
        : f_(&f)
98 +
        : f_(std::move(f))
99  
        , max_write_size_(max_write_size)
99  
        , max_write_size_(max_write_size)
100  
    {
100  
    {
101  
    }
101  
    }
102  

102  

103  
    /// Return the written data as a string view.
103  
    /// Return the written data as a string view.
104  
    std::string_view
104  
    std::string_view
105  
    data() const noexcept
105  
    data() const noexcept
106  
    {
106  
    {
107  
        return data_;
107  
        return data_;
108  
    }
108  
    }
109  

109  

110  
    /** Set the expected data for subsequent writes.
110  
    /** Set the expected data for subsequent writes.
111  

111  

112  
        Stores the expected data and immediately tries to match
112  
        Stores the expected data and immediately tries to match
113  
        against any data already written. Matched data is consumed
113  
        against any data already written. Matched data is consumed
114  
        from both buffers.
114  
        from both buffers.
115  

115  

116  
        @param sv The expected data.
116  
        @param sv The expected data.
117  

117  

118  
        @return An error if existing data does not match.
118  
        @return An error if existing data does not match.
119  
    */
119  
    */
120  
    std::error_code
120  
    std::error_code
121  
    expect(std::string_view sv)
121  
    expect(std::string_view sv)
122  
    {
122  
    {
123  
        expect_.assign(sv);
123  
        expect_.assign(sv);
124  
        return consume_match_();
124  
        return consume_match_();
125  
    }
125  
    }
126  

126  

127  
    /// Return the number of bytes written.
127  
    /// Return the number of bytes written.
128  
    std::size_t
128  
    std::size_t
129  
    size() const noexcept
129  
    size() const noexcept
130  
    {
130  
    {
131  
        return data_.size();
131  
        return data_.size();
132  
    }
132  
    }
133  

133  

134  
    /// Return whether write_eof has been called.
134  
    /// Return whether write_eof has been called.
135  
    bool
135  
    bool
136  
    eof_called() const noexcept
136  
    eof_called() const noexcept
137  
    {
137  
    {
138  
        return eof_called_;
138  
        return eof_called_;
139  
    }
139  
    }
140  

140  

141  
    /// Clear all data and reset state.
141  
    /// Clear all data and reset state.
142  
    void
142  
    void
143  
    clear() noexcept
143  
    clear() noexcept
144  
    {
144  
    {
145  
        data_.clear();
145  
        data_.clear();
146  
        expect_.clear();
146  
        expect_.clear();
147  
        eof_called_ = false;
147  
        eof_called_ = false;
148  
    }
148  
    }
149  

149  

150 -
    /** Asynchronously write data to the sink.
150 +
    /** Asynchronously write some data to the sink.
151  

151  

152 -
        Transfers all bytes from the provided const buffer sequence to
152 +
        Transfers up to `buffer_size( buffers )` bytes from the provided
153 -
        the internal buffer. Before every write, the attached @ref fuse
153 +
        const buffer sequence to the internal buffer. Before every write,
154 -
        is consulted to possibly inject an error for testing fault
154 +
        the attached @ref fuse is consulted to possibly inject an error.
155 -
        scenarios. The returned `std::size_t` is the number of bytes
 
156 -
        transferred.
 
157  

155  

158 -
        @par Effects
156 +
        @param buffers The const buffer sequence containing data to write.
159 -
        On success, appends the written bytes to the internal buffer.
 
160 -
        If an error is injected by the fuse, the internal buffer remains
 
161 -
        unchanged.
 
162  

157  

163 -
        @par Exception Safety
158 +
        @return An awaitable yielding `(error_code,std::size_t)`.
164 -
        No-throw guarantee.
159 +

 
160 +
        @see fuse
 
161 +
    */
 
162 +
    template<ConstBufferSequence CB>
 
163 +
    auto
 
164 +
    write_some(CB buffers)
 
165 +
    {
 
166 +
        struct awaitable
 
167 +
        {
 
168 +
            write_sink* self_;
 
169 +
            CB buffers_;
 
170 +

 
171 +
            bool await_ready() const noexcept { return true; }
 
172 +

 
173 +
            void await_suspend(
 
174 +
                coro,
 
175 +
                executor_ref,
 
176 +
                std::stop_token) const noexcept
 
177 +
            {
 
178 +
            }
 
179 +

 
180 +
            io_result<std::size_t>
 
181 +
            await_resume()
 
182 +
            {
 
183 +
                if(buffer_empty(buffers_))
 
184 +
                    return {{}, 0};
 
185 +

 
186 +
                auto ec = self_->f_.maybe_fail();
 
187 +
                if(ec)
 
188 +
                    return {ec, 0};
 
189 +

 
190 +
                std::size_t n = buffer_size(buffers_);
 
191 +
                n = (std::min)(n, self_->max_write_size_);
 
192 +

 
193 +
                std::size_t const old_size = self_->data_.size();
 
194 +
                self_->data_.resize(old_size + n);
 
195 +
                buffer_copy(make_buffer(
 
196 +
                    self_->data_.data() + old_size, n), buffers_, n);
 
197 +

 
198 +
                ec = self_->consume_match_();
 
199 +
                if(ec)
 
200 +
                {
 
201 +
                    self_->data_.resize(old_size);
 
202 +
                    return {ec, 0};
 
203 +
                }
 
204 +

 
205 +
                return {{}, n};
 
206 +
            }
 
207 +
        };
 
208 +
        return awaitable{this, buffers};
 
209 +
    }
 
210 +

 
211 +
    /** Asynchronously write data to the sink.
 
212 +

 
213 +
        Transfers all bytes from the provided const buffer sequence
 
214 +
        to the internal buffer. Unlike @ref write_some, this ignores
 
215 +
        `max_write_size` and writes all available data, matching the
 
216 +
        @ref WriteSink semantic contract.
165  

217  

166  
        @param buffers The const buffer sequence containing data to write.
218  
        @param buffers The const buffer sequence containing data to write.
167  

219  

168  
        @return An awaitable yielding `(error_code,std::size_t)`.
220  
        @return An awaitable yielding `(error_code,std::size_t)`.
169  

221  

170  
        @see fuse
222  
        @see fuse
171  
    */
223  
    */
172  
    template<ConstBufferSequence CB>
224  
    template<ConstBufferSequence CB>
173  
    auto
225  
    auto
174  
    write(CB buffers)
226  
    write(CB buffers)
175  
    {
227  
    {
176  
        struct awaitable
228  
        struct awaitable
177  
        {
229  
        {
178  
            write_sink* self_;
230  
            write_sink* self_;
179  
            CB buffers_;
231  
            CB buffers_;
180  

232  

181  
            bool await_ready() const noexcept { return true; }
233  
            bool await_ready() const noexcept { return true; }
182 -
            // This method is required to satisfy Capy's IoAwaitable concept,
 
183 -
            // but is never called because await_ready() returns true.
 
184 -
            //
 
185 -
            // Capy uses a two-layer awaitable system: the promise's
 
186 -
            // await_transform wraps awaitables in a transform_awaiter whose
 
187 -
            // standard await_suspend(coroutine_handle) calls this custom
 
188 -
            // 3-argument overload, passing the executor and stop_token from
 
189 -
            // the coroutine's context. For synchronous test awaitables like
 
190 -
            // this one, the coroutine never suspends, so this is not invoked.
 
191 -
            // The signature exists to allow the same awaitable type to work
 
192 -
            // with both synchronous (test) and asynchronous (real I/O) code.
 
193  

234  

194  
            void await_suspend(
235  
            void await_suspend(
195  
                coro,
236  
                coro,
196  
                executor_ref,
237  
                executor_ref,
197  
                std::stop_token) const noexcept
238  
                std::stop_token) const noexcept
198  
            {
239  
            {
199  
            }
240  
            }
200  

241  

201  
            io_result<std::size_t>
242  
            io_result<std::size_t>
202  
            await_resume()
243  
            await_resume()
203  
            {
244  
            {
204 -
                auto ec = self_->f_->maybe_fail();
245 +
                auto ec = self_->f_.maybe_fail();
205  
                if(ec)
246  
                if(ec)
206  
                    return {ec, 0};
247  
                    return {ec, 0};
207  

248  

208 -
                n = (std::min)(n, self_->max_write_size_);
 
209  
                std::size_t n = buffer_size(buffers_);
249  
                std::size_t n = buffer_size(buffers_);
210  
                if(n == 0)
250  
                if(n == 0)
211  
                    return {{}, 0};
251  
                    return {{}, 0};
212  

252  

213  
                std::size_t const old_size = self_->data_.size();
253  
                std::size_t const old_size = self_->data_.size();
214  
                self_->data_.resize(old_size + n);
254  
                self_->data_.resize(old_size + n);
215  
                buffer_copy(make_buffer(
255  
                buffer_copy(make_buffer(
216 -
                    self_->data_.data() + old_size, n), buffers_, n);
256 +
                    self_->data_.data() + old_size, n), buffers_);
217  

257  

218  
                ec = self_->consume_match_();
258  
                ec = self_->consume_match_();
219  
                if(ec)
259  
                if(ec)
220  
                    return {ec, n};
260  
                    return {ec, n};
221  

261  

222  
                return {{}, n};
262  
                return {{}, n};
223  
            }
263  
            }
224  
        };
264  
        };
225  
        return awaitable{this, buffers};
265  
        return awaitable{this, buffers};
226  
    }
266  
    }
227  

267  

228 -
    /** Asynchronously write data to the sink with optional EOF.
268 +
    /** Atomically write data and signal end-of-stream.
229  

269  

230  
        Transfers all bytes from the provided const buffer sequence to
270  
        Transfers all bytes from the provided const buffer sequence to
231 -
        the internal buffer, optionally signaling end-of-stream. Before
271 +
        the internal buffer and signals end-of-stream. Before the write,
232 -
        every write, the attached @ref fuse is consulted to possibly
272 +
        the attached @ref fuse is consulted to possibly inject an error
233 -
        inject an error for testing fault scenarios. The returned
273 +
        for testing fault scenarios.
234 -
        `std::size_t` is the number of bytes transferred.
 
235  

274  

236  
        @par Effects
275  
        @par Effects
237 -
        On success, appends the written bytes to the internal buffer.
276 +
        On success, appends the written bytes to the internal buffer
238 -
        If `eof` is `true`, marks the sink as finalized.
277 +
        and marks the sink as finalized.
239  
        If an error is injected by the fuse, the internal buffer remains
278  
        If an error is injected by the fuse, the internal buffer remains
240  
        unchanged.
279  
        unchanged.
241  

280  

242  
        @par Exception Safety
281  
        @par Exception Safety
243  
        No-throw guarantee.
282  
        No-throw guarantee.
244  

283  

245 -
        @param eof If true, signals end-of-stream after writing.
 
246  
        @param buffers The const buffer sequence containing data to write.
284  
        @param buffers The const buffer sequence containing data to write.
247  

285  

248  
        @return An awaitable yielding `(error_code,std::size_t)`.
286  
        @return An awaitable yielding `(error_code,std::size_t)`.
249  

287  

250  
        @see fuse
288  
        @see fuse
251  
    */
289  
    */
252  
    template<ConstBufferSequence CB>
290  
    template<ConstBufferSequence CB>
253  
    auto
291  
    auto
254 -
    write(CB buffers, bool eof)
292 +
    write_eof(CB buffers)
255  
    {
293  
    {
256  
        struct awaitable
294  
        struct awaitable
257  
        {
295  
        {
258  
            write_sink* self_;
296  
            write_sink* self_;
259 -
            bool eof_;
 
260  
            CB buffers_;
297  
            CB buffers_;
261  

298  

262  
            bool await_ready() const noexcept { return true; }
299  
            bool await_ready() const noexcept { return true; }
263 -
            // This method is required to satisfy Capy's IoAwaitable concept,
 
264 -
            // but is never called because await_ready() returns true.
 
265 -
            // See the comment on write(CB buffers) for a detailed explanation.
 
266  

300  

267  
            void await_suspend(
301  
            void await_suspend(
268  
                coro,
302  
                coro,
269  
                executor_ref,
303  
                executor_ref,
270  
                std::stop_token) const noexcept
304  
                std::stop_token) const noexcept
271  
            {
305  
            {
272  
            }
306  
            }
273  

307  

274  
            io_result<std::size_t>
308  
            io_result<std::size_t>
275  
            await_resume()
309  
            await_resume()
276  
            {
310  
            {
277 -
                auto ec = self_->f_->maybe_fail();
311 +
                auto ec = self_->f_.maybe_fail();
278  
                if(ec)
312  
                if(ec)
279  
                    return {ec, 0};
313  
                    return {ec, 0};
280  

314  

281 -
                n = (std::min)(n, self_->max_write_size_);
 
282  
                std::size_t n = buffer_size(buffers_);
315  
                std::size_t n = buffer_size(buffers_);
283  
                if(n > 0)
316  
                if(n > 0)
284  
                {
317  
                {
285  
                    std::size_t const old_size = self_->data_.size();
318  
                    std::size_t const old_size = self_->data_.size();
286  
                    self_->data_.resize(old_size + n);
319  
                    self_->data_.resize(old_size + n);
287  
                    buffer_copy(make_buffer(
320  
                    buffer_copy(make_buffer(
288 -
                        self_->data_.data() + old_size, n), buffers_, n);
321 +
                        self_->data_.data() + old_size, n), buffers_);
289  

322  

290  
                    ec = self_->consume_match_();
323  
                    ec = self_->consume_match_();
291  
                    if(ec)
324  
                    if(ec)
292  
                        return {ec, n};
325  
                        return {ec, n};
293  
                }
326  
                }
294  

327  

295 -
                if(eof_)
328 +
                self_->eof_called_ = true;
296 -
                    self_->eof_called_ = true;
 
297  

329  

298  
                return {{}, n};
330  
                return {{}, n};
299  
            }
331  
            }
300  
        };
332  
        };
301 -
        return awaitable{this, buffers, eof};
333 +
        return awaitable{this, buffers};
302  
    }
334  
    }
303  

335  

304  
    /** Signal end-of-stream.
336  
    /** Signal end-of-stream.
305  

337  

306  
        Marks the sink as finalized, indicating no more data will be
338  
        Marks the sink as finalized, indicating no more data will be
307  
        written. Before signaling, the attached @ref fuse is consulted
339  
        written. Before signaling, the attached @ref fuse is consulted
308  
        to possibly inject an error for testing fault scenarios.
340  
        to possibly inject an error for testing fault scenarios.
309  

341  

310  
        @par Effects
342  
        @par Effects
311  
        On success, marks the sink as finalized.
343  
        On success, marks the sink as finalized.
312  
        If an error is injected by the fuse, the state remains unchanged.
344  
        If an error is injected by the fuse, the state remains unchanged.
313  

345  

314  
        @par Exception Safety
346  
        @par Exception Safety
315  
        No-throw guarantee.
347  
        No-throw guarantee.
316  

348  

317  
        @return An awaitable yielding `(error_code)`.
349  
        @return An awaitable yielding `(error_code)`.
318  

350  

319  
        @see fuse
351  
        @see fuse
320  
    */
352  
    */
321  
    auto
353  
    auto
322  
    write_eof()
354  
    write_eof()
323  
    {
355  
    {
324  
        struct awaitable
356  
        struct awaitable
325  
        {
357  
        {
326  
            write_sink* self_;
358  
            write_sink* self_;
327  

359  

328  
            bool await_ready() const noexcept { return true; }
360  
            bool await_ready() const noexcept { return true; }
329  

361  

330  
            // This method is required to satisfy Capy's IoAwaitable concept,
362  
            // This method is required to satisfy Capy's IoAwaitable concept,
331  
            // but is never called because await_ready() returns true.
363  
            // but is never called because await_ready() returns true.
332  
            // See the comment on write(CB buffers) for a detailed explanation.
364  
            // See the comment on write(CB buffers) for a detailed explanation.
333  
            void await_suspend(
365  
            void await_suspend(
334  
                coro,
366  
                coro,
335  
                executor_ref,
367  
                executor_ref,
336  
                std::stop_token) const noexcept
368  
                std::stop_token) const noexcept
337  
            {
369  
            {
338  
            }
370  
            }
339  

371  

340  
            io_result<>
372  
            io_result<>
341  
            await_resume()
373  
            await_resume()
342  
            {
374  
            {
343 -
                auto ec = self_->f_->maybe_fail();
375 +
                auto ec = self_->f_.maybe_fail();
344  
                if(ec)
376  
                if(ec)
345  
                    return {ec};
377  
                    return {ec};
346  

378  

347  
                self_->eof_called_ = true;
379  
                self_->eof_called_ = true;
348  
                return {};
380  
                return {};
349  
            }
381  
            }
350  
        };
382  
        };
351  
        return awaitable{this};
383  
        return awaitable{this};
352  
    }
384  
    }
353  
};
385  
};
354  

386  

355  
} // test
387  
} // test
356  
} // capy
388  
} // capy
357  
} // boost
389  
} // boost
358  

390  

359  
#endif
391  
#endif