Parameters for EH CDC (#375)
Amogh-Bharadwaj authored Sep 5, 2023
1 parent 5e20aa2 commit cba15f0
Showing 10 changed files with 523 additions and 418 deletions.
10 changes: 6 additions & 4 deletions flow/activities/flowable.go
@@ -237,10 +237,12 @@ func (a *FlowableActivity) StartFlow(ctx context.Context, input *protos.StartFlo
}

res, err := dest.SyncRecords(&model.SyncRecordsRequest{
-Records:     records,
-FlowJobName: input.FlowConnectionConfigs.FlowJobName,
-SyncMode:    input.FlowConnectionConfigs.CdcSyncMode,
-StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
+Records:         records,
+FlowJobName:     input.FlowConnectionConfigs.FlowJobName,
+SyncMode:        input.FlowConnectionConfigs.CdcSyncMode,
+StagingPath:     input.FlowConnectionConfigs.CdcStagingPath,
+PushBatchSize:   input.FlowConnectionConfigs.PushBatchSize,
+PushParallelism: input.FlowConnectionConfigs.PushParallelism,
})
if err != nil {
log.Warnf("failed to push records: %v", err)
17 changes: 12 additions & 5 deletions flow/connectors/eventhub/eventhub.go
@@ -107,7 +107,8 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
batch := req.Records

eventsPerHeartBeat := 1000
-eventsPerBatch := 100000
+eventsPerBatch := int(req.PushBatchSize)
+maxParallelism := req.PushParallelism

batchPerTopic := make(map[string][]*eventhub.Event)
for i, record := range batch.Records {
@@ -131,7 +132,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}

if (i+1)%eventsPerBatch == 0 {
-err := c.sendEventBatch(batchPerTopic)
+err := c.sendEventBatch(batchPerTopic, maxParallelism)
if err != nil {
return nil, err
}
@@ -142,7 +143,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S

// send the remaining events.
if len(batchPerTopic) > 0 {
-err := c.sendEventBatch(batchPerTopic)
+err := c.sendEventBatch(batchPerTopic, maxParallelism)
if err != nil {
return nil, err
}
@@ -163,7 +164,7 @@ func (c *EventHubConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}, nil
}

-func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event) error {
+func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event, maxParallelism int64) error {
if len(events) == 0 {
log.Info("no events to send")
return nil
@@ -176,11 +177,17 @@ func (c *EventHubConnector) sendEventBatch(events map[string][]*eventhub.Event)
var wg sync.WaitGroup
var once sync.Once
var firstErr error
+// Limiting concurrent sends
+guard := make(chan struct{}, maxParallelism)

for tblName, eventBatch := range events {
+guard <- struct{}{}
wg.Add(1)
go func(tblName string, eventBatch []*eventhub.Event) {
-defer wg.Done()
+defer func() {
+<-guard
+wg.Done()
+}()

hub, err := c.getOrCreateHubConnection(tblName)
if err != nil {
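The guard channel added above is the standard Go bounded-concurrency idiom: a buffered channel used as a counting semaphore, so at most maxParallelism per-topic sends are in flight at once. As a hedged, self-contained sketch of just that pattern (hypothetical sendOne helper and string payloads, not the connector's real types):

package main

import (
	"fmt"
	"sync"
)

// sendOne stands in for pushing one per-topic batch of events (hypothetical).
func sendOne(topic string, events []string) error {
	fmt.Printf("sent %d events to %s\n", len(events), topic)
	return nil
}

// sendAll pushes every per-topic batch, allowing at most maxParallelism
// sends to run concurrently.
func sendAll(batches map[string][]string, maxParallelism int64) error {
	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	// Buffered channel as a counting semaphore, mirroring sendEventBatch above.
	guard := make(chan struct{}, maxParallelism)

	for topic, events := range batches {
		guard <- struct{}{} // blocks once maxParallelism sends are in flight
		wg.Add(1)
		go func(topic string, events []string) {
			defer func() {
				<-guard // release the slot
				wg.Done()
			}()
			if err := sendOne(topic, events); err != nil {
				once.Do(func() { firstErr = err }) // record only the first error
			}
		}(topic, events)
	}

	wg.Wait()
	return firstErr
}

func main() {
	batches := map[string][]string{
		"topic-a": {"e1", "e2"},
		"topic-b": {"e3"},
	}
	if err := sendAll(batches, 2); err != nil {
		fmt.Println("send failed:", err)
	}
}

Because each goroutine sends on guard before it starts work and receives from it in its deferred cleanup, the number of in-flight sends can never exceed the channel capacity, while wg.Wait plus sync.Once give the wait-for-all, first-error-wins behavior the connector code uses.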
836 changes: 429 additions & 407 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions flow/model/model.go
@@ -179,6 +179,10 @@ type SyncRecordsRequest struct {
SyncMode protos.QRepSyncMode
// Staging path for AVRO files in CDC
StagingPath string
+// PushBatchSize is the number of records to push in a batch for EventHub.
+PushBatchSize int64
+// PushParallelism is the number of batches in Event Hub to push in parallel.
+PushParallelism int64
}

type NormalizeRecordsRequest struct {
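A hedged usage sketch for the two request fields above; the stand-in struct and the zero-value fallbacks are illustrative assumptions, not behavior defined by this change:

package main

import "fmt"

// syncRecordsRequest is a minimal stand-in for model.SyncRecordsRequest,
// showing only the two new fields.
type syncRecordsRequest struct {
	PushBatchSize   int64
	PushParallelism int64
}

// pushSettings is a hypothetical helper (not part of this commit) showing one
// way a connector could consume the knobs, with assumed fallbacks for the
// proto3 zero defaults that arrive when the mirror options are omitted.
func pushSettings(req *syncRecordsRequest) (eventsPerBatch int, maxParallelism int64) {
	eventsPerBatch = int(req.PushBatchSize)
	if eventsPerBatch <= 0 {
		eventsPerBatch = 100000 // assumed fallback: the previously hard-coded batch size
	}
	maxParallelism = req.PushParallelism
	if maxParallelism <= 0 {
		maxParallelism = 1 // assumed fallback: send batches sequentially
	}
	return eventsPerBatch, maxParallelism
}

func main() {
	req := &syncRecordsRequest{} // options omitted: both fields default to 0
	batch, par := pushSettings(req)
	fmt.Println(batch, par) // prints: 100000 1
}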
19 changes: 18 additions & 1 deletion nexus/analyzer/src/lib.rs
@@ -230,6 +230,21 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
_ => false,
};

+let push_parallelism: Option<i64> = match raw_options
+.remove("push_parallelism")
+{
+Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::<i64>()?),
+_ => None,
+};
+
+let push_batch_size: Option<i64> = match raw_options
+.remove("push_batch_size")
+{
+Some(sqlparser::ast::Value::Number(n, _)) => Some(n.parse::<i64>()?),
+_ => None,
+};
+
+
let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
source_peer: cdc.source_peer.to_string().to_lowercase(),
@@ -246,7 +261,9 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
cdc_sync_mode,
cdc_staging_path,
soft_delete,
-replication_slot_name
+replication_slot_name,
+push_batch_size,
+push_parallelism,
};

// Error reporting
2 changes: 2 additions & 0 deletions nexus/flow-rs/src/grpc.rs
@@ -162,6 +162,8 @@ impl FlowGrpcClient {
cdc_staging_path: job.cdc_staging_path.clone().unwrap_or_default(),
soft_delete: job.soft_delete,
replication_slot_name: replication_slot_name.unwrap_or_default(),
+push_batch_size: job.push_batch_size.unwrap_or_default(),
+push_parallelism: job.push_parallelism.unwrap_or_default(),
..Default::default()
};

4 changes: 3 additions & 1 deletion nexus/pt/src/flow_model.rs
@@ -72,7 +72,9 @@ pub struct FlowJob {
pub cdc_sync_mode: Option<FlowSyncMode>,
pub cdc_staging_path: Option<String>,
pub soft_delete: bool,
-pub replication_slot_name: Option<String>
+pub replication_slot_name: Option<String>,
+pub push_parallelism: Option<i64>,
+pub push_batch_size: Option<i64>,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
5 changes: 5 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
@@ -54,6 +54,11 @@ pub struct FlowConnectionConfigs {
pub soft_delete: bool,
#[prost(string, tag="20")]
pub replication_slot_name: ::prost::alloc::string::String,
+/// the below two are for eventhub only
+#[prost(int64, tag="21")]
+pub push_batch_size: i64,
+#[prost(int64, tag="22")]
+pub push_parallelism: i64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
40 changes: 40 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
@@ -871,6 +871,12 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.replication_slot_name.is_empty() {
len += 1;
}
+if self.push_batch_size != 0 {
+len += 1;
+}
+if self.push_parallelism != 0 {
+len += 1;
+}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.FlowConnectionConfigs", len)?;
if let Some(v) = self.source.as_ref() {
struct_ser.serialize_field("source", v)?;
@@ -936,6 +942,12 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.replication_slot_name.is_empty() {
struct_ser.serialize_field("replicationSlotName", &self.replication_slot_name)?;
}
+if self.push_batch_size != 0 {
+struct_ser.serialize_field("pushBatchSize", ToString::to_string(&self.push_batch_size).as_str())?;
+}
+if self.push_parallelism != 0 {
+struct_ser.serialize_field("pushParallelism", ToString::to_string(&self.push_parallelism).as_str())?;
+}
struct_ser.end()
}
}
@@ -984,6 +996,10 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"softDelete",
"replication_slot_name",
"replicationSlotName",
"push_batch_size",
"pushBatchSize",
"push_parallelism",
"pushParallelism",
];

#[allow(clippy::enum_variant_names)]
@@ -1008,6 +1024,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
CdcStagingPath,
SoftDelete,
ReplicationSlotName,
+PushBatchSize,
+PushParallelism,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
@@ -1050,6 +1068,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"cdcStagingPath" | "cdc_staging_path" => Ok(GeneratedField::CdcStagingPath),
"softDelete" | "soft_delete" => Ok(GeneratedField::SoftDelete),
"replicationSlotName" | "replication_slot_name" => Ok(GeneratedField::ReplicationSlotName),
"pushBatchSize" | "push_batch_size" => Ok(GeneratedField::PushBatchSize),
"pushParallelism" | "push_parallelism" => Ok(GeneratedField::PushParallelism),
_ => Ok(GeneratedField::__SkipField__),
}
}
@@ -1089,6 +1109,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
let mut cdc_staging_path__ = None;
let mut soft_delete__ = None;
let mut replication_slot_name__ = None;
+let mut push_batch_size__ = None;
+let mut push_parallelism__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Source => {
@@ -1226,6 +1248,22 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
}
replication_slot_name__ = Some(map.next_value()?);
}
+GeneratedField::PushBatchSize => {
+if push_batch_size__.is_some() {
+return Err(serde::de::Error::duplicate_field("pushBatchSize"));
+}
+push_batch_size__ =
+Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+;
+}
+GeneratedField::PushParallelism => {
+if push_parallelism__.is_some() {
+return Err(serde::de::Error::duplicate_field("pushParallelism"));
+}
+push_parallelism__ =
+Some(map.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
+;
+}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
@@ -1252,6 +1290,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
cdc_staging_path: cdc_staging_path__.unwrap_or_default(),
soft_delete: soft_delete__.unwrap_or_default(),
replication_slot_name: replication_slot_name__.unwrap_or_default(),
+push_batch_size: push_batch_size__.unwrap_or_default(),
+push_parallelism: push_parallelism__.unwrap_or_default(),
})
}
}
4 changes: 4 additions & 0 deletions protos/flow.proto
@@ -40,6 +40,10 @@ message FlowConnectionConfigs {
bool soft_delete = 19;

string replication_slot_name = 20;
+
+// the below two are for eventhub only
+int64 push_batch_size = 21;
+int64 push_parallelism = 22;
}

message SyncFlowOptions { int32 batch_size = 1; }
