Skip to content

Commit

Permalink
Merge branch 'main' into walheartbeat-for-peers
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 24, 2023
2 parents 79e28a7 + b7b6a8d commit c6148e2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
24 changes: 11 additions & 13 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,15 @@ func (c *PostgresConnector) getPrimaryKeyColumns(schemaTable *utils.SchemaTable)
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
defer rows.Close()
// 0 rows returned, table has no primary keys
if !rows.Next() {
return nil, fmt.Errorf("table %s has no primary keys", schemaTable)
}
for {
if !rows.Next() {
break
}
err = rows.Scan(&pkCol)
if err != nil {
return nil, fmt.Errorf("error scanning primary key column for table %s: %w", schemaTable, err)
}
pkCols = append(pkCols, pkCol)
if !rows.Next() {
break
}
}

return pkCols, nil
Expand Down Expand Up @@ -314,13 +310,15 @@ func generateCreateTableSQLForNormalizedTable(sourceTableIdentifier string,
}

// add composite primary key to the table
primaryKeyColsQuoted := make([]string, 0)
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted,
fmt.Sprintf(`"%s"`, primaryKeyCol))
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
primaryKeyColsQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsQuoted = append(primaryKeyColsQuoted,
fmt.Sprintf(`"%s"`, primaryKeyCol))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsQuoted, ","), ",")))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsQuoted, ","), ",")))

return fmt.Sprintf(createNormalizedTableSQL, sourceTableIdentifier,
strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ","))
Expand Down
27 changes: 20 additions & 7 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,11 @@ func (c *PostgresConnector) getTableSchemaForTable(
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}

// Get the column names and types
rows, err := c.pool.Query(c.ctx,
fmt.Sprintf(`SELECT * FROM %s LIMIT 0`, schemaTable.String()),
Expand All @@ -582,13 +587,6 @@ func (c *PostgresConnector) getTableSchemaForTable(
}
defer rows.Close()

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
if !isFullReplica {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}
}

res := &protos.TableSchema{
TableIdentifier: tableName,
Columns: make(map[string]string),
Expand Down Expand Up @@ -744,6 +742,21 @@ func (c *PostgresConnector) EnsurePullability(req *protos.EnsurePullabilityBatch
return nil, err
}

isFullReplica, replErr := c.isTableFullReplica(schemaTable)
if replErr != nil {
return nil, fmt.Errorf("error getting replica identity for table %s: %w", schemaTable, replErr)
}

pKeyCols, err := c.getPrimaryKeyColumns(schemaTable)
if err != nil {
return nil, fmt.Errorf("error getting primary key column for table %s: %w", schemaTable, err)
}

// we only allow no primary key if the table has REPLICA IDENTITY FULL
if len(pKeyCols) == 0 && !isFullReplica {
return nil, fmt.Errorf("table %s has no primary keys and does not have REPLICA IDENTITY FULL", schemaTable)
}

tableIdentifierMapping[tableName] = &protos.TableIdentifier{
TableIdentifier: &protos.TableIdentifier_PostgresTableIdentifier{
PostgresTableIdentifier: &protos.PostgresTableIdentifier{
Expand Down
14 changes: 8 additions & 6 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,13 +783,15 @@ func generateCreateTableSQLForNormalizedTable(
}

// add composite primary key to the table
primaryKeyColsUpperQuoted := make([]string, 0)
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsUpperQuoted = append(primaryKeyColsUpperQuoted,
fmt.Sprintf(`"%s"`, strings.ToUpper(primaryKeyCol)))
if len(sourceTableSchema.PrimaryKeyColumns) > 0 {
primaryKeyColsUpperQuoted := make([]string, 0, len(sourceTableSchema.PrimaryKeyColumns))
for _, primaryKeyCol := range sourceTableSchema.PrimaryKeyColumns {
primaryKeyColsUpperQuoted = append(primaryKeyColsUpperQuoted,
fmt.Sprintf(`"%s"`, strings.ToUpper(primaryKeyCol)))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsUpperQuoted, ","), ",")))
}
createTableSQLArray = append(createTableSQLArray, fmt.Sprintf("PRIMARY KEY(%s),",
strings.TrimSuffix(strings.Join(primaryKeyColsUpperQuoted, ","), ",")))

return fmt.Sprintf(createNormalizedTableSQL, sourceTableIdentifier,
strings.TrimSuffix(strings.Join(createTableSQLArray, ""), ","))
Expand Down

0 comments on commit c6148e2

Please sign in to comment.