Skip to content

Commit

Permalink
implement stream reading
Browse files Browse the repository at this point in the history
  • Loading branch information
solnicki committed Nov 12, 2024
1 parent 00fe3ff commit cc41366
Show file tree
Hide file tree
Showing 7 changed files with 397 additions and 0 deletions.
67 changes: 67 additions & 0 deletions cmd/logveil/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package logveil

import (
"io"
"log/slog"
"os"

"github.com/logmanager-oss/logveil/internal/anonymizer"
"github.com/logmanager-oss/logveil/internal/flags"
"github.com/logmanager-oss/logveil/internal/inputs"
"github.com/logmanager-oss/logveil/internal/parser"
)

// Run is the entry point of the anonymization tool. It reads its settings via
// flags.Load, opens the input file, selects the output (a newly created file
// when an output filename is given, standard output otherwise), loads the
// anonymizing data and then streams the input through the anonymizer.
//
// Every failure is logged with structured attributes and ends the run early;
// nothing is returned to the caller.
func Run() {
	slog.Info("Anonymization process started...")

	anonDataDir, inputFilename, outputFilename, enableLMexport := flags.Load()

	input, err := os.Open(inputFilename)
	if err != nil {
		slog.Error("Opening input file", "error", err)
		return
	}
	defer closeFile(input)

	// Default to stdout; only switch to a file when one was requested.
	var output io.Writer = os.Stdout
	if outputFilename != "" {
		outputFile, err := os.Create(outputFilename)
		if err != nil {
			slog.Error("Creating output file", "error", err)
			return
		}
		defer closeFile(outputFile)
		output = outputFile
	}

	anonData, err := parser.LoadAnonData(anonDataDir)
	if err != nil {
		// slog takes key/value attributes, not printf-style verbs.
		slog.Error("loading anonymizing data", "dir", anonDataDir, "error", err)
		return
	}
	anon := anonymizer.New(anonData)

	if enableLMexport {
		if err := inputs.AnonymizeLmExport(input, output, anon); err != nil {
			slog.Error("reading lm export input file", "file", inputFilename, "error", err)
			return
		}
	} else {
		if err := inputs.AnonymizeLmBackup(input, output, anon); err != nil {
			slog.Error("reading lm backup input file", "file", inputFilename, "error", err)
			return
		}
	}

	slog.Info("All done. Exiting...")
}

// closeFile closes fs and reports a failed Close through slog at error level.
// It is meant to be used with defer, where the error cannot be returned.
func closeFile(fs *os.File) {
	if closeErr := fs.Close(); closeErr != nil {
		slog.Error(closeErr.Error())
	}
}
64 changes: 64 additions & 0 deletions internal/inputs/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package inputs

import (
"bufio"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"os"

"github.com/logmanager-oss/logveil/internal/anonymizer"
)

// LmBackup is the JSON shape of a single record in a backup input; only the
// "_source" object is decoded.
type LmBackup struct {
// Source is decoded from the "_source" key of the record.
Source LmLog `json:"_source"`
}

// LmLog is the log payload found under "_source" of a backup record.
type LmLog struct {
// Raw is decoded from the "raw" key.
Raw string `json:"raw"`
// Msg is decoded from the "msg" key; values are kept as decoded by
// encoding/json because their types are not fixed.
Msg map[string]interface{} `json:"msg"`
}

// AnonymizeLmBackup reads input as a gzip-compressed stream holding one JSON
// record per line, anonymizes every record and writes each result to output
// followed by a newline.
//
// For each record the "msg" fields are converted to strings, the "raw" value
// is added under the "raw" key, and the resulting map is passed to
// anonymizer.Anonymize.
//
// It returns an error when the gzip stream cannot be opened, a line is not
// valid JSON, a line exceeds the maximum supported length, or writing fails.
func AnonymizeLmBackup(input *os.File, output io.Writer, anonymizer *anonymizer.Anonymizer) error {
	// maxLineSize bounds a single record. The bufio.Scanner default of 64 KiB
	// is too small for long log lines and would abort with ErrTooLong.
	const maxLineSize = 16 * 1024 * 1024

	gzReader, err := gzip.NewReader(input)
	if err != nil {
		return fmt.Errorf("error creating gzip reader: %w", err)
	}
	defer gzReader.Close()

	scanner := bufio.NewScanner(gzReader)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)

	for scanner.Scan() {
		// Decode into a value (not a pointer to a pointer) so that a JSON
		// "null" line cannot leave a nil pointer that is dereferenced below.
		var lmBackup LmBackup
		if err := json.Unmarshal(scanner.Bytes(), &lmBackup); err != nil {
			return fmt.Errorf("unmarshaling log line: %w", err)
		}

		// Convert map[string]interface{} to map[string]string as required by anonymizer
		logLine := make(map[string]string, len(lmBackup.Source.Msg)+1)
		for key, value := range lmBackup.Source.Msg {
			logLine[key] = fmt.Sprintf("%v", value)
		}
		logLine["raw"] = lmBackup.Source.Raw

		if _, err := fmt.Fprintln(output, anonymizer.Anonymize(logLine)); err != nil {
			return fmt.Errorf("writing anonymized data: %w", err)
		}
	}

	if err := scanner.Err(); err != nil {
		return fmt.Errorf("error reading input: %w", err)
	}

	return nil
}
54 changes: 54 additions & 0 deletions internal/inputs/backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package inputs

