include/boost/capy/io/any_buffer_sink.hpp

89.5% Lines (229/256) 88.7% Functions (63/71)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/buffer_sink.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/write_sink.hpp>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any BufferSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref BufferSink concept, enabling runtime polymorphism for
42 buffer sink operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper exposes two interfaces for producing data:
46 the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 and the @ref WriteSink interface (`write_some`, `write`,
48 `write_eof`). Choose the interface that matches how your data
49 is produced:
50
51 @par Choosing an Interface
52
53 Use the **BufferSink** interface when you are a generator that
54 produces data into externally-provided buffers. The sink owns
55 the memory; you call @ref prepare to obtain writable buffers,
56 fill them, then call @ref commit or @ref commit_eof.
57
58 Use the **WriteSink** interface when you already have buffers
59 containing the data to write:
60 - If the entire body is available up front, call
61 @ref write_eof(buffers) to send everything atomically.
62 - If data arrives incrementally, call @ref write or
63 @ref write_some in a loop, then @ref write_eof() when done.
64 Prefer `write` (complete) unless your streaming pattern
65 benefits from partial writes via `write_some`.
66
67 If the wrapped type only satisfies @ref BufferSink, the
68 @ref WriteSink operations are provided automatically.
69
70 @par Construction Modes
71
72 - **Owning**: Pass by value to transfer ownership. The wrapper
73 allocates storage and owns the sink.
74 - **Reference**: Pass a pointer to wrap without ownership. The
75 pointed-to sink must outlive this wrapper.
76
77 @par Awaitable Preallocation
78 The constructor preallocates storage for the type-erased awaitable.
79 This reserves all virtual address space at server startup
80 so memory usage can be measured up front, rather than
81 allocating piecemeal as traffic arrives.
82
83 @par Thread Safety
84 Not thread-safe. Concurrent operations on the same wrapper
85 are undefined behavior.
86
87 @par Example
88 @code
89 // Owning - takes ownership of the sink
90 any_buffer_sink abs(some_buffer_sink{args...});
91
92 // Reference - wraps without ownership
93 some_buffer_sink sink;
94 any_buffer_sink abs(&sink);
95
96 // BufferSink interface: generate into callee-owned buffers
97 mutable_buffer arr[16];
98 auto bufs = abs.prepare(arr);
99 // Write data into bufs[0..bufs.size())
100 auto [ec] = co_await abs.commit(bytes_written);
101 auto [ec2] = co_await abs.commit_eof(0);
102
103 // WriteSink interface: send caller-owned buffers
104 auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 auto [ec4] = co_await abs.write_eof();
106
107 // Or send everything at once
108 auto [ec5, n2] = co_await abs.write_eof(
109 make_buffer(body_data));
110 @endcode
111
112 @see any_buffer_source, BufferSink, WriteSink
113 */
class any_buffer_sink
{
    struct vtable;
    struct awaitable_ops;
    struct write_awaitable_ops;

    template<BufferSink S>
    struct vtable_for_impl;

    // hot-path members first for cache locality
    void* sink_ = nullptr;              // type-erased pointer to the wrapped sink
    vtable const* vt_ = nullptr;        // dispatch table for the wrapped type
    void* cached_awaitable_ = nullptr;  // preallocated storage for one in-flight awaitable
    awaitable_ops const* active_ops_ = nullptr;             // ops of the in-flight io_result<> awaitable, else null
    write_awaitable_ops const* active_write_ops_ = nullptr; // ops of the in-flight io_result<size_t> awaitable, else null
    void* storage_ = nullptr;           // owning allocation for the sink; null in reference mode

    // Invariant: at most one awaitable occupies cached_awaitable_ at a
    // time; active_ops_/active_write_ops_ are non-null exactly while it
    // is alive (cleared by the guard in the awaitable's await_resume).

public:
    /** Destructor.

        Destroys the owned sink (if any) and releases the cached
        awaitable storage.
    */
    ~any_buffer_sink();

