From a5472f40b1fcdaa8e0fa7e8279e68b59cbac77c2 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 6 Nov 2024 21:50:43 -0500 Subject: [PATCH] Optimize row replication Signed-off-by: Matt Lord --- examples/common/scripts/vttablet-up.sh | 1 + .../vreplication/replicator_plan.go | 57 ++++++++++++------- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/examples/common/scripts/vttablet-up.sh b/examples/common/scripts/vttablet-up.sh index daa40aee894..282cd0553ea 100755 --- a/examples/common/scripts/vttablet-up.sh +++ b/examples/common/scripts/vttablet-up.sh @@ -54,6 +54,7 @@ vttablet \ --service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \ --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \ --heartbeat_on_demand_duration=5s \ + --pprof-http \ > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 & # Block waiting for the tablet to be listening diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 6a416cb4414..eba3fe9eb46 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -64,6 +64,13 @@ type ReplicatorPlan struct { workflowConfig *vttablet.VReplicationConfig } +type colInfo struct { + typ querypb.Type + length int64 + offset int64 + field *querypb.Field +} + // buildExecution plan uses the field info as input and the partially built // TablePlan for that table to build a full plan. func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent) (*TablePlan, error) { @@ -224,6 +231,12 @@ type TablePlan struct { CollationEnv *collations.Environment WorkflowConfig *vttablet.VReplicationConfig + + // rowInfo is used as a lazily initiated cache of column information associated + // with querypb.Row values for bulk inserts. The base information is calculated + // once based on the table plan and only the row specific values are updated for + // each row as the row is processed. + rowInfo []*colInfo } // MarshalJSON performs a custom JSON Marshalling. @@ -622,34 +635,38 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error { len(tp.Fields), len(bindLocations)) } - type colInfo struct { - typ querypb.Type - length int64 - offset int64 - field *querypb.Field - } - rowInfo := make([]*colInfo, 0) - offset := int64(0) - for i, field := range tp.Fields { // collect info required for fields to be bound - length := row.Lengths[i] - if !tp.FieldsToSkip[strings.ToLower(field.Name)] { - rowInfo = append(rowInfo, &colInfo{ - typ: field.Type, - length: length, - offset: offset, - field: field, - }) + if tp.rowInfo == nil { + tp.rowInfo = make([]*colInfo, 0, len(tp.Fields)) + for i, field := range tp.Fields { // collect info required for fields to be bound + length := row.Lengths[i] + if !tp.FieldsToSkip[strings.ToLower(field.Name)] { + tp.rowInfo = append(tp.rowInfo, &colInfo{ + typ: field.Type, + length: length, + offset: offset, + field: field, + }) + } + if length > 0 { + offset += row.Lengths[i] + } } - if length > 0 { - offset += row.Lengths[i] + } else { + for i, ri := range tp.rowInfo { + length := row.Lengths[i] + ri.length = length + ri.offset = offset + if length > 0 { + offset += row.Lengths[i] + } } } // bind field values to locations var offsetQuery int for i, loc := range bindLocations { - col := rowInfo[i] + col := tp.rowInfo[i] buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset]) typ := col.typ