Skip to content

Commit

Permalink
Add working threads to nfprofile
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Mar 31, 2024
1 parent 819d8bc commit 9289f6a
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 123 deletions.
6 changes: 3 additions & 3 deletions src/libnffile/nffile.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ static int Uncompress_Block_BZ2(dataBlock_t *in_block, dataBlock_t *out_block, s

static dataBlock_t *NewDataBlock(void);

static nffile_t *NewFile(nffile_t *nffile);

static dataBlock_t *nfread(nffile_t *nffile);

static int nfwrite(nffile_t *nffile, dataBlock_t *block_header);
Expand Down Expand Up @@ -684,7 +682,7 @@ static int WriteAppendix(nffile_t *nffile) {

} // End of WriteAppendix

static nffile_t *NewFile(nffile_t *nffile) {
nffile_t *NewFile(nffile_t *nffile) {
// Create struct
if (!nffile) {
nffile = calloc(1, sizeof(nffile_t));
Expand Down Expand Up @@ -1195,6 +1193,8 @@ int CloseUpdateFile(nffile_t *nffile) {

// destroy nffile handle: free up all resources
void DisposeFile(nffile_t *nffile) {
if (nffile == NULL) return;

if (nffile->fd > 0) CloseFile(nffile);
if (nffile->file_header) free(nffile->file_header);
if (nffile->stat_record) free(nffile->stat_record);
Expand Down
2 changes: 2 additions & 0 deletions src/libnffile/nffile.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ int QueryFile(char *filename, int verbose);

int GetStatRecord(char *filename, stat_record_t *stat_record);

nffile_t *NewFile(nffile_t *nffile);

void DisposeFile(nffile_t *nffile);

void CloseFile(nffile_t *nffile);
Expand Down
89 changes: 34 additions & 55 deletions src/nfanon/nfanon.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,47 +217,14 @@ static inline void AnonRecord(recordHeaderV3_t *v3Record) {

static void process_data(char *wfile, int verbose, worker_param_t **workerList, int numWorkers, pthread_control_barrier_t *barrier) {
const char spinner[4] = {'|', '/', '-', '\\'};
char pathBuff[MAXPATHLEN];

// Get the first file handle
nffile_t *nffile_r = GetNextFile(NULL);
if (nffile_r == NULL) {
LogError("Empty file list. No files to process\n");
return;
}

int cnt = 1;
char *cfile = nffile_r->fileName;
if (!cfile) {
LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__);
return;
}
if (verbose) printf(" %i Processing %s\r", cnt++, cfile);

char *outFile = NULL;
if (wfile == NULL) {
// prepare output file
snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile);
pathBuff[MAXPATHLEN - 1] = '\0';
outFile = pathBuff;
} else {
outFile = wfile;
}
char *cfile = NULL;

nffile_t *nffile_w = OpenNewFile(outFile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED);
if (!nffile_w) {
// can not create output file
CloseFile(nffile_r);
DisposeFile(nffile_r);
return;
}

SetIdent(nffile_w, FILE_IDENT(nffile_r));
memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t));

// wait for workers ready to start
pthread_controller_wait(barrier);
int cnt = 1;
nffile_t *nffile_r = NewFile(NULL);
nffile_t *nffile_w = NULL;

dataBlock_t *nextBlock = NULL;
dataBlock_t *dataBlock = NULL;
// map datablock for workers - all workers
// process thesame block but different records
Expand All @@ -266,21 +233,22 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
workerList[i]->dataBlock = &dataBlock;
}

// wait for workers ready to start
pthread_controller_wait(barrier);

int blk_count = 0;
int done = 0;
while (!done) {
// get next data block from file
dataBlock = ReadBlock(nffile_r, dataBlock);
if (verbose) {
printf("\r%c", spinner[blk_count & 0x3]);
blk_count++;
}

// get next data block
dataBlock = nextBlock;
if (dataBlock == NULL) {
CloseUpdateFile(nffile_w);
if (wfile == NULL && rename(outFile, cfile) < 0) {
LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno));
return;
// nffile_w is NULL for 1st entry in while loop
if (nffile_w) {
CloseUpdateFile(nffile_w);
if (wfile == NULL && rename(outFile, cfile) < 0) {
LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno));
return;
}
}

if (GetNextFile(nffile_r) == NULL) {
Expand All @@ -289,7 +257,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
continue;
}

cfile = nffile_r->fileName;
char *cfile = nffile_r->fileName;
if (!cfile) {
LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__);
CloseFile(nffile_r);
Expand All @@ -298,6 +266,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
}
if (verbose) printf(" %i Processing %s\r", cnt++, cfile);

char pathBuff[MAXPATHLEN];
if (wfile == NULL) {
// prepare output file
snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile);
Expand All @@ -318,25 +287,35 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
SetIdent(nffile_w, FILE_IDENT(nffile_r));
memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t));

// continue with next file
// read first block from next file
nextBlock = ReadBlock(nffile_r, NULL);
continue;
}

if (verbose) {
printf("\r%c", spinner[blk_count & 0x3]);
blk_count++;
}

if (dataBlock->type != DATA_BLOCK_TYPE_2 && dataBlock->type != DATA_BLOCK_TYPE_3) {
LogError("Can't process block type %u. Write block unmodified", dataBlock->type);
dataBlock = WriteBlock(nffile_w, dataBlock);
nextBlock = ReadBlock(nffile_r, NULL);
continue;
}

dbg_printf("Next block: %d, Records: %u\n", blk_count, dataBlock->NumRecords);
// release workers from barrier
pthread_control_barrier_release(barrier);

// wait for all workers, work done on this block
// prefetch next block
nextBlock = ReadBlock(nffile_r, NULL);

// wait for all workers, work done on previous block
pthread_controller_wait(barrier);

// write modified block
dataBlock = WriteBlock(nffile_w, dataBlock);
FlushBlock(nffile_w, dataBlock);

} // while

Expand All @@ -345,8 +324,8 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList,
pthread_control_barrier_release(barrier);

FreeDataBlock(dataBlock);
DisposeFile(nffile_w);
DisposeFile(nffile_r);
DisposeFile(nffile_w);

if (verbose) LogError("Processed %i files", --cnt);

Expand Down Expand Up @@ -526,7 +505,7 @@ int main(int argc, char **argv) {
pthread_control_barrier_t *barrier = pthread_control_barrier_init(numWorkers);
if (!barrier) exit(255);

pthread_t tid[MAXWORKERS];
pthread_t tid[MAXWORKERS] = {0};
dbg_printf("Launch Workers\n");
worker_param_t **workerList = LauchWorkers(tid, numWorkers, barrier);
if (!workerList) {
Expand Down
1 change: 0 additions & 1 deletion src/nfreplay/nfreplay.c
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ static void send_data(void *engine, timeWindow_t *timeWindow, uint32_t limitReco
match &= limitRecords ? numflows < limitRecords : 1;

// filter netflow record with user supplied filter
// XXX if (match) match = (*Engine->FilterEngine)(Engine);
if (match) match = FilterRecord(engine, recordHandle);

if (match == 0) { // record failed to pass all filters
Expand Down
Loading

0 comments on commit 9289f6a

Please sign in to comment.