    /** Construct a default instance.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_buffer_sink() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_buffer_sink(any_buffer_sink const&) = delete;
    any_buffer_sink& operator=(any_buffer_sink const&) = delete;

    /** Construct by moving.

        Transfers ownership of the wrapped sink (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_buffer_sink(any_buffer_sink&& other) noexcept
        : sink_(std::exchange(other.sink_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , active_ops_(std::exchange(other.active_ops_, nullptr))
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
    {
    }

    /** Assign by moving.

        Destroys any owned sink and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_buffer_sink&
    operator=(any_buffer_sink&& other) noexcept;

    /** Construct by taking ownership of a BufferSink.

        Allocates storage and moves the sink into this wrapper.
        The wrapper owns the sink and will destroy it. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s The sink to take ownership of.
    */
    template<BufferSink S>
    requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
    any_buffer_sink(S s);

    /** Construct by wrapping a BufferSink without ownership.

        Wraps the given sink by pointer. The sink must remain
        valid for the lifetime of this wrapper. If `S` also
        satisfies @ref WriteSink, native write operations are
        forwarded through the virtual boundary.

        @param s Pointer to the sink to wrap.
    */
    template<BufferSink S>
    any_buffer_sink(S* s);

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return sink_ != nullptr;
    }

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Prepare writable buffers.

        Fills the provided span with mutable buffer descriptors
        pointing to the underlying sink's internal storage. This
        operation is synchronous.

        @param dest Span of mutable_buffer to fill.

        @return A span of filled buffers.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    std::span<mutable_buffer>
    prepare(std::span<mutable_buffer> dest);

    /** Commit bytes written to the prepared buffers.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare. The operation may trigger
        underlying I/O.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    commit(std::size_t n);

    /** Commit final bytes and signal end-of-stream.

        Commits `n` bytes written to the buffers returned by the
        most recent call to @ref prepare and finalizes the sink.
        After success, no further operations are permitted.

        @param n The number of bytes to commit.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    commit_eof(std::size_t n);

    /** Write some data from a buffer sequence.

        Writes one or more bytes from the buffer sequence to the
        underlying sink. May consume less than the full sequence.

        When the wrapped type provides native @ref WriteSink support,
        the operation forwards directly. Otherwise it is synthesized
        from @ref prepare and @ref commit with a buffer copy.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_some(CB buffers);

    /** Write all data from a buffer sequence.

        Writes all data from the buffer sequence to the underlying
        sink. This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        each window is forwarded directly. Otherwise the data is
        copied into the sink via @ref prepare and @ref commit.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write(CB buffers);

    /** Atomically write data and signal end-of-stream.

        Writes all data from the buffer sequence to the underlying
        sink and then signals end-of-stream.

        When the wrapped type provides native @ref WriteSink support,
        the final window is sent atomically via the underlying
        `write_eof(buffers)`. Otherwise the data is synthesized
        through @ref prepare, @ref commit, and @ref commit_eof.

        @param buffers The buffer sequence to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_eof(CB buffers);

    /** Signal end-of-stream.

        Indicates that no more data will be written to the sink.
        This method satisfies the @ref WriteSink concept.

        When the wrapped type provides native @ref WriteSink support,
        the underlying `write_eof()` is called. Otherwise the
        operation is implemented as `commit_eof(0)`.

        @return An awaitable yielding `(error_code)`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    write_eof();

protected:
    /** Rebind to a new sink after move.

        Updates the internal pointer to reference a new sink object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_sink The new sink to bind to. Must be the same
        type as the original sink.

        @note Terminates if called with a sink of different type
        than the original.
    */
    template<BufferSink S>
    void
    rebind(S& new_sink) noexcept
    {
        // The vtable pointer uniquely identifies the erased type, so a
        // mismatch means the caller passed a different S than stored.
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        sink_ = &new_sink;
    }

