Skip to content

Commit

Permalink
add flow name to normalize input
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 28, 2023
1 parent 0956254 commit 0accb70
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 248 deletions.
5 changes: 2 additions & 3 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ func (a *FlowableActivity) GetTableSchema(
func (a *FlowableActivity) CreateNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
flowName string,
) (*protos.SetupNormalizedTableBatchOutput, error) {
ctx = context.WithValue(ctx, shared.FlowNameKey, flowName)
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName)
conn, err := connectors.GetCDCSyncConnector(ctx, config.PeerConnectionConfig)
if err != nil {
return nil, fmt.Errorf("failed to get connector: %w", err)
Expand All @@ -178,7 +177,7 @@ func (a *FlowableActivity) CreateNormalizedTable(

setupNormalizedTablesOutput, err := conn.SetupNormalizedTables(config)
if err != nil {
a.Alerter.LogFlowError(ctx, flowName, err)
a.Alerter.LogFlowError(ctx, config.FlowName, err)
return nil, fmt.Errorf("failed to setup normalized tables: %w", err)
}

Expand Down
492 changes: 251 additions & 241 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
q.config.DestinationTableIdentifier: tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable],
},
SyncedAtColName: q.config.SyncedAtColName,
FlowName: q.config.FlowJobName,
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, q.config.FlowJobName)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
q.logger.Error("failed to create watermark table: ", err)
Expand Down
3 changes: 2 additions & 1 deletion flow/workflows/setup_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ func (s *SetupFlowExecution) fetchTableSchemaAndSetupNormalizedTables(
TableNameSchemaMapping: normalizedTableMapping,
SoftDeleteColName: flowConnectionConfigs.SoftDeleteColName,
SyncedAtColName: flowConnectionConfigs.SyncedAtColName,
FlowName: flowConnectionConfigs.FlowJobName,
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, flowConnectionConfigs.FlowJobName)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
s.logger.Error("failed to create normalized tables: ", err)
Expand Down
3 changes: 2 additions & 1 deletion flow/workflows/xmin_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ func (q *XminFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
TableNameSchemaMapping: map[string]*protos.TableSchema{
q.config.DestinationTableIdentifier: tblSchemaOutput.TableNameSchemaMapping[q.config.WatermarkTable],
},
FlowName: q.config.FlowJobName,
}

