Skip to content

Commit

Permalink
set synced at for resync table
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Nov 17, 2023
1 parent 8453899 commit ee36f43
Show file tree
Hide file tree
Showing 9 changed files with 1,899 additions and 549 deletions.
19 changes: 19 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,25 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
}
}()

if req.SyncedAtColName != nil {
for _, renameRequest := range req.RenameTableOptions {
resyncTblName := renameRequest.CurrentName

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("setting synced at column for table '%s'...", resyncTblName)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...",
resyncTblName))

_, err = renameTablesTx.ExecContext(c.ctx,
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP", resyncTblName, *req.SyncedAtColName))
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", resyncTblName, err)
}
}
}

if req.SoftDeleteColName != nil {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
Expand Down
1,002 changes: 507 additions & 495 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func CDCFlowWorkflowWithConfig(
if cfg.SoftDelete {
renameOpts.SoftDeleteColName = &cfg.SoftDeleteColName
}
renameOpts.SyncedAtColName = &cfg.SyncedAtColName
correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
for _, mapping := range cfg.TableMappings {
oldName := mapping.DestinationTableIdentifier
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ pub struct RenameTablesInput {
pub rename_table_options: ::prost::alloc::vec::Vec<RenameTableOption>,
#[prost(string, optional, tag="4")]
pub soft_delete_col_name: ::core::option::Option<::prost::alloc::string::String>,
#[prost(string, optional, tag="5")]
pub synced_at_col_name: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4146,6 +4146,9 @@ impl serde::Serialize for RenameTablesInput {
if self.soft_delete_col_name.is_some() {
len += 1;
}
if self.synced_at_col_name.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.RenameTablesInput", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand All @@ -4159,6 +4162,9 @@ impl serde::Serialize for RenameTablesInput {
if let Some(v) = self.soft_delete_col_name.as_ref() {
struct_ser.serialize_field("softDeleteColName", v)?;
}
if let Some(v) = self.synced_at_col_name.as_ref() {
struct_ser.serialize_field("syncedAtColName", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4176,6 +4182,8 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"renameTableOptions",
"soft_delete_col_name",
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -4184,6 +4192,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
Peer,
RenameTableOptions,
SoftDeleteColName,
SyncedAtColName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -4210,6 +4219,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"peer" => Ok(GeneratedField::Peer),
"renameTableOptions" | "rename_table_options" => Ok(GeneratedField::RenameTableOptions),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -4233,6 +4243,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
let mut peer__ = None;
let mut rename_table_options__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand All @@ -4259,6 +4270,12 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
}
soft_delete_col_name__ = map.next_value()?;
}
GeneratedField::SyncedAtColName => {
if synced_at_col_name__.is_some() {
return Err(serde::de::Error::duplicate_field("syncedAtColName"));
}
synced_at_col_name__ = map.next_value()?;
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -4269,6 +4286,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
peer: peer__,
rename_table_options: rename_table_options__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__,
synced_at_col_name: synced_at_col_name__,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ message RenameTablesInput {
peerdb_peers.Peer peer = 2;
repeated RenameTableOption rename_table_options = 3;
optional string soft_delete_col_name = 4;
optional string synced_at_col_name = 5;
}

message RenameTablesOutput {
Expand Down
24 changes: 23 additions & 1 deletion ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export interface RenameTablesInput {
peer: Peer | undefined;
renameTableOptions: RenameTableOption[];
softDeleteColName?: string | undefined;
syncedAtColName?: string | undefined;
}

export interface RenameTablesOutput {
Expand Down Expand Up @@ -1592,7 +1593,13 @@ export const RenameTableOption = {
};

function createBaseRenameTablesInput(): RenameTablesInput {
return { flowJobName: "", peer: undefined, renameTableOptions: [], softDeleteColName: undefined };
return {
flowJobName: "",
peer: undefined,
renameTableOptions: [],
softDeleteColName: undefined,
syncedAtColName: undefined,
};
}

export const RenameTablesInput = {
Expand All @@ -1609,6 +1616,9 @@ export const RenameTablesInput = {
if (message.softDeleteColName !== undefined) {
writer.uint32(34).string(message.softDeleteColName);
}
if (message.syncedAtColName !== undefined) {
writer.uint32(42).string(message.syncedAtColName);
}
return writer;
},

Expand Down Expand Up @@ -1647,6 +1657,13 @@ export const RenameTablesInput = {

message.softDeleteColName = reader.string();
continue;
case 5:
if (tag !== 42) {
break;
}

message.syncedAtColName = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -1664,6 +1681,7 @@ export const RenameTablesInput = {
? object.renameTableOptions.map((e: any) => RenameTableOption.fromJSON(e))
: [],
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : undefined,
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : undefined,
};
},

Expand All @@ -1681,6 +1699,9 @@ export const RenameTablesInput = {
if (message.softDeleteColName !== undefined) {
obj.softDeleteColName = message.softDeleteColName;
}
if (message.syncedAtColName !== undefined) {
obj.syncedAtColName = message.syncedAtColName;
}
return obj;
},

Expand All @@ -1693,6 +1714,7 @@ export const RenameTablesInput = {
message.peer = (object.peer !== undefined && object.peer !== null) ? Peer.fromPartial(object.peer) : undefined;
message.renameTableOptions = object.renameTableOptions?.map((e) => RenameTableOption.fromPartial(e)) || [];
message.softDeleteColName = object.softDeleteColName ?? undefined;
message.syncedAtColName = object.syncedAtColName ?? undefined;
return message;
},
};
Expand Down
Loading

0 comments on commit ee36f43

Please sign in to comment.