private:
    /** Forward a partial write through the vtable.

        Constructs the underlying `write_some` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_some_(std::span<const_buffer const> buffers);

    /** Forward a complete write through the vtable.

        Constructs the underlying `write` awaitable in
        cached storage and returns a type-erased awaitable.
    */
    auto
    write_(std::span<const_buffer const> buffers);

    /** Forward an atomic write-with-EOF through the vtable.

        Constructs the underlying `write_eof(buffers)` awaitable
        in cached storage and returns a type-erased awaitable.
    */
    auto
    write_eof_buffers_(std::span<const_buffer const> buffers);
};
402
/** Type-erased ops for awaitables yielding `io_result<>`. */
struct any_buffer_sink::awaitable_ops
{
    // Each entry receives the cached awaitable storage as `void*`.
    // Member order matters: instances are aggregate-initialized in
    // vtable_for_impl with positional initializers.
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;  // runs the destructor only; storage is reused
};
411
/** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
struct any_buffer_sink::write_awaitable_ops
{
    // Same shape as awaitable_ops, but await_resume yields a byte
    // count in addition to the error code. Member order matters:
    // instances are aggregate-initialized positionally.
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::size_t> (*await_resume)(void*);
    void (*destroy)(void*) noexcept;  // runs the destructor only; storage is reused
};
420
struct any_buffer_sink::vtable
{
    // Destroy the wrapped sink in place (owning mode only).
    void (*destroy)(void*) noexcept;
    // Synchronous forward of BufferSink::prepare.
    std::span<mutable_buffer> (*do_prepare)(
        void* sink,
        std::span<mutable_buffer> dest);
    // Maximum size/alignment over every awaitable type the wrapped
    // sink can produce; used to size the cached awaitable storage.
    std::size_t awaitable_size;
    std::size_t awaitable_align;
    // Each construct_* entry placement-constructs the underlying
    // awaitable into `storage` and returns the matching ops table.
    awaitable_ops const* (*construct_commit_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);
    awaitable_ops const* (*construct_commit_eof_awaitable)(
        void* sink,
        void* storage,
        std::size_t n);

    // WriteSink forwarding (null when wrapped type is BufferSink-only)
    write_awaitable_ops const* (*construct_write_some_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    awaitable_ops const* (*construct_write_eof_awaitable)(
        void* sink,
        void* storage);
};
455
/** Per-type vtable implementation for a concrete BufferSink `S`.

    Provides the thunks installed into @ref any_buffer_sink::vtable.
    Each construct_*_impl placement-constructs the concrete awaitable
    returned by `S` into caller-provided storage and hands back a
    static ops table of lambdas that downcast and forward.
*/
template<BufferSink S>
struct any_buffer_sink::vtable_for_impl
{
    // Concrete awaitable types produced by the wrapped sink.
    using CommitAwaitable = decltype(std::declval<S&>().commit(
        std::size_t{}));
    using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
        std::size_t{}));

    static void
    do_destroy_impl(void* sink) noexcept
    {
        // Explicit destructor call only; the enclosing allocation is
        // released by the caller.
        static_cast<S*>(sink)->~S();
    }

    static std::span<mutable_buffer>
    do_prepare_impl(
        void* sink,
        std::span<mutable_buffer> dest)
    {
        auto& s = *static_cast<S*>(sink);
        return s.prepare(dest);
    }

    static awaitable_ops const*
    construct_commit_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        // Placement-construct the concrete awaitable into the cached
        // slot; the returned ops table knows how to drive/destroy it.
        ::new(storage) CommitAwaitable(s.commit(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_commit_eof_awaitable_impl(
        void* sink,
        void* storage,
        std::size_t n)
    {
        auto& s = *static_cast<S*>(sink);
        ::new(storage) CommitEofAwaitable(s.commit_eof(n));

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<CommitEofAwaitable*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<CommitEofAwaitable*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
            }
        };
        return &ops;
    }

    // The write thunks below exist only when S is also a WriteSink;
    // make_vtable leaves the corresponding slots null otherwise.
    static write_awaitable_ops const*
    construct_write_some_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_some(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_some(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static write_awaitable_ops const*
    construct_write_eof_buffers_awaitable_impl(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof(
            std::span<const_buffer const>{}));
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof(buffers));

        static constexpr write_awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    static awaitable_ops const*
    construct_write_eof_awaitable_impl(
        void* sink,
        void* storage)
        requires WriteSink<S>
    {
        using Aw = decltype(std::declval<S&>().write_eof());
        auto& s = *static_cast<S*>(sink);
        ::new(storage) Aw(s.write_eof());

        static constexpr awaitable_ops ops = {
            +[](void* p) {
                return static_cast<Aw*>(p)->await_ready();
            },
            +[](void* p, std::coroutine_handle<> h, io_env const* env) {
                return detail::call_await_suspend(
                    static_cast<Aw*>(p), h, env);
            },
            +[](void* p) {
                return static_cast<Aw*>(p)->await_resume();
            },
            +[](void* p) noexcept {
                static_cast<Aw*>(p)->~Aw();
            }
        };
        return &ops;
    }

    // Largest awaitable size over every operation S can produce, so
    // one cached slot can host any of them (one at a time).
    static consteval std::size_t
    compute_max_size() noexcept
    {
        std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
            ? sizeof(CommitAwaitable)
            : sizeof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(sizeof(WS) > s) s = sizeof(WS);
            if(sizeof(W) > s) s = sizeof(W);
            if(sizeof(WEB) > s) s = sizeof(WEB);
            if(sizeof(WE) > s) s = sizeof(WE);
        }
        return s;
    }

    // Strictest alignment over the same set of awaitable types.
    // NOTE(review): this value is stored in the vtable but the
    // allocation sites use plain ::operator new(size), which only
    // guarantees __STDCPP_DEFAULT_NEW_ALIGNMENT__ — confirm no
    // awaitable is over-aligned, or pass std::align_val_t.
    static consteval std::size_t
    compute_max_align() noexcept
    {
        std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
            ? alignof(CommitAwaitable)
            : alignof(CommitEofAwaitable);
        if constexpr (WriteSink<S>)
        {
            using WS = decltype(std::declval<S&>().write_some(
                std::span<const_buffer const>{}));
            using W = decltype(std::declval<S&>().write(
                std::span<const_buffer const>{}));
            using WEB = decltype(std::declval<S&>().write_eof(
                std::span<const_buffer const>{}));
            using WE = decltype(std::declval<S&>().write_eof());

            if(alignof(WS) > a) a = alignof(WS);
            if(alignof(W) > a) a = alignof(W);
            if(alignof(WEB) > a) a = alignof(WEB);
            if(alignof(WE) > a) a = alignof(WE);
        }
        return a;
    }

    static consteval vtable
    make_vtable() noexcept
    {
        vtable v{};
        v.destroy = &do_destroy_impl;
        v.do_prepare = &do_prepare_impl;
        v.awaitable_size = compute_max_size();
        v.awaitable_align = compute_max_align();
        v.construct_commit_awaitable = &construct_commit_awaitable_impl;
        v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
        // Null write slots signal "BufferSink-only" to the wrapper,
        // which then synthesizes writes via prepare/commit.
        v.construct_write_some_awaitable = nullptr;
        v.construct_write_awaitable = nullptr;
        v.construct_write_eof_buffers_awaitable = nullptr;
        v.construct_write_eof_awaitable = nullptr;

        if constexpr (WriteSink<S>)
        {
            v.construct_write_some_awaitable =
                &construct_write_some_awaitable_impl;
            v.construct_write_awaitable =
                &construct_write_awaitable_impl;
            v.construct_write_eof_buffers_awaitable =
                &construct_write_eof_buffers_awaitable_impl;
            v.construct_write_eof_awaitable =
                &construct_write_eof_awaitable_impl;
        }
        return v;
    }

    // One vtable instance per erased type; its address also serves as
    // the type identity check in any_buffer_sink::rebind.
    static constexpr vtable value = make_vtable();
};
730
731 inline
732 215x any_buffer_sink::~any_buffer_sink()
733 {
734 215x if(storage_)
735 {
736 17x vt_->destroy(sink_);
737 17x ::operator delete(storage_);
738 }
739 215x if(cached_awaitable_)
740 208x ::operator delete(cached_awaitable_);
741 215x }
742
743 inline any_buffer_sink&
744 5x any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
745 {
746 5x if(this != &other)
747 {
748 4x if(storage_)
749 {
750 1x vt_->destroy(sink_);
751 1x ::operator delete(storage_);
752 }
753 4x if(cached_awaitable_)
754 2x ::operator delete(cached_awaitable_);
755 4x sink_ = std::exchange(other.sink_, nullptr);
756 4x vt_ = std::exchange(other.vt_, nullptr);
757 4x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
758 4x storage_ = std::exchange(other.storage_, nullptr);
759 4x active_ops_ = std::exchange(other.active_ops_, nullptr);
760 4x active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
761 }
762 5x return *this;
763 }
764
// Owning constructor: allocates storage, moves the sink in, and
// preallocates the awaitable slot. The scope guard rolls back the
// partially-constructed sink if a later allocation throws (the
// destructor does not run when a constructor exits by exception).
template<BufferSink S>
requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
any_buffer_sink::any_buffer_sink(S s)
    : vt_(&vtable_for_impl<S>::value)
{
    struct guard {
        any_buffer_sink* self;
        bool committed = false;
        ~guard() {
            if(!committed && self->storage_) {
                self->vt_->destroy(self->sink_);
                ::operator delete(self->storage_);
                self->storage_ = nullptr;
                self->sink_ = nullptr;
            }
        }
    } g{this};

    // NOTE(review): ::operator new only guarantees
    // __STDCPP_DEFAULT_NEW_ALIGNMENT__; neither alignof(S) nor
    // vt_->awaitable_align is passed here — confirm no wrapped sink
    // or awaitable is over-aligned.
    storage_ = ::operator new(sizeof(S));
    sink_ = ::new(storage_) S(std::move(s));

    cached_awaitable_ = ::operator new(vt_->awaitable_size);

    g.committed = true;
}
790
791 template<BufferSink S>
792 192x any_buffer_sink::any_buffer_sink(S* s)
793 192x : sink_(s)
794 192x , vt_(&vtable_for_impl<S>::value)
795 {
796 192x cached_awaitable_ = ::operator new(vt_->awaitable_size);
797 192x }
798
799 inline std::span<mutable_buffer>
800 126x any_buffer_sink::prepare(std::span<mutable_buffer> dest)
801 {
802 126x return vt_->do_prepare(sink_, dest);
803 }
804
// Returns a thin awaitable that drives the underlying commit
// awaitable through the type-erased ops table. The underlying
// awaitable is placement-constructed into the cached slot in
// await_ready (so no allocation occurs per operation) and destroyed
// by the guard in await_resume after the result is materialized.
inline auto
any_buffer_sink::commit(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            // Build the concrete awaitable in cached storage and
            // record its ops table for the remaining protocol calls.
            self_->active_ops_ = self_->vt_->construct_commit_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Guard destroys the cached awaitable after the return
            // expression is evaluated, even if await_resume throws.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}
846
// Same cached-awaitable pattern as commit(), but constructs the
// underlying commit_eof awaitable, which finalizes the sink.
inline auto
any_buffer_sink::commit_eof(std::size_t n)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::size_t n_;

        bool
        await_ready()
        {
            // Placement-construct into the cached slot; no allocation.
            self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
                self_->sink_,
                self_->cached_awaitable_,
                n_);
            return self_->active_ops_->await_ready(self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Destroy the cached awaitable after the result is built.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, n};
}
888
// Type-erased forward of the native write_some. Unlike commit(),
// await_ready unconditionally returns false so the underlying
// awaitable is constructed inside await_suspend; if it is then
// already ready, returning `h` resumes the coroutine immediately.
inline auto
any_buffer_sink::write_some_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            // Always suspend; construction happens in await_suspend.
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_some_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Synchronous completion: resume ourselves right away.
            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable once the result is built.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
