Skip to content

Commit

Permalink
Merge pull request #1904 from peternewman/e1.31
Browse files Browse the repository at this point in the history
Do some minor formatting tweaks
  • Loading branch information
peternewman authored Oct 1, 2023
2 parents b22f49f + c976c2d commit 486cae1
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 70 deletions.
70 changes: 41 additions & 29 deletions libs/acn/DMPE131Inflator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
return true;
}

if (universe_iter == m_handlers.end())
if (universe_iter == m_handlers.end()) {
return true;
}

DMPHeader dmp_header = headers.GetDMPHeader();

Expand Down Expand Up @@ -104,16 +105,17 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,

if (address->Increment() != 1) {
OLA_INFO << "E1.31 DMP packet with increment " << address->Increment()
<< ", disarding";
<< ", disarding";
return true;
}

unsigned int length_remaining = pdu_len - available_length;
int start_code = -1;
if (e131_header.UsingRev2())
if (e131_header.UsingRev2()) {
start_code = static_cast<int>(address->Start());
else if (length_remaining && address->Number())
} else if (length_remaining && address->Number()) {
start_code = *(data + available_length);
}

// The only time we want to continue processing a non-0 start code is if it
// contains a Terminate message.
Expand All @@ -132,14 +134,16 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
// Reaching here means that we actually have new data and we should merge.
if (target_buffer && start_code == 0) {
unsigned int channels = std::min(length_remaining, address->Number());
if (e131_header.UsingRev2())
if (e131_header.UsingRev2()) {
target_buffer->Set(data + available_length, channels);
else
target_buffer->Set(data + available_length + 1, channels - 1);
} else {
target_buffer->Set(data + available_length + 1, channels - 1);
}
}

if (universe_iter->second.priority)
if (universe_iter->second.priority) {
*universe_iter->second.priority = universe_iter->second.active_priority;
}

