From 348515872add875a487feaccc38c1b86a37b3880 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Philip=20Dub=C3=A9?=
Date: Mon, 27 May 2024 12:31:13 +0000
Subject: [PATCH 1/3] wal heartbeat: allow overriding query with env variable (#1725)

To enable heartbeating, set these two environment vars:

```yaml
PEERDB_WAL_HEARTBEAT_QUERY: ${PEERDB_WAL_HEARTBEAT_QUERY:-UPDATE _peerdb_heartbeat SET last_heartbeat = now() WHERE id = 1;}
PEERDB_ENABLE_WAL_HEARTBEAT: ${PEERDB_ENABLE_WAL_HEARTBEAT:-true}
```
---
 flow/activities/flowable.go | 8 +-------
 flow/peerdbenv/config.go    | 9 +++++++++
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 6834402b3b..cdb9dbce74 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -512,13 +512,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context) error {
         return err
     }
 
-    command := `
-    BEGIN;
-    DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
-    CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
-    DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
-    END;
-    `
+    command := peerdbenv.PeerDBWALHeartbeatQuery()
     // run above command for each Postgres peer
     for _, pgPeer := range pgPeers {
         activity.RecordHeartbeat(ctx, pgPeer.Name)
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index aadc38c36a..895b9376a6 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -106,6 +106,15 @@ func PeerDBEnableWALHeartbeat() bool {
     return getEnvBool("PEERDB_ENABLE_WAL_HEARTBEAT", false)
 }
 
+// PEERDB_WAL_HEARTBEAT_QUERY
+func PeerDBWALHeartbeatQuery() string {
+    return GetEnvString("PEERDB_WAL_HEARTBEAT_QUERY", `BEGIN;
+DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
+CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
+DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
+END;`)
+}
+
 // PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE
 func PeerDBEnableParallelSyncNormalize() bool {
     return getEnvBool("PEERDB_ENABLE_PARALLEL_SYNC_NORMALIZE", false)
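A note on the override above: PeerDBWALHeartbeatQuery falls back to the ephemeral-aggregate SQL when PEERDB_WAL_HEARTBEAT_QUERY is unset, and SendWALHeartbeat runs whatever it returns against every Postgres peer. If you use the UPDATE shown in the commit message, the _peerdb_heartbeat table and its id = 1 row must already exist on each peer. The sketch below is not PeerDB code; it only illustrates the override-or-default behaviour against a single connection, and SOURCE_PG_DSN is a placeholder environment variable invented for the example.

```go
// Sketch only: mirrors what the heartbeat activity effectively does per peer.
package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v5"
)

// Default copied from PeerDBWALHeartbeatQuery in the patch above.
const defaultHeartbeatQuery = `BEGIN;
DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4);
END;`

func main() {
    ctx := context.Background()

    // Same override rule as PeerDBWALHeartbeatQuery: take the env value if set,
    // otherwise fall back to the ephemeral-aggregate default.
    query := os.Getenv("PEERDB_WAL_HEARTBEAT_QUERY")
    if query == "" {
        query = defaultHeartbeatQuery
    }

    // SOURCE_PG_DSN is a placeholder for illustration, not a PeerDB setting.
    conn, err := pgx.Connect(ctx, os.Getenv("SOURCE_PG_DSN"))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    // A small write is all that is needed: the goal is only to generate a WAL
    // record on otherwise idle sources so replication slots can keep advancing.
    if _, err := conn.Exec(ctx, query); err != nil {
        log.Fatal(err)
    }
}
```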
From 19ea9ea56bc7342e7210ea282fed7b8aa0fce0f4 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Mon, 27 May 2024 09:57:35 -0400
Subject: [PATCH 2/3] Enable oneSync always (#1755)

This feature has been stabilized and no longer needs a disable flag.
---
 flow/activities/flowable_core.go | 27 +++++------------
 flow/peerdbenv/config.go         |  5 ---
 flow/workflows/sync_flow.go      | 52 ++++++++++++++------------------
 3 files changed, 29 insertions(+), 55 deletions(-)

diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go
index ad3dca378a..e5c56ee59e 100644
--- a/flow/activities/flowable_core.go
+++ b/flow/activities/flowable_core.go
@@ -11,7 +11,7 @@ import (
 
     "github.com/jackc/pgx/v5"
     "github.com/jackc/pgx/v5/pgxpool"
-    "github.com/yuin/gopher-lua"
+    lua "github.com/yuin/gopher-lua"
     "go.temporal.io/sdk/activity"
     "go.temporal.io/sdk/log"
     "go.temporal.io/sdk/temporal"
@@ -78,25 +78,12 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon
         tblNameMapping[v.SourceTableIdentifier] = model.NewNameAndExclude(v.DestinationTableIdentifier, v.Exclude)
     }
 
-    var srcConn TPull
-    if sessionID == "" {
-        srcConn, err = connectors.GetAs[TPull](ctx, config.Source)
-        if err != nil {
-            return nil, err
-        }
-        defer connectors.CloseConnector(ctx, srcConn)
-
-        if err := srcConn.SetupReplConn(ctx); err != nil {
-            return nil, err
-        }
-    } else {
-        srcConn, err = waitForCdcCache[TPull](ctx, a, sessionID)
-        if err != nil {
-            return nil, err
-        }
-        if err := srcConn.ConnectionActive(ctx); err != nil {
-            return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
-        }
+    srcConn, err := waitForCdcCache[TPull](ctx, a, sessionID)
+    if err != nil {
+        return nil, err
+    }
+    if err := srcConn.ConnectionActive(ctx); err != nil {
+        return nil, temporal.NewNonRetryableApplicationError("connection to source down", "disconnect", nil)
     }
 
     shutdown := utils.HeartbeatRoutine(ctx, func() string {
diff --git a/flow/peerdbenv/config.go b/flow/peerdbenv/config.go
index 895b9376a6..bcf1ac050e 100644
--- a/flow/peerdbenv/config.go
+++ b/flow/peerdbenv/config.go
@@ -66,11 +66,6 @@ func PeerDBCDCDiskSpillMemPercentThreshold() int {
     return getEnvInt("PEERDB_CDC_DISK_SPILL_MEM_PERCENT_THRESHOLD", -1)
 }
 
-// PEERDB_DISABLE_ONE_SYNC
-func PeerDBDisableOneSync() bool {
-    return getEnvBool("PEERDB_DISABLE_ONE_SYNC", false)
-}
-
 // GOMEMLIMIT is a variable internal to Golang itself, we use this for internal targets, 0 means no maximum
 func PeerDBFlowWorkerMaxMemBytes() uint64 {
     return getEnvUint[uint64]("GOMEMLIMIT", 0)
diff --git a/flow/workflows/sync_flow.go b/flow/workflows/sync_flow.go
index e421759909..711928ef5c 100644
--- a/flow/workflows/sync_flow.go
+++ b/flow/workflows/sync_flow.go
@@ -30,38 +30,30 @@ func SyncFlowWorkflow(
     parent := workflow.GetInfo(ctx).ParentWorkflowExecution
     logger := log.With(workflow.GetLogger(ctx), slog.String(string(shared.FlowNameKey), config.FlowJobName))
 
-    enableOneSync := GetSideEffect(ctx, func(_ workflow.Context) bool {
-        return !peerdbenv.PeerDBDisableOneSync()
-    })
-    var fMaintain workflow.Future
-    var sessionID string
-    syncSessionCtx := ctx
-    if enableOneSync {
-        sessionOptions := &workflow.SessionOptions{
-            CreationTimeout:  5 * time.Minute,
-            ExecutionTimeout: 144 * time.Hour,
-            HeartbeatTimeout: time.Minute,
-        }
-        var err error
-        syncSessionCtx, err = workflow.CreateSession(ctx, sessionOptions)
-        if err != nil {
-            return err
-        }
-        defer workflow.CompleteSession(syncSessionCtx)
-        sessionID = workflow.GetSessionInfo(syncSessionCtx).SessionID
+    sessionOptions := &workflow.SessionOptions{
+        CreationTimeout:  5 * time.Minute,
+        ExecutionTimeout: 14 * 24 * time.Hour,
+        HeartbeatTimeout: time.Minute,
+    }
 
-        maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
-            StartToCloseTimeout: 14 * 24 * time.Hour,
-            HeartbeatTimeout:    time.Minute,
-            WaitForCancellation: true,
-        })
-        fMaintain = workflow.ExecuteActivity(
-            maintainCtx,
-            flowable.MaintainPull,
-            config,
-            sessionID,
-        )
+    syncSessionCtx, err := workflow.CreateSession(ctx, sessionOptions)
+    if err != nil {
+        return err
     }
+    defer workflow.CompleteSession(syncSessionCtx)
+
+    sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID
+    maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
+        StartToCloseTimeout: 14 * 24 * time.Hour,
+        HeartbeatTimeout:    time.Minute,
+        WaitForCancellation: true,
+    })
+    fMaintain := workflow.ExecuteActivity(
+        maintainCtx,
+        flowable.MaintainPull,
+        config,
+        sessionID,
+    )
 
     var stop, syncErr bool
     currentSyncFlowNum := 0
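With the flag gone, syncCore always resolves its source connection through waitForCdcCache, which only works because SyncFlowWorkflow now unconditionally creates a Temporal session and hands its session ID to both the long-running MaintainPull activity and each sync activity, pinning them to the same worker and its connection cache. The sketch below shows that session pattern in isolation; it is not PeerDB's workflow, MaintainConn and SyncOnce are illustrative activity names, and only the timeouts mirror the diff.

```go
package example

import (
    "time"

    "go.temporal.io/sdk/workflow"
)

// SessionedSyncWorkflow is a minimal sketch of the now-unconditional pattern:
// a long-lived maintain activity owns the source connection on one worker, and
// the session ID lets each sync activity find that cached connection.
func SessionedSyncWorkflow(ctx workflow.Context) error {
    syncSessionCtx, err := workflow.CreateSession(ctx, &workflow.SessionOptions{
        CreationTimeout:  5 * time.Minute,
        ExecutionTimeout: 14 * 24 * time.Hour,
        HeartbeatTimeout: time.Minute,
    })
    if err != nil {
        return err
    }
    defer workflow.CompleteSession(syncSessionCtx)

    sessionID := workflow.GetSessionInfo(syncSessionCtx).SessionID

    // Long-lived activity pinned to this session's worker; it is left to be
    // cancelled when the workflow completes and the session is released.
    maintainCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
        StartToCloseTimeout: 14 * 24 * time.Hour,
        HeartbeatTimeout:    time.Minute,
        WaitForCancellation: true,
    })
    workflow.ExecuteActivity(maintainCtx, "MaintainConn", sessionID)

    // One sync, routed to the same worker via sessionID; the real workflow
    // loops here and coordinates via signals.
    syncCtx := workflow.WithActivityOptions(syncSessionCtx, workflow.ActivityOptions{
        StartToCloseTimeout: time.Hour,
        HeartbeatTimeout:    time.Minute,
    })
    return workflow.ExecuteActivity(syncCtx, "SyncOnce", sessionID).Get(ctx, nil)
}
```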
From 9cbaf176ffa4d7a0f9ea745ae1129488e8d381a4 Mon Sep 17 00:00:00 2001
From: Kaushik Iska
Date: Mon, 27 May 2024 09:58:24 -0400
Subject: [PATCH 3/3] increase the partsize for s3 upload (#1732)

---
 flow/connectors/utils/avro/avro_writer.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go
index ad2597478a..1908d43972 100644
--- a/flow/connectors/utils/avro/avro_writer.go
+++ b/flow/connectors/utils/avro/avro_writer.go
@@ -197,7 +197,11 @@ func (p *peerDBOCFWriter) WriteRecordsToS3(
         numRows, writeOcfError = p.WriteOCF(ctx, w)
     }()
 
-    _, err = manager.NewUploader(s3svc).Upload(ctx, &s3.PutObjectInput{
+    uploader := manager.NewUploader(s3svc, func(u *manager.Uploader) {
+        u.PartSize = 4 * 1024 * 1024 * 1024
+    })
+
+    _, err = uploader.Upload(ctx, &s3.PutObjectInput{
         Bucket: aws.String(bucketName),
         Key:    aws.String(key),
         Body:   r,
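Why the part size matters: S3 multipart uploads are capped at 10,000 parts, and since the OCF bytes appear to reach the uploader through a pipe with no known total length (Body: r above, fed by the WriteOCF goroutine), the uploader cannot derive a total size and scale parts itself, so the default 5 MiB part size caps an upload at roughly 48.8 GiB. 4 GiB parts push that ceiling well past S3's 5 TiB per-object limit, at the cost of larger in-memory buffers per part. The sketch below is not PeerDB code; it only shows the uploader option and the arithmetic, using standard aws-sdk-go-v2 client setup.

```go
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
    "github.com/aws/aws-sdk-go-v2/service/s3"
)

func main() {
    cfg, err := config.LoadDefaultConfig(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    const partSize = int64(4 * 1024 * 1024 * 1024) // 4 GiB, as in the patch

    // Same option style as the patch: override PartSize when building the uploader.
    uploader := manager.NewUploader(s3.NewFromConfig(cfg), func(u *manager.Uploader) {
        u.PartSize = partSize
    })

    // A streamed upload can use at most manager.MaxUploadParts (10,000) parts,
    // so the largest object it can write is PartSize * MaxUploadParts.
    fmt.Printf("max streamed object size: %d bytes (%d parts x %d bytes)\n",
        partSize*int64(manager.MaxUploadParts), manager.MaxUploadParts, uploader.PartSize)
}
```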