From b1be612036b52964db58149667ed25640710f561 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Sun, 22 Oct 2023 20:58:28 -0400 Subject: [PATCH] scope to a smaller transaction (#559) --- flow/connectors/snowflake/snowflake.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index 6a05bf9832..6658b9e5cb 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -494,6 +494,14 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. } syncBatchID = syncBatchID + 1 + var res *model.SyncResponse + if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { + res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) + if err != nil { + return nil, err + } + } + // transaction for SyncRecords syncRecordsTx, err := c.database.BeginTx(c.ctx, nil) if err != nil { @@ -510,13 +518,7 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. } }() - var res *model.SyncResponse - if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO { - res, err = c.syncRecordsViaAvro(req, rawTableIdentifier, syncBatchID) - if err != nil { - return nil, err - } - } else if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT { + if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT { res, err = c.syncRecordsViaSQL(req, rawTableIdentifier, syncBatchID, syncRecordsTx) if err != nil { return nil, err @@ -539,7 +541,6 @@ func (c *SnowflakeConnector) SyncRecords(req *model.SyncRecordsRequest) (*model. func (c *SnowflakeConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest, rawTableIdentifier string, syncBatchID int64, syncRecordsTx *sql.Tx) (*model.SyncResponse, error) { - records := make([]snowflakeRawRecord, 0) tableNameRowsMapping := make(map[string]uint32)