From 23095a468507d9f105a02f6301458114e1310fc5 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sun, 16 May 2021 14:31:11 +0530 Subject: [PATCH 1/8] Skip merge when possible to improv perf --- .../pkg/redshiftloader/load_processor.go | 63 +++++++++++++------ 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index 5930f35d5..a79bed2c8 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -691,6 +691,7 @@ func (b *loadProcessor) processBatch( b.targetTable = nil b.upstreamTopic = "" + var skipMerge bool var entries []s3sink.S3ManifestEntry for id, message := range msgBuf { select { @@ -699,6 +700,9 @@ func (b *loadProcessor) processBatch( "session ctx done, err: %v", ctx.Err()) default: job := StringMapToJob(message.Value.(map[string]interface{})) + if job.SkipMerge { + skipMerge = true + } schemaId = job.SchemaId schemaIdKey = job.SchemaIdKey b.batchEndOffset = message.Offset @@ -762,25 +766,48 @@ func (b *loadProcessor) processBatch( ) } - // load in staging - start := time.Now() - klog.V(2).Infof("%s, load staging", b.topic) - err = b.loadStagingTable( - ctx, - schemaId, - schemaIdKey, - inputTable, - s3ManifestKey, - ) - if err != nil { - return bytesProcessed, err - } - b.metric.setCopyStageSeconds(time.Since(start).Seconds()) + if !skipMerge { + // load data in target using staging table merge + start := time.Now() + klog.V(2).Infof("%s, load staging (use merge)", b.topic) + err = b.loadStagingTable( + ctx, + schemaId, + schemaIdKey, + inputTable, + s3ManifestKey, + ) + if err != nil { + return bytesProcessed, err + } + b.metric.setCopyStageSeconds(time.Since(start).Seconds()) - // merge and load in target - err = b.merge(ctx) - if err != nil { - return bytesProcessed, err + // merge and load in target + err = b.merge(ctx) + if err != nil { + return bytesProcessed, err + } + } else { + // directy load data in target table as there is no update or delete + klog.V(2).Infof("%s, load target (skip merge)", b.topic) + tx, err := b.redshifter.Begin(ctx) + if err != nil { + return bytesProcessed, fmt.Errorf("Error creating database tx, err: %v\n", err) + } + err = b.loadTable( + ctx, + tx, + b.targetTable.Meta.Schema, + b.targetTable.Name, + s3ManifestKey, + ) + if err != nil { + return bytesProcessed, err + } + err = tx.Commit() + if err != nil { + return bytesProcessed, fmt.Errorf("Error committing tx, err:%v\n", err) + } } if b.redshiftStats { From 6960b4f29fe142487826b373e4b91d5619e5e036 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sun, 16 May 2021 15:36:55 +0530 Subject: [PATCH 2/8] Use create update, delete events and update events count to decide merge or not This is more safe than skipMerge. skipMerge gets deprecated --- .../pkg/redshiftbatcher/batch_processor.go | 27 ++++++++++-- redshiftsink/pkg/redshiftloader/job.go | 42 ++++++++++++++++--- .../pkg/redshiftloader/load_processor.go | 21 ++++++++-- 3 files changed, 76 insertions(+), 14 deletions(-) diff --git a/redshiftsink/pkg/redshiftbatcher/batch_processor.go b/redshiftsink/pkg/redshiftbatcher/batch_processor.go index a990230f4..ef9a13af2 100644 --- a/redshiftsink/pkg/redshiftbatcher/batch_processor.go +++ b/redshiftsink/pkg/redshiftbatcher/batch_processor.go @@ -173,6 +173,9 @@ type response struct { batchSchemaID int batchSchemaTable redshift.Table skipMerge bool + createEvents int64 + updateEvents int64 + deleteEvents int64 s3Key string bodyBuf *bytes.Buffer startOffset int64 @@ -269,6 +272,9 @@ func (b *batchProcessor) signalLoad(resp *response) error { resp.maskSchema, resp.skipMerge, resp.bytesProcessed, + resp.createEvents, + resp.updateEvents, + resp.deleteEvents, ) err := b.signaler.Add( @@ -384,9 +390,8 @@ func (b *batchProcessor) processMessage( if b.maskMessages && len(resp.maskSchema) == 0 { resp.maskSchema = message.MaskSchema } - if message.Operation != serializer.OperationCreate { - resp.skipMerge = false - } + + resp.skipMerge = false // deprecated klog.V(5).Infof( "%s: batchID:%d id:%d: transformed\n", b.topic, resp.batchID, messageID, @@ -415,6 +420,17 @@ func (b *batchProcessor) processMessages( return totalBytesProcessed, err } totalBytesProcessed += bytesProcessed + + switch message.Operation { + case serializer.OperationCreate: + resp.createEvents += 1 + case serializer.OperationUpdate: + resp.updateEvents += 1 + case serializer.OperationDelete: + resp.deleteEvents += 1 + default: + klog.Fatalf("Unkown operation: %+v, message: %+v", message.Operation, message) + } } } @@ -529,7 +545,10 @@ func (b *batchProcessor) Process( err: nil, batchID: i + 1, batchSchemaID: -1, - skipMerge: true, + skipMerge: false, + createEvents: 0, + updateEvents: 0, + deleteEvents: 0, s3Key: "", bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)), maskSchema: make(map[string]serializer.MaskInfo), diff --git a/redshiftsink/pkg/redshiftloader/job.go b/redshiftsink/pkg/redshiftloader/job.go index 8282d8eb5..2367112c1 100644 --- a/redshiftsink/pkg/redshiftloader/job.go +++ b/redshiftsink/pkg/redshiftloader/job.go @@ -19,7 +19,10 @@ var JobAvroSchema string = `{ {"name": "schemaIdKey", "type": "int", "default": -1}, {"name": "maskSchema", "type": "string"}, {"name": "skipMerge", "type": "string", "default": ""}, - {"name": "batchBytes", "type": "long", "default": 0} + {"name": "batchBytes", "type": "long", "default": 0}, + {"name": "createEvents", "type": "long", "default": 0}, + {"name": "updateEvents", "type": "long", "default": 0}, + {"name": "deleteEvents", "type": "long", "default": 0} ] }` @@ -32,15 +35,18 @@ type Job struct { SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic) SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic) MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"` - SkipMerge bool `json:"skipMerge"` // to load using merge strategy or directy COPY - BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch + SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents + BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch + CreateEvents int64 `json:"createEvents"` // stores count of create events + UpdateEvents int64 `json:"updateEvents"` // stores count of update events + DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events } func NewJob( upstreamTopic string, startOffset int64, endOffset int64, csvDialect string, s3Path string, schemaId int, schemaIdKey int, maskSchema map[string]serializer.MaskInfo, skipMerge bool, - batchBytes int64) Job { + batchBytes, createEvents, updateEvents, deleteEvents int64) Job { return Job{ UpstreamTopic: upstreamTopic, @@ -51,8 +57,11 @@ func NewJob( SchemaId: schemaId, SchemaIdKey: schemaIdKey, MaskSchema: maskSchema, - SkipMerge: skipMerge, + SkipMerge: skipMerge, // deprecated BatchBytes: batchBytes, + CreateEvents: createEvents, + UpdateEvents: updateEvents, + DeleteEvents: deleteEvents, } } @@ -115,6 +124,24 @@ func StringMapToJob(data map[string]interface{}) Job { } else { // backward compatibility job.BatchBytes = 0 } + case "createEvents": + if value, ok := v.(int64); ok { + job.CreateEvents = value + } else { // backward compatibility + job.CreateEvents = -1 + } + case "updateEvents": + if value, ok := v.(int64); ok { + job.UpdateEvents = value + } else { // backward compatibility + job.UpdateEvents = -1 + } + case "deleteEvents": + if value, ok := v.(int64); ok { + job.DeleteEvents = value + } else { // backward compatibility + job.DeleteEvents = -1 + } } } @@ -216,7 +243,7 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string { // ToStringMap returns a map representation of the Job func (c Job) ToStringMap() map[string]interface{} { - skipMerge := "false" + skipMerge := "false" // deprecated not used anymore, backward compatibility if c.SkipMerge { skipMerge = "true" } @@ -231,5 +258,8 @@ func (c Job) ToStringMap() map[string]interface{} { "skipMerge": skipMerge, "maskSchema": ToSchemaString(c.MaskSchema), "batchBytes": c.BatchBytes, + "createEvents": c.CreateEvents, + "updateEvents": c.UpdateEvents, + "deleteEvents": c.DeleteEvents, } } diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index a79bed2c8..3d60cb3ec 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -691,8 +691,9 @@ func (b *loadProcessor) processBatch( b.targetTable = nil b.upstreamTopic = "" - var skipMerge bool + var eventsInfoMissing bool var entries []s3sink.S3ManifestEntry + var totalCreateEvents, totalUpdateEvents, totalDeleteEvents int64 for id, message := range msgBuf { select { case <-ctx.Done(): @@ -700,9 +701,14 @@ func (b *loadProcessor) processBatch( "session ctx done, err: %v", ctx.Err()) default: job := StringMapToJob(message.Value.(map[string]interface{})) - if job.SkipMerge { - skipMerge = true + // backward comaptibility + if job.CreateEvents == -1 || job.UpdateEvents == -1 || job.DeleteEvents == -1 { + eventsInfoMissing = true } + totalCreateEvents += job.CreateEvents + totalUpdateEvents += job.UpdateEvents + totalDeleteEvents += job.DeleteEvents + schemaId = job.SchemaId schemaIdKey = job.SchemaIdKey b.batchEndOffset = message.Offset @@ -766,7 +772,14 @@ func (b *loadProcessor) processBatch( ) } - if !skipMerge { + allowMerge := true + if !eventsInfoMissing { + if totalCreateEvents > 0 && totalUpdateEvents == 0 && totalDeleteEvents == 0 { + allowMerge = false + } + } + + if allowMerge { // load data in target using staging table merge start := time.Now() klog.V(2).Infof("%s, load staging (use merge)", b.topic) From 2adb02b4d6c0b1c2c30fd8d624aa899e6ccf6a66 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sun, 16 May 2021 21:35:18 +0530 Subject: [PATCH 3/8] Use default as -1 backward comaptibility --- redshiftsink/pkg/redshiftloader/job.go | 6 +++--- redshiftsink/pkg/redshiftloader/job_test.go | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/redshiftsink/pkg/redshiftloader/job.go b/redshiftsink/pkg/redshiftloader/job.go index 2367112c1..68f964b9c 100644 --- a/redshiftsink/pkg/redshiftloader/job.go +++ b/redshiftsink/pkg/redshiftloader/job.go @@ -20,9 +20,9 @@ var JobAvroSchema string = `{ {"name": "maskSchema", "type": "string"}, {"name": "skipMerge", "type": "string", "default": ""}, {"name": "batchBytes", "type": "long", "default": 0}, - {"name": "createEvents", "type": "long", "default": 0}, - {"name": "updateEvents", "type": "long", "default": 0}, - {"name": "deleteEvents", "type": "long", "default": 0} + {"name": "createEvents", "type": "long", "default": -1}, + {"name": "updateEvents", "type": "long", "default": -1}, + {"name": "deleteEvents", "type": "long", "default": -1} ] }` diff --git a/redshiftsink/pkg/redshiftloader/job_test.go b/redshiftsink/pkg/redshiftloader/job_test.go index 092a498d2..84c3026a2 100644 --- a/redshiftsink/pkg/redshiftloader/job_test.go +++ b/redshiftsink/pkg/redshiftloader/job_test.go @@ -24,6 +24,9 @@ func TestToStringMap(t *testing.T) { maskSchema, false, 10, + -1, + -1, + -1, ) // fmt.Printf("job_now=%+v\n\n", job) From e9218d2763fd79d4def2d524c0aed830acd79efd Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Sun, 16 May 2021 21:49:45 +0530 Subject: [PATCH 4/8] Events info plz --- redshiftsink/pkg/redshiftloader/load_processor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index 3d60cb3ec..ed3fd96d1 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -779,6 +779,8 @@ func (b *loadProcessor) processBatch( } } + klog.V(2).Infof("%s, create:%v, update:%v, delete:%v events", b.topic, totalCreateEvents, totalUpdateEvents, totalDeleteEvents) + if allowMerge { // load data in target using staging table merge start := time.Now() From 04839979fe6a1b27bada3cc95bbd4198d422555d Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 17 May 2021 06:40:46 +0000 Subject: [PATCH 5/8] Fixes after tests --- redshiftsink/pkg/redshiftloader/job.go | 6 +++--- redshiftsink/pkg/redshiftloader/load_processor.go | 7 ++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/redshiftsink/pkg/redshiftloader/job.go b/redshiftsink/pkg/redshiftloader/job.go index 68f964b9c..2367112c1 100644 --- a/redshiftsink/pkg/redshiftloader/job.go +++ b/redshiftsink/pkg/redshiftloader/job.go @@ -20,9 +20,9 @@ var JobAvroSchema string = `{ {"name": "maskSchema", "type": "string"}, {"name": "skipMerge", "type": "string", "default": ""}, {"name": "batchBytes", "type": "long", "default": 0}, - {"name": "createEvents", "type": "long", "default": -1}, - {"name": "updateEvents", "type": "long", "default": -1}, - {"name": "deleteEvents", "type": "long", "default": -1} + {"name": "createEvents", "type": "long", "default": 0}, + {"name": "updateEvents", "type": "long", "default": 0}, + {"name": "deleteEvents", "type": "long", "default": 0} ] }` diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index ed3fd96d1..d6cc24a85 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -702,7 +702,8 @@ func (b *loadProcessor) processBatch( default: job := StringMapToJob(message.Value.(map[string]interface{})) // backward comaptibility - if job.CreateEvents == -1 || job.UpdateEvents == -1 || job.DeleteEvents == -1 { + if job.CreateEvents <= 0 && job.UpdateEvents <= 0 && job.DeleteEvents <= 0 { + klog.V(2).Infof("%s, events info missing", b.topic) eventsInfoMissing = true } totalCreateEvents += job.CreateEvents @@ -784,7 +785,7 @@ func (b *loadProcessor) processBatch( if allowMerge { // load data in target using staging table merge start := time.Now() - klog.V(2).Infof("%s, load staging (use merge)", b.topic) + klog.V(2).Infof("%s, load staging (using merge)", b.topic) err = b.loadStagingTable( ctx, schemaId, @@ -804,7 +805,7 @@ func (b *loadProcessor) processBatch( } } else { // directy load data in target table as there is no update or delete - klog.V(2).Infof("%s, load target (skip merge)", b.topic) + klog.V(2).Infof("%s, load target (skipping merge)", b.topic) tx, err := b.redshifter.Begin(ctx) if err != nil { return bytesProcessed, fmt.Errorf("Error creating database tx, err: %v\n", err) From 18d5987f7fdbb9b73150797cbee68edc43da9fc8 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 17 May 2021 06:54:26 +0000 Subject: [PATCH 6/8] added , --- redshiftsink/pkg/redshiftloader/load_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index d6cc24a85..21427b722 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -607,7 +607,7 @@ func (b *loadProcessor) migrateTable( func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error { targetTableCache, ok := b.schemaTargetTable[schemaId] if ok { - klog.V(2).Infof("%s using cache for targetTable", b.topic) + klog.V(2).Infof("%s, using cache for targetTable", b.topic) b.targetTable = &targetTableCache return nil } From fc2268319eb6bb4b93108a0a8fc4dbfa00d93308 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 17 May 2021 12:52:17 +0530 Subject: [PATCH 7/8] Format fixes --- redshiftsink/pkg/redshiftloader/load_processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshiftsink/pkg/redshiftloader/load_processor.go b/redshiftsink/pkg/redshiftloader/load_processor.go index 21427b722..9cbb24f62 100644 --- a/redshiftsink/pkg/redshiftloader/load_processor.go +++ b/redshiftsink/pkg/redshiftloader/load_processor.go @@ -703,7 +703,7 @@ func (b *loadProcessor) processBatch( job := StringMapToJob(message.Value.(map[string]interface{})) // backward comaptibility if job.CreateEvents <= 0 && job.UpdateEvents <= 0 && job.DeleteEvents <= 0 { - klog.V(2).Infof("%s, events info missing", b.topic) + klog.V(2).Infof("%s, events info missing", b.topic) eventsInfoMissing = true } totalCreateEvents += job.CreateEvents From 96f66701d3728c97bf087a0133226da273a10728 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 17 May 2021 07:40:31 +0000 Subject: [PATCH 8/8] Changing the default for auto commit as 1sec commits are sufficient, no need to wait for it --- redshiftsink/controllers/loader_deployment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redshiftsink/controllers/loader_deployment.go b/redshiftsink/controllers/loader_deployment.go index e52ba09c1..35ffed77d 100644 --- a/redshiftsink/controllers/loader_deployment.go +++ b/redshiftsink/controllers/loader_deployment.go @@ -274,7 +274,7 @@ func NewLoader( Assignor: "range", Oldest: true, Log: true, - AutoCommit: false, + AutoCommit: true, SessionTimeoutSeconds: &sessionTimeoutSeconds, HearbeatIntervalSeconds: &hearbeatIntervalSeconds, MaxProcessingTime: &maxProcessingTime,