Skip to content

Commit

Permalink
postgres table check function modified
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Dec 5, 2023
1 parent a4a439f commit 7c3b604
Show file tree
Hide file tree
Showing 10 changed files with 405 additions and 1,401 deletions.
28 changes: 17 additions & 11 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,28 @@ func (c *PostgresConnector) getPrimaryKeyColumns(schemaTable *utils.SchemaTable)
return pkCols, nil
}

func (c *PostgresConnector) tableExists(schemaTable *utils.SchemaTable) (bool, error) {
var exists bool
err := c.pool.QueryRow(c.ctx,
`SELECT EXISTS (
SELECT FROM pg_tables
WHERE schemaname = $1
AND tablename = $2
)`,
func (c *PostgresConnector) tableExists(schemaTable *utils.SchemaTable) (*[]protos.TableColumn, error) {
rows, err := c.pool.Query(c.ctx,
`SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2`,
schemaTable.Schema,
schemaTable.Table,
).Scan(&exists)
)
if err != nil {
return false, fmt.Errorf("error checking if table exists: %w", err)
return nil, fmt.Errorf("error checking if table exists: %w", err)
}

defer rows.Close()
var columns []protos.TableColumn
for rows.Next() {
var colName, colType string
err = rows.Scan(&colName, &colType)
if err != nil {
return nil, fmt.Errorf("error while checking for existing table: %w", err)
}
columns = append(columns, protos.TableColumn{Name: colName, Type: colType})
}

return exists, nil
return &columns, nil
}

// checkSlotAndPublication checks if the replication slot and publication exist.
Expand Down
8 changes: 4 additions & 4 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ func (c *PostgresConnector) ConnectionActive() error {

// NeedsSetupMetadataTables returns true if the metadata tables need to be set up.
func (c *PostgresConnector) NeedsSetupMetadataTables() bool {
result, err := c.tableExists(&utils.SchemaTable{
columns, err := c.tableExists(&utils.SchemaTable{
Schema: c.metadataSchema,
Table: mirrorJobsTableIdentifier,
})
if err != nil {
return true
}
return !result
return len(*columns) != 0
}

// SetupMetadataTables sets up the metadata tables.
Expand Down Expand Up @@ -640,11 +640,11 @@ func (c *PostgresConnector) SetupNormalizedTables(req *protos.SetupNormalizedTab
if err != nil {
return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
}
tableAlreadyExists, err := c.tableExists(parsedNormalizedTable)
destinationColumns, err := c.tableExists(parsedNormalizedTable)
if err != nil {
return nil, fmt.Errorf("error occurred while checking if normalized table exists: %w", err)
}
if tableAlreadyExists {
if destinationColumns != nil {
tableExistsMapping[tableIdentifier] = true
continue
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,12 +466,12 @@ func (c *PostgresConnector) SyncQRepRecords(
return 0, fmt.Errorf("failed to parse destination table identifier: %w", err)
}

exists, err := c.tableExists(dstTable)
sourceColumns, err := c.tableExists(dstTable)
if err != nil {
return 0, fmt.Errorf("failed to check if table exists: %w", err)
}

if !exists {
if len(*sourceColumns) == 0 {
return 0, fmt.Errorf("table %s does not exist, used schema: %s", dstTable.Table, dstTable.Schema)
}

Expand Down
204 changes: 138 additions & 66 deletions flow/generated/protos/peers.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions nexus/pt/src/peerdb_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ pub struct SqlServerConfig {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TableColumn {
#[prost(string, tag="1")]
pub name: ::prost::alloc::string::String,
#[prost(string, tag="2")]
pub r#type: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Peer {
#[prost(string, tag="1")]
pub name: ::prost::alloc::string::String,
Expand Down
112 changes: 112 additions & 0 deletions nexus/pt/src/peerdb_peers.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1927,3 +1927,115 @@ impl<'de> serde::Deserialize<'de> for SqlServerConfig {
deserializer.deserialize_struct("peerdb_peers.SqlServerConfig", FIELDS, GeneratedVisitor)
}
}
impl serde::Serialize for TableColumn {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeStruct;
let mut len = 0;
if !self.name.is_empty() {
len += 1;
}
if !self.r#type.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("peerdb_peers.TableColumn", len)?;
if !self.name.is_empty() {
struct_ser.serialize_field("name", &self.name)?;
}
if !self.r#type.is_empty() {
struct_ser.serialize_field("type", &self.r#type)?;
}
struct_ser.end()
}
}
impl<'de> serde::Deserialize<'de> for TableColumn {
#[allow(deprecated)]
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
const FIELDS: &[&str] = &[
"name",
"type",
];

#[allow(clippy::enum_variant_names)]
enum GeneratedField {
Name,
Type,
__SkipField__,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
where
D: serde::Deserializer<'de>,
{
struct GeneratedVisitor;

impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = GeneratedField;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "expected one of: {:?}", &FIELDS)
}

#[allow(unused_variables)]
fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
where
E: serde::de::Error,
{
match value {
"name" => Ok(GeneratedField::Name),
"type" => Ok(GeneratedField::Type),
_ => Ok(GeneratedField::__SkipField__),
}
}
}
deserializer.deserialize_identifier(GeneratedVisitor)
}
}
struct GeneratedVisitor;
impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
type Value = TableColumn;

fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("struct peerdb_peers.TableColumn")
}

fn visit_map<V>(self, mut map: V) -> std::result::Result<TableColumn, V::Error>
where
V: serde::de::MapAccess<'de>,
{
let mut name__ = None;
let mut r#type__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Name => {
if name__.is_some() {
return Err(serde::de::Error::duplicate_field("name"));
}
name__ = Some(map.next_value()?);
}
GeneratedField::Type => {
if r#type__.is_some() {
return Err(serde::de::Error::duplicate_field("type"));
}
r#type__ = Some(map.next_value()?);
}
GeneratedField::__SkipField__ => {
let _ = map.next_value::<serde::de::IgnoredAny>()?;
}
}
}
Ok(TableColumn {
name: name__.unwrap_or_default(),
r#type: r#type__.unwrap_or_default(),
})
}
}
deserializer.deserialize_struct("peerdb_peers.TableColumn", FIELDS, GeneratedVisitor)
}
}
5 changes: 5 additions & 0 deletions protos/peers.proto
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ enum DBType {
EVENTHUB_GROUP = 7;
}

message TableColumn{
string name = 1;
string type = 2;
}

message Peer {
string name = 1;
DBType type = 2;
Expand Down
Loading

0 comments on commit 7c3b604

Please sign in to comment.