include/boost/capy/io/any_write_sink.hpp

93.2% Lines (164/176) 93.3% Functions (42/45)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_array.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/io_awaitable.hpp>
19 #include <boost/capy/concept/write_sink.hpp>
20 #include <coroutine>
21 #include <boost/capy/ex/io_env.hpp>
22 #include <boost/capy/io_result.hpp>
23 #include <boost/capy/io_task.hpp>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <exception>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any WriteSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref WriteSink concept, enabling runtime polymorphism for
42 sink write operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper supports two construction modes:
46 - **Owning**: Pass by value to transfer ownership. The wrapper
47 allocates storage and owns the sink.
48 - **Reference**: Pass a pointer to wrap without ownership. The
49 pointed-to sink must outlive this wrapper.
50
51 @par Awaitable Preallocation
52 The constructor preallocates storage for the type-erased awaitable.
53 This reserves all virtual address space at server startup
54 so memory usage can be measured up front, rather than
55 allocating piecemeal as traffic arrives.
56
57 @par Immediate Completion
58 Operations complete immediately without suspending when the
59 buffer sequence is empty, or when the underlying sink's
60 awaitable reports readiness via `await_ready`.
61
62 @par Thread Safety
63 Not thread-safe. Concurrent operations on the same wrapper
64 are undefined behavior.
65
66 @par Example
67 @code
68 // Owning - takes ownership of the sink
69 any_write_sink ws(some_sink{args...});
70
71 // Reference - wraps without ownership
72 some_sink sink;
73 any_write_sink ws(&sink);
74
75 const_buffer buf(data, size);
76 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 auto [ec2] = co_await ws.write_eof();
78 @endcode
79
80 @see any_write_stream, WriteSink
81 */
class any_write_sink
{
    struct vtable;
    struct write_awaitable_ops;
    struct eof_awaitable_ops;

    template<WriteSink S>
    struct vtable_for_impl;

    // Pointer to the wrapped sink. Owned only when storage_ is
    // non-null (value construction); otherwise it borrows.
    void* sink_ = nullptr;
    // Dispatch table for the concrete sink type; null when empty.
    vtable const* vt_ = nullptr;
    // Preallocated storage into which type-erased awaitables are
    // placement-constructed; sized by vtable::awaitable_size.
    void* cached_awaitable_ = nullptr;
    // Owning allocation for the sink in value-construction mode;
    // null for pointer (reference) construction.
    void* storage_ = nullptr;
    // Non-null while a write-style awaitable is live in
    // cached_awaitable_; used to destroy it on resume or teardown.
    write_awaitable_ops const* active_write_ops_ = nullptr;
    // Non-null while an eof awaitable is live in cached_awaitable_.
    eof_awaitable_ops const* active_eof_ops_ = nullptr;

public:
    /** Destructor.

        Destroys the owned sink (if any) and releases the cached
        awaitable storage.
    */
    ~any_write_sink();

    /** Construct a default instance.

        Constructs an empty wrapper. Operations on a default-constructed
        wrapper result in undefined behavior.
    */
    any_write_sink() = default;

    /** Non-copyable.

        The awaitable cache is per-instance and cannot be shared.
    */
    any_write_sink(any_write_sink const&) = delete;
    any_write_sink& operator=(any_write_sink const&) = delete;

    /** Construct by moving.

        Transfers ownership of the wrapped sink (if owned) and
        cached awaitable storage from `other`. After the move, `other` is
        in a default-constructed state.

        @param other The wrapper to move from.
    */
    any_write_sink(any_write_sink&& other) noexcept
        : sink_(std::exchange(other.sink_, nullptr))
        , vt_(std::exchange(other.vt_, nullptr))
        , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
        , storage_(std::exchange(other.storage_, nullptr))
        , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
        , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
    {
    }

    /** Assign by moving.

        Destroys any owned sink and releases existing resources,
        then transfers ownership from `other`.

        @param other The wrapper to move from.
        @return Reference to this wrapper.
    */
    any_write_sink&
    operator=(any_write_sink&& other) noexcept;

    /** Construct by taking ownership of a WriteSink.

        Allocates storage and moves the sink into this wrapper.
        The wrapper owns the sink and will destroy it.

        @param s The sink to take ownership of.
    */
    template<WriteSink S>
    requires (!std::same_as<std::decay_t<S>, any_write_sink>)
    any_write_sink(S s);

