Skip to content

Commit

Permalink
Prefer OriginalRunID to generating UUID with side effect (#1334)
Browse files Browse the repository at this point in the history
Less side effects,
less error handling,
can correlate different workflows with same run id

Also pull in some other cleanup from #1211
  • Loading branch information
serprex authored Feb 20, 2024
1 parent 5cdc27d commit 8b591fd
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 85 deletions.
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
Expand All @@ -251,6 +249,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
replConn: cdcConfig.Connection,
Expand All @@ -86,7 +86,7 @@ func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
`

Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return fmt.Errorf("error getting child to parent relid map: %w", err)
}

cdc := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{
Connection: replConn,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
Expand Down Expand Up @@ -922,7 +922,6 @@ func (c *PostgresConnector) HandleSlotInfo(
return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
}

// GetLastOffset returns the last synced offset for a job.
func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) {
row := conn.QueryRow(ctx, getNumConnectionsForUser, user)

Expand Down
3 changes: 1 addition & 2 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,9 +543,8 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv
&slog.HandlerOptions{Level: slog.LevelWarn},
),
))
tLogger := TStructuredLogger{logger: logger}
testSuite.SetLogger(&TStructuredLogger{logger: logger})

testSuite.SetLogger(&tLogger)
env := testSuite.NewTestWorkflowEnvironment()
RegisterWorkflowsAndActivities(t, env)
env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
Expand Down
65 changes: 32 additions & 33 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
}

if s.SyncFlowErrors != nil {
logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors)
logger.Warn("SyncFlowErrors", slog.Any("errors", s.SyncFlowErrors))
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
logger.Warn("NormalizeFlowErrors", slog.Any("errors", s.NormalizeFlowErrors))
s.NormalizeFlowErrors = nil
}
}
Expand All @@ -119,21 +119,24 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution
}
}

func GetChildWorkflowID(
ctx workflow.Context,
prefix string,
peerFlowName string,
) (string, error) {
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid.New().String())
func GetUUID(ctx workflow.Context) (string, error) {
uuidSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
return "", fmt.Errorf("failed to get child workflow ID: %w", err)
var uuidString string
if err := uuidSideEffect.Get(&uuidString); err != nil {
return "", fmt.Errorf("failed to generate UUID: %w", err)
}
return uuidString, nil
}

return childWorkflowID, nil
func GetChildWorkflowID(
prefix string,
peerFlowName string,
uuid string,
) string {
return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid)
}

// CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.
Expand Down Expand Up @@ -164,17 +167,20 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
return err
}

additionalTablesUUID, err := GetUUID(ctx)
if err != nil {
return err
}
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
if err != nil {
return err
}

additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialSnapshot = true
additionalTablesWorkflowCfg.InitialSnapshotOnly = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

childAdditionalTablesCDCFlowID,
err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
if err != nil {
return err
}

// execute the sync flow as a child workflow
childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: childAdditionalTablesCDCFlowID,
Expand Down Expand Up @@ -248,6 +254,8 @@ func CDCFlowWorkflowWithConfig(
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

originalRunID := workflow.GetInfo(ctx).OriginalRunID

// we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
Expand All @@ -268,10 +276,7 @@ func CDCFlowWorkflowWithConfig(

// start the SetupFlow workflow as a child workflow, and wait for it to complete
// it should return the table schema for the source peer
setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName)
if err != nil {
return state, err
}
setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID)
childSetupFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: setupFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
Expand All @@ -292,10 +297,7 @@ func CDCFlowWorkflowWithConfig(
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName)
if err != nil {
return state, err
}
snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID)

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if err != nil {
Expand Down Expand Up @@ -376,11 +378,7 @@ func CDCFlowWorkflowWithConfig(
currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)
childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
Expand Down Expand Up @@ -493,11 +491,12 @@ func CDCFlowWorkflowWithConfig(
}
currentSyncFlowNum++

syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName)
syncFlowUUID, err := GetUUID(ctx)
if err != nil {
finishNormalize()
return state, err
}
syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, syncFlowUUID)

// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
Expand Down
5 changes: 3 additions & 2 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func NormalizeFlowWorkflow(ctx workflow.Context,
func NormalizeFlowWorkflow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeFlowResponse, error) {
logger := workflow.GetLogger(ctx)
Expand All @@ -25,10 +26,10 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
results := make([]model.NormalizeResponse, 0, 4)
errors := make([]string, 0)
syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName)
var tableNameSchemaMapping map[string]*protos.TableSchema

var stopLoop, canceled bool
var lastSyncBatchID, syncBatchID int64
var tableNameSchemaMapping map[string]*protos.TableSchema
lastSyncBatchID = -1
syncBatchID = -1
selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize")
Expand Down
24 changes: 5 additions & 19 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"time"

"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
Expand Down Expand Up @@ -202,16 +201,12 @@ func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context,

// getPartitionWorkflowID returns the child workflow ID for a new sync flow.
func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string, error) {
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, uuid.New().String())
})

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
id, err := GetUUID(ctx)
if err != nil {
return "", fmt.Errorf("failed to get child workflow ID: %w", err)
}

return childWorkflowID, nil
return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id), nil
}

// startChildWorkflow starts a single child workflow.
Expand Down Expand Up @@ -433,6 +428,7 @@ func QRepFlowWorkflow(
// 4. Wait for all the workflows to complete.
// 5. Sleep for a while and repeat the loop.

originalRunID := workflow.GetInfo(ctx).OriginalRunID
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := workflow.GetLogger(ctx)

Expand All @@ -455,17 +451,7 @@ func QRepFlowWorkflow(
return fmt.Errorf("failed to register query handler: %w", err)
}

// get qrep run uuid via side-effect
runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})

var runUUID string
if err := runUUIDSideEffect.Get(&runUUID); err != nil {
return fmt.Errorf("failed to get run uuid: %w", err)
}

q := NewQRepFlowExecution(ctx, config, runUUID)
q := NewQRepFlowExecution(ctx, config, originalRunID)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
Expand Down
16 changes: 4 additions & 12 deletions flow/workflows/snapshot_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/google/uuid"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -97,18 +96,11 @@ func (s *SnapshotFlowExecution) cloneTable(

srcName := mapping.SourceTableIdentifier
dstName := mapping.DestinationTableIdentifier
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, uuid.New().String())
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
return reg.ReplaceAllString(childWorkflowID, "_")
})
originalRunID := workflow.GetInfo(ctx).OriginalRunID

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
s.logger.Error(fmt.Sprintf("failed to get child id for source table %s and destination table %s",
srcName, dstName), slog.Any("error", err), cloneLog)
return fmt.Errorf("failed to get child workflow ID: %w", err)
}
childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, originalRunID)
reg := regexp.MustCompile("[^a-zA-Z0-9_]+")
childWorkflowID = reg.ReplaceAllString(childWorkflowID, "_")

s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s",
childWorkflowID, srcName, dstName), cloneLog)
Expand Down
13 changes: 2 additions & 11 deletions flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log/slog"
"time"

"github.com/google/uuid"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/generated/protos"
Expand All @@ -18,23 +17,15 @@ func XminFlowWorkflow(
config *protos.QRepConfig,
state *protos.QRepFlowState,
) error {
originalRunID := workflow.GetInfo(ctx).OriginalRunID
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
// Support a Query for the current state of the xmin flow.
err := setWorkflowQueries(ctx, state)
if err != nil {
return err
}

// get xmin run uuid via side-effect
runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})
var runUUID string
if err := runUUIDSideEffect.Get(&runUUID); err != nil {
return fmt.Errorf("failed to get run uuid: %w", err)
}

q := NewQRepFlowExecution(ctx, config, runUUID)
q := NewQRepFlowExecution(ctx, config, originalRunID)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
Expand Down

0 comments on commit 8b591fd

Please sign in to comment.