From d407f9e8ba3b5ea2a8b6e2319dcc2cef49b60385 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 25 Dec 2023 13:09:21 +0000 Subject: [PATCH] BQ/SF: always use GREATEST when setting offset (#896) While adding SetLastOffset, missed updating previous metadata update sql to use GREATEST too --- flow/connectors/bigquery/bigquery.go | 2 +- flow/connectors/snowflake/snowflake.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 64d12057f6..3c0787527e 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -927,7 +927,7 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec c.datasetID, MirrorJobsTable, jobName, lastSyncedCheckpointID, batchID) if hasJob { jobStatement = fmt.Sprintf( - "UPDATE %s.%s SET offset = %d,sync_batch_id=%d WHERE mirror_job_name = '%s';", + "UPDATE %s.%s SET offset=GREATEST(offset,%d),sync_batch_id=%d WHERE mirror_job_name = '%s';", c.datasetID, MirrorJobsTable, lastSyncedCheckpointID, batchID, jobName) } diff --git a/flow/connectors/snowflake/snowflake.go b/flow/connectors/snowflake/snowflake.go index e47c27a5f4..db13e188b8 100644 --- a/flow/connectors/snowflake/snowflake.go +++ b/flow/connectors/snowflake/snowflake.go @@ -39,11 +39,10 @@ const ( rawTableMultiValueInsertSQL = "INSERT INTO %s.%s VALUES%s" createNormalizedTableSQL = "CREATE TABLE IF NOT EXISTS %s(%s)" toVariantColumnName = "VAR_COLS" - mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS (SELECT _PEERDB_UID, - _PEERDB_TIMESTAMP, - TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, - _PEERDB_UNCHANGED_TOAST_COLUMNS FROM - _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND + mergeStatementSQL = `MERGE INTO %s TARGET USING (WITH VARIANT_CONVERTED AS ( + SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,TO_VARIANT(PARSE_JSON(_PEERDB_DATA)) %s,_PEERDB_RECORD_TYPE, + _PEERDB_MATCH_DATA,_PEERDB_BATCH_ID,_PEERDB_UNCHANGED_TOAST_COLUMNS + FROM _PEERDB_INTERNAL.%s WHERE _PEERDB_BATCH_ID > %d AND _PEERDB_BATCH_ID <= %d AND _PEERDB_DESTINATION_TABLE_NAME = ? ), FLATTENED AS (SELECT _PEERDB_UID,_PEERDB_TIMESTAMP,_PEERDB_RECORD_TYPE,_PEERDB_MATCH_DATA,_PEERDB_BATCH_ID, _PEERDB_UNCHANGED_TOAST_COLUMNS,%s @@ -66,7 +65,8 @@ const ( insertJobMetadataSQL = "INSERT INTO %s.%s VALUES (?,?,?,?)" - updateMetadataForSyncRecordsSQL = "UPDATE %s.%s SET OFFSET=?, SYNC_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" + updateMetadataForSyncRecordsSQL = `UPDATE %s.%s SET OFFSET=GREATEST(OFFSET, ?), SYNC_BATCH_ID=? + WHERE MIRROR_JOB_NAME=?` updateMetadataForNormalizeRecordsSQL = "UPDATE %s.%s SET NORMALIZE_BATCH_ID=? WHERE MIRROR_JOB_NAME=?" checkIfTableExistsSQL = `SELECT TO_BOOLEAN(COUNT(1)) FROM INFORMATION_SCHEMA.TABLES