Skip to content

Commit

Permalink
Parameter for cdc with only snapshot (#884)
Browse files Browse the repository at this point in the history
This PR introduces a parameter - `initial_copy_only` which when set,
will initiate CDC and exit after Snapshot Flow is complete. No Sync
Flows will be called. By default this is set to false.

<img width="1089" alt="Screenshot 2023-12-22 at 10 15 30 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/99296169-7dfb-425f-bb9a-6dcddb3182f5">

In the UI, this parameter is listed in the Advanced Settings section:
<img width="1195" alt="Screenshot 2023-12-22 at 10 51 13 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/ae6f5b2a-c637-495d-9451-a205934ad163">

Validation for this parameter is set in both query layer and UI:
```bash
amogh=> CREATE MIRROR sfnosnappy1                                                                                                                                                                                                              
amogh-> FROM postgres_local TO sf_peer
amogh-> WITH TABLE MAPPING (
amogh(> public.oss2:public.oss2nexusso6,
amogh(> public.oss2ud1:public.oss2ud1nexusso6
amogh(> )
amogh-> WITH(
amogh(> do_initial_copy = false,
amogh(> snapshot_staging_path = '',
amogh(> initial_copy_only = true
amogh(> );
ERROR:  initial_copy_only is set to true, but do_initial_copy is set to false
```

<img width="997" alt="Screenshot 2023-12-22 at 10 57 46 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/e1d83b9f-49a8-4951-ab69-5826e242bf38">

This has been tested via UI and query layer and the happy path where it
is not set it also tested.
  • Loading branch information
Amogh-Bharadwaj authored Dec 22, 2023
1 parent 8dbe7c9 commit 0cb7abe
Show file tree
Hide file tree
Showing 12 changed files with 658 additions and 571 deletions.
1,153 changes: 582 additions & 571 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,10 @@ func CDCFlowWorkflowWithConfig(

state.SnapshotComplete = true
state.Progress = append(state.Progress, "executed setup flow and snapshot flow")
// if initial_copy_only is opted for, we end the flow here.
if cfg.InitialCopyOnly {
return nil, nil
}
}

syncFlowOptions := &protos.SyncFlowOptions{
Expand Down
10 changes: 10 additions & 0 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
_ => None,
};

let initial_copy_only = match raw_options.remove("initial_copy_only") {
Some(sqlparser::ast::Value::Boolean(b)) => *b,
_ => false,
};

let flow_job = FlowJob {
name: cdc.mirror_name.to_string().to_lowercase(),
source_peer: cdc.source_peer.to_string().to_lowercase(),
Expand All @@ -327,8 +332,13 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
resync,
soft_delete_col_name,
synced_at_col_name,
initial_copy_only,
};

if initial_copy_only && !do_initial_copy {
anyhow::bail!("initial_copy_only is set to true, but do_initial_copy is set to false");
}

Ok(Some(PeerDDL::CreateMirrorForCDC {
if_not_exists: *if_not_exists,
flow_job,
Expand Down
1 change: 1 addition & 0 deletions nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ impl FlowGrpcClient {
resync: job.resync,
soft_delete_col_name: job.soft_delete_col_name.clone().unwrap_or_default(),
synced_at_col_name: job.synced_at_col_name.clone().unwrap_or_default(),
initial_copy_only: job.initial_copy_only,
..Default::default()
};

Expand Down
1 change: 1 addition & 0 deletions nexus/pt/src/flow_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub struct FlowJob {
pub resync: bool,
pub soft_delete_col_name: Option<String>,
pub synced_at_col_name: Option<String>,
pub initial_copy_only: bool,
}

#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub struct FlowConnectionConfigs {
pub soft_delete_col_name: ::prost::alloc::string::String,
#[prost(string, tag="25")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(bool, tag="26")]
pub initial_copy_only: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1229,6 +1229,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if self.initial_copy_only {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.FlowConnectionConfigs", len)?;
if let Some(v) = self.source.as_ref() {
struct_ser.serialize_field("source", v)?;
Expand Down Expand Up @@ -1309,6 +1312,9 @@ impl serde::Serialize for FlowConnectionConfigs {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if self.initial_copy_only {
struct_ser.serialize_field("initialCopyOnly", &self.initial_copy_only)?;
}
struct_ser.end()
}
}
Expand Down Expand Up @@ -1366,6 +1372,8 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
"initial_copy_only",
"initialCopyOnly",
];

#[allow(clippy::enum_variant_names)]
Expand Down Expand Up @@ -1395,6 +1403,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
Resync,
SoftDeleteColName,
SyncedAtColName,
InitialCopyOnly,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -1442,6 +1451,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
"resync" => Ok(GeneratedField::Resync),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"initialCopyOnly" | "initial_copy_only" => Ok(GeneratedField::InitialCopyOnly),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand Down Expand Up @@ -1486,6 +1496,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
let mut resync__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
let mut initial_copy_only__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Source => {
Expand Down Expand Up @@ -1655,6 +1666,12 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::InitialCopyOnly => {
if initial_copy_only__.is_some() {
return Err(serde::de::Error::duplicate_field("initialCopyOnly"));
}
initial_copy_only__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand Down Expand Up @@ -1686,6 +1703,7 @@ impl<'de> serde::Deserialize<'de> for FlowConnectionConfigs {
resync: resync__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
initial_copy_only: initial_copy_only__.unwrap_or_default(),
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ message FlowConnectionConfigs {

string soft_delete_col_name = 24;
string synced_at_col_name = 25;

bool initial_copy_only = 26;
}

message RenameTableOption {
Expand Down
9 changes: 9 additions & 0 deletions ui/app/mirrors/create/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ export const handleCreateCDC = async (
config.cdcSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
config.snapshotSyncMode = QRepSyncMode.QREP_SYNC_MODE_STORAGE_AVRO;
}

if (config.doInitialCopy == false && config.initialCopyOnly == true) {
setMsg({
ok: false,
msg: 'Initial Copy Only cannot be true if Initial Copy is false.',
});
return;
}

setLoading(true);
const statusMessage: UCreateMirrorResponse = await fetch('/api/mirrors/cdc', {
method: 'POST',
Expand Down
11 changes: 11 additions & 0 deletions ui/app/mirrors/create/helpers/cdc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,15 @@ export const cdcSettings: MirrorSetting[] = [
default: 'SQL',
type: 'switch',
},
{
label: 'Initial Copy Only',
stateHandler: (value, setter) =>
setter((curr: CDCConfig) => ({
...curr,
initialCopyOnly: (value as boolean) || false,
})),
tips: 'If set, PeerDB will only perform initial load and will not perform CDC sync.',
type: 'switch',
advanced: true,
},
];
1 change: 1 addition & 0 deletions ui/app/mirrors/create/helpers/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export const blankCDCSetting: FlowConnectionConfigs = {
resync: false,
softDeleteColName: '',
syncedAtColName: '',
initialCopyOnly: false,
};

export const blankQRepSetting = {
Expand Down
17 changes: 17 additions & 0 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ export interface FlowConnectionConfigs {
resync: boolean;
softDeleteColName: string;
syncedAtColName: string;
initialCopyOnly: boolean;
}

export interface FlowConnectionConfigs_SrcTableIdNameMappingEntry {
Expand Down Expand Up @@ -872,6 +873,7 @@ function createBaseFlowConnectionConfigs(): FlowConnectionConfigs {
resync: false,
softDeleteColName: "",
syncedAtColName: "",
initialCopyOnly: false,
};
}

Expand Down Expand Up @@ -954,6 +956,9 @@ export const FlowConnectionConfigs = {
if (message.syncedAtColName !== "") {
writer.uint32(202).string(message.syncedAtColName);
}
if (message.initialCopyOnly === true) {
writer.uint32(208).bool(message.initialCopyOnly);
}
return writer;
},

Expand Down Expand Up @@ -1145,6 +1150,13 @@ export const FlowConnectionConfigs = {

message.syncedAtColName = reader.string();
continue;
case 26:
if (tag !== 208) {
break;
}

message.initialCopyOnly = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand Down Expand Up @@ -1199,6 +1211,7 @@ export const FlowConnectionConfigs = {
resync: isSet(object.resync) ? Boolean(object.resync) : false,
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
initialCopyOnly: isSet(object.initialCopyOnly) ? Boolean(object.initialCopyOnly) : false,
};
},

Expand Down Expand Up @@ -1291,6 +1304,9 @@ export const FlowConnectionConfigs = {
if (message.syncedAtColName !== "") {
obj.syncedAtColName = message.syncedAtColName;
}
if (message.initialCopyOnly === true) {
obj.initialCopyOnly = message.initialCopyOnly;
}
return obj;
},

Expand Down Expand Up @@ -1346,6 +1362,7 @@ export const FlowConnectionConfigs = {
message.resync = object.resync ?? false;
message.softDeleteColName = object.softDeleteColName ?? "";
message.syncedAtColName = object.syncedAtColName ?? "";
message.initialCopyOnly = object.initialCopyOnly ?? false;
return message;
},
};
Expand Down

0 comments on commit 0cb7abe

Please sign in to comment.