From 43a366bb6e8b834760f4059ec5b16bfcabf06dc5 Mon Sep 17 00:00:00 2001
From: Kevin Biju
Date: Fri, 2 Feb 2024 03:34:48 +0530
Subject: [PATCH] fixes for UI and general

---
 flow/activities/flowable.go             | 21 +++++++++----------
 flow/cmd/handler.go                     |  4 ++--
 flow/cmd/mirror_status.go               |  1 +
 flow/connectors/postgres/cdc.go         |  3 ---
 flow/peerdbenv/config.go                | 11 ----------
 flow/workflows/cdc_flow.go              | 28 +++++++++++++++++--------
 flow/workflows/normalize_flow.go        |  3 ++-
 flow/workflows/qrep_flow.go             |  5 +++--
 flow/workflows/sync_flow.go             |  3 ++-
 flow/workflows/xmin_flow.go             |  3 ++-
 nexus/flow-rs/src/grpc.rs               |  4 ++--
 protos/flow.proto                       | 13 ++++++------
 protos/route.proto                      |  4 ++--
 ui/app/api/mirrors/cdc/route.ts         |  6 +++---
 ui/app/api/mirrors/qrep/route.ts        |  2 +-
 ui/app/mirrors/[mirrorId]/cdc.tsx       |  2 +-
 ui/app/mirrors/[mirrorId]/edit/page.tsx |  2 +-
 17 files changed, 57 insertions(+), 58 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 5b3f26f974..d7f037bb77 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -192,21 +192,21 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
     ctx = context.WithValue(ctx, shared.FlowNameKey, input.FlowConnectionConfigs.FlowJobName)
     logger := activity.GetLogger(ctx)
     activity.RecordHeartbeat(ctx, "starting flow...")
-    conn := input.FlowConnectionConfigs
-    dstConn, err := connectors.GetCDCSyncConnector(ctx, conn.Destination)
+    config := input.FlowConnectionConfigs
+    dstConn, err := connectors.GetCDCSyncConnector(ctx, config.Destination)
     if err != nil {
         return nil, fmt.Errorf("failed to get destination connector: %w", err)
     }
     defer connectors.CloseConnector(dstConn)

     logger.Info("pulling records...")
-    tblNameMapping := make(map[string]model.NameAndExclude)
-    for _, v := range input.FlowConnectionConfigs.TableMappings {
+    tblNameMapping := make(map[string]model.NameAndExclude, len(input.SyncFlowOptions.TableMappings))
+    for _, v := range input.SyncFlowOptions.TableMappings {
         tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
     }

     errGroup, errCtx := errgroup.WithContext(ctx)
-    srcConn, err := connectors.GetCDCPullConnector(errCtx, conn.Source)
+    srcConn, err := connectors.GetCDCPullConnector(errCtx, config.Source)
     if err != nil {
         return nil, fmt.Errorf("failed to get source connector: %w", err)
     }
@@ -236,9 +236,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
             TableNameMapping: tblNameMapping,
             LastOffset: input.LastSyncState.Checkpoint,
             MaxBatchSize: input.SyncFlowOptions.BatchSize,
-            IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
-                int(input.SyncFlowOptions.IdleTimeoutSeconds),
-            ),
+            IdleTimeout: time.Duration(input.SyncFlowOptions.IdleTimeoutSeconds) *
+                time.Second,
             TableNameSchemaMapping: input.TableNameSchemaMapping,
             OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
             OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
@@ -251,7 +250,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
     })

     hasRecords := !recordBatch.WaitAndCheckEmpty()
-    logger.Info("current sync flow has records?", hasRecords)
+    logger.Info("current sync flow has records?", slog.Bool("hasRecords", hasRecords))

     if !hasRecords {
         // wait for the pull goroutine to finish
@@ -275,7 +274,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
     }

     syncBatchID, err := dstConn.GetLastSyncBatchID(flowName)
-    if err != nil && conn.Destination.Type != protos.DBType_EVENTHUB {
+    if err != nil && config.Destination.Type != protos.DBType_EVENTHUB {
         return nil, err
     }
     syncBatchID += 1
@@ -297,7 +296,7 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
         SyncBatchID: syncBatchID,
         Records: recordBatch,
         FlowJobName: input.FlowConnectionConfigs.FlowJobName,
-        TableMappings: input.FlowConnectionConfigs.TableMappings,
+        TableMappings: input.SyncFlowOptions.TableMappings,
         StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
     })
     if err != nil {
diff --git a/flow/cmd/handler.go b/flow/cmd/handler.go
index cbc9ed2fb3..b2cc162b46 100644
--- a/flow/cmd/handler.go
+++ b/flow/cmd/handler.go
@@ -188,7 +188,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
     }

     return &protos.CreateCDCFlowResponse{
-        WorflowId: workflowID,
+        WorkflowId: workflowID,
     }, nil
 }

@@ -290,7 +290,7 @@ func (h *FlowRequestHandler) CreateQRepFlow(
     }

     return &protos.CreateQRepFlowResponse{
-        WorflowId: workflowID,
+        WorkflowId: workflowID,
     }, nil
 }

diff --git a/flow/cmd/mirror_status.go b/flow/cmd/mirror_status.go
index ceca719c69..d3ba9cc26a 100644
--- a/flow/cmd/mirror_status.go
+++ b/flow/cmd/mirror_status.go
@@ -94,6 +94,7 @@ func (h *FlowRequestHandler) CDCFlowStatus(
     // patching config to show latest values from state
     config.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
     config.MaxBatchSize = state.SyncFlowOptions.BatchSize
+    config.TableMappings = state.TableMappings

     var initialCopyStatus *protos.SnapshotStatus
diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go
index 92071bd71f..ea0b0394c5 100644
--- a/flow/connectors/postgres/cdc.go
+++ b/flow/connectors/postgres/cdc.go
@@ -524,9 +524,6 @@ func (p *PostgresCDCSource) processMessage(batch *model.CDCRecordStream, xld pgl
         return nil, nil
     }

-    // TODO (kaushik): consider persistent state for a mirror job
-    // to be stored somewhere in temporal state. We might need to persist
-    // the state of the relation message somewhere
     p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
         msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

     if p.relationMessageMapping[msg.RelationID] == nil {
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index 65254de73a..1ba0f1009d 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -29,17 +29,6 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
     return time.Duration(x) * time.Second
 }

-// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
-func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
-    var x int
-    if providedValue > 0 {
-        x = providedValue
-    } else {
-        x = getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
-    }
-    return time.Duration(x) * time.Second
-}
-
 // PEERDB_CDC_DISK_SPILL_THRESHOLD
 func PeerDBCDCDiskSpillThreshold() int {
     return getEnvInt("PEERDB_CDC_DISK_SPILL_THRESHOLD", 1_000_000)
diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go
index 60a085f33b..fd3145299a 100644
--- a/flow/workflows/cdc_flow.go
+++ b/flow/workflows/cdc_flow.go
@@ -57,10 +57,16 @@ type CDCFlowWorkflowState struct {
     SyncFlowOptions *protos.SyncFlowOptions
     // options passed to all NormalizeFlows
     NormalizeFlowOptions *protos.NormalizeFlowOptions
+    // initially copied from config, all changes are made here though
+    TableMappings []*protos.TableMapping
 }

 // returns a new empty PeerFlowState
-func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
+func NewCDCFlowWorkflowState(cfgTableMappings []*protos.TableMapping) *CDCFlowWorkflowState {
+    tableMappings := make([]*protos.TableMapping, 0, len(cfgTableMappings))
+    for _, tableMapping := range cfgTableMappings {
+        tableMappings = append(tableMappings, proto.Clone(tableMapping).(*protos.TableMapping))
+    }
     return &CDCFlowWorkflowState{
         Progress: []string{"started"},
         SyncFlowStatuses: nil,
@@ -81,6 +87,7 @@ func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
         FlowConfigUpdates: nil,
         SyncFlowOptions: nil,
         NormalizeFlowOptions: nil,
+        TableMappings: tableMappings,
     }
 }

@@ -151,7 +158,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
         if len(flowConfigUpdate.AdditionalTables) == 0 {
             continue
         }
-        if shared.AdditionalTablesHasOverlap(cfg.TableMappings, flowConfigUpdate.AdditionalTables) {
+        if shared.AdditionalTablesHasOverlap(state.TableMappings, flowConfigUpdate.AdditionalTables) {
             return fmt.Errorf("duplicate source/destination tables found in additionalTables")
         }

@@ -173,7 +180,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
         additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

         childAdditionalTablesCDCFlowID,
-            err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
+            err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
         if err != nil {
             return err
         }
@@ -207,7 +214,8 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
         for tableName, tableSchema := range res.TableNameSchemaMapping {
             state.TableNameSchemaMapping[tableName] = tableSchema
         }
-        cfg.TableMappings = append(cfg.TableMappings, flowConfigUpdate.AdditionalTables...)
+        state.TableMappings = append(state.TableMappings, flowConfigUpdate.AdditionalTables...)
+        state.SyncFlowOptions.TableMappings = state.TableMappings
         // finished processing, wipe it
         state.FlowConfigUpdates = nil
     }
@@ -224,7 +232,7 @@ func CDCFlowWorkflowWithConfig(
         return nil, fmt.Errorf("invalid connection configs")
     }
     if state == nil {
-        state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
+        state = NewCDCFlowWorkflowState(cfg.TableMappings)
     }

     w := NewCDCFlowWorkflowExecution(ctx)
@@ -260,7 +268,7 @@ func CDCFlowWorkflowWithConfig(
         // if resync is true, alter the table name schema mapping to temporarily add
         // a suffix to the table names.
         if cfg.Resync {
-            for _, mapping := range cfg.TableMappings {
+            for _, mapping := range state.TableMappings {
                 oldName := mapping.DestinationTableIdentifier
                 newName := fmt.Sprintf("%s_resync", oldName)
                 mapping.DestinationTableIdentifier = newName
@@ -328,7 +336,7 @@ func CDCFlowWorkflowWithConfig(
             }
             renameOpts.SyncedAtColName = &cfg.SyncedAtColName
             correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
-            for _, mapping := range cfg.TableMappings {
+            for _, mapping := range state.TableMappings {
                 oldName := mapping.DestinationTableIdentifier
                 newName := strings.TrimSuffix(oldName, "_resync")
                 renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{
@@ -368,6 +376,7 @@ func CDCFlowWorkflowWithConfig(
         IdleTimeoutSeconds: cfg.IdleTimeoutSeconds,
         SrcTableIdNameMapping: state.SrcTableIdNameMapping,
         TableNameSchemaMapping: state.TableNameSchemaMapping,
+        TableMappings: state.TableMappings,
     }
     state.NormalizeFlowOptions = &protos.NormalizeFlowOptions{
         TableNameSchemaMapping: state.TableNameSchemaMapping,
@@ -468,7 +477,7 @@ func CDCFlowWorkflowWithConfig(
             state.CurrentFlowStatus = protos.FlowStatus_STATUS_PAUSED

             for state.ActiveSignal == shared.PauseSignal {
-                w.logger.Info("mirror has been paused for ", time.Since(startTime))
+                w.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime)))
                 // only place we block on receive, so signal processing is immediate
                 mainLoopSelector.Select(ctx)
                 if state.ActiveSignal == shared.NoopSignal {
@@ -539,7 +548,8 @@ func CDCFlowWorkflowWithConfig(
             state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes)
             state.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping
             totalRecordsSynced += childSyncFlowRes.NumRecordsSynced
-            w.logger.Info("Total records synced: ", totalRecordsSynced)
+            w.logger.Info("Total records synced: ",
+                slog.Int64("totalRecordsSynced", totalRecordsSynced))
         }

         if childSyncFlowRes != nil {
diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go
index a598c51792..33a882d831 100644
--- a/flow/workflows/normalize_flow.go
+++ b/flow/workflows/normalize_flow.go
@@ -2,6 +2,7 @@ package peerflow

 import (
     "fmt"
+    "log/slog"
     "time"

     "go.temporal.io/sdk/workflow"
@@ -65,7 +66,7 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
         if lastSyncBatchID != syncBatchID {
             lastSyncBatchID = syncBatchID

-            logger.Info("executing normalize - ", config.FlowJobName)
+            logger.Info("executing normalize - ", slog.String("flowName", config.FlowJobName))
             startNormalizeInput := &protos.StartNormalizeInput{
                 FlowConnectionConfigs: config,
                 TableNameSchemaMapping: tableNameSchemaMapping,
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 15ef7e50f9..3f7c0a8662 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -3,6 +3,7 @@ package peerflow

 import (
     "fmt"
+    "log/slog"
     "strings"
     "time"

@@ -471,7 +472,7 @@ func QRepFlowWorkflow(
         return err
     }

-    logger.Info("consolidating partitions for peer flow - ", config.FlowJobName)
logger.Info("consolidating partitions for peer flow - ", config.FlowJobName) + logger.Info("consolidating partitions for peer flow - ", slog.String("flowName", config.FlowJobName)) if err = q.consolidatePartitions(ctx); err != nil { return err } @@ -515,7 +516,7 @@ func QRepFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused for ", time.Since(startTime)) + q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index 4c930ebf66..8a8ca98aec 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -2,6 +2,7 @@ package peerflow import ( "fmt" + "log/slog" "time" "go.temporal.io/sdk/log" @@ -38,7 +39,7 @@ func (s *SyncFlowExecution) executeSyncFlow( opts *protos.SyncFlowOptions, relationMessageMapping model.RelationMessageMapping, ) (*model.SyncResponse, error) { - s.logger.Info("executing sync flow - ", s.CDCFlowName) + s.logger.Info("executing sync flow - ", slog.String("flowName", s.CDCFlowName)) syncMetaCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{ StartToCloseTimeout: 1 * time.Minute, diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index 8c19382c24..e67a3e951f 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -3,6 +3,7 @@ package peerflow import ( "fmt" + "log/slog" "time" "github.com/google/uuid" @@ -100,7 +101,7 @@ func XminFlowWorkflow( var signalVal shared.CDCFlowSignal for q.activeSignal == shared.PauseSignal { - q.logger.Info("mirror has been paused for ", time.Since(startTime)) + q.logger.Info("mirror has been paused for ", slog.Any("duration", time.Since(startTime))) // only place we block on receive, so signal processing is immediate ok, _ := signalChan.ReceiveWithTimeout(ctx, 1*time.Minute, &signalVal) if ok { diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 9b5e2f4d97..a962c4ec4f 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -51,7 +51,7 @@ impl FlowGrpcClient { create_catalog_entry: false, }; let response = self.client.create_q_rep_flow(create_qrep_flow_req).await?; - let workflow_id = response.into_inner().worflow_id; + let workflow_id = response.into_inner().workflow_id; Ok(workflow_id) } @@ -82,7 +82,7 @@ impl FlowGrpcClient { create_catalog_entry: false, }; let response = self.client.create_cdc_flow(create_peer_flow_req).await?; - let workflow_id = response.into_inner().worflow_id; + let workflow_id = response.into_inner().workflow_id; Ok(workflow_id) } diff --git a/protos/flow.proto b/protos/flow.proto index 57a226ccdf..fc407a7b9f 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -42,7 +42,7 @@ message FlowConnectionConfigs { peerdb_peers.Peer destination = 3; // config for the CDC flow itself - // currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals + // currently, TableMappings, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals repeated TableMapping table_mappings = 4; uint32 max_batch_size = 5; uint64 idle_timeout_seconds = 6; @@ -103,6 +103,7 @@ message SyncFlowOptions { uint64 idle_timeout_seconds = 3; map src_table_id_name_mapping = 4; map table_name_schema_mapping = 5; + repeated TableMapping table_mappings = 6; } message 
 message NormalizeFlowOptions {
@@ -363,22 +364,20 @@ message GetOpenConnectionsForUserResult {
 // STATUS_RUNNING -> STATUS_PAUSED/STATUS_TERMINATED
 // STATUS_PAUSED -> STATUS_RUNNING/STATUS_TERMINATED
 // UI can read everything except STATUS_UNKNOWN
+// terminate button should always be enabled
 enum FlowStatus {
   // should never be read by UI, bail
   STATUS_UNKNOWN = 0;
   // enable pause and terminate buttons
   STATUS_RUNNING = 1;
-  // pause button becomes resume button, terminate button should still be enabled
+  // pause button becomes resume button
   STATUS_PAUSED = 2;
-  // neither button should be enabled
   STATUS_PAUSING = 3;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SETUP = 4;
-  // neither button should be enabled, not reachable in QRep mirrors
+  // not reachable in QRep mirrors
   STATUS_SNAPSHOT = 5;
-  // neither button should be enabled
   STATUS_TERMINATING = 6;
-  // neither button should be enabled
   STATUS_TERMINATED = 7;
 }
diff --git a/protos/route.proto b/protos/route.proto
index f0d7dd0511..316459d78f 100644
--- a/protos/route.proto
+++ b/protos/route.proto
@@ -14,7 +14,7 @@ message CreateCDCFlowRequest {
 }

 message CreateCDCFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message CreateQRepFlowRequest {
@@ -23,7 +23,7 @@ message CreateQRepFlowRequest {
 }

 message CreateQRepFlowResponse {
-  string worflow_id = 1;
+  string workflow_id = 1;
 }

 message ShutdownRequest {
diff --git a/ui/app/api/mirrors/cdc/route.ts b/ui/app/api/mirrors/cdc/route.ts
index 0e48fec524..3de3101264 100644
--- a/ui/app/api/mirrors/cdc/route.ts
+++ b/ui/app/api/mirrors/cdc/route.ts
@@ -1,5 +1,5 @@
 import { UCreateMirrorResponse } from '@/app/dto/MirrorsDTO';
-import { CreateCDCFlowRequest } from '@/grpc_generated/route';
+import { CreateCDCFlowRequest, CreateCDCFlowResponse } from '@/grpc_generated/route';
 import { GetFlowHttpAddressFromEnv } from '@/rpc/http';

 export async function POST(request: Request) {
@@ -12,14 +12,14 @@ export async function POST(request: Request) {
     createCatalogEntry: true,
   };
   try {
-    const createStatus = await fetch(`${flowServiceAddr}/v1/flows/cdc/create`, {
+    const createStatus: CreateCDCFlowResponse = await fetch(`${flowServiceAddr}/v1/flows/cdc/create`, {
       method: 'POST',
       body: JSON.stringify(req),
     }).then((res) => {
       return res.json();
     });
-    if (!createStatus.worflowId) {
+    if (!createStatus.workflowId) {
       return new Response(JSON.stringify(createStatus));
     }
     let response: UCreateMirrorResponse = {
diff --git a/ui/app/api/mirrors/qrep/route.ts b/ui/app/api/mirrors/qrep/route.ts
index 6f199dc53f..63b8519fca 100644
--- a/ui/app/api/mirrors/qrep/route.ts
+++ b/ui/app/api/mirrors/qrep/route.ts
@@ -25,7 +25,7 @@ export async function POST(request: Request) {
       return res.json();
     });
     let response: UCreateMirrorResponse = {
-      created: !!createStatus.worflowId,
+      created: !!createStatus.workflowId,
     };

     return new Response(JSON.stringify(response));
diff --git a/ui/app/mirrors/[mirrorId]/cdc.tsx b/ui/app/mirrors/[mirrorId]/cdc.tsx
index dd55b5a9bf..3ca1737251 100644
--- a/ui/app/mirrors/[mirrorId]/cdc.tsx
+++ b/ui/app/mirrors/[mirrorId]/cdc.tsx
@@ -248,7 +248,7 @@ export function CDCMirror({
     setSelectedTab(index);
   };

-  let snapshot = <>;
+  let snapshot = null;
   if (status.cdcStatus?.snapshotStatus) {
     snapshot = (
diff --git a/ui/app/mirrors/[mirrorId]/edit/page.tsx b/ui/app/mirrors/[mirrorId]/edit/page.tsx
index b86e312255..46439e2aa7 100644
--- a/ui/app/mirrors/[mirrorId]/edit/page.tsx
+++ b/ui/app/mirrors/[mirrorId]/edit/page.tsx
@@ -173,7 +173,7 @@ const EditMirror = ({ params: { mirrorId } }: EditMirrorProps) => {
           }}
           variant='normalSolid'
           disabled={
-            config.additionalTables.length > 0 &&
+            additionalTables.length > 0 &&
             mirrorState.currentFlowState.toString() !==
               FlowStatus[FlowStatus.STATUS_PAUSED]
           }
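
Reviewer note: the pattern this patch leans on is that table mappings are deep-copied out of the immutable FlowConnectionConfigs into CDCFlowWorkflowState (and carried on SyncFlowOptions), so signal-driven edits such as resync renames or added tables never mutate the original config, and the CDC idle timeout is now derived directly from SyncFlowOptions.IdleTimeoutSeconds rather than the removed PeerDBCDCIdleTimeoutSeconds helper. Below is a minimal standalone sketch of those two steps; the generated-protos import path and the sample table names are assumptions, while the proto.Clone usage and field names mirror the patch.

package main

import (
    "fmt"
    "time"

    "google.golang.org/protobuf/proto"

    // assumed import path for PeerDB's generated protobuf types
    "github.com/PeerDB-io/peer-flow/generated/protos"
)

// cloneTableMappings deep-copies the config's table mappings so that workflow
// state can be mutated (resync renames, added tables) without touching cfg.
func cloneTableMappings(cfg []*protos.TableMapping) []*protos.TableMapping {
    out := make([]*protos.TableMapping, 0, len(cfg))
    for _, tm := range cfg {
        out = append(out, proto.Clone(tm).(*protos.TableMapping))
    }
    return out
}

func main() {
    // hypothetical config-level mappings
    cfgMappings := []*protos.TableMapping{
        {SourceTableIdentifier: "public.users", DestinationTableIdentifier: "public.users"},
    }

    stateMappings := cloneTableMappings(cfgMappings)
    stateMappings[0].DestinationTableIdentifier += "_resync" // only the cloned copy changes

    // idle timeout taken straight from SyncFlowOptions.IdleTimeoutSeconds (uint64)
    idleTimeout := time.Duration(uint64(60)) * time.Second

    fmt.Println(cfgMappings[0].DestinationTableIdentifier)   // public.users (unchanged)
    fmt.Println(stateMappings[0].DestinationTableIdentifier) // public.users_resync
    fmt.Println(idleTimeout)                                 // 1m0s
}

Keeping the mutable copy in workflow state is also what lets CDCFlowStatus report the live table list (config.TableMappings = state.TableMappings) instead of the values the mirror was originally created with.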