Skip merge when possible #227

Merged: 8 commits, May 17, 2021
2 changes: 1 addition & 1 deletion redshiftsink/controllers/loader_deployment.go
@@ -274,7 +274,7 @@ func NewLoader(
Assignor: "range",
Oldest: true,
Log: true,
- AutoCommit: false,
+ AutoCommit: true,
SessionTimeoutSeconds: &sessionTimeoutSeconds,
HearbeatIntervalSeconds: &hearbeatIntervalSeconds,
MaxProcessingTime: &maxProcessingTime,
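Editor's note on this one-line flip: the loader's consumer group previously committed offsets manually (AutoCommit: false) and now lets the Kafka client auto-commit them. Assuming this config struct is forwarded to sarama (which its other fields, such as MaxProcessingTime and Assignor, suggest), the effective change is roughly the sketch below; everything outside the diff is an assumption, not part of this PR.

	package loader // illustrative package name

	import (
		"time"

		"github.com/Shopify/sarama"
	)

	// loaderConsumerConfig sketches what AutoCommit: true typically maps to
	// in sarama: marked offsets are flushed by the client on a timer instead
	// of being committed explicitly after each Redshift load.
	func loaderConsumerConfig() *sarama.Config {
		cfg := sarama.NewConfig()
		cfg.Consumer.Offsets.AutoCommit.Enable = true
		cfg.Consumer.Offsets.AutoCommit.Interval = time.Second // cadence is illustrative
		return cfg
	}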
27 changes: 23 additions & 4 deletions redshiftsink/pkg/redshiftbatcher/batch_processor.go
@@ -173,6 +173,9 @@ type response struct {
batchSchemaID int
batchSchemaTable redshift.Table
skipMerge bool
+ createEvents int64
+ updateEvents int64
+ deleteEvents int64
s3Key string
bodyBuf *bytes.Buffer
startOffset int64
@@ -269,6 +272,9 @@ func (b *batchProcessor) signalLoad(resp *response) error {
resp.maskSchema,
resp.skipMerge,
resp.bytesProcessed,
+ resp.createEvents,
+ resp.updateEvents,
+ resp.deleteEvents,
)

err := b.signaler.Add(
@@ -384,9 +390,8 @@ func (b *batchProcessor) processMessage(
if b.maskMessages && len(resp.maskSchema) == 0 {
resp.maskSchema = message.MaskSchema
}
- if message.Operation != serializer.OperationCreate {
- 	resp.skipMerge = false
- }

+ resp.skipMerge = false // deprecated
klog.V(5).Infof(
"%s: batchID:%d id:%d: transformed\n",
b.topic, resp.batchID, messageID,
@@ -415,6 +420,17 @@ func (b *batchProcessor) processMessages(
return totalBytesProcessed, err
}
totalBytesProcessed += bytesProcessed

+ switch message.Operation {
+ case serializer.OperationCreate:
+ 	resp.createEvents += 1
+ case serializer.OperationUpdate:
+ 	resp.updateEvents += 1
+ case serializer.OperationDelete:
+ 	resp.deleteEvents += 1
+ default:
+ 	klog.Fatalf("Unknown operation: %+v, message: %+v", message.Operation, message)
+ }
}
}

@@ -529,7 +545,10 @@ func (b *batchProcessor) Process(
err: nil,
batchID: i + 1,
batchSchemaID: -1,
- skipMerge: true,
+ skipMerge: false,
+ createEvents: 0,
+ updateEvents: 0,
+ deleteEvents: 0,
s3Key: "",
bodyBuf: bytes.NewBuffer(make([]byte, 0, 4096)),
maskSchema: make(map[string]serializer.MaskInfo),
42 changes: 36 additions & 6 deletions redshiftsink/pkg/redshiftloader/job.go
@@ -19,7 +19,10 @@ var JobAvroSchema string = `{
{"name": "schemaIdKey", "type": "int", "default": -1},
{"name": "maskSchema", "type": "string"},
{"name": "skipMerge", "type": "string", "default": ""},
{"name": "batchBytes", "type": "long", "default": 0}
{"name": "batchBytes", "type": "long", "default": 0},
{"name": "createEvents", "type": "long", "default": 0},
{"name": "updateEvents", "type": "long", "default": 0},
{"name": "deleteEvents", "type": "long", "default": 0}
]
}`

@@ -32,15 +35,18 @@ type Job struct {
SchemaId int `json:"schemaId"` // schema id of debezium event for the value for upstream topic (batcher topic)
SchemaIdKey int `json:"schemaIdKey"` // schema id of debezium event for the key for upstream topic (batcher topic)
MaskSchema map[string]serializer.MaskInfo `json:"maskSchema"`
- SkipMerge bool `json:"skipMerge"` // to load using merge strategy or directly COPY
- BatchBytes int64 `json:"batchBytes"` // batch bytes store sum of all message bytes in this batch
+ SkipMerge bool `json:"skipMerge"` // deprecated in favour of createEvents, updateEvents and deleteEvents
+ BatchBytes int64 `json:"batchBytes"` // stores the sum of all message bytes in this batch
+ CreateEvents int64 `json:"createEvents"` // stores count of create events
+ UpdateEvents int64 `json:"updateEvents"` // stores count of update events
+ DeleteEvents int64 `json:"deleteEvents"` // stores count of delete events
}

func NewJob(
upstreamTopic string, startOffset int64, endOffset int64,
csvDialect string, s3Path string, schemaId int, schemaIdKey int,
maskSchema map[string]serializer.MaskInfo, skipMerge bool,
- batchBytes int64) Job {
+ batchBytes, createEvents, updateEvents, deleteEvents int64) Job {

return Job{
UpstreamTopic: upstreamTopic,
@@ -51,8 +57,11 @@
SchemaId: schemaId,
SchemaIdKey: schemaIdKey,
MaskSchema: maskSchema,
- SkipMerge: skipMerge,
+ SkipMerge: skipMerge, // deprecated
BatchBytes: batchBytes,
+ CreateEvents: createEvents,
+ UpdateEvents: updateEvents,
+ DeleteEvents: deleteEvents,
}
}

@@ -115,6 +124,24 @@ func StringMapToJob(data map[string]interface{}) Job {
} else { // backward compatibility
job.BatchBytes = 0
}
case "createEvents":
if value, ok := v.(int64); ok {
job.CreateEvents = value
} else { // backward compatibility
job.CreateEvents = -1
}
case "updateEvents":
if value, ok := v.(int64); ok {
job.UpdateEvents = value
} else { // backward compatibility
job.UpdateEvents = -1
}
case "deleteEvents":
if value, ok := v.(int64); ok {
job.DeleteEvents = value
} else { // backward compatibility
job.DeleteEvents = -1
}
}
}

@@ -216,7 +243,7 @@ func ToSchemaString(m map[string]serializer.MaskInfo) string {

// ToStringMap returns a map representation of the Job
func (c Job) ToStringMap() map[string]interface{} {
skipMerge := "false"
skipMerge := "false" // deprecated not used anymore, backward compatibility
if c.SkipMerge {
skipMerge = "true"
}
@@ -231,5 +258,8 @@
"skipMerge": skipMerge,
"maskSchema": ToSchemaString(c.MaskSchema),
"batchBytes": c.BatchBytes,
"createEvents": c.CreateEvents,
"updateEvents": c.UpdateEvents,
"deleteEvents": c.DeleteEvents,
}
}
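Editor's note on the backward-compatibility branches above: jobs produced before this PR either lack the new keys entirely, so the Job struct keeps its zero value 0, or carry a value of an unexpected type, which sets -1; the loader's processBatch (below) treats anything <= 0 as "counts unknown" and falls back to the merge path. A minimal sketch, assuming it sits in package redshiftloader next to job.go and imports "fmt"; the map values are made up:

	// Sketch only, not part of the PR.
	func ExampleStringMapToJob_backwardCompat() {
		legacy := map[string]interface{}{
			"upstreamTopic": "ts.inventory.customers", // illustrative
			// createEvents/updateEvents/deleteEvents keys absent entirely
		}
		job := StringMapToJob(legacy)
		fmt.Println(job.CreateEvents) // 0: struct zero value, counts unknown

		badType := map[string]interface{}{
			"createEvents": "42", // present, but not an int64
		}
		job = StringMapToJob(badType)
		fmt.Println(job.CreateEvents) // -1: type assertion failed, counts unknown

		// Output:
		// 0
		// -1
	}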
3 changes: 3 additions & 0 deletions redshiftsink/pkg/redshiftloader/job_test.go
@@ -24,6 +24,9 @@ func TestToStringMap(t *testing.T) {
maskSchema,
false,
10,
+ -1,
+ -1,
+ -1,
)
// fmt.Printf("job_now=%+v\n\n", job)

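Editor's note: the -1, -1, -1 arguments in the test above exercise the "counts unknown" path. For the normal path, a hedged usage sketch of how the counters round-trip through ToStringMap and StringMapToJob; every literal is illustrative and the snippet assumes package redshiftloader context plus "fmt".

	job := NewJob(
		"ts.inventory.customers",         // upstreamTopic (illustrative)
		100, 102,                         // startOffset, endOffset
		"\t", "s3://bucket/batch-1.json", // csvDialect, s3Path (illustrative)
		5, 6,                             // schemaId, schemaIdKey
		map[string]serializer.MaskInfo{}, // maskSchema
		false,                            // skipMerge (deprecated)
		4096,                             // batchBytes
		2, 1, 0,                          // createEvents, updateEvents, deleteEvents
	)
	roundTripped := StringMapToJob(job.ToStringMap())
	// updateEvents > 0, so the loader will take the merge path for this batch.
	fmt.Println(roundTripped.UpdateEvents) // 1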
79 changes: 61 additions & 18 deletions redshiftsink/pkg/redshiftloader/load_processor.go
@@ -607,7 +607,7 @@ func (b *loadProcessor) migrateTable(
func (b *loadProcessor) migrateSchema(ctx context.Context, schemaId int, inputTable redshift.Table) error {
targetTableCache, ok := b.schemaTargetTable[schemaId]
if ok {
klog.V(2).Infof("%s using cache for targetTable", b.topic)
klog.V(2).Infof("%s, using cache for targetTable", b.topic)
b.targetTable = &targetTableCache
return nil
}
@@ -691,14 +691,25 @@ func (b *loadProcessor) processBatch(
b.targetTable = nil
b.upstreamTopic = ""

+ var eventsInfoMissing bool
var entries []s3sink.S3ManifestEntry
+ var totalCreateEvents, totalUpdateEvents, totalDeleteEvents int64
for id, message := range msgBuf {
select {
case <-ctx.Done():
return bytesProcessed, fmt.Errorf(
"session ctx done, err: %v", ctx.Err())
default:
job := StringMapToJob(message.Value.(map[string]interface{}))
+ // backward compatibility
+ if job.CreateEvents <= 0 && job.UpdateEvents <= 0 && job.DeleteEvents <= 0 {
+ 	klog.V(2).Infof("%s, events info missing", b.topic)
+ 	eventsInfoMissing = true
+ }
+ totalCreateEvents += job.CreateEvents
+ totalUpdateEvents += job.UpdateEvents
+ totalDeleteEvents += job.DeleteEvents

schemaId = job.SchemaId
schemaIdKey = job.SchemaIdKey
b.batchEndOffset = message.Offset
@@ -762,25 +773,57 @@
)
}

- // load in staging
- start := time.Now()
- klog.V(2).Infof("%s, load staging", b.topic)
- err = b.loadStagingTable(
- 	ctx,
- 	schemaId,
- 	schemaIdKey,
- 	inputTable,
- 	s3ManifestKey,
- )
- if err != nil {
- 	return bytesProcessed, err
- }
- b.metric.setCopyStageSeconds(time.Since(start).Seconds())
-
- // merge and load in target
- err = b.merge(ctx)
- if err != nil {
- 	return bytesProcessed, err
- }
+ allowMerge := true
+ if !eventsInfoMissing {
+ 	if totalCreateEvents > 0 && totalUpdateEvents == 0 && totalDeleteEvents == 0 {
+ 		allowMerge = false
+ 	}
+ }
+
+ klog.V(2).Infof("%s, create:%v, update:%v, delete:%v events", b.topic, totalCreateEvents, totalUpdateEvents, totalDeleteEvents)
+
+ if allowMerge {
+ 	// load data in target using staging table merge
+ 	start := time.Now()
+ 	klog.V(2).Infof("%s, load staging (using merge)", b.topic)
+ 	err = b.loadStagingTable(
+ 		ctx,
+ 		schemaId,
+ 		schemaIdKey,
+ 		inputTable,
+ 		s3ManifestKey,
+ 	)
+ 	if err != nil {
+ 		return bytesProcessed, err
+ 	}
+ 	b.metric.setCopyStageSeconds(time.Since(start).Seconds())
+
+ 	// merge and load in target
+ 	err = b.merge(ctx)
+ 	if err != nil {
+ 		return bytesProcessed, err
+ 	}
+ } else {
+ 	// directly load data in target table as there is no update or delete
+ 	klog.V(2).Infof("%s, load target (skipping merge)", b.topic)
+ 	tx, err := b.redshifter.Begin(ctx)
+ 	if err != nil {
+ 		return bytesProcessed, fmt.Errorf("error creating database tx, err: %v", err)
+ 	}
+ 	err = b.loadTable(
+ 		ctx,
+ 		tx,
+ 		b.targetTable.Meta.Schema,
+ 		b.targetTable.Name,
+ 		s3ManifestKey,
+ 	)
+ 	if err != nil {
+ 		return bytesProcessed, err
+ 	}
+ 	err = tx.Commit()
+ 	if err != nil {
+ 		return bytesProcessed, fmt.Errorf("error committing tx, err: %v", err)
+ 	}
+ }

if b.redshiftStats {
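Editor's note, summarizing the decision this PR threads through batcher and loader: merge only when a batch may contain updates or deletes, and always merge when any job in the batch predates the counters. A standalone sketch of the rule; the helper name is ours, not the PR's:

	// allowMerge reports whether the loader must go through the staging-table
	// merge. Only a provably insert-only batch may skip it and COPY straight
	// into the target table.
	func allowMerge(eventsInfoMissing bool, creates, updates, deletes int64) bool {
		if eventsInfoMissing {
			return true // old jobs carry no counts; merge to be safe
		}
		if creates > 0 && updates == 0 && deletes == 0 {
			return false // insert-only batch: direct COPY is sufficient
		}
		return true
	}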