// merge the sources
switch (universe_iter->second.sources.size()) {
Expand All @@ -155,9 +159,11 @@ bool DMPE131Inflator::HandlePDUData(uint32_t vector,
// HTP Merge
universe_iter->second.buffer->Reset();
std::vector<dmx_source>::const_iterator source_iter =
universe_iter->second.sources.begin();
for (; source_iter != universe_iter->second.sources.end(); ++source_iter)
universe_iter->second.sources.begin();
for (; source_iter != universe_iter->second.sources.end();
++source_iter) {
universe_iter->second.buffer->HTPMerge(source_iter->buffer);
}
universe_iter->second.closure->Run();
}
return true;
Expand All @@ -175,8 +181,9 @@ bool DMPE131Inflator::SetHandler(uint16_t universe,
ola::DmxBuffer *buffer,
uint8_t *priority,
ola::Callback0<void> *closure) {
if (!closure || !buffer)
if (!closure || !buffer) {
return false;
}

UniverseHandlers::iterator iter = m_handlers.find(universe);

Expand Down Expand Up @@ -265,38 +272,41 @@ bool DMPE131Inflator::TrackSourceIfRequired(
iter++;
}

if (sources.empty())
if (sources.empty()) {
universe_data->active_priority = 0;
}

for (iter = sources.begin(); iter != sources.end(); ++iter) {
if (iter->cid == headers.GetRootHeader().GetCid())
if (iter->cid == headers.GetRootHeader().GetCid()) {
break;
}
}

if (iter == sources.end()) {
// This is an untracked source
if (e131_header.StreamTerminated() ||
priority < universe_data->active_priority)
priority < universe_data->active_priority) {
return false;
}

if (priority > universe_data->active_priority) {
OLA_INFO << "Raising priority for universe " <<
e131_header.Universe() << " from " <<
static_cast<int>(universe_data->active_priority) << " to " <<
static_cast<int>(priority);
OLA_INFO << "Raising priority for universe " << e131_header.Universe()
<< " from " << static_cast<int>(universe_data->active_priority)
<< " to " << static_cast<int>(priority);
sources.clear();
universe_data->active_priority = priority;
}

if (sources.size() == MAX_MERGE_SOURCES) {
// TODO(simon): flag this in the export map
OLA_WARN << "Max merge sources reached for universe " <<
e131_header.Universe() << ", " <<
headers.GetRootHeader().GetCid().ToString() << " won't be tracked";
OLA_WARN << "Max merge sources reached for universe "
<< e131_header.Universe() << ", "
<< headers.GetRootHeader().GetCid().ToString()
<< " won't be tracked";
return false;
} else {
OLA_INFO << "Added new E1.31 source: " <<
headers.GetRootHeader().GetCid().ToString();
OLA_INFO << "Added new E1.31 source: "
<< headers.GetRootHeader().GetCid().ToString();
dmx_source new_source;
new_source.cid = headers.GetRootHeader().GetCid();
new_source.sequence = e131_header.Sequence();
Expand All @@ -311,19 +321,21 @@ bool DMPE131Inflator::TrackSourceIfRequired(
int8_t seq_diff = static_cast<int8_t>(e131_header.Sequence() -
iter->sequence);
if (seq_diff <= 0 && seq_diff > SEQUENCE_DIFF_THRESHOLD) {
OLA_INFO << "Old packet received, ignoring, this # " <<
static_cast<int>(e131_header.Sequence()) << ", last " <<
static_cast<int>(iter->sequence);
OLA_INFO << "Old packet received, ignoring, this # "
<< static_cast<int>(e131_header.Sequence()) << ", last "
<< static_cast<int>(iter->sequence);
return false;
}
iter->sequence = e131_header.Sequence();

if (e131_header.StreamTerminated()) {
OLA_INFO << "CID " << headers.GetRootHeader().GetCid().ToString() <<
" sent a termination for universe " << e131_header.Universe();
OLA_INFO << "CID " << headers.GetRootHeader().GetCid().ToString()
<< " sent a termination for universe "
<< e131_header.Universe();
sources.erase(iter);
if (sources.empty())
if (sources.empty()) {
universe_data->active_priority = 0;
}
// We need to trigger a merge here else the buffer will be stale, we keep
// the buffer as NULL though so we don't use the data.
return true;
Expand Down
82 changes: 41 additions & 41 deletions libs/acn/DMPE131Inflator.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,58 +36,58 @@ class DMPE131Inflator: public DMPInflator {
friend class DMPE131InflatorTest;

public:
explicit DMPE131Inflator(bool ignore_preview):
DMPInflator(),
m_ignore_preview(ignore_preview) {
}
~DMPE131Inflator();
explicit DMPE131Inflator(bool ignore_preview):
DMPInflator(),
m_ignore_preview(ignore_preview) {
}
~DMPE131Inflator();

bool SetHandler(uint16_t universe, ola::DmxBuffer *buffer,
uint8_t *priority, ola::Callback0<void> *handler);
bool RemoveHandler(uint16_t universe);
bool SetHandler(uint16_t universe, ola::DmxBuffer *buffer,
uint8_t *priority, ola::Callback0<void> *handler);
bool RemoveHandler(uint16_t universe);

void RegisteredUniverses(std::vector<uint16_t> *universes);
void RegisteredUniverses(std::vector<uint16_t> *universes);

protected:
virtual bool HandlePDUData(uint32_t vector,
const HeaderSet &headers,
const uint8_t *data,
unsigned int pdu_len);
virtual bool HandlePDUData(uint32_t vector,
const HeaderSet &headers,
const uint8_t *data,
unsigned int pdu_len);

private:
typedef struct {
ola::acn::CID cid;
uint8_t sequence;
TimeStamp last_heard_from;
DmxBuffer buffer;
} dmx_source;
typedef struct {
ola::acn::CID cid;
uint8_t sequence;
TimeStamp last_heard_from;
DmxBuffer buffer;
} dmx_source;

typedef struct {
DmxBuffer *buffer;
Callback0<void> *closure;
uint8_t active_priority;
uint8_t *priority;
std::vector<dmx_source> sources;
} universe_handler;
typedef struct {
DmxBuffer *buffer;
Callback0<void> *closure;
uint8_t active_priority;
uint8_t *priority;
std::vector<dmx_source> sources;
} universe_handler;

typedef std::map<uint16_t, universe_handler> UniverseHandlers;
typedef std::map<uint16_t, universe_handler> UniverseHandlers;

UniverseHandlers m_handlers;
bool m_ignore_preview;
ola::Clock m_clock;
UniverseHandlers m_handlers;
bool m_ignore_preview;
ola::Clock m_clock;

bool TrackSourceIfRequired(universe_handler *universe_data,
const HeaderSet &headers,
DmxBuffer **buffer);
bool TrackSourceIfRequired(universe_handler *universe_data,
const HeaderSet &headers,
DmxBuffer **buffer);

// The max number of sources we'll track per universe.
static const uint8_t MAX_MERGE_SOURCES = 6;
// The max merge priority.
static const uint8_t MAX_E131_PRIORITY = 200;
// ignore packets that differ by less than this amount from the last one
static const int8_t SEQUENCE_DIFF_THRESHOLD = -20;
// expire sources after 2.5s
static const TimeInterval EXPIRY_INTERVAL;
// The max number of sources we'll track per universe.
static const uint8_t MAX_MERGE_SOURCES = 6;
// The max merge priority.
static const uint8_t MAX_E131_PRIORITY = 200;
// ignore packets that differ by less than this amount from the last one
static const int8_t SEQUENCE_DIFF_THRESHOLD = -20;
// expire sources after 2.5s
static const TimeInterval EXPIRY_INTERVAL;
};
} // namespace acn
} // namespace ola
Expand Down

0 comments on commit 486cae1

Please sign in to comment.