From ca5cb5c6c48dac0b22666cb99cc90b8b98f279b3 Mon Sep 17 00:00:00 2001 From: Tim Janik Date: Mon, 12 Apr 2021 01:49:28 +0200 Subject: [PATCH 1/2] ASE: queuemux.cc: start QueueMultiplexer implementation Signed-off-by: Tim Janik --- ase/queuemux.cc | 142 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 ase/queuemux.cc diff --git a/ase/queuemux.cc b/ase/queuemux.cc new file mode 100644 index 00000000..0fe0bdc9 --- /dev/null +++ b/ase/queuemux.cc @@ -0,0 +1,142 @@ +// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0 + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/// Multiplexer to pop from multiple Queues, while preserving priorities. +/// Order for values at the same priority is unstable. +/// Relies on unqualified calls to `Priority QueueMultiplexer_priority (const ValueType&)`. +template +struct QueueMultiplexer { + using Priority = decltype (QueueMultiplexer_priority (std::declval())); + struct Ptr { typename Queue::const_iterator it, end; }; + ssize_t n_queues = 0, current = -1; + Priority first = {}, next = {}; + std::array ptrs; + QueueMultiplexer (size_t nqueues, Queue **queues) + { + assign (nqueues, queues); + } + bool + assign (size_t nqueues, Queue **queues) + { + assert (nqueues <= MAXQUEUES); + n_queues = 0; + for (size_t i = 0; i < nqueues; i++) + if (queues[i]) [[likely]] + { + ptrs[n_queues].it = std::begin (*queues[i]); + ptrs[n_queues].end = std::end (*queues[i]); + if (ptrs[n_queues].it != ptrs[n_queues].end) + n_queues++; + } + seek(); + return more(); + } + bool + more() const + { + return n_queues > 0; + } + const ValueType& + pop () + { + assert (more()); + const ValueType &result = *ptrs[current].it++; + if (ptrs[current].it == ptrs[current].end) [[unlikely]] + { // remove emptied queue + if (current < n_queues - 1) [[unlikely]] + ptrs[current] = ptrs[n_queues - 1]; // shuffles queue order, preserves prios + n_queues--; + seek(); + } + else if (QueueMultiplexer_priority (*ptrs[current].it) + > next) [[unlikely]] // next is in other queue + seek(); + return result; + } +private: + void + seek() + { + if (n_queues == 0) [[likely]] + return; + current = 0; // picks first queue if all contain max Priority + next = std::numeric_limits::max(); + first = next; // Priority to start with + for (ssize_t i = 0; i < n_queues; i++) + { + const Priority prio = QueueMultiplexer_priority (*ptrs[i].it); + if (prio < first) // prio comes before first + { + next = first; + first = prio; + current = i; // pick first matching Queue + } + else if (prio < next) // found next prio + next = prio; + } + dprintf (2, "%s: n_queues=%zd current=%zd first=%ld next=%ld\n", __func__, n_queues, current, long (first), long (next)); + } +}; + +struct SomeValue { + int i; +}; +static __attribute__ ((always_inline)) inline long +QueueMultiplexer_priority (const SomeValue &o) +{ + return o.i; +} + +int +main (int argc, char *argv[]) +{ + // generate ascending (sorted) sample values + struct timeval tv; + gettimeofday (&tv, NULL); + srand (tv.tv_sec ^ tv.tv_usec); + int ascending_counter = -17; + auto ascending_values = [&ascending_counter] () { + if (rand() >= RAND_MAX / 2) + ascending_counter++; + return ascending_counter; + }; + std::vector samples; + for (size_t i = 0; i < 39; i++) + samples.push_back (ascending_values()); + + // setting: N queues contain ascending (sorted) values + constexpr size_t N = 4; + using Queue = std::vector; + std::vector queues; + queues.resize (N); + for (int v : samples) + queues[rand() % N].push_back (SomeValue { v }); + + // task: fetch values from all queues in sorted order + std::vector queue_ptrs; + for (Queue &queue : queues) + queue_ptrs.push_back (&queue); + QueueMultiplexer mux (queue_ptrs.size(), &queue_ptrs[0]); + int last = -2147483648; + size_t sc = 0; + while (mux.more()) + { + const SomeValue ¤t = mux.pop(); + printf ("%d\n", current.i); + assert (current.i >= last); + last = current.i; + assert (sc < samples.size() && samples[sc++] == current.i); + } + assert (sc == samples.size()); + return 0; +} +// clang++ -std=gnu++20 -Wall -O3 -fverbose-asm queuemux.cc && ./a.out From 937109570254c71fc8b171c88feb8249012b07a9 Mon Sep 17 00:00:00 2001 From: Tim Janik Date: Mon, 13 Nov 2023 11:37:56 +0100 Subject: [PATCH 2/2] ASE: queuemux: integrate QueueMultiplexer Signed-off-by: Tim Janik --- ase/queuemux.cc | 117 ++++++++---------------------------------------- ase/queuemux.hh | 88 ++++++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+), 98 deletions(-) create mode 100644 ase/queuemux.hh diff --git a/ase/queuemux.cc b/ase/queuemux.cc index 0fe0bdc9..f02f29b7 100644 --- a/ase/queuemux.cc +++ b/ase/queuemux.cc @@ -1,111 +1,32 @@ // This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0 +#include "queuemux.hh" +#include "randomhash.hh" +#include "internal.hh" +#include "testing.hh" -#include -#include -#include -#include -#include -#include -#include -#include -#include +#define QDEBUG(...) Ase::debug ("queuemux", __VA_ARGS__) -/// Multiplexer to pop from multiple Queues, while preserving priorities. -/// Order for values at the same priority is unstable. -/// Relies on unqualified calls to `Priority QueueMultiplexer_priority (const ValueType&)`. -template -struct QueueMultiplexer { - using Priority = decltype (QueueMultiplexer_priority (std::declval())); - struct Ptr { typename Queue::const_iterator it, end; }; - ssize_t n_queues = 0, current = -1; - Priority first = {}, next = {}; - std::array ptrs; - QueueMultiplexer (size_t nqueues, Queue **queues) - { - assign (nqueues, queues); - } - bool - assign (size_t nqueues, Queue **queues) - { - assert (nqueues <= MAXQUEUES); - n_queues = 0; - for (size_t i = 0; i < nqueues; i++) - if (queues[i]) [[likely]] - { - ptrs[n_queues].it = std::begin (*queues[i]); - ptrs[n_queues].end = std::end (*queues[i]); - if (ptrs[n_queues].it != ptrs[n_queues].end) - n_queues++; - } - seek(); - return more(); - } - bool - more() const - { - return n_queues > 0; - } - const ValueType& - pop () - { - assert (more()); - const ValueType &result = *ptrs[current].it++; - if (ptrs[current].it == ptrs[current].end) [[unlikely]] - { // remove emptied queue - if (current < n_queues - 1) [[unlikely]] - ptrs[current] = ptrs[n_queues - 1]; // shuffles queue order, preserves prios - n_queues--; - seek(); - } - else if (QueueMultiplexer_priority (*ptrs[current].it) - > next) [[unlikely]] // next is in other queue - seek(); - return result; - } -private: - void - seek() - { - if (n_queues == 0) [[likely]] - return; - current = 0; // picks first queue if all contain max Priority - next = std::numeric_limits::max(); - first = next; // Priority to start with - for (ssize_t i = 0; i < n_queues; i++) - { - const Priority prio = QueueMultiplexer_priority (*ptrs[i].it); - if (prio < first) // prio comes before first - { - next = first; - first = prio; - current = i; // pick first matching Queue - } - else if (prio < next) // found next prio - next = prio; - } - dprintf (2, "%s: n_queues=%zd current=%zd first=%ld next=%ld\n", __func__, n_queues, current, long (first), long (next)); - } -}; +namespace { +using namespace Ase; struct SomeValue { int i; }; + static __attribute__ ((always_inline)) inline long QueueMultiplexer_priority (const SomeValue &o) { return o.i; } -int -main (int argc, char *argv[]) +TEST_INTEGRITY (queuemux_test); +static void +queuemux_test() { // generate ascending (sorted) sample values - struct timeval tv; - gettimeofday (&tv, NULL); - srand (tv.tv_sec ^ tv.tv_usec); int ascending_counter = -17; auto ascending_values = [&ascending_counter] () { - if (rand() >= RAND_MAX / 2) + if (random_int64() & 1) ascending_counter++; return ascending_counter; }; @@ -119,7 +40,7 @@ main (int argc, char *argv[]) std::vector queues; queues.resize (N); for (int v : samples) - queues[rand() % N].push_back (SomeValue { v }); + queues[random_int64() % N].push_back (SomeValue { v }); // task: fetch values from all queues in sorted order std::vector queue_ptrs; @@ -131,12 +52,12 @@ main (int argc, char *argv[]) while (mux.more()) { const SomeValue ¤t = mux.pop(); - printf ("%d\n", current.i); - assert (current.i >= last); + QDEBUG ("QueueMultiplexer: %d\n", current.i); + TASSERT (current.i >= last); last = current.i; - assert (sc < samples.size() && samples[sc++] == current.i); + TASSERT (sc < samples.size() && samples[sc++] == current.i); } - assert (sc == samples.size()); - return 0; + TASSERT (sc == samples.size()); } -// clang++ -std=gnu++20 -Wall -O3 -fverbose-asm queuemux.cc && ./a.out + +} // Anon diff --git a/ase/queuemux.hh b/ase/queuemux.hh new file mode 100644 index 00000000..8918664d --- /dev/null +++ b/ase/queuemux.hh @@ -0,0 +1,88 @@ +// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0 +#ifndef __ASE_QUEUEMUX_HH__ +#define __ASE_QUEUEMUX_HH__ + +#include + +namespace Ase { + +/// Multiplexer to pop from multiple Queues, while preserving priorities. +/// Order for values at the same priority is unstable. +/// Relies on unqualified calls to `Priority QueueMultiplexer_priority (const ValueType&)`. +template +struct QueueMultiplexer { + using Priority = decltype (QueueMultiplexer_priority (std::declval())); + struct Ptr { typename Queue::const_iterator it, end; }; + ssize_t n_queues = 0, current = -1; + Priority first = {}, next = {}; + std::array ptrs; + QueueMultiplexer (size_t nqueues, Queue **queues) + { + assign (nqueues, queues); + } + bool + assign (size_t nqueues, Queue **queues) + { + n_queues = 0; + ASE_ASSERT_RETURN (nqueues <= MAXQUEUES, false); + for (size_t i = 0; i < nqueues; i++) + if (queues[i]) [[likely]] + { + ptrs[n_queues].it = std::begin (*queues[i]); + ptrs[n_queues].end = std::end (*queues[i]); + if (ptrs[n_queues].it != ptrs[n_queues].end) + n_queues++; + } + seek(); + return more(); + } + bool + more() const + { + return n_queues > 0; + } + const ValueType& + pop () + { + ASE_ASSERT_RETURN (more(), []() -> const ValueType& { static const ValueType empty {}; return empty; }()); + const ValueType &result = *ptrs[current].it++; + if (ptrs[current].it == ptrs[current].end) [[unlikely]] + { // remove emptied queue + if (current < n_queues - 1) [[unlikely]] + ptrs[current] = ptrs[n_queues - 1]; // shuffles queue order, preserves prios + n_queues--; + seek(); + } + else if (QueueMultiplexer_priority (*ptrs[current].it) + > next) [[unlikely]] // next is in other queue + seek(); + return result; + } +private: + void + seek() + { + if (n_queues == 0) [[likely]] + return; + current = 0; // picks first queue if all contain max Priority + next = std::numeric_limits::max(); + first = next; // Priority to start with + for (ssize_t i = 0; i < n_queues; i++) + { + const Priority prio = QueueMultiplexer_priority (*ptrs[i].it); + if (prio < first) // prio comes before first + { + next = first; + first = prio; + current = i; // pick first matching Queue + } + else if (prio < next) // found next prio + next = prio; + } + // dprintf (2, "%s: n_queues=%zd current=%zd first=%ld next=%ld\n", __func__, n_queues, current, long (first), long (next)); + } +}; + +} // Ase + +#endif /* __ASE_QUEUEMUX_HH__ */