include/boost/capy/io/any_buffer_source.hpp

87.7% Lines (143/163) 90.7% Functions (39/43)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/buffers/slice.hpp>
19 #include <boost/capy/concept/buffer_source.hpp>
20 #include <boost/capy/concept/io_awaitable.hpp>
21 #include <boost/capy/concept/read_source.hpp>
22 #include <boost/capy/error.hpp>
23 #include <boost/capy/ex/io_env.hpp>
24 #include <boost/capy/io_result.hpp>
25 #include <boost/capy/io_task.hpp>
26
27 #include <concepts>
28 #include <coroutine>
29 #include <cstddef>
30 #include <exception>
31 #include <new>
32 #include <span>
33 #include <stop_token>
34 #include <system_error>
35 #include <utility>
36
37 namespace boost {
38 namespace capy {
39
40 /** Type-erased wrapper for any BufferSource.
41
42 This class provides type erasure for any type satisfying the
43 @ref BufferSource concept, enabling runtime polymorphism for
44 buffer pull operations. It uses cached awaitable storage to achieve
45 zero steady-state allocation after construction.
46
47 The wrapper also satisfies @ref ReadSource. When the wrapped type
48 satisfies only @ref BufferSource, the read operations are
49 synthesized using @ref pull and @ref consume with an extra
50 buffer copy. When the wrapped type satisfies both @ref BufferSource
51 and @ref ReadSource, the native read operations are forwarded
52 directly across the virtual boundary, avoiding the copy.
53
54 The wrapper supports two construction modes:
55 - **Owning**: Pass by value to transfer ownership. The wrapper
56 allocates storage and owns the source.
57 - **Reference**: Pass a pointer to wrap without ownership. The
58 pointed-to source must outlive this wrapper.
59
60 Within each mode, the vtable is populated at compile time based
61 on whether the wrapped type also satisfies @ref ReadSource:
62 - **BufferSource only**: @ref read_some and @ref read are
63 synthesized from @ref pull and @ref consume, incurring one
64 buffer copy per operation.
65 - **BufferSource + ReadSource**: All read operations are
66 forwarded natively through the type-erased boundary with
67 no extra copy.
68
69 @par Awaitable Preallocation
70 The constructor preallocates storage for the type-erased awaitable.
71 This reserves all virtual address space at server startup
72 so memory usage can be measured up front, rather than
73 allocating piecemeal as traffic arrives.
74
75 @par Thread Safety
76 Not thread-safe. Concurrent operations on the same wrapper
77 are undefined behavior.
78
79 @par Example
80 @code
81 // Owning - takes ownership of the source
82 any_buffer_source abs(some_buffer_source{args...});
83
84 // Reference - wraps without ownership
85 some_buffer_source src;
86 any_buffer_source abs(&src);
87
88 const_buffer arr[16];
89 auto [ec, bufs] = co_await abs.pull(arr);
90
91 // ReadSource interface also available
92 char buf[64];
93 auto [ec2, n] = co_await abs.read_some(mutable_buffer(buf, 64));
94 @endcode
95
96 @see any_buffer_sink, BufferSource, ReadSource
97 */
class any_buffer_source
{
    struct vtable;
    struct awaitable_ops;
    struct read_awaitable_ops;

    template<BufferSource S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* source_ = nullptr;                                // wrapped source; owned iff storage_ != nullptr
    vtable const* vt_ = nullptr;                            // per-wrapped-type dispatch table
    void* cached_awaitable_ = nullptr;                      // preallocated storage reused for each awaitable
    awaitable_ops const* active_ops_ = nullptr;             // non-null while a pull awaitable is in flight
    read_awaitable_ops const* active_read_ops_ = nullptr;   // non-null while a read awaitable is in flight
    void* storage_ = nullptr;                               // heap storage for an owned source, else null

public:
    /** Destructor.

        Destroys the owned source (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_source();

    /** Construct a default instance.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_source() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_source(any_buffer_source const&) = delete;
    any_buffer_source& operator=(any_buffer_source const&) = delete;

    /** Construct by moving.

        Transfers ownership of the wrapped source (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_source(any_buffer_source&& other) noexcept
        : source_(std::exchange(other.source_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_read_ops_(std::exchange(other.active_read_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Assign by moving.

        Destroys any owned source and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_source&
    operator=(any_buffer_source&& other) noexcept;

    /** Construct by taking ownership of a BufferSource.

        Allocates storage and moves the source into this wrapper.
        The wrapper owns the source and will destroy it. If `S` also
        satisfies @ref ReadSource, native read operations are
        forwarded through the virtual boundary.

        @param s The source to take ownership of.
    */
    template<BufferSource S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
    any_buffer_source(S s);

    /** Construct by wrapping a BufferSource without ownership.

        Wraps the given source by pointer. The source must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref ReadSource, native read operations are
        forwarded through the virtual boundary.

        @param s Pointer to the source to wrap.
    */
    template<BufferSource S>
    any_buffer_source(S* s);

    /** Check if the wrapper contains a valid source.

        @return `true` if wrapping a source, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return source_ != nullptr;
    }

    /** Check if the wrapper contains a valid source.

        @return `true` if wrapping a source, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Consume bytes from the source.

        Advances the internal read position of the underlying source
        by the specified number of bytes. The next call to @ref pull
        returns data starting after the consumed bytes.

        @param n The number of bytes to consume. Must not exceed the
        total size of buffers returned by the previous @ref pull.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
    */
    void
    consume(std::size_t n) noexcept;

    /** Pull buffer data from the source.

        Fills the provided span with buffer descriptors from the
        underlying source. The operation completes when data is
        available, the source is exhausted, or an error occurs.

        @param dest Span of const_buffer to fill.

        @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
        On success with data, a non-empty span of filled buffers.
        On EOF, `ec == cond::eof` and span is empty.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error.
    */
    auto
    pull(std::span<const_buffer> dest);

    /** Read some data into a mutable buffer sequence.

        Reads one or more bytes into the caller's buffers. May fill
        less than the full sequence.

        When the wrapped type provides native @ref ReadSource support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref pull, @ref buffer_copy, and @ref consume.

        @param buffers The buffer sequence to fill.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error (including EOF).

        @see pull, consume
    */
    template<MutableBufferSequence MB>
    io_task<std::size_t>
    read_some(MB buffers);

    /** Read data into a mutable buffer sequence.

        Fills the provided buffer sequence completely. When the
        wrapped type provides native @ref ReadSource support, each
        window is forwarded directly. Otherwise the data is
        synthesized from @ref pull, @ref buffer_copy, and @ref consume.

        @param buffers The buffer sequence to fill.

        @return An awaitable yielding `(error_code,std::size_t)`.
        On success, `n == buffer_size(buffers)`.
        On EOF, `ec == error::eof` and `n` is bytes transferred.

        @par Preconditions
        The wrapper must contain a valid source (`has_value() == true`).
        The caller must not call this function again after a prior
        call returned an error (including EOF).

        @see pull, consume
    */
    template<MutableBufferSequence MB>
    io_task<std::size_t>
    read(MB buffers);

protected:
    /** Rebind to a new source after move.

        Updates the internal pointer to reference a new source object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_source The new source to bind to. Must be the same
        type as the original source.

        @note Terminates if called with a source of different type
        than the original.
    */
    template<BufferSource S>
    void
    rebind(S& new_source) noexcept
    {
        // The vtable pointer uniquely identifies the erased type, so a
        // mismatch here means the caller rebound to a different type.
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        source_ = &new_source;
    }

private:
    /** Forward a partial read through the vtable.

        Constructs the underlying `read_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    read_some_(std::span<mutable_buffer const> buffers);

    /** Forward a complete read through the vtable.

        Constructs the underlying `read` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    read_(std::span<mutable_buffer const> buffers);
};
335
/** Type-erased ops for awaitables yielding `io_result<std::span<const_buffer>>`.

    Each function pointer receives the address of the awaitable object
    that was placement-constructed into the wrapper's cached storage.
*/
struct any_buffer_source::awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::span<const_buffer>> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;    // in-place destructor; the storage itself is reused
};
344
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`.

    Same shape as @ref awaitable_ops but for the forwarded read
    operations, which produce a byte count instead of a buffer span.
*/
struct any_buffer_source::read_awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;    // in-place destructor; the storage itself is reused
};
353
struct any_buffer_source::vtable
{
    // BufferSource ops (always populated)
    void (*destroy)(void*) noexcept;                            // destroys the owned source in place
    void (*do_consume)(void* source, std::size_t n) noexcept;   // forwards consume(n)
    std::size_t awaitable_size;     // max sizeof over pull/read_some/read awaitable types
    std::size_t awaitable_align;    // max alignof over the same set
    // NOTE(review): awaitable_align is computed but the allocation sites
    // use plain ::operator new(awaitable_size) without std::align_val_t;
    // an awaitable over-aligned beyond __STDCPP_DEFAULT_NEW_ALIGNMENT__
    // would get under-aligned storage — confirm no awaitable requires
    // extended alignment, or thread the alignment through new/delete.
    awaitable_ops const* (*construct_awaitable)(
        void* source,
        void* storage,
        std::span<const_buffer> dest);

    // ReadSource forwarding (null when wrapped type is BufferSource-only)
    read_awaitable_ops const* (*construct_read_some_awaitable)(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers);
    read_awaitable_ops const* (*construct_read_awaitable)(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers);
};
376
template<BufferSource S>
struct any_buffer_source::vtable_for_impl
{
    // Exact awaitable type produced by the wrapped source's pull().
    using PullAwaitable = decltype(std::declval<S&>().pull(
        std::declval<std::span<const_buffer>>()));

    // Destroys the owned source in place; the caller frees the storage.
    static void
    do_destroy_impl(void* source) noexcept
    {
        static_cast<S*>(source)->~S();
    }

    static void
    do_consume_impl(void* source, std::size_t n) noexcept
    {
        static_cast<S*>(source)->consume(n);
    }

    // Placement-constructs the pull awaitable into `storage` (the
    // wrapper's preallocated cache) and returns the ops table used to
    // drive it across the type-erased boundary.
    static awaitable_ops const*
    construct_awaitable_impl(
        void* source,
        void* storage,
        std::span<const_buffer> dest)
    {
        auto& s = *static_cast<S*>(source);
        ::new(storage) PullAwaitable(s.pull(dest));

        // One immutable ops table per instantiation; the lambdas are
        // captureless, so `+` converts them to plain function pointers.
        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<PullAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<PullAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<PullAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<PullAwaitable*>(p)->~PullAwaitable();
            }
        };
        return &ops;
    }

    // Same pattern for the native read_some awaitable; only compiled
    // when S also satisfies ReadSource.
    static read_awaitable_ops const*
    construct_read_some_awaitable_impl(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers)
        requires ReadSource<S>
    {
        using Aw = decltype(std::declval<S&>().read_some(
            std::span<mutable_buffer const>{}));
        auto& s = *static_cast<S*>(source);
        ::new(storage) Aw(s.read_some(buffers));

        static constexpr read_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Same pattern for the native read awaitable.
    static read_awaitable_ops const*
    construct_read_awaitable_impl(
        void* source,
        void* storage,
        std::span<mutable_buffer const> buffers)
        requires ReadSource<S>
    {
        using Aw = decltype(std::declval<S&>().read(
            std::span<mutable_buffer const>{}));
        auto& s = *static_cast<S*>(source);
        ::new(storage) Aw(s.read(buffers));

        static constexpr read_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Largest awaitable footprint; sizes the single shared cache so any
    // of the (up to three) awaitable types fits.
    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(PullAwaitable);
        if constexpr (ReadSource<S>)
        {
            using RS = decltype(std::declval<S&>().read_some(
                std::span<mutable_buffer const>{}));
            using R = decltype(std::declval<S&>().read(
                std::span<mutable_buffer const>{}));

            if(sizeof(RS) > s) s = sizeof(RS);
            if(sizeof(R) > s) s = sizeof(R);
        }
        return s;
    }

    // Strictest alignment over the same set of awaitable types.
    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(PullAwaitable);
        if constexpr (ReadSource<S>)
        {
            using RS = decltype(std::declval<S&>().read_some(
                std::span<mutable_buffer const>{}));
            using R = decltype(std::declval<S&>().read(
                std::span<mutable_buffer const>{}));

            if(alignof(RS) > a) a = alignof(RS);
            if(alignof(R) > a) a = alignof(R);
        }
        return a;
    }

    // Builds the complete vtable at compile time. The read-forwarding
    // slots stay null for BufferSource-only types, which is how the
    // wrapper's read/read_some pick the synthesized path at runtime.
    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_consume = &do_consume_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_awaitable = &construct_awaitable_impl;
        v.construct_read_some_awaitable = nullptr;
        v.construct_read_awaitable = nullptr;

        if constexpr (ReadSource<S>)
        {
            v.construct_read_some_awaitable =
                &construct_read_some_awaitable_impl;
            v.construct_read_awaitable =
                &construct_read_awaitable_impl;
        }
        return v;
    }

    static constexpr vtable value = make_vtable();
};
540
541 inline
542 124x any_buffer_source::~any_buffer_source()
543 {
544 124x if(storage_)
545 {
546 7x vt_->destroy(source_);
547 7x ::operator delete(storage_);
548 }
549 124x if(cached_awaitable_)
550 119x ::operator delete(cached_awaitable_);
551 124x }
552
553 inline any_buffer_source&
554 2x any_buffer_source::operator=(any_buffer_source&& other) noexcept
555 {
556 2x if(this != &other)
557 {
558 2x if(storage_)
559 {
560 vt_->destroy(source_);
561 ::operator delete(storage_);
562 }
563 2x if(cached_awaitable_)
564 ::operator delete(cached_awaitable_);
565 2x source_ = std::exchange(other.source_, nullptr);
566 2x vt_ = std::exchange(other.vt_, nullptr);
567 2x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
568 2x storage_ = std::exchange(other.storage_, nullptr);
569 2x active_ops_ = std::exchange(other.active_ops_, nullptr);
570 2x active_read_ops_ = std::exchange(other.active_read_ops_, nullptr);
571 }
572 2x return *this;
573 }
574
575 template<BufferSource S>
576 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
577 7x any_buffer_source::any_buffer_source(S s)
578 7x : vt_(&vtable_for_impl<S>::value)
579 {
580 struct guard {
581 any_buffer_source* self;
582 bool committed = false;
583 7x ~guard() {
584 7x if(!committed && self->storage_) {
585 self->vt_->destroy(self->source_);
586 ::operator delete(self->storage_);
587 self->storage_ = nullptr;
588 self->source_ = nullptr;
589 }
590 7x }
591 7x } g{this};
592
593 7x storage_ = ::operator new(sizeof(S));
594 7x source_ = ::new(storage_) S(std::move(s));
595
596 7x cached_awaitable_ = ::operator new(vt_->awaitable_size);
597
598 7x g.committed = true;
599 7x }
600
601 template<BufferSource S>
602 112x any_buffer_source::any_buffer_source(S* s)
603 112x : source_(s)
604 112x , vt_(&vtable_for_impl<S>::value)
605 {
606 112x cached_awaitable_ = ::operator new(vt_->awaitable_size);
607 112x }
608
inline void
any_buffer_source::consume(std::size_t n) noexcept
{
    // Single indirect call; the wrapped source advances its internal
    // read position past n bytes of previously pulled data.
    vt_->do_consume(source_, n);
}
614
inline auto
any_buffer_source::pull(std::span<const_buffer> dest)
{
    // Thin awaitable that drives the wrapped source's pull awaitable
    // through the type-erased ops table. The underlying awaitable
    // lives in cached_awaitable_, so no per-operation allocation.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<const_buffer> dest_;

        bool
        await_ready()
        {
            // Construct the underlying awaitable at first poll and
            // record its ops table; await_suspend/await_resume rely
            // on active_ops_ having been set here.
            self_->active_ops_ = self_->vt_->construct_awaitable(
                self_->source_,
                self_->cached_awaitable_,
                dest_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::span<const_buffer>>
        await_resume()
        {
            // Destroy the cached awaitable after await_resume has
            // produced the result (guard runs last), even on throw.
            // NOTE(review): if the awaiting coroutine is destroyed
            // while suspended, await_resume never runs and the cached
            // awaitable is leaked undetached — confirm callers cannot
            // abandon an operation mid-flight.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, dest};
}
656
inline auto
any_buffer_source::read_some_(
    std::span<mutable_buffer const> buffers)
{
    // Type-erased forwarding of the wrapped type's native read_some.
    // Unlike pull(), construction is deferred to await_suspend so the
    // io_env is available before the underlying awaitable runs.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<mutable_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            // Always suspend; the real readiness check happens in
            // await_suspend after the underlying awaitable exists.
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_read_ops_ =
                self_->vt_->construct_read_some_awaitable(
                    self_->source_,
                    self_->cached_awaitable_,
                    buffers_);

            // Already ready: return the caller's handle to resume it
            // immediately via symmetric transfer.
            if(self_->active_read_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_read_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable after the result is
            // produced, even if await_resume throws.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_read_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_read_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_read_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
706
inline auto
any_buffer_source::read_(
    std::span<mutable_buffer const> buffers)
{
    // Type-erased forwarding of the wrapped type's native read.
    // Mirrors read_some_: construction is deferred to await_suspend
    // where the io_env is available.
    struct awaitable
    {
        any_buffer_source* self_;
        std::span<mutable_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            // Always suspend; readiness is checked in await_suspend.
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_read_ops_ =
                self_->vt_->construct_read_awaitable(
                    self_->source_,
                    self_->cached_awaitable_,
                    buffers_);

            // Already ready: resume the caller immediately via
            // symmetric transfer.
            if(self_->active_read_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_read_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable after the result is
            // produced, even if await_resume throws.
            struct guard {
                any_buffer_source* self;
                ~guard() {
                    self->active_read_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_read_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_read_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
756
757 template<MutableBufferSequence MB>
758 io_task<std::size_t>
759 58x any_buffer_source::read_some(MB buffers)
760 {
761 buffer_param<MB> bp(buffers);
762 auto dest = bp.data();
763 if(dest.empty())
764 co_return {{}, 0};
765
766 // Native ReadSource path
767 if(vt_->construct_read_some_awaitable)
768 co_return co_await read_some_(dest);
769
770 // Synthesized path: pull + buffer_copy + consume
771 const_buffer arr[detail::max_iovec_];
772 auto [ec, bufs] = co_await pull(arr);
773 if(ec)
774 co_return {ec, 0};
775
776 auto n = buffer_copy(dest, bufs);
777 consume(n);
778 co_return {{}, n};
779 116x }
780
781 template<MutableBufferSequence MB>
782 io_task<std::size_t>
783 24x any_buffer_source::read(MB buffers)
784 {
785 buffer_param<MB> bp(buffers);
786 std::size_t total = 0;
787
788 // Native ReadSource path
789 if(vt_->construct_read_awaitable)
790 {
791 for(;;)
792 {
793 auto dest = bp.data();
794 if(dest.empty())
795 break;
796
797 auto [ec, n] = co_await read_(dest);
798 total += n;
799 if(ec)
800 co_return {ec, total};
801 bp.consume(n);
802 }
803 co_return {{}, total};
804 }
805
806 // Synthesized path: pull + buffer_copy + consume
807 for(;;)
808 {
809 auto dest = bp.data();
810 if(dest.empty())
811 break;
812
813 const_buffer arr[detail::max_iovec_];
814 auto [ec, bufs] = co_await pull(arr);
815
816 if(ec)
817 co_return {ec, total};
818
819 auto n = buffer_copy(dest, bufs);
820 consume(n);
821 total += n;
822 bp.consume(n);
823 }
824
825 co_return {{}, total};
826 48x }
827
// The wrapper itself must model both concepts it erases.
static_assert(BufferSource<any_buffer_source>);
static_assert(ReadSource<any_buffer_source>);
830
831 } // namespace capy
832 } // namespace boost
833
834 #endif
835