Skip to content

Commit

Permalink
ndp ctt controller - ctt cond compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
jaroslavpesek committed Nov 21, 2024
1 parent ca90064 commit ac1bcf5
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 292 deletions.
26 changes: 26 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then
RPM_BUILDREQ+=" netcope-common-devel"
fi

AC_ARG_WITH([ctt],
AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]),
[
if test "$withval" = "yes"; then
withctt="yes"
else
withctt="no"
fi
], [withctt="no"]
)

if test x${withctt} = xyes; then
AC_LANG_PUSH([C++])
CXXFLAGS="$CXXFLAGS -std=c++17"
AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel]))
AC_LANG_POP([C++])
fi

AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes)
if [[ -z "$WITH_CTT_TRUE" ]]; then
AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available])
LIBS="-lctt $LIBS"
RPM_REQUIRES+=" libctt"
RPM_BUILDREQ+=" libctt-devel"
fi

AC_ARG_WITH([pcap],
AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]),
[
Expand Down
4 changes: 4 additions & 0 deletions include/ipfixprobe/flowifc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,13 @@ struct Flow : public Record {
};

uint64_t flow_hash;

#ifdef WITH_CTT
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
bool ctt_valid; /**< CTT validity flag. */
int ctt_state; /**< CTT - offload or not. */
#endif

PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
if the flow process plugins requires all available data, only
metadata or nothing of this. */
Expand Down
1 change: 1 addition & 0 deletions input/ndp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <netinet/in.h>
#include <sys/types.h>
#include <cstdint>
Expand Down
Empty file removed storage/cache-ctt.cpp
Empty file.
72 changes: 0 additions & 72 deletions storage/cache-ctt.hpp

This file was deleted.

68 changes: 68 additions & 0 deletions storage/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cstdlib>
#include <iostream>
#include <cstring>
#include <ratio>
#include <sys/time.h>

#include <ipfixprobe/ring.h>
Expand Down Expand Up @@ -192,6 +193,9 @@ void NHTFlowCache::init(const char *params)
m_timeout_idx = 0;
m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1);
m_line_new_idx = m_line_size / 2;
#ifdef WITH_CTT
m_ctt_controller.init(parser.m_dev, 0);
#endif /* WITH_CTT */

if (m_export_queue == nullptr) {
throw PluginError("output queue must be set before init");
Expand Down Expand Up @@ -658,4 +662,68 @@ void NHTFlowCache::prefetch_export_expired() const
__builtin_prefetch(m_flow_table[i], 0, 1);
}
}

#ifdef WITH_CTT

void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
std::vector<std::byte> state = assemble_state(
OffloadMode::PACKET_OFFLOAD,
MetaType::FULL,
ts);

std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: ";
for (auto& byte : state) {
std::cout << std::hex << static_cast<int>(byte) << " ";
}
std::cout << std::endl;
m_commander->write_record(std::move(key), std::move(state));
}
catch (const std::exception& e) {
throw;
}
}

void CttController::export_record(uint64_t flow_hash_ctt)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
m_commander->export_and_delete_record(std::move(key));
std::cout << "Exported record with key: " << flow_hash_ctt << std::endl;
}
catch (const std::exception& e) {
throw;
}
}

std::vector<std::byte> CttController::assemble_key(uint64_t flow_hash_ctt)
{
std::vector<std::byte> key(key_size_bytes, std::byte(0));
for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) {
key[i] = static_cast<std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF);
}
return key;
}

std::vector<std::byte> CttController::assemble_state(
OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts)
{
std::vector<std::byte> state(state_size_bytes, std::byte(0));
std::vector<std::byte> state_mask(state_mask_size_bytes, std::byte(0));

state[0] = static_cast<std::byte>(offload_mode);
state[1] = static_cast<std::byte>(meta_type);

// timestamp in sec/ns format, 32+32 bits - 64 bits in total
for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) {
state[2 + i] = static_cast<std::byte>((ts.tv_sec >> (8 * i)) & 0xFF);
}
for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) {
state[6 + i] = static_cast<std::byte>((ts.tv_usec >> (8 * i)) & 0xFF);
}
return state;
}
#endif // WITH_CTT
}
139 changes: 137 additions & 2 deletions storage/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,99 @@

#include "fragmentationCache/fragmentationCache.hpp"

#ifdef WITH_CTT
#include <sys/time.h>
#include <ctt_async.hpp>
#include <ctt_factory.hpp>
#include <ctt_exceptions.hpp>
#include <ctt_modes.hpp>
#include <ctt.hpp>
#endif /* WITH_CTT */

