Skip to content

Commit

Permalink
[breaking] pruning and reorganizing fields from protos, moving stuff …
Browse files Browse the repository at this point in the history
…to state (#1087)

### ⚠️ This change can break existing CDC mirrors!

Performs several breaking changes to our protobufs:

1) Removes unused fields from `FlowConnectionConfigs`: `table_schema,
src_table_id_name_mapping, table_name_schema_mapping,
snapshot_sync_mode, cdc_sync_mode, push_batch_size, push_parallelism,
metadata_peer`. `src_table_id_name_mapping` and
`table_name_schema_mapping` are now moved to state, setup from
`SetupFlow` and passed accordingly via more proto changes. Order
shuffled around to group similar fields together.
2) Removed unnecessary generalization of `table_identifier`
3) Removed unused messages `EnsurePullabilityInput` and
`EnsurePullabilityOutput`
4) Changed wording of `do_initial_copy` and `initial_copy_only` to
better align with other options.
5) Removed `columns` from `TableSchema` and fixed related methods up.
6) `batch_size` had 3 different datatypes and was being modified in
multiple places, making sure `batch_size` and `idle_timeout_seconds` has
one source of truth.
  • Loading branch information
heavycrystal authored Jan 17, 2024
1 parent a0a663f commit 9c447cf
Show file tree
Hide file tree
Showing 20 changed files with 174 additions and 255 deletions.
20 changes: 9 additions & 11 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
errGroup.Go(func() error {
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
MaxBatchSize: input.SyncFlowOptions.BatchSize,
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.FlowConnectionConfigs.IdleTimeoutSeconds),
int(input.SyncFlowOptions.IdleTimeoutSeconds),
),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
TableNameSchemaMapping: input.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
RelationMessageMapping: input.RelationMessageMapping,
Expand Down Expand Up @@ -293,12 +293,10 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,

syncStartTime := time.Now()
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
PushBatchSize: input.FlowConnectionConfigs.PushBatchSize,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
TableMappings: input.FlowConnectionConfigs.TableMappings,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
})
if err != nil {
slog.Warn("failed to push records", slog.Any("error", err))
Expand Down Expand Up @@ -404,7 +402,7 @@ func (a *FlowableActivity) StartNormalize(
SoftDelete: input.FlowConnectionConfigs.SoftDelete,
SoftDeleteColName: input.FlowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: input.FlowConnectionConfigs.SyncedAtColName,
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
TableNameSchemaMapping: input.TableNameSchemaMapping,
})
if err != nil {
a.Alerter.LogFlowError(ctx, input.FlowConnectionConfigs.FlowJobName, err)
Expand Down
6 changes: 2 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
},
}

maxBatchSize := int(cfg.MaxBatchSize)
maxBatchSize := cfg.MaxBatchSize
if maxBatchSize == 0 {
maxBatchSize = 1_000_000
cfg.MaxBatchSize = uint32(maxBatchSize)
}

