//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_IO_PULL_FROM_HPP
#define BOOST_CAPY_IO_PULL_FROM_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/cond.hpp>
#include <boost/capy/concept/buffer_sink.hpp>
#include <boost/capy/concept/read_source.hpp>
#include <boost/capy/concept/read_stream.hpp>
#include <boost/capy/io_task.hpp>

#include <cstddef>
#include <span>

namespace boost {
namespace capy {

/** Transfer data from a ReadSource to a BufferSink.

    This function reads data from the source directly into the sink's
    internal buffers using the callee-owns-buffers model. The sink
    provides writable buffers via `prepare()`, the source reads into
    them, and the sink commits the data. When the source signals EOF,
    `commit_eof()` is called on the sink to finalize the transfer.

    If `prepare()` yields no buffers, `commit(0)` is awaited to give
    the sink a chance to flush, and `prepare()` is tried again.

    Bytes are added to the running total only after the sink has
    committed them without error; if a commit fails, the bytes from
    that read are not included in the returned count.

    @tparam Src The source type, must satisfy @ref ReadSource.
    @tparam Sink The sink type, must satisfy @ref BufferSink.

    @param source The source to read data from.
    @param sink The sink to write data to.

    @return A task that yields `(std::error_code, std::size_t)`.
        On success, `ec` is default-constructed (no error) and `n` is
        the total number of bytes transferred. On error, `ec` contains
        the error code and `n` is the total number of bytes transferred
        before the error.

    @par Example
    @code
    task<void> transfer_body(ReadSource auto& source, BufferSink auto& sink)
    {
        auto [ec, n] = co_await pull_from(source, sink);
        if (ec)
        {
            // Handle error
        }
        // n bytes were transferred
    }
    @endcode

    @see ReadSource, BufferSink, push_to
*/
template<ReadSource Src, BufferSink Sink>
io_task<std::size_t>
pull_from(Src& source, Sink& sink)
{
    // Scratch array handed to the sink so it can describe its
    // writable regions for each iteration.
    mutable_buffer dst_arr[detail::max_iovec_];
    std::size_t total = 0;

    for(;;)
    {
        // Prepare destination buffers from the sink
        auto dst_bufs = sink.prepare(dst_arr);
        if(dst_bufs.empty())
        {
            // No buffer space available; commit nothing to flush
            auto [flush_ec] = co_await sink.commit(0);
            if(flush_ec)
                co_return {flush_ec, total};
            continue;
        }

        // Read data from the source into the sink's buffers
        auto [ec, n] = co_await source.read(
            std::span<mutable_buffer const>(dst_bufs));

        // Commit any data that was read, even when the read also
        // reported EOF or an error.
        if(n > 0)
        {
            auto [commit_ec] = co_await sink.commit(n);
            if(commit_ec)
                co_return {commit_ec, total};
            total += n;
        }

        // Check for EOF condition; the data was already committed
        // above, so no further bytes accompany the EOF.
        if(ec == cond::eof)
        {
            auto [eof_ec] = co_await sink.commit_eof(0);
            co_return {eof_ec, total};
        }

        // Check for other errors
        if(ec)
            co_return {ec, total};
    }
}

/** Transfer data from a ReadStream to a BufferSink.

    This function reads data from the stream directly into the sink's
    internal buffers using the callee-owns-buffers model. The sink
    provides writable buffers via `prepare()`, the stream reads into
    them using `read_some()`, and the sink commits the data. When the
    stream signals EOF, `commit_eof()` is called on the sink to
    finalize the transfer.

    This overload handles partial reads from the stream, committing
    data incrementally as it arrives. It loops until EOF is encountered
    or an error occurs.

    If `prepare()` yields no buffers, `commit(0)` is awaited to give
    the sink a chance to flush, and `prepare()` is tried again.

    Bytes are added to the running total only after the sink has
    committed them without error; if a commit fails, the bytes from
    that read are not included in the returned count.

    This overload participates in overload resolution only when `Src`
    does not also satisfy @ref ReadSource, so a type that models both
    concepts selects the ReadSource overload.

    @tparam Src The source type, must satisfy @ref ReadStream.
    @tparam Sink The sink type, must satisfy @ref BufferSink.

    @param source The stream to read data from.
    @param sink The sink to write data to.

    @return A task that yields `(std::error_code, std::size_t)`.
        On success, `ec` is default-constructed (no error) and `n` is
        the total number of bytes transferred. On error, `ec` contains
        the error code and `n` is the total number of bytes transferred
        before the error.

    @par Example
    @code
    task<void> transfer_body(ReadStream auto& stream, BufferSink auto& sink)
    {
        auto [ec, n] = co_await pull_from(stream, sink);
        if (ec)
        {
            // Handle error
        }
        // n bytes were transferred
    }
    @endcode

    @see ReadStream, BufferSink, push_to
*/
template<ReadStream Src, BufferSink Sink>
    requires (!ReadSource<Src>)
io_task<std::size_t>
pull_from(Src& source, Sink& sink)
{
    // Scratch array handed to the sink so it can describe its
    // writable regions for each iteration.
    mutable_buffer dst_arr[detail::max_iovec_];
    std::size_t total = 0;

    for(;;)
    {
        // Prepare destination buffers from the sink
        auto dst_bufs = sink.prepare(dst_arr);
        if(dst_bufs.empty())
        {
            // No buffer space available; commit nothing to flush
            auto [flush_ec] = co_await sink.commit(0);
            if(flush_ec)
                co_return {flush_ec, total};
            continue;
        }

        // Read data from the stream into the sink's buffers
        auto [ec, n] = co_await source.read_some(
            std::span<mutable_buffer const>(dst_bufs));

        // Commit any data that was read, even when the read also
        // reported EOF or an error.
        if(n > 0)
        {
            auto [commit_ec] = co_await sink.commit(n);
            if(commit_ec)
                co_return {commit_ec, total};
            total += n;
        }

        // Check for EOF condition; the data was already committed
        // above, so no further bytes accompany the EOF.
        if(ec == cond::eof)
        {
            auto [eof_ec] = co_await sink.commit_eof(0);
            co_return {eof_ec, total};
        }

        // Check for other errors
        if(ec)
            co_return {ec, total};
    }
}

} // namespace capy
} // namespace boost

#endif