diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index df79659f5c..2dc1a5bd09 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -599,6 +599,11 @@ func (c *PostgresConnector) isPartitionSynced(ctx context.Context, partitionID s } func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig) error { + if config.SourcePeer.Type != protos.DBType_SNOWFLAKE || + config.WriteMode.WriteType != protos.QRepWriteType_QREP_WRITE_MODE_OVERWRITE { + return nil + } + destinationTables := strings.Split(config.DestinationTableIdentifier, ";") constraintsHookExists := true @@ -621,6 +626,19 @@ func (c *PostgresConnector) ConsolidateQRepPartitions(ctx context.Context, confi } }() + _, err = tx.Exec(ctx, "SET statement_timeout=0") + if err != nil { + return fmt.Errorf("failed to set statement_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET idle_in_transaction_session_timeout=0") + if err != nil { + return fmt.Errorf("failed to set idle_in_transaction_session_timeout: %w", err) + } + _, err = tx.Exec(ctx, "SET lock_timeout=0") + if err != nil { + return fmt.Errorf("failed to set lock_timeout: %w", err) + } + for _, dstTableName := range destinationTables { dstSchemaTable, err := utils.ParseSchemaTable(dstTableName) if err != nil {