Skip to content

Commit

Permalink
implement stream reading
Browse files Browse the repository at this point in the history
  • Loading branch information
solnicki committed Nov 5, 2024
1 parent 487c905 commit 771373e
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 773 deletions.
601 changes: 0 additions & 601 deletions examples/anon_data/source-names-cz.txt

This file was deleted.

35 changes: 14 additions & 21 deletions internal/anonymizer/anonymizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,37 @@ import (
)

type Anonymizer struct {
csvData []map[string]string
anonData map[string][]string
randFunc func(int) int
}

func New(csvData []map[string]string, anonData map[string][]string) *Anonymizer {
func New(anonData map[string][]string) *Anonymizer {
return &Anonymizer{
csvData: csvData,
anonData: anonData,
randFunc: rand.Intn,
}
}

func (an *Anonymizer) anonymize() []string {
var output []string
for _, logLine := range an.csvData {
for field, value := range logLine {
if field == "raw" {
continue
}
func (an *Anonymizer) Anonymize(logLine map[string]string) string {
for field, value := range logLine {
if field == "raw" {
continue
}

if value == "" {
continue
}
if value == "" {
continue
}

if anonValues, exists := an.anonData[field]; exists {
newAnonValue := anonValues[an.randFunc(len(anonValues))]
if anonValues, exists := an.anonData[field]; exists {
newAnonValue := anonValues[an.randFunc(len(anonValues))]

slog.Debug(fmt.Sprintf("Replacing the values for field %s. From %s to %s.\n", field, value, newAnonValue))
slog.Debug(fmt.Sprintf("Replacing the values for field %s. From %s to %s.\n", field, value, newAnonValue))

logLine["raw"] = strings.Replace(logLine["raw"], value, newAnonValue, -1)
}
logLine["raw"] = strings.Replace(logLine["raw"], value, newAnonValue, -1)
}

output = append(output, fmt.Sprint(logLine["raw"]))
}

return output
return logLine["raw"]
}

func (an *Anonymizer) setRandFunc(randFunc func(int) int) {
Expand Down
22 changes: 9 additions & 13 deletions internal/anonymizer/anonymizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,28 @@ func TestAnonimizer_AnonymizeData(t *testing.T) {
tests := []struct {
name string
anonDataDir string
inputFile string
expectedOutput []string
input map[string]string
expectedOutput string
}{
{
name: "Test AnonymizeData",
anonDataDir: "../../examples/anon_data",
inputFile: "../../examples/logs/example_logs.csv",
expectedOutput: []string{"{\"@timestamp\": \"2024-06-05T14:59:27.000+00:00\", \"msg.src_ip\":\"10.10.10.1\", \"username\":\"miloslav.illes\", \"organization\":\"Microsoft\"}"},
input: map[string]string{"@timestamp": "2024-06-05T14:59:27.000+00:00", "msg.src_ip": "10.10.10.1", "msg.username": "miloslav.illes", "msg.organization": "Microsoft", "raw": "2024-06-05T14:59:27.000+00:00, 10.10.10.1, miloslav.illes, Microsoft"},
expectedOutput: "2024-06-05T14:59:27.000+00:00, 10.20.0.53, ladislav.dosek, Apple",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fieldNames, csvData, err := parser.ParseCSV(tt.inputFile)
if err != nil {
t.Fatalf("reading input file %s: %v", tt.inputFile, err)
}

anonData, err := parser.ParseAnonData(tt.anonDataDir, fieldNames)
anonData, err := parser.LoadAnonData(tt.anonDataDir)
if err != nil {
t.Fatalf("loading anonymizing data from dir %s: %v", tt.anonDataDir, err)
}

anonymizer := New(csvData, anonData)
anonymizer.setRandFunc(func(int) int { return 0 })
output := anonymizer.anonymize()
anonymizer := New(anonData)
// Disabling randomization so we know which values to expect
anonymizer.setRandFunc(func(int) int { return 1 })
output := anonymizer.Anonymize(tt.input)

assert.Equal(t, tt.expectedOutput, output)
})
Expand Down
42 changes: 42 additions & 0 deletions internal/anonymizer/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package anonymizer

import (
"encoding/csv"
"fmt"
"io"
"os"
)

// AnonymizeCSV streams CSV log records from input, anonymizes them one by one and writes the results to output.
//
// The first CSV record is treated as the header; its values are used as field names.
// Every following record is turned into a map of field name to value and handed to
// anonymizer.Anonymize. The string it returns is written to output as-is.
// NOTE(review): no separator is appended between records, so multi-record input is
// written back-to-back — confirm this is what consumers of the output expect.
//
// It returns a non-nil error when the header cannot be read (including io.EOF for an
// empty input), when a record is malformed, or when writing to output fails.
// Reaching the end of input after the header is the normal termination and returns nil.
func AnonymizeCSV(input *os.File, output io.Writer, anonymizer *Anonymizer) error {
	csvReader := csv.NewReader(input)

	// First element of the csvReader contains field names
	fieldNames, err := csvReader.Read()
	if err != nil {
		return fmt.Errorf("reading CSV header: %w", err)
	}

	for {
		row, err := csvReader.Read()
		if err == io.EOF {
			// End of input is the only condition that ends the stream successfully.
			return nil
		}
		if err != nil {
			// Parse errors must not be mistaken for end of input, otherwise the
			// output would be silently truncated.
			return fmt.Errorf("reading CSV record: %w", err)
		}

		// With the reader's default settings every record must have as many fields
		// as the header (otherwise Read reports an error above), so indexing
		// fieldNames by the row index is in range here.
		logLine := make(map[string]string, len(row))
		for i, val := range row {
			logLine[fieldNames[i]] = val
		}

		anonymizedLogLine := anonymizer.Anonymize(logLine)

		if _, err := io.WriteString(output, anonymizedLogLine); err != nil {
			return fmt.Errorf("writing anonymized data: %w", err)
		}
	}
}
53 changes: 53 additions & 0 deletions internal/anonymizer/csv_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package anonymizer

import (
"bytes"
"os"
"testing"

"github.com/logmanager-oss/logveil/internal/parser"
"github.com/stretchr/testify/assert"
)

// TestAnonimizer_CSVloader runs AnonymizeCSV against the example CSV log file with
// randomization pinned and compares the text written to the output buffer with the
// expected anonymized log line.
func TestAnonimizer_CSVloader(t *testing.T) {
	type testCase struct {
		name           string
		inputFilename  string
		outputFilename string // not read by this test
		anonDataDir    string
		expectedOutput string
	}

	cases := []testCase{
		{
			name:           "Test CSVLoader",
			inputFilename:  "../../examples/logs/example_logs.csv",
			anonDataDir:    "../../examples/anon_data",
			expectedOutput: "{\"@timestamp\": \"2024-06-05T14:59:27.000+00:00\", \"msg.src_ip\":\"10.20.0.53\", \"username\":\"ladislav.dosek\", \"organization\":\"Apple\"}",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			src, openErr := os.Open(tc.inputFilename)
			if openErr != nil {
				t.Fatal(openErr)
			}
			defer src.Close()

			sink := new(bytes.Buffer)

			replacements, loadErr := parser.LoadAnonData(tc.anonDataDir)
			if loadErr != nil {
				t.Fatal(loadErr)
			}

			an := New(replacements)
			// Always pick the replacement at index 1 so the expected output is deterministic.
			an.setRandFunc(func(int) int { return 1 })

			if runErr := AnonymizeCSV(src, sink, an); runErr != nil {
				t.Fatal(runErr)
			}

			assert.Equal(t, tc.expectedOutput, sink.String())
		})
	}
}
50 changes: 32 additions & 18 deletions internal/anonymizer/runner.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,58 @@
package anonymizer

import (
"fmt"
"io"
"log/slog"
"os"

"github.com/logmanager-oss/logveil/internal/flags"
"github.com/logmanager-oss/logveil/internal/parser"
"github.com/logmanager-oss/logveil/internal/writer"
)

func Run() {
slog.Info("Anonymization process started...")

anonDataDir, inputFile, outputFile := flags.Load()
anonDataDir, inputFilename, outputFilename, enableLMexport := flags.Load()

fieldNames, csvData, err := parser.ParseCSV(inputFile)
input, err := os.Open(inputFilename)
if err != nil {
slog.Error("reading input file %s: %v", inputFile, err)
return
}
defer func(fs *os.File) {
if err := fs.Close(); err != nil {
slog.Error(err.Error())
}
}(input)

var output io.Writer
if outputFilename != "" {
output, err = os.Create(outputFilename)
if err != nil {
return
}
defer func(fs *os.File) {
if err := fs.Close(); err != nil {
slog.Error(err.Error())
}
}(output.(*os.File))
} else {
output = os.Stdout
}

anonData, err := parser.ParseAnonData(anonDataDir, fieldNames)
anonData, err := parser.LoadAnonData(anonDataDir)
if err != nil {
slog.Error("loading anonymizing data from dir %s: %v", anonDataDir, err)
return
}
anonymizer := New(anonData)

anonymizer := New(csvData, anonData)
anonymizedData := anonymizer.anonymize()
if outputFile != "" {
outputwriter := &writer.Output{
Output: anonymizedData,
}
err := outputwriter.Write(outputFile)
if enableLMexport {
err := AnonymizeCSV(input, output, anonymizer)
if err != nil {
slog.Error("writing anonymized data to output file %s: %v", outputFile, err)
slog.Error("reading input file %s: %v", inputFilename, err)
return
}
} else {
fmt.Println(anonymizedData)
}

slog.Info("All done. Exiting...")
slog.Info("All done. Exiting...")
}
}
6 changes: 4 additions & 2 deletions internal/flags/initalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log/slog"
)

func Load() (string, string, string) {
func Load() (string, string, string, bool) {
var anonDataDir input
flag.Var(&anonDataDir, "d", "Path to directory with anonymizing data")

Expand All @@ -22,5 +22,7 @@ func Load() (string, string, string) {
slog.SetLogLoggerLevel(slog.LevelDebug)
}

return anonDataDir.String(), inputFile.String(), outputFile.String()
var enableLMexport = flag.Bool("e", false, "Change input file type to LM export (default input file type is LM Backup)")

return anonDataDir.String(), inputFile.String(), outputFile.String(), *enableLMexport
}
34 changes: 14 additions & 20 deletions internal/parser/anondataparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,36 @@ package parser

import (
"bufio"
"errors"
"fmt"
"io/fs"
"log"
"log/slog"
"os"
"path/filepath"
)

// ParseAnonData reads text files from provided directory based on provided field names.
// LoadAnonData reads every non-directory entry found in the provided directory.
// Each file name is used as the field name, so the file's contents are loaded into the map[fieldName][]anonymizationValues.
// The returned map is used in the anonymization process to match original values with corresponding anonymization values.
func ParseAnonData(anonDataDir string, fieldNames []string) (map[string][]string, error) {
func LoadAnonData(anonDataDir string) (map[string][]string, error) {
var anonData = make(map[string][]string)

for i := range fieldNames {
if fieldNames[i] == "raw" {
continue
}
files, err := os.ReadDir(anonDataDir)
if err != nil {
log.Fatal(err)
}

filename := filepath.Join(anonDataDir, fieldNames[i])
_, err := os.Stat(filename)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
slog.Debug(fmt.Sprintf("Anonymizing data not found for field %s. Skipping.\n", fieldNames[i]))
continue
}
return nil, err
for _, file := range files {
if file.IsDir() {
continue
}

data, err := loadAnonymizingData(filename)
data, err := loadAnonymizingData(filepath.Join(anonDataDir, file.Name()))
if err != nil {
return nil, fmt.Errorf("loading anonymizing data from file %s: %v", filename, err)
return nil, fmt.Errorf("loading anonymizing data from file %s: %v", file.Name(), err)
}

anonData[fieldNames[i]] = data
slog.Debug(fmt.Sprintf("Loaded anonymizing data for field: %s; values loaded: %d\n", fieldNames[i], len(data)))
anonData[file.Name()] = data
slog.Debug(fmt.Sprintf("Loaded anonymizing data for field: %s; values loaded: %d\n", file.Name(), len(data)))
}

return anonData, nil
Expand Down
Loading

0 comments on commit 771373e

Please sign in to comment.