938
// Type-erased forward of the native write (complete write of the
// given window). Same deferred-construction pattern as write_some_:
// await_ready forces suspension, await_suspend constructs the
// underlying awaitable and resumes immediately if it is ready.
inline auto
any_buffer_sink::write_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Synchronous completion: resume ourselves right away.
            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable once the result is built.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
988
// Type-erased forward of the native write_eof(buffers), which sends
// the final window and EOF atomically. Same deferred-construction
// pattern as write_some_/write_.
inline auto
any_buffer_sink::write_eof_buffers_(
    std::span<const_buffer const> buffers)
{
    struct awaitable
    {
        any_buffer_sink* self_;
        std::span<const_buffer const> buffers_;

        bool
        await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            self_->active_write_ops_ =
                self_->vt_->construct_write_eof_buffers_awaitable(
                    self_->sink_,
                    self_->cached_awaitable_,
                    buffers_);

            // Synchronous completion: resume ourselves right away.
            if(self_->active_write_ops_->await_ready(
                    self_->cached_awaitable_))
                return h;

            return self_->active_write_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<std::size_t>
        await_resume()
        {
            // Destroy the cached awaitable once the result is built.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_write_ops_->destroy(
                        self->cached_awaitable_);
                    self->active_write_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_write_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this, buffers};
}
1038
1039 template<ConstBufferSequence CB>
1040 io_task<std::size_t>
1041 22x any_buffer_sink::write_some(CB buffers)
1042 {
1043 buffer_param<CB> bp(buffers);
1044 auto src = bp.data();
1045 if(src.empty())
1046 co_return {{}, 0};
1047
1048 // Native WriteSink path
1049 if(vt_->construct_write_some_awaitable)
1050 co_return co_await write_some_(src);
1051
1052 // Synthesized path: prepare + buffer_copy + commit
1053 mutable_buffer arr[detail::max_iovec_];
1054 auto dst_bufs = prepare(arr);
1055 if(dst_bufs.empty())
1056 {
1057 auto [ec] = co_await commit(0);
1058 if(ec)
1059 co_return {ec, 0};
1060 dst_bufs = prepare(arr);
1061 if(dst_bufs.empty())
1062 co_return {{}, 0};
1063 }
1064
1065 auto n = buffer_copy(dst_bufs, src);
1066 auto [ec] = co_await commit(n);
1067 if(ec)
1068 co_return {ec, 0};
1069 co_return {{}, n};
1070 44x }
1071
1072 template<ConstBufferSequence CB>
1073 io_task<std::size_t>
1074 38x any_buffer_sink::write(CB buffers)
1075 {
1076 buffer_param<CB> bp(buffers);
1077 std::size_t total = 0;
1078
1079 // Native WriteSink path
1080 if(vt_->construct_write_awaitable)
1081 {
1082 for(;;)
1083 {
1084 auto bufs = bp.data();
1085 if(bufs.empty())
1086 break;
1087
1088 auto [ec, n] = co_await write_(bufs);
1089 total += n;
1090 if(ec)
1091 co_return {ec, total};
1092 bp.consume(n);
1093 }
1094 co_return {{}, total};
1095 }
1096
1097 // Synthesized path: prepare + buffer_copy + commit
1098 for(;;)
1099 {
1100 auto src = bp.data();
1101 if(src.empty())
1102 break;
1103
1104 mutable_buffer arr[detail::max_iovec_];
1105 auto dst_bufs = prepare(arr);
1106 if(dst_bufs.empty())
1107 {
1108 auto [ec] = co_await commit(0);
1109 if(ec)
1110 co_return {ec, total};
1111 continue;
1112 }
1113
1114 auto n = buffer_copy(dst_bufs, src);
1115 auto [ec] = co_await commit(n);
1116 if(ec)
1117 co_return {ec, total};
1118 bp.consume(n);
1119 total += n;
1120 }
1121
1122 co_return {{}, total};
1123 76x }
1124
// EOF-only operation. The dispatch between the native write_eof()
// and the synthesized commit_eof(0) happens inside await_ready, so
// both paths share one awaitable shape and the cached storage.
inline auto
any_buffer_sink::write_eof()
{
    struct awaitable
    {
        any_buffer_sink* self_;

        bool
        await_ready()
        {
            if(self_->vt_->construct_write_eof_awaitable)
            {
                // Native WriteSink: forward to underlying write_eof()
                self_->active_ops_ =
                    self_->vt_->construct_write_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_);
            }
            else
            {
                // Synthesized: commit_eof(0)
                self_->active_ops_ =
                    self_->vt_->construct_commit_eof_awaitable(
                        self_->sink_,
                        self_->cached_awaitable_,
                        0);
            }
            return self_->active_ops_->await_ready(
                self_->cached_awaitable_);
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, io_env const* env)
        {
            return self_->active_ops_->await_suspend(
                self_->cached_awaitable_, h, env);
        }

        io_result<>
        await_resume()
        {
            // Destroy the cached awaitable after the result is built.
            struct guard {
                any_buffer_sink* self;
                ~guard() {
                    self->active_ops_->destroy(self->cached_awaitable_);
                    self->active_ops_ = nullptr;
                }
            } g{self_};
            return self_->active_ops_->await_resume(
                self_->cached_awaitable_);
        }
    };
    return awaitable{this};
}
1179
1180 template<ConstBufferSequence CB>
1181 io_task<std::size_t>
1182 40x any_buffer_sink::write_eof(CB buffers)
1183 {
1184 // Native WriteSink path
1185 if(vt_->construct_write_eof_buffers_awaitable)
1186 {
1187 const_buffer_param<CB> bp(buffers);
1188 std::size_t total = 0;
1189
1190 for(;;)
1191 {
1192 auto bufs = bp.data();
1193 if(bufs.empty())
1194 {
1195 auto [ec] = co_await write_eof();
1196 co_return {ec, total};
1197 }
1198
1199 if(!bp.more())
1200 {
1201 // Last window: send atomically with EOF
1202 auto [ec, n] = co_await write_eof_buffers_(bufs);
1203 total += n;
1204 co_return {ec, total};
1205 }
1206
1207 auto [ec, n] = co_await write_(bufs);
1208 total += n;
1209 if(ec)
1210 co_return {ec, total};
1211 bp.consume(n);
1212 }
1213 }
1214
1215 // Synthesized path: prepare + buffer_copy + commit + commit_eof
1216 buffer_param<CB> bp(buffers);
1217 std::size_t total = 0;
1218
1219 for(;;)
1220 {
1221 auto src = bp.data();
1222 if(src.empty())
1223 break;
1224
1225 mutable_buffer arr[detail::max_iovec_];
1226 auto dst_bufs = prepare(arr);
1227 if(dst_bufs.empty())
1228 {
1229 auto [ec] = co_await commit(0);
1230 if(ec)
1231 co_return {ec, total};
1232 continue;
1233 }
1234
1235 auto n = buffer_copy(dst_bufs, src);
1236 auto [ec] = co_await commit(n);
1237 if(ec)
1238 co_return {ec, total};
1239 bp.consume(n);
1240 total += n;
1241 }
1242
1243 auto [ec] = co_await commit_eof(0);
1244 if(ec)
1245 co_return {ec, total};
1246
1247 co_return {{}, total};
1248 80x }
1249
// The wrapper itself must model both erased concepts so it can be
// passed anywhere a BufferSink or WriteSink is expected.
static_assert(BufferSink<any_buffer_sink>);
static_assert(WriteSink<any_buffer_sink>);
1252
1253 } // namespace capy
1254 } // namespace boost
1255
1256 #endif
1257