From 0d217b1a417af45ff2d51736962d8fb28a6bf81b Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 13 May 2021 15:18:01 +0530 Subject: [PATCH 1/4] Use cache if schema was processed before --- pkg/redshiftloader/load_processor.go | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index ce7e79a75..2622f700a 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -89,6 +89,10 @@ type loadProcessor struct { // metricSetter sets the load metrics metric metricSetter + + // schemaTargetTable is the cache used to get the targetTable from + // schema ID without doing recomputation for the schema id + schemaTargetTable map[int]redshift.Table } func newLoadProcessor( @@ -133,6 +137,7 @@ func newLoadProcessor( rsk: viper.GetString("rsk"), sinkGroup: viper.GetString("sinkGroup"), }, + schemaTargetTable: make(map[int]redshift.Table), }, nil } @@ -579,6 +584,7 @@ func (b *loadProcessor) migrateTable( } // migrateSchema construct the "inputTable" using schemaId in the message. +// If it has processed the schemaID before it returns, this is done for reducing queries to redshift. // If the schema and table does not exist it creates and returns. // If not then it constructs the "targetTable" by querying the database. // It compares the targetTable and inputTable schema. @@ -591,8 +597,13 @@ func (b *loadProcessor) migrateTable( // Supported: alter columns (supported via table migration) // TODO: NotSupported: row ordering changes and row renames func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error { - // TODO: add cache here based on schema id and return - // save some database calls. + targetTableCache, ok := b.schemaTargetTable[schemaId] + if ok { + klog.V(2).Infof("%s using cache for targetTable", b.topic) + b.targetTable = &targetTableCache + return nil + } + tableExist, err := b.redshifter.TableExist( ctx, inputTable.Meta.Schema, inputTable.Name, ) @@ -623,6 +634,7 @@ func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTa inputTable.Name, ) b.targetTable = redshift.NewTable(inputTable) + b.schemaTargetTable[schemaId] = *b.targetTable return nil } @@ -630,6 +642,7 @@ func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTa ctx, inputTable.Meta.Schema, inputTable.Name, ) b.targetTable = targetTable + b.schemaTargetTable[schemaId] = *b.targetTable if err != nil { return fmt.Errorf("Error querying targetTable, err: %v\n", err) } From d21ce41fe4ccce34d9d87d0772317d4241045f58 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 14 May 2021 19:19:45 +0530 Subject: [PATCH 2/4] Save one more query Performance > reliability here When error happens try to drop the staging table so that next run goes ahead --- pkg/redshiftloader/load_processor.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index 2622f700a..eff1e43a4 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -440,7 +440,6 @@ func (b *loadProcessor) merge(ctx context.Context) error { return fmt.Errorf("Error committing tx, err:%v\n", err) } - // error is warning in the below task since we clean on start err = b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name) if err != nil { klog.Warningf("Dropping the table: %s failed!, err: %v\n", @@ -478,11 +477,8 @@ func (b *loadProcessor) createStagingTable( b.stagingTable.Columns[idx] = column } } - err := b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name) - if err != nil { - return fmt.Errorf("Error dropping staging table: %v\n", err) - } + var err error var primaryKeys []string if schemaIdKey == -1 || schemaIdKey == 0 { // Deprecated as below is expensive and does not use cache primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic) @@ -520,7 +516,12 @@ func (b *loadProcessor) createStagingTable( err = b.redshifter.CreateTable(ctx, tx, *b.stagingTable) if err != nil { tx.Rollback() - return fmt.Errorf("Error creating staging table, err: %v\n", err) + orgiErr := fmt.Errorf("Error creating staging table, err: %v\n", err) + err = b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name) + if err != nil { + klog.Errorf("Error trying to drop staging table: %v\n", err) + } + return orgiErr } err = tx.Commit() if err != nil { From 2121a53f14357d6c73d766e75c3b95372c8129cd Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 14 May 2021 19:29:47 +0530 Subject: [PATCH 3/4] Run create and load in one tx --- pkg/redshiftloader/load_processor.go | 45 +++++++++++++++++----------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index eff1e43a4..1e409e5da 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -209,12 +209,12 @@ func (b *loadProcessor) printCurrentState() { // loadTable loads the batch to redhsift table using // COPY command. -func (b *loadProcessor) loadTable(ctx context.Context, schema, table, s3ManifestKey string) error { - tx, err := b.redshifter.Begin(ctx) - if err != nil { - return fmt.Errorf("Error creating database tx, err: %v\n", err) - } - err = b.redshifter.Copy( +func (b *loadProcessor) loadTable( + ctx context.Context, + tx *sql.Tx, + schema, table, s3ManifestKey string, +) error { + err := b.redshifter.Copy( ctx, tx, schema, table, b.s3sink.GetKeyURI(s3ManifestKey), true, false, true, true, @@ -451,14 +451,15 @@ func (b *loadProcessor) merge(ctx context.Context) error { return nil } -// createStagingTable creates a staging table based on the schema id of the -// batch messages. +// loadStagingTable creates a staging table based on the schema id of the +// batch messages and loads it // this also intializes b.stagingTable -func (b *loadProcessor) createStagingTable( +func (b *loadProcessor) loadStagingTable( ctx context.Context, schemaId int, schemaIdKey int, inputTable redshift.Table, + s3ManifestKey string, ) error { b.stagingTable = redshift.NewTable(inputTable) b.stagingTable.Name = b.stagingTable.Name + "_staged" @@ -523,12 +524,23 @@ func (b *loadProcessor) createStagingTable( } return orgiErr } + klog.V(2).Infof("%s, created staging table", b.topic) + err = b.loadTable( + ctx, + tx, + b.stagingTable.Meta.Schema, + b.stagingTable.Name, + s3ManifestKey, + ) + if err != nil { + return err + } err = tx.Commit() if err != nil { return fmt.Errorf("Error committing tx, err:%v\n", err) } klog.V(3).Infof( - "%s, schemaId:%d: created staging %s \n", + "%s, schemaId:%d: loaded staging %s \n", b.topic, schemaId, b.stagingTable.Name, @@ -757,15 +769,12 @@ func (b *loadProcessor) processBatch( // load in staging start := time.Now() - klog.V(2).Infof("%s, load staging\n", b.topic) - err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable) - if err != nil { - return bytesProcessed, err - } - err = b.loadTable( + klog.V(2).Infof("%s, load staging", b.topic) + err = b.loadStagingTable( ctx, - b.stagingTable.Meta.Schema, - b.stagingTable.Name, + schemaId, + schemaIdKey, + inputTable, s3ManifestKey, ) if err != nil { From dbff294d0f4918a243f339607084d8dad60ff2b9 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 14 May 2021 22:37:36 +0530 Subject: [PATCH 4/4] Fix bug; tx commit is later --- pkg/redshiftloader/load_processor.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go index 1e409e5da..5930f35d5 100644 --- a/pkg/redshiftloader/load_processor.go +++ b/pkg/redshiftloader/load_processor.go @@ -223,11 +223,6 @@ func (b *loadProcessor) loadTable( tx.Rollback() return fmt.Errorf("Error loading data in staging table, err:%v\n", err) } - err = tx.Commit() - if err != nil { - return fmt.Errorf("Error committing tx, err:%v\n", err) - } - klog.V(2).Infof( "%s, copied staging\n", b.topic,