Skip to content

Commit

Permalink
Improve performance to read and convert nfdump-1.6.x records. See #512
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 17, 2024
1 parent 3b388b8 commit d253340
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 16 deletions.
7 changes: 1 addition & 6 deletions src/include/nfdump.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,7 @@ typedef struct ip_addr_s {
typedef struct exporter_info_record_s exporter_info_record_t;
typedef struct extension_map_s extension_map_t;

enum { EXlocal = MAXEXTENSIONS,
SSLindex,
JA3index,
JA4index,
MAXLISTSIZE };
enum { EXlocal = MAXEXTENSIONS, SSLindex, JA3index, JA4index, MAXLISTSIZE };

typedef struct recordHandle_s {
recordHeaderV3_t *recordHeaderV3;
Expand All @@ -102,7 +98,6 @@ typedef struct recordHandle_s {
#define OFFflowCount offsetof(recordHandle_t, flowCount)
#define SIZEflowCount MemberSize(recordHandle_t, flowCount)
uint32_t numElements;
uint64_t elementBits;
} recordHandle_t;

typedef struct stat_record_s {
Expand Down
1 change: 0 additions & 1 deletion src/inline/nffile_inline.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static inline int MapRecordHandle(recordHandle_t *handle, recordHeaderV3_t *reco
if ((elementHeader->type > 0 && elementHeader->type < MAXEXTENSIONS) && elementHeader->length != 0) {
handle->extensionList[elementHeader->type] = (void *)elementHeader + sizeof(elementHeader_t);
elementHeader = (elementHeader_t *)((void *)elementHeader + elementHeader->length);
handle->elementBits |= 1 << elementHeader->type;
} else {
LogError("Invalid extension Type: %u, Length: %u", elementHeader->type, elementHeader->length);
return 0;
Expand Down
63 changes: 58 additions & 5 deletions src/nfdump/compat_1_6_x/nffile_compat.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,17 @@
*
*/

static inline record_header_t *ConvertRecordV2(common_record_t *input_record);
static inline record_header_t *ConvertRecordV2(recordHandle_t *handle, common_record_t *input_record, uint32_t flowCount);

#define MapExtension(extID, p) handle->extensionList[extID] = (void *)(p)

static inline record_header_t *ConvertRecordV2(recordHandle_t *handle, common_record_t *input_record, uint32_t flowCount) {
if (handle->extensionList[SSLindex]) free(handle->extensionList[SSLindex]);
if (handle->extensionList[JA3index]) free(handle->extensionList[JA3index]);
if (handle->extensionList[JA4index]) free(handle->extensionList[JA4index]);

memset((void *)handle, 0, sizeof(recordHandle_t));

static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
// tmp buffer on stack
static char tmpRecord[4096];
record_header_t *record_ptr = (record_header_t *)tmpRecord;
Expand All @@ -54,10 +62,15 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
extension_map_t *extension_map = extension_info->map;

AddV3Header(record_ptr, recordHeader);
MapExtension(EXnull, recordHeader);
MapExtension(EXlocal, handle);
handle->recordHeaderV3 = recordHeader;
handle->flowCount = flowCount;
recordHeader->exporterID = input_record->exporter_sysid;

// pack V3 record
PushExtension(recordHeader, EXgenericFlow, genericFlow);
MapExtension(EXgenericFlowID, genericFlow);
genericFlow->msecFirst = input_record->first * 1000L + input_record->msec_first;
genericFlow->msecLast = input_record->last * 1000L + input_record->msec_last;
genericFlow->proto = input_record->prot;
Expand All @@ -70,13 +83,15 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
if (TestFlag(input_record->flags, FLAG_IPV6_ADDR)) { // IPv6
// IPv6
PushExtension(recordHeader, EXipv6Flow, ipv6Flow);
MapExtension(EXipv6FlowID, ipv6Flow);
memcpy(ipv6Flow->srcAddr, (void *)input_record->data, 4 * sizeof(uint64_t));

p = (void *)(p + 4 * sizeof(uint64_t));
} else {
// IPv4
uint32_t *u = (uint32_t *)p;
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
MapExtension(EXipv4FlowID, ipv4Flow);

ipv4Flow->srcAddr = u[0];
ipv4Flow->dstAddr = u[1];
Expand Down Expand Up @@ -138,20 +153,23 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_AS_2: {
tpl_ext_6_t *tpl = (tpl_ext_6_t *)p;
PushExtension(recordHeader, EXasRouting, asRouting);
MapExtension(EXasRoutingID, asRouting);
asRouting->srcAS = tpl->src_as;
asRouting->dstAS = tpl->dst_as;
p = (void *)tpl->data;
} break;
case EX_AS_4: {
tpl_ext_7_t *tpl = (tpl_ext_7_t *)p;
PushExtension(recordHeader, EXasRouting, asRouting);
MapExtension(EXasRoutingID, asRouting);
asRouting->srcAS = tpl->src_as;
asRouting->dstAS = tpl->dst_as;
p = (void *)tpl->data;
} break;
case EX_MULIPLE: {
tpl_ext_8_t *tpl = (tpl_ext_8_t *)p;
PushExtension(recordHeader, EXflowMisc, flowMisc);
MapExtension(EXflowMiscID, flowMisc);
flowMisc->srcMask = tpl->src_mask;
flowMisc->dstMask = tpl->dst_mask;
flowMisc->dstTos = tpl->dst_tos;
Expand All @@ -164,6 +182,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_VLAN: {
tpl_ext_13_t *tpl = (tpl_ext_13_t *)p;
PushExtension(recordHeader, EXvLan, vLan);
MapExtension(EXvLanID, vLan);
vLan->srcVlan = tpl->src_vlan;
vLan->dstVlan = tpl->dst_vlan;
p = (void *)tpl->data;
Expand All @@ -179,12 +198,14 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_NEXT_HOP_v4: {
tpl_ext_9_t *tpl = (tpl_ext_9_t *)p;
PushExtension(recordHeader, EXipNextHopV4, ipNextHopV4);
MapExtension(EXipNextHopV4ID, ipNextHopV4);
ipNextHopV4->ip = tpl->nexthop;
p = (void *)tpl->data;
} break;
case EX_NEXT_HOP_v6: {
tpl_ext_10_t *tpl = (tpl_ext_10_t *)p;
PushExtension(recordHeader, EXipNextHopV6, ipNextHopV6);
MapExtension(EXipNextHopV6ID, ipNextHopV6);
ipNextHopV6->ip[0] = tpl->nexthop[0];
ipNextHopV6->ip[1] = tpl->nexthop[1];
p = (void *)tpl->data;
Expand All @@ -198,27 +219,31 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_ROUTER_IP_v4: {
tpl_ext_23_t *tpl = (tpl_ext_23_t *)p;
PushExtension(recordHeader, EXipReceivedV4, ipReceivedV4);
MapExtension(EXipReceivedV4ID, ipReceivedV4);
ipReceivedV4->ip = tpl->router_ip;
p = (void *)tpl->data;
} break;
case EX_ROUTER_IP_v6: {
tpl_ext_24_t *tpl = (tpl_ext_24_t *)p;
PushExtension(recordHeader, EXipReceivedV6, ipReceivedV6);
MapExtension(EXipReceivedV6ID, ipReceivedV6);
ipReceivedV6->ip[0] = tpl->router_ip[0];
ipReceivedV6->ip[1] = tpl->router_ip[1];
p = (void *)tpl->data;
} break;
case EX_NEXT_HOP_BGP_v4: {
tpl_ext_11_t *tpl = (tpl_ext_11_t *)p;
PushExtension(recordHeader, EXbgpNextHopV4, bgpNextHopV4);
MapExtension(EXbgpNextHopV4ID, bgpNextHopV4);
bgpNextHopV4->ip = tpl->bgp_nexthop;
p = (void *)tpl->data;
} break;
case EX_NEXT_HOP_BGP_v6: {
tpl_ext_12_t *tpl = (tpl_ext_12_t *)p;
PushExtension(recordHeader, EXipReceivedV6, ipNextHopV6);
ipNextHopV6->ip[0] = tpl->bgp_nexthop[0];
ipNextHopV6->ip[1] = tpl->bgp_nexthop[1];
PushExtension(recordHeader, EXbgpNextHopV6, bgpNextHopV6);
MapExtension(EXbgpNextHopV6ID, bgpNextHopV6);
bgpNextHopV6->ip[0] = tpl->bgp_nexthop[0];
bgpNextHopV6->ip[1] = tpl->bgp_nexthop[1];
p = (void *)tpl->data;
} break;
case EX_OUT_PKG_4: {
Expand Down Expand Up @@ -255,6 +280,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
tpl_ext_20_t *tpl = (tpl_ext_20_t *)p;
if (!macAddr) { // XXX fix
PushExtension(recordHeader, EXmacAddr, m);
MapExtension(EXmacAddrID, m);
macAddr = m;
macAddr->inSrcMac = tpl->in_src_mac;
macAddr->outDstMac = tpl->out_dst_mac;
Expand All @@ -270,6 +296,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
tpl_ext_21_t *tpl = (tpl_ext_21_t *)p;
if (!macAddr) {
PushExtension(recordHeader, EXmacAddr, m);
MapExtension(EXmacAddrID, m);
macAddr = m;
macAddr->inSrcMac = 0;
macAddr->outDstMac = 0;
Expand All @@ -284,20 +311,23 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_MPLS: {
tpl_ext_22_t *tpl = (tpl_ext_22_t *)p;
PushExtension(recordHeader, EXmplsLabel, mplsLabel);
MapExtension(EXmplsLabelID, mplsLabel);
for (int j = 0; j < 10; j++) {
mplsLabel->mplsLabel[j] = tpl->mpls_label[j];
}
p = (void *)tpl->data;
} break;
case EX_BGPADJ: {
PushExtension(recordHeader, EXasAdjacent, asAdjacent);
MapExtension(EXasAdjacentID, asAdjacent);
tpl_ext_26_t *tpl = (tpl_ext_26_t *)p;
asAdjacent->nextAdjacentAS = tpl->bgpNextAdjacentAS;
asAdjacent->prevAdjacentAS = tpl->bgpPrevAdjacentAS;
p = (void *)tpl->data;
} break;
case EX_LATENCY: {
PushExtension(recordHeader, EXlatency, latency);
MapExtension(EXlatencyID, latency);
tpl_ext_latency_t *tpl = (tpl_ext_latency_t *)p;
latency->usecClientNwDelay = tpl->client_nw_delay_usec;
latency->usecServerNwDelay = tpl->server_nw_delay_usec;
Expand All @@ -307,6 +337,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_NSEL_COMMON: {
tpl_ext_37_t *tpl = (tpl_ext_37_t *)p;
PushExtension(recordHeader, EXnselCommon, nselCommon);
MapExtension(EXnselCommonID, nselCommon);
nselCommon->msecEvent = tpl->event_time;
nselCommon->connID = tpl->conn_id;
nselCommon->fwXevent = tpl->fw_xevent;
Expand All @@ -317,34 +348,39 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_NSEL_XLATE_IP_v4: {
tpl_ext_39_t *tpl = (tpl_ext_39_t *)p;
PushExtension(recordHeader, EXnselXlateIPv4, nselXlateIPv4);
MapExtension(EXnselXlateIPv4ID, nselXlateIPv4);
nselXlateIPv4->xlateSrcAddr = tpl->xlate_src_ip;
nselXlateIPv4->xlateDstAddr = tpl->xlate_dst_ip;
p = (void *)tpl->data;
} break;
case EX_NSEL_XLATE_IP_v6: {
tpl_ext_40_t *tpl = (tpl_ext_40_t *)p;
PushExtension(recordHeader, EXnselXlateIPv6, nselXlateIPv6);
MapExtension(EXnselXlateIPv6ID, nselXlateIPv6);
memcpy(nselXlateIPv6->xlateSrcAddr, tpl->xlate_src_ip, 16);
memcpy(nselXlateIPv6->xlateDstAddr, tpl->xlate_dst_ip, 16);
p = (void *)tpl->data;
} break;
case EX_NSEL_XLATE_PORTS: {
tpl_ext_38_t *tpl = (tpl_ext_38_t *)p;
PushExtension(recordHeader, EXnselXlatePort, nselXlatePort);
MapExtension(EXnselXlatePortID, nselXlatePort);
nselXlatePort->xlateSrcPort = tpl->xlate_src_port;
nselXlatePort->xlateDstPort = tpl->xlate_dst_port;
p = (void *)tpl->data;
} break;
case EX_NSEL_ACL: {
tpl_ext_41_t *tpl = (tpl_ext_41_t *)p;
PushExtension(recordHeader, EXnselAcl, nselAcl);
MapExtension(EXnselAclID, nselAcl);
memcpy(nselAcl->ingressAcl, tpl->ingress_acl_id, 12);
memcpy(nselAcl->egressAcl, tpl->egress_acl_id, 12);
p = (void *)tpl->data;
} break;
case EX_NSEL_USER_MAX: {
tpl_ext_43_t *tpl = (tpl_ext_43_t *)p;
PushExtension(recordHeader, EXnselUser, nselUser);
MapExtension(EXnselUserID, nselUser);
tpl->username[65] = '\0'; // truncate old username
strncpy((void *)nselUser->username, (void *)tpl->username, 65);
nselUser->username[65] = '\0';
Expand All @@ -353,15 +389,18 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_NSEL_USER: {
tpl_ext_42_t *tpl = (tpl_ext_42_t *)p;
PushExtension(recordHeader, EXnselUser, nselUser);
MapExtension(EXnselUserID, nselUser);
strncpy((void *)nselUser->username, (void *)tpl->username, 65);
nselUser->username[65] = '\0';
p = (void *)tpl->data;
} break;
case EX_NEL_COMMON: {
tpl_ext_46_t *tpl = (tpl_ext_46_t *)p;
PushExtension(recordHeader, EXnelCommon, nelCommon);
MapExtension(EXnelCommonID, nelCommon);
nelCommon->natEvent = tpl->nat_event;
PushExtension(recordHeader, EXvrf, vrf);
MapExtension(EXvrfID, vrf);
vrf->egressVrf = tpl->egress_vrfid;
vrf->ingressVrf = tpl->ingress_vrfid;
p = (void *)tpl->data;
Expand All @@ -374,6 +413,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
case EX_PORT_BLOCK_ALLOC: {
tpl_ext_48_t *tpl = (tpl_ext_48_t *)p;
PushExtension(recordHeader, EXnelXlatePort, nelXlatePort);
MapExtension(EXnelXlatePortID, nelXlatePort);
nelXlatePort->blockStart = tpl->block_start;
nelXlatePort->blockEnd = tpl->block_end;
nelXlatePort->blockStep = tpl->block_step;
Expand All @@ -387,6 +427,7 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {

if (outPackets || outBytes || numFlows) {
PushExtension(recordHeader, EXcntFlow, cntFlow);
MapExtension(EXcntFlowID, cntFlow);
cntFlow->outPackets = outPackets;
cntFlow->outBytes = outBytes;
cntFlow->flows = numFlows;
Expand All @@ -398,11 +439,23 @@ static inline record_header_t *ConvertRecordV2(common_record_t *input_record) {
_flowMisc->output = output;
} else {
PushExtension(recordHeader, EXflowMisc, flowMisc);
MapExtension(EXflowMiscID, flowMisc);
flowMisc->input = input;
flowMisc->output = output;
}
}

handle->numElements = recordHeader->numElements;
if (genericFlow && genericFlow->msecFirst == 0) {
EXnselCommon_t *nselCommon = (EXnselCommon_t *)handle->extensionList[EXnselCommonID];
if (nselCommon) {
genericFlow->msecFirst = nselCommon->msecEvent;
} else {
EXnelCommon_t *nelCommon = (EXnelCommon_t *)handle->extensionList[EXnelCommonID];
if (nelCommon) genericFlow->msecFirst = nelCommon->msecEvent;
}
}

dbg_printf("V3 record: elements: %u, size: %u\n", recordHeader->numElements, recordHeader->size);
return record_ptr;

Expand Down
6 changes: 4 additions & 2 deletions src/nfdump/nfdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,12 @@ static stat_record_t process_data(void *engine, char *wfile, int element_stat, i
case CommonRecordType: {
if (__builtin_expect(record_ptr->type == CommonRecordType, 0)) {
dbg_printf("Convert nfdump 1.6.x v2 record\n");
process_ptr = ConvertRecordV2((common_record_t *)record_ptr);
// ConvertRecordV2 also maps recordHandle
process_ptr = ConvertRecordV2(recordHandle, (common_record_t *)record_ptr, ++processed);
if (!process_ptr) goto NEXT;
} else {
MapRecordHandle(recordHandle, (recordHeaderV3_t *)process_ptr, ++processed);
}
MapRecordHandle(recordHandle, (recordHeaderV3_t *)process_ptr, ++processed);

// Time based filter
// if no time filter is given, the result is always true
Expand Down
9 changes: 7 additions & 2 deletions src/nfreplay/send_v9.c
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,15 @@ int Close_v9_output(send_peer_t *peer) {
static outTemplate_t *GetOutputTemplate(recordHandle_t *recordHandle) {
uint32_t template_id = 0;

uint64_t elementBits = 0;
for (int i = 0; i < MAXEXTENSIONS; i++) {
if (recordHandle->extensionList[i]) elementBits |= 1 << i;
}

outTemplate_t **t = &outTemplates;
// search for the template, which corresponds to our flags and extension map
while (*t) {
if (((*t)->elementBits == recordHandle->elementBits) && ((*t)->numExtensions == recordHandle->numElements)) {
if (((*t)->elementBits == elementBits) && ((*t)->numExtensions == recordHandle->numElements)) {
return *t;
}
template_id = (*t)->template_id;
Expand All @@ -194,7 +199,7 @@ static outTemplate_t *GetOutputTemplate(recordHandle_t *recordHandle) {
}
(*t)->next = NULL;

(*t)->elementBits = recordHandle->elementBits;
(*t)->elementBits = elementBits;
(*t)->numExtensions = recordHandle->numElements;

if (template_id == 0)
Expand Down

0 comments on commit d253340

Please sign in to comment.