Skip to content

Commit

Permalink
move checks to EnsurePullability
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 23, 2023
1 parent c95d2ce commit baf5cfe
Show file tree
Hide file tree
Showing 8 changed files with 290 additions and 332 deletions.
10 changes: 3 additions & 7 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,15 @@ func (c *PostgresConnector) getPrimaryKeyColumns(schemaTable *utils.SchemaTable)
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
defer rows.Close()
// 0 rows returned, table has no primary keys
if !rows.Next() {
return nil, fmt.Errorf("table %s has no primary keys", schemaTable)
}
for {
if !rows.Next() {
break
}
err = rows.Scan(&pkCol)
if err != nil {
return nil, fmt.Errorf("error scanning primary key column for table %s: %w", schemaTable, err)
}
pkCols = append(pkCols, pkCol)
if !rows.Next() {
break
}
}

return pkCols, nil
Expand Down
30 changes: 21 additions & 9 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (c *PostgresConnector) GetTableSchema(
req *protos.GetTableSchemaBatchInput) (*protos.GetTableSchemaBatchOutput, error) {
res := make(map[string]*protos.TableSchema)
for _, tableName := range req.TableIdentifiers {
tableSchema, err := c.getTableSchemaForTable(tableName, req.IgnorePkeyRequirements)
tableSchema, err := c.getTableSchemaForTable(tableName)
if err != nil {
return nil, err
}
Expand All @@ -562,7 +562,6 @@ func (c *PostgresConnector) GetTableSchema(

func (c *PostgresConnector) getTableSchemaForTable(
tableName string,
ignorePkeyRequirements bool,
) (*protos.TableSchema, error) {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
Expand All @@ -574,6 +573,11 @@ func (c *PostgresConnector) getTableSchemaForTable(
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}

// Get the column names and types
rows, err := c.pool.Query(c.ctx,
fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, schemaTable.String()),
Expand All @@ -583,13 +587,6 @@ func (c *PostgresConnector) getTableSchemaForTable(
}
defer rows.Close()

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
if !(isFullReplica || ignorePkeyRequirements) {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
TableIdentifier: tableName,
Columns: make(map[string]string),
Expand Down Expand Up @@ -745,6 +742,21 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch
return nil, err
}

isFullReplica, replErr := c.isTableFullReplica(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}

// we only allow no primary key if the table has REPLICA IDENTITY FULL
if len(pKeyCols) == 0 && !isFullReplica {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
PostgresTableIdentifier: &protos.PostgresTableIdentifier{
Expand Down
538 changes: 263 additions & 275 deletions flow/generated/protos/flow.pb.go

Large diffs are not rendered by default.

5 changes: 2 additions & 3 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,8 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
})

tableSchemaInput := &protos.GetTableSchemaBatchInput{
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
IgnorePkeyRequirements: true,
PeerConnectionConfig: q.config.SourcePeer,
TableIdentifiers: []string{q.config.WatermarkTable},
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
2 changes: 0 additions & 2 deletions nexus/pt/src/peerdb_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,6 @@ pub struct GetTableSchemaBatchInput {
pub peer_connection_config: ::core::option::Option<super::peerdb_peers::Peer>,
#[prost(string, repeated, tag="2")]
pub table_identifiers: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(bool, tag="3")]
pub ignore_pkey_requirements: bool,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
18 changes: 0 additions & 18 deletions nexus/pt/src/peerdb_flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1820,19 +1820,13 @@ impl serde::Serialize for GetTableSchemaBatchInput {
if !self.table_identifiers.is_empty() {
len += 1;
}
if self.ignore_pkey_requirements {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_flow.GetTableSchemaBatchInput", len)?;
if let Some(v) = self.peer_connection_config.as_ref() {
struct_ser.serialize_field("peerConnectionConfig", v)?;
}
if !self.table_identifiers.is_empty() {
struct_ser.serialize_field("tableIdentifiers", &self.table_identifiers)?;
}
if self.ignore_pkey_requirements {
struct_ser.serialize_field("ignorePkeyRequirements", &self.ignore_pkey_requirements)?;
}
struct_ser.end()
}
}
Expand All @@ -1847,15 +1841,12 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
"peerConnectionConfig",
"table_identifiers",
"tableIdentifiers",
"ignore_pkey_requirements",
"ignorePkeyRequirements",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
PeerConnectionConfig,
TableIdentifiers,
IgnorePkeyRequirements,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand All @@ -1880,7 +1871,6 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
match value {
"peerConnectionConfig" | "peer_connection_config" => Ok(GeneratedField::PeerConnectionConfig),
"tableIdentifiers" | "table_identifiers" => Ok(GeneratedField::TableIdentifiers),
"ignorePkeyRequirements" | "ignore_pkey_requirements" => Ok(GeneratedField::IgnorePkeyRequirements),
_ => Ok(GeneratedField::__SkipField__),
}
}
Expand All @@ -1902,7 +1892,6 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
{
let mut peer_connection_config__ = None;
let mut table_identifiers__ = None;
let mut ignore_pkey_requirements__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::PeerConnectionConfig => {
Expand All @@ -1917,12 +1906,6 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
}
table_identifiers__ = Some(map.next_value()?);
}
GeneratedField::IgnorePkeyRequirements => {
if ignore_pkey_requirements__.is_some() {
return Err(serde::de::Error::duplicate_field("ignorePkeyRequirements"));
}
ignore_pkey_requirements__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
Expand All @@ -1931,7 +1914,6 @@ impl<'de> serde::Deserialize<'de> for GetTableSchemaBatchInput {
Ok(GetTableSchemaBatchInput {
peer_connection_config: peer_connection_config__,
table_identifiers: table_identifiers__.unwrap_or_default(),
ignore_pkey_requirements: ignore_pkey_requirements__.unwrap_or_default(),
})
}
}
Expand Down
1 change: 0 additions & 1 deletion protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ message TableSchema {
message GetTableSchemaBatchInput {
peerdb_peers.Peer peer_connection_config = 1;
repeated string table_identifiers = 2;
bool ignore_pkey_requirements = 3;
}

message GetTableSchemaBatchOutput {
Expand Down
18 changes: 1 addition & 17 deletions ui/grpc_generated/flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ export interface TableSchema_ColumnsEntry {
export interface GetTableSchemaBatchInput {
peerConnectionConfig: Peer | undefined;
tableIdentifiers: string[];
ignorePkeyRequirements: boolean;
}

export interface GetTableSchemaBatchOutput {
Expand Down Expand Up @@ -4029,7 +4028,7 @@ export const TableSchema_ColumnsEntry = {
};

function createBaseGetTableSchemaBatchInput(): GetTableSchemaBatchInput {
return { peerConnectionConfig: undefined, tableIdentifiers: [], ignorePkeyRequirements: false };
return { peerConnectionConfig: undefined, tableIdentifiers: [] };
}

export const GetTableSchemaBatchInput = {
Expand All @@ -4040,9 +4039,6 @@ export const GetTableSchemaBatchInput = {
for (const v of message.tableIdentifiers) {
writer.uint32(18).string(v!);
}
if (message.ignorePkeyRequirements === true) {
writer.uint32(24).bool(message.ignorePkeyRequirements);
}
return writer;
},

Expand All @@ -4067,13 +4063,6 @@ export const GetTableSchemaBatchInput = {

message.tableIdentifiers.push(reader.string());
continue;
case 3:
if (tag !== 24) {
break;
}

message.ignorePkeyRequirements = reader.bool();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -4089,7 +4078,6 @@ export const GetTableSchemaBatchInput = {
tableIdentifiers: Array.isArray(object?.tableIdentifiers)
? object.tableIdentifiers.map((e: any) => String(e))
: [],
ignorePkeyRequirements: isSet(object.ignorePkeyRequirements) ? Boolean(object.ignorePkeyRequirements) : false,
};
},

Expand All @@ -4101,9 +4089,6 @@ export const GetTableSchemaBatchInput = {
if (message.tableIdentifiers?.length) {
obj.tableIdentifiers = message.tableIdentifiers;
}
if (message.ignorePkeyRequirements === true) {
obj.ignorePkeyRequirements = message.ignorePkeyRequirements;
}
return obj;
},

Expand All @@ -4116,7 +4101,6 @@ export const GetTableSchemaBatchInput = {
? Peer.fromPartial(object.peerConnectionConfig)
: undefined;
message.tableIdentifiers = object.tableIdentifiers?.map((e) => e) || [];
message.ignorePkeyRequirements = object.ignorePkeyRequirements ?? false;
return message;
},
};
Expand Down

0 comments on commit baf5cfe

Please sign in to comment.