TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any BufferSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref BufferSink concept, enabling runtime polymorphism for
42 : buffer sink operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper exposes two interfaces for producing data:
46 : the @ref BufferSink interface (`prepare`, `commit`, `commit_eof`)
47 : and the @ref WriteSink interface (`write_some`, `write`,
48 : `write_eof`). Choose the interface that matches how your data
49 : is produced:
50 :
51 : @par Choosing an Interface
52 :
53 : Use the **BufferSink** interface when you are a generator that
54 : produces data into externally-provided buffers. The sink owns
55 : the memory; you call @ref prepare to obtain writable buffers,
56 : fill them, then call @ref commit or @ref commit_eof.
57 :
58 : Use the **WriteSink** interface when you already have buffers
59 : containing the data to write:
60 : - If the entire body is available up front, call
61 : @ref write_eof(buffers) to send everything atomically.
62 : - If data arrives incrementally, call @ref write or
63 : @ref write_some in a loop, then @ref write_eof() when done.
64 : Prefer `write` (complete) unless your streaming pattern
65 : benefits from partial writes via `write_some`.
66 :
67 : If the wrapped type only satisfies @ref BufferSink, the
68 : @ref WriteSink operations are provided automatically.
69 :
70 : @par Construction Modes
71 :
72 : - **Owning**: Pass by value to transfer ownership. The wrapper
73 : allocates storage and owns the sink.
74 : - **Reference**: Pass a pointer to wrap without ownership. The
75 : pointed-to sink must outlive this wrapper.
76 :
77 : @par Awaitable Preallocation
78 : The constructor preallocates storage for the type-erased awaitable.
79 : This reserves all virtual address space at server startup
80 : so memory usage can be measured up front, rather than
81 : allocating piecemeal as traffic arrives.
82 :
83 : @par Thread Safety
84 : Not thread-safe. Concurrent operations on the same wrapper
85 : are undefined behavior.
86 :
87 : @par Example
88 : @code
89 : // Owning - takes ownership of the sink
90 : any_buffer_sink abs(some_buffer_sink{args...});
91 :
92 : // Reference - wraps without ownership
93 : some_buffer_sink sink;
94 : any_buffer_sink abs(&sink);
95 :
96 : // BufferSink interface: generate into callee-owned buffers
97 : mutable_buffer arr[16];
98 : auto bufs = abs.prepare(arr);
99 : // Write data into bufs[0..bufs.size())
100 : auto [ec] = co_await abs.commit(bytes_written);
101 : auto [ec2] = co_await abs.commit_eof(0);
102 :
103 : // WriteSink interface: send caller-owned buffers
104 : auto [ec3, n] = co_await abs.write(make_buffer("hello", 5));
105 : auto [ec4] = co_await abs.write_eof();
106 :
107 : // Or send everything at once
108 : auto [ec5, n2] = co_await abs.write_eof(
109 : make_buffer(body_data));
110 : @endcode
111 :
112 : @see any_buffer_source, BufferSink, WriteSink
113 : */
114 : class any_buffer_sink
115 : {
116 : struct vtable;
117 : struct awaitable_ops;
118 : struct write_awaitable_ops;
119 :
120 : template<BufferSink S>
121 : struct vtable_for_impl;
122 :
123 : // hot-path members first for cache locality
124 : void* sink_ = nullptr;
125 : vtable const* vt_ = nullptr;
126 : void* cached_awaitable_ = nullptr;
127 : awaitable_ops const* active_ops_ = nullptr;
128 : write_awaitable_ops const* active_write_ops_ = nullptr;
129 : void* storage_ = nullptr;
130 :
131 : public:
132 : /** Destructor.
133 :
134 : Destroys the owned sink (if any) and releases the cached
135 : awaitable storage.
136 : */
137 : ~any_buffer_sink();
138 :
139 : /** Construct a default instance.
140 :
141 : Constructs an empty wrapper. Operations on a default-constructed
142 : wrapper result in undefined behavior.
143 : */
144 : any_buffer_sink() = default;
145 :
146 : /** Non-copyable.
147 :
148 : The awaitable cache is per-instance and cannot be shared.
149 : */
150 : any_buffer_sink(any_buffer_sink const&) = delete;
151 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
152 :
153 : /** Construct by moving.
154 :
155 : Transfers ownership of the wrapped sink (if owned) and
156 : cached awaitable storage from `other`. After the move, `other` is
157 : in a default-constructed state.
158 :
159 : @param other The wrapper to move from.
160 : */
161 HIT 2 : any_buffer_sink(any_buffer_sink&& other) noexcept
162 2 : : sink_(std::exchange(other.sink_, nullptr))
163 2 : , vt_(std::exchange(other.vt_, nullptr))
164 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
165 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
166 2 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
167 2 : , storage_(std::exchange(other.storage_, nullptr))
168 : {
169 2 : }
170 :
171 : /** Assign by moving.
172 :
173 : Destroys any owned sink and releases existing resources,
174 : then transfers ownership from `other`.
175 :
176 : @param other The wrapper to move from.
177 : @return Reference to this wrapper.
178 : */
179 : any_buffer_sink&
180 : operator=(any_buffer_sink&& other) noexcept;
181 :
182 : /** Construct by taking ownership of a BufferSink.
183 :
184 : Allocates storage and moves the sink into this wrapper.
185 : The wrapper owns the sink and will destroy it. If `S` also
186 : satisfies @ref WriteSink, native write operations are
187 : forwarded through the virtual boundary.
188 :
189 : @param s The sink to take ownership of.
190 : */
191 : template<BufferSink S>
192 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
193 : any_buffer_sink(S s);
194 :
195 : /** Construct by wrapping a BufferSink without ownership.
196 :
197 : Wraps the given sink by pointer. The sink must remain
198 : valid for the lifetime of this wrapper. If `S` also
199 : satisfies @ref WriteSink, native write operations are
200 : forwarded through the virtual boundary.
201 :
202 : @param s Pointer to the sink to wrap.
203 : */
204 : template<BufferSink S>
205 : any_buffer_sink(S* s);
206 :
207 : /** Check if the wrapper contains a valid sink.
208 :
209 : @return `true` if wrapping a sink, `false` if default-constructed
210 : or moved-from.
211 : */
212 : bool
213 26 : has_value() const noexcept
214 : {
215 26 : return sink_ != nullptr;
216 : }
217 :
218 : /** Check if the wrapper contains a valid sink.
219 :
220 : @return `true` if wrapping a sink, `false` if default-constructed
221 : or moved-from.
222 : */
223 : explicit
224 3 : operator bool() const noexcept
225 : {
226 3 : return has_value();
227 : }
228 :
229 : /** Prepare writable buffers.
230 :
231 : Fills the provided span with mutable buffer descriptors
232 : pointing to the underlying sink's internal storage. This
233 : operation is synchronous.
234 :
235 : @param dest Span of mutable_buffer to fill.
236 :
237 : @return A span of filled buffers.
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : std::span<mutable_buffer>
243 : prepare(std::span<mutable_buffer> dest);
244 :
245 : /** Commit bytes written to the prepared buffers.
246 :
247 : Commits `n` bytes written to the buffers returned by the
248 : most recent call to @ref prepare. The operation may trigger
249 : underlying I/O.
250 :
251 : @param n The number of bytes to commit.
252 :
253 : @return An awaitable yielding `(error_code)`.
254 :
255 : @par Preconditions
256 : The wrapper must contain a valid sink (`has_value() == true`).
257 : */
258 : auto
259 : commit(std::size_t n);
260 :
261 : /** Commit final bytes and signal end-of-stream.
262 :
263 : Commits `n` bytes written to the buffers returned by the
264 : most recent call to @ref prepare and finalizes the sink.
265 : After success, no further operations are permitted.
266 :
267 : @param n The number of bytes to commit.
268 :
269 : @return An awaitable yielding `(error_code)`.
270 :
271 : @par Preconditions
272 : The wrapper must contain a valid sink (`has_value() == true`).
273 : */
274 : auto
275 : commit_eof(std::size_t n);
276 :
277 : /** Write some data from a buffer sequence.
278 :
279 : Writes one or more bytes from the buffer sequence to the
280 : underlying sink. May consume less than the full sequence.
281 :
282 : When the wrapped type provides native @ref WriteSink support,
283 : the operation forwards directly. Otherwise it is synthesized
284 : from @ref prepare and @ref commit with a buffer copy.
285 :
286 : @param buffers The buffer sequence to write.
287 :
288 : @return An awaitable yielding `(error_code,std::size_t)`.
289 :
290 : @par Preconditions
291 : The wrapper must contain a valid sink (`has_value() == true`).
292 : */
293 : template<ConstBufferSequence CB>
294 : io_task<std::size_t>
295 : write_some(CB buffers);
296 :
297 : /** Write all data from a buffer sequence.
298 :
299 : Writes all data from the buffer sequence to the underlying
300 : sink. This method satisfies the @ref WriteSink concept.
301 :
302 : When the wrapped type provides native @ref WriteSink support,
303 : each window is forwarded directly. Otherwise the data is
304 : copied into the sink via @ref prepare and @ref commit.
305 :
306 : @param buffers The buffer sequence to write.
307 :
308 : @return An awaitable yielding `(error_code,std::size_t)`.
309 :
310 : @par Preconditions
311 : The wrapper must contain a valid sink (`has_value() == true`).
312 : */
313 : template<ConstBufferSequence CB>
314 : io_task<std::size_t>
315 : write(CB buffers);
316 :
317 : /** Atomically write data and signal end-of-stream.
318 :
319 : Writes all data from the buffer sequence to the underlying
320 : sink and then signals end-of-stream.
321 :
322 : When the wrapped type provides native @ref WriteSink support,
323 : the final window is sent atomically via the underlying
324 : `write_eof(buffers)`. Otherwise the data is synthesized
325 : through @ref prepare, @ref commit, and @ref commit_eof.
326 :
327 : @param buffers The buffer sequence to write.
328 :
329 : @return An awaitable yielding `(error_code,std::size_t)`.
330 :
331 : @par Preconditions
332 : The wrapper must contain a valid sink (`has_value() == true`).
333 : */
334 : template<ConstBufferSequence CB>
335 : io_task<std::size_t>
336 : write_eof(CB buffers);
337 :
338 : /** Signal end-of-stream.
339 :
340 : Indicates that no more data will be written to the sink.
341 : This method satisfies the @ref WriteSink concept.
342 :
343 : When the wrapped type provides native @ref WriteSink support,
344 : the underlying `write_eof()` is called. Otherwise the
345 : operation is implemented as `commit_eof(0)`.
346 :
347 : @return An awaitable yielding `(error_code)`.
348 :
349 : @par Preconditions
350 : The wrapper must contain a valid sink (`has_value() == true`).
351 : */
352 : auto
353 : write_eof();
354 :
355 : protected:
356 : /** Rebind to a new sink after move.
357 :
358 : Updates the internal pointer to reference a new sink object.
359 : Used by owning wrappers after move assignment when the owned
360 : object has moved to a new location.
361 :
362 : @param new_sink The new sink to bind to. Must be the same
363 : type as the original sink.
364 :
365 : @note Terminates if called with a sink of different type
366 : than the original.
367 : */
368 : template<BufferSink S>
369 : void
370 : rebind(S& new_sink) noexcept
371 : {
372 : if(vt_ != &vtable_for_impl<S>::value)
373 : std::terminate();
374 : sink_ = &new_sink;
375 : }
376 :
377 : private:
378 : /** Forward a partial write through the vtable.
379 :
380 : Constructs the underlying `write_some` awaitable in
381 : cached storage and returns a type-erased awaitable.
382 : */
383 : auto
384 : write_some_(std::span<const_buffer const> buffers);
385 :
386 : /** Forward a complete write through the vtable.
387 :
388 : Constructs the underlying `write` awaitable in
389 : cached storage and returns a type-erased awaitable.
390 : */
391 : auto
392 : write_(std::span<const_buffer const> buffers);
393 :
394 : /** Forward an atomic write-with-EOF through the vtable.
395 :
396 : Constructs the underlying `write_eof(buffers)` awaitable
397 : in cached storage and returns a type-erased awaitable.
398 : */
399 : auto
400 : write_eof_buffers_(std::span<const_buffer const> buffers);
401 : };
402 :
403 : /** Type-erased ops for awaitables yielding `io_result<>`. */
404 : struct any_buffer_sink::awaitable_ops
405 : {
406 : bool (*await_ready)(void*);
407 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
408 : io_result<> (*await_resume)(void*);
409 : void (*destroy)(void*) noexcept;
410 : };
411 :
412 : /** Type-erased ops for awaitables yielding `io_result<std::size_t>`. */
413 : struct any_buffer_sink::write_awaitable_ops
414 : {
415 : bool (*await_ready)(void*);
416 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
417 : io_result<std::size_t> (*await_resume)(void*);
418 : void (*destroy)(void*) noexcept;
419 : };
420 :
421 : struct any_buffer_sink::vtable
422 : {
423 : void (*destroy)(void*) noexcept;
424 : std::span<mutable_buffer> (*do_prepare)(
425 : void* sink,
426 : std::span<mutable_buffer> dest);
427 : std::size_t awaitable_size;
428 : std::size_t awaitable_align;
429 : awaitable_ops const* (*construct_commit_awaitable)(
430 : void* sink,
431 : void* storage,
432 : std::size_t n);
433 : awaitable_ops const* (*construct_commit_eof_awaitable)(
434 : void* sink,
435 : void* storage,
436 : std::size_t n);
437 :
438 : // WriteSink forwarding (null when wrapped type is BufferSink-only)
439 : write_awaitable_ops const* (*construct_write_some_awaitable)(
440 : void* sink,
441 : void* storage,
442 : std::span<const_buffer const> buffers);
443 : write_awaitable_ops const* (*construct_write_awaitable)(
444 : void* sink,
445 : void* storage,
446 : std::span<const_buffer const> buffers);
447 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
448 : void* sink,
449 : void* storage,
450 : std::span<const_buffer const> buffers);
451 : awaitable_ops const* (*construct_write_eof_awaitable)(
452 : void* sink,
453 : void* storage);
454 : };
455 :
456 : template<BufferSink S>
457 : struct any_buffer_sink::vtable_for_impl
458 : {
459 : using CommitAwaitable = decltype(std::declval<S&>().commit(
460 : std::size_t{}));
461 : using CommitEofAwaitable = decltype(std::declval<S&>().commit_eof(
462 : std::size_t{}));
463 :
464 : static void
465 18 : do_destroy_impl(void* sink) noexcept
466 : {
467 18 : static_cast<S*>(sink)->~S();
468 18 : }
469 :
470 : static std::span<mutable_buffer>
471 126 : do_prepare_impl(
472 : void* sink,
473 : std::span<mutable_buffer> dest)
474 : {
475 126 : auto& s = *static_cast<S*>(sink);
476 126 : return s.prepare(dest);
477 : }
478 :
479 : static awaitable_ops const*
480 96 : construct_commit_awaitable_impl(
481 : void* sink,
482 : void* storage,
483 : std::size_t n)
484 : {
485 96 : auto& s = *static_cast<S*>(sink);
486 96 : ::new(storage) CommitAwaitable(s.commit(n));
487 :
488 : static constexpr awaitable_ops ops = {
489 96 : +[](void* p) {
490 96 : return static_cast<CommitAwaitable*>(p)->await_ready();
491 : },
492 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
493 0 : return detail::call_await_suspend(
494 0 : static_cast<CommitAwaitable*>(p), h, env);
495 : },
496 HIT 96 : +[](void* p) {
497 96 : return static_cast<CommitAwaitable*>(p)->await_resume();
498 : },
499 96 : +[](void* p) noexcept {
500 96 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
501 : }
502 : };
503 96 : return &ops;
504 : }
505 :
506 : static awaitable_ops const*
507 70 : construct_commit_eof_awaitable_impl(
508 : void* sink,
509 : void* storage,
510 : std::size_t n)
511 : {
512 70 : auto& s = *static_cast<S*>(sink);
513 70 : ::new(storage) CommitEofAwaitable(s.commit_eof(n));
514 :
515 : static constexpr awaitable_ops ops = {
516 70 : +[](void* p) {
517 70 : return static_cast<CommitEofAwaitable*>(p)->await_ready();
518 : },
519 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
520 0 : return detail::call_await_suspend(
521 0 : static_cast<CommitEofAwaitable*>(p), h, env);
522 : },
523 HIT 70 : +[](void* p) {
524 70 : return static_cast<CommitEofAwaitable*>(p)->await_resume();
525 : },
526 70 : +[](void* p) noexcept {
527 70 : static_cast<CommitEofAwaitable*>(p)->~CommitEofAwaitable();
528 : }
529 : };
530 70 : return &ops;
531 : }
532 :
533 : static write_awaitable_ops const*
534 6 : construct_write_some_awaitable_impl(
535 : void* sink,
536 : void* storage,
537 : std::span<const_buffer const> buffers)
538 : requires WriteSink<S>
539 : {
540 : using Aw = decltype(std::declval<S&>().write_some(
541 : std::span<const_buffer const>{}));
542 6 : auto& s = *static_cast<S*>(sink);
543 6 : ::new(storage) Aw(s.write_some(buffers));
544 :
545 : static constexpr write_awaitable_ops ops = {
546 6 : +[](void* p) {
547 6 : return static_cast<Aw*>(p)->await_ready();
548 : },
549 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
550 0 : return detail::call_await_suspend(
551 0 : static_cast<Aw*>(p), h, env);
552 : },
553 HIT 6 : +[](void* p) {
554 6 : return static_cast<Aw*>(p)->await_resume();
555 : },
556 6 : +[](void* p) noexcept {
557 6 : static_cast<Aw*>(p)->~Aw();
558 : }
559 : };
560 6 : return &ops;
561 : }
562 :
563 : static write_awaitable_ops const*
564 14 : construct_write_awaitable_impl(
565 : void* sink,
566 : void* storage,
567 : std::span<const_buffer const> buffers)
568 : requires WriteSink<S>
569 : {
570 : using Aw = decltype(std::declval<S&>().write(
571 : std::span<const_buffer const>{}));
572 14 : auto& s = *static_cast<S*>(sink);
573 14 : ::new(storage) Aw(s.write(buffers));
574 :
575 : static constexpr write_awaitable_ops ops = {
576 14 : +[](void* p) {
577 14 : return static_cast<Aw*>(p)->await_ready();
578 : },
579 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
580 0 : return detail::call_await_suspend(
581 0 : static_cast<Aw*>(p), h, env);
582 : },
583 HIT 14 : +[](void* p) {
584 14 : return static_cast<Aw*>(p)->await_resume();
585 : },
586 14 : +[](void* p) noexcept {
587 14 : static_cast<Aw*>(p)->~Aw();
588 : }
589 : };
590 14 : return &ops;
591 : }
592 :
593 : static write_awaitable_ops const*
594 12 : construct_write_eof_buffers_awaitable_impl(
595 : void* sink,
596 : void* storage,
597 : std::span<const_buffer const> buffers)
598 : requires WriteSink<S>
599 : {
600 : using Aw = decltype(std::declval<S&>().write_eof(
601 : std::span<const_buffer const>{}));
602 12 : auto& s = *static_cast<S*>(sink);
603 12 : ::new(storage) Aw(s.write_eof(buffers));
604 :
605 : static constexpr write_awaitable_ops ops = {
606 12 : +[](void* p) {
607 12 : return static_cast<Aw*>(p)->await_ready();
608 : },
609 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
610 0 : return detail::call_await_suspend(
611 0 : static_cast<Aw*>(p), h, env);
612 : },
613 HIT 12 : +[](void* p) {
614 12 : return static_cast<Aw*>(p)->await_resume();
615 : },
616 12 : +[](void* p) noexcept {
617 12 : static_cast<Aw*>(p)->~Aw();
618 : }
619 : };
620 12 : return &ops;
621 : }
622 :
623 : static awaitable_ops const*
624 16 : construct_write_eof_awaitable_impl(
625 : void* sink,
626 : void* storage)
627 : requires WriteSink<S>
628 : {
629 : using Aw = decltype(std::declval<S&>().write_eof());
630 16 : auto& s = *static_cast<S*>(sink);
631 16 : ::new(storage) Aw(s.write_eof());
632 :
633 : static constexpr awaitable_ops ops = {
634 16 : +[](void* p) {
635 16 : return static_cast<Aw*>(p)->await_ready();
636 : },
637 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
638 0 : return detail::call_await_suspend(
639 0 : static_cast<Aw*>(p), h, env);
640 : },
641 HIT 16 : +[](void* p) {
642 16 : return static_cast<Aw*>(p)->await_resume();
643 : },
644 16 : +[](void* p) noexcept {
645 16 : static_cast<Aw*>(p)->~Aw();
646 : }
647 : };
648 16 : return &ops;
649 : }
650 :
651 : static consteval std::size_t
652 : compute_max_size() noexcept
653 : {
654 : std::size_t s = sizeof(CommitAwaitable) > sizeof(CommitEofAwaitable)
655 : ? sizeof(CommitAwaitable)
656 : : sizeof(CommitEofAwaitable);
657 : if constexpr (WriteSink<S>)
658 : {
659 : using WS = decltype(std::declval<S&>().write_some(
660 : std::span<const_buffer const>{}));
661 : using W = decltype(std::declval<S&>().write(
662 : std::span<const_buffer const>{}));
663 : using WEB = decltype(std::declval<S&>().write_eof(
664 : std::span<const_buffer const>{}));
665 : using WE = decltype(std::declval<S&>().write_eof());
666 :
667 : if(sizeof(WS) > s) s = sizeof(WS);
668 : if(sizeof(W) > s) s = sizeof(W);
669 : if(sizeof(WEB) > s) s = sizeof(WEB);
670 : if(sizeof(WE) > s) s = sizeof(WE);
671 : }
672 : return s;
673 : }
674 :
675 : static consteval std::size_t
676 : compute_max_align() noexcept
677 : {
678 : std::size_t a = alignof(CommitAwaitable) > alignof(CommitEofAwaitable)
679 : ? alignof(CommitAwaitable)
680 : : alignof(CommitEofAwaitable);
681 : if constexpr (WriteSink<S>)
682 : {
683 : using WS = decltype(std::declval<S&>().write_some(
684 : std::span<const_buffer const>{}));
685 : using W = decltype(std::declval<S&>().write(
686 : std::span<const_buffer const>{}));
687 : using WEB = decltype(std::declval<S&>().write_eof(
688 : std::span<const_buffer const>{}));
689 : using WE = decltype(std::declval<S&>().write_eof());
690 :
691 : if(alignof(WS) > a) a = alignof(WS);
692 : if(alignof(W) > a) a = alignof(W);
693 : if(alignof(WEB) > a) a = alignof(WEB);
694 : if(alignof(WE) > a) a = alignof(WE);
695 : }
696 : return a;
697 : }
698 :
699 : static consteval vtable
700 : make_vtable() noexcept
701 : {
702 : vtable v{};
703 : v.destroy = &do_destroy_impl;
704 : v.do_prepare = &do_prepare_impl;
705 : v.awaitable_size = compute_max_size();
706 : v.awaitable_align = compute_max_align();
707 : v.construct_commit_awaitable = &construct_commit_awaitable_impl;
708 : v.construct_commit_eof_awaitable = &construct_commit_eof_awaitable_impl;
709 : v.construct_write_some_awaitable = nullptr;
710 : v.construct_write_awaitable = nullptr;
711 : v.construct_write_eof_buffers_awaitable = nullptr;
712 : v.construct_write_eof_awaitable = nullptr;
713 :
714 : if constexpr (WriteSink<S>)
715 : {
716 : v.construct_write_some_awaitable =
717 : &construct_write_some_awaitable_impl;
718 : v.construct_write_awaitable =
719 : &construct_write_awaitable_impl;
720 : v.construct_write_eof_buffers_awaitable =
721 : &construct_write_eof_buffers_awaitable_impl;
722 : v.construct_write_eof_awaitable =
723 : &construct_write_eof_awaitable_impl;
724 : }
725 : return v;
726 : }
727 :
728 : static constexpr vtable value = make_vtable();
729 : };
730 :
731 : inline
732 215 : any_buffer_sink::~any_buffer_sink()
733 : {
734 215 : if(storage_)
735 : {
736 17 : vt_->destroy(sink_);
737 17 : ::operator delete(storage_);
738 : }
739 215 : if(cached_awaitable_)
740 208 : ::operator delete(cached_awaitable_);
741 215 : }
742 :
743 : inline any_buffer_sink&
744 5 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
745 : {
746 5 : if(this != &other)
747 : {
748 4 : if(storage_)
749 : {
750 1 : vt_->destroy(sink_);
751 1 : ::operator delete(storage_);
752 : }
753 4 : if(cached_awaitable_)
754 2 : ::operator delete(cached_awaitable_);
755 4 : sink_ = std::exchange(other.sink_, nullptr);
756 4 : vt_ = std::exchange(other.vt_, nullptr);
757 4 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
758 4 : storage_ = std::exchange(other.storage_, nullptr);
759 4 : active_ops_ = std::exchange(other.active_ops_, nullptr);
760 4 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
761 : }
762 5 : return *this;
763 : }
764 :
765 : template<BufferSink S>
766 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
767 18 : any_buffer_sink::any_buffer_sink(S s)
768 18 : : vt_(&vtable_for_impl<S>::value)
769 : {
770 : struct guard {
771 : any_buffer_sink* self;
772 : bool committed = false;
773 18 : ~guard() {
774 18 : if(!committed && self->storage_) {
775 MIS 0 : self->vt_->destroy(self->sink_);
776 0 : ::operator delete(self->storage_);
777 0 : self->storage_ = nullptr;
778 0 : self->sink_ = nullptr;
779 : }
780 HIT 18 : }
781 18 : } g{this};
782 :
783 18 : storage_ = ::operator new(sizeof(S));
784 18 : sink_ = ::new(storage_) S(std::move(s));
785 :
786 18 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
787 :
788 18 : g.committed = true;
789 18 : }
790 :
791 : template<BufferSink S>
792 192 : any_buffer_sink::any_buffer_sink(S* s)
793 192 : : sink_(s)
794 192 : , vt_(&vtable_for_impl<S>::value)
795 : {
796 192 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
797 192 : }
798 :
799 : inline std::span<mutable_buffer>
800 126 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
801 : {
802 126 : return vt_->do_prepare(sink_, dest);
803 : }
804 :
805 : inline auto
806 96 : any_buffer_sink::commit(std::size_t n)
807 : {
808 : struct awaitable
809 : {
810 : any_buffer_sink* self_;
811 : std::size_t n_;
812 :
813 : bool
814 96 : await_ready()
815 : {
816 192 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
817 96 : self_->sink_,
818 96 : self_->cached_awaitable_,
819 : n_);
820 96 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
821 : }
822 :
823 : std::coroutine_handle<>
824 MIS 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
825 : {
826 0 : return self_->active_ops_->await_suspend(
827 0 : self_->cached_awaitable_, h, env);
828 : }
829 :
830 : io_result<>
831 HIT 96 : await_resume()
832 : {
833 : struct guard {
834 : any_buffer_sink* self;
835 96 : ~guard() {
836 96 : self->active_ops_->destroy(self->cached_awaitable_);
837 96 : self->active_ops_ = nullptr;
838 96 : }
839 96 : } g{self_};
840 96 : return self_->active_ops_->await_resume(
841 166 : self_->cached_awaitable_);
842 96 : }
843 : };
844 96 : return awaitable{this, n};
845 : }
846 :
847 : inline auto
848 54 : any_buffer_sink::commit_eof(std::size_t n)
849 : {
850 : struct awaitable
851 : {
852 : any_buffer_sink* self_;
853 : std::size_t n_;
854 :
855 : bool
856 54 : await_ready()
857 : {
858 108 : self_->active_ops_ = self_->vt_->construct_commit_eof_awaitable(
859 54 : self_->sink_,
860 54 : self_->cached_awaitable_,
861 : n_);
862 54 : return self_->active_ops_->await_ready(self_->cached_awaitable_);
863 : }
864 :
865 : std::coroutine_handle<>
866 MIS 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
867 : {
868 0 : return self_->active_ops_->await_suspend(
869 0 : self_->cached_awaitable_, h, env);
870 : }
871 :
872 : io_result<>
873 HIT 54 : await_resume()
874 : {
875 : struct guard {
876 : any_buffer_sink* self;
877 54 : ~guard() {
878 54 : self->active_ops_->destroy(self->cached_awaitable_);
879 54 : self->active_ops_ = nullptr;
880 54 : }
881 54 : } g{self_};
882 54 : return self_->active_ops_->await_resume(
883 92 : self_->cached_awaitable_);
884 54 : }
885 : };
886 54 : return awaitable{this, n};
887 : }
888 :
889 : inline auto
890 6 : any_buffer_sink::write_some_(
891 : std::span<const_buffer const> buffers)
892 : {
893 : struct awaitable
894 : {
895 : any_buffer_sink* self_;
896 : std::span<const_buffer const> buffers_;
897 :
898 : bool
899 6 : await_ready() const noexcept
900 : {
901 6 : return false;
902 : }
903 :
904 : std::coroutine_handle<>
905 6 : await_suspend(std::coroutine_handle<> h, io_env const* env)
906 : {
907 12 : self_->active_write_ops_ =
908 12 : self_->vt_->construct_write_some_awaitable(
909 6 : self_->sink_,
910 6 : self_->cached_awaitable_,
911 : buffers_);
912 :
913 6 : if(self_->active_write_ops_->await_ready(
914 6 : self_->cached_awaitable_))
915 6 : return h;
916 :
917 MIS 0 : return self_->active_write_ops_->await_suspend(
918 0 : self_->cached_awaitable_, h, env);
919 : }
920 :
921 : io_result<std::size_t>
922 HIT 6 : await_resume()
923 : {
924 : struct guard {
925 : any_buffer_sink* self;
926 6 : ~guard() {
927 6 : self->active_write_ops_->destroy(
928 6 : self->cached_awaitable_);
929 6 : self->active_write_ops_ = nullptr;
930 6 : }
931 6 : } g{self_};
932 6 : return self_->active_write_ops_->await_resume(
933 10 : self_->cached_awaitable_);
934 6 : }
935 : };
936 6 : return awaitable{this, buffers};
937 : }
938 :
939 : inline auto
940 14 : any_buffer_sink::write_(
941 : std::span<const_buffer const> buffers)
942 : {
943 : struct awaitable
944 : {
945 : any_buffer_sink* self_;
946 : std::span<const_buffer const> buffers_;
947 :
948 : bool
949 14 : await_ready() const noexcept
950 : {
951 14 : return false;
952 : }
953 :
954 : std::coroutine_handle<>
955 14 : await_suspend(std::coroutine_handle<> h, io_env const* env)
956 : {
957 28 : self_->active_write_ops_ =
958 28 : self_->vt_->construct_write_awaitable(
959 14 : self_->sink_,
960 14 : self_->cached_awaitable_,
961 : buffers_);
962 :
963 14 : if(self_->active_write_ops_->await_ready(
964 14 : self_->cached_awaitable_))
965 14 : return h;
966 :
967 MIS 0 : return self_->active_write_ops_->await_suspend(
968 0 : self_->cached_awaitable_, h, env);
969 : }
970 :
971 : io_result<std::size_t>
972 HIT 14 : await_resume()
973 : {
974 : struct guard {
975 : any_buffer_sink* self;
976 14 : ~guard() {
977 14 : self->active_write_ops_->destroy(
978 14 : self->cached_awaitable_);
979 14 : self->active_write_ops_ = nullptr;
980 14 : }
981 14 : } g{self_};
982 14 : return self_->active_write_ops_->await_resume(
983 24 : self_->cached_awaitable_);
984 14 : }
985 : };
986 14 : return awaitable{this, buffers};
987 : }
988 :
989 : inline auto
990 12 : any_buffer_sink::write_eof_buffers_(
991 : std::span<const_buffer const> buffers)
992 : {
993 : struct awaitable
994 : {
995 : any_buffer_sink* self_;
996 : std::span<const_buffer const> buffers_;
997 :
998 : bool
999 12 : await_ready() const noexcept
1000 : {
1001 12 : return false;
1002 : }
1003 :
1004 : std::coroutine_handle<>
1005 12 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1006 : {
1007 24 : self_->active_write_ops_ =
1008 24 : self_->vt_->construct_write_eof_buffers_awaitable(
1009 12 : self_->sink_,
1010 12 : self_->cached_awaitable_,
1011 : buffers_);
1012 :
1013 12 : if(self_->active_write_ops_->await_ready(
1014 12 : self_->cached_awaitable_))
1015 12 : return h;
1016 :
1017 MIS 0 : return self_->active_write_ops_->await_suspend(
1018 0 : self_->cached_awaitable_, h, env);
1019 : }
1020 :
1021 : io_result<std::size_t>
1022 HIT 12 : await_resume()
1023 : {
1024 : struct guard {
1025 : any_buffer_sink* self;
1026 12 : ~guard() {
1027 12 : self->active_write_ops_->destroy(
1028 12 : self->cached_awaitable_);
1029 12 : self->active_write_ops_ = nullptr;
1030 12 : }
1031 12 : } g{self_};
1032 12 : return self_->active_write_ops_->await_resume(
1033 20 : self_->cached_awaitable_);
1034 12 : }
1035 : };
1036 12 : return awaitable{this, buffers};
1037 : }
1038 :
1039 : template<ConstBufferSequence CB>
1040 : io_task<std::size_t>
1041 22 : any_buffer_sink::write_some(CB buffers)
1042 : {
1043 : buffer_param<CB> bp(buffers);
1044 : auto src = bp.data();
1045 : if(src.empty())
1046 : co_return {{}, 0};
1047 :
1048 : // Native WriteSink path
1049 : if(vt_->construct_write_some_awaitable)
1050 : co_return co_await write_some_(src);
1051 :
1052 : // Synthesized path: prepare + buffer_copy + commit
1053 : mutable_buffer arr[detail::max_iovec_];
1054 : auto dst_bufs = prepare(arr);
1055 : if(dst_bufs.empty())
1056 : {
1057 : auto [ec] = co_await commit(0);
1058 : if(ec)
1059 : co_return {ec, 0};
1060 : dst_bufs = prepare(arr);
1061 : if(dst_bufs.empty())
1062 : co_return {{}, 0};
1063 : }
1064 :
1065 : auto n = buffer_copy(dst_bufs, src);
1066 : auto [ec] = co_await commit(n);
1067 : if(ec)
1068 : co_return {ec, 0};
1069 : co_return {{}, n};
1070 44 : }
1071 :
1072 : template<ConstBufferSequence CB>
1073 : io_task<std::size_t>
1074 38 : any_buffer_sink::write(CB buffers)
1075 : {
1076 : buffer_param<CB> bp(buffers);
1077 : std::size_t total = 0;
1078 :
1079 : // Native WriteSink path
1080 : if(vt_->construct_write_awaitable)
1081 : {
1082 : for(;;)
1083 : {
1084 : auto bufs = bp.data();
1085 : if(bufs.empty())
1086 : break;
1087 :
1088 : auto [ec, n] = co_await write_(bufs);
1089 : total += n;
1090 : if(ec)
1091 : co_return {ec, total};
1092 : bp.consume(n);
1093 : }
1094 : co_return {{}, total};
1095 : }
1096 :
1097 : // Synthesized path: prepare + buffer_copy + commit
1098 : for(;;)
1099 : {
1100 : auto src = bp.data();
1101 : if(src.empty())
1102 : break;
1103 :
1104 : mutable_buffer arr[detail::max_iovec_];
1105 : auto dst_bufs = prepare(arr);
1106 : if(dst_bufs.empty())
1107 : {
1108 : auto [ec] = co_await commit(0);
1109 : if(ec)
1110 : co_return {ec, total};
1111 : continue;
1112 : }
1113 :
1114 : auto n = buffer_copy(dst_bufs, src);
1115 : auto [ec] = co_await commit(n);
1116 : if(ec)
1117 : co_return {ec, total};
1118 : bp.consume(n);
1119 : total += n;
1120 : }
1121 :
1122 : co_return {{}, total};
1123 76 : }
1124 :
1125 : inline auto
1126 32 : any_buffer_sink::write_eof()
1127 : {
1128 : struct awaitable
1129 : {
1130 : any_buffer_sink* self_;
1131 :
1132 : bool
1133 32 : await_ready()
1134 : {
1135 32 : if(self_->vt_->construct_write_eof_awaitable)
1136 : {
1137 : // Native WriteSink: forward to underlying write_eof()
1138 32 : self_->active_ops_ =
1139 16 : self_->vt_->construct_write_eof_awaitable(
1140 16 : self_->sink_,
1141 16 : self_->cached_awaitable_);
1142 : }
1143 : else
1144 : {
1145 : // Synthesized: commit_eof(0)
1146 32 : self_->active_ops_ =
1147 16 : self_->vt_->construct_commit_eof_awaitable(
1148 16 : self_->sink_,
1149 16 : self_->cached_awaitable_,
1150 : 0);
1151 : }
1152 64 : return self_->active_ops_->await_ready(
1153 32 : self_->cached_awaitable_);
1154 : }
1155 :
1156 : std::coroutine_handle<>
1157 MIS 0 : await_suspend(std::coroutine_handle<> h, io_env const* env)
1158 : {
1159 0 : return self_->active_ops_->await_suspend(
1160 0 : self_->cached_awaitable_, h, env);
1161 : }
1162 :
1163 : io_result<>
1164 HIT 32 : await_resume()
1165 : {
1166 : struct guard {
1167 : any_buffer_sink* self;
1168 32 : ~guard() {
1169 32 : self->active_ops_->destroy(self->cached_awaitable_);
1170 32 : self->active_ops_ = nullptr;
1171 32 : }
1172 32 : } g{self_};
1173 32 : return self_->active_ops_->await_resume(
1174 54 : self_->cached_awaitable_);
1175 32 : }
1176 : };
1177 32 : return awaitable{this};
1178 : }
1179 :
1180 : template<ConstBufferSequence CB>
1181 : io_task<std::size_t>
1182 40 : any_buffer_sink::write_eof(CB buffers)
1183 : {
1184 : // Native WriteSink path
1185 : if(vt_->construct_write_eof_buffers_awaitable)
1186 : {
1187 : const_buffer_param<CB> bp(buffers);
1188 : std::size_t total = 0;
1189 :
1190 : for(;;)
1191 : {
1192 : auto bufs = bp.data();
1193 : if(bufs.empty())
1194 : {
1195 : auto [ec] = co_await write_eof();
1196 : co_return {ec, total};
1197 : }
1198 :
1199 : if(!bp.more())
1200 : {
1201 : // Last window: send atomically with EOF
1202 : auto [ec, n] = co_await write_eof_buffers_(bufs);
1203 : total += n;
1204 : co_return {ec, total};
1205 : }
1206 :
1207 : auto [ec, n] = co_await write_(bufs);
1208 : total += n;
1209 : if(ec)
1210 : co_return {ec, total};
1211 : bp.consume(n);
1212 : }
1213 : }
1214 :
1215 : // Synthesized path: prepare + buffer_copy + commit + commit_eof
1216 : buffer_param<CB> bp(buffers);
1217 : std::size_t total = 0;
1218 :
1219 : for(;;)
1220 : {
1221 : auto src = bp.data();
1222 : if(src.empty())
1223 : break;
1224 :
1225 : mutable_buffer arr[detail::max_iovec_];
1226 : auto dst_bufs = prepare(arr);
1227 : if(dst_bufs.empty())
1228 : {
1229 : auto [ec] = co_await commit(0);
1230 : if(ec)
1231 : co_return {ec, total};
1232 : continue;
1233 : }
1234 :
1235 : auto n = buffer_copy(dst_bufs, src);
1236 : auto [ec] = co_await commit(n);
1237 : if(ec)
1238 : co_return {ec, total};
1239 : bp.consume(n);
1240 : total += n;
1241 : }
1242 :
1243 : auto [ec] = co_await commit_eof(0);
1244 : if(ec)
1245 : co_return {ec, total};
1246 :
1247 : co_return {{}, total};
1248 80 : }
1249 :
1250 : static_assert(BufferSink<any_buffer_sink>);
1251 : static_assert(WriteSink<any_buffer_sink>);
1252 :
1253 : } // namespace capy
1254 : } // namespace boost
1255 :
1256 : #endif
|