include/boost/capy/io/any_read_stream.hpp

87.4% Lines (83/95) 81.6% Functions (31/38)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/ex/io_env.hpp>
20 #include <boost/capy/io_result.hpp>
21
22 #include <concepts>
23 #include <coroutine>
24 #include <cstddef>
25 #include <exception>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any ReadStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref ReadStream concept, enabling runtime polymorphism for
39 read operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Immediate Completion
55 When the underlying stream's awaitable reports ready immediately
56 (e.g. buffered data already available), the wrapper skips
57 coroutine suspension entirely and returns the result inline.
58
59 @par Thread Safety
60 Not thread-safe. Concurrent operations on the same wrapper
61 are undefined behavior.
62
63 @par Example
64 @code
65 // Owning - takes ownership of the stream
66 any_read_stream stream(socket{ioc});
67
68 // Reference - wraps without ownership
69 socket sock(ioc);
70 any_read_stream stream(&sock);
71
72 mutable_buffer buf(data, size);
73 auto [ec, n] = co_await stream.read_some(buf);
74 @endcode
75
76 @see any_write_stream, any_stream, ReadStream
77 */
78 class any_read_stream
79 {
80 struct vtable;
81
82 template<ReadStream S>
83 struct vtable_for_impl;
84
85 // ordered for cache line coherence
86 void* stream_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 bool awaitable_active_ = false;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned stream (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_read_stream();
99
100 /** Construct a default instance.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 1x any_read_stream() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_read_stream(any_read_stream const&) = delete;
112 any_read_stream& operator=(any_read_stream const&) = delete;
113
114 /** Construct by moving.
115
116 Transfers ownership of the wrapped stream (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 2x any_read_stream(any_read_stream&& other) noexcept
123 2x : stream_(std::exchange(other.stream_, nullptr))
124 2x , vt_(std::exchange(other.vt_, nullptr))
125 2x , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 2x , storage_(std::exchange(other.storage_, nullptr))
127 2x , awaitable_active_(std::exchange(other.awaitable_active_, false))
128 {
129 2x }
130
131 /** Assign by moving.
132
133 Destroys any owned stream and releases existing resources,
134 then transfers ownership from `other`.
135
136 @param other The wrapper to move from.
137 @return Reference to this wrapper.
138 */
139 any_read_stream&
140 operator=(any_read_stream&& other) noexcept;
141
142 /** Construct by taking ownership of a ReadStream.
143
144 Allocates storage and moves the stream into this wrapper.
145 The wrapper owns the stream and will destroy it.
146
147 @param s The stream to take ownership of.
148 */
149 template<ReadStream S>
150 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
151 any_read_stream(S s);
152
153 /** Construct by wrapping a ReadStream without ownership.
154
155 Wraps the given stream by pointer. The stream must remain
156 valid for the lifetime of this wrapper.
157
158 @param s Pointer to the stream to wrap.
159 */
160 template<ReadStream S>
161 any_read_stream(S* s);
162
163 /** Check if the wrapper contains a valid stream.
164
165 @return `true` if wrapping a stream, `false` if default-constructed
166 or moved-from.
167 */
168 bool
169 25x has_value() const noexcept
170 {
171 25x return stream_ != nullptr;
172 }
173
174 /** Check if the wrapper contains a valid stream.
175
176 @return `true` if wrapping a stream, `false` if default-constructed
177 or moved-from.
178 */
179 explicit
180 3x operator bool() const noexcept
181 {
182 3x return has_value();
183 }
184
185 /** Initiate an asynchronous read operation.
186
187 Reads data into the provided buffer sequence. The operation
188 completes when at least one byte has been read, or an error
189 occurs.
190
191 @param buffers The buffer sequence to read into. Passed by
192 value to ensure the sequence lives in the coroutine frame
193 across suspension points.
194
195 @return An awaitable yielding `(error_code,std::size_t)`.
196
197 @par Immediate Completion
198 The operation completes immediately without suspending
199 the calling coroutine when the underlying stream's
200 awaitable reports immediate readiness via `await_ready`.
201
202 @note This is a partial operation and may not process the
203 entire buffer sequence. Use the composed @ref read algorithm
204 for guaranteed complete transfer.
205
206 @par Preconditions
207 The wrapper must contain a valid stream (`has_value() == true`).
208 The caller must not call this function again after a prior
209 call returned an error (including EOF).
210 */
211 template<MutableBufferSequence MB>
212 auto
213 read_some(MB buffers);
214
215 protected:
216 /** Rebind to a new stream after move.
217
218 Updates the internal pointer to reference a new stream object.
219 Used by owning wrappers after move assignment when the owned
220 object has moved to a new location.
221
222 @param new_stream The new stream to bind to. Must be the same
223 type as the original stream.
224
225 @note Terminates if called with a stream of different type
226 than the original.
227 */
228 template<ReadStream S>
229 void
230 rebind(S& new_stream) noexcept
231 {
232 if(vt_ != &vtable_for_impl<S>::value)
233 std::terminate();
234 stream_ = &new_stream;
235 }
236 };
237
238 struct any_read_stream::vtable
239 {
240 // ordered by call frequency for cache line coherence
241 void (*construct_awaitable)(
242 void* stream,
243 void* storage,
244 std::span<mutable_buffer const> buffers);
245 bool (*await_ready)(void*);
246 std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
247 io_result<std::size_t> (*await_resume)(void*);
248 void (*destroy_awaitable)(void*) noexcept;
249 std::size_t awaitable_size;
250 std::size_t awaitable_align;
251 void (*destroy)(void*) noexcept;
252 };
253
254 template<ReadStream S>
255 struct any_read_stream::vtable_for_impl
256 {
257 using Awaitable = decltype(std::declval<S&>().read_some(
258 std::span<mutable_buffer const>{}));
259
260 static void
261 1x do_destroy_impl(void* stream) noexcept
262 {
263 1x static_cast<S*>(stream)->~S();
264 1x }
265
266 static void
267 91x construct_awaitable_impl(
268 void* stream,
269 void* storage,
270 std::span<mutable_buffer const> buffers)
271 {
272 91x auto& s = *static_cast<S*>(stream);
273 91x ::new(storage) Awaitable(s.read_some(buffers));
274 91x }
275
276 static constexpr vtable value = {
277 &construct_awaitable_impl,
278 91x +[](void* p) {
279 91x return static_cast<Awaitable*>(p)->await_ready();
280 },
281 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
282 return detail::call_await_suspend(
283 static_cast<Awaitable*>(p), h, env);
284 },
285 89x +[](void* p) {
286 89x return static_cast<Awaitable*>(p)->await_resume();
287 },
288 93x +[](void* p) noexcept {
289 16x static_cast<Awaitable*>(p)->~Awaitable();
290 },
291 sizeof(Awaitable),
292 alignof(Awaitable),
293 &do_destroy_impl
294 };
295 };
296
297 inline
298 101x any_read_stream::~any_read_stream()
299 {
300 101x if(storage_)
301 {
302 1x vt_->destroy(stream_);
303 1x ::operator delete(storage_);
304 }
305 101x if(cached_awaitable_)
306 {
307 91x if(awaitable_active_)
308 1x vt_->destroy_awaitable(cached_awaitable_);
309 91x ::operator delete(cached_awaitable_);
310 }
311 101x }
312
313 inline any_read_stream&
314 5x any_read_stream::operator=(any_read_stream&& other) noexcept
315 {
316 5x if(this != &other)
317 {
318 5x if(storage_)
319 {
320 vt_->destroy(stream_);
321 ::operator delete(storage_);
322 }
323 5x if(cached_awaitable_)
324 {
325 2x if(awaitable_active_)
326 1x vt_->destroy_awaitable(cached_awaitable_);
327 2x ::operator delete(cached_awaitable_);
328 }
329 5x stream_ = std::exchange(other.stream_, nullptr);
330 5x vt_ = std::exchange(other.vt_, nullptr);
331 5x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
332 5x storage_ = std::exchange(other.storage_, nullptr);
333 5x awaitable_active_ = std::exchange(other.awaitable_active_, false);
334 }
335 5x return *this;
336 }
337
338 template<ReadStream S>
339 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
340 1x any_read_stream::any_read_stream(S s)
341 1x : vt_(&vtable_for_impl<S>::value)
342 {
343 struct guard {
344 any_read_stream* self;
345 bool committed = false;
346 1x ~guard() {
347 1x if(!committed && self->storage_) {
348 self->vt_->destroy(self->stream_);
349 ::operator delete(self->storage_);
350 self->storage_ = nullptr;
351 self->stream_ = nullptr;
352 }
353 1x }
354 1x } g{this};
355
356 1x storage_ = ::operator new(sizeof(S));
357 1x stream_ = ::new(storage_) S(std::move(s));
358
359 // Preallocate the awaitable storage
360 1x cached_awaitable_ = ::operator new(vt_->awaitable_size);
361
362 1x g.committed = true;
363 1x }
364
365 template<ReadStream S>
366 92x any_read_stream::any_read_stream(S* s)
367 92x : stream_(s)
368 92x , vt_(&vtable_for_impl<S>::value)
369 {
370 // Preallocate the awaitable storage
371 92x cached_awaitable_ = ::operator new(vt_->awaitable_size);
372 92x }
373
374 template<MutableBufferSequence MB>
375 auto
376 91x any_read_stream::read_some(MB buffers)
377 {
378 // VFALCO in theory, we could use if constexpr to detect a
379 // span and then pass that through to read_some without the array
380 struct awaitable
381 {
382 any_read_stream* self_;
383 mutable_buffer_array<detail::max_iovec_> ba_;
384
385 bool
386 14x await_ready()
387 {
388 14x self_->vt_->construct_awaitable(
389 14x self_->stream_,
390 14x self_->cached_awaitable_,
391 14x ba_.to_span());
392 14x self_->awaitable_active_ = true;
393
394 28x return self_->vt_->await_ready(
395 14x self_->cached_awaitable_);
396 }
397
398 std::coroutine_handle<>
399 await_suspend(std::coroutine_handle<> h, io_env const* env)
400 {
401 return self_->vt_->await_suspend(
402 self_->cached_awaitable_, h, env);
403 }
404
405 io_result<std::size_t>
406 14x await_resume()
407 {
408 struct guard {
409 any_read_stream* self;
410 14x ~guard() {
411 14x self->vt_->destroy_awaitable(self->cached_awaitable_);
412 14x self->awaitable_active_ = false;
413 14x }
414 14x } g{self_};
415 14x return self_->vt_->await_resume(
416 24x self_->cached_awaitable_);
417 14x }
418 };
419 return awaitable{this,
420 91x mutable_buffer_array<detail::max_iovec_>(buffers)};
421 91x }
422
423 } // namespace capy
424 } // namespace boost
425
426 #endif
427