Skip to content

Commit

Permalink
Merge pull request #223 from practo/fix-204
Browse files Browse the repository at this point in the history
Reduce queries to redshift
  • Loading branch information
alok87 authored May 14, 2021
2 parents 073c63a + dbff294 commit f8c34cb
Showing 1 changed file with 49 additions and 31 deletions.
80 changes: 49 additions & 31 deletions pkg/redshiftloader/load_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type loadProcessor struct {

// metricSetter sets the load metrics
metric metricSetter

// schemaTargetTable is the cache used to get the targetTable from
// schema ID without doing recomputation for the schema id
schemaTargetTable map[int]redshift.Table
}

func newLoadProcessor(
Expand Down Expand Up @@ -133,6 +137,7 @@ func newLoadProcessor(
rsk: viper.GetString("rsk"),
sinkGroup: viper.GetString("sinkGroup"),
},
schemaTargetTable: make(map[int]redshift.Table),
}, nil
}

Expand Down Expand Up @@ -204,12 +209,12 @@ func (b *loadProcessor) printCurrentState() {

// loadTable loads the batch to redhsift table using
// COPY command.
func (b *loadProcessor) loadTable(ctx context.Context, schema, table, s3ManifestKey string) error {
tx, err := b.redshifter.Begin(ctx)
if err != nil {
return fmt.Errorf("Error creating database tx, err: %v\n", err)
}
err = b.redshifter.Copy(
func (b *loadProcessor) loadTable(
ctx context.Context,
tx *sql.Tx,
schema, table, s3ManifestKey string,
) error {
err := b.redshifter.Copy(
ctx, tx, schema, table, b.s3sink.GetKeyURI(s3ManifestKey),
true, false,
true, true,
Expand All @@ -218,11 +223,6 @@ func (b *loadProcessor) loadTable(ctx context.Context, schema, table, s3Manifest
tx.Rollback()
return fmt.Errorf("Error loading data in staging table, err:%v\n", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("Error committing tx, err:%v\n", err)
}

klog.V(2).Infof(
"%s, copied staging\n",
b.topic,
Expand Down Expand Up @@ -435,7 +435,6 @@ func (b *loadProcessor) merge(ctx context.Context) error {
return fmt.Errorf("Error committing tx, err:%v\n", err)
}

// error is warning in the below task since we clean on start
err = b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name)
if err != nil {
klog.Warningf("Dropping the table: %s failed!, err: %v\n",
Expand All @@ -447,14 +446,15 @@ func (b *loadProcessor) merge(ctx context.Context) error {
return nil
}

// createStagingTable creates a staging table based on the schema id of the
// batch messages.
// loadStagingTable creates a staging table based on the schema id of the
// batch messages and loads it
// this also intializes b.stagingTable
func (b *loadProcessor) createStagingTable(
func (b *loadProcessor) loadStagingTable(
ctx context.Context,
schemaId int,
schemaIdKey int,
inputTable redshift.Table,
s3ManifestKey string,
) error {
b.stagingTable = redshift.NewTable(inputTable)
b.stagingTable.Name = b.stagingTable.Name + "_staged"
Expand All @@ -473,11 +473,8 @@ func (b *loadProcessor) createStagingTable(
b.stagingTable.Columns[idx] = column
}
}
err := b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name)
if err != nil {
return fmt.Errorf("Error dropping staging table: %v\n", err)
}

var err error
var primaryKeys []string
if schemaIdKey == -1 || schemaIdKey == 0 { // Deprecated as below is expensive and does not use cache
primaryKeys, err = b.schemaTransformer.TransformKey(b.upstreamTopic)
Expand Down Expand Up @@ -515,14 +512,30 @@ func (b *loadProcessor) createStagingTable(
err = b.redshifter.CreateTable(ctx, tx, *b.stagingTable)
if err != nil {
tx.Rollback()
return fmt.Errorf("Error creating staging table, err: %v\n", err)
orgiErr := fmt.Errorf("Error creating staging table, err: %v\n", err)
err = b.dropTable(ctx, b.stagingTable.Meta.Schema, b.stagingTable.Name)
if err != nil {
klog.Errorf("Error trying to drop staging table: %v\n", err)
}
return orgiErr
}
klog.V(2).Infof("%s, created staging table", b.topic)
err = b.loadTable(
ctx,
tx,
b.stagingTable.Meta.Schema,
b.stagingTable.Name,
s3ManifestKey,
)
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("Error committing tx, err:%v\n", err)
}
klog.V(3).Infof(
"%s, schemaId:%d: created staging %s \n",
"%s, schemaId:%d: loaded staging %s \n",
b.topic,
schemaId,
b.stagingTable.Name,
Expand Down Expand Up @@ -579,6 +592,7 @@ func (b *loadProcessor) migrateTable(
}

// migrateSchema construct the "inputTable" using schemaId in the message.
// If it has processed the schemaID before it returns, this is done for reducing queries to redshift.
// If the schema and table does not exist it creates and returns.
// If not then it constructs the "targetTable" by querying the database.
// It compares the targetTable and inputTable schema.
Expand All @@ -591,8 +605,13 @@ func (b *loadProcessor) migrateTable(
// Supported: alter columns (supported via table migration)
// TODO: NotSupported: row ordering changes and row renames
func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error {
// TODO: add cache here based on schema id and return
// save some database calls.
targetTableCache, ok := b.schemaTargetTable[schemaId]
if ok {
klog.V(2).Infof("%s using cache for targetTable", b.topic)
b.targetTable = &targetTableCache
return nil
}

tableExist, err := b.redshifter.TableExist(
ctx, inputTable.Meta.Schema, inputTable.Name,
)
Expand Down Expand Up @@ -623,13 +642,15 @@ func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTa
inputTable.Name,
)
b.targetTable = redshift.NewTable(inputTable)
b.schemaTargetTable[schemaId] = *b.targetTable
return nil
}

targetTable, err := b.redshifter.GetTableMetadata(
ctx, inputTable.Meta.Schema, inputTable.Name,
)
b.targetTable = targetTable
b.schemaTargetTable[schemaId] = *b.targetTable
if err != nil {
return fmt.Errorf("Error querying targetTable, err: %v\n", err)
}
Expand Down Expand Up @@ -743,15 +764,12 @@ func (b *loadProcessor) processBatch(

// load in staging
start := time.Now()
klog.V(2).Infof("%s, load staging\n", b.topic)
err = b.createStagingTable(ctx, schemaId, schemaIdKey, inputTable)
if err != nil {
return bytesProcessed, err
}
err = b.loadTable(
klog.V(2).Infof("%s, load staging", b.topic)
err = b.loadStagingTable(
ctx,
b.stagingTable.Meta.Schema,
b.stagingTable.Name,
schemaId,
schemaIdKey,
inputTable,
s3ManifestKey,
)
if err != nil {
Expand Down

0 comments on commit f8c34cb

Please sign in to comment.