Skip to content

Commit

Permalink
Merge branch 'main' into check-existing-table
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored Dec 7, 2023
2 parents 0f419c0 + 4eed39a commit d5d7cb0
Show file tree
Hide file tree
Showing 45 changed files with 2,616 additions and 1,889 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
flow_test:
strategy:
matrix:
runner: [ubicloud-standard-8]
runner: [ubicloud-standard-8, ubuntu-latest]
runs-on: ${{ matrix.runner }}
timeout-minutes: 30
services:
Expand Down
39 changes: 27 additions & 12 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) error {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, time.Now())
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}
Expand Down Expand Up @@ -764,12 +764,6 @@ func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
waitBetweenBatches = time.Duration(config.WaitBetweenBatchesSeconds) * time.Second
}

if config.WatermarkColumn == "xmin" {
// for xmin we ignore the wait between batches, as seq scan time is
// extremely slow.
waitBetweenBatches = 10 * time.Second
}

srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -853,11 +847,7 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
partition *protos.QRepPartition,
runUUID string,
) (int64, error) {
err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition)
if err != nil {
return 0, fmt.Errorf("failed to update start time for partition: %w", err)
}

startTime := time.Now()
srcConn, err := connectors.GetQRepPullConnector(ctx, config.SourcePeer)
if err != nil {
return 0, fmt.Errorf("failed to get qrep source connector: %w", err)
Expand Down Expand Up @@ -891,11 +881,36 @@ func (a *FlowableActivity) ReplicateXminPartition(ctx context.Context,
}).Errorf("failed to pull records: %v", err)
return err
}

// The first sync of an XMIN mirror will have a partition without a range
// A nil range is not supported by the catalog mirror monitor functions below
// So I'm creating a partition with a range of 0 to numRecords
partitionForMetrics := partition
if partition.Range == nil {
partitionForMetrics = &protos.QRepPartition{
PartitionId: partition.PartitionId,
Range: &protos.PartitionRange{
Range: &protos.PartitionRange_IntRange{
IntRange: &protos.IntPartitionRange{Start: 0, End: int64(numRecords)},
}},
}
}
updateErr := a.CatalogMirrorMonitor.InitializeQRepRun(ctx, config, runUUID, []*protos.QRepPartition{partitionForMetrics})
if updateErr != nil {
return updateErr
}

err := a.CatalogMirrorMonitor.UpdateStartTimeForPartition(ctx, runUUID, partition, startTime)
if err != nil {
return fmt.Errorf("failed to update start time for partition: %w", err)
}

err = a.CatalogMirrorMonitor.UpdatePullEndTimeAndRowsForPartition(errCtx, runUUID, partition, int64(numRecords))
if err != nil {
log.Errorf("%v", err)
return err
}

return nil
})

Expand Down
7 changes: 3 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
}

func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
tableIdentifierParts := strings.Split(tableIdentifier, ".")
if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) {
tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
if peerDBType != int32(protos.DBType_BIGQUERY) && !strings.ContainsRune(tableIdentifier, '.') {
return "public." + tableIdentifier
}
return strings.Join(tableIdentifierParts, ".")
return tableIdentifier
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ and updating the other columns (not the unchanged toast columns)
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ", ")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ type AvroSchema struct {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := map[string]bool{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
avroType, err := GetAvroType(bqField)
Expand All @@ -197,7 +197,7 @@ func DefineAvroSchema(dstTableName string,
// If a field is nullable, its Avro type should be ["null", actualType]
if !bqField.Required {
avroType = []interface{}{"null", avroType}
nullableFields[bqField.Name] = true
nullableFields[bqField.Name] = struct{}{}
}

avroFields = append(avroFields, AvroField{
Expand Down
Loading

0 comments on commit d5d7cb0

Please sign in to comment.