    /** Construct by wrapping a WriteSink without ownership.

        Wraps the given sink by pointer. The sink must remain
        valid for the lifetime of this wrapper.

        @param s Pointer to the sink to wrap.
    */
    template<WriteSink S>
    any_write_sink(S* s);

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    bool
    has_value() const noexcept
    {
        return sink_ != nullptr;
    }

    /** Check if the wrapper contains a valid sink.

        @return `true` if wrapping a sink, `false` if default-constructed
        or moved-from.
    */
    explicit
    operator bool() const noexcept
    {
        return has_value();
    }

    /** Initiate a partial write operation.

        Writes one or more bytes from the provided buffer sequence.
        May consume less than the full sequence.

        @param buffers The buffer sequence containing data to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Immediate Completion
        The operation completes immediately without suspending
        the calling coroutine when:
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
        @li The underlying sink's awaitable reports immediate
            readiness via `await_ready`.

        @note This is a partial operation and may not process the
        entire buffer sequence. Use @ref write for guaranteed
        complete transfer.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    auto
    write_some(CB buffers);

    /** Initiate a complete write operation.

        Writes data from the provided buffer sequence. The operation
        completes when all bytes have been consumed, or an error
        occurs. Forwards to the underlying sink's `write` operation,
        windowed through @ref buffer_param when the sequence exceeds
        the per-call buffer limit.

        @param buffers The buffer sequence containing data to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Immediate Completion
        The operation completes immediately without suspending
        the calling coroutine when:
        @li The buffer sequence is empty, returning `{error_code{}, 0}`.
        @li Every underlying `write` call completes
            immediately (the wrapped sink reports readiness
            via `await_ready` on each iteration).

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write(CB buffers);

    /** Atomically write data and signal end-of-stream.

        Writes all data from the buffer sequence and then signals
        end-of-stream. The implementation decides how to partition
        the data across calls to the underlying sink's @ref write
        and `write_eof`. When the caller's buffer sequence is
        non-empty, the final call to the underlying sink is always
        `write_eof` with a non-empty buffer sequence. When the
        caller's buffer sequence is empty, only `write_eof()` with
        no data is called.

        @param buffers The buffer sequence containing data to write.

        @return An awaitable yielding `(error_code,std::size_t)`.

        @par Immediate Completion
        The operation completes immediately without suspending
        the calling coroutine when:
        @li The buffer sequence is empty. Only the @ref write_eof()
            call is performed.
        @li All underlying operations complete immediately (the
            wrapped sink reports readiness via `await_ready`).

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    template<ConstBufferSequence CB>
    io_task<std::size_t>
    write_eof(CB buffers);

    /** Signal end of data.

        Indicates that no more data will be written to the sink.
        The operation completes when the sink is finalized, or
        an error occurs.

        @return An awaitable yielding `(error_code)`.

        @par Immediate Completion
        The operation completes immediately without suspending
        the calling coroutine when the underlying sink's awaitable
        reports immediate readiness via `await_ready`.

        @par Preconditions
        The wrapper must contain a valid sink (`has_value() == true`).
    */
    auto
    write_eof();

protected:
    /** Rebind to a new sink after move.

        Updates the internal pointer to reference a new sink object.
        Used by owning wrappers after move assignment when the owned
        object has moved to a new location.

        @param new_sink The new sink to bind to. Must be the same
        type as the original sink.

        @note Terminates if called with a sink of different type
        than the original.
    */
    template<WriteSink S>
    void
    rebind(S& new_sink) noexcept
    {
        if(vt_ != &vtable_for_impl<S>::value)
            std::terminate();
        sink_ = &new_sink;
    }

private:
    // Type-erased single-shot operations. Each returns an awaitable
    // that placement-constructs the underlying sink awaitable into
    // cached_awaitable_ at suspension time.
    auto
    write_some_(std::span<const_buffer const> buffers);

    auto
    write_(std::span<const_buffer const> buffers);

