include/boost/capy/read_until.hpp

81.8% Lines (54/66) 85.0% Functions (34/40)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 #define BOOST_CAPY_READ_UNTIL_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/buffers.hpp>
15 #include <boost/capy/cond.hpp>
16 #include <coroutine>
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/io_result.hpp>
19 #include <boost/capy/io_task.hpp>
20 #include <boost/capy/concept/dynamic_buffer.hpp>
21 #include <boost/capy/concept/match_condition.hpp>
22 #include <boost/capy/concept/read_stream.hpp>
23 #include <boost/capy/ex/io_env.hpp>
24
25 #include <algorithm>
26 #include <cstddef>
27 #include <optional>
28 #include <stop_token>
29 #include <string_view>
30 #include <type_traits>
31
32 namespace boost {
33 namespace capy {
34
35 namespace detail {
36
37 // Linearize a buffer sequence into a string
38 inline
39 std::string
40 linearize_buffers(ConstBufferSequence auto const& data)
41 {
42 std::string linear;
43 linear.reserve(buffer_size(data));
44 auto const end_ = end(data);
45 for(auto it = begin(data); it != end_; ++it)
46 linear.append(
47 static_cast<char const*>(it->data()),
48 it->size());
49 return linear;
50 }
51
52 // Search buffer using a MatchCondition, with single-buffer optimization
53 template<MatchCondition M>
54 std::size_t
55 240x search_buffer_for_match(
56 ConstBufferSequence auto const& data,
57 M const& match,
58 std::size_t* hint = nullptr)
59 {
60 // Fast path: single buffer - no linearization needed
61 240x if(buffer_length(data) == 1)
62 {
63 240x auto const& buf = *begin(data);
64 720x return match(std::string_view(
65 240x static_cast<char const*>(buf.data()),
66 240x buf.size()), hint);
67 }
68 // Multiple buffers - linearize
69 return match(linearize_buffers(data), hint);
70 }
71
72 // Implementation coroutine for read_until with MatchCondition
73 template<class Stream, class B, MatchCondition M>
74 io_task<std::size_t>
75 126x read_until_match_impl(
76 Stream& stream,
77 B& buffers,
78 M match,
79 std::size_t initial_amount)
80 {
81 std::size_t amount = initial_amount;
82
83 for(;;)
84 {
85 // Check max_size before preparing
86 if(buffers.size() >= buffers.max_size())
87 co_return {error::not_found, 0};
88
89 // Prepare space, respecting max_size
90 std::size_t const available = buffers.max_size() - buffers.size();
91 std::size_t const to_prepare = (std::min)(amount, available);
92 if(to_prepare == 0)
93 co_return {error::not_found, 0};
94
95 auto mb = buffers.prepare(to_prepare);
96 auto [ec, n] = co_await stream.read_some(mb);
97 buffers.commit(n);
98
99 if(!ec)
100 {
101 auto pos = search_buffer_for_match(buffers.data(), match);
102 if(pos != std::string_view::npos)
103 co_return {{}, pos};
104 }
105
106 if(ec == cond::eof)
107 co_return {error::eof, buffers.size()};
108 if(ec)
109 co_return {ec, buffers.size()};
110
111 // Grow buffer size for next iteration
112 if(n == buffer_size(mb))
113 amount = amount / 2 + amount;
114 }
115 252x }
116
117 template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 struct read_until_awaitable
119 {
120 Stream* stream_;
121 M match_;
122 std::size_t initial_amount_;
123 std::optional<io_result<std::size_t>> immediate_;
124 std::optional<io_task<std::size_t>> inner_;
125
126 using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 storage_type buffers_storage_;
128
129 126x B& buffers() noexcept
130 {
131 if constexpr(OwnsBuffer)
132 126x return buffers_storage_;
133 else
134 return *buffers_storage_;
135 }
136
137 // Constructor for lvalue (pointer storage)
138 4x read_until_awaitable(
139 Stream& stream,
140 B* buffers,
141 M match,
142 std::size_t initial_amount)
143 requires (!OwnsBuffer)
144 4x : stream_(std::addressof(stream))
145 4x , match_(std::move(match))
146 4x , initial_amount_(initial_amount)
147 4x , buffers_storage_(buffers)
148 {
149 4x auto pos = search_buffer_for_match(
150 4x buffers_storage_->data(), match_);
151 4x if(pos != std::string_view::npos)
152 4x immediate_.emplace(io_result<std::size_t>{{}, pos});
153 4x }
154
155 // Constructor for rvalue adapter (owned storage)
156 132x read_until_awaitable(
157 Stream& stream,
158 B&& buffers,
159 M match,
160 std::size_t initial_amount)
161 requires OwnsBuffer
162 132x : stream_(std::addressof(stream))
163 132x , match_(std::move(match))
164 132x , initial_amount_(initial_amount)
165 132x , buffers_storage_(std::move(buffers))
166 {
167 132x auto pos = search_buffer_for_match(
168 132x buffers_storage_.data(), match_);
169 132x if(pos != std::string_view::npos)
170 6x immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132x }
172
173 bool
174 136x await_ready() const noexcept
175 {
176 136x return immediate_.has_value();
177 }
178
179 std::coroutine_handle<>
180 126x await_suspend(std::coroutine_handle<> h, io_env const* env)
181 {
182 252x inner_.emplace(read_until_match_impl(
183 126x *stream_, buffers(), match_, initial_amount_));
184 126x return inner_->await_suspend(h, env);
185 }
186
187 io_result<std::size_t>
188 136x await_resume()
189 {
190 136x if(immediate_)
191 10x return *immediate_;
192 126x return inner_->await_resume();
193 }
194 };
195
196 } // namespace detail
197
/** Match condition that looks for a fixed delimiter string.

    Satisfies @ref MatchCondition. A successful search yields the
    offset one past the end of the first occurrence of the delimiter;
    a failed search yields `npos` and, when a hint pointer is given,
    stores `delim.size() - 1` as the overlap hint so that a delimiter
    split across two reads can still be recognised.

    @see MatchCondition, read_until
*/
struct match_delim
{
    /** The delimiter to look for.

        @note This is a non-owning view. The characters it refers to
        must stay valid for as long as this object, and any read
        operation using it, is alive.
    */
    std::string_view delim;

    /** Search `data` for the delimiter.

        @param data The bytes to search.
        @param hint May be null. If non-null it is written only when
        the delimiter is not found, receiving the overlap hint.
        @return `0` when `delim` is empty; otherwise the offset just
        past the first occurrence of the delimiter, or `npos` when
        there is none.
    */
    std::size_t
    operator()(
        std::string_view data,
        std::size_t* hint) const noexcept
    {
        constexpr auto npos = std::string_view::npos;

        if(delim.empty())
            return 0;

        auto const at = data.find(delim);
        if(at == npos)
        {
            // delim is non-empty here, so size() - 1 cannot wrap.
            if(hint != nullptr)
                *hint = delim.size() - 1;
            return npos;
        }
        return at + delim.size();
    }
};
239
240 /** Asynchronously read until a match condition is satisfied.
241
242 Reads data from the stream into the dynamic buffer until the match
243 condition returns a valid position. Implemented using `read_some`.
244 If the match condition is already satisfied by existing buffer
245 data, returns immediately without I/O.
246
247 @li The operation completes when:
248 @li The match condition returns a valid position
249 @li End-of-stream is reached (`cond::eof`)
250 @li The buffer's `max_size()` is reached (`cond::not_found`)
251 @li An error occurs
252 @li The operation is cancelled
253
254 @par Cancellation
255 Supports cancellation via `stop_token` propagated through the
256 IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
257
258 @param stream The stream to read from. The caller retains ownership.
259 @param buffers The dynamic buffer to append data to. Must remain
260 valid until the operation completes.
261 @param match The match condition callable. Copied into the awaitable.
262 @param initial_amount Initial bytes to read per iteration (default
263 2048). Grows by 1.5x when filled.
264
265 @return An awaitable yielding `(error_code, std::size_t)`.
266 On success, `n` is the position returned by the match condition
267 (bytes up to and including the matched delimiter). Compare error
268 codes to conditions:
269 @li `cond::eof` - EOF before match; `n` is buffer size
270 @li `cond::not_found` - `max_size()` reached before match
271 @li `cond::canceled` - Operation was cancelled
272
273 @par Example
274
275 @code
276 task<> read_http_header( ReadStream auto& stream )
277 {
278 std::string header;
279 auto [ec, n] = co_await read_until(
280 stream,
281 string_dynamic_buffer( &header ),
282 []( std::string_view data, std::size_t* hint ) {
283 auto pos = data.find( "\r\n\r\n" );
284 if( pos != std::string_view::npos )
285 return pos + 4;
286 if( hint )
287 *hint = 3; // partial "\r\n\r" possible
288 return std::string_view::npos;
289 } );
290 if( ec )
291 detail::throw_system_error( ec );
292 // header contains data through "\r\n\r\n"
293 }
294 @endcode
295
296 @see read_some, MatchCondition, DynamicBufferParam
297 */
298 template<ReadStream Stream, class B, MatchCondition M>
299 requires DynamicBufferParam<B&&>
300 auto
301 136x read_until(
302 Stream& stream,
303 B&& buffers,
304 M match,
305 std::size_t initial_amount = 2048)
306 {
307 136x constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
308 using BareB = std::remove_reference_t<B>;
309
310 if constexpr(is_lvalue)
311 return detail::read_until_awaitable<Stream, BareB, M, false>(
312 4x stream, std::addressof(buffers), std::move(match), initial_amount);
313 else
314 return detail::read_until_awaitable<Stream, BareB, M, true>(
315 132x stream, std::move(buffers), std::move(match), initial_amount);
316 }
317
318 /** Asynchronously read until a delimiter string is found.
319
320 Reads data from the stream until the delimiter is found. This is
321 a convenience overload equivalent to calling `read_until` with
322 `match_delim{delim}`. If the delimiter already exists in the
323 buffer, returns immediately without I/O.
324
325 @li The operation completes when:
326 @li The delimiter string is found
327 @li End-of-stream is reached (`cond::eof`)
328 @li The buffer's `max_size()` is reached (`cond::not_found`)
329 @li An error occurs
330 @li The operation is cancelled
331
332 @par Cancellation
333 Supports cancellation via `stop_token` propagated through the
334 IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
335
336 @param stream The stream to read from. The caller retains ownership.
337 @param buffers The dynamic buffer to append data to. Must remain
338 valid until the operation completes.
339 @param delim The delimiter string to search for.
340 @param initial_amount Initial bytes to read per iteration (default
341 2048). Grows by 1.5x when filled.
342
343 @return An awaitable yielding `(error_code, std::size_t)`.
344 On success, `n` is bytes up to and including the delimiter.
345 Compare error codes to conditions:
346 @li `cond::eof` - EOF before delimiter; `n` is buffer size
347 @li `cond::not_found` - `max_size()` reached before delimiter
348 @li `cond::canceled` - Operation was cancelled
349
350 @par Example
351
352 @code
353 task<std::string> read_line( ReadStream auto& stream )
354 {
355 std::string line;
356 auto [ec, n] = co_await read_until(
357 stream, string_dynamic_buffer( &line ), "\r\n" );
358 if( ec == cond::eof )
359 co_return line; // partial line at EOF
360 if( ec )
361 detail::throw_system_error( ec );
362 line.resize( n - 2 ); // remove "\r\n"
363 co_return line;
364 }
365 @endcode
366
367 @see read_until, match_delim, DynamicBufferParam
368 */
369 template<ReadStream Stream, class B>
370 requires DynamicBufferParam<B&&>
371 auto
372 108x read_until(
373 Stream& stream,
374 B&& buffers,
375 std::string_view delim,
376 std::size_t initial_amount = 2048)
377 {
378 return read_until(
379 stream,
380 std::forward<B>(buffers),
381 match_delim{delim},
382 108x initial_amount);
383 }
384
385 } // namespace capy
386 } // namespace boost
387
388 #endif
389