Merge pull request #227 from practo/skipmerge
Skipmerge when possible
alok87 authored May 17, 2021
2 parents 0be3d5d + e392b5d commit 4e75070
Showing 5 changed files with 124 additions and 29 deletions.
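Taken together, the diff does three things: the batcher counts create, update, and delete events per batch; those counts travel to the loader through three new fields in the job Avro schema; and the loader skips the staging-table merge and COPYs straight into the target table when a batch contains only create events. The old per-message skipMerge flag is deprecated (kept only for backward compatibility), and the loader's Kafka consumer switches to offset auto-commit.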
2 changes: 1 addition & 1 deletion controllers/loader_deployment.go
@@ -274,7 +274,7 @@ func NewLoader(
Assignor: "range",
Oldest: true,
Log: true,
AutoCommit: false,
AutoCommit: true,
SessionTimeoutSeconds: &sessionTimeoutSeconds,
HearbeatIntervalSeconds: &hearbeatIntervalSeconds,
MaxProcessingTime: &maxProcessingTime,
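For context on the flip from `AutoCommit: false` to `true`: a minimal sketch, assuming the loader's consumer group is built on sarama. The field names above come from this repository's own config struct; the sarama wiring below is illustrative, not code from this commit.

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

// consumerConfig sketches how an AutoCommit flag like the one above
// typically maps onto sarama's consumer-group settings (assumed, for
// illustration only).
func consumerConfig(autoCommit bool, sessionTimeoutSeconds int) *sarama.Config {
	c := sarama.NewConfig()
	// With auto-commit enabled, offsets marked via MarkMessage are flushed
	// to Kafka in the background at this interval instead of only on an
	// explicit commit.
	c.Consumer.Offsets.AutoCommit.Enable = autoCommit
	c.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second
	c.Consumer.Group.Session.Timeout = time.Duration(sessionTimeoutSeconds) * time.Second
	return c
}
```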
27 changes: 23 additions & 4 deletions pkg/redshiftbatcher/batch_processor.go
@@ -173,6 +173,9 @@ type response struct {
batchSchemaID int
batchSchemaTable redshift.Table
skipMerge bool
createEvents int64
updateEvents int64
deleteEvents int64
s3Key string
bodyBuf *bytes.Buffer
startOffset int64
@@ -269,6 +272,9 @@ func (b *batchProcessor) signalLoad(resp *response) error {
resp.maskSchema,
resp.skipMerge,
resp.bytesProcessed,
resp.createEvents,
resp.updateEvents,
resp.deleteEvents,
)

err := b.signaler.Add(
@@ -384,9 +390,8 @@ func (b *batchProcessor) processMessage(
if b.maskMessages && len(resp.maskSchema) == 0 {
resp.maskSchema = message.MaskSchema
}
if message.Operation != serializer.OperationCreate {
resp.skipMerge = false
}

resp.skipMerge = false // deprecated
klog.V(5).Infof(
"%s: batchID:%d id:%d: transformed\n",
b.topic, resp.batchID, messageID,
Expand Down Expand Up @@ -415,6 +420,17 @@ func (b *batchProcessor) processMessages(
return totalBytesProcessed, err
}
totalBytesProcessed += bytesProcessed

switch message.Operation {
case serializer.OperationCreate:
resp.createEvents += 1
case serializer.OperationUpdate:
resp.updateEvents += 1
case serializer.OperationDelete:
resp.deleteEvents += 1
default:
klog.Fatalf("Unkown operation: %+v, message: %+v", message.Operation, message)
}
}
}

@@ -529,7 +545,10 @@ func (b *batchProcessor) Process(
err: nil,
batchID: i + 1,
batchSchemaID: -1,
skipMerge: true,
skipMerge: false,
createEvents: 0,
updateEvents: 0,
deleteEvents: 0,
s3Key: "",
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
maskSchema: make(map[string]serializer.MaskInfo),
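The new switch in processMessages is the batcher-side core of this change. A minimal, self-contained sketch of the same tallying logic, with hypothetical local stand-ins for the serializer operation constants and the response fields:

```go
package main

import "fmt"

// Operation is a hypothetical stand-in for serializer's operation type.
type Operation string

const (
	OperationCreate Operation = "CREATE"
	OperationUpdate Operation = "UPDATE"
	OperationDelete Operation = "DELETE"
)

// eventCounts mirrors the three new counters on the response struct.
type eventCounts struct {
	createEvents, updateEvents, deleteEvents int64
}

// count tallies one message's operation, mirroring the switch in
// processMessages above (which fatals instead of returning an error).
func (c *eventCounts) count(op Operation) error {
	switch op {
	case OperationCreate:
		c.createEvents++
	case OperationUpdate:
		c.updateEvents++
	case OperationDelete:
		c.deleteEvents++
	default:
		return fmt.Errorf("unknown operation: %v", op)
	}
	return nil
}
```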
42 changes: 36 additions & 6 deletions pkg/redshiftloader/job.go
@@ -19,7 +19,10 @@ var JobAvroSchema string = `{
{"name": "schemaIdKey", "type": "int", "default": -1},
{"name": "maskSchema", "type": "string"},
{"name": "skipMerge", "type": "string", "default": ""},
{"name": "batchBytes", "type": "long", "default": 0}
{"name": "batchBytes", "type": "long", "default": 0},
{"name": "createEvents", "type": "long", "default": 0},
{"name": "updateEvents", "type": "long", "default": 0},
{"name": "deleteEvents", "type": "long", "default": 0}
]
}`

@@ -32,15 +35,18 @@ type Job struct {
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
SkipMerge bool `json:"skipMerge"` // to load using merge strategy or directy COPY
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents
BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
CreateEvents int64 `json:"createEvents"` // stores count of create events
UpdateEvents int64 `json:"updateEvents"` // stores count of update events
DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events
}

func NewJob(
upstreamTopic string, startOffset int64, endOffset int64,
csvDialect string, s3Path string, schemaId int, schemaIdKey int,
maskSchema map[string]serializer.MaskInfo, skipMerge bool,
batchBytes int64) Job {
batchBytes, createEvents, updateEvents, deleteEvents int64) Job {

return Job{
UpstreamTopic: upstreamTopic,
@@ -51,8 +57,11 @@
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
SkipMerge: skipMerge,
SkipMerge: skipMerge, // deprecated
BatchBytes: batchBytes,
CreateEvents: createEvents,
UpdateEvents: updateEvents,
DeleteEvents: deleteEvents,
}
}

@@ -115,6 +124,24 @@ func StringMapToJob(data map[string]interface{}) Job {
} else { // backward compatibility
job.BatchBytes = 0
}
case "createEvents":
if value, ok := v.(int64); ok {
job.CreateEvents = value
} else { // backward compatibility
job.CreateEvents = -1
}
case "updateEvents":
if value, ok := v.(int64); ok {
job.UpdateEvents = value
} else { // backward compatibility
job.UpdateEvents = -1
}
case "deleteEvents":
if value, ok := v.(int64); ok {
job.DeleteEvents = value
} else { // backward compatibility
job.DeleteEvents = -1
}
}
}

@@ -216,7 +243,7 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string {

// ToStringMap returns a map representation of the Job
func (c Job) ToStringMap() map[string]interface{} {
skipMerge := "false"
skipMerge := "false" // deprecated not used anymore, backward compatibility
if c.SkipMerge {
skipMerge = "true"
}
@@ -231,5 +258,8 @@ func (c Job) ToStringMap() map[string]interface{} {
"skipMerge": skipMerge,
"maskSchema": ToSchemaString(c.MaskSchema),
"batchBytes": c.BatchBytes,
"createEvents": c.CreateEvents,
"updateEvents": c.UpdateEvents,
"deleteEvents": c.DeleteEvents,
}
}
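The three new case arms in StringMapToJob repeat the same assert-or-fallback pattern. A sketch of how they could collapse into one helper; the helper name is hypothetical, not part of this commit:

```go
// int64OrDefault is a hypothetical helper: it returns v as an int64 when
// the type assertion holds, and def when the field is missing or mistyped
// (jobs queued before this change carry no event counts).
func int64OrDefault(v interface{}, def int64) int64 {
	if value, ok := v.(int64); ok {
		return value
	}
	return def
}

// Usage inside StringMapToJob's switch:
//   case "createEvents":
//       job.CreateEvents = int64OrDefault(v, -1)
```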
3 changes: 3 additions & 0 deletions pkg/redshiftloader/job_test.go
@@ -24,6 +24,9 @@ func TestToStringMap(t *testing.T) {
maskSchema,
false,
10,
-1,
-1,
-1,
)
// fmt.Printf("job_now=%+v\n\n", job)

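The updated test passes -1 for the three new counts, the same sentinel StringMapToJob falls back to. A sketch of a companion round-trip test that could sit next to it; the argument values are hypothetical and only the counter fields are asserted:

```go
func TestEventCountsRoundTrip(t *testing.T) {
	// Hypothetical companion test: ToStringMap followed by StringMapToJob
	// should preserve the three new counters.
	job := NewJob(
		"db.inventory.customers", 0, 9,
		",", "s3://bucket/path", 1, 2,
		map[string]serializer.MaskInfo{}, false,
		10, 5, 3, 2,
	)
	got := StringMapToJob(job.ToStringMap())
	if got.CreateEvents != 5 || got.UpdateEvents != 3 || got.DeleteEvents != 2 {
		t.Errorf("event counts not preserved: %+v", got)
	}
}
```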
79 changes: 61 additions & 18 deletions pkg/redshiftloader/load_processor.go
@@ -607,7 +607,7 @@ func (b *loadProcessor) migrateTable(
func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error {
targetTableCache, ok := b.schemaTargetTable[schemaId]
if ok {
klog.V(2).Infof("%s using cache for targetTable", b.topic)
klog.V(2).Infof("%s, using cache for targetTable", b.topic)
b.targetTable = &targetTableCache
return nil
}
@@ -691,14 +691,25 @@ func (b *loadProcessor) processBatch(
b.targetTable = nil
b.upstreamTopic = ""

var eventsInfoMissing bool
var entries []s3sink.S3ManifestEntry
var totalCreateEvents, totalUpdateEvents, totalDeleteEvents int64
for id, message := range msgBuf {
select {
case <-ctx.Done():
return bytesProcessed, fmt.Errorf(
"session ctx done, err: %v", ctx.Err())
default:
job := StringMapToJob(message.Value.(map[string]interface{}))
// backward compatibility
if job.CreateEvents <= 0 && job.UpdateEvents <= 0 && job.DeleteEvents <= 0 {
klog.V(2).Infof("%s, events info missing", b.topic)
eventsInfoMissing = true
}
totalCreateEvents += job.CreateEvents
totalUpdateEvents += job.UpdateEvents
totalDeleteEvents += job.DeleteEvents

schemaId = job.SchemaId
schemaIdKey = job.SchemaIdKey
b.batchEndOffset = message.Offset
@@ -762,25 +773,57 @@
)
}

// load in staging
start := time.Now()
klog.V(2).Infof("%s, load staging", b.topic)
err = b.loadStagingTable(
ctx,
schemaId,
schemaIdKey,
inputTable,
s3ManifestKey,
)
if err != nil {
return bytesProcessed, err
allowMerge := true
if !eventsInfoMissing {
if totalCreateEvents > 0 && totalUpdateEvents == 0 && totalDeleteEvents == 0 {
allowMerge = false
}
}
b.metric.setCopyStageSeconds(time.Since(start).Seconds())

// merge and load in target
err = b.merge(ctx)
if err != nil {
return bytesProcessed, err
klog.V(2).Infof("%s, create:%v, update:%v, delete:%v events", b.topic, totalCreateEvents, totalUpdateEvents, totalDeleteEvents)

if allowMerge {
// load data in target using staging table merge
start := time.Now()
klog.V(2).Infof("%s, load staging (using merge)", b.topic)
err = b.loadStagingTable(
ctx,
schemaId,
schemaIdKey,
inputTable,
s3ManifestKey,
)
if err != nil {
return bytesProcessed, err
}
b.metric.setCopyStageSeconds(time.Since(start).Seconds())

// merge and load in target
err = b.merge(ctx)
if err != nil {
return bytesProcessed, err
}
} else {
// directly load data into the target table as there are no updates or deletes
klog.V(2).Infof("%s, load target (skipping merge)", b.topic)
tx, err := b.redshifter.Begin(ctx)
if err != nil {
return bytesProcessed, fmt.Errorf("Error creating database tx, err: %v\n", err)
}
err = b.loadTable(
ctx,
tx,
b.targetTable.Meta.Schema,
b.targetTable.Name,
s3ManifestKey,
)
if err != nil {
return bytesProcessed, err
}
err = tx.Commit()
if err != nil {
return bytesProcessed, fmt.Errorf("Error committing tx, err:%v\n", err)
}
}

if b.redshiftStats {
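The merge-or-copy decision in processBatch reduces to a small predicate. A sketch of it extracted as a pure function (the function name is hypothetical), which makes the backward-compatibility rule explicit:

```go
// allowMerge reports whether a batch must go through the staging-table
// merge. Merge is skipped only when event counts are known and the batch
// holds nothing but creates; batches from older batchers (all counts <= 0,
// i.e. events info missing) always take the safe merge path.
func allowMerge(creates, updates, deletes int64, eventsInfoMissing bool) bool {
	if eventsInfoMissing {
		return true
	}
	if creates > 0 && updates == 0 && deletes == 0 {
		return false
	}
	return true
}
```

Extracting the rule this way would also make it unit-testable without a Redshift connection.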
