diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 288282053c..fb2e3c4b48 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -515,13 +515,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error { return err } - command := ` - BEGIN; - DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); - CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); - DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); - END; - ` + command := peerdbenv.PeerDBWALHeartbeatQuery() // run above command for each Postgres peer for _, pgPeer := range pgPeers { activity.RecordHeartbeat(ctx, pgPeer.Name) diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index 3a052af371..5edca2aa92 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -11,7 +11,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "go.temporal.io/sdk/activity" "go.temporal.io/sdk/log" "go.temporal.io/sdk/temporal" @@ -97,25 +97,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude) } - var srcConn TPull - if sessionID == "" { - srcConn, err = connectors.GetAs[TPull](ctx, config.Source) - if err != nil { - return nil, err - } - defer connectors.CloseConnector(ctx, srcConn) - - if err := srcConn.SetupReplConn(ctx); err != nil { - return nil, err - } - } else { - srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID) - if err != nil { - return nil, err - } - if err := srcConn.ConnectionActive(ctx); err != nil { - return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) - } + srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID) + if err != nil { + return nil, err + } + if err := srcConn.ConnectionActive(ctx); err != nil { + return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil) } batchSize := options.BatchSize diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index ad2597478a..1908d43972 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -197,7 +197,11 @@ func (p *peerDBOCFWriter) WriteRecordsToS3( numRows, writeOcfError = p.WriteOCF(ctx, w) }() - _, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{ + uploader := manager.NewUploader(s3svc, func(u *manager.Uploader) { + u.PartSize = 4 * 1024 * 1024 * 1024 + }) + + _, err = uploader.Upload(ctx, &s3.PutObjectInput{ Bucket: aws.String(bucketName), Key: aws.String(key), Body: r, diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go index aadc38c36a..bcf1ac050e 100644 --- a/flow/peerdbenv/config.go +++ b/flow/peerdbenv/config.go @@ -66,11 +66,6 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int { return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1) } -// PEERDB_DISABLE_ONE_SYNC -func PeerDBDisableOneSync() bool { - return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false) -} - // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum func PeerDBFlowWorkerMaxMemBytes() uint64 { return getEnvUint[uint64]("GOMEMLIMIT", 0) @@ -106,6 +101,15 @@ func PeerDBEnableWALHeartbeat() bool { return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false) } +// PEERDB_WAL_HEARTBEAT_QUERY +func PeerDBWALHeartbeatQuery() string { + return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN; +DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); +CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); +DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); +END;`) +} + // PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE func PeerDBEnableParallelSyncNormalize() bool { return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false) diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go index e421759909..711928ef5c 100644 --- a/flow/workflows/sync_flow.go +++ b/flow/workflows/sync_flow.go @@ -30,38 +30,30 @@ func SyncFlowWorkflow( parent := workflow.GetInfo(ctx).ParentWorkflowExecution logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName)) - enableOneSync := GetSideEffect(ctx, func(_ workflow.Context) bool { - return !peerdbenv.PeerDBDisableOneSync() - }) - var fMaintain workflow.Future - var sessionID string - syncSessionCtx := ctx - if enableOneSync { - sessionOptions := &workflow.SessionOptions{ - CreationTimeout: 5 * time.Minute, - ExecutionTimeout: 144 * time.Hour, - HeartbeatTimeout: time.Minute, - } - var err error - syncSessionCtx, err = workflow.CreateSession(ctx, sessionOptions) - if err != nil { - return err - } - defer workflow.CompleteSession(syncSessionCtx) - sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID + sessionOptions := &workflow.SessionOptions{ + CreationTimeout: 5 * time.Minute, + ExecutionTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + } - maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ - StartToCloseTimeout: 14 * 24 * time.Hour, - HeartbeatTimeout: time.Minute, - WaitForCancellation: true, - }) - fMaintain = workflow.ExecuteActivity( - maintainCtx, - flowable.MaintainPull, - config, - sessionID, - ) + syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions) + if err != nil { + return err } + defer workflow.CompleteSession(syncSessionCtx) + + sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID + maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 14 * 24 * time.Hour, + HeartbeatTimeout: time.Minute, + WaitForCancellation: true, + }) + fMaintain := workflow.ExecuteActivity( + maintainCtx, + flowable.MaintainPull, + config, + sessionID, + ) var stop, syncErr bool currentSyncFlowNum := 0