Skip to content

Commit

Permalink
Adds option to run transform in two steps
Browse files Browse the repository at this point in the history
  • Loading branch information
cuducos committed Oct 18, 2024
1 parent 2ae1f78 commit 2ff36be
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 146 deletions.
22 changes: 18 additions & 4 deletions cmd/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,27 @@ import (
const transformHelper = `
Convert the CSV files from the Federal Revenue for venues (ESTABELE group of
files) into records in the database, 1 record per CNPJ, joining information
from all other source CSV files.`
from all other source CSV files.
The transformation process is divided into two steps:
1. Load relational data to a key-value store
2. Load the full database using the key-value store
If no specific step is specified, both steps will be executed by default, and
the key-value store is automaically deleted at the end.
If used with --step-one, the path to the key-value is printed to the stdout,
and it is NOT deleted at the end. This is the path expected as an argument
to --step-two.
`

var (
maxParallelDBQueries int
batchSize int
cleanUp bool
noPrivacy bool
highMemory bool
stepOne bool
stepTwo string
)

var transformCmd = &cobra.Command{
Expand Down Expand Up @@ -45,7 +58,7 @@ var transformCmd = &cobra.Command{
return err
}
}
return transform.Transform(dir, &pg, maxParallelDBQueries, batchSize, !noPrivacy, highMemory)
return transform.Transform(dir, &pg, maxParallelDBQueries, batchSize, !noPrivacy, stepOne, stepTwo)
},
}

Expand All @@ -62,6 +75,7 @@ func transformCLI() *cobra.Command {
transformCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transform.BatchSize, "size of the batch to save to the database")
transformCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting")
transformCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data")
transformCmd.Flags().BoolVarP(&highMemory, "high-memory", "x", highMemory, "high memory availability mode, faster but requires a lot of free RAM")
transformCmd.Flags().BoolVarP(&stepOne, "step-one", "1", stepOne, "load relational data to a key-value store")
transformCmd.Flags().StringVarP(&stepTwo, "step-two", "2", stepTwo, "path to the key-value store from step 1 to load the full database")
return transformCmd
}
2 changes: 0 additions & 2 deletions transform/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/dgraph-io/badger/v4"
)

const badgerFilePrefix = "minha-receita-badger-"

func keyForPartners(n string) string { return fmt.Sprintf("partners%s", n) }
func keyForBase(n string) string { return fmt.Sprintf("base%s", n) }
func keyForTaxes(n string) string { return fmt.Sprintf("taxes%s", n) }
Expand Down
196 changes: 90 additions & 106 deletions transform/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,8 @@ import (

const testBaseCNPJ = "12345678"

func newTestBadgerDB(t *testing.T, inMem bool) *badger.DB {
var opt badger.Options
if !inMem {
opt = badger.DefaultOptions(t.TempDir())
} else {
opt = badger.DefaultOptions("").WithInMemory(inMem)
}
func newTestBadgerDB(t *testing.T) *badger.DB {
opt := badger.DefaultOptions(t.TempDir())
db, err := badger.Open(opt)
if err != nil {
t.Fatal("could not create a badger database")
Expand Down Expand Up @@ -91,112 +86,101 @@ func TestMergePartners(t *testing.T) {
k := []byte(testBaseCNPJ)
p := newTestPartner()
v := toBytes(t, p)
for _, inMem := range []bool{true, false} {
for _, tc := range []struct {
existing []partnerData
expected []partnerData
}{
{nil, []partnerData{p}},
{[]partnerData{testPartner1}, []partnerData{testPartner1, p}},
{[]partnerData{testPartner1, testPartner2}, []partnerData{testPartner1, testPartner2, p}},
} {
n := "in disk"
if inMem {
n = "in memory"
}
t.Run(fmt.Sprintf("merging to %d partners %s", len(tc.existing), n), func(t *testing.T) {
db := newTestBadgerDB(t, inMem)
defer db.Close()
if tc.existing != nil {
db.Update(func(tx *badger.Txn) error {
if err := tx.Set(k, toBytes(t, tc.existing)); err != nil {
t.Fatalf("error setting existing partners %v: %s", tc.existing, err)
}
return nil
})
}
m, err := mergePartners(db, k, v)
if err != nil {
t.Errorf("expected no error merging partners, got %s", err)
}
var got []partnerData
if err := json.Unmarshal(m, &got); err != nil {
t.Errorf("could not parse merged partners: %s", err)
}
if !reflect.DeepEqual(got, tc.expected) {
t.Errorf("expected merged partners to be %v, got %v", tc.expected, got)
}
})
}
}
}

func TestSaveAndReadItems(t *testing.T) {
for _, inMem := range []bool{true, false} {
n := "in disk"
if inMem {
n = "in memory"
}

t.Run(fmt.Sprintf("partners %s", n), func(t *testing.T) {
p := newTestPartner()
db := newTestBadgerDB(t, inMem)
for _, tc := range []struct {
existing []partnerData
expected []partnerData
}{
{nil, []partnerData{p}},
{[]partnerData{testPartner1}, []partnerData{testPartner1, p}},
{[]partnerData{testPartner1, testPartner2}, []partnerData{testPartner1, testPartner2, p}},
} {
t.Run(fmt.Sprintf("merging to %d partners", len(tc.existing)), func(t *testing.T) {
db := newTestBadgerDB(t)
defer db.Close()
err := saveItem(
db, partners,
[]byte(keyForPartners(testBaseCNPJ)),
toBytes(t, p),
)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
if tc.existing != nil {
db.Update(func(tx *badger.Txn) error {
if err := tx.Set(k, toBytes(t, tc.existing)); err != nil {
t.Fatalf("error setting existing partners %v: %s", tc.existing, err)
}
return nil
})
}
got, err := partnersOf(db, testBaseCNPJ)
m, err := mergePartners(db, k, v)
if err != nil {
t.Errorf("expected no error reading partners, got %s", err)
t.Errorf("expected no error merging partners, got %s", err)
}
if len(got) != 1 {
t.Errorf("expected merged partnes to have 1 partger, got %d", len(got))
return
var got []partnerData
if err := json.Unmarshal(m, &got); err != nil {
t.Errorf("could not parse merged partners: %s", err)
}
if !reflect.DeepEqual(got[0], p) {
t.Errorf("expected merged partner to be %v, got %v", p, got[0])
if !reflect.DeepEqual(got, tc.expected) {
t.Errorf("expected merged partners to be %v, got %v", tc.expected, got)
}
})
}

t.Run(fmt.Sprintf("base %s", n), func(t *testing.T) {
db := newTestBadgerDB(t, inMem)
defer db.Close()
d := newTestBaseCNPJ()
v := toBytes(t, d)
err := saveItem(db, base, []byte(keyForBase(testBaseCNPJ)), v)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
}
got, err := baseOf(db, testBaseCNPJ)
if err != nil {
t.Errorf("expected no error reading base, got %s", err)
}
if !reflect.DeepEqual(got, d) {
t.Errorf("expected %v, got %v", d, got)
}
})
}

func TestSaveAndReadItems(t *testing.T) {
t.Run("partners", func(t *testing.T) {
p := newTestPartner()
db := newTestBadgerDB(t)
defer db.Close()
err := saveItem(
db, partners,
[]byte(keyForPartners(testBaseCNPJ)),
toBytes(t, p),
)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
}
got, err := partnersOf(db, testBaseCNPJ)
if err != nil {
t.Errorf("expected no error reading partners, got %s", err)
}
if len(got) != 1 {
t.Errorf("expected merged partnes to have 1 partger, got %d", len(got))
return
}
if !reflect.DeepEqual(got[0], p) {
t.Errorf("expected merged partner to be %v, got %v", p, got[0])
}
})

t.Run("base", func(t *testing.T) {
db := newTestBadgerDB(t)
defer db.Close()
d := newTestBaseCNPJ()
v := toBytes(t, d)
err := saveItem(db, base, []byte(keyForBase(testBaseCNPJ)), v)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
}
got, err := baseOf(db, testBaseCNPJ)
if err != nil {
t.Errorf("expected no error reading base, got %s", err)
}
if !reflect.DeepEqual(got, d) {
t.Errorf("expected %v, got %v", d, got)
}
})

t.Run("taxes", func(t *testing.T) {
db := newTestBadgerDB(t)
defer db.Close()
d := newTestTaxes()
v := toBytes(t, d)
err := saveItem(db, taxes, []byte(keyForTaxes(testBaseCNPJ)), v)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
}
got, err := taxesOf(db, testBaseCNPJ)
if err != nil {
t.Errorf("expected no error reading taxes, got %s", err)
}
if !reflect.DeepEqual(got, d) {
t.Errorf("expected %v, got %v", d, got)
}
})

t.Run(fmt.Sprintf("taxes %s", n), func(t *testing.T) {
db := newTestBadgerDB(t, inMem)
defer db.Close()
d := newTestTaxes()
v := toBytes(t, d)
err := saveItem(db, taxes, []byte(keyForTaxes(testBaseCNPJ)), v)
if err != nil {
t.Errorf("expected no error saving partner, got %s", err)
}
got, err := taxesOf(db, testBaseCNPJ)
if err != nil {
t.Errorf("expected no error reading taxes, got %s", err)
}
if !reflect.DeepEqual(got, d) {
t.Errorf("expected %v, got %v", d, got)
}
})
}
}
16 changes: 14 additions & 2 deletions transform/company_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package transform

import (
"fmt"
"os"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -106,7 +108,12 @@ func TestNewCompany(t *testing.T) {
}

t.Run("with privacy", func(t *testing.T) {
kv, err := newBadgerStorage(false)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(tmp)
kv, err := newBadgerStorage(tmp)
if err != nil {
t.Errorf("expected no error creating badger, got %s", err)
}
Expand Down Expand Up @@ -257,7 +264,12 @@ func TestNewCompany(t *testing.T) {
}
})
t.Run("without privacy", func(t *testing.T) {
kv, err := newBadgerStorage(true)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(tmp)
kv, err := newBadgerStorage(tmp)
if err != nil {
t.Errorf("expected no error creating badger, got %s", err)
}
Expand Down
17 changes: 4 additions & 13 deletions transform/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,22 +179,13 @@ func (*badgerLogger) Warningf(string, ...interface{}) {}
func (*badgerLogger) Infof(string, ...interface{}) {}
func (*badgerLogger) Debugf(string, ...interface{}) {}

func newBadgerStorage(m bool) (*badgerStorage, error) {
var dir string
func newBadgerStorage(dir string) (*badgerStorage, error) {
var err error
var opt badger.Options
if m {
opt = badger.DefaultOptions("").WithInMemory(m)
} else {
dir, err = os.MkdirTemp("", fmt.Sprintf("%s-%s",badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
return nil, fmt.Errorf("error creating temporary key-value storage: %w", err)
}
if os.Getenv("DEBUG") != "" {
log.Output(1, fmt.Sprintf("Creating temporary key-value storage at %s", dir))
}
opt = badger.DefaultOptions(dir)
if os.Getenv("DEBUG") != "" {
log.Output(1, fmt.Sprintf("Creating temporary key-value storage at %s", dir))
}
opt = badger.DefaultOptions(dir)
db, err := badger.Open(opt.WithLogger(&badgerLogger{}))
if err != nil {
return nil, fmt.Errorf("error creating badger key-value object: %w", err)
Expand Down
22 changes: 19 additions & 3 deletions transform/kv_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package transform

import (
"fmt"
"os"
"testing"
"time"
Expand All @@ -9,7 +10,12 @@ import (
)

func TestBadgerStorageClose(t *testing.T) {
kv, err := newBadgerStorage(false)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(tmp)
kv, err := newBadgerStorage(tmp)
if err != nil {
t.Errorf("expected no error creating badger storage, got %s", err)
}
Expand Down Expand Up @@ -59,7 +65,12 @@ func TestLoad(t *testing.T) {
if err != nil {
t.Fatalf("could not create lookups: %s", err)
}
kv, err := newBadgerStorage(true)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(tmp)
kv, err := newBadgerStorage(tmp)
if err != nil {
t.Fatalf("could not create badger storage: %s", err)
}
Expand All @@ -83,7 +94,12 @@ func TestEnrichCompany(t *testing.T) {
if err != nil {
t.Fatalf("could not create lookups: %s", err)
}
kv, err := newBadgerStorage(true)
tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405")))
if err != nil {
t.Fatal("error creating temporary key-value storage: %w", err)
}
defer os.RemoveAll(tmp)
kv, err := newBadgerStorage(tmp)
if err != nil {
t.Fatalf("could not create badger storage: %s", err)
}
Expand Down
Loading

0 comments on commit 2ff36be

Please sign in to comment.