TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <boost/capy/concept/write_sink.hpp>
20 : #include <coroutine>
21 : #include <boost/capy/ex/io_env.hpp>
22 : #include <boost/capy/io_result.hpp>
23 : #include <boost/capy/io_task.hpp>
24 :
25 : #include <concepts>
26 : #include <coroutine>
27 : #include <cstddef>
28 : #include <exception>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <system_error>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any WriteSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref WriteSink concept, enabling runtime polymorphism for
42 : sink write operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper supports two construction modes:
46 : - **Owning**: Pass by value to transfer ownership. The wrapper
47 : allocates storage and owns the sink.
48 : - **Reference**: Pass a pointer to wrap without ownership. The
49 : pointed-to sink must outlive this wrapper.
50 :
51 : @par Awaitable Preallocation
52 : The constructor preallocates storage for the type-erased awaitable.
53 : This reserves all virtual address space at server startup
54 : so memory usage can be measured up front, rather than
55 : allocating piecemeal as traffic arrives.
56 :
57 : @par Immediate Completion
58 : Operations complete immediately without suspending when the
59 : buffer sequence is empty, or when the underlying sink's
60 : awaitable reports readiness via `await_ready`.
61 :
62 : @par Thread Safety
63 : Not thread-safe. Concurrent operations on the same wrapper
64 : are undefined behavior.
65 :
66 : @par Example
67 : @code
68 : // Owning - takes ownership of the sink
69 : any_write_sink ws(some_sink{args...});
70 :
71 : // Reference - wraps without ownership
72 : some_sink sink;
73 : any_write_sink ws(&sink);
74 :
75 : const_buffer buf(data, size);
76 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
77 : auto [ec2] = co_await ws.write_eof();
78 : @endcode
79 :
80 : @see any_write_stream, WriteSink
81 : */
82 : class any_write_sink
83 : {
84 : struct vtable;
85 : struct write_awaitable_ops;
86 : struct eof_awaitable_ops;
87 :
88 : template<WriteSink S>
89 : struct vtable_for_impl;
90 :
91 : void* sink_ = nullptr;
92 : vtable const* vt_ = nullptr;
93 : void* cached_awaitable_ = nullptr;
94 : void* storage_ = nullptr;
95 : write_awaitable_ops const* active_write_ops_ = nullptr;
96 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
97 :
98 : public:
99 : /** Destructor.
100 :
101 : Destroys the owned sink (if any) and releases the cached
102 : awaitable storage.
103 : */
104 : ~any_write_sink();
105 :
106 : /** Construct a default instance.
107 :
108 : Constructs an empty wrapper. Operations on a default-constructed
109 : wrapper result in undefined behavior.
110 : */
111 : any_write_sink() = default;
112 :
113 : /** Non-copyable.
114 :
115 : The awaitable cache is per-instance and cannot be shared.
116 : */
117 : any_write_sink(any_write_sink const&) = delete;
118 : any_write_sink& operator=(any_write_sink const&) = delete;
119 :
120 : /** Construct by moving.
121 :
122 : Transfers ownership of the wrapped sink (if owned) and
123 : cached awaitable storage from `other`. After the move, `other` is
124 : in a default-constructed state.
125 :
126 : @param other The wrapper to move from.
127 : */
128 HIT 1 : any_write_sink(any_write_sink&& other) noexcept
129 1 : : sink_(std::exchange(other.sink_, nullptr))
130 1 : , vt_(std::exchange(other.vt_, nullptr))
131 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
132 1 : , storage_(std::exchange(other.storage_, nullptr))
133 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
134 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
135 : {
136 1 : }
137 :
138 : /** Assign by moving.
139 :
140 : Destroys any owned sink and releases existing resources,
141 : then transfers ownership from `other`.
142 :
143 : @param other The wrapper to move from.
144 : @return Reference to this wrapper.
145 : */
146 : any_write_sink&
147 : operator=(any_write_sink&& other) noexcept;
148 :
149 : /** Construct by taking ownership of a WriteSink.
150 :
151 : Allocates storage and moves the sink into this wrapper.
152 : The wrapper owns the sink and will destroy it.
153 :
154 : @param s The sink to take ownership of.
155 : */
156 : template<WriteSink S>
157 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
158 : any_write_sink(S s);
159 :
160 : /** Construct by wrapping a WriteSink without ownership.
161 :
162 : Wraps the given sink by pointer. The sink must remain
163 : valid for the lifetime of this wrapper.
164 :
165 : @param s Pointer to the sink to wrap.
166 : */
167 : template<WriteSink S>
168 : any_write_sink(S* s);
169 :
170 : /** Check if the wrapper contains a valid sink.
171 :
172 : @return `true` if wrapping a sink, `false` if default-constructed
173 : or moved-from.
174 : */
175 : bool
176 15 : has_value() const noexcept
177 : {
178 15 : return sink_ != nullptr;
179 : }
180 :
181 : /** Check if the wrapper contains a valid sink.
182 :
183 : @return `true` if wrapping a sink, `false` if default-constructed
184 : or moved-from.
185 : */
186 : explicit
187 2 : operator bool() const noexcept
188 : {
189 2 : return has_value();
190 : }
191 :
192 : /** Initiate a partial write operation.
193 :
194 : Writes one or more bytes from the provided buffer sequence.
195 : May consume less than the full sequence.
196 :
197 : @param buffers The buffer sequence containing data to write.
198 :
199 : @return An awaitable yielding `(error_code,std::size_t)`.
200 :
201 : @par Immediate Completion
202 : The operation completes immediately without suspending
203 : the calling coroutine when:
204 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
205 : @li The underlying sink's awaitable reports immediate
206 : readiness via `await_ready`.
207 :
208 : @note This is a partial operation and may not process the
209 : entire buffer sequence. Use @ref write for guaranteed
210 : complete transfer.
211 :
212 : @par Preconditions
213 : The wrapper must contain a valid sink (`has_value() == true`).
214 : */
215 : template<ConstBufferSequence CB>
216 : auto
217 : write_some(CB buffers);
218 :
219 : /** Initiate a complete write operation.
220 :
221 : Writes data from the provided buffer sequence. The operation
222 : completes when all bytes have been consumed, or an error
223 : occurs. Forwards to the underlying sink's `write` operation,
224 : windowed through @ref buffer_param when the sequence exceeds
225 : the per-call buffer limit.
226 :
227 : @param buffers The buffer sequence containing data to write.
228 :
229 : @return An awaitable yielding `(error_code,std::size_t)`.
230 :
231 : @par Immediate Completion
232 : The operation completes immediately without suspending
233 : the calling coroutine when:
234 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
235 : @li Every underlying `write` call completes
236 : immediately (the wrapped sink reports readiness
237 : via `await_ready` on each iteration).
238 :
239 : @par Preconditions
240 : The wrapper must contain a valid sink (`has_value() == true`).
241 : */
242 : template<ConstBufferSequence CB>
243 : io_task<std::size_t>
244 : write(CB buffers);
245 :
246 : /** Atomically write data and signal end-of-stream.
247 :
248 : Writes all data from the buffer sequence and then signals
249 : end-of-stream. The implementation decides how to partition
250 : the data across calls to the underlying sink's @ref write
251 : and `write_eof`. When the caller's buffer sequence is
252 : non-empty, the final call to the underlying sink is always
253 : `write_eof` with a non-empty buffer sequence. When the
254 : caller's buffer sequence is empty, only `write_eof()` with
255 : no data is called.
256 :
257 : @param buffers The buffer sequence containing data to write.
258 :
259 : @return An awaitable yielding `(error_code,std::size_t)`.
260 :
261 : @par Immediate Completion
262 : The operation completes immediately without suspending
263 : the calling coroutine when:
264 : @li The buffer sequence is empty. Only the @ref write_eof()
265 : call is performed.
266 : @li All underlying operations complete immediately (the
267 : wrapped sink reports readiness via `await_ready`).
268 :
269 : @par Preconditions
270 : The wrapper must contain a valid sink (`has_value() == true`).
271 : */
272 : template<ConstBufferSequence CB>
273 : io_task<std::size_t>
274 : write_eof(CB buffers);
275 :
276 : /** Signal end of data.
277 :
278 : Indicates that no more data will be written to the sink.
279 : The operation completes when the sink is finalized, or
280 : an error occurs.
281 :
282 : @return An awaitable yielding `(error_code)`.
283 :
284 : @par Immediate Completion
285 : The operation completes immediately without suspending
286 : the calling coroutine when the underlying sink's awaitable
287 : reports immediate readiness via `await_ready`.
288 :
289 : @par Preconditions
290 : The wrapper must contain a valid sink (`has_value() == true`).
291 : */
292 : auto
293 : write_eof();
294 :
295 : protected:
296 : /** Rebind to a new sink after move.
297 :
298 : Updates the internal pointer to reference a new sink object.
299 : Used by owning wrappers after move assignment when the owned
300 : object has moved to a new location.
301 :
302 : @param new_sink The new sink to bind to. Must be the same
303 : type as the original sink.
304 :
305 : @note Terminates if called with a sink of different type
306 : than the original.
307 : */
308 : template<WriteSink S>
309 : void
310 : rebind(S& new_sink) noexcept
311 : {
312 : if(vt_ != &vtable_for_impl<S>::value)
313 : std::terminate();
314 : sink_ = &new_sink;
315 : }
316 :
317 : private:
318 : auto
319 : write_some_(std::span<const_buffer const> buffers);
320 :
321 : auto
322 : write_(std::span<const_buffer const> buffers);
323 :
324 : auto
325 : write_eof_buffers_(std::span<const_buffer const> buffers);
326 : };
327 :
328 : struct any_write_sink::write_awaitable_ops
329 : {
330 : bool (*await_ready)(void*);
331 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
332 : io_result<std::size_t> (*await_resume)(void*);
333 : void (*destroy)(void*) noexcept;
334 : };
335 :
336 : struct any_write_sink::eof_awaitable_ops
337 : {
338 : bool (*await_ready)(void*);
339 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
340 : io_result<> (*await_resume)(void*);
341 : void (*destroy)(void*) noexcept;
342 : };
343 :
344 : struct any_write_sink::vtable
345 : {
346 : write_awaitable_ops const* (*construct_write_some_awaitable)(
347 : void* sink,
348 : void* storage,
349 : std::span<const_buffer const> buffers);
350 : write_awaitable_ops const* (*construct_write_awaitable)(
351 : void* sink,
352 : void* storage,
353 : std::span<const_buffer const> buffers);
354 : write_awaitable_ops const* (*construct_write_eof_buffers_awaitable)(
355 : void* sink,
356 : void* storage,
357 : std::span<const_buffer const> buffers);
358 : eof_awaitable_ops const* (*construct_eof_awaitable)(
359 : void* sink,
360 : void* storage);
361 : std::size_t awaitable_size;
362 : std::size_t awaitable_align;
363 : void (*destroy)(void*) noexcept;
364 : };
365 :
366 : template<WriteSink S>
367 : struct any_write_sink::vtable_for_impl
368 : {
369 : using WriteSomeAwaitable = decltype(std::declval<S&>().write_some(
370 : std::span<const_buffer const>{}));
371 : using WriteAwaitable = decltype(std::declval<S&>().write(
372 : std::span<const_buffer const>{}));
373 : using WriteEofBuffersAwaitable = decltype(std::declval<S&>().write_eof(
374 : std::span<const_buffer const>{}));
375 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
376 :
377 : static void
378 6 : do_destroy_impl(void* sink) noexcept
379 : {
380 6 : static_cast<S*>(sink)->~S();
381 6 : }
382 :
383 : static write_awaitable_ops const*
384 40 : construct_write_some_awaitable_impl(
385 : void* sink,
386 : void* storage,
387 : std::span<const_buffer const> buffers)
388 : {
389 40 : auto& s = *static_cast<S*>(sink);
390 40 : ::new(storage) WriteSomeAwaitable(s.write_some(buffers));
391 :
392 : static constexpr write_awaitable_ops ops = {
393 40 : +[](void* p) {
394 40 : return static_cast<WriteSomeAwaitable*>(p)->await_ready();
395 : },
396 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
397 2 : return detail::call_await_suspend(
398 2 : static_cast<WriteSomeAwaitable*>(p), h, env);
399 : },
400 38 : +[](void* p) {
401 38 : return static_cast<WriteSomeAwaitable*>(p)->await_resume();
402 : },
403 42 : +[](void* p) noexcept {
404 2 : static_cast<WriteSomeAwaitable*>(p)->~WriteSomeAwaitable();
405 : }
406 : };
407 40 : return &ops;
408 : }
409 :
410 : static write_awaitable_ops const*
411 78 : construct_write_awaitable_impl(
412 : void* sink,
413 : void* storage,
414 : std::span<const_buffer const> buffers)
415 : {
416 78 : auto& s = *static_cast<S*>(sink);
417 78 : ::new(storage) WriteAwaitable(s.write(buffers));
418 :
419 : static constexpr write_awaitable_ops ops = {
420 78 : +[](void* p) {
421 78 : return static_cast<WriteAwaitable*>(p)->await_ready();
422 : },
423 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
424 0 : return detail::call_await_suspend(
425 0 : static_cast<WriteAwaitable*>(p), h, env);
426 : },
427 HIT 78 : +[](void* p) {
428 78 : return static_cast<WriteAwaitable*>(p)->await_resume();
429 : },
430 78 : +[](void* p) noexcept {
431 MIS 0 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
432 : }
433 : };
434 HIT 78 : return &ops;
435 : }
436 :
437 : static write_awaitable_ops const*
438 16 : construct_write_eof_buffers_awaitable_impl(
439 : void* sink,
440 : void* storage,
441 : std::span<const_buffer const> buffers)
442 : {
443 16 : auto& s = *static_cast<S*>(sink);
444 16 : ::new(storage) WriteEofBuffersAwaitable(s.write_eof(buffers));
445 :
446 : static constexpr write_awaitable_ops ops = {
447 16 : +[](void* p) {
448 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_ready();
449 : },
450 MIS 0 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
451 0 : return detail::call_await_suspend(
452 0 : static_cast<WriteEofBuffersAwaitable*>(p), h, env);
453 : },
454 HIT 16 : +[](void* p) {
455 16 : return static_cast<WriteEofBuffersAwaitable*>(p)->await_resume();
456 : },
457 16 : +[](void* p) noexcept {
458 MIS 0 : static_cast<WriteEofBuffersAwaitable*>(p)->~WriteEofBuffersAwaitable();
459 : }
460 : };
461 HIT 16 : return &ops;
462 : }
463 :
464 : static eof_awaitable_ops const*
465 17 : construct_eof_awaitable_impl(
466 : void* sink,
467 : void* storage)
468 : {
469 17 : auto& s = *static_cast<S*>(sink);
470 17 : ::new(storage) EofAwaitable(s.write_eof());
471 :
472 : static constexpr eof_awaitable_ops ops = {
473 17 : +[](void* p) {
474 17 : return static_cast<EofAwaitable*>(p)->await_ready();
475 : },
476 1 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
477 1 : return detail::call_await_suspend(
478 1 : static_cast<EofAwaitable*>(p), h, env);
479 : },
480 16 : +[](void* p) {
481 16 : return static_cast<EofAwaitable*>(p)->await_resume();
482 : },
483 18 : +[](void* p) noexcept {
484 1 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
485 : }
486 : };
487 17 : return &ops;
488 : }
489 :
490 : static constexpr std::size_t max4(
491 : std::size_t a, std::size_t b,
492 : std::size_t c, std::size_t d) noexcept
493 : {
494 : std::size_t ab = a > b ? a : b;
495 : std::size_t cd = c > d ? c : d;
496 : return ab > cd ? ab : cd;
497 : }
498 :
499 : static constexpr std::size_t max_awaitable_size =
500 : max4(sizeof(WriteSomeAwaitable),
501 : sizeof(WriteAwaitable),
502 : sizeof(WriteEofBuffersAwaitable),
503 : sizeof(EofAwaitable));
504 :
505 : static constexpr std::size_t max_awaitable_align =
506 : max4(alignof(WriteSomeAwaitable),
507 : alignof(WriteAwaitable),
508 : alignof(WriteEofBuffersAwaitable),
509 : alignof(EofAwaitable));
510 :
511 : static constexpr vtable value = {
512 : &construct_write_some_awaitable_impl,
513 : &construct_write_awaitable_impl,
514 : &construct_write_eof_buffers_awaitable_impl,
515 : &construct_eof_awaitable_impl,
516 : max_awaitable_size,
517 : max_awaitable_align,
518 : &do_destroy_impl
519 : };
520 : };
521 :
522 : inline
523 129 : any_write_sink::~any_write_sink()
524 : {
525 129 : if(storage_)
526 : {
527 6 : vt_->destroy(sink_);
528 6 : ::operator delete(storage_);
529 : }
530 129 : if(cached_awaitable_)
531 : {
532 124 : if(active_write_ops_)
533 1 : active_write_ops_->destroy(cached_awaitable_);
534 123 : else if(active_eof_ops_)
535 1 : active_eof_ops_->destroy(cached_awaitable_);
536 124 : ::operator delete(cached_awaitable_);
537 : }
538 129 : }
539 :
540 : inline any_write_sink&
541 2 : any_write_sink::operator=(any_write_sink&& other) noexcept
542 : {
543 2 : if(this != &other)
544 : {
545 2 : if(storage_)
546 : {
547 MIS 0 : vt_->destroy(sink_);
548 0 : ::operator delete(storage_);
549 : }
550 HIT 2 : if(cached_awaitable_)
551 : {
552 1 : if(active_write_ops_)
553 1 : active_write_ops_->destroy(cached_awaitable_);
554 MIS 0 : else if(active_eof_ops_)
555 0 : active_eof_ops_->destroy(cached_awaitable_);
556 HIT 1 : ::operator delete(cached_awaitable_);
557 : }
558 2 : sink_ = std::exchange(other.sink_, nullptr);
559 2 : vt_ = std::exchange(other.vt_, nullptr);
560 2 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
561 2 : storage_ = std::exchange(other.storage_, nullptr);
562 2 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
563 2 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
564 : }
565 2 : return *this;
566 : }
567 :
568 : template<WriteSink S>
569 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
570 6 : any_write_sink::any_write_sink(S s)
571 6 : : vt_(&vtable_for_impl<S>::value)
572 : {
573 : struct guard {
574 : any_write_sink* self;
575 : bool committed = false;
576 6 : ~guard() {
577 6 : if(!committed && self->storage_) {
578 MIS 0 : self->vt_->destroy(self->sink_);
579 0 : ::operator delete(self->storage_);
580 0 : self->storage_ = nullptr;
581 0 : self->sink_ = nullptr;
582 : }
583 HIT 6 : }
584 6 : } g{this};
585 :
586 6 : storage_ = ::operator new(sizeof(S));
587 6 : sink_ = ::new(storage_) S(std::move(s));
588 :
589 : // Preallocate the awaitable storage (sized for max of write/eof)
590 6 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
591 :
592 6 : g.committed = true;
593 6 : }
594 :
595 : template<WriteSink S>
596 119 : any_write_sink::any_write_sink(S* s)
597 119 : : sink_(s)
598 119 : , vt_(&vtable_for_impl<S>::value)
599 : {
600 : // Preallocate the awaitable storage (sized for max of write/eof)
601 119 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
602 119 : }
603 :
604 : inline auto
605 : any_write_sink::write_some_(
606 : std::span<const_buffer const> buffers)
607 : {
608 : struct awaitable
609 : {
610 : any_write_sink* self_;
611 : std::span<const_buffer const> buffers_;
612 :
613 : bool
614 : await_ready() const noexcept
615 : {
616 : return false;
617 : }
618 :
619 : std::coroutine_handle<>
620 : await_suspend(std::coroutine_handle<> h, io_env const* env)
621 : {
622 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
623 : self_->sink_,
624 : self_->cached_awaitable_,
625 : buffers_);
626 :
627 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
628 : return h;
629 :
630 : return self_->active_write_ops_->await_suspend(
631 : self_->cached_awaitable_, h, env);
632 : }
633 :
634 : io_result<std::size_t>
635 : await_resume()
636 : {
637 : struct guard {
638 : any_write_sink* self;
639 : ~guard() {
640 : self->active_write_ops_->destroy(self->cached_awaitable_);
641 : self->active_write_ops_ = nullptr;
642 : }
643 : } g{self_};
644 : return self_->active_write_ops_->await_resume(
645 : self_->cached_awaitable_);
646 : }
647 : };
648 : return awaitable{this, buffers};
649 : }
650 :
651 : inline auto
652 78 : any_write_sink::write_(
653 : std::span<const_buffer const> buffers)
654 : {
655 : struct awaitable
656 : {
657 : any_write_sink* self_;
658 : std::span<const_buffer const> buffers_;
659 :
660 : bool
661 78 : await_ready() const noexcept
662 : {
663 78 : return false;
664 : }
665 :
666 : std::coroutine_handle<>
667 78 : await_suspend(std::coroutine_handle<> h, io_env const* env)
668 : {
669 156 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
670 78 : self_->sink_,
671 78 : self_->cached_awaitable_,
672 : buffers_);
673 :
674 78 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
675 78 : return h;
676 :
677 MIS 0 : return self_->active_write_ops_->await_suspend(
678 0 : self_->cached_awaitable_, h, env);
679 : }
680 :
681 : io_result<std::size_t>
682 HIT 78 : await_resume()
683 : {
684 : struct guard {
685 : any_write_sink* self;
686 78 : ~guard() {
687 78 : self->active_write_ops_->destroy(self->cached_awaitable_);
688 78 : self->active_write_ops_ = nullptr;
689 78 : }
690 78 : } g{self_};
691 78 : return self_->active_write_ops_->await_resume(
692 135 : self_->cached_awaitable_);
693 78 : }
694 : };
695 78 : return awaitable{this, buffers};
696 : }
697 :
698 : inline auto
699 17 : any_write_sink::write_eof()
700 : {
701 : struct awaitable
702 : {
703 : any_write_sink* self_;
704 :
705 : bool
706 17 : await_ready() const noexcept
707 : {
708 17 : return false;
709 : }
710 :
711 : std::coroutine_handle<>
712 17 : await_suspend(std::coroutine_handle<> h, io_env const* env)
713 : {
714 : // Construct the underlying awaitable into cached storage
715 34 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
716 17 : self_->sink_,
717 17 : self_->cached_awaitable_);
718 :
719 : // Check if underlying is immediately ready
720 17 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
721 16 : return h;
722 :
723 : // Forward to underlying awaitable
724 1 : return self_->active_eof_ops_->await_suspend(
725 1 : self_->cached_awaitable_, h, env);
726 : }
727 :
728 : io_result<>
729 16 : await_resume()
730 : {
731 : struct guard {
732 : any_write_sink* self;
733 16 : ~guard() {
734 16 : self->active_eof_ops_->destroy(self->cached_awaitable_);
735 16 : self->active_eof_ops_ = nullptr;
736 16 : }
737 16 : } g{self_};
738 16 : return self_->active_eof_ops_->await_resume(
739 27 : self_->cached_awaitable_);
740 16 : }
741 : };
742 17 : return awaitable{this};
743 : }
744 :
745 : inline auto
746 16 : any_write_sink::write_eof_buffers_(
747 : std::span<const_buffer const> buffers)
748 : {
749 : struct awaitable
750 : {
751 : any_write_sink* self_;
752 : std::span<const_buffer const> buffers_;
753 :
754 : bool
755 16 : await_ready() const noexcept
756 : {
757 16 : return false;
758 : }
759 :
760 : std::coroutine_handle<>
761 16 : await_suspend(std::coroutine_handle<> h, io_env const* env)
762 : {
763 32 : self_->active_write_ops_ =
764 32 : self_->vt_->construct_write_eof_buffers_awaitable(
765 16 : self_->sink_,
766 16 : self_->cached_awaitable_,
767 : buffers_);
768 :
769 16 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
770 16 : return h;
771 :
772 MIS 0 : return self_->active_write_ops_->await_suspend(
773 0 : self_->cached_awaitable_, h, env);
774 : }
775 :
776 : io_result<std::size_t>
777 HIT 16 : await_resume()
778 : {
779 : struct guard {
780 : any_write_sink* self;
781 16 : ~guard() {
782 16 : self->active_write_ops_->destroy(self->cached_awaitable_);
783 16 : self->active_write_ops_ = nullptr;
784 16 : }
785 16 : } g{self_};
786 16 : return self_->active_write_ops_->await_resume(
787 27 : self_->cached_awaitable_);
788 16 : }
789 : };
790 16 : return awaitable{this, buffers};
791 : }
792 :
793 : template<ConstBufferSequence CB>
794 : auto
795 42 : any_write_sink::write_some(CB buffers)
796 : {
797 : struct awaitable
798 : {
799 : any_write_sink* self_;
800 : const_buffer_array<detail::max_iovec_> ba_;
801 :
802 42 : awaitable(
803 : any_write_sink* self,
804 : CB const& buffers)
805 42 : : self_(self)
806 42 : , ba_(buffers)
807 : {
808 42 : }
809 :
810 : bool
811 42 : await_ready() const noexcept
812 : {
813 42 : return ba_.to_span().empty();
814 : }
815 :
816 : std::coroutine_handle<>
817 40 : await_suspend(std::coroutine_handle<> h, io_env const* env)
818 : {
819 40 : self_->active_write_ops_ = self_->vt_->construct_write_some_awaitable(
820 40 : self_->sink_,
821 40 : self_->cached_awaitable_,
822 40 : ba_.to_span());
823 :
824 40 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
825 38 : return h;
826 :
827 2 : return self_->active_write_ops_->await_suspend(
828 2 : self_->cached_awaitable_, h, env);
829 : }
830 :
831 : io_result<std::size_t>
832 40 : await_resume()
833 : {
834 40 : if(ba_.to_span().empty())
835 2 : return {{}, 0};
836 :
837 : struct guard {
838 : any_write_sink* self;
839 38 : ~guard() {
840 38 : self->active_write_ops_->destroy(self->cached_awaitable_);
841 38 : self->active_write_ops_ = nullptr;
842 38 : }
843 38 : } g{self_};
844 38 : return self_->active_write_ops_->await_resume(
845 38 : self_->cached_awaitable_);
846 38 : }
847 : };
848 42 : return awaitable{this, buffers};
849 : }
850 :
851 : template<ConstBufferSequence CB>
852 : io_task<std::size_t>
853 68 : any_write_sink::write(CB buffers)
854 : {
855 : buffer_param<CB> bp(buffers);
856 : std::size_t total = 0;
857 :
858 : for(;;)
859 : {
860 : auto bufs = bp.data();
861 : if(bufs.empty())
862 : break;
863 :
864 : auto [ec, n] = co_await write_(bufs);
865 : total += n;
866 : if(ec)
867 : co_return {ec, total};
868 : bp.consume(n);
869 : }
870 :
871 : co_return {{}, total};
872 136 : }
873 :
874 : template<ConstBufferSequence CB>
875 : io_task<std::size_t>
876 26 : any_write_sink::write_eof(CB buffers)
877 : {
878 : const_buffer_param<CB> bp(buffers);
879 : std::size_t total = 0;
880 :
881 : for(;;)
882 : {
883 : auto bufs = bp.data();
884 : if(bufs.empty())
885 : {
886 : auto [ec] = co_await write_eof();
887 : co_return {ec, total};
888 : }
889 :
890 : if(! bp.more())
891 : {
892 : // Last window — send atomically with EOF
893 : auto [ec, n] = co_await write_eof_buffers_(bufs);
894 : total += n;
895 : co_return {ec, total};
896 : }
897 :
898 : auto [ec, n] = co_await write_(bufs);
899 : total += n;
900 : if(ec)
901 : co_return {ec, total};
902 : bp.consume(n);
903 : }
904 52 : }
905 :
906 : } // namespace capy
907 : } // namespace boost
908 :
909 : #endif
|