From 3dfe841f8b492762edeb31599120a64d5c7d8174 Mon Sep 17 00:00:00 2001
From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com>
Date: Thu, 14 Mar 2024 00:14:26 +0530
Subject: [PATCH] stilgar improvements post demo (#1479)

---
 flow/activities/flowable.go               | 10 +++++
 flow/connectors/postgres/qrep.go          | 46 ++++++++---------------
 flow/connectors/postgres/qrep_sql_sync.go |  6 ++-
 flow/workflows/qrep_flow.go               |  1 +
 protos/flow.proto                         |  1 +
 5 files changed, 33 insertions(+), 31 deletions(-)

diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index bf808b0ee6..73c47d8417 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -190,6 +190,16 @@ func (a *FlowableActivity) CreateNormalizedTable(
 
 	tableExistsMapping := make(map[string]bool)
 	for tableIdentifier, tableSchema := range config.TableNameSchemaMapping {
+		if config.AttemptDrop && config.PeerConnectionConfig.Type == protos.DBType_POSTGRES {
+			parsedNormalizedTable, err := utils.ParseSchemaTable(tableIdentifier)
+			if err != nil {
+				return nil, fmt.Errorf("error while parsing table schema and name: %w", err)
+			}
+			_, err = tx.(pgx.Tx).Exec(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", parsedNormalizedTable.String()))
+			if err != nil {
+				return nil, fmt.Errorf("error while attempting to drop normalized table: %w", err)
+			}
+		}
 		existing, err := conn.SetupNormalizedTable(
 			ctx,
 			tx,
diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go
index c9c271eb78..c4f26aa53d 100644
--- a/flow/connectors/postgres/qrep.go
+++ b/flow/connectors/postgres/qrep.go
@@ -607,7 +607,7 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
 	destinationTables := strings.Split(config.DestinationTableIdentifier, ";")
 
 	constraintsHookExists := true
-	_, err := c.conn.Exec(ctx, fmt.Sprintf("SELECT '_peerdb_post_run_hook_%s()'::regprocedure",
+	_, err := c.conn.Exec(ctx, fmt.Sprintf("SELECT '_peerdb_pre_run_hook_%s()'::regprocedure",
 		config.FlowJobName))
 	if err != nil {
 		constraintsHookExists = false
@@ -626,19 +626,31 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
 		}
 	}()
 
-	_, err = tx.Exec(ctx, "SET statement_timeout=0")
+	_, err = tx.Exec(ctx, "SET LOCAL statement_timeout=0")
 	if err != nil {
 		return fmt.Errorf("failed to set statement_timeout: %w", err)
 	}
-	_, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout=0")
+	_, err = tx.Exec(ctx, "SET LOCAL idle_in_transaction_session_timeout=0")
 	if err != nil {
 		return fmt.Errorf("failed to set idle_in_transaction_session_timeout: %w", err)
 	}
-	_, err = tx.Exec(ctx, "SET lock_timeout=0")
+	_, err = tx.Exec(ctx, "SET LOCAL lock_timeout='60s'")
 	if err != nil {
 		return fmt.Errorf("failed to set lock_timeout: %w", err)
 	}
 
+	if constraintsHookExists {
+		c.logger.Info("executing constraints hook", slog.String("procName",
+			fmt.Sprintf("_peerdb_pre_run_hook_%s()", config.FlowJobName)))
+		_, err = tx.Exec(ctx, fmt.Sprintf("SELECT _peerdb_pre_run_hook_%s()", config.FlowJobName))
+		if err != nil {
+			return fmt.Errorf("failed to execute stored procedure for applying constraints: %w", err)
+		}
+	} else {
+		c.logger.Info("no constraints hook found", slog.String("procName",
+			fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
+	}
+
 	for _, dstTableName := range destinationTables {
 		dstSchemaTable, err := utils.ParseSchemaTable(dstTableName)
 		if err != nil {
@@ -651,38 +663,12 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi
 		if err != nil {
 			return fmt.Errorf("failed to drop %s: %v", dstTableName, err)
 		}
-
-		if config.SyncedAtColName != "" {
-			updateSyncedAtStmt := fmt.Sprintf(
-				`UPDATE %s SET %s = CURRENT_TIMESTAMP`,
-				tempTableIdentifier.Sanitize(),
-				QuoteIdentifier(config.SyncedAtColName),
-			)
-			_, err = tx.Exec(ctx, updateSyncedAtStmt)
-			if err != nil {
-				return fmt.Errorf("failed to update synced_at column: %v", err)
-			}
-		}
-
 		_, err = tx.Exec(ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
 			tempTableIdentifier.Sanitize(), QuoteIdentifier(dstSchemaTable.Table)))
 		if err != nil {
 			return fmt.Errorf("failed to rename %s to %s: %v",
 				tempTableIdentifier.Sanitize(), dstTableIdentifier.Sanitize(), err)
 		}
-
 	}
-
-	if constraintsHookExists {
-		c.logger.Info("executing constraints hook", slog.String("procName",
-			fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
-		_, err = tx.Exec(ctx, fmt.Sprintf("SELECT _peerdb_post_run_hook_%s()", config.FlowJobName))
-		if err != nil {
-			return fmt.Errorf("failed to execute stored procedure for applying constraints: %w", err)
-		}
-	} else {
-		c.logger.Info("no constraints hook found", slog.String("procName",
-			fmt.Sprintf("_peerdb_post_run_hook_%s()", config.FlowJobName)))
-	}
 
 	err = tx.Commit(ctx)
diff --git a/flow/connectors/postgres/qrep_sql_sync.go b/flow/connectors/postgres/qrep_sql_sync.go
index 868db7de26..b6690628dc 100644
--- a/flow/connectors/postgres/qrep_sql_sync.go
+++ b/flow/connectors/postgres/qrep_sql_sync.go
@@ -109,7 +109,11 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
 		}
 	} else if writeMode.WriteType == protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE {
 		tempTableIdentifier := pgx.Identifier{dstTableName.Schema, dstTableName.Table}
-		_, err = tx.CopyFrom(ctx, tempTableIdentifier, schema.GetColumnNames(), copySource)
+		normalizedColNames := make([]string, 0, len(schema.GetColumnNames()))
+		for _, colName := range schema.GetColumnNames() {
+			normalizedColNames = append(normalizedColNames, utils.PostgresIdentifierNormalize(colName))
+		}
+		_, err = tx.CopyFrom(ctx, tempTableIdentifier, normalizedColNames, copySource)
 		if err != nil {
 			return -1, fmt.Errorf("failed to copy records into %s: %v", tempTableIdentifier, err)
 		}
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 4b85b9fff8..4d09ca4826 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -138,6 +138,7 @@ func (q *QRepFlowExecution) SetupWatermarkTableOnDestination(ctx workflow.Contex
 		TableNameSchemaMapping: dstTableNameSchemaMapping,
 		SyncedAtColName:        q.config.SyncedAtColName,
 		FlowName:               q.config.FlowJobName,
+		AttemptDrop:            true,
 	}
 
 	future := workflow.ExecuteActivity(ctx, flowable.CreateNormalizedTable, setupConfig)
diff --git a/protos/flow.proto b/protos/flow.proto
index dac49782dc..aa69a6cff3 100644
--- a/protos/flow.proto
+++ b/protos/flow.proto
@@ -188,6 +188,7 @@ message SetupNormalizedTableBatchInput {
   string soft_delete_col_name = 4;
   string synced_at_col_name = 5;
   string flow_name = 6;
+  bool attempt_drop = 7;
 }
 
 message SetupNormalizedTableOutput {