import (
"bytes"
"os"
"testing"

"github.com/logmanager-oss/logveil/internal/anonymizer"
"github.com/logmanager-oss/logveil/internal/parser"
"github.com/stretchr/testify/assert"
)

// TestLmBackup runs AnonymizeLmBackup against the example backup archive with
// the anonymizer's random function pinned, and compares the produced output
// with the expected anonymized log line.
func TestLmBackup(t *testing.T) {
	type testCase struct {
		name           string
		inputFilename  string
		outputFilename string
		anonDataDir    string
		expectedOutput string
	}

	cases := []testCase{
		{
			name:           "Test Test LM Backup Anonymizer",
			inputFilename:  "../../examples/logs/lm-2024-06-09_0000.gz",
			anonDataDir:    "../../examples/anon_data",
			expectedOutput: "<189>date=2024-11-06 time=12:29:25 devname=\"LM-FW-70F-Praha\" devid=\"FGT70FTK22012016\" eventtime=1730892565525108329 tz=\"+0100\" logid=\"0000000013\" type=\"traffic\" subtype=\"forward\" level=\"notice\" vd=\"root\" srcip=10.20.0.53 srcport=57158 srcintf=\"lan1\" srcintfrole=\"wan\" dstip=227.51.221.89 dstport=80 dstintf=\"lan1\" dstintfrole=\"lan\" srccountry=\"China\" dstcountry=\"Czech Republic\" sessionid=179455916 proto=6 action=\"client-rst\" policyid=9 policytype=\"policy\" poluuid=\"d8ccb3e4-74d4-51ef-69a3-73b41f46df74\" policyname=\"Gitlab web from all\" service=\"HTTP\" trandisp=\"noop\" duration=6 sentbyte=80 rcvdbyte=44 sentpkt=2 rcvdpkt=1 appcat=\"unscanned\" srchwvendor=\"H3C\" devtype=\"Router\" mastersrcmac=\"00:23:89:39:a4:ef\" srcmac=\"00:23:89:39:a4:ef\" srcserver=0 dsthwvendor=\"H3C\" dstdevtype=\"Router\" masterdstmac=\"00:23:89:39:a4:fa\" dstmac=\"00:23:89:39:a4:fa\" dstserver=0\n",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			src, openErr := os.Open(tc.inputFilename)
			if openErr != nil {
				t.Fatal(openErr)
			}
			defer src.Close()

			sink := &bytes.Buffer{}

			anonData, loadErr := parser.LoadAnonData(tc.anonDataDir)
			if loadErr != nil {
				t.Fatal(loadErr)
			}

			anon := anonymizer.New(anonData)
			// Pin the random source so the replacement values are predictable.
			anon.SetRandFunc(func(int) int { return 1 })

			if runErr := AnonymizeLmBackup(src, sink, anon); runErr != nil {
				t.Fatal(runErr)
			}

			assert.Equal(t, tc.expectedOutput, sink.String())
		})
	}
}
47 changes: 47 additions & 0 deletions internal/inputs/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package inputs

import (
"encoding/csv"
"fmt"
"io"
"os"
"strings"

"github.com/logmanager-oss/logveil/internal/anonymizer"
)