limits := &peerflow.CDCFlowLimits{
Expand Down Expand Up @@ -172,14 +171,13 @@ func (h *FlowRequestHandler) CreateCDCFlow(
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

state := peerflow.NewCDCFlowWorkflowState()
_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
cfg, // workflow input
limits, // workflow limits
state, // workflow state
nil, // workflow state
)
if err != nil {
slog.Error("unable to start PeerFlow workflow", slog.Any("error", err))
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func (c *BigQueryConnector) SetupNormalizedTables(
}

// convert the column names and types to bigquery types
columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.ColumnNames)+2)
utils.IterColumns(tableSchema, func(colName, genericColType string) {
columns = append(columns, &bigquery.FieldSchema{
Name: colName,
Expand Down
13 changes: 4 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,6 @@ func (c *PostgresConnector) getTableSchemaForTable(

return &protos.TableSchema{
TableIdentifier: tableName,
Columns: nil,
PrimaryKeyColumns: pKeyCols,
IsReplicaIdentityFull: replicaIdentityType == ReplicaIdentityFull,
ColumnNames: columnNames,
Expand Down Expand Up @@ -776,7 +775,7 @@ func (c *PostgresConnector) ReplayTableSchemaDeltas(flowJobName string,
func (c *PostgresConnector) EnsurePullability(
req *protos.EnsurePullabilityBatchInput,
) (*protos.EnsurePullabilityBatchOutput, error) {
tableIdentifierMapping := make(map[string]*protos.TableIdentifier)
tableIdentifierMapping := make(map[string]*protos.PostgresTableIdentifier)
for _, tableName := range req.SourceTableIdentifiers {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
Expand Down Expand Up @@ -804,12 +803,8 @@ func (c *PostgresConnector) EnsurePullability(
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
PostgresTableIdentifier: &protos.PostgresTableIdentifier{
RelId: relID,
},
},
tableIdentifierMapping[tableName] = &protos.PostgresTableIdentifier{
RelId: relID,
}
utils.RecordHeartbeatWithRecover(c.ctx, fmt.Sprintf("ensured pullability table %s", tableName))
}
Expand Down Expand Up @@ -852,7 +847,7 @@ func (c *PostgresConnector) SetupReplication(signal SlotSignal, req *protos.Setu
}
// Create the replication slot and publication
err = c.createSlotAndPublication(signal, exists,
slotName, publicationName, tableNameMapping, req.DoInitialCopy)
slotName, publicationName, tableNameMapping, req.DoInitialSnapshot)
if err != nil {
return fmt.Errorf("error creating replication slot and publication: %w", err)
}
Expand Down
40 changes: 8 additions & 32 deletions flow/connectors/utils/columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,27 @@ import (
"slices"

"github.com/PeerDB-io/peer-flow/generated/protos"
"golang.org/x/exp/maps"
)

func TableSchemaColumns(schema *protos.TableSchema) int {
if schema.Columns != nil {
return len(schema.Columns)
} else {
return len(schema.ColumnNames)
}
return len(schema.ColumnNames)
}

func TableSchemaColumnNames(schema *protos.TableSchema) []string {
if schema.Columns != nil {
return maps.Keys(schema.Columns)
} else {
return slices.Clone(schema.ColumnNames)
}
return slices.Clone(schema.ColumnNames)
}

func IterColumns(schema *protos.TableSchema, iter func(k, v string)) {
if schema.Columns != nil {
for k, v := range schema.Columns {
iter(k, v)
}
} else {
for i, name := range schema.ColumnNames {
iter(name, schema.ColumnTypes[i])
}
for i, name := range schema.ColumnNames {
iter(name, schema.ColumnTypes[i])
}
}

func IterColumnsError(schema *protos.TableSchema, iter func(k, v string) error) error {
if schema.Columns != nil {
for k, v := range schema.Columns {
err := iter(k, v)
if err != nil {
return err
}
}
} else {
for i, name := range schema.ColumnNames {
err := iter(name, schema.ColumnTypes[i])
if err != nil {
return err
}
for i, name := range schema.ColumnNames {
err := iter(name, schema.ColumnTypes[i])
if err != nil {
return err
}
}
return nil
Expand Down
4 changes: 0 additions & 4 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,10 +512,6 @@ type SyncRecordsRequest struct {
TableMappings []*protos.TableMapping
// Staging path for AVRO files in CDC
StagingPath string
// PushBatchSize is the number of records to push in a batch for EventHub.
PushBatchSize int64
// PushParallelism is the number of batches in Event Hub to push in parallel.
PushParallelism int64
}

type NormalizeRecordsRequest struct {
Expand Down
53 changes: 32 additions & 21 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type CDCFlowLimits struct {
// This is typically non-zero for testing purposes.
TotalSyncFlows int
// Maximum number of rows in a sync flow batch.
MaxBatchSize int
MaxBatchSize uint32
// Rows synced after which we can say a test is done.
ExitAfterRecords int
}
Expand All @@ -49,15 +49,18 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
// current workflow state
CurrentFlowState protos.FlowStatus
// moved from config here, set by SetupFlow
SrcTableIdNameMapping map[uint32]string
TableNameSchemaMapping map[string]*protos.TableSchema
}

type SignalProps struct {
BatchSize int32
IdleTimeout int64
BatchSize uint32
IdleTimeout uint64
}

// returns a new empty PeerFlowState
func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
return &CDCFlowWorkflowState{
Progress: []string{"started"},
SyncFlowStatuses: nil,
Expand All @@ -72,7 +75,9 @@ func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
RelationName: "protobuf_workaround",
},
},
CurrentFlowState: protos.FlowStatus_STATUS_SETUP,
CurrentFlowState: protos.FlowStatus_STATUS_SETUP,
SrcTableIdNameMapping: make(map[uint32]string, numTables),
TableNameSchemaMapping: make(map[string]*protos.TableSchema, numTables),
}
}

Expand Down Expand Up @@ -151,13 +156,12 @@ func CDCFlowWorkflowWithConfig(
limits *CDCFlowLimits,
state *CDCFlowWorkflowState,
) (*CDCFlowWorkflowResult, error) {
if state == nil {
state = NewCDCFlowWorkflowState()
}

if cfg == nil {
return nil, fmt.Errorf("invalid connection configs")
}
if state == nil {
state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
}

w := NewCDCFlowWorkflowExecution(ctx)

Expand Down Expand Up @@ -221,9 +225,12 @@ func CDCFlowWorkflowWithConfig(
}
setupFlowCtx := workflow.WithChildOptions(ctx, childSetupFlowOpts)
setupFlowFuture := workflow.ExecuteChildWorkflow(setupFlowCtx, SetupFlowWorkflow, cfg)
if err := setupFlowFuture.Get(setupFlowCtx, &cfg); err != nil {
var setupFlowOutput *protos.SetupFlowOutput
if err := setupFlowFuture.Get(setupFlowCtx, &setupFlowOutput); err != nil {
return state, fmt.Errorf("failed to execute child workflow: %w", err)
}
state.SrcTableIdNameMapping = setupFlowOutput.SrcTableIdNameMapping
state.TableNameSchemaMapping = setupFlowOutput.TableNameSchemaMapping
state.CurrentFlowState = protos.FlowStatus_STATUS_SNAPSHOT

// next part of the setup is to snapshot-initial-copy and setup replication slots.
Expand Down Expand Up @@ -269,14 +276,14 @@ func CDCFlowWorkflowWithConfig(
CurrentName: oldName,
NewName: newName,
// oldName is what was used for the TableNameSchema mapping
TableSchema: cfg.TableNameSchemaMapping[oldName],
TableSchema: state.TableNameSchemaMapping[oldName],
})
mapping.DestinationTableIdentifier = newName
// TableNameSchemaMapping is referring to the _resync tables, not the actual names
correctedTableNameSchemaMapping[newName] = cfg.TableNameSchemaMapping[oldName]
correctedTableNameSchemaMapping[newName] = state.TableNameSchemaMapping[oldName]
}

cfg.TableNameSchemaMapping = correctedTableNameSchemaMapping
state.TableNameSchemaMapping = correctedTableNameSchemaMapping
renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 12 * time.Hour,
HeartbeatTimeout: 1 * time.Hour,
Expand All @@ -291,14 +298,19 @@ func CDCFlowWorkflowWithConfig(
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")

// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialCopyOnly {
return nil, nil
if cfg.InitialSnapshotOnly {
return state, nil
}
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: int32(limits.MaxBatchSize),
IdleTimeoutSeconds: 0,
BatchSize: limits.MaxBatchSize,
IdleTimeoutSeconds: 0,
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
TableNameSchemaMapping: state.TableNameSchemaMapping,
}
normalizeFlowOptions := &protos.NormalizeFlowOptions{
TableNameSchemaMapping: state.TableNameSchemaMapping,
}

// add a signal to change CDC properties
Expand All @@ -307,14 +319,12 @@ func CDCFlowWorkflowWithConfig(
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcSignal SignalProps
c.Receive(ctx, &cdcSignal)
// only modify for options since SyncFlow uses it
if cdcSignal.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcSignal.BatchSize
cfg.MaxBatchSize = uint32(cdcSignal.BatchSize)
limits.MaxBatchSize = int(cdcSignal.BatchSize)
}
if cdcSignal.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout
cfg.IdleTimeoutSeconds = cdcSignal.IdleTimeout
}

slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)),
Expand Down Expand Up @@ -451,7 +461,7 @@ func CDCFlowWorkflowWithConfig(
state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error())
} else {
for i := range modifiedSrcTables {
cfg.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
state.TableNameSchemaMapping[modifiedDstTables[i]] = getModifiedSchemaRes.TableNameSchemaMapping[modifiedSrcTables[i]]
}
}
}
Expand All @@ -475,6 +485,7 @@ func CDCFlowWorkflowWithConfig(
normCtx,
NormalizeFlowWorkflow,
cfg,
normalizeFlowOptions,
)

var childNormalizeFlowRes *model.NormalizeResponse
Expand Down
7 changes: 5 additions & 2 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState)

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
options *protos.NormalizeFlowOptions,
) (*model.NormalizeResponse, error) {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
return s.executeNormalizeFlow(ctx, config, options)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
options *protos.NormalizeFlowOptions,
) (*model.NormalizeResponse, error) {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)

Expand All @@ -52,7 +54,8 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
})

startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
FlowConnectionConfigs: config,
TableNameSchemaMapping: options.TableNameSchemaMapping,
}
fStartNormalize := workflow.ExecuteActivity(normalizeFlowCtx, flowable.StartNormalize, startNormalizeInput)

Expand Down
Loading

0 comments on commit 9c447cf

Please sign in to comment.