Skip to content

Commit

Permalink
Forwarder output: fix segfault when sending IPFIX Templates
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukas955 committed Mar 14, 2021
1 parent c3934bd commit adc6f2f
Showing 1 changed file with 28 additions and 28 deletions.
56 changes: 28 additions & 28 deletions src/plugins/output/forwarder/src/Forwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ struct Client;
struct Odid
{
Odid() {} // Default constructor needed because of std::map

Odid(Session &session, uint32_t odid)
: session(&session)
, odid(odid)
Expand All @@ -73,7 +73,7 @@ struct Odid
std::time_t last_templates_send_time = 0;
unsigned bytes_since_templates_sent = 0;

std::string
std::string
str();

void
Expand All @@ -93,7 +93,7 @@ struct Session
, client(client)
, ident(ident)
{}

Connection &connection;
Client &client;
std::string ident;
Expand All @@ -117,17 +117,17 @@ struct Client
std::string name;
std::map<const ipx_session *, std::unique_ptr<Session>> sessions;

std::string
std::string
str()
{
return name;
}
};

std::string
std::string
Odid::str()
{
return session->ident + "(" + std::to_string(odid) + ") -> " + session->client.name;
return session->ident + "(" + std::to_string(odid) + ") -> " + session->client.name;
}

std::string
Expand All @@ -137,37 +137,37 @@ Session::str()
}

class Forwarder
{
{
public:
Forwarder(ipx_ctx_t *log_ctx)
: log_ctx(log_ctx)
{}

void
void
set_transport_protocol(TransProto transport_protocol)
{
this->transport_protocol = transport_protocol;
}

void
void
set_forward_mode(ForwardMode forward_mode)
{
this->forward_mode = forward_mode;
}

void
void
set_connection_buffer_size(long number_of_bytes)
{
connection_manager.set_connection_buffer_size(number_of_bytes);
}

void
void
set_template_refresh_interval_secs(int number_of_seconds)
{
this->template_refresh_interval_secs = number_of_seconds;
}

void
void
set_template_refresh_interval_bytes(int number_of_bytes)
{
this->template_refresh_interval_bytes = number_of_bytes;
Expand Down Expand Up @@ -275,7 +275,7 @@ class Forwarder
client.sessions[session_info] = std::move(session_ptr);
}

void
void
close_session(Client &client, const ipx_session *session_info)
{
auto &session = *client.sessions[session_info];
Expand All @@ -295,7 +295,7 @@ class Forwarder
}
}

void
void
forward_round_robin(IPFIXMessage &message)
{
int i = 0;
Expand Down Expand Up @@ -331,7 +331,7 @@ class Forwarder
/// through the session connection and update the state accordingly
///
/// \return true if there was enough space in the connection buffer, false otherwise
bool
bool
send_templates(Session &session, Odid &odid, IPFIXMessage &message)
{
auto templates_snapshot = message.get_templates_snapshot();
Expand All @@ -342,19 +342,19 @@ class Forwarder
MessageBuilder builder;
builder.begin_message(header);

fds_tsnapshot_for(templates_snapshot,
fds_tsnapshot_for(templates_snapshot,
[](const fds_template *tmplt, void *data) -> bool {
auto &builder = *(MessageBuilder *)data;
builder.write_template(tmplt);
return true;
}, &builder);

builder.finalize_message();

auto lock = session.connection.begin_write();
if (builder.message_length() > session.connection.writeable()) {
// IPX_CTX_WARNING(log_ctx,
// "[%s] Cannot send templates because buffer is full! (need %dB, have %ldB)",
// IPX_CTX_WARNING(log_ctx,
// "[%s] Cannot send templates because buffer is full! (need %dB, have %ldB)",
// odid.str().c_str(), builder.message_length(), session.connection.writeable());
return false;
}
Expand All @@ -371,18 +371,18 @@ class Forwarder
return true;
}

bool
bool
should_refresh_templates(Odid &odid)
{
if (transport_protocol != TransProto::Udp) {
return false;
}
auto time_since = (std::time(NULL) - odid.last_templates_send_time);
return (time_since > (unsigned)template_refresh_interval_secs)
return (time_since > (unsigned)template_refresh_interval_secs)
|| (odid.bytes_since_templates_sent > (unsigned)template_refresh_interval_bytes);
}

bool
bool
templates_changed(Odid &odid, IPFIXMessage &message)
{
auto templates_snapshot = message.get_templates_snapshot();
Expand All @@ -392,7 +392,7 @@ class Forwarder
/// Forward message to the client, including templates update if needed
///
/// \return true if there was enough space in the connection buffer, false otherwise
bool
bool
forward_message(Client &client, IPFIXMessage &message)
{
auto &session = *client.sessions[message.session()];
Expand All @@ -409,17 +409,17 @@ class Forwarder
IPX_CTX_INFO(log_ctx, "[%s] Seen new ODID %u", session.str().c_str(), message.odid());
}
auto &odid = session.odids[message.odid()];

if (should_refresh_templates(odid) || templates_changed(odid, message)) {
if (!send_templates(session, odid, message)) {
if (message.get_templates_snapshot() != nullptr && !send_templates(session, odid, message)) {
return false;
}
}

auto lock = session.connection.begin_write();
if (message.length() > session.connection.writeable()) {
// IPX_CTX_WARNING(log_ctx,
// "[%s] Cannot forward message because buffer is full! (need %dB, have %ldB)",
// IPX_CTX_WARNING(log_ctx,
// "[%s] Cannot forward message because buffer is full! (need %dB, have %ldB)",
// odid.str().c_str(), message.length(), session.connection.writeable());
return false;
}
Expand Down

0 comments on commit adc6f2f

Please sign in to comment.