From 3b1de85734439fae46a8803cbe0d2b17d5c15b4a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Tue, 21 May 2024 03:32:08 +0530 Subject: [PATCH] finish overwrite, fix drop --- flow/connectors/postgres/client.go | 1 + flow/connectors/postgres/postgres.go | 39 ++++++++++++++++++++-------- flow/workflows/qrep_flow.go | 1 + protos/flow.proto | 1 + ui/app/mirrors/create/qrep/qrep.tsx | 2 +- 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/flow/connectors/postgres/client.go b/flow/connectors/postgres/client.go index c12ca20559..33d0a73ada 100644 --- a/flow/connectors/postgres/client.go +++ b/flow/connectors/postgres/client.go @@ -74,6 +74,7 @@ const ( dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s" deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1" + checkIfTableExistsSQL = `SELECT EXISTS(SELECT 1 FROM pg_tables WHERE schemaname=$1 AND tablename=$2) AS table_exists` getNumConnectionsForUser = "SELECT COUNT(*) FROM pg_stat_activity WHERE usename=$1 AND client_addr IS NOT NULL" ) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 03ee0c02b1..bb42cca8b0 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -730,7 +730,7 @@ func (c *PostgresConnector) GetTableSchema( if activity.IsActivity(ctx) { activity.RecordHeartbeat(ctx, "fetching schema for table "+tableName) } - tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System) + tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System, req.SkipChecksForQRep) if err != nil { return nil, err } @@ -747,19 +747,24 @@ func (c *PostgresConnector) getTableSchemaForTable( ctx context.Context, tableName string, system protos.TypeSystem, + skipTableAndReplIdentityChecks bool, ) (*protos.TableSchema, error) { schemaTable, err := utils.ParseSchemaTable(tableName) if err != nil { return nil, err } - replicaIdentityType, err := c.getReplicaIdentityType(ctx, schemaTable) - if err != nil { - return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err) - } - pKeyCols, err := c.getUniqueColumns(ctx, replicaIdentityType, schemaTable) - if err != nil { - return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err) + var pKeyCols []string + var replicaIdentityType ReplicaIdentityType + if !skipTableAndReplIdentityChecks { + replicaIdentityType, err := c.getReplicaIdentityType(ctx, schemaTable) + if err != nil { + return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err) + } + pKeyCols, err = c.getUniqueColumns(ctx, replicaIdentityType, schemaTable) + if err != nil { + return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err) + } } // Get the column names and types @@ -1094,15 +1099,27 @@ func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string) if err != nil { return fmt.Errorf("unable to drop raw table: %w", err) } - _, err = syncFlowCleanupTx.Exec(ctx, - fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + + // check if mirrorJobsTableIdentifier exists + var mirrorJobsTableExists bool + err = syncFlowCleanupTx.QueryRow(ctx, checkIfTableExistsSQL, c.metadataSchema, + mirrorJobsTableIdentifier).Scan(&mirrorJobsTableExists) if err != nil { - return fmt.Errorf("unable to delete job metadata: %w", err) + return fmt.Errorf("unable to check if mirror jobs table exists: %w", err) + } + + if mirrorJobsTableExists { + _, err = syncFlowCleanupTx.Exec(ctx, + fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName) + if err != nil { + return fmt.Errorf("unable to delete job metadata: %w", err) + } } err = syncFlowCleanupTx.Commit(ctx) if err != nil { return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err) } + return nil } diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go index 49dbdf6ce5..e4dcd34df3 100644 --- a/flow/workflows/qrep_flow.go +++ b/flow/workflows/qrep_flow.go @@ -113,6 +113,7 @@ func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName strin TableIdentifiers: []string{tableName}, FlowName: q.config.FlowJobName, System: q.config.System, + SkipChecksForQRep: true, } future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput) diff --git a/protos/flow.proto b/protos/flow.proto index 4d4343f71a..c0ecba8b1b 100644 --- a/protos/flow.proto +++ b/protos/flow.proto @@ -178,6 +178,7 @@ message GetTableSchemaBatchInput { repeated string table_identifiers = 2; string flow_name = 3; TypeSystem system = 4; + bool skipChecksForQRep = 5; } message GetTableSchemaBatchOutput { diff --git a/ui/app/mirrors/create/qrep/qrep.tsx b/ui/app/mirrors/create/qrep/qrep.tsx index 82812cf53a..48099dd6ff 100644 --- a/ui/app/mirrors/create/qrep/qrep.tsx +++ b/ui/app/mirrors/create/qrep/qrep.tsx @@ -151,7 +151,7 @@ export default function QRepConfigForm({ }} >