TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/buffers/slice.hpp>
19 : #include <boost/capy/concept/buffer_source.hpp>
20 : #include <boost/capy/concept/io_awaitable.hpp>
21 : #include <boost/capy/concept/read_source.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <boost/capy/ex/io_env.hpp>
24 : #include <boost/capy/io_result.hpp>
25 : #include <boost/capy/io_task.hpp>
26 :
27 : #include <concepts>
28 : #include <coroutine>
29 : #include <cstddef>
30 : #include <exception>
31 : #include <new>
32 : #include <span>
33 : #include <stop_token>
34 : #include <system_error>
35 : #include <utility>
36 :
37 : namespace boost {
38 : namespace capy {
39 :
40 : /** Type-erased wrapper for any BufferSource.
41 :
42 : This class provides type erasure for any type satisfying the
43 : @ref BufferSource concept, enabling runtime polymorphism for
44 : buffer pull operations. It uses cached awaitable storage to achieve
45 : zero steady-state allocation after construction.
46 :
47 : The wrapper also satisfies @ref ReadSource. When the wrapped type
48 : satisfies only @ref BufferSource, the read operations are
49 : synthesized using @ref pull and @ref consume with an extra
50 : buffer copy. When the wrapped type satisfies both @ref BufferSource
51 : and @ref ReadSource, the native read operations are forwarded
52 : directly across the virtual boundary, avoiding the copy.
53 :
54 : The wrapper supports two construction modes:
55 : - **Owning**: Pass by value to transfer ownership. The wrapper
56 : allocates storage and owns the source.
57 : - **Reference**: Pass a pointer to wrap without ownership. The
58 : pointed-to source must outlive this wrapper.
59 :
60 : Within each mode, the vtable is populated at compile time based
61 : on whether the wrapped type also satisfies @ref ReadSource:
62 : - **BufferSource only**: @ref read_some and @ref read are
63 : synthesized from @ref pull and @ref consume, incurring one
64 : buffer copy per operation.
65 : - **BufferSource + ReadSource**: All read operations are
66 : forwarded natively through the type-erased boundary with
67 : no extra copy.
68 :
69 : @par Awaitable Preallocation
70 : The constructor preallocates storage for the type-erased awaitable.
71 : This reserves all virtual address space at server startup
72 : so memory usage can be measured up front, rather than
73 : allocating piecemeal as traffic arrives.
74 :
75 : @par Thread Safety
76 : Not thread-safe. Concurrent operations on the same wrapper
77 : are undefined behavior.
78 :
79 : @par Example
80 : @code
81 : // Owning - takes ownership of the source
82 : any_buffer_source abs(some_buffer_source{args...});
83 :
84 : // Reference - wraps without ownership
85 : some_buffer_source src;
86 : any_buffer_source abs(&src);
87 :
88 : const_buffer arr[16];
89 : auto [ec, bufs] = co_await abs.pull(arr);
90 :
91 : // ReadSource interface also available
92 : char buf[64];
93 : auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94 : @endcode
95 :
96 : @see any_buffer_sink, BufferSource, ReadSource
97 : */
98 : class any_buffer_source
99 : {
100 : struct vtable;
101 : struct awaitable_ops;
102 : struct read_awaitable_ops;
103 :
104 : template<BufferSource S>
105 : struct vtable_for_impl;
106 :
107 : // hot-path members first for cache locality
108 : void* source_ = nullptr;
109 : vtable const* vt_ = nullptr;
110 : void* cached_awaitable_ = nullptr;
111 : awaitable_ops const* active_ops_ = nullptr;
112 : read_awaitable_ops const* active_read_ops_ = nullptr;
113 : void* storage_ = nullptr;
114 :
115 : public:
116 : /** Destructor.
117 :
118 : Destroys the owned source (if any) and releases the cached
119 : awaitable storage.
120 : */
121 : ~any_buffer_source();
122 :
123 : /** Construct a default instance.
124 :
125 : Constructs an empty wrapper. Operations on a default-constructed
126 : wrapper result in undefined behavior.
127 : */
128 : any_buffer_source() = default;
129 :
130 : /** Non-copyable.
131 :
132 : The awaitable cache is per-instance and cannot be shared.
133 : */
134 : any_buffer_source(any_buffer_source const&) = delete;
135 : any_buffer_source& operator=(any_buffer_source const&) = delete;
136 :
137 : /** Construct by moving.
138 :
139 : Transfers ownership of the wrapped source (if owned) and
140 : cached awaitable storage from `other`. After the move, `other` is
141 : in a default-constructed state.
142 :
143 : @param other The wrapper to move from.
144 : */
145 HIT 2 : any_buffer_source(any_buffer_source&& other) noexcept
146 2 : : source_(std::exchange(other.source_, nullptr))
147 2 : , vt_(std::exchange(other.vt_, nullptr))
148 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
149 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
150 2 : , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
151 2 : , storage_(std::exchange(other.storage_, nullptr))
152 : {
153 2 : }
154 :
155 : /** Assign by moving.
156 :
157 : Destroys any owned source and releases existing resources,
158 : then transfers ownership from `other`.
159 :
160 : @param other The wrapper to move from.
161 : @return Reference to this wrapper.
162 : */
163 : any_buffer_source&
164 : operator=(any_buffer_source&& other) noexcept;
165 :
166 : /** Construct by taking ownership of a BufferSource.
167 :
168 : Allocates storage and moves the source into this wrapper.
169 : The wrapper owns the source and will destroy it. If `S` also
170 : satisfies @ref ReadSource, native read operations are
171 : forwarded through the virtual boundary.
172 :
173 : @param s The source to take ownership of.
174 : */
175 : template<BufferSource S>
176 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
177 : any_buffer_source(S s);
178 :
179 : /** Construct by wrapping a BufferSource without ownership.
180 :
181 : Wraps the given source by pointer. The source must remain
182 : valid for the lifetime of this wrapper. If `S` also
183 : satisfies @ref ReadSource, native read operations are
184 : forwarded through the virtual boundary.
185 :
186 : @param s Pointer to the source to wrap.
187 : */
188 : template<BufferSource S>
189 : any_buffer_source(S* s);
190 :
191 : /** Check if the wrapper contains a valid source.
192 :
193 : @return `true` if wrapping a source, `false` if default-constructed
194 : or moved-from.
195 : */
196 : bool
197 16 : has_value() const noexcept
198 : {
199 16 : return source_ != nullptr;
200 : }
201 :
202 : /** Check if the wrapper contains a valid source.
203 :
204 : @return `true` if wrapping a source, `false` if default-constructed
205 : or moved-from.
206 : */
207 : explicit
208 2 : operator bool() const noexcept
209 : {
210 2 : return has_value();
211 : }
212 :
213 : /** Consume bytes from the source.
214 :
215 : Advances the internal read position of the underlying source
216 : by the specified number of bytes. The next call to @ref pull
217 : returns data starting after the consumed bytes.
218 :
219 : @param n The number of bytes to consume. Must not exceed the
220 : total size of buffers returned by the previous @ref pull.
221 :
222 : @par Preconditions
223 : The wrapper must contain a valid source (`has_value() == true`).
224 : */
225 : void
226 : consume(std::size_t n) noexcept;
227 :
228 : /** Pull buffer data from the source.
229 :
230 : Fills the provided span with buffer descriptors from the
231 : underlying source. The operation completes when data is
232 : available, the source is exhausted, or an error occurs.
233 :
234 : @param dest Span of const_buffer to fill.
235 :
236 : @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
237 : On success with data, a non-empty span of filled buffers.
238 : On EOF, `ec == cond::eof` and span is empty.
239 :
240 : @par Preconditions
241 : The wrapper must contain a valid source (`has_value() == true`).
242 : The caller must not call this function again after a prior
243 : call returned an error.
244 : */
245 : auto
246 : pull(std::span<const_buffer> dest);
247 :
248 : /** Read some data into a mutable buffer sequence.
249 :
250 : Reads one or more bytes into the caller's buffers. May fill
251 : less than the full sequence.
252 :
253 : When the wrapped type provides native @ref ReadSource support,
254 : the operation forwards directly. Otherwise it is synthesized
255 : from @ref pull, @ref buffer_copy, and @ref consume.
256 :
257 : @param buffers The buffer sequence to fill.
258 :
259 : @return An awaitable yielding `(error_code,std::size_t)`.
260 :
261 : @par Preconditions
262 : The wrapper must contain a valid source (`has_value() == true`).
263 : The caller must not call this function again after a prior
264 : call returned an error (including EOF).
265 :
266 : @see pull, consume
267 : */
268 : template<MutableBufferSequence MB>
269 : io_task<std::size_t>
270 : read_some(MB buffers);
271 :
272 : /** Read data into a mutable buffer sequence.
273 :
274 : Fills the provided buffer sequence completely. When the
275 : wrapped type provides native @ref ReadSource support, each
276 : window is forwarded directly. Otherwise the data is
277 : synthesized from @ref pull, @ref buffer_copy, and @ref consume.
278 :
279 : @param buffers The buffer sequence to fill.
280 :
281 : @return An awaitable yielding `(error_code,std::size_t)`.
282 : On success, `n == buffer_size(buffers)`.
283 : On EOF, `ec == error::eof` and `n` is bytes transferred.
284 :
285 : @par Preconditions
286 : The wrapper must contain a valid source (`has_value() == true`).
287 : The caller must not call this function again after a prior
288 : call returned an error (including EOF).
289 :
290 : @see pull, consume
291 : */
292 : template<MutableBufferSequence MB>
293 : io_task<std::size_t>
294 : read(MB buffers);
295 :
296 : protected:
297 : /** Rebind to a new source after move.
298 :
299 : Updates the internal pointer to reference a new source object.
300 : Used by owning wrappers after move assignment when the owned
301 : object has moved to a new location.
302 :
303 : @param new_source The new source to bind to. Must be the same
304 : type as the original source.
305 :
306 : @note Terminates if called with a source of different type
307 : than the original.
308 : */
309 : template<BufferSource S>
310 : void
311 : rebind(S& new_source) noexcept
312 : {
313 : if(vt_ != &vtable_for_impl<S>::value)
314 : std::terminate();
315 : source_ = &new_source;
316 : }
317 :
318 : private:
319 : /** Forward a partial read through the vtable.
320 :
321 : Constructs the underlying `read_some` awaitable in
322 : cached storage and returns a type-erased awaitable.
323 : */
324 : auto
325 : read_some_(std::span<mutable_buffer const> buffers);
326 :
327 : /** Forward a complete read through the vtable.
328 :
329 : Constructs the underlying `read` awaitable in
330 : cached storage and returns a type-erased awaitable.
331 : */
332 : auto
333 : read_(std::span<mutable_buffer const> buffers);
334 : };
335 :
336 : /** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`. */
337 : struct any_buffer_source::awaitable_ops
338 : {
339 : bool (*await_ready)(void*);
340 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
341 : io_result<std::span<const_buffer>> (*await_resume)(void*);
342 : void (*destroy)(void*) noexcept;
343 : };
344 :
345 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
346 : struct any_buffer_source::read_awaitable_ops
347 : {
348 : bool (*await_ready)(void*);
349 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
350 : io_result<std::size_t> (*await_resume)(void*);
351 : void (*destroy)(void*) noexcept;
352 : };
353 :
354 : struct any_buffer_source::vtable
355 : {
356 : // BufferSource ops (always populated)
357 : void (*destroy)(void*) noexcept;
358 : void (*do_consume)(void* source, std::size_t n) noexcept;
359 : std::size_t awaitable_size;
360 : std::size_t awaitable_align;
361 : awaitable_ops const* (*construct_awaitable)(
362 : void* source,
363 : void* storage,
364 : std::span<const_buffer> dest);
365 :
366 : // ReadSource forwarding (null when wrapped type is BufferSource-only)
367 : read_awaitable_ops const* (*construct_read_some_awaitable)(
368 : void* source,
369 : void* storage,
370 : std::span<mutable_buffer const> buffers);
371 : read_awaitable_ops const* (*construct_read_awaitable)(
372 : void* source,
373 : void* storage,
374 : std::span<mutable_buffer const> buffers);
375 : };
376 :
377 : template<BufferSource S>
378 : struct any_buffer_source::vtable_for_impl
379 : {
380 : using PullAwaitable = decltype(std::declval<S&>().pull(
381 : std::declval<std::span<const_buffer>>()));
382 :
383 : static void
384 7 : do_destroy_impl(void* source) noexcept
385 : {
386 7 : static_cast<S*>(source)->~S();
387 7 : }
388 :
389 : static void
390 45 : do_consume_impl(void* source, std::size_t n) noexcept
391 : {
392 45 : static_cast<S*>(source)->consume(n);
393 45 : }
394 :
395 : static awaitable_ops const*
396 110 : construct_awaitable_impl(
397 : void* source,
398 : void* storage,
399 : std::span<const_buffer> dest)
400 : {
401 110 : auto& s = *static_cast<S*>(source);
402 110 : ::new(storage) PullAwaitable(s.pull(dest));
403 :
404 : static constexpr awaitable_ops ops = {
405 110 : +[](void* p) {
406 110 : return static_cast<PullAwaitable*>(p)->await_ready();
407 : },
408 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
409 0 : return detail::call_await_suspend(
410 0 : static_cast<PullAwaitable*>(p), h, env);
411 : },
412 HIT 110 : +[](void* p) {
413 110 : return static_cast<PullAwaitable*>(p)->await_resume();
414 : },
415 110 : +[](void* p) noexcept {
416 110 : static_cast<PullAwaitable*>(p)->~PullAwaitable();
417 : }
418 : };
419 110 : return &ops;
420 : }
421 :
422 : static read_awaitable_ops const*
423 48 : construct_read_some_awaitable_impl(
424 : void* source,
425 : void* storage,
426 : std::span<mutable_buffer const> buffers)
427 : requires ReadSource<S>
428 : {
429 : using Aw = decltype(std::declval<S&>().read_some(
430 : std::span<mutable_buffer const>{}));
431 48 : auto& s = *static_cast<S*>(source);
432 48 : ::new(storage) Aw(s.read_some(buffers));
433 :
434 : static constexpr read_awaitable_ops ops = {
435 48 : +[](void* p) {
436 48 : return static_cast<Aw*>(p)->await_ready();
437 : },
438 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
439 0 : return detail::call_await_suspend(
440 0 : static_cast<Aw*>(p), h, env);
441 : },
442 HIT 48 : +[](void* p) {
443 48 : return static_cast<Aw*>(p)->await_resume();
444 : },
445 48 : +[](void* p) noexcept {
446 48 : static_cast<Aw*>(p)->~Aw();
447 : }
448 : };
449 48 : return &ops;
450 : }
451 :
452 : static read_awaitable_ops const*
453 18 : construct_read_awaitable_impl(
454 : void* source,
455 : void* storage,
456 : std::span<mutable_buffer const> buffers)
457 : requires ReadSource<S>
458 : {
459 : using Aw = decltype(std::declval<S&>().read(
460 : std::span<mutable_buffer const>{}));
461 18 : auto& s = *static_cast<S*>(source);
462 18 : ::new(storage) Aw(s.read(buffers));
463 :
464 : static constexpr read_awaitable_ops ops = {
465 18 : +[](void* p) {
466 18 : return static_cast<Aw*>(p)->await_ready();
467 : },
468 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
469 0 : return detail::call_await_suspend(
470 0 : static_cast<Aw*>(p), h, env);
471 : },
472 HIT 18 : +[](void* p) {
473 18 : return static_cast<Aw*>(p)->await_resume();
474 : },
475 18 : +[](void* p) noexcept {
476 18 : static_cast<Aw*>(p)->~Aw();
477 : }
478 : };
479 18 : return &ops;
480 : }
481 :
482 : static consteval std::size_t
483 : compute_max_size() noexcept
484 : {
485 : std::size_t s = sizeof(PullAwaitable);
486 : if constexpr (ReadSource<S>)
487 : {
488 : using RS = decltype(std::declval<S&>().read_some(
489 : std::span<mutable_buffer const>{}));
490 : using R = decltype(std::declval<S&>().read(
491 : std::span<mutable_buffer const>{}));
492 :
493 : if(sizeof(RS) > s) s = sizeof(RS);
494 : if(sizeof(R) > s) s = sizeof(R);
495 : }
496 : return s;
497 : }
498 :
499 : static consteval std::size_t
500 : compute_max_align() noexcept
501 : {
502 : std::size_t a = alignof(PullAwaitable);
503 : if constexpr (ReadSource<S>)
504 : {
505 : using RS = decltype(std::declval<S&>().read_some(
506 : std::span<mutable_buffer const>{}));
507 : using R = decltype(std::declval<S&>().read(
508 : std::span<mutable_buffer const>{}));
509 :
510 : if(alignof(RS) > a) a = alignof(RS);
511 : if(alignof(R) > a) a = alignof(R);
512 : }
513 : return a;
514 : }
515 :
516 : static consteval vtable
517 : make_vtable() noexcept
518 : {
519 : vtable v{};
520 : v.destroy = &do_destroy_impl;
521 : v.do_consume = &do_consume_impl;
522 : v.awaitable_size = compute_max_size();
523 : v.awaitable_align = compute_max_align();
524 : v.construct_awaitable = &construct_awaitable_impl;
525 : v.construct_read_some_awaitable = nullptr;
526 : v.construct_read_awaitable = nullptr;
527 :
528 : if constexpr (ReadSource<S>)
529 : {
530 : v.construct_read_some_awaitable =
531 : &construct_read_some_awaitable_impl;
532 : v.construct_read_awaitable =
533 : &construct_read_awaitable_impl;
534 : }
535 : return v;
536 : }
537 :
538 : static constexpr vtable value = make_vtable();
539 : };
540 :
541 : inline
542 124 : any_buffer_source::~any_buffer_source()
543 : {
544 124 : if(storage_)
545 : {
546 7 : vt_->destroy(source_);
547 7 : ::operator delete(storage_);
548 : }
549 124 : if(cached_awaitable_)
550 119 : ::operator delete(cached_awaitable_);
551 124 : }
552 :
553 : inline any_buffer_source&
554 2 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
555 : {
556 2 : if(this != &other)
557 : {
558 2 : if(storage_)
559 : {
560 MIS 0 : vt_->destroy(source_);
561 0 : ::operator delete(storage_);
562 : }
563 HIT 2 : if(cached_awaitable_)
564 MIS 0 : ::operator delete(cached_awaitable_);
565 HIT 2 : source_ = std::exchange(other.source_, nullptr);
566 2 : vt_ = std::exchange(other.vt_, nullptr);
567 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
568 2 : storage_ = std::exchange(other.storage_, nullptr);
569 2 : active_ops_ = std::exchange(other.active_ops_, nullptr);
570 2 : active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
571 : }
572 2 : return *this;
573 : }
574 :
575 : template<BufferSource S>
576 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
577 7 : any_buffer_source::any_buffer_source(S s)
578 7 : : vt_(&vtable_for_impl<S>::value)
579 : {
580 : struct guard {
581 : any_buffer_source* self;
582 : bool committed = false;
583 7 : ~guard() {
584 7 : if(!committed && self->storage_) {
585 MIS 0 : self->vt_->destroy(self->source_);
586 0 : ::operator delete(self->storage_);
587 0 : self->storage_ = nullptr;
588 0 : self->source_ = nullptr;
589 : }
590 HIT 7 : }
591 7 : } g{this};
592 :
593 7 : storage_ = ::operator new(sizeof(S));
594 7 : source_ = ::new(storage_) S(std::move(s));
595 :
596 7 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
597 :
598 7 : g.committed = true;
599 7 : }
600 :
601 : template<BufferSource S>
602 112 : any_buffer_source::any_buffer_source(S* s)
603 112 : : source_(s)
604 112 : , vt_(&vtable_for_impl<S>::value)
605 : {
606 112 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
607 112 : }
608 :
609 : inline void
610 45 : any_buffer_source::consume(std::size_t n) noexcept
611 : {
612 45 : vt_->do_consume(source_, n);
613 45 : }
614 :
615 : inline auto
616 110 : any_buffer_source::pull(std::span<const_buffer> dest)
617 : {
618 : struct awaitable
619 : {
620 : any_buffer_source* self_;
621 : std::span<const_buffer> dest_;
622 :
623 : bool
624 110 : await_ready()
625 : {
626 220 : self_->active_ops_ = self_->vt_->construct_awaitable(
627 110 : self_->source_,
628 110 : self_->cached_awaitable_,
629 : dest_);
630 110 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
631 : }
632 :
633 : std::coroutine_handle<>
634 MIS 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
635 : {
636 0 : return self_->active_ops_->await_suspend(
637 0 : self_->cached_awaitable_, h, env);
638 : }
639 :
640 : io_result<std::span<const_buffer>>
641 HIT 110 : await_resume()
642 : {
643 : struct guard {
644 : any_buffer_source* self;
645 110 : ~guard() {
646 110 : self->active_ops_->destroy(self->cached_awaitable_);
647 110 : self->active_ops_ = nullptr;
648 110 : }
649 110 : } g{self_};
650 110 : return self_->active_ops_->await_resume(
651 195 : self_->cached_awaitable_);
652 110 : }
653 : };
654 110 : return awaitable{this, dest};
655 : }
656 :
657 : inline auto
658 48 : any_buffer_source::read_some_(
659 : std::span<mutable_buffer const> buffers)
660 : {
661 : struct awaitable
662 : {
663 : any_buffer_source* self_;
664 : std::span<mutable_buffer const> buffers_;
665 :
666 : bool
667 48 : await_ready() const noexcept
668 : {
669 48 : return false;
670 : }
671 :
672 : std::coroutine_handle<>
673 48 : await_suspend(std::coroutine_handle<> h, io_env const* env)
674 : {
675 96 : self_->active_read_ops_ =
676 96 : self_->vt_->construct_read_some_awaitable(
677 48 : self_->source_,
678 48 : self_->cached_awaitable_,
679 : buffers_);
680 :
681 48 : if(self_->active_read_ops_->await_ready(
682 48 : self_->cached_awaitable_))
683 48 : return h;
684 :
685 MIS 0 : return self_->active_read_ops_->await_suspend(
686 0 : self_->cached_awaitable_, h, env);
687 : }
688 :
689 : io_result<std::size_t>
690 HIT 48 : await_resume()
691 : {
692 : struct guard {
693 : any_buffer_source* self;
694 48 : ~guard() {
695 48 : self->active_read_ops_->destroy(
696 48 : self->cached_awaitable_);
697 48 : self->active_read_ops_ = nullptr;
698 48 : }
699 48 : } g{self_};
700 48 : return self_->active_read_ops_->await_resume(
701 88 : self_->cached_awaitable_);
702 48 : }
703 : };
704 48 : return awaitable{this, buffers};
705 : }
706 :
707 : inline auto
708 18 : any_buffer_source::read_(
709 : std::span<mutable_buffer const> buffers)
710 : {
711 : struct awaitable
712 : {
713 : any_buffer_source* self_;
714 : std::span<mutable_buffer const> buffers_;
715 :
716 : bool
717 18 : await_ready() const noexcept
718 : {
719 18 : return false;
720 : }
721 :
722 : std::coroutine_handle<>
723 18 : await_suspend(std::coroutine_handle<> h, io_env const* env)
724 : {
725 36 : self_->active_read_ops_ =
726 36 : self_->vt_->construct_read_awaitable(
727 18 : self_->source_,
728 18 : self_->cached_awaitable_,
729 : buffers_);
730 :
731 18 : if(self_->active_read_ops_->await_ready(
732 18 : self_->cached_awaitable_))
733 18 : return h;
734 :
735 MIS 0 : return self_->active_read_ops_->await_suspend(
736 0 : self_->cached_awaitable_, h, env);
737 : }
738 :
739 : io_result<std::size_t>
740 HIT 18 : await_resume()
741 : {
742 : struct guard {
743 : any_buffer_source* self;
744 18 : ~guard() {
745 18 : self->active_read_ops_->destroy(
746 18 : self->cached_awaitable_);
747 18 : self->active_read_ops_ = nullptr;
748 18 : }
749 18 : } g{self_};
750 18 : return self_->active_read_ops_->await_resume(
751 30 : self_->cached_awaitable_);
752 18 : }
753 : };
754 18 : return awaitable{this, buffers};
755 : }
756 :
757 : template<MutableBufferSequence MB>
758 : io_task<std::size_t>
759 58 : any_buffer_source::read_some(MB buffers)
760 : {
761 : buffer_param<MB> bp(buffers);
762 : auto dest = bp.data();
763 : if(dest.empty())
764 : co_return {{}, 0};
765 :
766 : // Native ReadSource path
767 : if(vt_->construct_read_some_awaitable)
768 : co_return co_await read_some_(dest);
769 :
770 : // Synthesized path: pull + buffer_copy + consume
771 : const_buffer arr[detail::max_iovec_];
772 : auto [ec, bufs] = co_await pull(arr);
773 : if(ec)
774 : co_return {ec, 0};
775 :
776 : auto n = buffer_copy(dest, bufs);
777 : consume(n);
778 : co_return {{}, n};
779 116 : }
780 :
781 : template<MutableBufferSequence MB>
782 : io_task<std::size_t>
783 24 : any_buffer_source::read(MB buffers)
784 : {
785 : buffer_param<MB> bp(buffers);
786 : std::size_t total = 0;
787 :
788 : // Native ReadSource path
789 : if(vt_->construct_read_awaitable)
790 : {
791 : for(;;)
792 : {
793 : auto dest = bp.data();
794 : if(dest.empty())
795 : break;
796 :
797 : auto [ec, n] = co_await read_(dest);
798 : total += n;
799 : if(ec)
800 : co_return {ec, total};
801 : bp.consume(n);
802 : }
803 : co_return {{}, total};
804 : }
805 :
806 : // Synthesized path: pull + buffer_copy + consume
807 : for(;;)
808 : {
809 : auto dest = bp.data();
810 : if(dest.empty())
811 : break;
812 :
813 : const_buffer arr[detail::max_iovec_];
814 : auto [ec, bufs] = co_await pull(arr);
815 :
816 : if(ec)
817 : co_return {ec, total};
818 :
819 : auto n = buffer_copy(dest, bufs);
820 : consume(n);
821 : total += n;
822 : bp.consume(n);
823 : }
824 :
825 : co_return {{}, total};
826 48 : }
827 :
828 : static_assert(BufferSource<any_buffer_source>);
829 : static_assert(ReadSource<any_buffer_source>);
830 :
831 : } // namespace capy
832 : } // namespace boost
833 :
834 : #endif
|