diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 5b3f26f974..26a6bbe061 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -192,21 +192,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
 	logger := activity.GetLogger(ctx)
 	activity.RecordHeartbeat(ctx, "starting flow...")
-	conn := input.FlowConnectionConfigs
-	dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
+	config := input.FlowConnectionConfigs
+	dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get destination connector: %w", err)
 	}
 	defer connectors.CloseConnector(dstConn)

 	logger.Info("pulling records...")
-	tblNameMapping := make(map[string]model.NameAndExclude)
-	for _, v := range input.FlowConnectionConfigs.TableMappings {
+	tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
+	for _, v := range input.SyncFlowOptions.TableMappings {
 		tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
 	}
 	errGroup, errCtx := errgroup.WithContext(ctx)

-	srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
+	srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get source connector: %w", err)
 	}
@@ -251,7 +251,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	})

 	hasRecords := !recordBatch.WaitAndCheckEmpty()
-	logger.Info("current sync flow has records?", hasRecords)
+	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

 	if !hasRecords {
 		// wait for the pull goroutine to finish
@@ -275,7 +275,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	}

 	syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
-	if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
+	if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
 		return nil, err
 	}
 	syncBatchID += 1
@@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		SyncBatchID:   syncBatchID,
 		Records:       recordBatch,
 		FlowJobName:   input.FlowConnectionConfigs.FlowJobName,
-		TableMappings: input.FlowConnectionConfigs.TableMappings,
+		TableMappings: input.SyncFlowOptions.TableMappings,
 		StagingPath:   input.FlowConnectionConfigs.CdcStagingPath,
 	})
 	if err != nil {
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index cbc9ed2fb3..b2cc162b46 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -188,7 +188,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 	}

 	return &protos.CreateCDCFlowResponse{
-		WorflowId: workflowID,
+		WorkflowId: workflowID,
 	}, nil
 }
@@ -290,7 +290,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 	}

 	return &protos.CreateQRepFlowResponse{
-		WorflowId: workflowID,
+		WorkflowId: workflowID,
 	}, nil
 }
diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index ceca719c69..d3ba9cc26a 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -94,6 +94,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
 	// patching config to show latest values from state
 	config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
 	config.MaxBatchSize = state.SyncFlowOptions.BatchSize
+	config.TableMappings = state.TableMappings

 	var initialCopyStatus *protos.SnapshotStatus
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 92071bd71f..36d04d1ef7 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -5,7 +5,6 @@ import (
 	"crypto/sha256"
 	"fmt"
 	"log/slog"
-	"regexp"
 	"time"

 	"github.com/jackc/pglogrepl"
@@ -45,8 +44,6 @@ type PostgresCDCSource struct {
 	// for storing chema delta audit logs to catalog
 	catalogPool *pgxpool.Pool
 	flowJobName string
-
-	walSegmentRemovedRegex *regexp.Regexp
 }

 type PostgresCDCConfig struct {
@@ -74,9 +71,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
 		return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
 	}

-	pattern := "requested WAL segment .* has already been removed.*"
-	regex := regexp.MustCompile(pattern)
-
 	flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
 	return &PostgresCDCSource{
 		ctx: cdcConfig.AppContext,
@@ -93,7 +87,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
 		logger:                 *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 		catalogPool:            cdcConfig.CatalogPool,
 		flowJobName:            cdcConfig.FlowJobName,
-		walSegmentRemovedRegex: regex,
 	}, nil
 }
@@ -239,12 +232,12 @@ func (p *PostgresCDCSource) consumeStream(
 	standbyMessageTimeout := req.IdleTimeout
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

-	addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
-		records.AddRecord(rec)
+	addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
 		err := cdcRecordsStorage.Set(key, rec)
 		if err != nil {
 			return err
 		}
+		records.AddRecord(rec)

 		if cdcRecordsStorage.Len() == 1 {
 			records.SignalAsNotEmpty()
@@ -254,14 +247,6 @@ func (p *PostgresCDCSource) consumeStream(
 		return nil
 	}

-	addRecord := func(rec model.Record) error {
-		key, err := p.recToTablePKey(req, rec)
-		if err != nil {
-			return err
-		}
-		return addRecordWithKey(*key, rec)
-	}
-
 	pkmRequiresResponse := false
 	waitingForCommit := false
@@ -329,7 +314,6 @@ func (p *PostgresCDCSource) consumeStream(
 		rawMsg, err := conn.ReceiveMessage(ctx)
 		cancel()
-		utils.RecordHeartbeat(p.ctx, "consumeStream ReceiveMessage")
 		ctxErr := p.ctx.Err()
 		if ctxErr != nil {
 			return fmt.Errorf("consumeStream preempted: %w", ctxErr)
 		}
@@ -396,7 +380,7 @@ func (p *PostgresCDCSource) consumeStream(
 				// will change in future
 				isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
 				if isFullReplica {
-					err = addRecord(rec)
+					err := addRecordWithKey(nil, rec)
 					if err != nil {
 						return err
 					}
@@ -411,14 +395,14 @@ func (p *PostgresCDCSource) consumeStream(
 						return err
 					}
 					if !ok {
-						err = addRecordWithKey(*tablePkeyVal, rec)
+						err = addRecordWithKey(tablePkeyVal, rec)
 					} else {
 						// iterate through unchanged toast cols and set them in new record
 						updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
 						for _, col := range updatedCols {
 							delete(r.UnchangedToastColumns, col)
 						}
-						err = addRecordWithKey(*tablePkeyVal, rec)
+						err = addRecordWithKey(tablePkeyVal, rec)
 					}
 					if err != nil {
 						return err
@@ -428,7 +412,7 @@ func (p *PostgresCDCSource) consumeStream(
 			case *model.InsertRecord:
 				isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
 				if isFullReplica {
-					err = addRecord(rec)
+					err := addRecordWithKey(nil, rec)
 					if err != nil {
 						return err
 					}
@@ -438,42 +422,53 @@ func (p *PostgresCDCSource) consumeStream(
 						return err
 					}

-					err = addRecordWithKey(*tablePkeyVal, rec)
+					err = addRecordWithKey(tablePkeyVal, rec)
 					if err != nil {
 						return err
 					}
 				}
 			case *model.DeleteRecord:
-				tablePkeyVal, err := p.recToTablePKey(req, rec)
-				if err != nil {
-					return err
-				}
+				isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+				if isFullReplica {
+					err := addRecordWithKey(nil, rec)
+					if err != nil {
+						return err
+					}
+				} else {
+					tablePkeyVal, err := p.recToTablePKey(req, rec)
+					if err != nil {
+						return err
+					}

-				latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
-				if err != nil {
-					return err
-				}
-				if ok {
-					deleteRecord := rec.(*model.DeleteRecord)
-					deleteRecord.Items = latestRecord.GetItems()
-					updateRecord, ok := latestRecord.(*model.UpdateRecord)
+					latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
+					if err != nil {
+						return err
+					}
 					if ok {
-						deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+						deleteRecord := rec.(*model.DeleteRecord)
+						deleteRecord.Items = latestRecord.GetItems()
+						updateRecord, ok := latestRecord.(*model.UpdateRecord)
+						if ok {
+							deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+						}
+					} else {
+						deleteRecord := rec.(*model.DeleteRecord)
+						// there is nothing to backfill the items in the delete record with,
+						// so don't update the row with this record
+						// add sentinel value to prevent update statements from selecting
+						deleteRecord.UnchangedToastColumns = map[string]struct{}{
+							"_peerdb_not_backfilled_delete": {},
+						}
 					}
-				} else {
-					deleteRecord := rec.(*model.DeleteRecord)
-					// there is nothing to backfill the items in the delete record with,
-					// so don't update the row with this record
-					// add sentinel value to prevent update statements from selecting
-					deleteRecord.UnchangedToastColumns = map[string]struct{}{
-						"_peerdb_not_backfilled_delete": {},
+
+					// A delete can only be followed by an INSERT, which does not need backfilling
+					// No need to store DeleteRecords in memory or disk.
+					err = addRecordWithKey(nil, rec)
+					if err != nil {
+						return err
 					}
 				}
-				err = addRecord(rec)
-				if err != nil {
-					return err
-				}
 			case *model.RelationRecord:
 				tableSchemaDelta := r.TableSchemaDelta
 				if len(tableSchemaDelta.AddedColumns) > 0 {
@@ -524,9 +519,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
 		return nil, nil
 	}

-	// TODO (kaushik): consider persistent state for a mirror job
-	// to be stored somewhere in temporal state. We might need to persist
-	// the state of the relation message somewhere
 	p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
 		msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
 	if p.relationMessageMapping[msg.RelationID] == nil {
@@ -556,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
@@ -591,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
@@ -634,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go
index a1bde8d614..c44892c2c0 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go
@@ -72,37 +72,39 @@ func (c *cdcRecordsStore) initPebbleDB() error {
 	return nil
 }

-func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
-	_, ok := c.inMemoryRecords[key]
-	if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
-		c.inMemoryRecords[key] = rec
-	} else {
-		if c.pebbleDB == nil {
-			slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
-				c.numRecordsSwitchThreshold),
-				slog.String(string(shared.FlowNameKey), c.flowJobName))
-			err := c.initPebbleDB()
+func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error {
+	if key != nil {
+		_, ok := c.inMemoryRecords[*key]
+		if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
+			c.inMemoryRecords[*key] = rec
+		} else {
+			if c.pebbleDB == nil {
+				slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
+					c.numRecordsSwitchThreshold),
+					slog.String(string(shared.FlowNameKey), c.flowJobName))
+				err := c.initPebbleDB()
+				if err != nil {
+					return err
+				}
+			}
+
+			encodedKey, err := encVal(key)
 			if err != nil {
 				return err
 			}
-		}
-
-		encodedKey, err := encVal(key)
-		if err != nil {
-			return err
-		}
-		// necessary to point pointer to interface so the interface is exposed
-		// instead of the underlying type
-		encodedRec, err := encVal(&rec)
-		if err != nil {
-			return err
-		}
-		// we're using Pebble as a cache, no need for durability here.
-		err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
-			Sync: false,
-		})
-		if err != nil {
-			return fmt.Errorf("unable to store value in Pebble: %w", err)
+			// necessary to point pointer to interface so the interface is exposed
+			// instead of the underlying type
+			encodedRec, err := encVal(&rec)
+			if err != nil {
+				return err
+			}
+			// we're using Pebble as a cache, no need for durability here.
+			err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
+				Sync: false,
+			})
+			if err != nil {
+				return fmt.Errorf("unable to store value in Pebble: %w", err)
+			}
 		}
 	}
 	c.numRecords++
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
index 9f0b7a9f6e..c585541054 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
@@ -81,7 +81,7 @@ func TestSingleRecord(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 10

 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	// should not spill into DB
 	require.Len(t, cdcRecordsStore.inMemoryRecords, 1)
@@ -103,7 +103,7 @@ func TestRecordsTillSpill(t *testing.T) {
 	// add records upto set limit
 	for i := 1; i <= 10; i++ {
 		key, rec := genKeyAndRec(t)
-		err := cdcRecordsStore.Set(key, rec)
+		err := cdcRecordsStore.Set(&key, rec)
 		require.NoError(t, err)
 		require.Len(t, cdcRecordsStore.inMemoryRecords, i)
 		require.Nil(t, cdcRecordsStore.pebbleDB)
@@ -111,7 +111,7 @@ func TestRecordsTillSpill(t *testing.T) {

 	// this record should be spilled to DB
 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	_, ok := cdcRecordsStore.inMemoryRecords[key]
 	require.False(t, ok)
@@ -132,7 +132,7 @@ func TestTimeAndRatEncoding(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 0

 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)

 	retreived, ok, err := cdcRecordsStore.Get(key)
@@ -145,3 +145,23 @@ func TestTimeAndRatEncoding(t *testing.T) {

 	require.NoError(t, cdcRecordsStore.Close())
 }
+
+func TestNullKeyDoesntStore(t *testing.T) {
+	t.Parallel()
+
+	cdcRecordsStore := NewCDCRecordsStore("test_time_encoding")
+	cdcRecordsStore.numRecordsSwitchThreshold = 0
+
+	key, rec := genKeyAndRec(t)
+	err := cdcRecordsStore.Set(nil, rec)
+	require.NoError(t, err)
+
+	retrieved, ok, err := cdcRecordsStore.Get(key)
+	require.Nil(t, retrieved)
+	require.NoError(t, err)
+	require.False(t, ok)
+
+	require.Equal(t, 1, cdcRecordsStore.Len())
+
+	require.NoError(t, cdcRecordsStore.Close())
+}
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index 65254de73a..e9c0a2b8fc 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -29,13 +29,14 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
 	return time.Duration(x) * time.Second
 }

-// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
+// env variable doesn't exist anymore, but tests appear to depend on this
+// in lieu of an actual value of IdleTimeoutSeconds
 func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
 	var x int
 	if providedValue > 0 {
 		x = providedValue
 	} else {
-		x = getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
+		x = getEnvInt("", 10)
 	}
 	return time.Duration(x) * time.Second
 }
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index ed44b24680..820c78a438 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -57,10 +57,16 @@ type CDCFlowWorkflowState struct {
 	SyncFlowOptions *protos.SyncFlowOptions
 	// options passed to all NormalizeFlows
 	NormalizeFlowOptions *protos.NormalizeFlowOptions
+	// initially copied from config, all changes are made here though
+	TableMappings []*protos.TableMapping
 }

 // returns a new empty PeerFlowState
-func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
+func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState {
+	tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
+	for _, tableMapping := range cfgTableMappings {
+		tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
+	}
 	return &CDCFlowWorkflowState{
 		Progress:         []string{"started"},
 		SyncFlowStatuses: nil,
@@ -81,6 +87,7 @@
 		FlowConfigUpdates:    nil,
 		SyncFlowOptions:      nil,
 		NormalizeFlowOptions: nil,
+		TableMappings:        tableMappings,
 	}
 }
@@ -151,7 +158,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		if len(flowConfigUpdate.AdditionalTables) == 0 {
 			continue
 		}
-		if shared.AdditionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
+		if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) {
 			return fmt.Errorf("duplicate source/destination tables found in additionalTables")
 		}

@@ -173,7 +180,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

 		childAdditionalTablesCDCFlowID,
-			err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
+			err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
 		if err != nil {
 			return err
 		}
@@ -207,7 +214,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		for tableName, tableSchema := range res.TableNameSchemaMapping {
 			state.TableNameSchemaMapping[tableName] = tableSchema
 		}
-		cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
+		state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
+		state.SyncFlowOptions.TableMappings = state.TableMappings
 		// finished processing, wipe it
 		state.FlowConfigUpdates = nil
 	}
@@ -224,7 +232,7 @@ func CDCFlowWorkflowWithConfig(
 		return nil, fmt.Errorf("invalid connection configs")
 	}
 	if state == nil {
-		state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
+		state = NewCDCFlowWorkflowState(cfg.TableMappings)
 	}

 	w := NewCDCFlowWorkflowExecution(ctx)
@@ -260,7 +268,7 @@ func CDCFlowWorkflowWithConfig(
 		// if resync is true, alter the table name schema mapping to temporarily add
 		// a suffix to the table names.
 		if cfg.Resync {
-			for _, mapping := range cfg.TableMappings {
+			for _, mapping := range state.TableMappings {
 				oldName := mapping.DestinationTableIdentifier
 				newName := fmt.Sprintf("%s_resync", oldName)
 				mapping.DestinationTableIdentifier = newName
@@ -328,7 +336,7 @@ func CDCFlowWorkflowWithConfig(
 			}
 			renameOpts.SyncedAtColName = &cfg.SyncedAtColName
 			correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
-			for _, mapping := range cfg.TableMappings {
+			for _, mapping := range state.TableMappings {
 				oldName := mapping.DestinationTableIdentifier
 				newName := strings.TrimSuffix(oldName, "_resync")
 				renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{
@@ -368,6 +376,7 @@ func CDCFlowWorkflowWithConfig(
 		IdleTimeoutSeconds:     cfg.IdleTimeoutSeconds,
 		SrcTableIdNameMapping:  state.SrcTableIdNameMapping,
 		TableNameSchemaMapping: state.TableNameSchemaMapping,
+		TableMappings:          state.TableMappings,
 	}
 	state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
 		TableNameSchemaMapping: state.TableNameSchemaMapping,
@@ -468,7 +477,7 @@ func CDCFlowWorkflowWithConfig(
 			state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

 			for state.ActiveSignal == shared.PauseSignal {
-				w.logger.Info("mirror has been paused for ", time.Since(startTime))
+				w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 				// only place we block on receive, so signal processing is immediate
 				mainLoopSelector.Select(ctx)
 				if state.ActiveSignal == shared.NoopSignal {
@@ -539,7 +548,8 @@ func CDCFlowWorkflowWithConfig(
 			state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
 			state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
 			totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
-			w.logger.Info("Total records synced: ", totalRecordsSynced)
+			w.logger.Info("Total records synced",
+				slog.Int64("totalRecordsSynced", totalRecordsSynced))
 		}

 		if childSyncFlowRes != nil {
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
index a598c51792..8b51faeac3 100644
--- a/flow/workflows/normalize_flow.go
+++ b/flow/workflows/normalize_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"go.temporal.io/sdk/workflow"
@@ -65,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
 		if lastSyncBatchID != syncBatchID {
 			lastSyncBatchID = syncBatchID

-			logger.Info("executing normalize - ", config.FlowJobName)
+			logger.Info("executing normalize", slog.String("flowName", config.FlowJobName))
 			startNormalizeInput := &protos.StartNormalizeInput{
 				FlowConnectionConfigs:  config,
 				TableNameSchemaMapping: tableNameSchemaMapping,
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 15ef7e50f9..024cdc783a 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -3,6 +3,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"strings"
 	"time"

@@ -471,7 +472,7 @@ func QRepFlowWorkflow(
 		return err
 	}

-	logger.Info("consolidating partitions for peer flow - ", config.FlowJobName)
+	logger.Info("consolidating partitions for peer flow", slog.String("flowName", config.FlowJobName))
 	if err = q.consolidatePartitions(ctx); err != nil {
 		return err
 	}
@@ -515,7 +516,7 @@ func QRepFlowWorkflow(
 	var signalVal shared.CDCFlowSignal

 	for q.activeSignal == shared.PauseSignal {
-		q.logger.Info("mirror has been paused for ", time.Since(startTime))
+		q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 		// only place we block on receive, so signal processing is immediate
 		ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
 		if ok {
diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go
index 19581061db..118cb6e221 100644
--- a/flow/workflows/setup_flow.go
+++ b/flow/workflows/setup_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"slices"
 	"sort"
 	"time"
@@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow(
 	ctx workflow.Context,
 	config *protos.FlowConnectionConfigs,
 ) (*protos.SetupFlowOutput, error) {
-	s.logger.Info("executing setup flow - ", s.cdcFlowName)
+	s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName))

 	// first check the connectionsAndSetupMetadataTables
 	if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil {
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index 4c930ebf66..4f52a0c68e 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"go.temporal.io/sdk/log"
@@ -38,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
 	opts *protos.SyncFlowOptions,
 	relationMessageMapping model.RelationMessageMapping,
 ) (*model.SyncResponse, error) {
-	s.logger.Info("executing sync flow - ", s.CDCFlowName)
+	s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName))

 	syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
 		StartToCloseTimeout: 1 * time.Minute,
diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go
index 8c19382c24..64a71e8104 100644
--- a/flow/workflows/xmin_flow.go
+++ b/flow/workflows/xmin_flow.go
@@ -3,6 +3,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"github.com/google/uuid"
@@ -100,7 +101,7 @@ func XminFlowWorkflow(
 	var signalVal shared.CDCFlowSignal

 	for q.activeSignal == shared.PauseSignal {
-		q.logger.Info("mirror has been paused for ", time.Since(startTime))
+		q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 		// only place we block on receive, so signal processing is immediate
 		ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
 		if ok {
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index 9b5e2f4d97..a962c4ec4f 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -51,7 +51,7 @@ impl FlowGrpcClient {
             create_catalog_entry: false,
         };
        let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
-        let workflow_id = response.into_inner().worflow_id;
+        let workflow_id = response.into_inner().workflow_id;
         Ok(workflow_id)
     }

@@ -82,7 +82,7 @@ impl FlowGrpcClient {
             create_catalog_entry: false,
         };
         let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
-        let workflow_id = response.into_inner().worflow_id;
+        let workflow_id = response.into_inner().workflow_id;
         Ok(workflow_id)
     }
diff --git a/protos/flow.proto b/protos/flow.proto
index 57a226ccdf..fc407a7b9f 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -42,7 +42,7 @@ message FlowConnectionConfigs {
   peerdb_peers.Peer destination = 3;

   // config for the CDC flow itself
-  // currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
+  // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
   repeated TableMapping table_mappings = 4;
   uint32 max_batch_size = 5;
   uint64 idle_timeout_seconds = 6;
@@ -103,6 +103,7 @@ message SyncFlowOptions {
   uint64 idle_timeout_seconds = 3;
   map<uint32, string> src_table_id_name_mapping = 4;
   map<string, TableSchema> table_name_schema_mapping = 5;
+  repeated TableMapping table_mappings = 6;
 }

 message NormalizeFlowOptions {
@@ -363,22 +364,20 @@ message GetOpenConnectionsForUserResult {
 // STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED
 // STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED
 // UI can read everything except STATUS_UNKNOWN
+// terminate button should always be enabled
 enum FlowStatus {
   // should never be read by UI, bail
   STATUS_UNKNOWN = 0;
   // enable pause and terminate buttons
   STATUS_RUNNING = 1;
-  // pause button becomes resume button, terminate button should still be enabled
+  // pause button becomes resume button
   STATUS_PAUSED = 2;
-  // neither button should be enabled
   STATUS_PAUSING = 3;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SETUP = 4;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SNAPSHOT = 5;
-  // neither button should be enabled
   STATUS_TERMINATING = 6;
-  // neither button should be enabled
   STATUS_TERMINATED = 7;
 }
diff --git a/protos/route.proto b/protos/route.proto
index f0d7dd0511..316459d78f 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -14,7 +14,7 @@ message CreateCDCFlowRequest {
 }

 message CreateCDCFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message CreateQRepFlowRequest {
@@ -23,7 +23,7 @@ message CreateQRepFlowRequest {
 }

 message CreateQRepFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message ShutdownRequest {
diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts
index 0e48fec524..7e66cda826 100644
--- a/ui/app/api/mirrors/cdc/route.ts
+++ b/ui/app/api/mirrors/cdc/route.ts
@@ -1,5 +1,8 @@
 import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
-import { CreateCDCFlowRequest } from '@/grpc_generated/route';
+import {
+  CreateCDCFlowRequest,
+  CreateCDCFlowResponse,
+} from '@/grpc_generated/route';
 import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

 export async function POST(request: Request) {
@@ -12,14 +15,17 @@ export async function POST(request: Request) {
     createCatalogEntry: true,
   };
   try {
-    const createStatus = await fetch(`${flowServiceAddr}/v1/flows/cdc/create`, {
-      method: 'POST',
-      body: JSON.stringify(req),
-    }).then((res) => {
+    const createStatus: CreateCDCFlowResponse = await fetch(
+      `${flowServiceAddr}/v1/flows/cdc/create`,
+      {
+        method: 'POST',
+        body: JSON.stringify(req),
+      }
+    ).then((res) => {
       return res.json();
     });

-    if (!createStatus.worflowId) {
+    if (!createStatus.workflowId) {
       return new Response(JSON.stringify(createStatus));
     }
     let response: UCreateMirrorResponse = {
diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts
index 6f199dc53f..63b8519fca 100644
--- a/ui/app/api/mirrors/qrep/route.ts
+++ b/ui/app/api/mirrors/qrep/route.ts
@@ -25,7 +25,7 @@ export async function POST(request: Request) {
       return res.json();
     });
     let response: UCreateMirrorResponse = {
-      created: !!createStatus.worflowId,
+      created: !!createStatus.workflowId,
     };

     return new Response(JSON.stringify(response));
diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx
index dd55b5a9bf..3ca1737251 100644
--- a/ui/app/mirrors/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/[mirrorId]/cdc.tsx
@@ -248,7 +248,7 @@ export function CDCMirror({
     setSelectedTab(index);
   };

-  let snapshot = <></>;
+  let snapshot = null;
   if (status.cdcStatus?.snapshotStatus) {
     snapshot = (
diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx
index b86e312255..46439e2aa7 100644
--- a/ui/app/mirrors/[mirrorId]/edit/page.tsx
+++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx
@@ -173,7 +173,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
         }}
         variant='normalSolid'
         disabled={
-          config.additionalTables.length > 0 &&
+          additionalTables.length > 0 &&
           mirrorState.currentFlowState.toString() !==
             FlowStatus[FlowStatus.STATUS_PAUSED]
         }
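
A minimal sketch of the nil-key Set contract this patch introduces (illustrative only, not part of the patch; it reuses the genKeyAndRec helper and testify's require from cdc_records_storage_test.go, and the store name is made up):

func TestNilKeySketch(t *testing.T) {
	store := NewCDCRecordsStore("sketch_flow")

	// Keyed set: the record is cached (in memory, or in Pebble after the
	// spill threshold) so later UPDATE/DELETE messages can backfill
	// unchanged TOAST columns from it.
	key, rec := genKeyAndRec(t)
	require.NoError(t, store.Set(&key, rec))

	// Nil-keyed set: rows from REPLICA IDENTITY FULL tables and
	// DeleteRecords never need backfilling, so nothing is cached,
	// but the record still counts toward Len().
	require.NoError(t, store.Set(nil, rec))
	require.Equal(t, 2, store.Len())

	require.NoError(t, store.Close())
}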