From 0d21eb890ec663244d37657550470501fcf61aa4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 20:15:50 +0000 Subject: [PATCH] one-sync --- flow/activities/flowable.go | 75 +++++++++++++- flow/cmd/worker.go | 6 +- flow/connectors/core.go | 4 + flow/connectors/postgres/cdc.go | 91 ++--------------- flow/connectors/postgres/postgres.go | 143 +++++++++++++++++++++++++-- flow/e2e/test_utils.go | 4 + flow/workflows/cdc_flow.go | 85 +++++++++++++--- 7 files changed, 296 insertions(+), 112 deletions(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 56e4101ce0..f9a373a05c 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -42,6 +42,8 @@ type SlotSnapshotSignal struct { type FlowableActivity struct { CatalogPool *pgxpool.Pool Alerter *alerting.Alerter + CdcCacheRw sync.RWMutex + CdcCache map[string]connectors.CDCPullConnector } func (a *FlowableActivity) CheckConnection( @@ -204,10 +206,71 @@ func (a *FlowableActivity) CreateNormalizedTable( }, nil } +func (a *FlowableActivity) MaintainPull( + ctx context.Context, + config *protos.FlowConnectionConfigs, + sessionID string, +) error { + srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) + if err != nil { + return err + } + defer connectors.CloseConnector(ctx, srcConn) + + if err := srcConn.SetupReplConn(ctx); err != nil { + return err + } + + a.CdcCacheRw.Lock() + a.CdcCache[sessionID] = srcConn + a.CdcCacheRw.Unlock() + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + activity.RecordHeartbeat(ctx, "keep session alive") + if err := srcConn.ReplPing(ctx); err != nil { + activity.GetLogger(ctx).Error("Failed to send keep alive ping to replication connection", slog.Any("error", err)) + } + case <-ctx.Done(): + a.CdcCacheRw.Lock() + delete(a.CdcCache, sessionID) + a.CdcCacheRw.Unlock() + return nil + } + } +} + +func (a *FlowableActivity) WaitForSourceConnector(ctx context.Context, sessionID string) error { + logger := activity.GetLogger(ctx) + attempt := 0 + for { + a.CdcCacheRw.RLock() + _, ok := a.CdcCache[sessionID] + a.CdcCacheRw.RUnlock() + if ok { + return nil + } + activity.RecordHeartbeat(ctx, "wait another second for source connector") + attempt += 1 + if attempt > 2 { + logger.Info("waiting on source connector setup", slog.Int("attempt", attempt)) + } + if err := ctx.Err(); err != nil { + return err + } + time.Sleep(time.Second) + } +} + func (a *FlowableActivity) SyncFlow( ctx context.Context, config *protos.FlowConnectionConfigs, options *protos.SyncFlowOptions, + sessionID string, ) (*model.SyncResponse, error) { flowName := config.FlowJobName ctx = context.WithValue(ctx, shared.FlowNameKey, flowName) @@ -225,11 +288,15 @@ func (a *FlowableActivity) SyncFlow( tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - srcConn, err := connectors.GetCDCPullConnector(ctx, config.Source) - if err != nil { - return nil, fmt.Errorf("failed to get source connector: %w", err) + a.CdcCacheRw.RLock() + srcConn, ok := a.CdcCache[sessionID] + a.CdcCacheRw.RUnlock() + if !ok { + return nil, errors.New("source connector missing from CdcCache") + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, err } - defer connectors.CloseConnector(ctx, srcConn) shutdown := utils.HeartbeatRoutine(ctx, func() string { return fmt.Sprintf("transferring records for job - %s", flowName) diff --git a/flow/cmd/worker.go b/flow/cmd/worker.go index 753ef14f14..ee9218a9da 100644 --- a/flow/cmd/worker.go +++ b/flow/cmd/worker.go @@ -16,6 +16,7 @@ import ( "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" + "github.com/PeerDB-io/peer-flow/connectors" utils "github.com/PeerDB-io/peer-flow/connectors/utils/catalog" "github.com/PeerDB-io/peer-flow/logger" "github.com/PeerDB-io/peer-flow/shared" @@ -127,7 +128,9 @@ func WorkerMain(opts *WorkerOptions) error { return queueErr } - w := worker.New(c, taskQueue, worker.Options{}) + w := worker.New(c, taskQueue, worker.Options{ + EnableSessionWorker: true, + }) peerflow.RegisterFlowWorkerWorkflows(w) alerter, err := alerting.NewAlerter(conn) @@ -138,6 +141,7 @@ func WorkerMain(opts *WorkerOptions) error { w.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, Alerter: alerter, + CdcCache: make(map[string]connectors.CDCPullConnector), }) err = w.Run(worker.InterruptCh()) diff --git a/flow/connectors/core.go b/flow/connectors/core.go index 889250d804..b166efc745 100644 --- a/flow/connectors/core.go +++ b/flow/connectors/core.go @@ -39,6 +39,10 @@ type CDCPullConnector interface { *protos.EnsurePullabilityBatchOutput, error) // Methods related to retrieving and pushing records for this connector as a source and destination. + SetupReplConn(context.Context) error + + // Ping source to keep connection alive. Can be called concurrently with PullRecords; skips ping in that case. + ReplPing(context.Context) error // PullRecords pulls records from the source, and returns a RecordBatch. // This method should be idempotent, and should be able to be called multiple times with the same request. diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 80bb00e5d3..6517ecd011 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -3,7 +3,6 @@ package connpostgres import ( "context" "crypto/sha256" - "errors" "fmt" "log/slog" "time" @@ -28,7 +27,6 @@ import ( type PostgresCDCSource struct { *PostgresConnector - replConn *pgx.Conn SrcTableIDNameMapping map[uint32]string TableNameMapping map[string]model.NameAndExclude slot string @@ -46,7 +44,6 @@ type PostgresCDCSource struct { } type PostgresCDCConfig struct { - Connection *pgx.Conn Slot string Publication string SrcTableIDNameMapping map[uint32]string @@ -67,21 +64,20 @@ type startReplicationOpts struct { func (c *PostgresConnector) NewPostgresCDCSource(cdcConfig *PostgresCDCConfig) *PostgresCDCSource { return &PostgresCDCSource{ PostgresConnector: c, - replConn: cdcConfig.Connection, SrcTableIDNameMapping: cdcConfig.SrcTableIDNameMapping, TableNameMapping: cdcConfig.TableNameMapping, slot: cdcConfig.Slot, publication: cdcConfig.Publication, relationMessageMapping: cdcConfig.RelationMessageMapping, - typeMap: pgtype.NewMap(), childToParentRelIDMapping: cdcConfig.ChildToParentRelIDMap, + typeMap: pgtype.NewMap(), commitLock: false, catalogPool: cdcConfig.CatalogPool, flowJobName: cdcConfig.FlowJobName, } } -func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) { +func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]uint32, error) { query := ` SELECT parent.oid AS parentrelid, child.oid AS childrelid FROM pg_inherits @@ -94,7 +90,6 @@ func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u if err != nil { return nil, fmt.Errorf("error querying for child to parent relid map: %w", err) } - defer rows.Close() childToParentRelIDMap := make(map[uint32]uint32) @@ -113,85 +108,14 @@ func getChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u // PullRecords pulls records from the cdc stream func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullRecordsRequest) error { - replicationOpts, err := p.replicationOptions() - if err != nil { - return fmt.Errorf("error getting replication options: %w", err) - } - - pgConn := p.replConn.PgConn() - - // start replication - var clientXLogPos, startLSN pglogrepl.LSN - if req.LastOffset > 0 { - p.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset)) - clientXLogPos = pglogrepl.LSN(req.LastOffset) - startLSN = clientXLogPos + 1 - } - - opts := startReplicationOpts{ - conn: pgConn, - startLSN: startLSN, - replicationOpts: *replicationOpts, - } - - err = p.startReplication(ctx, opts) - if err != nil { - return fmt.Errorf("error starting replication: %w", err) - } - - p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, startLSN)) - - return p.consumeStream(ctx, pgConn, req, clientXLogPos, req.RecordStream) -} - -func (p *PostgresCDCSource) startReplication(ctx context.Context, opts startReplicationOpts) error { - err := pglogrepl.StartReplication(ctx, opts.conn, p.slot, opts.startLSN, opts.replicationOpts) - if err != nil { - p.logger.Error("error starting replication", slog.Any("error", err)) - return fmt.Errorf("error starting replication at startLsn - %d: %w", opts.startLSN, err) - } - - p.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", p.slot, opts.startLSN)) - return nil -} - -func (p *PostgresCDCSource) replicationOptions() (*pglogrepl.StartReplicationOptions, error) { - pluginArguments := []string{ - "proto_version '1'", - } - - if p.publication != "" { - pubOpt := fmt.Sprintf("publication_names '%s'", p.publication) - pluginArguments = append(pluginArguments, pubOpt) - } else { - return nil, errors.New("publication name is not set") - } - - return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil -} - -// start consuming the cdc stream -func (p *PostgresCDCSource) consumeStream( - ctx context.Context, - conn *pgconn.PgConn, - req *model.PullRecordsRequest, - clientXLogPos pglogrepl.LSN, - records *model.CDCRecordStream, -) error { - defer func() { - timeout, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - err := conn.Close(timeout) - if err != nil { - p.logger.Error("error closing replication connection", slog.Any("error", err)) - } - cancel() - }() - + conn := p.replConn.PgConn() + records := req.RecordStream // clientXLogPos is the last checkpoint id, we need to ack that we have processed // until clientXLogPos each time we send a standby status update. // consumedXLogPos is the lsn that has been committed on the destination. - consumedXLogPos := pglogrepl.LSN(0) - if clientXLogPos > 0 { + var clientXLogPos, consumedXLogPos pglogrepl.LSN + if req.LastOffset > 0 { + clientXLogPos = pglogrepl.LSN(req.LastOffset) consumedXLogPos = clientXLogPos err := pglogrepl.SendStandbyStatusUpdate(ctx, conn, @@ -300,7 +224,6 @@ func (p *PostgresCDCSource) consumeStream( var receiveCtx context.Context var cancel context.CancelFunc - if cdcRecordsStorage.IsEmpty() { receiveCtx, cancel = context.WithCancel(ctx) } else { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index c44923926a..6ce8d90c4d 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -7,9 +7,11 @@ import ( "log/slog" "regexp" "strings" + "sync" "time" "github.com/google/uuid" + "github.com/jackc/pglogrepl" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" @@ -31,12 +33,21 @@ type PostgresConnector struct { ssh *SSHTunnel conn *pgx.Conn replConfig *pgx.ConnConfig + replConn *pgx.Conn + replState *ReplState + replLock sync.Mutex customTypesMapping map[uint32]string metadataSchema string hushWarnOID map[uint32]struct{} logger log.Logger } +type ReplState struct { + Slot string + Publication string + Offset int64 +} + func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) (*PostgresConnector, error) { connectionString := utils.GetPGConnectionString(pgConfig) @@ -82,6 +93,8 @@ func NewPostgresConnector(ctx context.Context, pgConfig *protos.PostgresConfig) ssh: tunnel, conn: conn, replConfig: replConfig, + replState: nil, + replLock: sync.Mutex{}, customTypesMapping: customTypeMap, metadataSchema: metadataSchema, hushWarnOID: make(map[uint32]struct{}), @@ -95,19 +108,124 @@ func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, erro logger.LoggerFromCtx(ctx).Error("failed to create replication connection", "error", err) return nil, fmt.Errorf("failed to create replication connection: %w", err) } - return conn, nil } +func (c *PostgresConnector) SetupReplConn(ctx context.Context) error { + conn, err := c.CreateReplConn(ctx) + if err != nil { + return err + } + c.replConn = conn + return nil +} + +// To keep connection alive between sync batches. +// By default postgres drops connection after 1 minute of inactivity. +func (c *PostgresConnector) ReplPing(ctx context.Context) error { + if c.replLock.TryLock() { + defer c.replLock.Unlock() + if c.replState != nil { + return pglogrepl.SendStandbyStatusUpdate( + ctx, + c.replConn.PgConn(), + pglogrepl.StandbyStatusUpdate{WALWritePosition: pglogrepl.LSN(c.replState.Offset)}, + ) + } + } + return nil +} + +func (c *PostgresConnector) MaybeStartReplication( + ctx context.Context, + slotName string, + publicationName string, + req *model.PullRecordsRequest, +) error { + if c.replState != nil && (c.replState.Offset != req.LastOffset || + c.replState.Slot != slotName || + c.replState.Publication != publicationName) { + return fmt.Errorf("replState changed, reset connector. slot name: old=%s new=%s, publication: old=%s new=%s, offset: old=%d new=%d", + c.replState.Slot, slotName, c.replState.Publication, publicationName, c.replState.Offset, req.LastOffset, + ) + } + + if c.replState == nil { + replicationOpts, err := c.replicationOptions(publicationName) + if err != nil { + return fmt.Errorf("error getting replication options: %w", err) + } + + var startLSN pglogrepl.LSN + if req.LastOffset > 0 { + c.logger.Info("starting replication from last sync state", slog.Int64("last checkpoint", req.LastOffset)) + startLSN = pglogrepl.LSN(req.LastOffset + 1) + } + + opts := startReplicationOpts{ + conn: c.replConn.PgConn(), + startLSN: startLSN, + replicationOpts: *replicationOpts, + } + + err = c.startReplication(ctx, slotName, opts) + if err != nil { + return fmt.Errorf("error starting replication: %w", err) + } + + c.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", slotName, startLSN)) + c.replState = &ReplState{ + Slot: slotName, + Publication: publicationName, + Offset: req.LastOffset, + } + } + return nil +} + +func (c *PostgresConnector) startReplication(ctx context.Context, slotName string, opts startReplicationOpts) error { + err := pglogrepl.StartReplication(ctx, opts.conn, slotName, opts.startLSN, opts.replicationOpts) + if err != nil { + c.logger.Error("error starting replication", slog.Any("error", err)) + return fmt.Errorf("error starting replication at startLsn - %d: %w", opts.startLSN, err) + } + + c.logger.Info(fmt.Sprintf("started replication on slot %s at startLSN: %d", slotName, opts.startLSN)) + return nil +} + +func (c *PostgresConnector) replicationOptions(publicationName string) (*pglogrepl.StartReplicationOptions, error) { + pluginArguments := []string{ + "proto_version '1'", + } + + if publicationName != "" { + pubOpt := fmt.Sprintf("publication_names %s", QuoteLiteral(publicationName)) + pluginArguments = append(pluginArguments, pubOpt) + } else { + return nil, errors.New("publication name is not set") + } + + return &pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments}, nil +} + // Close closes all connections. func (c *PostgresConnector) Close() error { + var connerr, replerr error if c != nil { - timeout, cancel := context.WithTimeout(context.Background(), time.Minute) + timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - c.conn.Close(timeout) + connerr = c.conn.Close(timeout) + + if c.replConn != nil { + timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + replerr = c.replConn.Close(timeout) + } + c.ssh.Close() } - return nil + return errors.Join(connerr, replerr) } func (c *PostgresConnector) Conn() *pgx.Conn { @@ -215,19 +333,20 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo c.logger.Info("PullRecords: performed checks for slot and publication") - replConn, err := c.CreateReplConn(ctx) + childToParentRelIDMap, err := GetChildToParentRelIDMap(ctx, c.conn) if err != nil { - return err + return fmt.Errorf("error getting child to parent relid map: %w", err) } - defer replConn.Close(ctx) - childToParentRelIDMap, err := getChildToParentRelIDMap(ctx, replConn) + c.replLock.Lock() + defer c.replLock.Unlock() + + err = c.MaybeStartReplication(ctx, slotName, publicationName, req) if err != nil { - return fmt.Errorf("error getting child to parent relid map: %w", err) + return err } cdc := c.NewPostgresCDCSource(&PostgresCDCConfig{ - Connection: replConn, SrcTableIDNameMapping: req.SrcTableIDNameMapping, Slot: slotName, Publication: publicationName, @@ -243,10 +362,14 @@ func (c *PostgresConnector) PullRecords(ctx context.Context, catalogPool *pgxpoo return err } + req.RecordStream.Close() + c.replState.Offset = req.RecordStream.GetLastCheckpoint() + latestLSN, err := c.getCurrentLSN(ctx) if err != nil { return fmt.Errorf("failed to get current LSN: %w", err) } + err = monitoring.UpdateLatestLSNAtSourceForCDCFlow(ctx, catalogPool, req.FlowJobName, int64(latestLSN)) if err != nil { return fmt.Errorf("failed to update latest LSN at source for CDC flow: %w", err) diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 505ec2055a..7c2b2a8968 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -20,8 +20,10 @@ import ( "github.com/stretchr/testify/require" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" + "go.temporal.io/sdk/worker" "github.com/PeerDB-io/peer-flow/activities" + "github.com/PeerDB-io/peer-flow/connectors" connpostgres "github.com/PeerDB-io/peer-flow/connectors/postgres" connsnowflake "github.com/PeerDB-io/peer-flow/connectors/snowflake" "github.com/PeerDB-io/peer-flow/connectors/utils" @@ -67,6 +69,7 @@ func RegisterWorkflowsAndActivities(t *testing.T, env *testsuite.TestWorkflowEnv env.RegisterActivity(&activities.FlowableActivity{ CatalogPool: conn, Alerter: alerter, + CdcCache: make(map[string]connectors.CDCPullConnector), }) env.RegisterActivity(&activities.SnapshotActivity{ Alerter: alerter, @@ -546,6 +549,7 @@ func NewTemporalTestWorkflowEnvironment(t *testing.T) *testsuite.TestWorkflowEnv testSuite.SetLogger(&TStructuredLogger{logger: logger}) env := testSuite.NewTestWorkflowEnvironment() + env.SetWorkerOptions(worker.Options{EnableSessionWorker: true}) RegisterWorkflowsAndActivities(t, env) env.RegisterWorkflow(peerflow.SnapshotFlowWorkflow) return env diff --git a/flow/workflows/cdc_flow.go b/flow/workflows/cdc_flow.go index e48f9e55fd..2206bc71f6 100644 --- a/flow/workflows/cdc_flow.go +++ b/flow/workflows/cdc_flow.go @@ -165,10 +165,10 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont return err } - additionalTablesWorkflowCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) - additionalTablesWorkflowCfg.DoInitialSnapshot = true - additionalTablesWorkflowCfg.InitialSnapshotOnly = true - additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables + additionalTablesCfg := proto.Clone(cfg).(*protos.FlowConnectionConfigs) + additionalTablesCfg.DoInitialSnapshot = true + additionalTablesCfg.InitialSnapshotOnly = true + additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables // execute the sync flow as a child workflow childAdditionalTablesCDCFlowOpts := workflow.ChildWorkflowOptions{ @@ -184,7 +184,7 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont childAdditionalTablesCDCFlowFuture := workflow.ExecuteChildWorkflow( childAdditionalTablesCDCFlowCtx, CDCFlowWorkflowWithConfig, - additionalTablesWorkflowCfg, + additionalTablesCfg, nil, ) var res *CDCFlowWorkflowResult @@ -266,6 +266,7 @@ func CDCFlowWorkflowWithConfig( // start the SetupFlow workflow as a child workflow, and wait for it to complete // it should return the table schema for the source peer setupFlowID := GetChildWorkflowID("setup-flow", cfg.FlowJobName, originalRunID) + childSetupFlowOpts := workflow.ChildWorkflowOptions{ WorkflowID: setupFlowID, ParentClosePolicy: enums.PARENT_CLOSE_POLICY_REQUEST_CANCEL, @@ -352,6 +353,55 @@ func CDCFlowWorkflowWithConfig( } } + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 144 * time.Hour, + HeartbeatTimeout: time.Minute, + } + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return nil, err + } + defer workflow.CompleteSession(syncSessionCtx) + sessionInfo := workflow.GetSessionInfo(syncSessionCtx) + + syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 72 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain := workflow.ExecuteActivity( + syncCtx, + flowable.MaintainPull, + cfg, + sessionInfo.SessionID, + ) + fSessionSetup := workflow.ExecuteActivity( + syncCtx, + flowable.WaitForSourceConnector, + sessionInfo.SessionID, + ) + + var sessionError error + sessionSelector := workflow.NewNamedSelector(ctx, "Session Setup") + sessionSelector.AddFuture(fMaintain, func(f workflow.Future) { + // MaintainPull should never exit without an error before this point + sessionError = f.Get(syncCtx, nil) + }) + sessionSelector.AddFuture(fSessionSetup, func(f workflow.Future) { + // Happy path is waiting for this to return without error + sessionError = f.Get(syncCtx, nil) + }) + sessionSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { + sessionError = ctx.Err() + }) + sessionSelector.Select(ctx) + if sessionError != nil { + state.SyncFlowErrors = append(state.SyncFlowErrors, sessionError.Error()) + state.TruncateProgress(w.logger) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state) + } + currentSyncFlowNum := 0 totalRecordsSynced := int64(0) @@ -366,6 +416,7 @@ func CDCFlowWorkflowWithConfig( WaitForCancellation: true, } normCtx := workflow.WithChildOptions(ctx, childNormalizeFlowOpts) + childNormalizeFlowFuture := workflow.ExecuteChildWorkflow( normCtx, NormalizeFlowWorkflow, @@ -397,12 +448,12 @@ func CDCFlowWorkflowWithConfig( } var canceled bool - signalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) - mainLoopSelector := workflow.NewSelector(ctx) + flowSignalChan := workflow.GetSignalChannel(ctx, shared.FlowSignalName) + mainLoopSelector := workflow.NewNamedSelector(ctx, "Main Loop") mainLoopSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) { canceled = true }) - mainLoopSelector.AddReceive(signalChan, func(c workflow.ReceiveChannel, _ bool) { + mainLoopSelector.AddReceive(flowSignalChan, func(c workflow.ReceiveChannel, _ bool) { var signalVal shared.CDCFlowSignal c.ReceiveAsync(&signalVal) state.ActiveSignal = shared.FlowSignalHandler(state.ActiveSignal, signalVal, w.logger) @@ -466,7 +517,8 @@ func CDCFlowWorkflowWithConfig( " limit on the number of syncflows to be executed: ", currentSyncFlowNum) break } - currentSyncFlowNum++ + currentSyncFlowNum += 1 + w.logger.Info("executing sync flow", slog.Int("count", currentSyncFlowNum), slog.String("flowName", cfg.FlowJobName)) // execute the sync flow syncFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ @@ -476,9 +528,9 @@ func CDCFlowWorkflowWithConfig( }) w.logger.Info("executing sync flow", slog.String("flowName", cfg.FlowJobName)) - syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions) + syncFlowFuture := workflow.ExecuteActivity(syncFlowCtx, flowable.SyncFlow, cfg, state.SyncFlowOptions, sessionInfo.SessionID) - var syncDone bool + var syncDone, syncErr bool var normalizeSignalError error normDone := normWaitChan == nil mainLoopSelector.AddFuture(syncFlowFuture, func(f workflow.Future) { @@ -486,8 +538,9 @@ func CDCFlowWorkflowWithConfig( var childSyncFlowRes *model.SyncResponse if err := f.Get(ctx, &childSyncFlowRes); err != nil { - w.logger.Error("failed to execute sync flow: ", err) + w.logger.Error("failed to execute sync flow", slog.Any("error", err)) state.SyncFlowErrors = append(state.SyncFlowErrors, err.Error()) + syncErr = true } else if childSyncFlowRes != nil { state.SyncFlowStatuses = append(state.SyncFlowStatuses, childSyncFlowRes) state.SyncFlowOptions.RelationMessageMapping = childSyncFlowRes.RelationMessageMapping @@ -547,8 +600,14 @@ func CDCFlowWorkflowWithConfig( if canceled { break } + if syncErr { + state.TruncateProgress(w.logger) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state) + } if normalizeSignalError != nil { - return state, normalizeSignalError + state.NormalizeFlowErrors = append(state.NormalizeFlowErrors, normalizeSignalError.Error()) + state.TruncateProgress(w.logger) + return state, workflow.NewContinueAsNewError(ctx, CDCFlowWorkflowWithConfig, cfg, state) } if !normDone { normWaitChan.Receive(ctx, nil)