Skip to content

Commit

Permalink
added dynamic table addition to existing props signal (#1106)
Browse files Browse the repository at this point in the history
Syntax has changed as a result due to switching to a proto:

```
{
  "batch_size":100000,
  "idle_timeout":10,
  "additional_tables": [{
    "source_table_identifier": "public.oss2",
    "destination_table_identifier": "public.oss2dst"
  }]
}
```
  • Loading branch information
heavycrystal authored Jan 24, 2024
1 parent a013eee commit 4010599
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 15 deletions.
28 changes: 13 additions & 15 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ type CDCFlowWorkflowState struct {
FlowConfigUpdates []*protos.CDCFlowConfigUpdate
}

type SignalProps struct {
BatchSize uint32
IdleTimeout uint64
}

// returns a new empty PeerFlowState
func NewCDCFlowWorkflowState(numTables int) *CDCFlowWorkflowState {
return &CDCFlowWorkflowState{
Expand Down Expand Up @@ -168,8 +163,6 @@ func (w *CDCFlowWorkflowExecution) processCDCFlowConfigUpdates(ctx workflow.Cont
additionalTablesWorkflowCfg.DoInitialSnapshot = true
additionalTablesWorkflowCfg.InitialSnapshotOnly = true
additionalTablesWorkflowCfg.TableMappings = flowConfigUpdate.AdditionalTables
additionalTablesWorkflowCfg.FlowJobName = fmt.Sprintf("%s_additional_tables_%s", cfg.FlowJobName,
strings.ToLower(shared.RandomString(8)))

childAdditionalTablesCDCFlowID,
err := GetChildWorkflowID(ctx, "cdc-flow", additionalTablesWorkflowCfg.FlowJobName)
Expand Down Expand Up @@ -375,18 +368,23 @@ func CDCFlowWorkflowWithConfig(
cdcPropertiesSignalChannel := workflow.GetSignalChannel(ctx, shared.CDCDynamicPropertiesSignalName)
cdcPropertiesSelector := workflow.NewSelector(ctx)
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
var cdcSignal SignalProps
c.Receive(ctx, &cdcSignal)
var cdcConfigUpdate *protos.CDCFlowConfigUpdate
c.Receive(ctx, &cdcConfigUpdate)
// only modify for options since SyncFlow uses it
if cdcSignal.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcSignal.BatchSize
if cdcConfigUpdate.BatchSize > 0 {
syncFlowOptions.BatchSize = cdcConfigUpdate.BatchSize
}
if cdcConfigUpdate.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcConfigUpdate.IdleTimeout
}
if cdcSignal.IdleTimeout > 0 {
syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout
if len(cdcConfigUpdate.AdditionalTables) > 0 {
state.FlowConfigUpdates = append(state.FlowConfigUpdates, cdcConfigUpdate)
}

slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)),
slog.Int("IdleTimeout", int(cfg.IdleTimeoutSeconds)))
slog.Info("CDC Signal received. Parameters on signal reception:",
slog.Int("BatchSize", int(syncFlowOptions.BatchSize)),
slog.Int("IdleTimeout", int(syncFlowOptions.IdleTimeoutSeconds)),
slog.Any("AdditionalTables", cdcConfigUpdate.AdditionalTables))
})

cdcPropertiesSelector.AddDefault(func() {
Expand Down
3 changes: 3 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ enum FlowStatus {

message CDCFlowConfigUpdate {
repeated TableMapping additional_tables = 1;
uint32 batch_size = 2;
uint64 idle_timeout = 3;
}

message QRepFlowConfigUpdate {
Expand All @@ -411,3 +413,4 @@ message AddTablesToPublicationInput{
string publication_name = 2;
repeated TableMapping additional_tables = 3;
}

0 comments on commit 4010599

Please sign in to comment.