//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
#define BOOST_CAPY_TEST_BUFFER_SOURCE_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/make_buffer.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/error.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/capy/test/fuse.hpp>

#include <algorithm>
#include <cstddef>
#include <span>
#include <stop_token>
#include <string>
#include <string_view>
#include <utility>

namespace boost {
namespace capy {
namespace test {

/** A mock buffer source for testing push operations.

    Use this to verify code that transfers data from a buffer source to
    a sink without needing real I/O. Call @ref provide to supply data,
    then @ref pull to retrieve buffer descriptors. The associated
    @ref fuse enables error injection at controlled points.

    This class satisfies the @ref BufferSource concept by providing
    a pull interface that fills an array of buffer descriptors and
    a consume interface to indicate bytes used.

    @par Thread Safety
    Not thread-safe.

    @par Example
    @code
    fuse f;
    buffer_source bs( f );
    bs.provide( "Hello, " );
    bs.provide( "World!" );

    auto r = f.armed( [&]( fuse& ) -> task<void> {
        const_buffer arr[16];
        auto [ec, bufs] = co_await bs.pull( arr );
        if( ec )
            co_return;
        // bufs contains buffer descriptors
        std::size_t n = buffer_size( bufs );
        bs.consume( n );
    } );
    @endcode

    @see fuse, BufferSource
*/
class buffer_source
{
    fuse f_;                    // stored by value; consulted on every pull
    std::string data_;          // all bytes handed to provide(), in order
    std::size_t pos_ = 0;       // offset of the first unconsumed byte in data_
    std::size_t max_pull_size_; // upper bound on bytes described per pull

public:
    /** Construct a buffer source.

        @param f The fuse used to inject errors during pulls. It is
        taken by value and moved into the source; when omitted, a
        default-constructed fuse is used.

        @param max_pull_size Maximum bytes returned per pull.
        Use to simulate chunked delivery. The default,
        `std::size_t(-1)`, imposes no practical limit.
    */
    explicit buffer_source(
        fuse f = {},
        std::size_t max_pull_size = std::size_t(-1)) noexcept
        : f_(std::move(f))
        , max_pull_size_(max_pull_size)
    {
    }

    /** Append data to be returned by subsequent pulls.

        Multiple calls accumulate data that @ref pull returns.

        @param sv The data to append.
    */
    void
    provide(std::string_view sv)
    {
        data_.append(sv);
    }

    /// Clear all data and reset the read position.
    void
    clear() noexcept
    {
        data_.clear();
        pos_ = 0;
    }

    /** Return the number of bytes available for pulling.

        @return The count of provided bytes that have not yet been
        consumed, or zero if the read position is at or beyond the
        end of the data.
    */
    std::size_t
    available() const noexcept
    {
        // Guard the subtraction: consume() does not bound-check, so
        // pos_ may exceed data_.size() and an unguarded difference
        // would wrap around to a huge value.
        return pos_ < data_.size() ? data_.size() - pos_ : 0;
    }

    /** Consume bytes from the source.

        Advances the internal read position by the specified number
        of bytes. The next call to @ref pull returns data starting
        after the consumed bytes.

        @param n The number of bytes to consume. Must not exceed the
        total size of buffers returned by the previous @ref pull.
    */
    void
    consume(std::size_t n) noexcept
    {
        pos_ += n;
    }

    /** Pull buffer data from the source.

        Fills the provided span with buffer descriptors pointing to
        internal data starting from the current unconsumed position,
        and returns the sub-span of descriptors that were filled. At
        most one descriptor is filled per call, describing no more
        than `max_pull_size` bytes.

        The outcome is decided in this order:

        @li If the fuse reports a failure, that error is returned
            with an empty span.
        @li If no unconsumed data remains, `error::eof` is returned
            with an empty span.
        @li If `dest` is empty, success is returned with an empty span.
        @li Otherwise success is returned with a one-element span.

        Calling pull multiple times without intervening @ref consume
        returns the same data. Use consume to advance past processed
        bytes.

        @param dest Span of const_buffer to fill.

        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.

        @see consume, fuse
    */
    auto
    pull(std::span<const_buffer> dest)
    {
        struct awaitable
        {
            buffer_source* self_;
            std::span<const_buffer> dest_;

            // Always ready: the result is computed synchronously in
            // await_resume(), so the awaiting coroutine never suspends.
            bool await_ready() const noexcept { return true; }

            // This method is required to satisfy Capy's IoAwaitable concept,
            // but is never called because await_ready() returns true.
            //
            // Capy uses a two-layer awaitable system: the promise's
            // await_transform wraps awaitables in a transform_awaiter whose
            // standard await_suspend(coroutine_handle) calls this custom
            // 3-argument overload, passing the executor and stop_token from
            // the coroutine's context. For synchronous test awaitables like
            // this one, the coroutine never suspends, so this is not invoked.
            // The signature exists to allow the same awaitable type to work
            // with both synchronous (test) and asynchronous (real I/O) code.
            void await_suspend(
                coro,
                executor_ref,
                std::stop_token) const noexcept
            {
            }

            io_result<std::span<const_buffer>>
            await_resume()
            {
                // Give the fuse the first chance to inject a failure.
                auto ec = self_->f_.maybe_fail();
                if(ec)
                    return {ec, {}};

                // Source exhausted: nothing left past the read position.
                if(self_->pos_ >= self_->data_.size())
                    return {error::eof, {}};

                // Nowhere to write a descriptor; succeed with nothing filled.
                if(dest_.empty())
                    return {{}, {}};

                std::size_t avail = self_->data_.size() - self_->pos_;
                std::size_t to_return = (std::min)(avail, self_->max_pull_size_);

                // Fill a single buffer descriptor; the data is contiguous,
                // so one descriptor is always enough.
                dest_[0] = make_buffer(
                    self_->data_.data() + self_->pos_,
                    to_return);

                return {{}, dest_.first(1)};
            }
        };
        return awaitable{this, dest};
    }
};

} // test
} // capy
} // boost

#endif