Skip to content

Commit

Permalink
parses max_batch_size
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 21, 2023
1 parent 06a53ed commit 3bdf024
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 0 deletions.
7 changes: 7 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
_ => None,
};

let max_batch_size: Option<u32> = match raw_options.remove("max_batch_size")
{
Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::<u32>()?),
_ => None,
};

let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
source_peer: cdc.source_peer.to_string().to_lowercase(),
Expand All @@ -268,6 +274,7 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
replication_slot_name,
push_batch_size,
push_parallelism,
max_batch_size,
};

// Error reporting
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ impl FlowGrpcClient {
replication_slot_name: replication_slot_name.unwrap_or_default(),
push_batch_size: job.push_batch_size.unwrap_or_default(),
push_parallelism: job.push_parallelism.unwrap_or_default(),
max_batch_size: job.max_batch_size.unwrap_or_default(),
..Default::default()
};

Expand Down
1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct FlowJob {
pub replication_slot_name: Option<String>,
pub push_parallelism: Option<i64>,
pub push_batch_size: Option<i64>,
pub max_batch_size: Option<u32>,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
Expand Down

0 comments on commit 3bdf024

Please sign in to comment.