future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig, q.config.FlowJobName)
future = workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
var createNormalizedTablesOutput *protos.SetupNormalizedTableBatchOutput
if err := future.Get(ctx, &createNormalizedTablesOutput); err != nil {
q.logger.Error("failed to create watermark table: ", err)
Expand Down
2 changes: 2 additions & 0 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ pub struct SetupNormalizedTableBatchInput {
pub soft_delete_col_name: ::prost::alloc::string::String,
#[prost(string, tag="5")]
pub synced_at_col_name: ::prost::alloc::string::String,
#[prost(string, tag="6")]
pub flow_name: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 18 additions & 0 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4974,6 +4974,9 @@ impl serde::Serialize for SetupNormalizedTableBatchInput {
if !self.synced_at_col_name.is_empty() {
len += 1;
}
if !self.flow_name.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.SetupNormalizedTableBatchInput", len)?;
if let Some(v) = self.peer_connection_config.as_ref() {
struct_ser.serialize_field("peerConnectionConfig", v)?;
Expand All @@ -4987,6 +4990,9 @@ impl serde::Serialize for SetupNormalizedTableBatchInput {
if !self.synced_at_col_name.is_empty() {
struct_ser.serialize_field("syncedAtColName", &self.synced_at_col_name)?;
}
if !self.flow_name.is_empty() {
struct_ser.serialize_field("flowName", &self.flow_name)?;
}
struct_ser.end()
}
}
Expand All @@ -5005,6 +5011,8 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
"softDeleteColName",
"synced_at_col_name",
"syncedAtColName",
"flow_name",
"flowName",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -5013,6 +5021,7 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
TableNameSchemaMapping,
SoftDeleteColName,
SyncedAtColName,
FlowName,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -5039,6 +5048,7 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
"tableNameSchemaMapping" | "table_name_schema_mapping" => Ok(GeneratedField::TableNameSchemaMapping),
"softDeleteColName" | "soft_delete_col_name" => Ok(GeneratedField::SoftDeleteColName),
"syncedAtColName" | "synced_at_col_name" => Ok(GeneratedField::SyncedAtColName),
"flowName" | "flow_name" => Ok(GeneratedField::FlowName),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -5062,6 +5072,7 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
let mut table_name_schema_mapping__ = None;
let mut soft_delete_col_name__ = None;
let mut synced_at_col_name__ = None;
let mut flow_name__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::PeerConnectionConfig => {
Expand Down Expand Up @@ -5090,6 +5101,12 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
}
synced_at_col_name__ = Some(map.next_value()?);
}
GeneratedField::FlowName => {
if flow_name__.is_some() {
return Err(serde::de::Error::duplicate_field("flowName"));
}
flow_name__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -5100,6 +5117,7 @@ impl<'de> serde::Deserialize<'de> for SetupNormalizedTableBatchInput {
table_name_schema_mapping: table_name_schema_mapping__.unwrap_or_default(),
soft_delete_col_name: soft_delete_col_name__.unwrap_or_default(),
synced_at_col_name: synced_at_col_name__.unwrap_or_default(),
flow_name: flow_name__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ message SetupNormalizedTableBatchInput {
// migration related columns
string soft_delete_col_name = 4;
string synced_at_col_name = 5;
string flow_name = 6;
}

message SetupNormalizedTableOutput {
Expand Down
24 changes: 23 additions & 1 deletion ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ export interface SetupNormalizedTableBatchInput {
/** migration related columns */
softDeleteColName: string;
syncedAtColName: string;
flowName: string;
}

export interface SetupNormalizedTableBatchInput_TableNameSchemaMappingEntry {
Expand Down Expand Up @@ -4485,7 +4486,13 @@ export const SetupNormalizedTableInput = {
};

function createBaseSetupNormalizedTableBatchInput(): SetupNormalizedTableBatchInput {
return { peerConnectionConfig: undefined, tableNameSchemaMapping: {}, softDeleteColName: "", syncedAtColName: "" };
return {
peerConnectionConfig: undefined,
tableNameSchemaMapping: {},
softDeleteColName: "",
syncedAtColName: "",
flowName: "",
};
}

export const SetupNormalizedTableBatchInput = {
Expand All @@ -4505,6 +4512,9 @@ export const SetupNormalizedTableBatchInput = {
if (message.syncedAtColName !== "") {
writer.uint32(42).string(message.syncedAtColName);
}
if (message.flowName !== "") {
writer.uint32(50).string(message.flowName);
}
return writer;
},

Expand Down Expand Up @@ -4546,6 +4556,13 @@ export const SetupNormalizedTableBatchInput = {

message.syncedAtColName = reader.string();
continue;
case 6:
if (tag !== 50) {
break;
}

message.flowName = reader.string();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -4566,6 +4583,7 @@ export const SetupNormalizedTableBatchInput = {
: {},
softDeleteColName: isSet(object.softDeleteColName) ? String(object.softDeleteColName) : "",
syncedAtColName: isSet(object.syncedAtColName) ? String(object.syncedAtColName) : "",
flowName: isSet(object.flowName) ? String(object.flowName) : "",
};
},

Expand All @@ -4589,6 +4607,9 @@ export const SetupNormalizedTableBatchInput = {
if (message.syncedAtColName !== "") {
obj.syncedAtColName = message.syncedAtColName;
}
if (message.flowName !== "") {
obj.flowName = message.flowName;
}
return obj;
},

Expand All @@ -4612,6 +4633,7 @@ export const SetupNormalizedTableBatchInput = {
}, {});
message.softDeleteColName = object.softDeleteColName ?? "";
message.syncedAtColName = object.syncedAtColName ?? "";
message.flowName = object.flowName ?? "";
return message;
},
};
Expand Down

0 comments on commit 0accb70

Please sign in to comment.