// AnonymizeLmExport reads input as CSV whose first record holds the field
// names, anonymizes every following record and writes each result to output
// followed by a newline.
//
// A leading "msg." prefix is stripped from the field names before they are
// used as keys of the map passed to anonymizer.Anonymize.
//
// It returns an error when the header or any row cannot be read (including
// malformed CSV and rows whose field count differs from the header) or when
// writing fails. Reaching the end of the input after the header is success.
func AnonymizeLmExport(input *os.File, output io.Writer, anonymizer *anonymizer.Anonymizer) error {
	csvReader := csv.NewReader(input)

	// First element of the csvReader contains field names
	fieldNames, err := csvReader.Read()
	if err != nil {
		return fmt.Errorf("reading csv header: %w", err)
	}

	// Trimming prefix from field names
	for i, fieldName := range fieldNames {
		fieldNames[i] = strings.TrimPrefix(fieldName, "msg.")
	}

	for {
		row, err := csvReader.Read()
		if err == io.EOF {
			// Only a clean end of input terminates the loop successfully.
			return nil
		}
		if err != nil {
			// Previously every error ended the loop silently, truncating the
			// output without reporting the failure.
			return fmt.Errorf("reading csv row: %w", err)
		}

		// csv.Reader enforces the header's field count on every row by
		// default, so indexing fieldNames with i is in range here.
		logLine := make(map[string]string, len(row))
		for i, val := range row {
			logLine[fieldNames[i]] = val
		}

		if _, err := fmt.Fprintln(output, anonymizer.Anonymize(logLine)); err != nil {
			return fmt.Errorf("writing anonymized data: %w", err)
		}
	}
}
54 changes: 54 additions & 0 deletions internal/inputs/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package inputs

import (
"bytes"
"os"
"testing"

"github.com/logmanager-oss/logveil/internal/anonymizer"
"github.com/logmanager-oss/logveil/internal/parser"
"github.com/stretchr/testify/assert"
)

// TestLmExport runs AnonymizeLmExport against the example CSV export with the
// anonymizer's random function pinned, and compares the produced output with
// the expected string.
func TestLmExport(t *testing.T) {
	type testCase struct {
		name           string
		inputFilename  string
		outputFilename string
		anonDataDir    string
		expectedOutput string
	}

	cases := []testCase{
		{
			name:           "Test LM Export Anonymizer",
			inputFilename:  "../../examples/logs/example_logs.csv",
			anonDataDir:    "../../examples/anon_data",
			expectedOutput: "{\"@timestamp\": \"2024-06-05T14:59:27.000+00:00\", \"msg.src_ip\":\"10.20.0.53\", \"username\":\"ladislav.dosek\", \"organization\":\"Apple\"}\n",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			src, openErr := os.Open(tc.inputFilename)
			if openErr != nil {
				t.Fatal(openErr)
			}
			defer src.Close()

			sink := &bytes.Buffer{}

			anonData, loadErr := parser.LoadAnonData(tc.anonDataDir)
			if loadErr != nil {
				t.Fatal(loadErr)
			}

			anon := anonymizer.New(anonData)
			// Pin the random source so the replacement values are predictable.
			anon.SetRandFunc(func(int) int { return 1 })

			if runErr := AnonymizeLmExport(src, sink, anon); runErr != nil {
				t.Fatal(runErr)
			}

			assert.Equal(t, tc.expectedOutput, sink.String())
		})
	}
}
54 changes: 54 additions & 0 deletions internal/parser/anondataparser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package parser

import (
"bufio"
"fmt"
"log"
"log/slog"
"os"
"path/filepath"
)

func LoadAnonData(anonDataDir string) (map[string][]string, error) {
var anonData = make(map[string][]string)

files, err := os.ReadDir(anonDataDir)
if err != nil {
log.Fatal(err)
}

for _, file := range files {
if file.IsDir() {
continue
}

data, err := loadAnonymizingData(filepath.Join(anonDataDir, file.Name()))
if err != nil {
return nil, fmt.Errorf("loading anonymizing data from file %s: %v", file.Name(), err)
}

anonData[file.Name()] = data
slog.Debug(fmt.Sprintf("Loaded anonymizing data for field: %s; values loaded: %d\n", file.Name(), len(data)))
}

return anonData, nil
}

// loadAnonymizingData reads the file at filepath and returns its lines in
// order, one slice element per line, without line terminators.
//
// It returns an error when the file cannot be opened, read or closed; in that
// case the returned slice is nil. The file is closed on every path.
func loadAnonymizingData(filepath string) ([]string, error) {
	anonDataFile, err := os.Open(filepath)
	if err != nil {
		return nil, err
	}

	var anonData []string
	scanner := bufio.NewScanner(anonDataFile)
	for scanner.Scan() {
		anonData = append(anonData, scanner.Text())
	}

	if err := scanner.Err(); err != nil {
		// The read error is the one worth reporting; still release the handle.
		_ = anonDataFile.Close()
		return nil, fmt.Errorf("error reading anon data: %w", err)
	}

	if err := anonDataFile.Close(); err != nil {
		return nil, fmt.Errorf("closing anon data file: %w", err)
	}

	return anonData, nil
}
Loading

0 comments on commit cc41366

Please sign in to comment.