diff --git a/ase/queuemux.cc b/ase/queuemux.cc new file mode 100644 index 00000000..f02f29b7 --- /dev/null +++ b/ase/queuemux.cc @@ -0,0 +1,63 @@ +// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0 +#include "queuemux.hh" +#include "randomhash.hh" +#include "internal.hh" +#include "testing.hh" + +#define QDEBUG(...) Ase::debug ("queuemux", __VA_ARGS__) + +namespace { +using namespace Ase; + +struct SomeValue { + int i; +}; + +static __attribute__ ((always_inline)) inline long +QueueMultiplexer_priority (const SomeValue &o) +{ + return o.i; +} + +TEST_INTEGRITY (queuemux_test); +static void +queuemux_test() +{ + // generate ascending (sorted) sample values + int ascending_counter = -17; + auto ascending_values = [&ascending_counter] () { + if (random_int64() & 1) + ascending_counter++; + return ascending_counter; + }; + std::vector samples; + for (size_t i = 0; i < 39; i++) + samples.push_back (ascending_values()); + + // setting: N queues contain ascending (sorted) values + constexpr size_t N = 4; + using Queue = std::vector; + std::vector queues; + queues.resize (N); + for (int v : samples) + queues[random_int64() % N].push_back (SomeValue { v }); + + // task: fetch values from all queues in sorted order + std::vector queue_ptrs; + for (Queue &queue : queues) + queue_ptrs.push_back (&queue); + QueueMultiplexer mux (queue_ptrs.size(), &queue_ptrs[0]); + int last = -2147483648; + size_t sc = 0; + while (mux.more()) + { + const SomeValue ¤t = mux.pop(); + QDEBUG ("QueueMultiplexer: %d\n", current.i); + TASSERT (current.i >= last); + last = current.i; + TASSERT (sc < samples.size() && samples[sc++] == current.i); + } + TASSERT (sc == samples.size()); +} + +} // Anon diff --git a/ase/queuemux.hh b/ase/queuemux.hh new file mode 100644 index 00000000..8918664d --- /dev/null +++ b/ase/queuemux.hh @@ -0,0 +1,88 @@ +// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0 +#ifndef __ASE_QUEUEMUX_HH__ +#define __ASE_QUEUEMUX_HH__ + +#include + +namespace Ase { + +/// Multiplexer to pop from multiple Queues, while preserving priorities. +/// Order for values at the same priority is unstable. +/// Relies on unqualified calls to `Priority QueueMultiplexer_priority (const ValueType&)`. +template +struct QueueMultiplexer { + using Priority = decltype (QueueMultiplexer_priority (std::declval())); + struct Ptr { typename Queue::const_iterator it, end; }; + ssize_t n_queues = 0, current = -1; + Priority first = {}, next = {}; + std::array ptrs; + QueueMultiplexer (size_t nqueues, Queue **queues) + { + assign (nqueues, queues); + } + bool + assign (size_t nqueues, Queue **queues) + { + n_queues = 0; + ASE_ASSERT_RETURN (nqueues <= MAXQUEUES, false); + for (size_t i = 0; i < nqueues; i++) + if (queues[i]) [[likely]] + { + ptrs[n_queues].it = std::begin (*queues[i]); + ptrs[n_queues].end = std::end (*queues[i]); + if (ptrs[n_queues].it != ptrs[n_queues].end) + n_queues++; + } + seek(); + return more(); + } + bool + more() const + { + return n_queues > 0; + } + const ValueType& + pop () + { + ASE_ASSERT_RETURN (more(), []() -> const ValueType& { static const ValueType empty {}; return empty; }()); + const ValueType &result = *ptrs[current].it++; + if (ptrs[current].it == ptrs[current].end) [[unlikely]] + { // remove emptied queue + if (current < n_queues - 1) [[unlikely]] + ptrs[current] = ptrs[n_queues - 1]; // shuffles queue order, preserves prios + n_queues--; + seek(); + } + else if (QueueMultiplexer_priority (*ptrs[current].it) + > next) [[unlikely]] // next is in other queue + seek(); + return result; + } +private: + void + seek() + { + if (n_queues == 0) [[likely]] + return; + current = 0; // picks first queue if all contain max Priority + next = std::numeric_limits::max(); + first = next; // Priority to start with + for (ssize_t i = 0; i < n_queues; i++) + { + const Priority prio = QueueMultiplexer_priority (*ptrs[i].it); + if (prio < first) // prio comes before first + { + next = first; + first = prio; + current = i; // pick first matching Queue + } + else if (prio < next) // found next prio + next = prio; + } + // dprintf (2, "%s: n_queues=%zd current=%zd first=%ld next=%ld\n", __func__, n_queues, current, long (first), long (next)); + } +}; + +} // Ase + +#endif /* __ASE_QUEUEMUX_HH__ */