From 036856ddd1e68b7ac2cb5436fdd6af595a20be89 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Thu, 28 Nov 2024 16:27:42 +0100 Subject: [PATCH] ctt - solving hazard during inconsistent state --- include/ipfixprobe/flowifc.hpp | 9 ++++--- include/ipfixprobe/packet.hpp | 6 +++-- include/ipfixprobe/storage.hpp | 3 --- storage/cache.cpp | 46 ++++++++++++++++++++++++++-------- storage/cache.hpp | 44 ++++++++++++++++++++++++-------- 5 files changed, 79 insertions(+), 29 deletions(-) diff --git a/include/ipfixprobe/flowifc.hpp b/include/ipfixprobe/flowifc.hpp index 05c77774..2f3b5acb 100644 --- a/include/ipfixprobe/flowifc.hpp +++ b/include/ipfixprobe/flowifc.hpp @@ -38,6 +38,7 @@ #include #include #include +#include #ifdef WITH_NEMEA #include @@ -265,9 +266,10 @@ struct Flow : public Record { uint64_t flow_hash; #ifdef WITH_CTT - uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ - bool ctt_valid; /**< CTT validity flag. */ - int ctt_state; /**< CTT - offload or not. */ + uint64_t flow_hash_ctt; /**< Flow hash for CTT. */ + bool record_in_ctt; /**< CTT - offload or not. */ + bool is_delayed; /**< Delayed export flag. */ + time_t delay_time; /**< Time until export of the flow is delayed. */ #endif PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check @@ -297,4 +299,5 @@ struct Flow : public Record { }; } + #endif /* IPXP_FLOWIFC_HPP */ diff --git a/include/ipfixprobe/packet.hpp b/include/ipfixprobe/packet.hpp index bd1e5aba..575c2fb9 100644 --- a/include/ipfixprobe/packet.hpp +++ b/include/ipfixprobe/packet.hpp @@ -46,8 +46,10 @@ namespace ipxp { * \brief Structure for storing parsed packet fields */ struct Packet : public Record { - Metadata_CTT cttmeta; /**< Metadata from CTT */ - bool cttmeta_valid; /**< True if CTT metadata is valid */ + #ifdef WITH_CTT + Metadata_CTT cttmeta; /**< Metadata from CTT */ + bool cttmeta_valid; /**< True if CTT metadata is valid */ + #endif /* WITH_CTT */ struct timeval ts; uint8_t dst_mac[6]; diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index d9573bbb..5296557e 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -191,10 +191,7 @@ class StoragePlugin : public Plugin { // if metadata are valid, add flow hash ctt to the flow record if (pkt.cttmeta_valid) { - rec.ctt_valid = true; rec.flow_hash_ctt = pkt.cttmeta.flow_hash; - } else { - rec.ctt_valid = false; } PluginStatusConverter plugin_status_converter(m_plugins_status); int ret = 0; diff --git a/storage/cache.cpp b/storage/cache.cpp index cbd492d4..af8171c4 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -139,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash) m_flow.src_port = pkt.src_port; m_flow.dst_port = pkt.dst_port; } + #ifdef WITH_CTT + m_flow.is_delayed = false; + m_delayed_flow_waiting = false; + #endif /* WITH_CTT */ } void FlowRecord::update(const Packet &pkt, bool src) { + if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow + auto flow_hash = m_hash; + m_delayed_flow = m_flow; + m_delayed_flow_waiting = true; + erase(); // erase the old flow, keeping the delayed flow + create(pkt, flow_hash); + return; + } m_flow.time_last = pkt.ts; if (src) { m_flow.src_packets++; @@ -260,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue) void NHTFlowCache::export_flow(size_t index) { + if (m_flow_table[index]->m_flow.is_delayed) { + return; + } + if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_delayed_flow.src_packets + + m_flow_table[index]->m_delayed_flow.dst_packets); + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); + } m_total_exported++; update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); update_flow_record_stats( @@ -506,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts) m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + } #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ @@ -668,18 +701,12 @@ void NHTFlowCache::prefetch_export_expired() const void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts) { try { - std::vector key = assemble_key(flow_hash_ctt); - std::vector state = assemble_state( + std::vector key = assemble_key(flow_hash_ctt); + std::vector state = assemble_state( OffloadMode::PACKET_OFFLOAD, MetaType::FULL, ts); - - std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: "; - for (auto& byte : state) { - std::cout << std::hex << static_cast(byte) << " "; - } - std::cout << std::endl; - m_commander->write_record(std::move(key), std::move(state)); + m_commander->write_record(std::move(key), std::move(state)); } catch (const std::exception& e) { throw; @@ -691,7 +718,6 @@ void CttController::export_record(uint64_t flow_hash_ctt) try { std::vector key = assemble_key(flow_hash_ctt); m_commander->export_and_delete_record(std::move(key)); - std::cout << "Exported record with key: " << flow_hash_ctt << std::endl; } catch (const std::exception& e) { throw; diff --git a/storage/cache.hpp b/storage/cache.hpp index 2abf3af8..ad8de024 100644 --- a/storage/cache.hpp +++ b/storage/cache.hpp @@ -32,6 +32,9 @@ #ifndef IPXP_STORAGE_CACHE_HPP #define IPXP_STORAGE_CACHE_HPP +#include +#include +#include #include #include @@ -49,6 +52,8 @@ #include #include #include +#include +#include #endif /* WITH_CTT */ namespace ipxp { @@ -269,6 +274,10 @@ class alignas(64) FlowRecord public: Flow m_flow; + #ifdef WITH_CTT + Flow m_delayed_flow; + bool m_delayed_flow_waiting; + #endif /* WITH_CTT */ FlowRecord(); ~FlowRecord(); @@ -322,29 +331,42 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin int plugins_post_create(Flow &rec, Packet &pkt) { int ret = StoragePlugin::plugins_post_create(rec, pkt); - rec.ctt_state = static_cast(CttController::OffloadMode::NO_OFFLOAD); - if (no_data_required(rec)) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } + rec.record_in_ctt = false; + //if (only_metadata_required(rec)) { + if (only_metadata_required(rec)) { + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } return ret; } // override post_update method int plugins_post_update(Flow &rec, Packet &pkt) { int ret = StoragePlugin::plugins_post_update(rec, pkt); - if (no_data_required(rec) && (rec.ctt_state == static_cast(CttController::OffloadMode::NO_OFFLOAD))) { - m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); - rec.ctt_state = static_cast(CttController::OffloadMode::PACKET_OFFLOAD); - } + //if (only_metadata_required(rec) && !rec.ctt_state) { + if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production + m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first); + rec.record_in_ctt = true; + } return ret; } // override pre_export method void plugins_pre_export(Flow &rec) { - StoragePlugin::plugins_pre_export(rec); - m_ctt_controller.export_record(rec.flow_hash_ctt); + if (rec.record_in_ctt) { + rec.is_delayed = true; + rec.delay_time = time(nullptr) + 1; + m_ctt_controller.export_record(rec.flow_hash_ctt); + rec.record_in_ctt = false; + return; + } + if (rec.is_delayed) { + return; + } else { + StoragePlugin::plugins_pre_export(rec); + } } + #endif /* WITH_CTT */ private: