TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_array.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/io_env.hpp>
21 : #include <boost/capy/io_result.hpp>
22 :
23 : #include <concepts>
24 : #include <coroutine>
25 : #include <cstddef>
26 : #include <exception>
27 : #include <new>
28 : #include <span>
29 : #include <stop_token>
30 : #include <system_error>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any WriteStream.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref WriteStream concept, enabling runtime polymorphism for
40 : write operations. It uses cached awaitable storage to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the stream.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to stream must outlive this wrapper.
48 :
49 : @par Awaitable Preallocation
50 : The constructor preallocates storage for the type-erased awaitable.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Immediate Completion
56 : Operations complete immediately without suspending when the
57 : buffer sequence is empty, or when the underlying stream's
58 : awaitable reports readiness via `await_ready`.
59 :
60 : @par Thread Safety
61 : Not thread-safe. Concurrent operations on the same wrapper
62 : are undefined behavior.
63 :
64 : @par Example
65 : @code
66 : // Owning - takes ownership of the stream
67 : any_write_stream stream(socket{ioc});
68 :
69 : // Reference - wraps without ownership
70 : socket sock(ioc);
71 : any_write_stream stream(&sock);
72 :
73 : const_buffer buf(data, size);
74 : auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
75 : @endcode
76 :
77 : @see any_read_stream, any_stream, WriteStream
78 : */
79 : class any_write_stream
80 : {
81 : struct vtable;
82 :
83 : template<WriteStream S>
84 : struct vtable_for_impl;
85 :
86 : // ordered for cache line coherence
87 : void* stream_ = nullptr;
88 : vtable const* vt_ = nullptr;
89 : void* cached_awaitable_ = nullptr;
90 : void* storage_ = nullptr;
91 : bool awaitable_active_ = false;
92 :
93 : public:
94 : /** Destructor.
95 :
96 : Destroys the owned stream (if any) and releases the cached
97 : awaitable storage.
98 : */
99 : ~any_write_stream();
100 :
101 : /** Construct a default instance.
102 :
103 : Constructs an empty wrapper. Operations on a default-constructed
104 : wrapper result in undefined behavior.
105 : */
106 HIT 1 : any_write_stream() = default;
107 :
108 : /** Non-copyable.
109 :
110 : The awaitable cache is per-instance and cannot be shared.
111 : */
112 : any_write_stream(any_write_stream const&) = delete;
113 : any_write_stream& operator=(any_write_stream const&) = delete;
114 :
115 : /** Construct by moving.
116 :
117 : Transfers ownership of the wrapped stream (if owned) and
118 : cached awaitable storage from `other`. After the move, `other` is
119 : in a default-constructed state.
120 :
121 : @param other The wrapper to move from.
122 : */
123 2 : any_write_stream(any_write_stream&& other) noexcept
124 2 : : stream_(std::exchange(other.stream_, nullptr))
125 2 : , vt_(std::exchange(other.vt_, nullptr))
126 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 2 : , storage_(std::exchange(other.storage_, nullptr))
128 2 : , awaitable_active_(std::exchange(other.awaitable_active_, false))
129 : {
130 2 : }
131 :
132 : /** Assign by moving.
133 :
134 : Destroys any owned stream and releases existing resources,
135 : then transfers ownership from `other`.
136 :
137 : @param other The wrapper to move from.
138 : @return Reference to this wrapper.
139 : */
140 : any_write_stream&
141 : operator=(any_write_stream&& other) noexcept;
142 :
143 : /** Construct by taking ownership of a WriteStream.
144 :
145 : Allocates storage and moves the stream into this wrapper.
146 : The wrapper owns the stream and will destroy it.
147 :
148 : @param s The stream to take ownership of.
149 : */
150 : template<WriteStream S>
151 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
152 : any_write_stream(S s);
153 :
154 : /** Construct by wrapping a WriteStream without ownership.
155 :
156 : Wraps the given stream by pointer. The stream must remain
157 : valid for the lifetime of this wrapper.
158 :
159 : @param s Pointer to the stream to wrap.
160 : */
161 : template<WriteStream S>
162 : any_write_stream(S* s);
163 :
164 : /** Check if the wrapper contains a valid stream.
165 :
166 : @return `true` if wrapping a stream, `false` if default-constructed
167 : or moved-from.
168 : */
169 : bool
170 21 : has_value() const noexcept
171 : {
172 21 : return stream_ != nullptr;
173 : }
174 :
175 : /** Check if the wrapper contains a valid stream.
176 :
177 : @return `true` if wrapping a stream, `false` if default-constructed
178 : or moved-from.
179 : */
180 : explicit
181 3 : operator bool() const noexcept
182 : {
183 3 : return has_value();
184 : }
185 :
186 : /** Initiate an asynchronous write operation.
187 :
188 : Writes data from the provided buffer sequence. The operation
189 : completes when at least one byte has been written, or an error
190 : occurs.
191 :
192 : @param buffers The buffer sequence containing data to write.
193 : Passed by value to ensure the sequence lives in the
194 : coroutine frame across suspension points.
195 :
196 : @return An awaitable yielding `(error_code,std::size_t)`.
197 :
198 : @par Immediate Completion
199 : The operation completes immediately without suspending
200 : the calling coroutine when:
201 : @li The buffer sequence is empty, returning `{error_code{}, 0}`.
202 : @li The underlying stream's awaitable reports immediate
203 : readiness via `await_ready`.
204 :
205 : @note This is a partial operation and may not process the
206 : entire buffer sequence. Use the composed @ref write algorithm
207 : for guaranteed complete transfer.
208 :
209 : @par Preconditions
210 : The wrapper must contain a valid stream (`has_value() == true`).
211 : */
212 : template<ConstBufferSequence CB>
213 : auto
214 : write_some(CB buffers);
215 :
216 : protected:
217 : /** Rebind to a new stream after move.
218 :
219 : Updates the internal pointer to reference a new stream object.
220 : Used by owning wrappers after move assignment when the owned
221 : object has moved to a new location.
222 :
223 : @param new_stream The new stream to bind to. Must be the same
224 : type as the original stream.
225 :
226 : @note Terminates if called with a stream of different type
227 : than the original.
228 : */
229 : template<WriteStream S>
230 : void
231 : rebind(S& new_stream) noexcept
232 : {
233 : if(vt_ != &vtable_for_impl<S>::value)
234 : std::terminate();
235 : stream_ = &new_stream;
236 : }
237 : };
238 :
239 : struct any_write_stream::vtable
240 : {
241 : // ordered by call frequency for cache line coherence
242 : void (*construct_awaitable)(
243 : void* stream,
244 : void* storage,
245 : std::span<const_buffer const> buffers);
246 : bool (*await_ready)(void*);
247 : std::coroutine_handle<> (*await_suspend)(void*, std::coroutine_handle<>, io_env const*);
248 : io_result<std::size_t> (*await_resume)(void*);
249 : void (*destroy_awaitable)(void*) noexcept;
250 : std::size_t awaitable_size;
251 : std::size_t awaitable_align;
252 : void (*destroy)(void*) noexcept;
253 : };
254 :
255 : template<WriteStream S>
256 : struct any_write_stream::vtable_for_impl
257 : {
258 : using Awaitable = decltype(std::declval<S&>().write_some(
259 : std::span<const_buffer const>{}));
260 :
261 : static void
262 1 : do_destroy_impl(void* stream) noexcept
263 : {
264 1 : static_cast<S*>(stream)->~S();
265 1 : }
266 :
267 : static void
268 75 : construct_awaitable_impl(
269 : void* stream,
270 : void* storage,
271 : std::span<const_buffer const> buffers)
272 : {
273 75 : auto& s = *static_cast<S*>(stream);
274 75 : ::new(storage) Awaitable(s.write_some(buffers));
275 75 : }
276 :
277 : static constexpr vtable value = {
278 : &construct_awaitable_impl,
279 75 : +[](void* p) {
280 75 : return static_cast<Awaitable*>(p)->await_ready();
281 : },
282 2 : +[](void* p, std::coroutine_handle<> h, io_env const* env) {
283 2 : return detail::call_await_suspend(
284 2 : static_cast<Awaitable*>(p), h, env);
285 : },
286 73 : +[](void* p) {
287 73 : return static_cast<Awaitable*>(p)->await_resume();
288 : },
289 77 : +[](void* p) noexcept {
290 12 : static_cast<Awaitable*>(p)->~Awaitable();
291 : },
292 : sizeof(Awaitable),
293 : alignof(Awaitable),
294 : &do_destroy_impl
295 : };
296 : };
297 :
298 : inline
299 95 : any_write_stream::~any_write_stream()
300 : {
301 95 : if(storage_)
302 : {
303 1 : vt_->destroy(stream_);
304 1 : ::operator delete(storage_);
305 : }
306 95 : if(cached_awaitable_)
307 : {
308 85 : if(awaitable_active_)
309 1 : vt_->destroy_awaitable(cached_awaitable_);
310 85 : ::operator delete(cached_awaitable_);
311 : }
312 95 : }
313 :
314 : inline any_write_stream&
315 5 : any_write_stream::operator=(any_write_stream&& other) noexcept
316 : {
317 5 : if(this != &other)
318 : {
319 5 : if(storage_)
320 : {
321 MIS 0 : vt_->destroy(stream_);
322 0 : ::operator delete(storage_);
323 : }
324 HIT 5 : if(cached_awaitable_)
325 : {
326 2 : if(awaitable_active_)
327 1 : vt_->destroy_awaitable(cached_awaitable_);
328 2 : ::operator delete(cached_awaitable_);
329 : }
330 5 : stream_ = std::exchange(other.stream_, nullptr);
331 5 : vt_ = std::exchange(other.vt_, nullptr);
332 5 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
333 5 : storage_ = std::exchange(other.storage_, nullptr);
334 5 : awaitable_active_ = std::exchange(other.awaitable_active_, false);
335 : }
336 5 : return *this;
337 : }
338 :
339 : template<WriteStream S>
340 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
341 1 : any_write_stream::any_write_stream(S s)
342 1 : : vt_(&vtable_for_impl<S>::value)
343 : {
344 : struct guard {
345 : any_write_stream* self;
346 : bool committed = false;
347 1 : ~guard() {
348 1 : if(!committed && self->storage_) {
349 MIS 0 : self->vt_->destroy(self->stream_);
350 0 : ::operator delete(self->storage_);
351 0 : self->storage_ = nullptr;
352 0 : self->stream_ = nullptr;
353 : }
354 HIT 1 : }
355 1 : } g{this};
356 :
357 1 : storage_ = ::operator new(sizeof(S));
358 1 : stream_ = ::new(storage_) S(std::move(s));
359 :
360 : // Preallocate the awaitable storage
361 1 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
362 :
363 1 : g.committed = true;
364 1 : }
365 :
366 : template<WriteStream S>
367 86 : any_write_stream::any_write_stream(S* s)
368 86 : : stream_(s)
369 86 : , vt_(&vtable_for_impl<S>::value)
370 : {
371 : // Preallocate the awaitable storage
372 86 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
373 86 : }
374 :
375 : template<ConstBufferSequence CB>
376 : auto
377 79 : any_write_stream::write_some(CB buffers)
378 : {
379 : struct awaitable
380 : {
381 : any_write_stream* self_;
382 : const_buffer_array<detail::max_iovec_> ba_;
383 :
384 79 : awaitable(
385 : any_write_stream* self,
386 : CB const& buffers) noexcept
387 79 : : self_(self)
388 79 : , ba_(buffers)
389 : {
390 79 : }
391 :
392 : bool
393 79 : await_ready() const noexcept
394 : {
395 79 : return ba_.to_span().empty();
396 : }
397 :
398 : std::coroutine_handle<>
399 75 : await_suspend(std::coroutine_handle<> h, io_env const* env)
400 : {
401 75 : self_->vt_->construct_awaitable(
402 75 : self_->stream_,
403 75 : self_->cached_awaitable_,
404 75 : ba_.to_span());
405 75 : self_->awaitable_active_ = true;
406 :
407 75 : if(self_->vt_->await_ready(self_->cached_awaitable_))
408 73 : return h;
409 :
410 2 : return self_->vt_->await_suspend(
411 2 : self_->cached_awaitable_, h, env);
412 : }
413 :
414 : io_result<std::size_t>
415 77 : await_resume()
416 : {
417 77 : if(!self_->awaitable_active_)
418 4 : return {{}, 0};
419 : struct guard {
420 : any_write_stream* self;
421 73 : ~guard() {
422 73 : self->vt_->destroy_awaitable(self->cached_awaitable_);
423 73 : self->awaitable_active_ = false;
424 73 : }
425 73 : } g{self_};
426 73 : return self_->vt_->await_resume(
427 73 : self_->cached_awaitable_);
428 73 : }
429 : };
430 79 : return awaitable{this, buffers};
431 : }
432 :
433 : } // namespace capy
434 : } // namespace boost
435 :
436 : #endif
|