diff --git a/cmd/transform.go b/cmd/transform.go index 75a2367..f033236 100644 --- a/cmd/transform.go +++ b/cmd/transform.go @@ -9,14 +9,27 @@ import ( const transformHelper = ` Convert the CSV files from the Federal Revenue for venues (ESTABELE group of files) into records in the database, 1 record per CNPJ, joining information -from all other source CSV files.` +from all other source CSV files. + +The transformation process is divided into two steps: +1. Load relational data to a key-value store +2. Load the full database using the key-value store + +If no specific step is specified, both steps will be executed by default, and +the key-value store is automaically deleted at the end. + +If used with --step-one, the path to the key-value is printed to the stdout, +and it is NOT deleted at the end. This is the path expected as an argument +to --step-two. +` var ( maxParallelDBQueries int batchSize int cleanUp bool noPrivacy bool - highMemory bool + stepOne bool + stepTwo string ) var transformCmd = &cobra.Command{ @@ -45,7 +58,7 @@ var transformCmd = &cobra.Command{ return err } } - return transform.Transform(dir, &pg, maxParallelDBQueries, batchSize, !noPrivacy, highMemory) + return transform.Transform(dir, &pg, maxParallelDBQueries, batchSize, !noPrivacy, stepOne, stepTwo) }, } @@ -62,6 +75,7 @@ func transformCLI() *cobra.Command { transformCmd.Flags().IntVarP(&batchSize, "batch-size", "b", transform.BatchSize, "size of the batch to save to the database") transformCmd.Flags().BoolVarP(&cleanUp, "clean-up", "c", cleanUp, "drop & recreate the database table before starting") transformCmd.Flags().BoolVarP(&noPrivacy, "no-privacy", "p", noPrivacy, "include email addresses, CPF and other PII in the JSON data") - transformCmd.Flags().BoolVarP(&highMemory, "high-memory", "x", highMemory, "high memory availability mode, faster but requires a lot of free RAM") + transformCmd.Flags().BoolVarP(&stepOne, "step-one", "1", stepOne, "load relational data to a key-value store") + transformCmd.Flags().StringVarP(&stepTwo, "step-two", "2", stepTwo, "path to the key-value store from step 1 to load the full database") return transformCmd } diff --git a/transform/badger.go b/transform/badger.go index 4696268..ad8ae76 100644 --- a/transform/badger.go +++ b/transform/badger.go @@ -8,8 +8,6 @@ import ( "github.com/dgraph-io/badger/v4" ) -const badgerFilePrefix = "minha-receita-badger-" - func keyForPartners(n string) string { return fmt.Sprintf("partners%s", n) } func keyForBase(n string) string { return fmt.Sprintf("base%s", n) } func keyForTaxes(n string) string { return fmt.Sprintf("taxes%s", n) } diff --git a/transform/badger_test.go b/transform/badger_test.go index 17a4956..b442acf 100644 --- a/transform/badger_test.go +++ b/transform/badger_test.go @@ -11,13 +11,8 @@ import ( const testBaseCNPJ = "12345678" -func newTestBadgerDB(t *testing.T, inMem bool) *badger.DB { - var opt badger.Options - if !inMem { - opt = badger.DefaultOptions(t.TempDir()) - } else { - opt = badger.DefaultOptions("").WithInMemory(inMem) - } +func newTestBadgerDB(t *testing.T) *badger.DB { + opt := badger.DefaultOptions(t.TempDir()) db, err := badger.Open(opt) if err != nil { t.Fatal("could not create a badger database") @@ -91,112 +86,101 @@ func TestMergePartners(t *testing.T) { k := []byte(testBaseCNPJ) p := newTestPartner() v := toBytes(t, p) - for _, inMem := range []bool{true, false} { - for _, tc := range []struct { - existing []partnerData - expected []partnerData - }{ - {nil, []partnerData{p}}, - {[]partnerData{testPartner1}, []partnerData{testPartner1, p}}, - {[]partnerData{testPartner1, testPartner2}, []partnerData{testPartner1, testPartner2, p}}, - } { - n := "in disk" - if inMem { - n = "in memory" - } - t.Run(fmt.Sprintf("merging to %d partners %s", len(tc.existing), n), func(t *testing.T) { - db := newTestBadgerDB(t, inMem) - defer db.Close() - if tc.existing != nil { - db.Update(func(tx *badger.Txn) error { - if err := tx.Set(k, toBytes(t, tc.existing)); err != nil { - t.Fatalf("error setting existing partners %v: %s", tc.existing, err) - } - return nil - }) - } - m, err := mergePartners(db, k, v) - if err != nil { - t.Errorf("expected no error merging partners, got %s", err) - } - var got []partnerData - if err := json.Unmarshal(m, &got); err != nil { - t.Errorf("could not parse merged partners: %s", err) - } - if !reflect.DeepEqual(got, tc.expected) { - t.Errorf("expected merged partners to be %v, got %v", tc.expected, got) - } - }) - } - } -} - -func TestSaveAndReadItems(t *testing.T) { - for _, inMem := range []bool{true, false} { - n := "in disk" - if inMem { - n = "in memory" - } - - t.Run(fmt.Sprintf("partners %s", n), func(t *testing.T) { - p := newTestPartner() - db := newTestBadgerDB(t, inMem) + for _, tc := range []struct { + existing []partnerData + expected []partnerData + }{ + {nil, []partnerData{p}}, + {[]partnerData{testPartner1}, []partnerData{testPartner1, p}}, + {[]partnerData{testPartner1, testPartner2}, []partnerData{testPartner1, testPartner2, p}}, + } { + t.Run(fmt.Sprintf("merging to %d partners", len(tc.existing)), func(t *testing.T) { + db := newTestBadgerDB(t) defer db.Close() - err := saveItem( - db, partners, - []byte(keyForPartners(testBaseCNPJ)), - toBytes(t, p), - ) - if err != nil { - t.Errorf("expected no error saving partner, got %s", err) + if tc.existing != nil { + db.Update(func(tx *badger.Txn) error { + if err := tx.Set(k, toBytes(t, tc.existing)); err != nil { + t.Fatalf("error setting existing partners %v: %s", tc.existing, err) + } + return nil + }) } - got, err := partnersOf(db, testBaseCNPJ) + m, err := mergePartners(db, k, v) if err != nil { - t.Errorf("expected no error reading partners, got %s", err) + t.Errorf("expected no error merging partners, got %s", err) } - if len(got) != 1 { - t.Errorf("expected merged partnes to have 1 partger, got %d", len(got)) - return + var got []partnerData + if err := json.Unmarshal(m, &got); err != nil { + t.Errorf("could not parse merged partners: %s", err) } - if !reflect.DeepEqual(got[0], p) { - t.Errorf("expected merged partner to be %v, got %v", p, got[0]) + if !reflect.DeepEqual(got, tc.expected) { + t.Errorf("expected merged partners to be %v, got %v", tc.expected, got) } }) + } - t.Run(fmt.Sprintf("base %s", n), func(t *testing.T) { - db := newTestBadgerDB(t, inMem) - defer db.Close() - d := newTestBaseCNPJ() - v := toBytes(t, d) - err := saveItem(db, base, []byte(keyForBase(testBaseCNPJ)), v) - if err != nil { - t.Errorf("expected no error saving partner, got %s", err) - } - got, err := baseOf(db, testBaseCNPJ) - if err != nil { - t.Errorf("expected no error reading base, got %s", err) - } - if !reflect.DeepEqual(got, d) { - t.Errorf("expected %v, got %v", d, got) - } - }) +} + +func TestSaveAndReadItems(t *testing.T) { + t.Run("partners", func(t *testing.T) { + p := newTestPartner() + db := newTestBadgerDB(t) + defer db.Close() + err := saveItem( + db, partners, + []byte(keyForPartners(testBaseCNPJ)), + toBytes(t, p), + ) + if err != nil { + t.Errorf("expected no error saving partner, got %s", err) + } + got, err := partnersOf(db, testBaseCNPJ) + if err != nil { + t.Errorf("expected no error reading partners, got %s", err) + } + if len(got) != 1 { + t.Errorf("expected merged partnes to have 1 partger, got %d", len(got)) + return + } + if !reflect.DeepEqual(got[0], p) { + t.Errorf("expected merged partner to be %v, got %v", p, got[0]) + } + }) + + t.Run("base", func(t *testing.T) { + db := newTestBadgerDB(t) + defer db.Close() + d := newTestBaseCNPJ() + v := toBytes(t, d) + err := saveItem(db, base, []byte(keyForBase(testBaseCNPJ)), v) + if err != nil { + t.Errorf("expected no error saving partner, got %s", err) + } + got, err := baseOf(db, testBaseCNPJ) + if err != nil { + t.Errorf("expected no error reading base, got %s", err) + } + if !reflect.DeepEqual(got, d) { + t.Errorf("expected %v, got %v", d, got) + } + }) + + t.Run("taxes", func(t *testing.T) { + db := newTestBadgerDB(t) + defer db.Close() + d := newTestTaxes() + v := toBytes(t, d) + err := saveItem(db, taxes, []byte(keyForTaxes(testBaseCNPJ)), v) + if err != nil { + t.Errorf("expected no error saving partner, got %s", err) + } + got, err := taxesOf(db, testBaseCNPJ) + if err != nil { + t.Errorf("expected no error reading taxes, got %s", err) + } + if !reflect.DeepEqual(got, d) { + t.Errorf("expected %v, got %v", d, got) + } + }) - t.Run(fmt.Sprintf("taxes %s", n), func(t *testing.T) { - db := newTestBadgerDB(t, inMem) - defer db.Close() - d := newTestTaxes() - v := toBytes(t, d) - err := saveItem(db, taxes, []byte(keyForTaxes(testBaseCNPJ)), v) - if err != nil { - t.Errorf("expected no error saving partner, got %s", err) - } - got, err := taxesOf(db, testBaseCNPJ) - if err != nil { - t.Errorf("expected no error reading taxes, got %s", err) - } - if !reflect.DeepEqual(got, d) { - t.Errorf("expected %v, got %v", d, got) - } - }) - } } diff --git a/transform/company_test.go b/transform/company_test.go index 2ade51d..b3f8ce5 100644 --- a/transform/company_test.go +++ b/transform/company_test.go @@ -1,6 +1,8 @@ package transform import ( + "fmt" + "os" "strings" "testing" "time" @@ -106,7 +108,12 @@ func TestNewCompany(t *testing.T) { } t.Run("with privacy", func(t *testing.T) { - kv, err := newBadgerStorage(false) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Errorf("expected no error creating badger, got %s", err) } @@ -257,7 +264,12 @@ func TestNewCompany(t *testing.T) { } }) t.Run("without privacy", func(t *testing.T) { - kv, err := newBadgerStorage(true) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Errorf("expected no error creating badger, got %s", err) } diff --git a/transform/kv.go b/transform/kv.go index 4b4eb50..c4976db 100644 --- a/transform/kv.go +++ b/transform/kv.go @@ -179,22 +179,13 @@ func (*badgerLogger) Warningf(string, ...interface{}) {} func (*badgerLogger) Infof(string, ...interface{}) {} func (*badgerLogger) Debugf(string, ...interface{}) {} -func newBadgerStorage(m bool) (*badgerStorage, error) { - var dir string +func newBadgerStorage(dir string) (*badgerStorage, error) { var err error var opt badger.Options - if m { - opt = badger.DefaultOptions("").WithInMemory(m) - } else { - dir, err = os.MkdirTemp("", fmt.Sprintf("%s-%s",badgerFilePrefix, time.Now().Format("20060102150405"))) - if err != nil { - return nil, fmt.Errorf("error creating temporary key-value storage: %w", err) - } - if os.Getenv("DEBUG") != "" { - log.Output(1, fmt.Sprintf("Creating temporary key-value storage at %s", dir)) - } - opt = badger.DefaultOptions(dir) + if os.Getenv("DEBUG") != "" { + log.Output(1, fmt.Sprintf("Creating temporary key-value storage at %s", dir)) } + opt = badger.DefaultOptions(dir) db, err := badger.Open(opt.WithLogger(&badgerLogger{})) if err != nil { return nil, fmt.Errorf("error creating badger key-value object: %w", err) diff --git a/transform/kv_test.go b/transform/kv_test.go index 79994d5..9a62432 100644 --- a/transform/kv_test.go +++ b/transform/kv_test.go @@ -1,6 +1,7 @@ package transform import ( + "fmt" "os" "testing" "time" @@ -9,7 +10,12 @@ import ( ) func TestBadgerStorageClose(t *testing.T) { - kv, err := newBadgerStorage(false) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Errorf("expected no error creating badger storage, got %s", err) } @@ -59,7 +65,12 @@ func TestLoad(t *testing.T) { if err != nil { t.Fatalf("could not create lookups: %s", err) } - kv, err := newBadgerStorage(true) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Fatalf("could not create badger storage: %s", err) } @@ -83,7 +94,12 @@ func TestEnrichCompany(t *testing.T) { if err != nil { t.Fatalf("could not create lookups: %s", err) } - kv, err := newBadgerStorage(true) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Fatalf("could not create badger storage: %s", err) } diff --git a/transform/transform.go b/transform/transform.go index 495d66b..9f9ccfa 100644 --- a/transform/transform.go +++ b/transform/transform.go @@ -1,21 +1,27 @@ package transform import ( + "errors" "fmt" "log" "os" "path/filepath" + "time" "github.com/cuducos/minha-receita/download" ) -// MaxParallelDBQueries is the default for maximum number of parallels save -// queries sent to the database -const MaxParallelDBQueries = 8 +const ( + // MaxParallelDBQueries is the default for maximum number of parallels save + // queries sent to the database + MaxParallelDBQueries = 8 -// BatchSize determines the size of the batches used to create the initial JSON -// data in the database. -const BatchSize = 8192 + // BatchSize determines the size of the batches used to create the initial JSON + // data in the database. + BatchSize = 8192 + + badgerFilePrefix = "minha-receita-badger-" +) type database interface { PreLoad() error @@ -30,6 +36,34 @@ type kvStorage interface { close() error } +type mode int + +const ( + // runs the two steps and cleans up temporary key-value directory + // from step one + both mode = iota + + // loads relational data to a key-value store and does not delete it from + // the temporary directory + stepOne + + // expects a path to the directory created in StepOne and use it to persist + // data to PostgreSQL + stepTwo +) + +func transformMode(s1 bool, s2 string) (mode, error) { + switch { + case s1 && s2 != "": + return both, errors.New("cannot use both --step-one and --step-two") + case s1 && s2 == "": + return stepOne, nil + case !s1 && s2 != "": + return stepTwo, nil + } + return both, nil +} + func saveUpdatedAt(db database, dir string) error { log.Output(1, "Saving the updated at date to the database…") p := filepath.Join(dir, download.FederalRevenueUpdatedAt) @@ -41,20 +75,31 @@ func saveUpdatedAt(db database, dir string) error { return db.MetaSave("updated-at", string(v)) } -// Transform the downloaded files for company venues creating a database record -// per CNPJ -func Transform(dir string, db database, maxParallelDBQueries, batchSize int, privacy, mem bool) error { - l, err := newLookups(dir) +func runStepOne(dir string, l lookups, isolated bool) (string, error) { + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) if err != nil { - return fmt.Errorf("error creating look up tables from %s: %w", dir, err) + return "", fmt.Errorf("error creating temporary key-value storage: %w", err) } - kv, err := newBadgerStorage(mem) + kv, err := newBadgerStorage(tmp) if err != nil { - return fmt.Errorf("could not create badger storage: %w", err) + return "", fmt.Errorf("could not create badger storage: %w", err) } - defer kv.close() if err := kv.load(dir, &l); err != nil { - return fmt.Errorf("error loading data to badger: %w", err) + return "", fmt.Errorf("error loading data to badger: %w", err) + } + if isolated { + fmt.Println(kv.path) + } + return kv.path, nil +} + +func runStepTwo(dir string, tmp string, db database, l lookups, maxParallelDBQueries, batchSize int, privacy, isolated bool) error { + kv, err := newBadgerStorage(tmp) + if err != nil { + return fmt.Errorf("could not create badger storage: %w", err) + } + if !isolated { + defer kv.close() } j, err := createJSONRecordsTask(dir, db, &l, kv, batchSize, privacy) if err != nil { @@ -65,3 +110,30 @@ func Transform(dir string, db database, maxParallelDBQueries, batchSize int, pri } return saveUpdatedAt(db, dir) } + +// Transform the downloaded files for company venues creating a database record +// per CNPJ +func Transform(dir string, db database, maxParallelDBQueries, batchSize int, privacy, s1 bool, s2 string) error { + m, err := transformMode(s1, s2) + if err != nil { + return fmt.Errorf("error determining transform mode: %w", err) + } + var tmp string + l, err := newLookups(dir) + if err != nil { + return fmt.Errorf("error creating look up tables from %s: %w", dir, err) + } + if m != stepTwo { + tmp, err = runStepOne(dir, l, m == stepOne) + if err != nil { + return fmt.Errorf("error creating key-value storage: %w", err) + } + } + if m != stepOne { + if s2 != "" { + tmp = s2 + } + return runStepTwo(dir, tmp, db, l, maxParallelDBQueries, batchSize, privacy, m == stepTwo) + } + return nil +} diff --git a/transform/venues_test.go b/transform/venues_test.go index b9017a8..eee0ef1 100644 --- a/transform/venues_test.go +++ b/transform/venues_test.go @@ -1,12 +1,20 @@ package transform import ( + "fmt" + "os" "testing" + "time" ) func TestTaskRun(t *testing.T) { db := newTestDB(t) - kv, err := newBadgerStorage(false) + tmp, err := os.MkdirTemp("", fmt.Sprintf("%s-%s", badgerFilePrefix, time.Now().Format("20060102150405"))) + if err != nil { + t.Fatal("error creating temporary key-value storage: %w", err) + } + defer os.RemoveAll(tmp) + kv, err := newBadgerStorage(tmp) if err != nil { t.Errorf("expected no error creating badger, got %s", err) }