Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds idletimeout to flow config, ui and temporal signal #952

Merged
merged 1 commit into from
Jan 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
flowName := input.FlowConnectionConfigs.FlowJobName
errGroup.Go(func() error {
return srcConn.PullRecords(a.CatalogPool, &model.PullRecordsRequest{
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(),
FlowJobName: flowName,
SrcTableIDNameMapping: input.FlowConnectionConfigs.SrcTableIdNameMapping,
TableNameMapping: tblNameMapping,
LastOffset: input.LastSyncState.Checkpoint,
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
int(input.FlowConnectionConfigs.IdleTimeoutSeconds),
),
TableNameSchemaMapping: input.FlowConnectionConfigs.TableNameSchemaMapping,
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,
OverrideReplicationSlotName: input.FlowConnectionConfigs.ReplicationSlotName,
Expand Down
9 changes: 7 additions & 2 deletions flow/peerdbenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ func PeerDBEventhubFlushTimeoutSeconds() time.Duration {
}

// PEERDB_CDC_IDLE_TIMEOUT_SECONDS
func PeerDBCDCIdleTimeoutSeconds() time.Duration {
x := getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
func PeerDBCDCIdleTimeoutSeconds(providedValue int) time.Duration {
var x int
if providedValue > 0 {
x = providedValue
} else {
x = getEnvInt("PEERDB_CDC_IDLE_TIMEOUT_SECONDS", 60)
}
return time.Duration(x) * time.Second
}

Expand Down
8 changes: 4 additions & 4 deletions flow/shared/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

const (
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
CDCFlowSignalName = "peer-flow-signal"
CDCBatchSizeSignalName = "cdc-batch-size-signal"
peerFlowTaskQueue = "peer-flow-task-queue"
snapshotFlowTaskQueue = "snapshot-flow-task-queue"
CDCFlowSignalName = "peer-flow-signal"
CDCDynamicPropertiesSignalName = "cdc-dynamic-properties"
)

const MirrorNameSearchAttribute = "MirrorName"
Expand Down
41 changes: 28 additions & 13 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peerflow

import (
"fmt"
"log/slog"
"strings"
"time"

Expand Down Expand Up @@ -53,6 +54,11 @@ type CDCFlowWorkflowState struct {
RelationMessageMapping model.RelationMessageMapping
}

type SignalProps struct {
BatchSize int32
IdleTimeout int64
}

// returns a new empty PeerFlowState
func NewCDCFlowWorkflowState() *CDCFlowWorkflowState {
return &CDCFlowWorkflowState{
Expand Down Expand Up @@ -280,22 +286,31 @@ func CDCFlowWorkflowWithConfig(
}

syncFlowOptions := &protos.SyncFlowOptions{
BatchSize: int32(limits.MaxBatchSize),
BatchSize: int32(limits.MaxBatchSize),
IdleTimeoutSeconds: 0,
}

// add a signal to change the batch size
batchSizeSignalChan := workflow.GetSignalChannel(ctx, shared.CDCBatchSizeSignalName)
batchSizeSelector := workflow.NewSelector(ctx)
batchSizeSelector.AddReceive(batchSizeSignalChan, func(c workflow.ReceiveChannel, more bool) {
var batchSize int32
c.Receive(ctx, &batchSize)
w.logger.Info("received batch size signal: ", batchSize)
syncFlowOptions.BatchSize = batchSize
cfg.MaxBatchSize = uint32(batchSize)
limits.MaxBatchSize = int(batchSize)
// add a signal to change CDC properties
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcSignal SignalProps
c.Receive(ctx, &cdcSignal)
if cdcSignal.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcSignal.BatchSize
cfg.MaxBatchSize = uint32(cdcSignal.BatchSize)
limits.MaxBatchSize = int(cdcSignal.BatchSize)
}
if cdcSignal.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout
cfg.IdleTimeoutSeconds = cdcSignal.IdleTimeout
}

slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)),
slog.Int("IdleTimeout", int(cfg.IdleTimeoutSeconds)))
})

batchSizeSelector.AddDefault(func() {
cdcPropertiesSelector.AddDefault(func() {
w.logger.Info("no batch size signal received, batch size remains: ",
syncFlowOptions.BatchSize)
})
Expand Down Expand Up @@ -443,7 +458,7 @@ func CDCFlowWorkflowWithConfig(
} else {
state.NormalizeFlowStatuses = append(state.NormalizeFlowStatuses, childNormalizeFlowRes)
}
batchSizeSelector.Select(ctx)
cdcPropertiesSelector.Select(ctx)
}

state.TruncateProgress(w.logger)
Expand Down
3 changes: 3 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ message FlowConnectionConfigs {
string synced_at_col_name = 25;

bool initial_copy_only = 26;

int64 idle_timeout_seconds = 27;
}

message RenameTableOption {
Expand Down Expand Up @@ -111,6 +113,7 @@ message CreateTablesFromExistingOutput {
message SyncFlowOptions {
int32 batch_size = 1;
map<uint32, RelationMessage> relation_message_mapping = 2;
int64 idle_timeout_seconds = 3;
}

message NormalizeFlowOptions {
Expand Down
14 changes: 13 additions & 1 deletion ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ export const cdcSettings: MirrorSetting[] = [
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
maxBatchSize: (value as boolean) || false,
maxBatchSize: (value as number) || 100000,
})),
tips: 'The number of rows PeerDB will pull from source at a time. If left empty, the default value is 100,000 rows.',
type: 'number',
default: '100000',
advanced: true,
},
{
label: 'Idle Timeout (Seconds)',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
idleTimeoutSeconds: (value as number) || 100000,
})),
tips: 'Time after which a Sync flow ends, if it happens before pull batch size is reached. Defaults to 60 seconds.',
helpfulLink: 'https://docs.peerdb.io/metrics/important_cdc_configs',
type: 'number',
default: '60',
},
{
label: 'Publication Name',
stateHandler: (value, setter) =>
Expand Down
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
softDeleteColName: '',
syncedAtColName: '',
initialCopyOnly: false,
idleTimeoutSeconds: 60,
};

export const blankQRepSetting = {
Expand Down