From 7d6f341e7b0db8198126d9afce1394134d473bba Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Tue, 12 Mar 2024 20:15:28 +0530 Subject: [PATCH] setting timeouts to 0 for consolidate txn (#1474) --- flow/connectors/postgres/qrep.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index c2633a9ee0..98a5b7c3c3 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -592,6 +592,11 @@ func (c *PostgresConnector) IsQRepPartitionSynced(ctx context.Context, } func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error { + if config.SourcePeer.Type != protos.DBType_SNOWFLAKE || + config.WriteMode.WriteType != protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + return nil + } + destinationTables := strings.Split(config.DestinationTableIdentifier, ";") constraintsHookExists := true @@ -614,6 +619,19 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi } }() + _, err = tx.Exec(ctx, "SET statement_timeout=0") + if err != nil { + return fmt.Errorf("failed to set statement_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout=0") + if err != nil { + return fmt.Errorf("failed to set idle_in_transaction_session_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET lock_timeout=0") + if err != nil { + return fmt.Errorf("failed to set lock_timeout: %w", err) + } + for _, dstTableName := range destinationTables { dstSchemaTable, err := utils.ParseSchemaTable(dstTableName) if err != nil {