Skip to content

Commit 1eba958

Browse files
committed
fixed review comments pt.1
1 parent 02d19db commit 1eba958

File tree

7 files changed

+21
-25
lines changed

7 files changed

+21
-25
lines changed

flow/activities/flowable.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,9 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
234234
SrcTableIDNameMapping: input.SrcTableIdNameMapping,
235235
TableNameMapping: tblNameMapping,
236236
LastOffset: input.LastSyncState.Checkpoint,
237-
MaxBatchSize: uint32(input.SyncFlowOptions.BatchSize),
237+
MaxBatchSize: input.SyncFlowOptions.BatchSize,
238238
IdleTimeout: peerdbenv.PeerDBCDCIdleTimeoutSeconds(
239-
int(input.FlowConnectionConfigs.IdleTimeoutSeconds),
239+
int(input.SyncFlowOptions.IdleTimeoutSeconds),
240240
),
241241
TableNameSchemaMapping: input.TableNameSchemaMapping,
242242
OverridePublicationName: input.FlowConnectionConfigs.PublicationName,

flow/cmd/handler.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,9 @@ func (h *FlowRequestHandler) CreateCDCFlow(
131131
},
132132
}
133133

134-
maxBatchSize := int(cfg.MaxBatchSize)
134+
maxBatchSize := cfg.MaxBatchSize
135135
if maxBatchSize == 0 {
136136
maxBatchSize = 1_000_000
137-
cfg.MaxBatchSize = uint32(maxBatchSize)
138137
}
139138

