diff --git a/flow/connectors/snowflake/qrep_avro_sync.go b/flow/connectors/snowflake/qrep_avro_sync.go index 6abfe71fa9..a8946321db 100644 --- a/flow/connectors/snowflake/qrep_avro_sync.go +++ b/flow/connectors/snowflake/qrep_avro_sync.go @@ -379,11 +379,6 @@ func GenerateMergeCommand( upsertKeyCols[i] = caseMatchedCols[strings.ToLower(col)] } - watermarkCol, ok := caseMatchedCols[strings.ToLower(watermarkCol)] - if !ok { - return "", fmt.Errorf("watermark column '%s' not found in destination table", watermarkCol) - } - upsertKeys := []string{} partitionKeyCols := []string{} for _, key := range upsertKeyCols { @@ -405,23 +400,36 @@ func GenerateMergeCommand( updateSetClause := strings.Join(updateSetClauses, ", ") insertColumnsClause := strings.Join(insertColumnsClauses, ", ") insertValuesClause := strings.Join(insertValuesClauses, ", ") - - quotedWMC := utils.QuoteIdentifier(watermarkCol) - selectCmd := fmt.Sprintf(` SELECT * FROM %s QUALIFY ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s DESC) = 1 - `, tempTableName, strings.Join(partitionKeyCols, ","), quotedWMC) - - mergeCmd := fmt.Sprintf(` - MERGE INTO %s dst - USING (%s) src - ON %s - WHEN MATCHED AND src.%s > dst.%s THEN UPDATE SET %s - WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s) - `, dstTable, selectCmd, upsertKeyClause, quotedWMC, quotedWMC, - updateSetClause, insertColumnsClause, insertValuesClause) + `, tempTableName, strings.Join(partitionKeyCols, ","), partitionKeyCols[0]) + var mergeCmd string + if watermarkCol == "xmin" { // we don't want to depend on wmc + mergeCmd = fmt.Sprintf(` + MERGE INTO %s dst + USING (%s) src + ON %s + WHEN MATCHED THEN UPDATE SET %s + WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s) + `, dstTable, selectCmd, upsertKeyClause, + updateSetClause, insertColumnsClause, insertValuesClause) + } else { + watermarkCol, ok := caseMatchedCols[strings.ToLower(watermarkCol)] + if !ok { + return "", fmt.Errorf("watermark column '%s' not found in destination table", watermarkCol) + } + quotedWMC := utils.QuoteIdentifier(watermarkCol) + mergeCmd = fmt.Sprintf(` + MERGE INTO %s dst + USING (%s) src + ON %s + WHEN MATCHED AND src.%s > dst.%s THEN UPDATE SET %s + WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s) + `, dstTable, selectCmd, upsertKeyClause, quotedWMC, quotedWMC, + updateSetClause, insertColumnsClause, insertValuesClause) + } return mergeCmd, nil } diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 7af595c0f2..7f4b142168 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -255,6 +255,52 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_XMIN_SF_Append() { env.AssertExpectations(s.T()) } +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + numRows := 10 + + tblName := "test_qrep_flow_avro_sf_ups_xmin" + s.setupSourceTable(tblName, numRows) + s.setupSFDestinationTable(tblName) + + dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}", + snowflakeSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_flow_avro_sf_xmin", + fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + dstSchemaQualified, + query, + protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + s.sfHelper.Peer, + "", + ) + qrepConfig.WriteMode = &protos.QRepWriteMode{ + WriteType: protos.QRepWriteType_QREP_WRITE_MODE_UPSERT, + UpsertKeyColumns: []string{"id"}, + } + qrepConfig.WatermarkColumn = "xmin" + s.NoError(err) + + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + // assert that error contains "invalid connection configs" + err = env.GetWorkflowError() + s.NoError(err) + + sel := e2e.GetOwnersSelectorString() + s.compareTableContentsSF(tblName, sel, true) + + env.AssertExpectations(s.T()) +} + func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env)