Skip to content

Commit

Permalink
Merge pull request #239 from CESNET/ctt-controller
Browse files Browse the repository at this point in the history
Merge CTT controller to CTT-Support dev branch
  • Loading branch information
jaroslavpesek authored Dec 9, 2024
2 parents 2a56c98 + 036856d commit e5b7a46
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 11 deletions.
26 changes: 26 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then
RPM_BUILDREQ+=" netcope-common-devel"
fi

AC_ARG_WITH([ctt],
AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]),
[
if test "$withval" = "yes"; then
withctt="yes"
else
withctt="no"
fi
], [withctt="no"]
)

if test x${withctt} = xyes; then
AC_LANG_PUSH([C++])
CXXFLAGS="$CXXFLAGS -std=c++17"
AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel]))
AC_LANG_POP([C++])
fi

AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes)
if [[ -z "$WITH_CTT_TRUE" ]]; then
AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available])
LIBS="-lctt $LIBS"
RPM_REQUIRES+=" libctt"
RPM_BUILDREQ+=" libctt-devel"
fi

AC_ARG_WITH([pcap],
AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]),
[
Expand Down
10 changes: 10 additions & 0 deletions include/ipfixprobe/flowifc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <chrono>

#ifdef WITH_NEMEA
#include <unirec/unirec.h>
Expand Down Expand Up @@ -263,6 +264,14 @@ struct Flow : public Record {
};

uint64_t flow_hash;

#ifdef WITH_CTT
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
bool record_in_ctt; /**< CTT - offload or not. */
bool is_delayed; /**< Delayed export flag. */
time_t delay_time; /**< Time until export of the flow is delayed. */
#endif

PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
if the flow process plugins requires all available data, only
metadata or nothing of this. */
Expand Down Expand Up @@ -290,4 +299,5 @@ struct Flow : public Record {
};

}

#endif /* IPXP_FLOWIFC_HPP */
7 changes: 5 additions & 2 deletions include/ipfixprobe/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ namespace ipxp {
* \brief Structure for storing parsed packet fields
*/
struct Packet : public Record {
Metadata_CTT cttmeta; /**< Metadata from CTT */
#ifdef WITH_CTT
Metadata_CTT cttmeta; /**< Metadata from CTT */
bool cttmeta_valid; /**< True if CTT metadata is valid */
#endif /* WITH_CTT */
struct timeval ts;

uint8_t dst_mac[6];
Expand Down Expand Up @@ -108,7 +111,7 @@ struct Packet : public Record {
* \brief Constructor.
*/
Packet() :
ts({0}),
cttmeta_valid(false), ts({0}),
dst_mac(), src_mac(), ethertype(0),
ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0),
ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0),
Expand Down
4 changes: 4 additions & 0 deletions include/ipfixprobe/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ class StoragePlugin : public Plugin
*/
int plugins_post_create(Flow& rec, const Packet& pkt)
{
// if metadata are valid, add flow hash ctt to the flow record
if (pkt.cttmeta_valid) {
rec.flow_hash_ctt = pkt.cttmeta.flow_hash;
}
PluginStatusConverter plugin_status_converter(m_plugins_status);
int ret = 0;
for (unsigned int i = 0; i < m_plugin_cnt; i++) {
Expand Down
6 changes: 5 additions & 1 deletion input/ndp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <netinet/in.h>
#include <sys/types.h>
#include <cstdint>
Expand Down Expand Up @@ -170,7 +171,10 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock &packets)
m_stats.bad_metadata++;
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
} else {
parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
if (parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length) == -1) {
m_stats.bad_metadata++;
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
}
}
} else {
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
Expand Down
21 changes: 16 additions & 5 deletions input/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "parser.hpp"
#include "headers.hpp"
#include <ipfixprobe/cttmeta.hpp>
#include <ipfixprobe/packet.hpp>

namespace ipxp {
Expand Down Expand Up @@ -776,12 +777,21 @@ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, cons
opt->pblock->bytes += len;
}

void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen)
int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen)
{
if (opt->pblock->cnt >= opt->pblock->size) {
return;
return 0;
}
Packet *pkt = &opt->pblock->pkts[opt->pblock->cnt];

// check metadata validity
if (metadata.parser_status == PA_OK) {
pkt->cttmeta_valid = true;
} else {
pkt->cttmeta_valid = false;
return -1;
}

pkt->cttmeta = metadata;

pkt->packet_len_wire = len;
Expand Down Expand Up @@ -831,7 +841,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
stats.pppoe_packets++;
} else { // if not previous, we try delegate to original parser
parse_packet(opt, stats, metadata.ts, data, len, caplen);
return;
return 0;
}

