TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_READ_UNTIL_HPP
11 : #define BOOST_CAPY_READ_UNTIL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/cond.hpp>
16 : #include <coroutine>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/capy/io_task.hpp>
20 : #include <boost/capy/concept/dynamic_buffer.hpp>
21 : #include <boost/capy/concept/match_condition.hpp>
22 : #include <boost/capy/concept/read_stream.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 :
25 : #include <algorithm>
26 : #include <cstddef>
27 : #include <optional>
28 : #include <stop_token>
29 : #include <string_view>
30 : #include <type_traits>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : namespace detail {
36 :
37 : // Linearize a buffer sequence into a string
38 : inline
39 : std::string
40 MIS 0 : linearize_buffers(ConstBufferSequence auto const& data)
41 : {
42 0 : std::string linear;
43 0 : linear.reserve(buffer_size(data));
44 0 : auto const end_ = end(data);
45 0 : for(auto it = begin(data); it != end_; ++it)
46 0 : linear.append(
47 0 : static_cast<char const*>(it->data()),
48 : it->size());
49 0 : return linear;
50 0 : }
51 :
52 : // Search buffer using a MatchCondition, with single-buffer optimization
53 : template<MatchCondition M>
54 : std::size_t
55 HIT 240 : search_buffer_for_match(
56 : ConstBufferSequence auto const& data,
57 : M const& match,
58 : std::size_t* hint = nullptr)
59 : {
60 : // Fast path: single buffer - no linearization needed
61 240 : if(buffer_length(data) == 1)
62 : {
63 240 : auto const& buf = *begin(data);
64 720 : return match(std::string_view(
65 240 : static_cast<char const*>(buf.data()),
66 240 : buf.size()), hint);
67 : }
68 : // Multiple buffers - linearize
69 MIS 0 : return match(linearize_buffers(data), hint);
70 : }
71 :
72 : // Implementation coroutine for read_until with MatchCondition
73 : template<class Stream, class B, MatchCondition M>
74 : io_task<std::size_t>
75 HIT 126 : read_until_match_impl(
76 : Stream& stream,
77 : B& buffers,
78 : M match,
79 : std::size_t initial_amount)
80 : {
81 : std::size_t amount = initial_amount;
82 :
83 : for(;;)
84 : {
85 : // Check max_size before preparing
86 : if(buffers.size() >= buffers.max_size())
87 : co_return {error::not_found, 0};
88 :
89 : // Prepare space, respecting max_size
90 : std::size_t const available = buffers.max_size() - buffers.size();
91 : std::size_t const to_prepare = (std::min)(amount, available);
92 : if(to_prepare == 0)
93 : co_return {error::not_found, 0};
94 :
95 : auto mb = buffers.prepare(to_prepare);
96 : auto [ec, n] = co_await stream.read_some(mb);
97 : buffers.commit(n);
98 :
99 : if(!ec)
100 : {
101 : auto pos = search_buffer_for_match(buffers.data(), match);
102 : if(pos != std::string_view::npos)
103 : co_return {{}, pos};
104 : }
105 :
106 : if(ec == cond::eof)
107 : co_return {error::eof, buffers.size()};
108 : if(ec)
109 : co_return {ec, buffers.size()};
110 :
111 : // Grow buffer size for next iteration
112 : if(n == buffer_size(mb))
113 : amount = amount / 2 + amount;
114 : }
115 252 : }
116 :
117 : template<class Stream, class B, MatchCondition M, bool OwnsBuffer>
118 : struct read_until_awaitable
119 : {
120 : Stream* stream_;
121 : M match_;
122 : std::size_t initial_amount_;
123 : std::optional<io_result<std::size_t>> immediate_;
124 : std::optional<io_task<std::size_t>> inner_;
125 :
126 : using storage_type = std::conditional_t<OwnsBuffer, B, B*>;
127 : storage_type buffers_storage_;
128 :
129 126 : B& buffers() noexcept
130 : {
131 : if constexpr(OwnsBuffer)
132 126 : return buffers_storage_;
133 : else
134 MIS 0 : return *buffers_storage_;
135 : }
136 :
137 : // Constructor for lvalue (pointer storage)
138 HIT 4 : read_until_awaitable(
139 : Stream& stream,
140 : B* buffers,
141 : M match,
142 : std::size_t initial_amount)
143 : requires (!OwnsBuffer)
144 4 : : stream_(std::addressof(stream))
145 4 : , match_(std::move(match))
146 4 : , initial_amount_(initial_amount)
147 4 : , buffers_storage_(buffers)
148 : {
149 4 : auto pos = search_buffer_for_match(
150 4 : buffers_storage_->data(), match_);
151 4 : if(pos != std::string_view::npos)
152 4 : immediate_.emplace(io_result<std::size_t>{{}, pos});
153 4 : }
154 :
155 : // Constructor for rvalue adapter (owned storage)
156 132 : read_until_awaitable(
157 : Stream& stream,
158 : B&& buffers,
159 : M match,
160 : std::size_t initial_amount)
161 : requires OwnsBuffer
162 132 : : stream_(std::addressof(stream))
163 132 : , match_(std::move(match))
164 132 : , initial_amount_(initial_amount)
165 132 : , buffers_storage_(std::move(buffers))
166 : {
167 132 : auto pos = search_buffer_for_match(
168 132 : buffers_storage_.data(), match_);
169 132 : if(pos != std::string_view::npos)
170 6 : immediate_.emplace(io_result<std::size_t>{{}, pos});
171 132 : }
172 :
173 : bool
174 136 : await_ready() const noexcept
175 : {
176 136 : return immediate_.has_value();
177 : }
178 :
179 : std::coroutine_handle<>
180 126 : await_suspend(std::coroutine_handle<> h, io_env const* env)
181 : {
182 252 : inner_.emplace(read_until_match_impl(
183 126 : *stream_, buffers(), match_, initial_amount_));
184 126 : return inner_->await_suspend(h, env);
185 : }
186 :
187 : io_result<std::size_t>
188 136 : await_resume()
189 : {
190 136 : if(immediate_)
191 10 : return *immediate_;
192 126 : return inner_->await_resume();
193 : }
194 : };
195 :
196 : } // namespace detail
197 :
198 : /** Match condition that searches for a delimiter string.
199 :
200 : Satisfies @ref MatchCondition. Returns the position after the
201 : delimiter when found, or `npos` otherwise. Provides an overlap
202 : hint of `delim.size() - 1` to handle delimiters spanning reads.
203 :
204 : @see MatchCondition, read_until
205 : */
206 : struct match_delim
207 : {
208 : /** The delimiter string to search for.
209 :
210 : @note The referenced characters must remain valid
211 : for the lifetime of this object and any pending
212 : read operation.
213 : */
214 : std::string_view delim;
215 :
216 : /** Search for the delimiter in `data`.
217 :
218 : @param data The data to search.
219 : @param hint If non-null, receives the overlap hint
220 : on miss.
221 : @return `0` if `delim` is empty; otherwise the position
222 : just past the delimiter, or `npos` if not found.
223 : */
224 : std::size_t
225 202 : operator()(
226 : std::string_view data,
227 : std::size_t* hint) const noexcept
228 : {
229 202 : if(delim.empty())
230 2 : return 0;
231 200 : auto pos = data.find(delim);
232 200 : if(pos != std::string_view::npos)
233 24 : return pos + delim.size();
234 176 : if(hint)
235 MIS 0 : *hint = delim.size() > 1 ? delim.size() - 1 : 0;
236 HIT 176 : return std::string_view::npos;
237 : }
238 : };
239 :
240 : /** Asynchronously read until a match condition is satisfied.
241 :
242 : Reads data from the stream into the dynamic buffer until the match
243 : condition returns a valid position. Implemented using `read_some`.
244 : If the match condition is already satisfied by existing buffer
245 : data, returns immediately without I/O.
246 :
247 : @li The operation completes when:
248 : @li The match condition returns a valid position
249 : @li End-of-stream is reached (`cond::eof`)
250 : @li The buffer's `max_size()` is reached (`cond::not_found`)
251 : @li An error occurs
252 : @li The operation is cancelled
253 :
254 : @par Cancellation
255 : Supports cancellation via `stop_token` propagated through the
256 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
257 :
258 : @param stream The stream to read from. The caller retains ownership.
259 : @param buffers The dynamic buffer to append data to. Must remain
260 : valid until the operation completes.
261 : @param match The match condition callable. Copied into the awaitable.
262 : @param initial_amount Initial bytes to read per iteration (default
263 : 2048). Grows by 1.5x when filled.
264 :
265 : @return An awaitable yielding `(error_code, std::size_t)`.
266 : On success, `n` is the position returned by the match condition
267 : (bytes up to and including the matched delimiter). Compare error
268 : codes to conditions:
269 : @li `cond::eof` - EOF before match; `n` is buffer size
270 : @li `cond::not_found` - `max_size()` reached before match
271 : @li `cond::canceled` - Operation was cancelled
272 :
273 : @par Example
274 :
275 : @code
276 : task<> read_http_header( ReadStream auto& stream )
277 : {
278 : std::string header;
279 : auto [ec, n] = co_await read_until(
280 : stream,
281 : string_dynamic_buffer( &header ),
282 : []( std::string_view data, std::size_t* hint ) {
283 : auto pos = data.find( "\r\n\r\n" );
284 : if( pos != std::string_view::npos )
285 : return pos + 4;
286 : if( hint )
287 : *hint = 3; // partial "\r\n\r" possible
288 : return std::string_view::npos;
289 : } );
290 : if( ec )
291 : detail::throw_system_error( ec );
292 : // header contains data through "\r\n\r\n"
293 : }
294 : @endcode
295 :
296 : @see read_some, MatchCondition, DynamicBufferParam
297 : */
298 : template<ReadStream Stream, class B, MatchCondition M>
299 : requires DynamicBufferParam<B&&>
300 : auto
301 136 : read_until(
302 : Stream& stream,
303 : B&& buffers,
304 : M match,
305 : std::size_t initial_amount = 2048)
306 : {
307 136 : constexpr bool is_lvalue = std::is_lvalue_reference_v<B&&>;
308 : using BareB = std::remove_reference_t<B>;
309 :
310 : if constexpr(is_lvalue)
311 : return detail::read_until_awaitable<Stream, BareB, M, false>(
312 4 : stream, std::addressof(buffers), std::move(match), initial_amount);
313 : else
314 : return detail::read_until_awaitable<Stream, BareB, M, true>(
315 132 : stream, std::move(buffers), std::move(match), initial_amount);
316 : }
317 :
318 : /** Asynchronously read until a delimiter string is found.
319 :
320 : Reads data from the stream until the delimiter is found. This is
321 : a convenience overload equivalent to calling `read_until` with
322 : `match_delim{delim}`. If the delimiter already exists in the
323 : buffer, returns immediately without I/O.
324 :
325 : @li The operation completes when:
326 : @li The delimiter string is found
327 : @li End-of-stream is reached (`cond::eof`)
328 : @li The buffer's `max_size()` is reached (`cond::not_found`)
329 : @li An error occurs
330 : @li The operation is cancelled
331 :
332 : @par Cancellation
333 : Supports cancellation via `stop_token` propagated through the
334 : IoAwaitable protocol. When cancelled, returns with `cond::canceled`.
335 :
336 : @param stream The stream to read from. The caller retains ownership.
337 : @param buffers The dynamic buffer to append data to. Must remain
338 : valid until the operation completes.
339 : @param delim The delimiter string to search for.
340 : @param initial_amount Initial bytes to read per iteration (default
341 : 2048). Grows by 1.5x when filled.
342 :
343 : @return An awaitable yielding `(error_code, std::size_t)`.
344 : On success, `n` is bytes up to and including the delimiter.
345 : Compare error codes to conditions:
346 : @li `cond::eof` - EOF before delimiter; `n` is buffer size
347 : @li `cond::not_found` - `max_size()` reached before delimiter
348 : @li `cond::canceled` - Operation was cancelled
349 :
350 : @par Example
351 :
352 : @code
353 : task<std::string> read_line( ReadStream auto& stream )
354 : {
355 : std::string line;
356 : auto [ec, n] = co_await read_until(
357 : stream, string_dynamic_buffer( &line ), "\r\n" );
358 : if( ec == cond::eof )
359 : co_return line; // partial line at EOF
360 : if( ec )
361 : detail::throw_system_error( ec );
362 : line.resize( n - 2 ); // remove "\r\n"
363 : co_return line;
364 : }
365 : @endcode
366 :
367 : @see read_until, match_delim, DynamicBufferParam
368 : */
369 : template<ReadStream Stream, class B>
370 : requires DynamicBufferParam<B&&>
371 : auto
372 108 : read_until(
373 : Stream& stream,
374 : B&& buffers,
375 : std::string_view delim,
376 : std::size_t initial_amount = 2048)
377 : {
378 : return read_until(
379 : stream,
380 : std::forward<B>(buffers),
381 : match_delim{delim},
382 108 : initial_amount);
383 : }
384 :
385 : } // namespace capy
386 : } // namespace boost
387 :
388 : #endif
|