diff --git a/src/libnffile/nffile.c b/src/libnffile/nffile.c index d5b5cfd3..3483e074 100644 --- a/src/libnffile/nffile.c +++ b/src/libnffile/nffile.c @@ -124,8 +124,6 @@ static int Uncompress_Block_BZ2(dataBlock_t *in_block, dataBlock_t *out_block, s static dataBlock_t *NewDataBlock(void); -static nffile_t *NewFile(nffile_t *nffile); - static dataBlock_t *nfread(nffile_t *nffile); static int nfwrite(nffile_t *nffile, dataBlock_t *block_header); @@ -684,7 +682,7 @@ static int WriteAppendix(nffile_t *nffile) { } // End of WriteAppendix -static nffile_t *NewFile(nffile_t *nffile) { +nffile_t *NewFile(nffile_t *nffile) { // Create struct if (!nffile) { nffile = calloc(1, sizeof(nffile_t)); @@ -1195,6 +1193,8 @@ int CloseUpdateFile(nffile_t *nffile) { // destroy nffile handle: free up all resources void DisposeFile(nffile_t *nffile) { + if (nffile == NULL) return; + if (nffile->fd > 0) CloseFile(nffile); if (nffile->file_header) free(nffile->file_header); if (nffile->stat_record) free(nffile->stat_record); diff --git a/src/libnffile/nffile.h b/src/libnffile/nffile.h index 0d74d3bf..66dba127 100644 --- a/src/libnffile/nffile.h +++ b/src/libnffile/nffile.h @@ -259,6 +259,8 @@ int QueryFile(char *filename, int verbose); int GetStatRecord(char *filename, stat_record_t *stat_record); +nffile_t *NewFile(nffile_t *nffile); + void DisposeFile(nffile_t *nffile); void CloseFile(nffile_t *nffile); diff --git a/src/nfanon/nfanon.c b/src/nfanon/nfanon.c index ef57e793..c537251f 100755 --- a/src/nfanon/nfanon.c +++ b/src/nfanon/nfanon.c @@ -217,47 +217,14 @@ static inline void AnonRecord(recordHeaderV3_t *v3Record) { static void process_data(char *wfile, int verbose, worker_param_t **workerList, int numWorkers, pthread_control_barrier_t *barrier) { const char spinner[4] = {'|', '/', '-', '\\'}; - char pathBuff[MAXPATHLEN]; - - // Get the first file handle - nffile_t *nffile_r = GetNextFile(NULL); - if (nffile_r == NULL) { - LogError("Empty file list. No files to process\n"); - return; - } - - int cnt = 1; - char *cfile = nffile_r->fileName; - if (!cfile) { - LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__); - return; - } - if (verbose) printf(" %i Processing %s\r", cnt++, cfile); - char *outFile = NULL; - if (wfile == NULL) { - // prepare output file - snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile); - pathBuff[MAXPATHLEN - 1] = '\0'; - outFile = pathBuff; - } else { - outFile = wfile; - } + char *cfile = NULL; - nffile_t *nffile_w = OpenNewFile(outFile, NULL, CREATOR_NFANON, FILE_COMPRESSION(nffile_r), NOT_ENCRYPTED); - if (!nffile_w) { - // can not create output file - CloseFile(nffile_r); - DisposeFile(nffile_r); - return; - } - - SetIdent(nffile_w, FILE_IDENT(nffile_r)); - memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t)); - - // wait for workers ready to start - pthread_controller_wait(barrier); + int cnt = 1; + nffile_t *nffile_r = NewFile(NULL); + nffile_t *nffile_w = NULL; + dataBlock_t *nextBlock = NULL; dataBlock_t *dataBlock = NULL; // map datablock for workers - all workers // process thesame block but different records @@ -266,21 +233,22 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, workerList[i]->dataBlock = &dataBlock; } + // wait for workers ready to start + pthread_controller_wait(barrier); + int blk_count = 0; int done = 0; while (!done) { - // get next data block from file - dataBlock = ReadBlock(nffile_r, dataBlock); - if (verbose) { - printf("\r%c", spinner[blk_count & 0x3]); - blk_count++; - } - + // get next data block + dataBlock = nextBlock; if (dataBlock == NULL) { - CloseUpdateFile(nffile_w); - if (wfile == NULL && rename(outFile, cfile) < 0) { - LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); - return; + // nffile_w is NULL for 1st entry in while loop + if (nffile_w) { + CloseUpdateFile(nffile_w); + if (wfile == NULL && rename(outFile, cfile) < 0) { + LogError("rename() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); + return; + } } if (GetNextFile(nffile_r) == NULL) { @@ -289,7 +257,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, continue; } - cfile = nffile_r->fileName; + char *cfile = nffile_r->fileName; if (!cfile) { LogError("(NULL) input file name error in %s line %d\n", __FILE__, __LINE__); CloseFile(nffile_r); @@ -298,6 +266,7 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, } if (verbose) printf(" %i Processing %s\r", cnt++, cfile); + char pathBuff[MAXPATHLEN]; if (wfile == NULL) { // prepare output file snprintf(pathBuff, MAXPATHLEN - 1, "%s-tmp", cfile); @@ -318,13 +287,20 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, SetIdent(nffile_w, FILE_IDENT(nffile_r)); memcpy((void *)nffile_w->stat_record, (void *)nffile_r->stat_record, sizeof(stat_record_t)); - // continue with next file + // read first block from next file + nextBlock = ReadBlock(nffile_r, NULL); continue; } + if (verbose) { + printf("\r%c", spinner[blk_count & 0x3]); + blk_count++; + } + if (dataBlock->type != DATA_BLOCK_TYPE_2 && dataBlock->type != DATA_BLOCK_TYPE_3) { LogError("Can't process block type %u. Write block unmodified", dataBlock->type); dataBlock = WriteBlock(nffile_w, dataBlock); + nextBlock = ReadBlock(nffile_r, NULL); continue; } @@ -332,11 +308,14 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, // release workers from barrier pthread_control_barrier_release(barrier); - // wait for all workers, work done on this block + // prefetch next block + nextBlock = ReadBlock(nffile_r, NULL); + + // wait for all workers, work done on previous block pthread_controller_wait(barrier); // write modified block - dataBlock = WriteBlock(nffile_w, dataBlock); + FlushBlock(nffile_w, dataBlock); } // while @@ -345,8 +324,8 @@ static void process_data(char *wfile, int verbose, worker_param_t **workerList, pthread_control_barrier_release(barrier); FreeDataBlock(dataBlock); - DisposeFile(nffile_w); DisposeFile(nffile_r); + DisposeFile(nffile_w); if (verbose) LogError("Processed %i files", --cnt); @@ -526,7 +505,7 @@ int main(int argc, char **argv) { pthread_control_barrier_t *barrier = pthread_control_barrier_init(numWorkers); if (!barrier) exit(255); - pthread_t tid[MAXWORKERS]; + pthread_t tid[MAXWORKERS] = {0}; dbg_printf("Launch Workers\n"); worker_param_t **workerList = LauchWorkers(tid, numWorkers, barrier); if (!workerList) { diff --git a/src/nfreplay/nfreplay.c b/src/nfreplay/nfreplay.c index 46ec52f5..f8bacbc0 100644 --- a/src/nfreplay/nfreplay.c +++ b/src/nfreplay/nfreplay.c @@ -308,7 +308,6 @@ static void send_data(void *engine, timeWindow_t *timeWindow, uint32_t limitReco match &= limitRecords ? numflows < limitRecords : 1; // filter netflow record with user supplied filter - // XXX if (match) match = (*Engine->FilterEngine)(Engine); if (match) match = FilterRecord(engine, recordHandle); if (match == 0) { // record failed to pass all filters diff --git a/src/nfsen/nfprofile.c b/src/nfsen/nfprofile.c index c0e3d5a2..e19a6936 100644 --- a/src/nfsen/nfprofile.c +++ b/src/nfsen/nfprofile.c @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -43,6 +44,7 @@ #include #include +#include "barrier.h" #include "conf/nfconf.h" #include "filter/filter.h" #include "flist.h" @@ -60,12 +62,27 @@ char influxdb_url[1024] = ""; #endif +#define PROFILEWRITERS 2 +#define MAXPROFILERS 8 + +typedef struct worker_param_s { + int self; + uint32_t numWorkers; + uint32_t numChannels; + profile_channel_info_t *channels; + dataBlock_t **dataBlock; + + // sync barrier + pthread_control_barrier_t *barrier; +} worker_param_t; + /* Function Prototypes */ static void usage(char *name); static profile_param_info_t *ParseParams(char *profile_datadir); -static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot); +static void process_data(profile_channel_info_t *channels, unsigned int numChannels, time_t tslot, worker_param_t **workerList, int numWorkers, + pthread_control_barrier_t *barrie); /* Functions */ @@ -97,52 +114,27 @@ static void usage(char *name) { name); } /* usage */ -static void process_data(profile_channel_info_t *channels, unsigned int num_channels, time_t tslot) { - nffile_t *nffile = GetNextFile(NULL); - if (!nffile) { - LogError("GetNextFile() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); - return; - } - if (nffile == NULL) { - LogError("Empty file list. No files to process"); - return; - } - for (int j = 0; j < num_channels; j++) { - // apply profile filter - void *engine = channels[j].engine; - FilterSetParam(engine, nffile->ident, NOGEODB); - } +__attribute__((noreturn)) static void *worker(void *arg) { + worker_param_t *worker_param = (worker_param_t *)arg; + + uint32_t self = worker_param->self; + uint32_t numWorkers = worker_param->numWorkers; + uint32_t numChannels = worker_param->numChannels; + profile_channel_info_t *channels = worker_param->channels; recordHandle_t *recordHandle = calloc(1, sizeof(recordHandle_t)); if (!recordHandle) { LogError("malloc() error in %s line %d: %s\n", __FILE__, __LINE__, strerror(errno)); - return; + pthread_exit(NULL); } - dataBlock_t *dataBlock = NULL; - uint32_t processed = 0; - int done = 0; - while (!done) { - // get next data block from file - dataBlock = ReadBlock(nffile, dataBlock); - if (dataBlock == NULL) { - nffile_t *next = GetNextFile(nffile); - if (next == NULL) { - done = 1; - continue; - } - for (int j = 0; j < num_channels; j++) { - // apply profile filter - void *engine = channels[j].engine; - FilterSetParam(engine, nffile->ident, NOGEODB); - } - continue; - } + // wait in barrier after launch + pthread_control_barrier_wait(worker_param->barrier); - if (dataBlock->type != DATA_BLOCK_TYPE_2 && dataBlock->type != DATA_BLOCK_TYPE_3) { - LogError("Can't process block type %u. Skip block", dataBlock->type); - continue; - } + while (*(worker_param->dataBlock)) { + dataBlock_t *dataBlock = *(worker_param->dataBlock); + dbg_printf("Worker %i working on %p\n", self, dataBlock); + uint32_t recordCount = 0; record_header_t *record_ptr = GetCursor(dataBlock); uint32_t sumSize = 0; @@ -152,13 +144,13 @@ static void process_data(profile_channel_info_t *channels, unsigned int num_chan exit(255); } sumSize += record_ptr->size; + recordCount++; switch (record_ptr->type) { case V3Record: - processed++; - MapRecordHandle(recordHandle, (recordHeaderV3_t *)record_ptr, processed); + MapRecordHandle(recordHandle, (recordHeaderV3_t *)record_ptr, recordCount); - for (int j = 0; j < num_channels; j++) { + for (int j = self; j < numChannels; j += numWorkers) { int match; // apply profile filter @@ -184,7 +176,7 @@ static void process_data(profile_channel_info_t *channels, unsigned int num_chan break; case ExporterInfoRecordType: { - for (int j = 0; j < num_channels; j++) { + for (int j = self; j < numChannels; j += numWorkers) { if (channels[j].nffile != NULL) { // flush new exporter channels[j].dataBlock = AppendToBuffer(channels[j].nffile, channels[j].dataBlock, (void *)record_ptr, record_ptr->size); @@ -193,7 +185,7 @@ static void process_data(profile_channel_info_t *channels, unsigned int num_chan } break; case SamplerLegacyRecordType: case SamplerRecordType: { - for (int j = 0; j < num_channels; j++) { + for (int j = self; j < numChannels; j += numWorkers) { if (channels[j].nffile != NULL) { // flush new map channels[j].dataBlock = AppendToBuffer(channels[j].nffile, channels[j].dataBlock, (void *)record_ptr, record_ptr->size); @@ -203,7 +195,7 @@ static void process_data(profile_channel_info_t *channels, unsigned int num_chan case NbarRecordType: case IfNameRecordType: case VrfNameRecordType: - for (int j = 0; j < num_channels; j++) { + for (int j = self; j < numChannels; j += numWorkers) { if (channels[j].nffile != NULL) { // flush new map channels[j].dataBlock = AppendToBuffer(channels[j].nffile, channels[j].dataBlock, (void *)record_ptr, record_ptr->size); @@ -223,27 +215,120 @@ static void process_data(profile_channel_info_t *channels, unsigned int num_chan record_ptr = (record_header_t *)((pointer_addr_t)record_ptr + record_ptr->size); } // End of for all umRecords - } // End of while !done - // Close input - FreeDataBlock(dataBlock); - CloseFile(nffile); + // Done + // wait in barrier for next data record + pthread_control_barrier_wait(worker_param->barrier); + } + + dbg_printf("Worker %d done.\n", worker_param->self); + pthread_exit(NULL); + + // unreached +} // End of worker + +static worker_param_t **LauchWorkers(pthread_t *tid, int numWorkers, pthread_control_barrier_t *barrier, profile_channel_info_t *channels, + uint32_t numChannels) { + if (numWorkers > MAXWORKERS) { + LogError("LaunchWorkers: number of worker: %u > max workers: %u", numWorkers, MAXWORKERS); + return NULL; + } + + worker_param_t **workerList = calloc(numWorkers, sizeof(worker_param_t *)); + if (!workerList) NULL; + + for (int i = 0; i < numWorkers; i++) { + worker_param_t *worker_param = calloc(1, sizeof(worker_param_t)); + if (!worker_param) NULL; + + worker_param->barrier = barrier; + worker_param->self = i; + worker_param->dataBlock = NULL; + worker_param->numWorkers = numWorkers; + worker_param->channels = channels; + worker_param->numChannels = numChannels; + workerList[i] = worker_param; + + int err = pthread_create(&(tid[i]), NULL, worker, (void *)worker_param); + if (err) { + LogError("pthread_create() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); + return NULL; + } + } + return workerList; + +} // End of LaunchWorkers + +static void process_data(profile_channel_info_t *channels, unsigned int numChannels, time_t tslot, worker_param_t **workerList, int numWorkers, + pthread_control_barrier_t *barrier) { + dataBlock_t *nextBlock = NULL; + dataBlock_t *dataBlock = NULL; + // map datablock for workers - all workers + // process the same block but different channels + for (int i = 0; i < numWorkers; i++) { + // set new datablock for all workers + workerList[i]->dataBlock = &dataBlock; + } + + nffile_t *nffile = NewFile(NULL); + + // wait for workers ready to start + pthread_controller_wait(barrier); + + int done = 0; + while (!done) { + // get next data block from file + dataBlock = nextBlock; + if (dataBlock == NULL) { + if (GetNextFile(nffile) == NULL) { + done = 1; + continue; + } + for (int j = 0; j < numChannels; j++) { + // set ident to file engines + void *engine = channels[j].engine; + FilterSetParam(engine, nffile->ident, NOGEODB); + } + // read first block and continue + nextBlock = ReadBlock(nffile, NULL); + continue; + } + + if (dataBlock->type != DATA_BLOCK_TYPE_2 && dataBlock->type != DATA_BLOCK_TYPE_3) { + LogError("Can't process block type %u. Skip block", dataBlock->type); + nextBlock = ReadBlock(nffile, NULL); + continue; + } + + dbg_printf("Next block: Records: %u\n", dataBlock->NumRecords); + // release workers from barrier + pthread_control_barrier_release(barrier); + + // get next block while worker are processing the previous one + nextBlock = ReadBlock(nffile, NULL); + + // wait for all workers, work done on this block + pthread_controller_wait(barrier); + // free processed block + FreeDataBlock(dataBlock); + + } // End of while !done + + // done! - signal all workers to terminate + dataBlock = NULL; + pthread_control_barrier_release(barrier); + DisposeFile(nffile); // do we need to write data to new file - shadow profiles do not have files. // write all used blocks first, then close the files - for (int j = 0; j < num_channels; j++) { + for (int j = 0; j < numChannels; j++) { if (channels[j].nffile != NULL) { // flush output buffer FlushBlock(channels[j].nffile, channels[j].dataBlock); *channels[j].nffile->stat_record = channels[j].stat_record; - } - } - for (int j = 0; j < num_channels; j++) { - if (channels[j].nffile != NULL) { CloseUpdateFile(channels[j].nffile); DisposeFile(channels[j].nffile); - channels[j].nffile = NULL; } } @@ -409,8 +494,21 @@ static profile_param_info_t *ParseParams(char *profile_datadir) { } // End of ParseParams +static void WaitWorkersDone(pthread_t *tid, int numWorkers) { + // wait for all nfwriter threads to exit + for (int i = 0; i < numWorkers; i++) { + if (tid[i]) { + int err = pthread_join(tid[i], NULL); + if (err) { + LogError("pthread_join() error in %s line %d: %s", __FILE__, __LINE__, strerror(errno)); + } + tid[i] = 0; + } + } +} // End of WaitWorkersDone + int main(int argc, char **argv) { - unsigned int num_channels, compress; + unsigned int numChannels, compress; profile_param_info_t *profile_list; char *ffile, *filename, *syslog_facility; char *profile_datadir, *profile_statdir, *nameserver; @@ -418,6 +516,7 @@ int main(int argc, char **argv) { time_t tslot; flist_t flist; + int numWorkers = MAXPROFILERS; memset((void *)&flist, 0, sizeof(flist)); profile_datadir = NULL; profile_statdir = NULL; @@ -585,10 +684,18 @@ int main(int argc, char **argv) { exit(255); } - num_channels = InitChannels(profile_datadir, profile_statdir, profile_list, ffile, filename, subdir_index, syntax_only, compress); + if (!flist.single_file) { + LogError("Input file (-r) required!"); + exit(255); + } + + queue_t *fileList = SetupInputFileSequence(&flist); + if (!fileList || !Init_nffile(PROFILEWRITERS, fileList)) exit(254); + + numChannels = InitChannels(profile_datadir, profile_statdir, profile_list, ffile, filename, subdir_index, syntax_only, compress); // nothing to do - if (num_channels == 0) { + if (numChannels == 0) { LogInfo("No channels to process"); return 0; } @@ -598,15 +705,26 @@ int main(int argc, char **argv) { return 0; } - if (!flist.single_file) { - LogError("Input file (-r) required!"); + // check numWorkers depending on cores online + numWorkers = GetNumWorkers(numWorkers); + + pthread_control_barrier_t *barrier = pthread_control_barrier_init(numWorkers); + if (!barrier) exit(255); + + profile_channel_info_t *channels = GetChannelInfoList(); + + pthread_t tid[MAXWORKERS] = {0}; + dbg_printf("Launch Workers\n"); + worker_param_t **workerList = LauchWorkers(tid, numWorkers, barrier, channels, numChannels); + if (!workerList) { + LogError("Failed to launch workers"); exit(255); } - queue_t *fileList = SetupInputFileSequence(&flist); - if (!fileList || !Init_nffile(DEFAULTWORKERS, fileList)) exit(254); + process_data(channels, numChannels, tslot, workerList, numWorkers, barrier); - process_data(GetChannelInfoList(), num_channels, tslot); + WaitWorkersDone(tid, numWorkers); + pthread_control_barrier_destroy(barrier); UpdateChannels(tslot); #if 0