30namespace seqan3::contrib
// Scoped enumeration (underlying type uint8_t) describing the outcome of a queue operation.
// The enumerator list is not part of this listing; the code below refers to
// `success`, `closed`, `empty` and `full`.
34enum class queue_op_status : uint8_t
// Scoped enumeration (underlying type uint8_t) used as the `buffer_policy` template argument of
// buffer_queue. The enumerator list is not part of this listing; `fixed` and `dynamic` are the
// values referenced further down.
42enum struct buffer_queue_policy : uint8_t
// Template head of buffer_queue. value_t must model std::semiregular; buffer_policy defaults to
// buffer_queue_policy::dynamic.
// NOTE(review): the `buffer_t` parameter (listing line 85) and the class head itself are missing
// from this listing.
84template <std::semiregular value_t,
86 buffer_queue_policy buffer_policy = buffer_queue_policy::dynamic>
// Member type aliases: value_type and size_type are taken from the underlying buffer container.
90 using buffer_type = buffer_t;
91 using value_type =
typename buffer_type::value_type;
92 using size_type =
typename buffer_type::size_type;
// Both reference aliases are defined as `void`.
93 using reference = void;
94 using const_reference = void;
// Default constructor: delegates to the capacity constructor with an initial capacity of 0.
97 buffer_queue() : buffer_queue{0u}
// Copy/move construction and copy/move assignment are deleted; the destructor is defaulted.
99 buffer_queue(buffer_queue
const &) =
delete;
100 buffer_queue(buffer_queue &&) =
delete;
101 buffer_queue & operator=(buffer_queue
const &) =
delete;
102 buffer_queue & operator=(buffer_queue &&) =
delete;
103 ~buffer_queue() =
default;
// Capacity constructor (explicit): resizes the underlying buffer to init_capacity + 1 elements.
// NOTE(review): the extra element is presumably the spare slot of the ring buffer, and
// ring_buffer_capacity is presumably initialised on a line not shown here - confirm in the full header.
106 explicit buffer_queue(size_type
const init_capacity)
108 buffer.resize(init_capacity + 1);
// Range constructor: accepts an input range whose value type is convertible to value_type and
// delegates to the capacity constructor. How `r` is consumed is not visible in this listing.
112 template <std::ranges::input_range range_type>
113 requires std::convertible_to<std::ranges::range_value_t<range_type>, value_type>
114 buffer_queue(size_type
const init_capacity, range_type && r) : buffer_queue{init_capacity}
// push(value): forwards `value` to try_push() and reacts to the returned status.
//   closed  -> throws the enumerator queue_op_status::closed (the enum value is the exception object)
//   success -> handled in the branch at listing line 133 (its body is not shown; presumably returns)
//   else    -> the status is asserted to be `full` and never `empty`
// Accepts any value2_t convertible to value_t.
// NOTE(review): a detail::spin_delay is created up front; the retry loop that presumably uses it
// (listing lines 127-129 and 138+) is missing from this listing.
122 template <
typename value2_t>
123 requires std::convertible_to<value2_t, value_t>
124 void push(value2_t && value)
126 detail::spin_delay delay{};
130 auto status = try_push(std::forward<value2_t>(value));
131 if (status == queue_op_status::closed)
132 throw queue_op_status::closed;
133 else if (status == queue_op_status::success)
// Reaching this point means the push neither succeeded nor hit a closed queue.
136 assert(status != queue_op_status::empty);
137 assert(status == queue_op_status::full);
// wait_push(value): status-returning counterpart of push(). Forwards `value` to try_push();
// any status other than `full` is handled by the branch at listing line 152 (body not shown;
// presumably the status is returned to the caller). Otherwise the status is asserted to be `full`.
// NOTE(review): the surrounding retry loop and the use of `delay` are not visible in this listing.
142 template <
typename value2_t>
143 requires std::convertible_to<value2_t, value_t>
144 queue_op_status wait_push(value2_t && value)
146 detail::spin_delay delay{};
150 auto status = try_push(std::forward<value2_t>(value));
152 if (status != queue_op_status::full)
155 assert(status != queue_op_status::empty);
156 assert(status == queue_op_status::full);
// value_pop(): returns a popped element by value.
// A pop is only attempted while writer_waiting.load() is false. The result of try_pop(value) is
// then inspected:
//   closed  -> throws the enumerator queue_op_status::closed
//   success -> handled in the branch at listing line 174 (body not shown; presumably returns `value`)
//   else    -> the status is asserted to be `empty` and never `full`
// NOTE(review): the declaration of `value` and the retry loop are on lines missing from this listing.
161 value_type value_pop()
163 detail::spin_delay delay{};
168 if (!writer_waiting.load())
170 auto status = try_pop(value);
172 if (status == queue_op_status::closed)
173 throw queue_op_status::closed;
174 else if (status == queue_op_status::success)
177 assert(status != queue_op_status::full);
178 assert(status == queue_op_status::empty);
// wait_pop(value): status-returning counterpart of value_pop(); the popped element is written to `value`.
// A pop is only attempted while writer_waiting.load() is false. `closed` and `success` are handled
// together in the branch at listing line 195 (body not shown; presumably the status is returned);
// any other status is asserted to be `empty`.
// NOTE(review): the statement that assigns `status` (presumably a try_pop call) is not in this listing.
184 queue_op_status wait_pop(value_type & value)
186 detail::spin_delay delay{};
191 if (!writer_waiting.load())
195 if (status == queue_op_status::closed || status == queue_op_status::success)
198 assert(status != queue_op_status::full);
199 assert(status == queue_op_status::empty);
// Declarations of the two single-attempt primitives; both are defined out of class further down.
// try_push accepts any value2_t convertible to value_t and reports the outcome as a queue_op_status.
210 template <
typename value2_t>
211 requires std::convertible_to<value2_t, value_t>
212 queue_op_status try_push(value2_t &&);
// try_pop writes the popped element into its reference argument and reports the outcome.
214 queue_op_status try_pop(value_t &);
// NOTE(review): the header of the enclosing member function is missing from this listing -
// presumably the function that closes the queue; confirm against the full header.
// writer_waiting is set to true via exchange(); the branch is taken on the value exchange() returns
// (presumably the previous flag value, i.e. another caller already holds the flag).
222 if (writer_waiting.exchange(
true))
// The flag is reset to false at two separate points (listing lines 229 and 233).
// NOTE(review): these look like the normal and the exceptional exit path - verify.
229 writer_waiting.store(
false);
233 writer_waiting.store(
false);
// is_closed(): const noexcept query. Its body is not part of this listing.
238 bool is_closed() const noexcept
// NOTE(review): the return below sits at listing line 246 and most likely belongs to a different
// member whose header is missing (presumably an emptiness check). It reports whether the pop-front
// and push-back positions are equal.
246 return pop_front_position == push_back_position;
// is_full(): const noexcept query that delegates to is_ring_buffer_exhausted() with the current
// pop-front position as `from` and the current push-back position as `to`.
249 bool is_full() const noexcept
252 return is_ring_buffer_exhausted(pop_front_position, push_back_position);
// size(): number of elements between the pop-front and push-back positions, computed on their
// buffer indices (to_buffer_position).
255 size_type
size() const noexcept
// Front index not behind back index: the size is the plain difference back - front.
258 if (to_buffer_position(pop_front_position) <= to_buffer_position(push_back_position))
260 return to_buffer_position(push_back_position) - to_buffer_position(pop_front_position);
// Otherwise the occupied range wraps around: buffer.size() minus the gap (front - back).
// The assert guards the unsigned subtraction against underflow.
264 assert(buffer.size() > (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position)));
265 return buffer.size() - (to_buffer_position(pop_front_position) - to_buffer_position(push_back_position));
// is_ring_buffer_exhausted(from, to): true when `to` has reached or passed `from + ring_buffer_capacity`.
// Precondition (asserted): `to` is at most `from + ring_buffer_capacity + 1`.
280 constexpr bool is_ring_buffer_exhausted(size_type
const from, size_type
const to)
const
282 assert(to <= (from + ring_buffer_capacity + 1));
284 return to >= from + ring_buffer_capacity;
// to_buffer_position(position): maps a position counter to an index by masking it with
// (ring_buffer_capacity - 1).
// NOTE(review): the mask equals `position % ring_buffer_capacity` only if ring_buffer_capacity is
// a power of two - presumably guaranteed where the capacity is set; confirm.
300 constexpr size_type to_buffer_position(size_type
const position)
const
302 return position & (ring_buffer_capacity - 1);
// cyclic_increment(position): advances `position` (taken by value) by one. If the resulting buffer
// index is at or beyond buffer.size(), the position is additionally advanced by
// (ring_buffer_capacity - buffer.size()), skipping indices that have no backing element.
// NOTE(review): the return statement is not part of this listing; presumably the updated position
// is returned.
324 size_type cyclic_increment(size_type position)
331 if (to_buffer_position(++position) >= buffer.size())
332 position += ring_buffer_capacity - buffer.size();
// overflow(): two overloads, selected at compile time by the buffer policy via the requires-clause.
// Fixed policy: the parameter is unnamed; the body is not part of this listing.
336 template <
typename value2_t>
337 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::fixed)
bool
338 overflow(value2_t &&)
// Dynamic policy: declaration only; the definition appears out of class further down.
343 template <
typename value2_t>
344 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
bool
345 overflow(value2_t && value);
// Alias for a buffer_queue using buffer_queue_policy::fixed. The buffer type must model
// sequence_container and defaults to std::vector<value_t>.
360template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
361using fixed_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::fixed>;
// Alias for a buffer_queue using buffer_queue_policy::dynamic. The buffer type must model
// sequence_container and defaults to std::vector<value_t>.
364template <std::semiregular value_t, sequence_container buffer_t = std::vector<value_t>>
365using dynamic_buffer_queue = buffer_queue<value_t, buffer_t, buffer_queue_policy::dynamic>;
// Out-of-class definition of overflow() for the dynamic policy: grows the buffer by one element
// and returns whether `value` was stored as part of the operation.
// NOTE(review): several statements (locking, the definition of `it`, the recomputation of the
// local capacity, the element move) are missing from this listing; comments cover visible lines only.
375template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
376template <
typename value2_t>
377 requires (std::convertible_to<value2_t, value_t>) && (buffer_policy == buffer_queue_policy::dynamic)
378inline bool buffer_queue<value_t, buffer_t, buffer_policy>::overflow(value2_t && value)
// Snapshot the current buffer size, capacity and positions into locals.
// The local `ring_buffer_capacity` shadows the member of the same name.
383 size_type old_size = buffer.size();
384 size_type ring_buffer_capacity = this->ring_buffer_capacity;
385 size_type local_front = this->pop_front_position;
386 size_type local_back = this->push_back_position;
// The committed positions must equal the pending ones, i.e. no push or pop is half-finished.
389 assert(local_back == this->pending_push_back_position);
390 assert(local_front == this->pending_pop_front_position);
392 bool valueWasAppended =
false;
// Proceed only if one more push would still exhaust the ring buffer.
396 if (is_ring_buffer_exhausted(local_front, cyclic_increment(local_back)))
// Store the value through `it` (defined on a line not shown), then set the back position
// exactly one capacity ahead of the front and record that the value was stored.
404 *it = std::forward<value2_t>(value);
405 local_back = local_front + ring_buffer_capacity;
406 valueWasAppended =
true;
409 assert(is_ring_buffer_exhausted(local_front, local_back));
// Buffer indices of front and back under the current (pre-resize) capacity.
412 size_type front_buffer_position = to_buffer_position(local_front);
413 size_type back_buffer_position = to_buffer_position(local_back);
// Grow the underlying buffer by exactly one element.
416 buffer.resize(old_size + 1);
// Trailing argument of a call whose beginning is not shown; the destination end is the end of the
// resized buffer. NOTE(review): presumably a backward move of the existing elements - confirm.
419 buffer.data() + buffer.size());
// Publish the new positions to both the pending and the committed counters.
424 this->pending_pop_front_position = this->pop_front_position = front_buffer_position + 1;
425 this->pending_push_back_position = this->push_back_position = back_buffer_position + ring_buffer_capacity;
// Write the local capacity back to the member.
427 this->ring_buffer_capacity = ring_buffer_capacity;
429 return valueWasAppended;
// Out-of-class definition of try_pop(): a single pop attempt that moves the front element into
// `result` and returns success, or returns closed/empty when there is nothing to pop.
// NOTE(review): the enclosing retry loop and any locking are missing from this listing.
452template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
453inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_pop(value_t & result)
458 size_type local_pending_pop_front_position{};
459 size_type next_local_pop_front_position{};
460 detail::spin_delay spinDelay{};
// Read the pending pop position, then the committed push position.
462 local_pending_pop_front_position = this->pending_pop_front_position;
466 size_type local_push_back_position = this->push_back_position;
468 assert(local_pending_pop_front_position <= local_push_back_position);
// Equal positions mean there is nothing to pop: report closed if the queue is closed, else empty.
471 if (local_pending_pop_front_position == local_push_back_position)
473 return is_closed() ? queue_op_status::closed : queue_op_status::empty;
477 next_local_pop_front_position = cyclic_increment(local_pending_pop_front_position);
// Claim the slot by advancing the pending pop position from the observed value to the next one.
480 if (this->pending_pop_front_position.compare_exchange_weak(local_pending_pop_front_position,
481 next_local_pop_front_position))
// Slot claimed: move the element out of the buffer into `result`.
488 result = std::ranges::iter_move(buffer.begin() + to_buffer_position(local_pending_pop_front_position));
// Commit: retry until the committed pop position can be advanced from our claimed slot to the next.
// `acquired_slot` is reset after each failed attempt - presumably because a failed
// compare_exchange overwrites the expected value (std::atomic semantics; member type not shown).
492 detail::spin_delay delay{};
493 size_type acquired_slot = local_pending_pop_front_position;
494 while (!this->pop_front_position.compare_exchange_weak(acquired_slot, next_local_pop_front_position))
496 acquired_slot = local_pending_pop_front_position;
501 return queue_op_status::success;
// Out-of-class definition of try_push(): a single push attempt that stores `value` in the next free
// slot and returns success, or returns closed/full when that is not possible.
// NOTE(review): the enclosing retry loop, any locking and several conditions are missing from this listing.
527template <std::semiregular value_t, sequence_container buffer_t, buffer_queue_policy buffer_policy>
528template <
typename value2_t>
529 requires std::convertible_to<value2_t, value_t>
530inline queue_op_status buffer_queue<value_t, buffer_t, buffer_policy>::try_push(value2_t && value)
534 detail::spin_delay delay{};
// Early exit with `closed`; the guarding condition is not shown (presumably a closed-queue check).
539 return queue_op_status::closed;
// Read the pending push position, compute its successor, then read the committed pop position.
542 size_type local_pending_push_back_position = this->pending_push_back_position;
547 size_type next_local_push_back_position = cyclic_increment(local_pending_push_back_position);
548 size_type local_pop_front_position = this->pop_front_position;
// Capacity check: would the successor position exhaust the ring buffer?
// (The branch body is not part of this listing.)
552 if (is_ring_buffer_exhausted(local_pop_front_position, next_local_push_back_position))
// Claim the slot by advancing the pending push position from the observed value to the next one.
557 if (this->pending_push_back_position.compare_exchange_weak(local_pending_push_back_position,
558 next_local_push_back_position))
// Slot claimed: write the forwarded value into the buffer element of the claimed position.
562 auto it =
std::ranges::begin(buffer) + to_buffer_position(local_pending_push_back_position);
563 *it = std::forward<value2_t>(value);
// Commit: retry until the committed push position can be advanced from our claimed slot to the next.
// This `delay` is a second declaration of that name (presumably in a nested scope).
568 detail::spin_delay delay{};
570 size_type acquired_slot = local_pending_push_back_position;
572 !this->push_back_position.compare_exchange_weak(acquired_slot, next_local_push_back_position))
// Reset the expected value before the next attempt.
574 acquired_slot = local_pending_push_back_position;
578 return queue_op_status::success;
// Overflow path: if overflow() reports that it stored the value, the push succeeded;
// otherwise the queue is reported as full.
586 if (overflow(std::forward<value2_t>(value)))
588 return queue_op_status::success;
592 return queue_op_status::full;
Provides various transformation traits used by the range module.
T current_exception(T... args)
constexpr std::size_t hardware_destructive_interference_size
Minimum offset between two objects to avoid false sharing.
Definition new:54
seqan::stl::ranges::to to
Converts a range to a container. Note: This entity is not part of the SeqAn API....
Definition to.hpp:23
constexpr size_t size
The size of a type pack.
Definition type_pack/traits.hpp:143
A more refined container concept than seqan3::container.
T move_backward(T... args)
The <new> header from C++17's standard library.
T rethrow_exception(T... args)
Provides seqan3::detail::spin_delay.
Adaptations of concepts from the standard library.