// L4
Expand All @@ -843,11 +853,11 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
stats.udp_packets++;
} else { // if not previous, we try delegate to original parser
parse_packet(opt, stats, metadata.ts, data, len, caplen);
return;
return 0;
}
} catch (const char *err) {
DEBUG_MSG("%s\n", err);
return;
return 0;
}

if (pkt->vlan_id) {
Expand Down Expand Up @@ -880,6 +890,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
opt->packet_valid = true;
opt->pblock->cnt++;
opt->pblock->bytes += len;
return 0;
}

}
2 changes: 1 addition & 1 deletion input/parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ typedef struct parser_opt_s {
*/
void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, const uint8_t *data, uint16_t len, uint16_t caplen);

void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen);
int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen);

}
#endif /* IPXP_INPUT_PARSER_HPP */
94 changes: 94 additions & 0 deletions storage/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cstdlib>
#include <iostream>
#include <cstring>
#include <ratio>
#include <sys/time.h>

#include <ipfixprobe/ring.h>
Expand Down Expand Up @@ -138,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash)
m_flow.src_port = pkt.src_port;
m_flow.dst_port = pkt.dst_port;
}
#ifdef WITH_CTT
m_flow.is_delayed = false;
m_delayed_flow_waiting = false;
#endif /* WITH_CTT */
}

void FlowRecord::update(const Packet &pkt, bool src)
{
if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
auto flow_hash = m_hash;
m_delayed_flow = m_flow;
m_delayed_flow_waiting = true;
erase(); // erase the old flow, keeping the delayed flow
create(pkt, flow_hash);
return;
}
m_flow.time_last = pkt.ts;
if (src) {
m_flow.src_packets++;
Expand Down Expand Up @@ -192,6 +205,9 @@ void NHTFlowCache::init(const char *params)
m_timeout_idx = 0;
m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1);
m_line_new_idx = m_line_size / 2;
#ifdef WITH_CTT
m_ctt_controller.init(parser.m_dev, 0);
#endif /* WITH_CTT */

if (m_export_queue == nullptr) {
throw PluginError("output queue must be set before init");
Expand Down Expand Up @@ -256,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)

void NHTFlowCache::export_flow(size_t index)
{
if (m_flow_table[index]->m_flow.is_delayed) {
return;
}
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
update_flow_record_stats(
m_flow_table[index]->m_delayed_flow.src_packets
+ m_flow_table[index]->m_delayed_flow.dst_packets);
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
}
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason);
update_flow_record_stats(
Expand Down Expand Up @@ -502,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts)
m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow);
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) {
m_flow_table[i]->m_flow.is_delayed = false;
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
}
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) {
m_flow_table[i]->m_delayed_flow_waiting = false;
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
export_flow(i);
}
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
Expand Down Expand Up @@ -658,4 +695,61 @@ void NHTFlowCache::prefetch_export_expired() const
__builtin_prefetch(m_flow_table[i], 0, 1);
}
}

#ifdef WITH_CTT

void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
std::vector<std::byte> state = assemble_state(
OffloadMode::PACKET_OFFLOAD,
MetaType::FULL,
ts);
m_commander->write_record(std::move(key), std::move(state));
}
catch (const std::exception& e) {
throw;
}
}

void CttController::export_record(uint64_t flow_hash_ctt)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
m_commander->export_and_delete_record(std::move(key));
}
catch (const std::exception& e) {
throw;
}
}

std::vector<std::byte> CttController::assemble_key(uint64_t flow_hash_ctt)
{
std::vector<std::byte> key(key_size_bytes, std::byte(0));
for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) {
key[i] = static_cast<std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF);
}
return key;
}

std::vector<std::byte> CttController::assemble_state(
OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts)
{
std::vector<std::byte> state(state_size_bytes, std::byte(0));
std::vector<std::byte> state_mask(state_mask_size_bytes, std::byte(0));

state[0] = static_cast<std::byte>(offload_mode);
state[1] = static_cast<std::byte>(meta_type);

// timestamp in sec/ns format, 32+32 bits - 64 bits in total
for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) {
state[2 + i] = static_cast<std::byte>((ts.tv_sec >> (8 * i)) & 0xFF);
}
for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) {
state[6 + i] = static_cast<std::byte>((ts.tv_usec >> (8 * i)) & 0xFF);
}
return state;
}
#endif // WITH_CTT
}
Loading

0 comments on commit e5b7a46

Please sign in to comment.