Skip to content

Commit

Permalink
Merge branch 'queuemux'
Browse files Browse the repository at this point in the history
* queuemux:
  ASE: queuemux: integrate QueueMultiplexer
  ASE: queuemux.cc: start QueueMultiplexer implementation

Signed-off-by: Tim Janik <[email protected]>
  • Loading branch information
tim-janik committed Nov 13, 2023
2 parents 409e87d + 9371095 commit e1031e5
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 0 deletions.
63 changes: 63 additions & 0 deletions ase/queuemux.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0
#include "queuemux.hh"
#include "randomhash.hh"
#include "internal.hh"
#include "testing.hh"

#define QDEBUG(...) Ase::debug ("queuemux", __VA_ARGS__)

namespace {
using namespace Ase;

struct SomeValue {
int i;
};

static __attribute__ ((always_inline)) inline long
QueueMultiplexer_priority (const SomeValue &o)
{
return o.i;
}

TEST_INTEGRITY (queuemux_test);
static void
queuemux_test()
{
// generate ascending (sorted) sample values
int ascending_counter = -17;
auto ascending_values = [&ascending_counter] () {
if (random_int64() & 1)
ascending_counter++;
return ascending_counter;
};
std::vector<int> samples;
for (size_t i = 0; i < 39; i++)
samples.push_back (ascending_values());

// setting: N queues contain ascending (sorted) values
constexpr size_t N = 4;
using Queue = std::vector<SomeValue>;
std::vector<Queue> queues;
queues.resize (N);
for (int v : samples)
queues[random_int64() % N].push_back (SomeValue { v });

// task: fetch values from all queues in sorted order
std::vector<Queue*> queue_ptrs;
for (Queue &queue : queues)
queue_ptrs.push_back (&queue);
QueueMultiplexer<N, Queue> mux (queue_ptrs.size(), &queue_ptrs[0]);
int last = -2147483648;
size_t sc = 0;
while (mux.more())
{
const SomeValue &current = mux.pop();
QDEBUG ("QueueMultiplexer: %d\n", current.i);
TASSERT (current.i >= last);
last = current.i;
TASSERT (sc < samples.size() && samples[sc++] == current.i);
}
TASSERT (sc == samples.size());
}

} // Anon
88 changes: 88 additions & 0 deletions ase/queuemux.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// This Source Code Form is licensed MPL-2.0: http://mozilla.org/MPL/2.0
#ifndef __ASE_QUEUEMUX_HH__
#define __ASE_QUEUEMUX_HH__

#include <ase/cxxaux.hh>

namespace Ase {

/// Multiplexer to pop from multiple Queues, while preserving priorities.
/// Order for values at the same priority is unstable.
/// Relies on unqualified calls to `Priority QueueMultiplexer_priority (const ValueType&)`.
template<size_t MAXQUEUES, class Queue, class ValueType = typename Queue::value_type>
struct QueueMultiplexer {
using Priority = decltype (QueueMultiplexer_priority (std::declval<const ValueType&>()));
struct Ptr { typename Queue::const_iterator it, end; };
ssize_t n_queues = 0, current = -1;
Priority first = {}, next = {};
std::array<Ptr, MAXQUEUES> ptrs;
QueueMultiplexer (size_t nqueues, Queue **queues)
{
assign (nqueues, queues);
}
bool
assign (size_t nqueues, Queue **queues)
{
n_queues = 0;
ASE_ASSERT_RETURN (nqueues <= MAXQUEUES, false);
for (size_t i = 0; i < nqueues; i++)
if (queues[i]) [[likely]]
{
ptrs[n_queues].it = std::begin (*queues[i]);
ptrs[n_queues].end = std::end (*queues[i]);
if (ptrs[n_queues].it != ptrs[n_queues].end)
n_queues++;
}
seek();
return more();
}
bool
more() const
{
return n_queues > 0;
}
const ValueType&
pop ()
{
ASE_ASSERT_RETURN (more(), []() -> const ValueType& { static const ValueType empty {}; return empty; }());
const ValueType &result = *ptrs[current].it++;
if (ptrs[current].it == ptrs[current].end) [[unlikely]]
{ // remove emptied queue
if (current < n_queues - 1) [[unlikely]]
ptrs[current] = ptrs[n_queues - 1]; // shuffles queue order, preserves prios
n_queues--;
seek();
}
else if (QueueMultiplexer_priority (*ptrs[current].it)
> next) [[unlikely]] // next is in other queue
seek();
return result;
}
private:
void
seek()
{
if (n_queues == 0) [[likely]]
return;
current = 0; // picks first queue if all contain max Priority
next = std::numeric_limits<Priority>::max();
first = next; // Priority to start with
for (ssize_t i = 0; i < n_queues; i++)
{
const Priority prio = QueueMultiplexer_priority (*ptrs[i].it);
if (prio < first) // prio comes before first
{
next = first;
first = prio;
current = i; // pick first matching Queue
}
else if (prio < next) // found next prio
next = prio;
}
// dprintf (2, "%s: n_queues=%zd current=%zd first=%ld next=%ld\n", __func__, n_queues, current, long (first), long (next));
}
};

} // Ase

#endif /* __ASE_QUEUEMUX_HH__ */

0 comments on commit e1031e5

Please sign in to comment.