Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handling soft deleted rows during resync #676

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,10 @@ func (s *SnowflakeAvroSyncMethod) putFileToStage(localFilePath string, stage str
return nil
}

func (sc *SnowflakeConnector) GetCopyTransformation(
func (c *SnowflakeConnector) GetCopyTransformation(
dstTableName string,
) (*CopyInfo, error) {
colInfo, colsErr := sc.getColsFromTable(dstTableName)
colInfo, colsErr := c.getColsFromTable(dstTableName)
if colsErr != nil {
return nil, fmt.Errorf("failed to get columns from destination table: %w", colsErr)
}
Expand Down
24 changes: 24 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,30 @@ func (c *SnowflakeConnector) RenameTables(req *protos.RenameTablesInput) (*proto
}
}()

if req.SoftDeleteColName != nil {
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName
allCols := strings.Join(maps.Keys(renameRequest.TableSchema.Columns), ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")

log.WithFields(log.Fields{
"flowName": req.FlowJobName,
}).Infof("handling soft-deletes for table '%s'...", dst)

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dst))

_, err = renameTablesTx.ExecContext(c.ctx,
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
src, fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), allCols, *req.SoftDeleteColName,
dst, pkeyCols, pkeyCols, src))
if err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dst, err)
}
}
}

// renaming and dropping such that the _resync table is the new destination
for _, renameRequest := range req.RenameTableOptions {
src := renameRequest.CurrentName
dst := renameRequest.NewName
Expand Down
1,195 changes: 611 additions & 584 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,25 @@ func CDCFlowWorkflowWithConfig(
renameOpts := &protos.RenameTablesInput{}
renameOpts.FlowJobName = cfg.FlowJobName
renameOpts.Peer = cfg.Destination
if cfg.SoftDelete {
renameOpts.SoftDeleteColName = &cfg.SoftDeleteColName
}
correctedTableNameSchemaMapping := make(map[string]*protos.TableSchema)
for _, mapping := range cfg.TableMappings {
oldName := mapping.DestinationTableIdentifier
newName := strings.TrimSuffix(oldName, "_resync")
renameOpts.RenameTableOptions = append(renameOpts.RenameTableOptions, &protos.RenameTableOption{
CurrentName: oldName,
NewName: newName,
// oldName is what was used for the TableNameSchema mapping
TableSchema: cfg.TableNameSchemaMapping[oldName],
})
mapping.DestinationTableIdentifier = newName
// TableNameSchemaMapping is referring to the _resync tables, not the actual names
correctedTableNameSchemaMapping[newName] = cfg.TableNameSchemaMapping[oldName]
}

cfg.TableNameSchemaMapping = correctedTableNameSchemaMapping
renameTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 12 * time.Hour,
HeartbeatTimeout: 1 * time.Hour,
Expand Down
4 changes: 4 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ pub struct RenameTableOption {
pub current_name: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub new_name: ::prost::alloc::string::String,
#[prost(message, optional, tag="3")]
pub table_schema: ::core::option::Option<TableSchema>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand All @@ -116,6 +118,8 @@ pub struct RenameTablesInput {
pub peer: ::core::option::Option<super::peerdb_peers::Peer>,
#[prost(message, repeated, tag="3")]
pub rename_table_options: ::prost::alloc::vec::Vec<RenameTableOption>,
#[prost(string, optional, tag="4")]
pub soft_delete_col_name: ::core::option::Option<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
36 changes: 36 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4008,13 +4008,19 @@ impl serde::Serialize for RenameTableOption {
if !self.new_name.is_empty() {
len += 1;
}
if self.table_schema.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.RenameTableOption", len)?;
if !self.current_name.is_empty() {
struct_ser.serialize_field("currentName", &self.current_name)?;
}
if !self.new_name.is_empty() {
struct_ser.serialize_field("newName", &self.new_name)?;
}
if let Some(v) = self.table_schema.as_ref() {
struct_ser.serialize_field("tableSchema", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4029,12 +4035,15 @@ impl<'de> serde::Deserialize<'de> for RenameTableOption {
"currentName",
"new_name",
"newName",
"table_schema",
"tableSchema",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
CurrentName,
NewName,
TableSchema,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -4059,6 +4068,7 @@ impl<'de> serde::Deserialize<'de> for RenameTableOption {
match value {
"currentName" | "current_name" => Ok(GeneratedField::CurrentName),
"newName" | "new_name" => Ok(GeneratedField::NewName),
"tableSchema" | "table_schema" => Ok(GeneratedField::TableSchema),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -4080,6 +4090,7 @@ impl<'de> serde::Deserialize<'de> for RenameTableOption {
{
let mut current_name__ = None;
let mut new_name__ = None;
let mut table_schema__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::CurrentName => {
Expand All @@ -4094,6 +4105,12 @@ impl<'de> serde::Deserialize<'de> for RenameTableOption {
}
new_name__ = Some(map.next_value()?);
}
GeneratedField::TableSchema => {
if table_schema__.is_some() {
return Err(serde::de::Error::duplicate_field("tableSchema"));
}
table_schema__ = map.next_value()?;
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -4102,6 +4119,7 @@ impl<'de> serde::Deserialize<'de> for RenameTableOption {
Ok(RenameTableOption {
current_name: current_name__.unwrap_or_default(),
new_name: new_name__.unwrap_or_default(),
table_schema: table_schema__,
})
}
}
Expand All @@ -4125,6 +4143,9 @@ impl serde::Serialize for RenameTablesInput {
if !self.rename_table_options.is_empty() {
len += 1;
}
if self.soft_delete_col_name.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.RenameTablesInput", len)?;
if !self.flow_job_name.is_empty() {
struct_ser.serialize_field("flowJobName", &self.flow_job_name)?;
Expand All @@ -4135,6 +4156,9 @@ impl serde::Serialize for RenameTablesInput {
if !self.rename_table_options.is_empty() {
struct_ser.serialize_field("renameTableOptions", &self.rename_table_options)?;
}
if let Some(v) = self.soft_delete_col_name.as_ref() {
struct_ser.serialize_field("softDeleteColName", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4150,13 +4174,16 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"peer",
"rename_table_options",
"renameTableOptions",
"soft_delete_col_name",
"softDeleteColName",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
FlowJobName,
Peer,
RenameTableOptions,
SoftDeleteColName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -4182,6 +4209,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
"flowJobName" | "flow_job_name" => Ok(GeneratedField::FlowJobName),
"peer" => Ok(GeneratedField::Peer),
"renameTableOptions" | "rename_table_options" => Ok(GeneratedField::RenameTableOptions),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -4204,6 +4232,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
let mut flow_job_name__ = None;
let mut peer__ = None;
let mut rename_table_options__ = None;
let mut soft_delete_col_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::FlowJobName => {
Expand All @@ -4224,6 +4253,12 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
}
rename_table_options__ = Some(map.next_value()?);
}
GeneratedField::SoftDeleteColName => {
if soft_delete_col_name__.is_some() {
return Err(serde::de::Error::duplicate_field("softDeleteColName"));
}
soft_delete_col_name__ = map.next_value()?;
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -4233,6 +4268,7 @@ impl<'de> serde::Deserialize<'de> for RenameTablesInput {
flow_job_name: flow_job_name__.unwrap_or_default(),
peer: peer__,
rename_table_options: rename_table_options__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__,
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ message FlowConnectionConfigs {
// RenameTableOption describes a single table rename carried inside
// RenameTablesInput.rename_table_options.
message RenameTableOption {
// Name of the table as it exists before the rename.
string current_name = 1;
// Name the table is renamed to.
string new_name = 2;
// Schema of the table being renamed. The Snowflake RenameTables
// implementation reads its column list and primary-key column list when
// handling soft-deleted rows; as a message-typed field it may be absent.
TableSchema table_schema = 3;
}

// RenameTablesInput is the request passed to a connector's RenameTables call.
message RenameTablesInput {
// Name of the flow job the rename is performed for.
string flow_job_name = 1;
// Peer on which the tables are renamed (the CDC workflow sets this to the
// flow's destination peer).
peerdb_peers.Peer peer = 2;
// One entry per table to rename.
repeated RenameTableOption rename_table_options = 3;
// Name of the soft-delete marker column. Declared `optional` so that
// "unset" is distinguishable from an empty string: the CDC workflow only
// sets it when soft-delete is enabled for the flow, and the Snowflake
// connector performs its soft-delete handling only when it is set.
optional string soft_delete_col_name = 4;
}

message RenameTablesOutput {
Expand Down
38 changes: 36 additions & 2 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,14 @@ export interface FlowConnectionConfigs_TableNameSchemaMappingEntry {
/** A single table rename; mirrors the `RenameTableOption` proto message. */
export interface RenameTableOption {
/** Name of the table as it exists before the rename. */
currentName: string;
/** Name the table is renamed to. */
newName: string;
/** Schema of the table being renamed; `undefined` when the field is not set. */
tableSchema: TableSchema | undefined;
}

/** Request for renaming tables; mirrors the `RenameTablesInput` proto message. */
export interface RenameTablesInput {
/** Name of the flow job the rename is performed for. */
flowJobName: string;
/** Peer on which the tables are renamed; `undefined` when the field is not set. */
peer: Peer | undefined;
/** One entry per table to rename. */
renameTableOptions: RenameTableOption[];
/**
 * Name of the soft-delete marker column (proto `optional string`).
 * Left `undefined` when not set, in which case it is omitted on encode.
 */
softDeleteColName?: string | undefined;
}

export interface RenameTablesOutput {
Expand Down Expand Up @@ -1483,7 +1485,7 @@ export const FlowConnectionConfigs_TableNameSchemaMappingEntry = {
};

function createBaseRenameTableOption(): RenameTableOption {
return { currentName: "", newName: "" };
return { currentName: "", newName: "", tableSchema: undefined };
}

export const RenameTableOption = {
Expand All @@ -1494,6 +1496,9 @@ export const RenameTableOption = {
if (message.newName !== "") {
writer.uint32(18).string(message.newName);
}
if (message.tableSchema !== undefined) {
TableSchema.encode(message.tableSchema, writer.uint32(26).fork()).ldelim();
}
return writer;
},

Expand All @@ -1518,6 +1523,13 @@ export const RenameTableOption = {

message.newName = reader.string();
continue;
case 3:
if (tag !== 26) {
break;
}

message.tableSchema = TableSchema.decode(reader, reader.uint32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -1531,6 +1543,7 @@ export const RenameTableOption = {
return {
currentName: isSet(object.currentName) ? String(object.currentName) : "",
newName: isSet(object.newName) ? String(object.newName) : "",
tableSchema: isSet(object.tableSchema) ? TableSchema.fromJSON(object.tableSchema) : undefined,
};
},

Expand All @@ -1542,6 +1555,9 @@ export const RenameTableOption = {
if (message.newName !== "") {
obj.newName = message.newName;
}
if (message.tableSchema !== undefined) {
obj.tableSchema = TableSchema.toJSON(message.tableSchema);
}
return obj;
},

Expand All @@ -1552,12 +1568,15 @@ export const RenameTableOption = {
const message = createBaseRenameTableOption();
message.currentName = object.currentName ?? "";
message.newName = object.newName ?? "";
message.tableSchema = (object.tableSchema !== undefined && object.tableSchema !== null)
? TableSchema.fromPartial(object.tableSchema)
: undefined;
return message;
},
};

function createBaseRenameTablesInput(): RenameTablesInput {
return { flowJobName: "", peer: undefined, renameTableOptions: [] };
return { flowJobName: "", peer: undefined, renameTableOptions: [], softDeleteColName: undefined };
}

export const RenameTablesInput = {
Expand All @@ -1571,6 +1590,9 @@ export const RenameTablesInput = {
for (const v of message.renameTableOptions) {
RenameTableOption.encode(v!, writer.uint32(26).fork()).ldelim();
}
if (message.softDeleteColName !== undefined) {
writer.uint32(34).string(message.softDeleteColName);
}
return writer;
},

Expand Down Expand Up @@ -1602,6 +1624,13 @@ export const RenameTablesInput = {

message.renameTableOptions.push(RenameTableOption.decode(reader, reader.uint32()));
continue;
case 4:
if (tag !== 34) {
break;
}

message.softDeleteColName = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -1618,6 +1647,7 @@ export const RenameTablesInput = {
renameTableOptions: Array.isArray(object?.renameTableOptions)
? object.renameTableOptions.map((e: any) => RenameTableOption.fromJSON(e))
: [],
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : undefined,
};
},

Expand All @@ -1632,6 +1662,9 @@ export const RenameTablesInput = {
if (message.renameTableOptions?.length) {
obj.renameTableOptions = message.renameTableOptions.map((e) => RenameTableOption.toJSON(e));
}
if (message.softDeleteColName !== undefined) {
obj.softDeleteColName = message.softDeleteColName;
}
return obj;
},

Expand All @@ -1643,6 +1676,7 @@ export const RenameTablesInput = {
message.flowJobName = object.flowJobName ?? "";
message.peer = (object.peer !== undefined && object.peer !== null) ? Peer.fromPartial(object.peer) : undefined;
message.renameTableOptions = object.renameTableOptions?.map((e) => RenameTableOption.fromPartial(e)) || [];
message.softDeleteColName = object.softDeleteColName ?? undefined;
return message;
},
};
Expand Down
Loading