Skip to content

Commit

Permalink
finish overwrite, fix drop
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 20, 2024
1 parent a951776 commit 3b1de85
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ const (

dropTableIfExistsSQL = "DROP TABLE IF EXISTS %s.%s"
deleteJobMetadataSQL = "DELETE FROM %s.%s WHERE mirror_job_name=$1"
checkIfTableExistsSQL = `SELECT EXISTS(SELECT 1 FROM pg_tables WHERE schemaname=$1 AND tablename=$2) AS table_exists`
getNumConnectionsForUser = "SELECT COUNT(*) FROM pg_stat_activity WHERE usename=$1 AND client_addr IS NOT NULL"
)

Expand Down
39 changes: 28 additions & 11 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ func (c *PostgresConnector) GetTableSchema(
if activity.IsActivity(ctx) {
activity.RecordHeartbeat(ctx, "fetching schema for table "+tableName)
}
tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System)
tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System, req.SkipChecksForQRep)
if err != nil {
return nil, err
}
Expand All @@ -747,19 +747,24 @@ func (c *PostgresConnector) getTableSchemaForTable(
ctx context.Context,
tableName string,
system protos.TypeSystem,
skipTableAndReplIdentityChecks bool,
) (*protos.TableSchema, error) {
schemaTable, err := utils.ParseSchemaTable(tableName)
if err != nil {
return nil, err
}

replicaIdentityType, err := c.getReplicaIdentityType(ctx, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err)
}
pKeyCols, err := c.getUniqueColumns(ctx, replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err)
var pKeyCols []string
var replicaIdentityType ReplicaIdentityType
if !skipTableAndReplIdentityChecks {
replicaIdentityType, err := c.getReplicaIdentityType(ctx, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting replica identity for table %s: %w", schemaTable, err)
}
pKeyCols, err = c.getUniqueColumns(ctx, replicaIdentityType, schemaTable)
if err != nil {
return nil, fmt.Errorf("[getTableSchema] error getting primary key column for table %s: %w", schemaTable, err)
}
}

// Get the column names and types
Expand Down Expand Up @@ -1094,15 +1099,27 @@ func (c *PostgresConnector) SyncFlowCleanup(ctx context.Context, jobName string)
if err != nil {
return fmt.Errorf("unable to drop raw table: %w", err)
}
_, err = syncFlowCleanupTx.Exec(ctx,
fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)

// check if mirrorJobsTableIdentifier exists
var mirrorJobsTableExists bool
err = syncFlowCleanupTx.QueryRow(ctx, checkIfTableExistsSQL, c.metadataSchema,
mirrorJobsTableIdentifier).Scan(&mirrorJobsTableExists)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)
return fmt.Errorf("unable to check if mirror jobs table exists: %w", err)
}

if mirrorJobsTableExists {
_, err = syncFlowCleanupTx.Exec(ctx,
fmt.Sprintf(deleteJobMetadataSQL, c.metadataSchema, mirrorJobsTableIdentifier), jobName)
if err != nil {
return fmt.Errorf("unable to delete job metadata: %w", err)
}
}
err = syncFlowCleanupTx.Commit(ctx)
if err != nil {
return fmt.Errorf("unable to commit transaction for sync flow cleanup: %w", err)
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions flow/workflows/qrep_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func (q *QRepFlowExecution) getTableSchema(ctx workflow.Context, tableName strin
TableIdentifiers: []string{tableName},
FlowName: q.config.FlowJobName,
System: q.config.System,
SkipChecksForQRep: true,
}

future := workflow.ExecuteActivity(ctx, flowable.GetTableSchema, tableSchemaInput)
Expand Down
1 change: 1 addition & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ message GetTableSchemaBatchInput {
repeated string table_identifiers = 2;
string flow_name = 3;
TypeSystem system = 4;
bool skipChecksForQRep = 5;
}

message GetTableSchemaBatchOutput {
Expand Down
2 changes: 1 addition & 1 deletion ui/app/mirrors/create/qrep/qrep.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export default function QRepConfigForm({
}}
>
<Switch
checked={
defaultChecked={
setting.label === 'Create Destination Table'
? mirrorConfig.setupWatermarkTableOnDestination
: setting.label === 'Initial Copy Only'
Expand Down

0 comments on commit 3b1de85

Please sign in to comment.