Skip to content

Commit

Permalink
ctt - solving hazard during inconsistent state
Browse files Browse the repository at this point in the history
  • Loading branch information
jaroslavpesek committed Nov 28, 2024
1 parent ac1bcf5 commit 036856d
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 29 deletions.
9 changes: 6 additions & 3 deletions include/ipfixprobe/flowifc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <chrono>

#ifdef WITH_NEMEA
#include <unirec/unirec.h>
Expand Down Expand Up @@ -265,9 +266,10 @@ struct Flow : public Record {
uint64_t flow_hash;

#ifdef WITH_CTT
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
bool ctt_valid; /**< CTT validity flag. */
int ctt_state; /**< CTT - offload or not. */
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
bool record_in_ctt; /**< CTT - offload or not. */
bool is_delayed; /**< Delayed export flag. */
time_t delay_time; /**< Time until export of the flow is delayed. */
#endif

PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
Expand Down Expand Up @@ -297,4 +299,5 @@ struct Flow : public Record {
};

}

#endif /* IPXP_FLOWIFC_HPP */
6 changes: 4 additions & 2 deletions include/ipfixprobe/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ namespace ipxp {
* \brief Structure for storing parsed packet fields
*/
struct Packet : public Record {
Metadata_CTT cttmeta; /**< Metadata from CTT */
bool cttmeta_valid; /**< True if CTT metadata is valid */
#ifdef WITH_CTT
Metadata_CTT cttmeta; /**< Metadata from CTT */
bool cttmeta_valid; /**< True if CTT metadata is valid */
#endif /* WITH_CTT */
struct timeval ts;

uint8_t dst_mac[6];
Expand Down
3 changes: 0 additions & 3 deletions include/ipfixprobe/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,7 @@ class StoragePlugin : public Plugin
{
// if metadata are valid, add flow hash ctt to the flow record
if (pkt.cttmeta_valid) {
rec.ctt_valid = true;
rec.flow_hash_ctt = pkt.cttmeta.flow_hash;
} else {
rec.ctt_valid = false;
}
PluginStatusConverter plugin_status_converter(m_plugins_status);
int ret = 0;
Expand Down
46 changes: 36 additions & 10 deletions storage/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash)
m_flow.src_port = pkt.src_port;
m_flow.dst_port = pkt.dst_port;
}
#ifdef WITH_CTT
m_flow.is_delayed = false;
m_delayed_flow_waiting = false;
#endif /* WITH_CTT */
}

void FlowRecord::update(const Packet &pkt, bool src)
{
if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
auto flow_hash = m_hash;
m_delayed_flow = m_flow;
m_delayed_flow_waiting = true;
erase(); // erase the old flow, keeping the delayed flow
create(pkt, flow_hash);
return;
}
m_flow.time_last = pkt.ts;
if (src) {
m_flow.src_packets++;
Expand Down Expand Up @@ -260,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)

void NHTFlowCache::export_flow(size_t index)
{
if (m_flow_table[index]->m_flow.is_delayed) {
return;
}
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
update_flow_record_stats(
m_flow_table[index]->m_delayed_flow.src_packets
+ m_flow_table[index]->m_delayed_flow.dst_packets);
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
}
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason);
update_flow_record_stats(
Expand Down Expand Up @@ -506,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts)
m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow);
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) {
m_flow_table[i]->m_flow.is_delayed = false;
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
}
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) {
m_flow_table[i]->m_delayed_flow_waiting = false;
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
export_flow(i);
}
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
Expand Down Expand Up @@ -668,18 +701,12 @@ void NHTFlowCache::prefetch_export_expired() const
void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
std::vector<std::byte> state = assemble_state(
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
std::vector<std::byte> state = assemble_state(
OffloadMode::PACKET_OFFLOAD,
MetaType::FULL,
ts);

std::cout << "Created record\n\tkey: " << flow_hash_ctt << "\n\tstate: ";
for (auto& byte : state) {
std::cout << std::hex << static_cast<int>(byte) << " ";
}
std::cout << std::endl;
m_commander->write_record(std::move(key), std::move(state));
m_commander->write_record(std::move(key), std::move(state));
}
catch (const std::exception& e) {
throw;
Expand All @@ -691,7 +718,6 @@ void CttController::export_record(uint64_t flow_hash_ctt)
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
m_commander->export_and_delete_record(std::move(key));
std::cout << "Exported record with key: " << flow_hash_ctt << std::endl;
}
catch (const std::exception& e) {
throw;
Expand Down
44 changes: 33 additions & 11 deletions storage/cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#ifndef IPXP_STORAGE_CACHE_HPP
#define IPXP_STORAGE_CACHE_HPP

#include <bits/types/struct_timeval.h>
#include <chrono>
#include <ctime>
#include <string>

#include <ipfixprobe/storage.hpp>
Expand All @@ -49,6 +52,8 @@
#include <ctt_exceptions.hpp>
#include <ctt_modes.hpp>
#include <ctt.hpp>
#include <queue>
#include <tuple>
#endif /* WITH_CTT */

namespace ipxp {
Expand Down Expand Up @@ -269,6 +274,10 @@ class alignas(64) FlowRecord

public:
Flow m_flow;
#ifdef WITH_CTT
Flow m_delayed_flow;
bool m_delayed_flow_waiting;
#endif /* WITH_CTT */

FlowRecord();
~FlowRecord();
Expand Down Expand Up @@ -322,29 +331,42 @@ class NHTFlowCache : TelemetryUtils, public StoragePlugin

int plugins_post_create(Flow &rec, Packet &pkt) {
int ret = StoragePlugin::plugins_post_create(rec, pkt);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::NO_OFFLOAD);
if (no_data_required(rec)) {
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
}
rec.record_in_ctt = false;
//if (only_metadata_required(rec)) {
if (only_metadata_required(rec)) {
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.record_in_ctt = true;
}
return ret;
}

// override post_update method
int plugins_post_update(Flow &rec, Packet &pkt) {
int ret = StoragePlugin::plugins_post_update(rec, pkt);
if (no_data_required(rec) && (rec.ctt_state == static_cast<int>(CttController::OffloadMode::NO_OFFLOAD))) {
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.ctt_state = static_cast<int>(CttController::OffloadMode::PACKET_OFFLOAD);
}
//if (only_metadata_required(rec) && !rec.ctt_state) {
if (!rec.record_in_ctt) { // only for debug!!!!! line above is correct for production
m_ctt_controller.create_record(rec.flow_hash_ctt, rec.time_first);
rec.record_in_ctt = true;
}
return ret;
}

// override pre_export method
void plugins_pre_export(Flow &rec) {
StoragePlugin::plugins_pre_export(rec);
m_ctt_controller.export_record(rec.flow_hash_ctt);
if (rec.record_in_ctt) {
rec.is_delayed = true;
rec.delay_time = time(nullptr) + 1;
m_ctt_controller.export_record(rec.flow_hash_ctt);
rec.record_in_ctt = false;
return;
}
if (rec.is_delayed) {
return;
} else {
StoragePlugin::plugins_pre_export(rec);
}
}

#endif /* WITH_CTT */

private:
Expand Down

0 comments on commit 036856d

Please sign in to comment.