diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 65f78e3b35..dbaa6d2a9b 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -235,8 +235,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, }) defer shutdown() - errGroup, errCtx := errgroup.WithContext(ctx) - batchSize := input.SyncFlowOptions.BatchSize if batchSize <= 0 { batchSize = 1_000_000 @@ -251,6 +249,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, recordBatch := model.NewCDCRecordStream() startTime := time.Now() flowName := input.FlowConnectionConfigs.FlowJobName + + errGroup, errCtx := errgroup.WithContext(ctx) errGroup.Go(func() error { return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{ FlowJobName: flowName, diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 774c57a0ca..e7fd1aea23 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -64,7 +64,7 @@ type startReplicationOpts struct { } // Create a new PostgresCDCSource -func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) *PostgresCDCSource { +func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ PostgresConnector: c, replConn: cdcConfig.Connection, @@ -86,7 +86,7 @@ func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u SELECT parent.oid AS parentrelid, child.oid AS childrelid FROM pg_inherits JOIN pg_class parent ON pg_inherits.inhparent = parent.oid - JOIN pg_class child ON pg_inherits.inhrelid = child.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid WHERE parent.relkind='p'; ` diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 052b148fef..c44923926a 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -226,7 +226,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo return fmt.Errorf("error getting child to parent relid map: %w", err) } - cdc := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{ + cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{ Connection: replConn, SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName, @@ -922,7 +922,6 @@ func (c *PostgresConnector) HandleSlotInfo( return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0]) } -// GetLastOffset returns the last synced offset for a job. func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) { row := conn.QueryRow(ctx, getNumConnectionsForUser, user) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 602ace6b8f..505ec2055a 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -543,9 +543,8 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv &slog.HandlerOptions{Level: slog.LevelWarn}, ), )) - tLogger := TStructuredLogger{logger: logger} + testSuite.SetLogger(&TStructuredLogger{logger: logger}) - testSuite.SetLogger(&tLogger) env := testSuite.NewTestWorkflowEnvironment() RegisterWorkflowsAndActivities(t, env) env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index afc17927ce..fa95b852e3 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -95,12 +95,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) { } if s.SyncFlowErrors != nil { - logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors) + logger.Warn("SyncFlowErrors", slog.Any("errors", s.SyncFlowErrors)) s.SyncFlowErrors = nil } if s.NormalizeFlowErrors != nil { - logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors) + logger.Warn("NormalizeFlowErrors", slog.Any("errors", s.NormalizeFlowErrors)) s.NormalizeFlowErrors = nil } } @@ -119,21 +119,24 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution } } -func GetChildWorkflowID( - ctx workflow.Context, - prefix string, - peerFlowName string, -) (string, error) { - childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid.New().String()) +func GetUUID(ctx workflow.Context) (string, error) { + uuidSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return uuid.New().String() }) - var childWorkflowID string - if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil { - return "", fmt.Errorf("failed to get child workflow ID: %w", err) + var uuidString string + if err := uuidSideEffect.Get(&uuidString); err != nil { + return "", fmt.Errorf("failed to generate UUID: %w", err) } + return uuidString, nil +} - return childWorkflowID, nil +func GetChildWorkflowID( + prefix string, + peerFlowName string, + uuid string, +) string { + return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid) } // CDCFlowWorkflowResult is the result of the PeerFlowWorkflow. @@ -164,17 +167,20 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } + additionalTablesUUID, err := GetUUID(ctx) + if err != nil { + return err + } + childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID) + if err != nil { + return err + } + additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) additionalTablesWorkflowCfg.DoInitialSnapshot = true additionalTablesWorkflowCfg.InitialSnapshotOnly = true additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables - childAdditionalTablesCDCFlowID, - err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName) - if err != nil { - return err - } - // execute the sync flow as a child workflow childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: childAdditionalTablesCDCFlowID, @@ -248,6 +254,8 @@ func CDCFlowWorkflowWithConfig( shared.MirrorNameSearchAttribute: cfg.FlowJobName, } + originalRunID := workflow.GetInfo(ctx).OriginalRunID + // we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled // because Resync modifies TableMappings before Setup and also before Snapshot // for safety, rely on the idempotency of SetupFlow instead @@ -268,10 +276,7 @@ func CDCFlowWorkflowWithConfig( // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer - setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName) - if err != nil { - return state, err - } + setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID) childSetupFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: setupFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -292,10 +297,7 @@ func CDCFlowWorkflowWithConfig( state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT // next part of the setup is to snapshot-initial-copy and setup replication slots. - snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName) - if err != nil { - return state, err - } + snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID) taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID) if err != nil { @@ -376,11 +378,7 @@ func CDCFlowWorkflowWithConfig( currentSyncFlowNum := 0 totalRecordsSynced := int64(0) - normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName) - if err != nil { - return state, err - } - + normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID) childNormalizeFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: normalizeFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -493,11 +491,12 @@ func CDCFlowWorkflowWithConfig( } currentSyncFlowNum++ - syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName) + syncFlowUUID, err := GetUUID(ctx) if err != nil { finishNormalize() return state, err } + syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, syncFlowUUID) // execute the sync flow as a child workflow childSyncFlowOpts := workflow.ChildWorkflowOptions{ diff --git a/flow/workflows/normalize_flow.go b/flow/workflows/normalize_flow.go index f37544b14a..16c53ba7a5 100644 --- a/flow/workflows/normalize_flow.go +++ b/flow/workflows/normalize_flow.go @@ -12,7 +12,8 @@ import ( "github.com/PeerDB-io/peer-flow/shared" ) -func NormalizeFlowWorkflow(ctx workflow.Context, +func NormalizeFlowWorkflow( + ctx workflow.Context, config *protos.FlowConnectionConfigs, ) (*model.NormalizeFlowResponse, error) { logger := workflow.GetLogger(ctx) @@ -25,10 +26,10 @@ func NormalizeFlowWorkflow(ctx workflow.Context, results := make([]model.NormalizeResponse, 0, 4) errors := make([]string, 0) syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName) - var tableNameSchemaMapping map[string]*protos.TableSchema var stopLoop, canceled bool var lastSyncBatchID, syncBatchID int64 + var tableNameSchemaMapping map[string]*protos.TableSchema lastSyncBatchID = -1 syncBatchID = -1 selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize") diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index b59eba7430..7656ca47e6 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -7,7 +7,6 @@ import ( "strings" "time" - "github.com/google/uuid" "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -202,16 +201,12 @@ func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context, // getPartitionWorkflowID returns the child workflow ID for a new sync flow. func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string, error) { - childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, uuid.New().String()) - }) - - var childWorkflowID string - if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil { + id, err := GetUUID(ctx) + if err != nil { return "", fmt.Errorf("failed to get child workflow ID: %w", err) } - return childWorkflowID, nil + return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id), nil } // startChildWorkflow starts a single child workflow. @@ -433,6 +428,7 @@ func QRepFlowWorkflow( // 4. Wait for all the workflows to complete. // 5. Sleep for a while and repeat the loop. + originalRunID := workflow.GetInfo(ctx).OriginalRunID ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := workflow.GetLogger(ctx) @@ -455,17 +451,7 @@ func QRepFlowWorkflow( return fmt.Errorf("failed to register query handler: %w", err) } - // get qrep run uuid via side-effect - runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return uuid.New().String() - }) - - var runUUID string - if err := runUUIDSideEffect.Get(&runUUID); err != nil { - return fmt.Errorf("failed to get run uuid: %w", err) - } - - q := NewQRepFlowExecution(ctx, config, runUUID) + q := NewQRepFlowExecution(ctx, config, originalRunID) err = q.SetupWatermarkTableOnDestination(ctx) if err != nil { diff --git a/flow/workflows/snapshot_flow.go b/flow/workflows/snapshot_flow.go index f7de2f8850..ed38066015 100644 --- a/flow/workflows/snapshot_flow.go +++ b/flow/workflows/snapshot_flow.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/google/uuid" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -97,18 +96,11 @@ func (s *SnapshotFlowExecution) cloneTable( srcName := mapping.SourceTableIdentifier dstName := mapping.DestinationTableIdentifier - childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, uuid.New().String()) - reg := regexp.MustCompile("[^a-zA-Z0-9]+") - return reg.ReplaceAllString(childWorkflowID, "_") - }) + originalRunID := workflow.GetInfo(ctx).OriginalRunID - var childWorkflowID string - if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil { - s.logger.Error(fmt.Sprintf("failed to get child id for source table %s and destination table %s", - srcName, dstName), slog.Any("error", err), cloneLog) - return fmt.Errorf("failed to get child workflow ID: %w", err) - } + childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, originalRunID) + reg := regexp.MustCompile("[^a-zA-Z0-9_]+") + childWorkflowID = reg.ReplaceAllString(childWorkflowID, "_") s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s", childWorkflowID, srcName, dstName), cloneLog) diff --git a/flow/workflows/xmin_flow.go b/flow/workflows/xmin_flow.go index ac01270ee3..52fc9cd47f 100644 --- a/flow/workflows/xmin_flow.go +++ b/flow/workflows/xmin_flow.go @@ -6,7 +6,6 @@ import ( "log/slog" "time" - "github.com/google/uuid" "go.temporal.io/sdk/workflow" "github.com/PeerDB-io/peer-flow/generated/protos" @@ -18,6 +17,7 @@ func XminFlowWorkflow( config *protos.QRepConfig, state *protos.QRepFlowState, ) error { + originalRunID := workflow.GetInfo(ctx).OriginalRunID ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) // Support a Query for the current state of the xmin flow. err := setWorkflowQueries(ctx, state) @@ -25,16 +25,7 @@ func XminFlowWorkflow( return err } - // get xmin run uuid via side-effect - runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return uuid.New().String() - }) - var runUUID string - if err := runUUIDSideEffect.Get(&runUUID); err != nil { - return fmt.Errorf("failed to get run uuid: %w", err) - } - - q := NewQRepFlowExecution(ctx, config, runUUID) + q := NewQRepFlowExecution(ctx, config, originalRunID) err = q.SetupWatermarkTableOnDestination(ctx) if err != nil {