diff --git a/controllers/loader_deployment.go b/controllers/loader_deployment.go
index f82e7cb52..ef2a559f9 100644
--- a/controllers/loader_deployment.go
+++ b/controllers/loader_deployment.go
@@ -274,7 +274,7 @@ func NewLoader(
             Assignor:                "range",
             Oldest:                  true,
             Log:                     true,
-            AutoCommit:              false,
+            AutoCommit:              true,
             SessionTimeoutSeconds:   &sessionTimeoutSeconds,
             HearbeatIntervalSeconds: &hearbeatIntervalSeconds,
             MaxProcessingTime:       &maxProcessingTime,
diff --git a/pkg/redshiftbatcher/batch_processor.go b/pkg/redshiftbatcher/batch_processor.go
index a990230f4..ef9a13af2 100644
--- a/pkg/redshiftbatcher/batch_processor.go
+++ b/pkg/redshiftbatcher/batch_processor.go
@@ -173,6 +173,9 @@ type response struct {
     batchSchemaID    int
     batchSchemaTable redshift.Table
     skipMerge        bool
+    createEvents     int64
+    updateEvents     int64
+    deleteEvents     int64
     s3Key            string
     bodyBuf          *bytes.Buffer
     startOffset      int64
@@ -269,6 +272,9 @@ func (b *batchProcessor) signalLoad(resp *response) error {
         resp.maskSchema,
         resp.skipMerge,
         resp.bytesProcessed,
+        resp.createEvents,
+        resp.updateEvents,
+        resp.deleteEvents,
     )

     err := b.signaler.Add(
@@ -384,9 +390,8 @@ func (b *batchProcessor) processMessage(
     if b.maskMessages && len(resp.maskSchema) == 0 {
         resp.maskSchema = message.MaskSchema
     }
-    if message.Operation != serializer.OperationCreate {
-        resp.skipMerge = false
-    }
+
+    resp.skipMerge = false // deprecated
     klog.V(5).Infof(
         "%s: batchID:%d id:%d: transformed\n",
         b.topic, resp.batchID, messageID,
@@ -415,6 +420,17 @@ func (b *batchProcessor) processMessages(
             return totalBytesProcessed, err
         }
         totalBytesProcessed += bytesProcessed
+
+        switch message.Operation {
+        case serializer.OperationCreate:
+            resp.createEvents += 1
+        case serializer.OperationUpdate:
+            resp.updateEvents += 1
+        case serializer.OperationDelete:
+            resp.deleteEvents += 1
+        default:
+            klog.Fatalf("Unknown operation: %+v, message: %+v", message.Operation, message)
+        }
     }
 }

@@ -529,7 +545,10 @@ func (b *batchProcessor) Process(
             err:           nil,
             batchID:       i + 1,
             batchSchemaID: -1,
-            skipMerge:     true,
+            skipMerge:     false,
+            createEvents:  0,
+            updateEvents:  0,
+            deleteEvents:  0,
             s3Key:         "",
             bodyBuf:       bytes.NewBuffer(make([]byte, 0, 4096)),
             maskSchema:    make(map[string]serializer.MaskInfo),
diff --git a/pkg/redshiftloader/job.go b/pkg/redshiftloader/job.go
index 8282d8eb5..2367112c1 100644
--- a/pkg/redshiftloader/job.go
+++ b/pkg/redshiftloader/job.go
@@ -19,7 +19,10 @@ var JobAvroSchema string = `{
         {"name": "schemaIdKey", "type": "int", "default": -1},
         {"name": "maskSchema", "type": "string"},
         {"name": "skipMerge", "type": "string", "default": ""},
-        {"name": "batchBytes", "type": "long", "default": 0}
+        {"name": "batchBytes", "type": "long", "default": 0},
+        {"name": "createEvents", "type": "long", "default": 0},
+        {"name": "updateEvents", "type": "long", "default": 0},
+        {"name": "deleteEvents", "type": "long", "default": 0}
     ]
 }`

@@ -32,15 +35,18 @@ type Job struct {
     SchemaId      int                            `json:"schemaId"`    // schema id of debezium event for the value for upstream topic (batcher topic)
     SchemaIdKey   int                            `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
     MaskSchema    map[string]serializer.MaskInfo `json:"maskSchema"`
-    SkipMerge     bool                           `json:"skipMerge"`  // to load using merge strategy or directy COPY
-    BatchBytes    int64                          `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
+    SkipMerge     bool                           `json:"skipMerge"`    // deprecated in favour of createEvents, updateEvents and deleteEvents
+    BatchBytes    int64                          `json:"batchBytes"`   // batch bytes store sum of all message bytes in this batch
+    CreateEvents  int64                          `json:"createEvents"` // stores count of create events
+    UpdateEvents  int64                          `json:"updateEvents"` // stores count of update events
+    DeleteEvents  int64                          `json:"deleteEvents"` // stores count of delete events
 }

 func NewJob(
     upstreamTopic string, startOffset int64, endOffset int64,
     csvDialect string, s3Path string, schemaId int, schemaIdKey int,
     maskSchema map[string]serializer.MaskInfo, skipMerge bool,
-    batchBytes int64) Job {
+    batchBytes, createEvents, updateEvents, deleteEvents int64) Job {

     return Job{
         UpstreamTopic: upstreamTopic,
@@ -51,8 +57,11 @@ func NewJob(
         SchemaId:      schemaId,
         SchemaIdKey:   schemaIdKey,
         MaskSchema:    maskSchema,
-        SkipMerge:     skipMerge,
+        SkipMerge:     skipMerge, // deprecated
         BatchBytes:    batchBytes,
+        CreateEvents:  createEvents,
+        UpdateEvents:  updateEvents,
+        DeleteEvents:  deleteEvents,
     }
 }

@@ -115,6 +124,24 @@ func StringMapToJob(data map[string]interface{}) Job {
             } else { // backward compatibility
                 job.BatchBytes = 0
             }
+        case "createEvents":
+            if value, ok := v.(int64); ok {
+                job.CreateEvents = value
+            } else { // backward compatibility
+                job.CreateEvents = -1
+            }
+        case "updateEvents":
+            if value, ok := v.(int64); ok {
+                job.UpdateEvents = value
+            } else { // backward compatibility
+                job.UpdateEvents = -1
+            }
+        case "deleteEvents":
+            if value, ok := v.(int64); ok {
+                job.DeleteEvents = value
+            } else { // backward compatibility
+                job.DeleteEvents = -1
+            }
         }
     }

@@ -216,7 +243,7 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string {

 // ToStringMap returns a map representation of the Job
 func (c Job) ToStringMap() map[string]interface{} {
-    skipMerge := "false"
+    skipMerge := "false" // deprecated, unused; kept for backward compatibility
     if c.SkipMerge {
         skipMerge = "true"
     }
@@ -231,5 +258,8 @@ func (c Job) ToStringMap() map[string]interface{} {
         "skipMerge":     skipMerge,
         "maskSchema":    ToSchemaString(c.MaskSchema),
         "batchBytes":    c.BatchBytes,
+        "createEvents":  c.CreateEvents,
+        "updateEvents":  c.UpdateEvents,
+        "deleteEvents":  c.DeleteEvents,
     }
 }
diff --git a/pkg/redshiftloader/job_test.go b/pkg/redshiftloader/job_test.go
index 092a498d2..84c3026a2 100644
--- a/pkg/redshiftloader/job_test.go
+++ b/pkg/redshiftloader/job_test.go
@@ -24,6 +24,9 @@ func TestToStringMap(t *testing.T) {
         maskSchema,
         false,
         10,
+        -1,
+        -1,
+        -1,
     )

     // fmt.Printf("job_now=%+v\n\n", job)
diff --git a/pkg/redshiftloader/load_processor.go b/pkg/redshiftloader/load_processor.go
index 5930f35d5..9cbb24f62 100644
--- a/pkg/redshiftloader/load_processor.go
+++ b/pkg/redshiftloader/load_processor.go
@@ -607,7 +607,7 @@ func (b *loadProcessor) migrateTable(
 func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error {
     targetTableCache, ok := b.schemaTargetTable[schemaId]
     if ok {
-        klog.V(2).Infof("%s using cache for targetTable", b.topic)
+        klog.V(2).Infof("%s, using cache for targetTable", b.topic)
         b.targetTable = &targetTableCache
         return nil
     }
@@ -691,7 +691,9 @@ func (b *loadProcessor) processBatch(
     b.targetTable = nil
     b.upstreamTopic = ""

+    var eventsInfoMissing bool
     var entries []s3sink.S3ManifestEntry
+    var totalCreateEvents, totalUpdateEvents, totalDeleteEvents int64
     for id, message := range msgBuf {
         select {
         case <-ctx.Done():
@@ -699,6 +701,15 @@
                 "session ctx done, err: %v", ctx.Err())
         default:
             job := StringMapToJob(message.Value.(map[string]interface{}))
+            // backward compatibility
+            if job.CreateEvents <= 0 && job.UpdateEvents <= 0 && job.DeleteEvents <= 0 {
+                klog.V(2).Infof("%s, events info missing", b.topic)
+                eventsInfoMissing = true
+            }
+            totalCreateEvents += job.CreateEvents
+            totalUpdateEvents += job.UpdateEvents
+            totalDeleteEvents += job.DeleteEvents
+
             schemaId = job.SchemaId
             schemaIdKey = job.SchemaIdKey
             b.batchEndOffset = message.Offset
@@ -762,25 +773,57 @@
         )
     }

-    // load in staging
-    start := time.Now()
-    klog.V(2).Infof("%s, load staging", b.topic)
-    err = b.loadStagingTable(
-        ctx,
-        schemaId,
-        schemaIdKey,
-        inputTable,
-        s3ManifestKey,
-    )
-    if err != nil {
-        return bytesProcessed, err
+    allowMerge := true
+    if !eventsInfoMissing {
+        if totalCreateEvents > 0 && totalUpdateEvents == 0 && totalDeleteEvents == 0 {
+            allowMerge = false
+        }
     }
-    b.metric.setCopyStageSeconds(time.Since(start).Seconds())

-    // merge and load in target
-    err = b.merge(ctx)
-    if err != nil {
-        return bytesProcessed, err
+    klog.V(2).Infof("%s, create:%v, update:%v, delete:%v events", b.topic, totalCreateEvents, totalUpdateEvents, totalDeleteEvents)
+
+    if allowMerge {
+        // load data in target using staging table merge
+        start := time.Now()
+        klog.V(2).Infof("%s, load staging (using merge)", b.topic)
+        err = b.loadStagingTable(
+            ctx,
+            schemaId,
+            schemaIdKey,
+            inputTable,
+            s3ManifestKey,
+        )
+        if err != nil {
+            return bytesProcessed, err
+        }
+        b.metric.setCopyStageSeconds(time.Since(start).Seconds())
+
+        // merge and load in target
+        err = b.merge(ctx)
+        if err != nil {
+            return bytesProcessed, err
+        }
+    } else {
+        // directly load data into the target table as there are no updates or deletes
+        klog.V(2).Infof("%s, load target (skipping merge)", b.topic)
+        tx, err := b.redshifter.Begin(ctx)
+        if err != nil {
+            return bytesProcessed, fmt.Errorf("Error creating database tx, err: %v\n", err)
+        }
+        err = b.loadTable(
+            ctx,
+            tx,
+            b.targetTable.Meta.Schema,
+            b.targetTable.Name,
+            s3ManifestKey,
+        )
+        if err != nil {
+            return bytesProcessed, err
+        }
+        err = tx.Commit()
+        if err != nil {
+            return bytesProcessed, fmt.Errorf("Error committing tx, err:%v\n", err)
+        }
     }

     if b.redshiftStats {
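Reviewer note: a minimal, self-contained sketch of the load-strategy decision this patch introduces in processBatch. The function name decideLoadStrategy is hypothetical (not part of this patch); it only mirrors the allowMerge logic above: merge is skipped solely when event counts are known and the batch holds create events exclusively, while missing counts (jobs signaled by an older batcher) fall back to the safe stage-and-merge path.

package main

import "fmt"

// decideLoadStrategy mirrors the allowMerge logic in processBatch:
// COPY straight into the target table only when the batch is known to
// contain creates exclusively; otherwise stage and merge so that
// updates and deletes are applied correctly.
func decideLoadStrategy(creates, updates, deletes int64, infoMissing bool) string {
	if !infoMissing && creates > 0 && updates == 0 && deletes == 0 {
		return "direct-copy"
	}
	return "stage-and-merge"
}

func main() {
	fmt.Println(decideLoadStrategy(100, 0, 0, false)) // direct-copy
	fmt.Println(decideLoadStrategy(100, 2, 0, false)) // stage-and-merge
	fmt.Println(decideLoadStrategy(0, 0, 0, true))    // stage-and-merge (backward compatibility)
}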