From 169112ac5eb964f1bbcf1761260a09716847f175 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Sun, 4 Feb 2024 21:59:35 +0530
Subject: [PATCH] fixes for UI and general (#1197)

1. `TableMappings` is now cloned and stored in workflow state; modifications
   affect only the state's copy. It is passed through to `SyncFlow`.
2. Fixed some logs.
3. Removed the env var for idle timeout seconds, but retained the function
   for tests.
4. Renamed a misspelled proto field.
5. Fixed a UI bug: it wasn't looking at the correct version of
   `additionalTables`.
6. Optimization: don't store records from CDC in some cases.
7. Don't heartbeat after every record.
---
 flow/activities/flowable.go                   | 16 +--
 flow/cmd/handler.go                           |  4 +-
 flow/cmd/mirror_status.go                     |  1 +
 flow/connectors/postgres/cdc.go               | 98 +++++++++----
 .../utils/cdc_records/cdc_records_storage.go  | 58 +++++------
 .../cdc_records/cdc_records_storage_test.go   | 28 +++++-
 flow/peerdbenv/config.go                      |  5 +-
 flow/workflows/cdc_flow.go                    | 28 ++++--
 flow/workflows/normalize_flow.go              |  3 +-
 flow/workflows/qrep_flow.go                   |  5 +-
 flow/workflows/setup_flow.go                  |  3 +-
 flow/workflows/sync_flow.go                   |  3 +-
 flow/workflows/xmin_flow.go                   |  3 +-
 nexus/flow-rs/src/grpc.rs                     |  4 +-
 protos/flow.proto                             | 13 ++-
 protos/route.proto                            |  4 +-
 ui/app/api/mirrors/cdc/route.ts               | 18 ++--
 ui/app/api/mirrors/qrep/route.ts              |  2 +-
 ui/app/mirrors/[mirrorId]/cdc.tsx             |  2 +-
 ui/app/mirrors/[mirrorId]/edit/page.tsx       |  2 +-
 20 files changed, 168 insertions(+), 132 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 5b3f26f974..26a6bbe061 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -192,21 +192,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
 	logger := activity.GetLogger(ctx)
 	activity.RecordHeartbeat(ctx, "starting flow...")
-	conn := input.FlowConnectionConfigs
-	dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
+	config := input.FlowConnectionConfigs
+	dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get destination connector: %w", err)
 	}
 	defer connectors.CloseConnector(dstConn)

 	logger.Info("pulling records...")
-	tblNameMapping := make(map[string]model.NameAndExclude)
-	for _, v := range input.FlowConnectionConfigs.TableMappings {
+	tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
+	for _, v := range input.SyncFlowOptions.TableMappings {
 		tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
 	}

 	errGroup, errCtx := errgroup.WithContext(ctx)
-	srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
+	srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
 	if err != nil {
 		return nil, fmt.Errorf("failed to get source connector: %w", err)
 	}
@@ -251,7 +251,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	})

 	hasRecords := !recordBatch.WaitAndCheckEmpty()
-	logger.Info("current sync flow has records?", hasRecords)
+	logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

 	if !hasRecords {
 		// wait for the pull goroutine to finish
@@ -275,7 +275,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 	}

 	syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
-	if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
+	if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
 		return nil, err
 	}
 	syncBatchID += 1
@@ -297,7 +297,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
 		SyncBatchID:   syncBatchID,
 		Records:       recordBatch,
 		FlowJobName:   input.FlowConnectionConfigs.FlowJobName,
-		TableMappings: input.FlowConnectionConfigs.TableMappings,
+		TableMappings: input.SyncFlowOptions.TableMappings,
 		StagingPath:   input.FlowConnectionConfigs.CdcStagingPath,
 	})
 	if err != nil {
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index cbc9ed2fb3..b2cc162b46 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -188,7 +188,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
 	}

 	return &protos.CreateCDCFlowResponse{
-		WorflowId: workflowID,
+		WorkflowId: workflowID,
 	}, nil
 }

@@ -290,7 +290,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
 	}

 	return &protos.CreateQRepFlowResponse{
-		WorflowId: workflowID,
+		WorkflowId: workflowID,
 	}, nil
 }

diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index ceca719c69..d3ba9cc26a 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -94,6 +94,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
 	// patching config to show latest values from state
 	config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
 	config.MaxBatchSize = state.SyncFlowOptions.BatchSize
+	config.TableMappings = state.TableMappings

 	var initialCopyStatus *protos.SnapshotStatus

diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 92071bd71f..36d04d1ef7 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -5,7 +5,6 @@ import (
 	"crypto/sha256"
 	"fmt"
 	"log/slog"
-	"regexp"
 	"time"

 	"github.com/jackc/pglogrepl"
@@ -45,8 +44,6 @@ type PostgresCDCSource struct {
 	// for storing chema delta audit logs to catalog
 	catalogPool *pgxpool.Pool
 	flowJobName string
-
-	walSegmentRemovedRegex *regexp.Regexp
 }

 type PostgresCDCConfig struct {
@@ -74,9 +71,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
 		return nil, fmt.Errorf("error getting child to parent relid map: %w", err)
 	}

-	pattern := "requested WAL segment .* has already been removed.*"
-	regex := regexp.MustCompile(pattern)
-
 	flowName, _ := cdcConfig.AppContext.Value(shared.FlowNameKey).(string)
 	return &PostgresCDCSource{
 		ctx:                    cdcConfig.AppContext,
@@ -93,7 +87,6 @@ func NewPostgresCDCSource(cdcConfig *PostgresCDCConfig, customTypeMap map[uint32
 		logger:                 *slog.With(slog.String(string(shared.FlowNameKey), flowName)),
 		catalogPool:            cdcConfig.CatalogPool,
 		flowJobName:            cdcConfig.FlowJobName,
-		walSegmentRemovedRegex: regex,
 	}, nil
 }

@@ -239,12 +232,12 @@ func (p *PostgresCDCSource) consumeStream(
 	standbyMessageTimeout := req.IdleTimeout
 	nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

-	addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
-		records.AddRecord(rec)
+	addRecordWithKey := func(key *model.TableWithPkey, rec model.Record) error {
 		err := cdcRecordsStorage.Set(key, rec)
 		if err != nil {
 			return err
 		}
+		records.AddRecord(rec)

 		if cdcRecordsStorage.Len() == 1 {
 			records.SignalAsNotEmpty()
@@ -254,14 +247,6 @@ func (p *PostgresCDCSource) consumeStream(
 		return nil
 	}

-	addRecord := func(rec model.Record) error {
-		key, err := p.recToTablePKey(req, rec)
-		if err != nil {
-			return err
-		}
-		return addRecordWithKey(*key, rec)
-	}
-
 	pkmRequiresResponse := false
 	waitingForCommit := false

@@ -329,7 +314,6 @@ func (p *PostgresCDCSource) consumeStream(
 		rawMsg, err := conn.ReceiveMessage(ctx)
 		cancel()

-		utils.RecordHeartbeat(p.ctx, "consumeStream ReceiveMessage")
 		ctxErr := p.ctx.Err()
 		if ctxErr != nil {
 			return fmt.Errorf("consumeStream preempted: %w", ctxErr)
@@ -396,7 +380,7 @@ func (p *PostgresCDCSource) consumeStream(
 					// will change in future
 					isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
 					if isFullReplica {
-						err = addRecord(rec)
+						err := addRecordWithKey(nil, rec)
 						if err != nil {
 							return err
 						}
@@ -411,14 +395,14 @@ func (p *PostgresCDCSource) consumeStream(
 							return err
 						}
 						if !ok {
-							err = addRecordWithKey(*tablePkeyVal, rec)
+							err = addRecordWithKey(tablePkeyVal, rec)
 						} else {
 							// iterate through unchanged toast cols and set them in new record
 							updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
 							for _, col := range updatedCols {
 								delete(r.UnchangedToastColumns, col)
 							}
-							err = addRecordWithKey(*tablePkeyVal, rec)
+							err = addRecordWithKey(tablePkeyVal, rec)
 						}
 						if err != nil {
 							return err
@@ -428,7 +412,7 @@ func (p *PostgresCDCSource) consumeStream(
 				case *model.InsertRecord:
 					isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
 					if isFullReplica {
-						err = addRecord(rec)
+						err := addRecordWithKey(nil, rec)
 						if err != nil {
 							return err
 						}
@@ -438,42 +422,53 @@ func (p *PostgresCDCSource) consumeStream(
 							return err
 						}

-						err = addRecordWithKey(*tablePkeyVal, rec)
+						err = addRecordWithKey(tablePkeyVal, rec)
 						if err != nil {
 							return err
 						}
 					}
 				case *model.DeleteRecord:
-					tablePkeyVal, err := p.recToTablePKey(req, rec)
-					if err != nil {
-						return err
-					}
+					isFullReplica := req.TableNameSchemaMapping[tableName].IsReplicaIdentityFull
+					if isFullReplica {
+						err := addRecordWithKey(nil, rec)
+						if err != nil {
+							return err
+						}
+					} else {
+						tablePkeyVal, err := p.recToTablePKey(req, rec)
+						if err != nil {
+							return err
+						}

-					latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
-					if err != nil {
-						return err
-					}
-					if ok {
-						deleteRecord := rec.(*model.DeleteRecord)
-						deleteRecord.Items = latestRecord.GetItems()
-						updateRecord, ok := latestRecord.(*model.UpdateRecord)
+						latestRecord, ok, err := cdcRecordsStorage.Get(*tablePkeyVal)
+						if err != nil {
+							return err
+						}
 						if ok {
-							deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+							deleteRecord := rec.(*model.DeleteRecord)
+							deleteRecord.Items = latestRecord.GetItems()
+							updateRecord, ok := latestRecord.(*model.UpdateRecord)
+							if ok {
+								deleteRecord.UnchangedToastColumns = updateRecord.UnchangedToastColumns
+							}
+						} else {
+							deleteRecord := rec.(*model.DeleteRecord)
+							// there is nothing to backfill the items in the delete record with,
+							// so don't update the row with this record
+							// add sentinel value to prevent update statements from selecting
+							deleteRecord.UnchangedToastColumns = map[string]struct{}{
+								"_peerdb_not_backfilled_delete": {},
+							}
 						}
-					} else {
-						deleteRecord := rec.(*model.DeleteRecord)
-						// there is nothing to backfill the items in the delete record with,
-						// so don't update the row with this record
-						// add sentinel value to prevent update statements from selecting
-						deleteRecord.UnchangedToastColumns = map[string]struct{}{
-							"_peerdb_not_backfilled_delete": {},
+
+						// A delete can only be followed by an INSERT, which does not need backfilling
+						// No need to store DeleteRecords in memory or disk.
+						err = addRecordWithKey(nil, rec)
+						if err != nil {
+							return err
+						}
 					}
-					err = addRecord(rec)
-					if err != nil {
-						return err
-					}
 				case *model.RelationRecord:
 					tableSchemaDelta := r.TableSchemaDelta
 					if len(tableSchemaDelta.AddedColumns) > 0 {
@@ -524,9 +519,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
 		return nil, nil
 	}

-	// TODO (kaushik): consider persistent state for a mirror job
-	// to be stored somewhere in temporal state. We might need to persist
-	// the state of the relation message somewhere
 	p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
 		msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))
 	if p.relationMessageMapping[msg.RelationID] == nil {
@@ -556,7 +548,7 @@ func (p *PostgresCDCSource) processInsertMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("InsertMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
@@ -591,7 +583,7 @@ func (p *PostgresCDCSource) processUpdateMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("UpdateMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
@@ -634,7 +626,7 @@ func (p *PostgresCDCSource) processDeleteMessage(
 	}

 	// log lsn and relation id for debugging
-	p.logger.Debug(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
+	p.logger.Info(fmt.Sprintf("DeleteMessage => LSN: %d, RelationID: %d, Relation Name: %s",
 		lsn, relID, tableName))

 	rel, ok := p.relationMessageMapping[relID]
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage.go b/flow/connectors/utils/cdc_records/cdc_records_storage.go
index a1bde8d614..c44892c2c0 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage.go
@@ -72,37 +72,39 @@ func (c *cdcRecordsStore) initPebbleDB() error {
 	return nil
 }

-func (c *cdcRecordsStore) Set(key model.TableWithPkey, rec model.Record) error {
-	_, ok := c.inMemoryRecords[key]
-	if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
-		c.inMemoryRecords[key] = rec
-	} else {
-		if c.pebbleDB == nil {
-			slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
-				c.numRecordsSwitchThreshold),
-				slog.String(string(shared.FlowNameKey), c.flowJobName))
-			err := c.initPebbleDB()
+func (c *cdcRecordsStore) Set(key *model.TableWithPkey, rec model.Record) error {
+	if key != nil {
+		_, ok := c.inMemoryRecords[*key]
+		if ok || len(c.inMemoryRecords) < c.numRecordsSwitchThreshold {
+			c.inMemoryRecords[*key] = rec
+		} else {
+			if c.pebbleDB == nil {
+				slog.Info(fmt.Sprintf("more than %d primary keys read, spilling to disk",
+					c.numRecordsSwitchThreshold),
+					slog.String(string(shared.FlowNameKey), c.flowJobName))
+				err := c.initPebbleDB()
+				if err != nil {
+					return err
+				}
+			}
+
+			encodedKey, err := encVal(key)
 			if err != nil {
 				return err
 			}
-		}
-
-		encodedKey, err := encVal(key)
-		if err != nil {
-			return err
-		}
-		// necessary to point pointer to interface so the interface is exposed
-		// instead of the underlying type
-		encodedRec, err := encVal(&rec)
-		if err != nil {
-			return err
-		}
-		// we're using Pebble as a cache, no need for durability here.
-		err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
-			Sync: false,
-		})
-		if err != nil {
-			return fmt.Errorf("unable to store value in Pebble: %w", err)
+			// necessary to point pointer to interface so the interface is exposed
+			// instead of the underlying type
+			encodedRec, err := encVal(&rec)
+			if err != nil {
+				return err
+			}
+			// we're using Pebble as a cache, no need for durability here.
+			err = c.pebbleDB.Set(encodedKey, encodedRec, &pebble.WriteOptions{
+				Sync: false,
+			})
+			if err != nil {
+				return fmt.Errorf("unable to store value in Pebble: %w", err)
+			}
 		}
 	}
 	c.numRecords++
diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
index 9f0b7a9f6e..c585541054 100644
--- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
+++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go
@@ -81,7 +81,7 @@ func TestSingleRecord(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 10

 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	// should not spill into DB
 	require.Len(t, cdcRecordsStore.inMemoryRecords, 1)
@@ -103,7 +103,7 @@ func TestRecordsTillSpill(t *testing.T) {
 	// add records upto set limit
 	for i := 1; i <= 10; i++ {
 		key, rec := genKeyAndRec(t)
-		err := cdcRecordsStore.Set(key, rec)
+		err := cdcRecordsStore.Set(&key, rec)
 		require.NoError(t, err)
 		require.Len(t, cdcRecordsStore.inMemoryRecords, i)
 		require.Nil(t, cdcRecordsStore.pebbleDB)
@@ -111,7 +111,7 @@ func TestRecordsTillSpill(t *testing.T) {

 	// this record should be spilled to DB
 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)
 	_, ok := cdcRecordsStore.inMemoryRecords[key]
 	require.False(t, ok)
@@ -132,7 +132,7 @@ func TestTimeAndRatEncoding(t *testing.T) {
 	cdcRecordsStore.numRecordsSwitchThreshold = 0

 	key, rec := genKeyAndRec(t)
-	err := cdcRecordsStore.Set(key, rec)
+	err := cdcRecordsStore.Set(&key, rec)
 	require.NoError(t, err)

 	retreived, ok, err := cdcRecordsStore.Get(key)
@@ -145,3 +145,23 @@ func TestTimeAndRatEncoding(t *testing.T) {

 	require.NoError(t, cdcRecordsStore.Close())
 }
+
+func TestNullKeyDoesntStore(t *testing.T) {
+	t.Parallel()
+
+	cdcRecordsStore := NewCDCRecordsStore("test_time_encoding")
+	cdcRecordsStore.numRecordsSwitchThreshold = 0
+
+	key, rec := genKeyAndRec(t)
+	err := cdcRecordsStore.Set(nil, rec)
+	require.NoError(t, err)
+
+	retrieved, ok, err := cdcRecordsStore.Get(key)
+	require.Nil(t, retrieved)
+	require.NoError(t, err)
+	require.False(t, ok)
+
+	require.Equal(t, 1, cdcRecordsStore.Len())
+
+	require.NoError(t, cdcRecordsStore.Close())
+}
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index 65254de73a..e9c0a2b8fc 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -29,13 +29,14 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
 	return time.Duration(x) * time.Second
 }

-// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
+// env variable doesn't exist anymore, but tests appear to depend on this
+// in lieu of an actual value of IdleTimeoutSeconds
 func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
 	var x int
 	if providedValue > 0 {
 		x = providedValue
 	} else {
-		x = getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
+		x = getEnvInt("", 10)
 	}
 	return time.Duration(x) * time.Second
 }
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index ed44b24680..820c78a438 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -57,10 +57,16 @@ type CDCFlowWorkflowState struct {
 	SyncFlowOptions *protos.SyncFlowOptions
 	// options passed to all NormalizeFlows
 	NormalizeFlowOptions *protos.NormalizeFlowOptions
+	// initially copied from config, all changes are made here though
+	TableMappings []*protos.TableMapping
 }

 // returns a new empty PeerFlowState
-func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
+func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState {
+	tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
+	for _, tableMapping := range cfgTableMappings {
+		tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
+	}
 	return &CDCFlowWorkflowState{
 		Progress:         []string{"started"},
 		SyncFlowStatuses: nil,
@@ -81,6 +87,7 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
 		FlowConfigUpdates:    nil,
 		SyncFlowOptions:      nil,
 		NormalizeFlowOptions: nil,
+		TableMappings:        tableMappings,
 	}
 }

@@ -151,7 +158,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		if len(flowConfigUpdate.AdditionalTables) == 0 {
 			continue
 		}
-		if shared.AdditionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
+		if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) {
 			return fmt.Errorf("duplicate source/destination tables found in additionalTables")
 		}

@@ -173,7 +180,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

 		childAdditionalTablesCDCFlowID,
-			err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
+			err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
 		if err != nil {
 			return err
 		}
@@ -207,7 +214,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
 		for tableName, tableSchema := range res.TableNameSchemaMapping {
 			state.TableNameSchemaMapping[tableName] = tableSchema
 		}
-		cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
+		state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
+		state.SyncFlowOptions.TableMappings = state.TableMappings
 		// finished processing, wipe it
 		state.FlowConfigUpdates = nil
 	}
@@ -224,7 +232,7 @@ func CDCFlowWorkflowWithConfig(
 		return nil, fmt.Errorf("invalid connection configs")
 	}
 	if state == nil {
-		state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
+		state = NewCDCFlowWorkflowState(cfg.TableMappings)
 	}

 	w := NewCDCFlowWorkflowExecution(ctx)
@@ -260,7 +268,7 @@ func CDCFlowWorkflowWithConfig(
 		// if resync is true, alter the table name schema mapping to temporarily add
 		// a suffix to the table names.
 		if cfg.Resync {
-			for _, mapping := range cfg.TableMappings {
+			for _, mapping := range state.TableMappings {
 				oldName := mapping.DestinationTableIdentifier
 				newName := fmt.Sprintf("%s_resync", oldName)
 				mapping.DestinationTableIdentifier = newName
@@ -328,7 +336,7 @@ func CDCFlowWorkflowWithConfig(
 			}
 			renameOpts.SyncedAtColName = &cfg.SyncedAtColName
 			correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
-			for _, mapping := range cfg.TableMappings {
+			for _, mapping := range state.TableMappings {
 				oldName := mapping.DestinationTableIdentifier
 				newName := strings.TrimSuffix(oldName, "_resync")
 				renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{
@@ -368,6 +376,7 @@ func CDCFlowWorkflowWithConfig(
 		IdleTimeoutSeconds:     cfg.IdleTimeoutSeconds,
 		SrcTableIdNameMapping:  state.SrcTableIdNameMapping,
 		TableNameSchemaMapping: state.TableNameSchemaMapping,
+		TableMappings:          state.TableMappings,
 	}
 	state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
 		TableNameSchemaMapping: state.TableNameSchemaMapping,
@@ -468,7 +477,7 @@ func CDCFlowWorkflowWithConfig(
 			state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

 			for state.ActiveSignal == shared.PauseSignal {
-				w.logger.Info("mirror has been paused for ", time.Since(startTime))
+				w.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 				// only place we block on receive, so signal processing is immediate
 				mainLoopSelector.Select(ctx)
 				if state.ActiveSignal == shared.NoopSignal {
@@ -539,7 +548,8 @@ func CDCFlowWorkflowWithConfig(
 			state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
 			state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
 			totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
-			w.logger.Info("Total records synced: ", totalRecordsSynced)
+			w.logger.Info("Total records synced: ",
+				slog.Int64("totalRecordsSynced", totalRecordsSynced))
 		}

 		if childSyncFlowRes != nil {
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
index a598c51792..8b51faeac3 100644
--- a/flow/workflows/normalize_flow.go
+++ b/flow/workflows/normalize_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"go.temporal.io/sdk/workflow"
@@ -65,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,

 		if lastSyncBatchID != syncBatchID {
 			lastSyncBatchID = syncBatchID
-			logger.Info("executing normalize - ", config.FlowJobName)
+			logger.Info("executing normalize", slog.String("flowName", config.FlowJobName))

 			startNormalizeInput := &protos.StartNormalizeInput{
 				FlowConnectionConfigs:  config,
 				TableNameSchemaMapping: tableNameSchemaMapping,
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 15ef7e50f9..024cdc783a 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -3,6 +3,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"strings"
 	"time"

@@ -471,7 +472,7 @@ func QRepFlowWorkflow(
 		return err
 	}

-	logger.Info("consolidating partitions for peer flow - ", config.FlowJobName)
+	logger.Info("consolidating partitions for peer flow - ", slog.String("flowName", config.FlowJobName))
 	if err = q.consolidatePartitions(ctx); err != nil {
 		return err
 	}
@@ -515,7 +516,7 @@ func QRepFlowWorkflow(

 	var signalVal shared.CDCFlowSignal
 	for q.activeSignal == shared.PauseSignal {
-		q.logger.Info("mirror has been paused for ", time.Since(startTime))
+		q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 		// only place we block on receive, so signal processing is immediate
 		ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
 		if ok {
diff --git a/flow/workflows/setup_flow.go b/flow/workflows/setup_flow.go
index 19581061db..118cb6e221 100644
--- a/flow/workflows/setup_flow.go
+++ b/flow/workflows/setup_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"slices"
 	"sort"
 	"time"
@@ -253,7 +254,7 @@ func (s *SetupFlowExecution) executeSetupFlow(
 	ctx workflow.Context,
 	config *protos.FlowConnectionConfigs,
 ) (*protos.SetupFlowOutput, error) {
-	s.logger.Info("executing setup flow - ", s.cdcFlowName)
+	s.logger.Info("executing setup flow", slog.String("flowName", s.cdcFlowName))

 	// first check the connectionsAndSetupMetadataTables
 	if err := s.checkConnectionsAndSetupMetadataTables(ctx, config); err != nil {
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index 4c930ebf66..4f52a0c68e 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"go.temporal.io/sdk/log"
@@ -38,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow(
 	opts *protos.SyncFlowOptions,
 	relationMessageMapping model.RelationMessageMapping,
 ) (*model.SyncResponse, error) {
-	s.logger.Info("executing sync flow - ", s.CDCFlowName)
+	s.logger.Info("executing sync flow", slog.String("flowName", s.CDCFlowName))

 	syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
 		StartToCloseTimeout: 1 * time.Minute,
diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go
index 8c19382c24..64a71e8104 100644
--- a/flow/workflows/xmin_flow.go
+++ b/flow/workflows/xmin_flow.go
@@ -3,6 +3,7 @@ package peerflow

 import (
 	"fmt"
+	"log/slog"
 	"time"

 	"github.com/google/uuid"
@@ -100,7 +101,7 @@ func XminFlowWorkflow(

 	var signalVal shared.CDCFlowSignal
 	for q.activeSignal == shared.PauseSignal {
-		q.logger.Info("mirror has been paused for ", time.Since(startTime))
+		q.logger.Info("mirror has been paused", slog.Any("duration", time.Since(startTime)))
 		// only place we block on receive, so signal processing is immediate
 		ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal)
 		if ok {
diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs
index 9b5e2f4d97..a962c4ec4f 100644
--- a/nexus/flow-rs/src/grpc.rs
+++ b/nexus/flow-rs/src/grpc.rs
@@ -51,7 +51,7 @@ impl FlowGrpcClient {
             create_catalog_entry: false,
         };
         let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?;
-        let workflow_id = response.into_inner().worflow_id;
+        let workflow_id = response.into_inner().workflow_id;
         Ok(workflow_id)
     }

@@ -82,7 +82,7 @@ impl FlowGrpcClient {
             create_catalog_entry: false,
         };
         let response = self.client.create_cdc_flow(create_peer_flow_req).await?;
-        let workflow_id = response.into_inner().worflow_id;
+        let workflow_id = response.into_inner().workflow_id;
         Ok(workflow_id)
     }
diff --git a/protos/flow.proto b/protos/flow.proto
index 57a226ccdf..fc407a7b9f 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -42,7 +42,7 @@ message FlowConnectionConfigs {
   peerdb_peers.Peer destination = 3;

   // config for the CDC flow itself
-  // currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
+  // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
   repeated TableMapping table_mappings = 4;
   uint32 max_batch_size = 5;
   uint64 idle_timeout_seconds = 6;
@@ -103,6 +103,7 @@ message SyncFlowOptions {
   uint64 idle_timeout_seconds = 3;
   map<uint32, string> src_table_id_name_mapping = 4;
   map<string, TableSchema> table_name_schema_mapping = 5;
+  repeated TableMapping table_mappings = 6;
 }

 message NormalizeFlowOptions {
@@ -363,22 +364,20 @@ message GetOpenConnectionsForUserResult {
 // STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED
 // STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED
 // UI can read everything except STATUS_UNKNOWN
+// terminate button should always be enabled
 enum FlowStatus {
   // should never be read by UI, bail
   STATUS_UNKNOWN = 0;
   // enable pause and terminate buttons
   STATUS_RUNNING = 1;
-  // pause button becomes resume button, terminate button should still be enabled
+  // pause button becomes resume button
   STATUS_PAUSED = 2;
-  // neither button should be enabled
   STATUS_PAUSING = 3;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SETUP = 4;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SNAPSHOT = 5;
-  // neither button should be enabled
   STATUS_TERMINATING = 6;
-  // neither button should be enabled
   STATUS_TERMINATED = 7;
 }
diff --git a/protos/route.proto b/protos/route.proto
index f0d7dd0511..316459d78f 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -14,7 +14,7 @@ message CreateCDCFlowRequest {
 }

 message CreateCDCFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message CreateQRepFlowRequest {
@@ -23,7 +23,7 @@ message CreateQRepFlowRequest {
 }

 message CreateQRepFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message ShutdownRequest {
diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts
index 0e48fec524..7e66cda826 100644
--- a/ui/app/api/mirrors/cdc/route.ts
+++ b/ui/app/api/mirrors/cdc/route.ts
@@ -1,5 +1,8 @@
 import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
-import { CreateCDCFlowRequest } from '@/grpc_generated/route';
+import {
+  CreateCDCFlowRequest,
+  CreateCDCFlowResponse,
+} from '@/grpc_generated/route';
 import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

 export async function POST(request: Request) {
@@ -12,14 +15,17 @@ export async function POST(request: Request) {
     createCatalogEntry: true,
   };
   try {
-    const createStatus = await fetch(`${flowServiceAddr}/v1/flows/cdc/create`, {
-      method: 'POST',
-      body: JSON.stringify(req),
-    }).then((res) => {
+    const createStatus: CreateCDCFlowResponse = await fetch(
+      `${flowServiceAddr}/v1/flows/cdc/create`,
+      {
+        method: 'POST',
+        body: JSON.stringify(req),
+      }
+    ).then((res) => {
       return res.json();
     });

-    if (!createStatus.worflowId) {
+    if (!createStatus.workflowId) {
       return new Response(JSON.stringify(createStatus));
     }
     let response: UCreateMirrorResponse = {
diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts
index 6f199dc53f..63b8519fca 100644
--- a/ui/app/api/mirrors/qrep/route.ts
+++ b/ui/app/api/mirrors/qrep/route.ts
@@ -25,7 +25,7 @@ export async function POST(request: Request) {
       return res.json();
     });
     let response: UCreateMirrorResponse = {
-      created: !!createStatus.worflowId,
+      created: !!createStatus.workflowId,
     };

     return new Response(JSON.stringify(response));
diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx
index dd55b5a9bf..3ca1737251 100644
--- a/ui/app/mirrors/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/[mirrorId]/cdc.tsx
@@ -248,7 +248,7 @@ export function CDCMirror({
     setSelectedTab(index);
   };

-  let snapshot = <></>;
+  let snapshot = null;
   if (status.cdcStatus?.snapshotStatus) {
     snapshot = (
diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx
index b86e312255..46439e2aa7 100644
--- a/ui/app/mirrors/[mirrorId]/edit/page.tsx
+++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx
@@ -173,7 +173,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
         }}
         variant='normalSolid'
         disabled={
-          config.additionalTables.length > 0 &&
+          additionalTables.length > 0 &&
           mirrorState.currentFlowState.toString() !==
             FlowStatus[FlowStatus.STATUS_PAUSED]
         }
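
Reviewer note (illustration, not part of the patch): item 1 of the commit
message works by deep-copying the config's TableMappings into
CDCFlowWorkflowState, so resync renames and added tables mutate only the
state copy while FlowConnectionConfigs stays pristine for Temporal replay
and for CDCFlowStatus to patch from state. Below is a minimal standalone
sketch of that pattern; the generated-protos import path is an assumption
based on this repo's layout, and only the proto.Clone semantics matter.

	package main

	import (
		"fmt"

		"google.golang.org/protobuf/proto"

		// assumed import path for the generated protos in this repo
		"github.com/PeerDB-io/peer-flow/generated/protos"
	)

	// cloneTableMappings mirrors NewCDCFlowWorkflowState in this patch:
	// deep-copy each TableMapping so state-side mutations never alias
	// the original config slice.
	func cloneTableMappings(cfg []*protos.TableMapping) []*protos.TableMapping {
		cloned := make([]*protos.TableMapping, 0, len(cfg))
		for _, tm := range cfg {
			cloned = append(cloned, proto.Clone(tm).(*protos.TableMapping))
		}
		return cloned
	}

	func main() {
		cfg := []*protos.TableMapping{
			{
				SourceTableIdentifier:      "public.users",
				DestinationTableIdentifier: "users",
			},
		}
		state := cloneTableMappings(cfg)

		// Mutate only the state copy, as the resync path does with "_resync".
		state[0].DestinationTableIdentifier += "_resync"

		fmt.Println(cfg[0].DestinationTableIdentifier)   // users
		fmt.Println(state[0].DestinationTableIdentifier) // users_resync
	}

Without the clone, the resync loop over state.TableMappings would also have
rewritten cfg.TableMappings in place, which is exactly the aliasing this
change removes.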