140139
limits := &peerflow.CDCFlowLimits{

flow/workflows/cdc_flow.go

+10-12
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type CDCFlowLimits struct {
2626
// This is typically non-zero for testing purposes.
2727
TotalSyncFlows int
2828
// Maximum number of rows in a sync flow batch.
29-
MaxBatchSize int
29+
MaxBatchSize uint32
3030
// Rows synced after which we can say a test is done.
3131
ExitAfterRecords int
3232
}
@@ -48,14 +48,15 @@ type CDCFlowWorkflowState struct {
4848
// Needed to support schema changes.
4949
RelationMessageMapping model.RelationMessageMapping
5050
// current workflow state
51-
CurrentFlowState protos.FlowStatus
51+
CurrentFlowState protos.FlowStatus
52+
// moved from config here, set by SetupFlow
5253
SrcTableIdNameMapping map[uint32]string
5354
TableNameSchemaMapping map[string]*protos.TableSchema
5455
}
5556

5657
type SignalProps struct {
57-
BatchSize int32
58-
IdleTimeout int64
58+
BatchSize uint32
59+
IdleTimeout uint64
5960
}
6061

6162
// returns a new empty PeerFlowState
@@ -155,13 +156,12 @@ func CDCFlowWorkflowWithConfig(
155156
limits *CDCFlowLimits,
156157
state *CDCFlowWorkflowState,
157158
) (*CDCFlowWorkflowResult, error) {
158-
if state == nil {
159-
state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
160-
}
161-
162159
if cfg == nil {
163160
return nil, fmt.Errorf("invalid connection configs")
164161
}
162+
if state == nil {
163+
state = NewCDCFlowWorkflowState(len(cfg.TableMappings))
164+
}
165165

166166
w := NewCDCFlowWorkflowExecution(ctx)
167167

@@ -304,7 +304,7 @@ func CDCFlowWorkflowWithConfig(
304304
}
305305

306306
syncFlowOptions := &protos.SyncFlowOptions{
307-
BatchSize: int32(limits.MaxBatchSize),
307+
BatchSize: limits.MaxBatchSize,
308308
IdleTimeoutSeconds: 0,
309309
SrcTableIdNameMapping: state.SrcTableIdNameMapping,
310310
TableNameSchemaMapping: state.TableNameSchemaMapping,
@@ -319,14 +319,12 @@ func CDCFlowWorkflowWithConfig(
319319
cdcPropertiesSelector.AddReceive(cdcPropertiesSignalChannel, func(c workflow.ReceiveChannel, more bool) {
320320
var cdcSignal SignalProps
321321
c.Receive(ctx, &cdcSignal)
322+
// only modify for options since SyncFlow uses it
322323
if cdcSignal.BatchSize > 0 {
323324
syncFlowOptions.BatchSize = cdcSignal.BatchSize
324-
cfg.MaxBatchSize = uint32(cdcSignal.BatchSize)
325-
limits.MaxBatchSize = int(cdcSignal.BatchSize)
326325
}
327326
if cdcSignal.IdleTimeout > 0 {
328327
syncFlowOptions.IdleTimeoutSeconds = cdcSignal.IdleTimeout
329-
cfg.IdleTimeoutSeconds = cdcSignal.IdleTimeout
330328
}
331329

332330
slog.Info("CDC Signal received. Parameters on signal reception:", slog.Int("BatchSize", int(cfg.MaxBatchSize)),

nexus/analyzer/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
332332
resync,
333333
soft_delete_col_name,
334334
synced_at_col_name,
335-
initial_copy_only,
335+
initial_snapshot_only: initial_copy_only,
336336
};
337337

338338
if initial_copy_only && !do_initial_copy {

nexus/flow-rs/src/grpc.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ impl FlowGrpcClient {
143143
});
144144
});
145145

146-
let do_initial_copy = job.do_initial_copy;
146+
let do_initial_snapshot = job.do_initial_copy;
147147
let publication_name = job.publication_name.clone();
148148
let replication_slot_name = job.replication_slot_name.clone();
149149
let snapshot_num_rows_per_partition = job.snapshot_num_rows_per_partition;
@@ -155,7 +155,7 @@ impl FlowGrpcClient {
155155
destination: Some(dst),
156156
flow_job_name: job.name.clone(),
157157
table_mappings,
158-
do_initial_snapshot: do_initial_copy,
158+
do_initial_snapshot,
159159
publication_name: publication_name.unwrap_or_default(),
160160
snapshot_num_rows_per_partition: snapshot_num_rows_per_partition.unwrap_or(0),
161161
snapshot_max_parallel_workers: snapshot_max_parallel_workers.unwrap_or(0),
@@ -168,7 +168,7 @@ impl FlowGrpcClient {
168168
resync: job.resync,
169169
soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(),
170170
synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(),
171-
initial_snapshot_only: job.initial_copy_only,
171+
initial_snapshot_only: job.initial_snapshot_only,
172172
..Default::default()
173173
};
174174

nexus/pt/src/flow_model.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ pub struct FlowJob {
7070
pub resync: bool,
7171
pub soft_delete_col_name: Option<String>,
7272
pub synced_at_col_name: Option<String>,
73-
pub initial_copy_only: bool,
73+
pub initial_snapshot_only: bool,
7474
}
7575

7676
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]

protos/flow.proto

+3-4
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ message FlowConnectionConfigs {
4545
// currently, MaxBatchSize and IdleTimeoutSeconds are dynamic via Temporal signals
4646
repeated TableMapping table_mappings = 4;
4747
uint32 max_batch_size = 5;
48-
int64 idle_timeout_seconds = 6;
48+
uint64 idle_timeout_seconds = 6;
4949
string cdc_staging_path = 7;
5050
string publication_name = 8;
5151
string replication_slot_name = 9;
@@ -98,9 +98,9 @@ message CreateTablesFromExistingOutput {
9898
}
9999

100100
message SyncFlowOptions {
101-
int32 batch_size = 1;
101+
uint32 batch_size = 1;
102102
map<uint32, RelationMessage> relation_message_mapping = 2;
103-
int64 idle_timeout_seconds = 3;
103+
uint64 idle_timeout_seconds = 3;
104104
map<uint32, string> src_table_id_name_mapping = 4;
105105
map<string, TableSchema> table_name_schema_mapping = 5;
106106
}
@@ -111,7 +111,6 @@ message NormalizeFlowOptions {
111111

112112
message LastSyncState {
113113
int64 checkpoint = 1;
114-
google.protobuf.Timestamp last_synced_at = 2;
115114
}
116115

117116
message StartFlowInput {

0 commit comments

Comments
 (0)