Optimize row replication
Signed-off-by: Matt Lord <[email protected]>
mattlord committed Nov 7, 2024
1 parent 23024bd commit a5472f4
Showing 2 changed files with 38 additions and 20 deletions.
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
@@ -54,6 +54,7 @@ vttablet \
  --service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
  --pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
  --heartbeat_on_demand_duration=5s \
+ --pprof-http \
  > $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &
 
 # Block waiting for the tablet to be listening
57 changes: 37 additions & 20 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
@@ -64,6 +64,13 @@ type ReplicatorPlan struct {
     workflowConfig *vttablet.VReplicationConfig
 }
 
+type colInfo struct {
+    typ    querypb.Type
+    length int64
+    offset int64
+    field  *querypb.Field
+}
+
 // buildExecutionPlan uses the field info as input and the partially built
 // TablePlan for that table to build a full plan.
 func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent) (*TablePlan, error) {
@@ -224,6 +231,12 @@ type TablePlan struct {
 
     CollationEnv   *collations.Environment
     WorkflowConfig *vttablet.VReplicationConfig
+
+    // rowInfo is a lazily initialized cache of the column information associated
+    // with the querypb.Row values used for bulk inserts. The static parts are
+    // calculated once from the table plan; only the row-specific values are
+    // updated as each row is processed.
+    rowInfo []*colInfo
 }
 
 // MarshalJSON performs a custom JSON Marshalling.
@@ -622,34 +635,38 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
             len(tp.Fields), len(bindLocations))
     }
 
-    type colInfo struct {
-        typ    querypb.Type
-        length int64
-        offset int64
-        field  *querypb.Field
-    }
-    rowInfo := make([]*colInfo, 0)
-
     offset := int64(0)
-    for i, field := range tp.Fields { // collect info required for fields to be bound
-        length := row.Lengths[i]
-        if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
-            rowInfo = append(rowInfo, &colInfo{
-                typ:    field.Type,
-                length: length,
-                offset: offset,
-                field:  field,
-            })
-        }
-        if length > 0 {
-            offset += row.Lengths[i]
-        }
-    }
+    if tp.rowInfo == nil {
+        tp.rowInfo = make([]*colInfo, 0, len(tp.Fields))
+        for i, field := range tp.Fields { // collect info required for fields to be bound
+            length := row.Lengths[i]
+            if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
+                tp.rowInfo = append(tp.rowInfo, &colInfo{
+                    typ:    field.Type,
+                    length: length,
+                    offset: offset,
+                    field:  field,
+                })
+            }
+            if length > 0 {
+                offset += row.Lengths[i]
+            }
+        }
+    } else {
+        for i, ri := range tp.rowInfo {
+            length := row.Lengths[i]
+            ri.length = length
+            ri.offset = offset
+            if length > 0 {
+                offset += row.Lengths[i]
+            }
+        }
+    }
 
     // bind field values to locations
     var offsetQuery int
     for i, loc := range bindLocations {
-        col := rowInfo[i]
+        col := tp.rowInfo[i]
         buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
         typ := col.typ
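For context on the change above: the commit stops rebuilding the per-column metadata slice for every row in appendFromRow and instead keeps a per-TablePlan cache (rowInfo) that is built on the first row and then only refreshed with each row's length and offset values. Below is a minimal, self-contained Go sketch of that caching pattern. It is not the Vitess code itself; the table, row, and column types and the bindRow method are hypothetical simplifications that omit details such as FieldsToSkip and the actual SQL value binding.

// A hypothetical, simplified sketch of the lazy per-table column-info cache.
package main

import "fmt"

// column holds what is needed to bind one column of a packed row: a static
// name plus the per-row length and offset into the row's value buffer.
type column struct {
    name   string
    length int64
    offset int64
}

// table stands in for the idea behind TablePlan.rowInfo: a cache built once
// per table plan and refreshed for each row.
type table struct {
    fields  []string
    rowInfo []*column // lazily initialized on the first row
}

// row mimics a packed row: per-column value lengths into one shared buffer.
type row struct {
    lengths []int64
}

// bindRow builds the cache on the first call and only updates the
// row-specific fields (length and offset) on later calls.
func (t *table) bindRow(r *row) {
    offset := int64(0)
    if t.rowInfo == nil {
        // First row: allocate the cache once, sized for all fields.
        t.rowInfo = make([]*column, 0, len(t.fields))
        for i, name := range t.fields {
            length := r.lengths[i]
            t.rowInfo = append(t.rowInfo, &column{name: name, length: length, offset: offset})
            if length > 0 {
                offset += length
            }
        }
        return
    }
    // Subsequent rows: reuse the cached entries, no new allocations.
    for i, col := range t.rowInfo {
        length := r.lengths[i]
        col.length = length
        col.offset = offset
        if length > 0 {
            offset += length
        }
    }
}

func main() {
    t := &table{fields: []string{"id", "name"}}
    t.bindRow(&row{lengths: []int64{4, 5}})  // builds the cache
    t.bindRow(&row{lengths: []int64{4, 11}}) // refreshes it in place
    for _, c := range t.rowInfo {
        fmt.Printf("%s: length=%d offset=%d\n", c.name, c.length, c.offset)
    }
}

The payoff is that bulk inserts no longer allocate a fresh slice of colInfo values for every row; after the first row, only the two per-row fields of each cached entry change.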
