Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_READ_STREAM_HPP
11 : #define BOOST_CAPY_TEST_READ_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <boost/capy/cond.hpp>
18 : #include <boost/capy/coro.hpp>
19 : #include <boost/capy/ex/executor_ref.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/test/fuse.hpp>
22 :
23 : #include <stop_token>
24 : #include <string>
25 : #include <string_view>
26 :
27 : namespace boost {
28 : namespace capy {
29 : namespace test {
30 :
31 : /** A mock stream for testing read operations.
32 :
33 : Use this to verify code that performs reads without needing
34 : real I/O. Call @ref provide to supply data, then @ref read_some
35 : to consume it. The associated @ref fuse enables error injection
36 : at controlled points. An optional `max_read_size` constructor
37 : parameter limits bytes per read to simulate chunked delivery.
38 :
39 : This class satisfies the @ref ReadStream concept.
40 :
41 : @par Thread Safety
42 : Not thread-safe.
43 :
44 : @par Example
45 : @code
46 : fuse f;
47 : read_stream rs( f );
48 : rs.provide( "Hello, " );
49 : rs.provide( "World!" );
50 :
51 : auto r = f.armed( [&]( fuse& ) -> task<void> {
52 : char buf[32];
53 : auto [ec, n] = co_await rs.read_some(
54 : mutable_buffer( buf, sizeof( buf ) ) );
55 : if( ec )
56 : co_return;
57 : // n == 13; buf[0..n) holds "Hello, World!" (not NUL-terminated)
58 : } );
59 : @endcode
60 :
61 : @see fuse, ReadStream
62 : */
63 : class read_stream
64 : {
65 : fuse f_; // consulted via maybe_fail() by read_some (skipped for empty buffer sequences)
66 : std::string data_; // every byte appended by provide(), both consumed and unread
67 : std::size_t pos_ = 0; // offset into data_ of the next unread byte
68 : std::size_t max_read_size_; // upper bound on the bytes handed out by a single read
69 :
70 : public:
71 : /** Construct a read stream.
72 :
73 : @param f The fuse used to inject errors during reads.
74 :
75 : @param max_read_size Maximum bytes returned per read.
76 : Use to simulate chunked network delivery. The default, `std::size_t(-1)`, imposes no practical limit.
77 : */
78 1328 : explicit read_stream(
79 : fuse f = {},
80 : std::size_t max_read_size = std::size_t(-1)) noexcept
81 1328 : : f_(std::move(f))
82 1328 : , max_read_size_(max_read_size)
83 : {
84 1328 : }
85 :
86 : /** Append data to be returned by subsequent reads.
87 :
88 : Multiple calls accumulate data that @ref read_some returns.
89 :
90 : @param sv The data to append. The bytes are copied, so `sv` need not outlive the call.
91 : */
92 : void
93 1300 : provide(std::string_view sv)
94 : {
95 1300 : data_.append(sv);
96 1300 : }
97 :
98 : /// Discard all provided data (consumed or not) and reset the read position to zero.
99 : void
100 : clear() noexcept
101 : {
102 : data_.clear();
103 : pos_ = 0;
104 : }
105 :
106 : /// Return the number of provided bytes that have not yet been consumed by reads.
107 : std::size_t
108 4 : available() const noexcept
109 : {
110 4 : return data_.size() - pos_;
111 : }
112 :
113 : /** Asynchronously read data from the stream.
114 :
115 : Transfers up to `buffer_size( buffers )` bytes, further limited by
116 : `max_read_size`, from the internal buffer to the provided mutable
117 : buffer sequence. An empty buffer sequence completes with success and
118 : zero bytes without consulting the @ref fuse. Otherwise the fuse may
119 : inject an error; failing that, `error::eof` is returned if no data remains.
120 :
121 : @par Effects
122 : On success, advances the internal read position by the number of
123 : bytes copied. If an error is injected by the fuse, the read position
124 : remains unchanged. The awaitable is always ready, so awaiting it never suspends.
125 :
126 : @par Exception Safety
127 : No-throw guarantee. NOTE(review): this holds only if `fuse::maybe_fail()` never throws; the coverage counts recorded around that call suggest it sometimes exits by exception — confirm and reword if so.
128 :
129 : @param buffers The mutable buffer sequence to receive data.
130 :
131 : @return An awaitable yielding `(error_code,std::size_t)`; the size is the number of bytes transferred.
132 :
133 : @see fuse
134 : */
135 : template<MutableBufferSequence MB>
136 : auto
137 1589 : read_some(MB buffers)
138 : {
139 : struct awaitable
140 : {
141 : read_stream* self_; // non-owning; the stream must outlive this awaitable
142 : MB buffers_; // copy of the buffer sequence passed to read_some
143 :
144 1589 : bool await_ready() const noexcept { return true; } // always ready: the whole read happens in await_resume
145 :
146 : // This method is required to satisfy Capy's IoAwaitable concept,
147 : // but is never called because await_ready() returns true.
148 : //
149 : // Capy uses a two-layer awaitable system: the promise's
150 : // await_transform wraps awaitables in a transform_awaiter whose
151 : // standard await_suspend(coroutine_handle) calls this custom
152 : // 3-argument overload, passing the executor and stop_token from
153 : // the coroutine's context. For synchronous test awaitables like
154 : // this one, the coroutine never suspends, so this is not invoked.
155 : // The signature exists to allow the same awaitable type to work
156 : // with both synchronous (test) and asynchronous (real I/O) code.
157 0 : void await_suspend(
158 : coro,
159 : executor_ref,
160 : std::stop_token) const noexcept
161 : {
162 0 : }
163 :
164 : io_result<std::size_t>
165 1589 : await_resume()
166 : {
167 : // Empty buffer is a no-op regardless of
168 : // stream state or fuse.
169 1589 : if(buffer_empty(buffers_))
170 3 : return {{}, 0};
171 :
172 1586 : auto ec = self_->f_.maybe_fail(); // give the fuse its chance to fail this read
173 1386 : if(ec)
174 200 : return {ec, 0}; // injected error: nothing copied, pos_ untouched
175 :
176 1186 : if(self_->pos_ >= self_->data_.size()) // every provided byte has already been consumed
177 85 : return {error::eof, 0};
178 :
179 1101 : std::size_t avail = self_->data_.size() - self_->pos_; // unread byte count
180 1101 : if(avail > self_->max_read_size_) // clamp to the per-read cap
181 236 : avail = self_->max_read_size_;
182 1101 : auto src = make_buffer(self_->data_.data() + self_->pos_, avail); // presumably a non-owning view of the unread bytes — verify make_buffer
183 1101 : std::size_t const n = buffer_copy(buffers_, src); // presumably copies min(avail, capacity of buffers_) — verify buffer_copy
184 1101 : self_->pos_ += n; // consume only what was actually copied
185 1101 : return {{}, n};
186 : }
187 : };
188 1589 : return awaitable{this, buffers}; // no data moves here; the read is performed when the awaitable is resumed
189 : }
190 : };
191 :
192 : } // test
193 : } // capy
194 : } // boost
195 :
196 : #endif
|