Prefer OriginalRunID to generating UUID with side effect #1334

Merged (1 commit) on Feb 20, 2024
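Background for this change: Temporal workflow code must be deterministic, so the previous helpers routed uuid.New() through workflow.SideEffect before using the value in a child workflow ID. workflow.GetInfo(ctx).OriginalRunID is already fixed for a given run, so IDs that only need to be unique per run can be built from it directly, with no side effect and no error path. A minimal sketch of the two patterns (standalone illustration, not code from this diff; the helper names are made up):

```go
package workflows

import (
	"fmt"

	"github.com/google/uuid"
	"go.temporal.io/sdk/workflow"
)

// Before: a fresh UUID must be recorded via SideEffect so that replays
// of the workflow history see the same value.
func childIDWithSideEffect(ctx workflow.Context, prefix, flowName string) (string, error) {
	se := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
		return uuid.New().String()
	})
	var id string
	if err := se.Get(&id); err != nil {
		return "", fmt.Errorf("failed to generate UUID: %w", err)
	}
	return fmt.Sprintf("%s-%s-%s", prefix, flowName, id), nil
}

// After: OriginalRunID is part of the workflow's own info and does not
// change on replay, so the ID can be formatted as a pure function.
func childIDFromOriginalRunID(ctx workflow.Context, prefix, flowName string) string {
	return fmt.Sprintf("%s-%s-%s", prefix, flowName, workflow.GetInfo(ctx).OriginalRunID)
}
```

In this PR, per-run children (setup-flow, snapshot-flow, normalize-flow) switch to the run ID, while IDs that must differ within a single run (the per-iteration sync-flow and the additional-tables flow) still draw a SideEffect-generated UUID from the new GetUUID helper.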
4 changes: 2 additions & 2 deletions flow/activities/flowable.go
@@ -235,8 +235,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
})
defer shutdown()

errGroup, errCtx := errgroup.WithContext(ctx)

batchSize := input.SyncFlowOptions.BatchSize
if batchSize <= 0 {
batchSize = 1_000_000
@@ -251,6 +249,8 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
recordBatch := model.NewCDCRecordStream()
startTime := time.Now()
flowName := input.FlowConnectionConfigs.FlowJobName

errGroup, errCtx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return srcConn.PullRecords(errCtx, a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
@@ -64,7 +64,7 @@ type startReplicationOpts struct {
}

// Create a new PostgresCDCSource
func (c *PostgresConnector) NewPostgresCDCSource(ctx context.Context, cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource {
return &PostgresCDCSource{
PostgresConnector: c,
replConn: cdcConfig.Connection,
@@ -86,7 +86,7 @@ func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u
SELECT parent.oid AS parentrelid, child.oid AS childrelid
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
WHERE parent.relkind='p';
`

3 changes: 1 addition & 2 deletions flow/connectors/postgres/postgres.go
@@ -226,7 +226,7 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo
return fmt.Errorf("error getting child to parent relid map: %w", err)
}

cdc := c.NewPostgresCDCSource(ctx, &PostgresCDCConfig{
cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{
Connection: replConn,
SrcTableIDNameMapping: req.SrcTableIDNameMapping,
Slot: slotName,
@@ -922,7 +922,6 @@ func (c *PostgresConnector) HandleSlotInfo(
return monitoring.AppendSlotSizeInfo(ctx, catalogPool, peerName, slotInfo[0])
}

// GetLastOffset returns the last synced offset for a job.
func getOpenConnectionsForUser(ctx context.Context, conn *pgx.Conn, user string) (*protos.GetOpenConnectionsForUserResult, error) {
row := conn.QueryRow(ctx, getNumConnectionsForUser, user)

3 changes: 1 addition & 2 deletions flow/e2e/test_utils.go
@@ -543,9 +543,8 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv
&slog.HandlerOptions{Level: slog.LevelWarn},
),
))
tLogger := TStructuredLogger{logger: logger}
testSuite.SetLogger(&TStructuredLogger{logger: logger})

testSuite.SetLogger(&tLogger)
env := testSuite.NewTestWorkflowEnvironment()
RegisterWorkflowsAndActivities(t, env)
env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow)
65 changes: 32 additions & 33 deletions flow/workflows/cdc_flow.go
@@ -95,12 +95,12 @@ func (s *CDCFlowWorkflowState) TruncateProgress(logger log.Logger) {
}

if s.SyncFlowErrors != nil {
logger.Warn("SyncFlowErrors: ", s.SyncFlowErrors)
logger.Warn("SyncFlowErrors", slog.Any("errors", s.SyncFlowErrors))
s.SyncFlowErrors = nil
}

if s.NormalizeFlowErrors != nil {
logger.Warn("NormalizeFlowErrors: ", s.NormalizeFlowErrors)
logger.Warn("NormalizeFlowErrors", slog.Any("errors", s.NormalizeFlowErrors))
s.NormalizeFlowErrors = nil
}
}
@@ -119,21 +119,24 @@ func NewCDCFlowWorkflowExecution(ctx workflow.Context) *CDCFlowWorkflowExecution
}
}

func GetChildWorkflowID(
ctx workflow.Context,
prefix string,
peerFlowName string,
) (string, error) {
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid.New().String())
func GetUUID(ctx workflow.Context) (string, error) {
uuidSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
return "", fmt.Errorf("failed to get child workflow ID: %w", err)
var uuidString string
if err := uuidSideEffect.Get(&uuidString); err != nil {
return "", fmt.Errorf("failed to generate UUID: %w", err)
}
return uuidString, nil
}

return childWorkflowID, nil
func GetChildWorkflowID(
prefix string,
peerFlowName string,
uuid string,
) string {
return fmt.Sprintf("%s-%s-%s", prefix, peerFlowName, uuid)
}

// CDCFlowWorkflowResult is the result of the PeerFlowWorkflow.
@@ -164,17 +167,20 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
return err
}

additionalTablesUUID, err := GetUUID(ctx)
if err != nil {
return err
}
childAdditionalTablesCDCFlowID := GetChildWorkflowID("additional-cdc-flow", cfg.FlowJobName, additionalTablesUUID)
if err != nil {
return err
}

additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs)
additionalTablesWorkflowCfg.DoInitialSnapshot = true
additionalTablesWorkflowCfg.InitialSnapshotOnly = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables

childAdditionalTablesCDCFlowID,
err := GetChildWorkflowID(ctx, "additional-cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
if err != nil {
return err
}

// execute the sync flow as a child workflow
childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: childAdditionalTablesCDCFlowID,
Expand Down Expand Up @@ -248,6 +254,8 @@ func CDCFlowWorkflowWithConfig(
shared.MirrorNameSearchAttribute: cfg.FlowJobName,
}

originalRunID := workflow.GetInfo(ctx).OriginalRunID

// we cannot skip SetupFlow if SnapshotFlow did not complete in cases where Resync is enabled
// because Resync modifies TableMappings before Setup and also before Snapshot
// for safety, rely on the idempotency of SetupFlow instead
@@ -268,10 +276,7 @@

// start the SetupFlow workflow as a child workflow, and wait for it to complete
// it should return the table schema for the source peer
setupFlowID, err := GetChildWorkflowID(ctx, "setup-flow", cfg.FlowJobName)
if err != nil {
return state, err
}
setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID)
childSetupFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: setupFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
@@ -292,10 +297,7 @@
state.CurrentFlowStatus = protos.FlowStatus_STATUS_SNAPSHOT

// next part of the setup is to snapshot-initial-copy and setup replication slots.
snapshotFlowID, err := GetChildWorkflowID(ctx, "snapshot-flow", cfg.FlowJobName)
if err != nil {
return state, err
}
snapshotFlowID := GetChildWorkflowID("snapshot-flow", cfg.FlowJobName, originalRunID)

taskQueue, err := shared.GetPeerFlowTaskQueueName(shared.SnapshotFlowTaskQueueID)
if err != nil {
@@ -376,11 +378,7 @@
currentSyncFlowNum := 0
totalRecordsSynced := int64(0)

normalizeFlowID, err := GetChildWorkflowID(ctx, "normalize-flow", cfg.FlowJobName)
if err != nil {
return state, err
}

normalizeFlowID := GetChildWorkflowID("normalize-flow", cfg.FlowJobName, originalRunID)
childNormalizeFlowOpts := workflow.ChildWorkflowOptions{
WorkflowID: normalizeFlowID,
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL,
@@ -493,11 +491,12 @@
}
currentSyncFlowNum++

syncFlowID, err := GetChildWorkflowID(ctx, "sync-flow", cfg.FlowJobName)
syncFlowUUID, err := GetUUID(ctx)
if err != nil {
finishNormalize()
return state, err
}
syncFlowID := GetChildWorkflowID("sync-flow", cfg.FlowJobName, syncFlowUUID)

// execute the sync flow as a child workflow
childSyncFlowOpts := workflow.ChildWorkflowOptions{
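Note on the call sites in this file: GetChildWorkflowID is now a plain string formatter with no workflow.Context and no error return, so the setup, snapshot, and normalize child IDs above are built straight from originalRunID; only the sync-flow and additional-tables IDs, which must be unique within one run, still go through GetUUID (and therefore through a SideEffect).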
5 changes: 3 additions & 2 deletions flow/workflows/normalize_flow.go
@@ -12,7 +12,8 @@ import (
"github.com/PeerDB-io/peer-flow/shared"
)

func NormalizeFlowWorkflow(ctx workflow.Context,
func NormalizeFlowWorkflow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) (*model.NormalizeFlowResponse, error) {
logger := workflow.GetLogger(ctx)
@@ -25,10 +26,10 @@ func NormalizeFlowWorkflow(ctx workflow.Context,
results := make([]model.NormalizeResponse, 0, 4)
errors := make([]string, 0)
syncChan := workflow.GetSignalChannel(ctx, shared.NormalizeSyncSignalName)
var tableNameSchemaMapping map[string]*protos.TableSchema

var stopLoop, canceled bool
var lastSyncBatchID, syncBatchID int64
var tableNameSchemaMapping map[string]*protos.TableSchema
lastSyncBatchID = -1
syncBatchID = -1
selector := workflow.NewNamedSelector(ctx, config.FlowJobName+"-normalize")
24 changes: 5 additions & 19 deletions flow/workflows/qrep_flow.go
@@ -7,7 +7,6 @@ import (
"strings"
"time"

"github.com/google/uuid"
"go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
@@ -202,16 +201,12 @@ func (q *QRepPartitionFlowExecution) ReplicatePartitions(ctx workflow.Context,

// getPartitionWorkflowID returns the child workflow ID for a new sync flow.
func (q *QRepFlowExecution) getPartitionWorkflowID(ctx workflow.Context) (string, error) {
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, uuid.New().String())
})

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
id, err := GetUUID(ctx)
if err != nil {
return "", fmt.Errorf("failed to get child workflow ID: %w", err)
}

return childWorkflowID, nil
return fmt.Sprintf("qrep-part-%s-%s", q.config.FlowJobName, id), nil
}

// startChildWorkflow starts a single child workflow.
@@ -433,6 +428,7 @@ func QRepFlowWorkflow(
// 4. Wait for all the workflows to complete.
// 5. Sleep for a while and repeat the loop.

originalRunID := workflow.GetInfo(ctx).OriginalRunID
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
logger := workflow.GetLogger(ctx)

@@ -455,17 +451,7 @@
return fmt.Errorf("failed to register query handler: %w", err)
}

// get qrep run uuid via side-effect
runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})

var runUUID string
if err := runUUIDSideEffect.Get(&runUUID); err != nil {
return fmt.Errorf("failed to get run uuid: %w", err)
}

q := NewQRepFlowExecution(ctx, config, runUUID)
q := NewQRepFlowExecution(ctx, config, originalRunID)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {
16 changes: 4 additions & 12 deletions flow/workflows/snapshot_flow.go
@@ -8,7 +8,6 @@ import (
"strings"
"time"

"github.com/google/uuid"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
@@ -97,18 +96,11 @@ func (s *SnapshotFlowExecution) cloneTable(

srcName := mapping.SourceTableIdentifier
dstName := mapping.DestinationTableIdentifier
childWorkflowIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, uuid.New().String())
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
return reg.ReplaceAllString(childWorkflowID, "_")
})
originalRunID := workflow.GetInfo(ctx).OriginalRunID

var childWorkflowID string
if err := childWorkflowIDSideEffect.Get(&childWorkflowID); err != nil {
s.logger.Error(fmt.Sprintf("failed to get child id for source table %s and destination table %s",
srcName, dstName), slog.Any("error", err), cloneLog)
return fmt.Errorf("failed to get child workflow ID: %w", err)
}
childWorkflowID := fmt.Sprintf("clone_%s_%s_%s", flowName, dstName, originalRunID)
reg := regexp.MustCompile("[^a-zA-Z0-9_]+")
childWorkflowID = reg.ReplaceAllString(childWorkflowID, "_")

s.logger.Info(fmt.Sprintf("Obtained child id %s for source table %s and destination table %s",
childWorkflowID, srcName, dstName), cloneLog)
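For reference, a small standalone example of the clone workflow ID sanitization as it reads after this change (the flow name, table name, and run ID below are hypothetical, not values from this PR):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Hypothetical raw ID, formatted the same way cloneTable builds its
	// child workflow ID from flowName, dstName, and OriginalRunID.
	raw := "clone_cdc_mirror_public.events_2f9c1e7d-aaaa-bbbb-cccc-000000000000"
	// Same pattern as the new code in this diff: keep letters, digits, and
	// underscores; collapse everything else to a single underscore.
	reg := regexp.MustCompile("[^a-zA-Z0-9_]+")
	fmt.Println(reg.ReplaceAllString(raw, "_"))
	// Output: clone_cdc_mirror_public_events_2f9c1e7d_aaaa_bbbb_cccc_000000000000
}
```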
13 changes: 2 additions & 11 deletions flow/workflows/xmin_flow.go
@@ -6,7 +6,6 @@ import (
"log/slog"
"time"

"github.com/google/uuid"
"go.temporal.io/sdk/workflow"

"github.com/PeerDB-io/peer-flow/generated/protos"
@@ -18,23 +17,15 @@ func XminFlowWorkflow(
config *protos.QRepConfig,
state *protos.QRepFlowState,
) error {
originalRunID := workflow.GetInfo(ctx).OriginalRunID
ctx = workflow.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
// Support a Query for the current state of the xmin flow.
err := setWorkflowQueries(ctx, state)
if err != nil {
return err
}

// get xmin run uuid via side-effect
runUUIDSideEffect := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
return uuid.New().String()
})
var runUUID string
if err := runUUIDSideEffect.Get(&runUUID); err != nil {
return fmt.Errorf("failed to get run uuid: %w", err)
}

q := NewQRepFlowExecution(ctx, config, runUUID)
q := NewQRepFlowExecution(ctx, config, originalRunID)

err = q.SetupWatermarkTableOnDestination(ctx)
if err != nil {