namespace ipxp {

#ifdef WITH_CTT

class CttController {
public:
enum class OffloadMode : uint8_t {
NO_OFFLOAD = 0x0,
PACKET_OFFLOAD = 0x1,
META_EXPORT = 0x2,
PACKET_OFFLOAD_WITH_EXPORT = 0x3
};
enum class MetaType : uint8_t {
FULL = 0x0,
HALF = 0x1,
TS_ONLY = 0x2,
NO_META = 0x3
};
/**
* @brief init the CTT.
*
* @param nfb_dev The NFB device file (e.g., "/dev/nfb0").
* @param ctt_comp_index The index of the CTT component.
*/
void init(const std::string& nfb_dev, unsigned ctt_comp_index) {
m_commander = std::make_unique<ctt::AsyncCommander>(ctt::NfbParams{nfb_dev, ctt_comp_index});
try {
// Get UserInfo to determine key, state, and state_mask sizes
ctt::UserInfo user_info = m_commander->get_user_info();
key_size_bytes = (user_info.key_bit_width + 7) / 8;
state_size_bytes = (user_info.state_bit_width + 7) / 8;
state_mask_size_bytes = (user_info.state_mask_bit_width + 7) / 8;

// Enable the CTT
std::future<void> enable_future = m_commander->enable(true);
enable_future.wait();
}
catch (const std::exception& e) {
throw;
}
}

/**
* @brief Command: mark a flow for offload.
*
* @param flow_hash_ctt The flow hash to be offloaded.
*/
void create_record(uint64_t flow_hash_ctt, const struct timeval& timestamp_first);

/**
* @brief Command: export a flow from the CTT.
*
* @param flow_hash_ctt The flow hash to be exported.
*/
void export_record(uint64_t flow_hash_ctt);

private:
std::unique_ptr<ctt::AsyncCommander> m_commander;
size_t key_size_bytes;
size_t state_size_bytes;
size_t state_mask_size_bytes;

/**
* @brief Assembles the state vector from the given values.
*
* @param offload_mode The offload mode.
* @param meta_type The metadata type.
* @param timestamp_first The first timestamp of the flow.
* @return A byte vector representing the assembled state vector.
*/
std::vector<std::byte> assemble_state(
OffloadMode offload_mode, MetaType meta_type,
const struct timeval& timestamp_first);

/**
* @brief Assembles the key vector from the given flow hash.
*
* @param flow_hash_ctt The flow hash.
* @return A byte vector representing the assembled key vector.
*/
std::vector<std::byte> assemble_key(uint64_t flow_hash_ctt);
};
#endif /* WITH_CTT */

struct __attribute__((packed)) flow_key_v4_t {
uint16_t src_port;
uint16_t dst_port;
Expand Down Expand Up @@ -99,6 +190,9 @@ class CacheOptParser : public OptionsParser
bool m_enable_fragmentation_cache;
std::size_t m_frag_cache_size;
time_t m_frag_cache_timeout;
#ifdef WITH_CTT
std::string m_dev;
#endif /* WITH_CTT */

CacheOptParser() : OptionsParser("cache", "Storage plugin implemented as a hash table"),
m_cache_size(1 << DEFAULT_FLOW_CACHE_SIZE), m_line_size(1 << DEFAULT_FLOW_LINE_SIZE),
Expand Down Expand Up @@ -156,6 +250,16 @@ class CacheOptParser : public OptionsParser
}
return true;
});

#ifdef WITH_CTT
register_option("d", "dev", "DEV", "Device name",
[this](const char *arg) {
m_dev = arg;
return true;
},
OptionFlags::RequiredArgument);
#endif /* WITH_CTT */

}
};

Expand Down Expand Up @@ -214,6 +318,35 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
*/
void set_telemetry_dir(std::shared_ptr<telemetry::Directory> dir) override;

#ifdef WITH_CTT

int plugins_post_create(Flow &rec, Packet &pkt) {
int ret = StoragePlugin::plugins_post_create(rec, pkt);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::NO_OFFLOAD);
if (no_data_required(rec)) {
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
}
return ret;
}

// override post_update method
int plugins_post_update(Flow &rec, Packet &pkt) {
int ret = StoragePlugin::plugins_post_update(rec, pkt);
if (no_data_required(rec) && (rec.ctt_state == static_cast<int>(CttController::OffloadMode::NO_OFFLOAD))) {
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
}
return ret;
}

// override pre_export method
void plugins_pre_export(Flow &rec) {
StoragePlugin::plugins_pre_export(rec);
m_ctt_controller.export_record(rec.flow_hash_ctt);
}
#endif /* WITH_CTT */

private:
uint32_t m_cache_size;
uint32_t m_line_size;
Expand Down Expand Up @@ -242,7 +375,9 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
char m_key_inv[MAX_KEY_LENGTH];
FlowRecord **m_flow_table;
FlowRecord *m_flow_records;

#ifdef WITH_CTT
CttController m_ctt_controller;
#endif /* WITH_CTT */
FragmentationCache m_fragmentation_cache;
FlowEndReasonStats m_flow_end_reason_stats = {};
FlowRecordStats m_flow_record_stats = {};
Expand All @@ -265,4 +400,4 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin
};

}
#endif /* IPXP_STORAGE_CACHE_HPP */
#endif /* IPXP_STORAGE_CACHE_HPP */
Loading

0 comments on commit ac1bcf5

Please sign in to comment.