    auto
    write_eof_buffers_(std::span<const_buffer const> buffers);
};
327
// Type-erased operation table for a write-style awaitable that has
// been placement-constructed into the cached storage. The `void*`
// argument is always the pointer to that cached awaitable.
struct any_write_sink::write_awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<std::size_t> (*await_resume)(void*); // yields (ec, bytes written)
    void (*destroy)(void*) noexcept; // runs the destructor only; storage is reused
};
335
// Type-erased operation table for the no-data write_eof() awaitable.
// Identical shape to write_awaitable_ops except await_resume yields
// only an error code.
struct any_write_sink::eof_awaitable_ops
{
    bool (*await_ready)(void*);
    std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
    io_result<> (*await_resume)(void*); // yields (ec)
    void (*destroy)(void*) noexcept; // runs the destructor only; storage is reused
};
343
// Per-sink-type dispatch table. Each construct_* entry placement-news
// the corresponding sink awaitable into `storage` and returns the ops
// table used to drive and destroy it.
struct any_write_sink::vtable
{
    write_awaitable_ops const* (*construct_write_some_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
        void* sink,
        void* storage,
        std::span<const_buffer const> buffers);
    eof_awaitable_ops const* (*construct_eof_awaitable)(
        void* sink,
        void* storage);
    // Size and alignment requirements of the cached awaitable
    // storage: the max over all four awaitable types of the sink.
    std::size_t awaitable_size;
    std::size_t awaitable_align;
    // Destroys the sink object in place (does not free its storage).
    void (*destroy)(void*) noexcept;
};
365
366 template<WriteSink S>
367 struct any_write_sink::vtable_for_impl
368 {
369 using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
370 std::span<const_buffer const>{}));
371 using WriteAwaitable = decltype(std::declval<S&>().write(
372 std::span<const_buffer const>{}));
373 using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
374 std::span<const_buffer const>{}));
375 using EofAwaitable = decltype(std::declval<S&>().write_eof());
376
377 static void
378 6x do_destroy_impl(void* sink) noexcept
379 {
380 6x static_cast<S*>(sink)->~S();
381 6x }
382
383 static write_awaitable_ops const*
384 40x construct_write_some_awaitable_impl(
385 void* sink,
386 void* storage,
387 std::span<const_buffer const> buffers)
388 {
389 40x auto& s = *static_cast<S*>(sink);
390 40x ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
391
392 static constexpr write_awaitable_ops ops = {
393 +[](void* p) {
394 return static_cast<WriteSomeAwaitable*>(p)->await_ready();
395 },
396 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
397 return detail::call_await_suspend(
398 static_cast<WriteSomeAwaitable*>(p), h, env);
399 },
400 +[](void* p) {
401 return static_cast<WriteSomeAwaitable*>(p)->await_resume();
402 },
403 +[](void* p) noexcept {
404 static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
405 }
406 };
407 40x return &ops;
408 }
409
410 static write_awaitable_ops const*
411 78x construct_write_awaitable_impl(
412 void* sink,
413 void* storage,
414 std::span<const_buffer const> buffers)
415 {
416 78x auto& s = *static_cast<S*>(sink);
417 78x ::new(storage) WriteAwaitable(s.write(buffers));
418
419 static constexpr write_awaitable_ops ops = {
420 +[](void* p) {
421 return static_cast<WriteAwaitable*>(p)->await_ready();
422 },
423 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
424 return detail::call_await_suspend(
425 static_cast<WriteAwaitable*>(p), h, env);
426 },
427 +[](void* p) {
428 return static_cast<WriteAwaitable*>(p)->await_resume();
429 },
430 +[](void* p) noexcept {
431 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
432 }
433 };
434 78x return &ops;
435 }
436
437 static write_awaitable_ops const*
438 16x construct_write_eof_buffers_awaitable_impl(
439 void* sink,
440 void* storage,
441 std::span<const_buffer const> buffers)
442 {
443 16x auto& s = *static_cast<S*>(sink);
444 16x ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
445
446 static constexpr write_awaitable_ops ops = {
447 +[](void* p) {
448 return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
449 },
450 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
451 return detail::call_await_suspend(
452 static_cast<WriteEofBuffersAwaitable*>(p), h, env);
453 },
454 +[](void* p) {
455 return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
456 },
457 +[](void* p) noexcept {
458 static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
459 }
460 };
461 16x return &ops;
462 }
463
464 static eof_awaitable_ops const*
465 17x construct_eof_awaitable_impl(
466 void* sink,
467 void* storage)
468 {
469 17x auto& s = *static_cast<S*>(sink);
470 17x ::new(storage) EofAwaitable(s.write_eof());
471
472 static constexpr eof_awaitable_ops ops = {
473 +[](void* p) {
474 return static_cast<EofAwaitable*>(p)->await_ready();
475 },
476 +[](void* p, std::coroutine_handle<> h, io_env const* env) {
477 return detail::call_await_suspend(
478 static_cast<EofAwaitable*>(p), h, env);
479 },
480 +[](void* p) {
481 return static_cast<EofAwaitable*>(p)->await_resume();
482 },
483 +[](void* p) noexcept {
484 static_cast<EofAwaitable*>(p)->~EofAwaitable();
485 }
486 };
487 17x return &ops;
488 }
489
490 static constexpr std::size_t max4(
491 std::size_t a, std::size_t b,
492 std::size_t c, std::size_t d) noexcept
493 {
494 std::size_t ab = a > b ? a : b;
495 std::size_t cd = c > d ? c : d;
496 return ab > cd ? ab : cd;
497 }
498
499 static constexpr std::size_t max_awaitable_size =
500 max4(sizeof(WriteSomeAwaitable),
501 sizeof(WriteAwaitable),
502 sizeof(WriteEofBuffersAwaitable),
503 sizeof(EofAwaitable));
504
505 static constexpr std::size_t max_awaitable_align =
506 max4(alignof(WriteSomeAwaitable),
507 alignof(WriteAwaitable),
508 alignof(WriteEofBuffersAwaitable),
509 alignof(EofAwaitable));
510
511 static constexpr vtable value = {
512 &construct_write_some_awaitable_impl,
513 &construct_write_awaitable_impl,
514 &construct_write_eof_buffers_awaitable_impl,
515 &construct_eof_awaitable_impl,
516 max_awaitable_size,
517 max_awaitable_align,
518 &do_destroy_impl
519 };
520 };
521
522 inline
523 129x any_write_sink::~any_write_sink()
524 {
525 129x if(storage_)
526 {
527 6x vt_->destroy(sink_);
528 6x ::operator delete(storage_);
529 }
530 129x if(cached_awaitable_)
531 {
532 124x if(active_write_ops_)
533 1x active_write_ops_->destroy(cached_awaitable_);
534 123x else if(active_eof_ops_)
535 1x active_eof_ops_->destroy(cached_awaitable_);
536 124x ::operator delete(cached_awaitable_);
537 }
538 129x }
539
540 inline any_write_sink&
541 2x any_write_sink::operator=(any_write_sink&& other) noexcept
542 {
543 2x if(this != &other)
544 {
545 2x if(storage_)
546 {
547 vt_->destroy(sink_);
548 ::operator delete(storage_);
549 }
550 2x if(cached_awaitable_)
551 {
552 1x if(active_write_ops_)
553 1x active_write_ops_->destroy(cached_awaitable_);
554 else if(active_eof_ops_)
555 active_eof_ops_->destroy(cached_awaitable_);
556 1x ::operator delete(cached_awaitable_);
557 }
558 2x sink_ = std::exchange(other.sink_, nullptr);
559 2x vt_ = std::exchange(other.vt_, nullptr);
560 2x cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
561 2x storage_ = std::exchange(other.storage_, nullptr);
562 2x active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
563 2x active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
564 }
565 2x return *this;
566 }
567
568 template<WriteSink S>
569 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
570 6x any_write_sink::any_write_sink(S s)
571 6x : vt_(&vtable_for_impl<S>::value)
572 {
573 struct guard {
574 any_write_sink* self;
575 bool committed = false;
576 6x ~guard() {
577 6x if(!committed && self->storage_) {
578 self->vt_->destroy(self->sink_);
579 ::operator delete(self->storage_);
580 self->storage_ = nullptr;
581 self->sink_ = nullptr;
582 }
583 6x }
584 6x } g{this};
585
586 6x storage_ = ::operator new(sizeof(S));
587 6x sink_ = ::new(storage_) S(std::move(s));
588
589 // Preallocate the awaitable storage (sized for max of write/eof)
590 6x cached_awaitable_ = ::operator new(vt_->awaitable_size);
591
592 6x g.committed = true;
593 6x }
594
595 template<WriteSink S>
596 119x any_write_sink::any_write_sink(S* s)
597 119x : sink_(s)
598 119x , vt_(&vtable_for_impl<S>::value)
599 {
600 // Preallocate the awaitable storage (sized for max of write/eof)
601 119x cached_awaitable_ = ::operator new(vt_->awaitable_size);
602 119x }
603
604 inline auto
605 any_write_sink::write_some_(
606 std::span<const_buffer const> buffers)
607 {
608 struct awaitable
609 {
610 any_write_sink* self_;
611 std::span<const_buffer const> buffers_;
612
613 bool
614 await_ready() const noexcept
615 {
616 return false;
617 }
618
619 std::coroutine_handle<>
620 await_suspend(std::coroutine_handle<> h, io_env const* env)
621 {
622 self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
623 self_->sink_,
624 self_->cached_awaitable_,
625 buffers_);
626
627 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
628 return h;
629
630 return self_->active_write_ops_->await_suspend(
631 self_->cached_awaitable_, h, env);
632 }
633
634 io_result<std::size_t>
635 await_resume()
636 {
637 struct guard {
638 any_write_sink* self;
639 ~guard() {
640 self->active_write_ops_->destroy(self->cached_awaitable_);
641 self->active_write_ops_ = nullptr;
642 }
643 } g{self_};
644 return self_->active_write_ops_->await_resume(
645 self_->cached_awaitable_);
646 }
647 };
648 return awaitable{this, buffers};
649 }
650
651 inline auto
652 78x any_write_sink::write_(
653 std::span<const_buffer const> buffers)
654 {
655 struct awaitable
656 {
657 any_write_sink* self_;
658 std::span<const_buffer const> buffers_;
659
660 bool
661 78x await_ready() const noexcept
662 {
663 78x return false;
664 }
665
666 std::coroutine_handle<>
667 78x await_suspend(std::coroutine_handle<> h, io_env const* env)
668 {
669 156x self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
670 78x self_->sink_,
671 78x self_->cached_awaitable_,
672 buffers_);
673
674 78x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
675 78x return h;
676
677 return self_->active_write_ops_->await_suspend(
678 self_->cached_awaitable_, h, env);
679 }
680
681 io_result<std::size_t>
682 78x await_resume()
683 {
684 struct guard {
685 any_write_sink* self;
686 78x ~guard() {
687 78x self->active_write_ops_->destroy(self->cached_awaitable_);
688 78x self->active_write_ops_ = nullptr;
689 78x }
690 78x } g{self_};
691 78x return self_->active_write_ops_->await_resume(
692 135x self_->cached_awaitable_);
693 78x }
694 };
695 78x return awaitable{this, buffers};
696 }
697
698 inline auto
699 17x any_write_sink::write_eof()
700 {
701 struct awaitable
702 {
703 any_write_sink* self_;
704
705 bool
706 17x await_ready() const noexcept
707 {
708 17x return false;
709 }
710
711 std::coroutine_handle<>
712 17x await_suspend(std::coroutine_handle<> h, io_env const* env)
713 {
714 // Construct the underlying awaitable into cached storage
715 34x self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
716 17x self_->sink_,
717 17x self_->cached_awaitable_);
718
719 // Check if underlying is immediately ready
720 17x if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
721 16x return h;
722
723 // Forward to underlying awaitable
724 1x return self_->active_eof_ops_->await_suspend(
725 1x self_->cached_awaitable_, h, env);
726 }
727
728 io_result<>
729 16x await_resume()
730 {
731 struct guard {
732 any_write_sink* self;
733 16x ~guard() {
734 16x self->active_eof_ops_->destroy(self->cached_awaitable_);
735 16x self->active_eof_ops_ = nullptr;
736 16x }
737 16x } g{self_};
738 16x return self_->active_eof_ops_->await_resume(
739 27x self_->cached_awaitable_);
740 16x }
741 };
742 17x return awaitable{this};
743 }
744
745 inline auto
746 16x any_write_sink::write_eof_buffers_(
747 std::span<const_buffer const> buffers)
748 {
749 struct awaitable
750 {
751 any_write_sink* self_;
752 std::span<const_buffer const> buffers_;
753
754 bool
755 16x await_ready() const noexcept
756 {
757 16x return false;
758 }
759
760 std::coroutine_handle<>
761 16x await_suspend(std::coroutine_handle<> h, io_env const* env)
762 {
763 32x self_->active_write_ops_ =
764 32x self_->vt_->construct_write_eof_buffers_awaitable(
765 16x self_->sink_,
766 16x self_->cached_awaitable_,
767 buffers_);
768
769 16x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
770 16x return h;
771
772 return self_->active_write_ops_->await_suspend(
773 self_->cached_awaitable_, h, env);
774 }
775
776 io_result<std::size_t>
777 16x await_resume()
778 {
779 struct guard {
780 any_write_sink* self;
781 16x ~guard() {
782 16x self->active_write_ops_->destroy(self->cached_awaitable_);
783 16x self->active_write_ops_ = nullptr;
784 16x }
785 16x } g{self_};
786 16x return self_->active_write_ops_->await_resume(
787 27x self_->cached_awaitable_);
788 16x }
789 };
790 16x return awaitable{this, buffers};
791 }
792
793 template<ConstBufferSequence CB>
794 auto
795 42x any_write_sink::write_some(CB buffers)
796 {
797 struct awaitable
798 {
799 any_write_sink* self_;
800 const_buffer_array<detail::max_iovec_> ba_;
801
802 42x awaitable(
803 any_write_sink* self,
804 CB const& buffers)
805 42x : self_(self)
806 42x , ba_(buffers)
807 {
808 42x }
809
810 bool
811 42x await_ready() const noexcept
812 {
813 42x return ba_.to_span().empty();
814 }
815
816 std::coroutine_handle<>
817 40x await_suspend(std::coroutine_handle<> h, io_env const* env)
818 {
819 40x self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
820 40x self_->sink_,
821 40x self_->cached_awaitable_,
822 40x ba_.to_span());
823
824 40x if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
825 38x return h;
826
827 2x return self_->active_write_ops_->await_suspend(
828 2x self_->cached_awaitable_, h, env);
829 }
830
831 io_result<std::size_t>
832 40x await_resume()
833 {
834 40x if(ba_.to_span().empty())
835 2x return {{}, 0};
836
837 struct guard {
838 any_write_sink* self;
839 38x ~guard() {
840 38x self->active_write_ops_->destroy(self->cached_awaitable_);
841 38x self->active_write_ops_ = nullptr;
842 38x }
843 38x } g{self_};
844 38x return self_->active_write_ops_->await_resume(
845 38x self_->cached_awaitable_);
846 38x }
847 };
848 42x return awaitable{this, buffers};
849 }
850
851 template<ConstBufferSequence CB>
852 io_task<std::size_t>
853 68x any_write_sink::write(CB buffers)
854 {
855 buffer_param<CB> bp(buffers);
856 std::size_t total = 0;
857
858 for(;;)
859 {
860 auto bufs = bp.data();
861 if(bufs.empty())
862 break;
863
864 auto [ec, n] = co_await write_(bufs);
865 total += n;
866 if(ec)
867 co_return {ec, total};
868 bp.consume(n);
869 }
870
871 co_return {{}, total};
872 136x }
873
874 template<ConstBufferSequence CB>
875 io_task<std::size_t>
876 26x any_write_sink::write_eof(CB buffers)
877 {
878 const_buffer_param<CB> bp(buffers);
879 std::size_t total = 0;
880
881 for(;;)
882 {
883 auto bufs = bp.data();
884 if(bufs.empty())
885 {
886 auto [ec] = co_await write_eof();
887 co_return {ec, total};
888 }
889
890 if(! bp.more())
891 {
892 // Last window — send atomically with EOF
893 auto [ec, n] = co_await write_eof_buffers_(bufs);
894 total += n;
895 co_return {ec, total};
896 }
897
898 auto [ec, n] = co_await write_(bufs);
899 total += n;
900 if(ec)
901 co_return {ec, total};
902 bp.consume(n);
903 }
904 52x }
905
906 } // namespace capy
907 } // namespace boost
908
909 #endif
910