diff --git a/flow/connectors/postgres/qrep.go b/flow/connectors/postgres/qrep.go index ad42970924..3c0d3124b8 100644 --- a/flow/connectors/postgres/qrep.go +++ b/flow/connectors/postgres/qrep.go @@ -230,6 +230,9 @@ func (c *PostgresConnector) getMinMaxValues( ) (interface{}, interface{}, error) { var minValue, maxValue interface{} quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn) + if config.WatermarkColumn == "xmin" { + quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn) + } // Get the maximum value from the database maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable) row := tx.QueryRow(c.ctx, maxQuery) diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 4595a198e9..7af595c0f2 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -212,6 +212,49 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() { env.AssertExpectations(s.T()) } +func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_XMIN_SF_Append() { + env := s.NewTestWorkflowEnvironment() + e2e.RegisterWorkflowsAndActivities(env) + + numRows := 10 + + tblName := "test_qrep_flow_avro_sf_xmin_append" + s.setupSourceTable(tblName, numRows) + s.setupSFDestinationTable(tblName) + + dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName) + + query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}", + snowflakeSuffix, tblName) + + qrepConfig, err := e2e.CreateQRepWorkflowConfig( + "test_qrep_flow_avro_sf_xmin", + fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName), + dstSchemaQualified, + query, + protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO, + s.sfHelper.Peer, + "", + ) + + qrepConfig.WatermarkColumn = "xmin" + s.NoError(err) + + e2e.RunQrepFlowWorkflow(env, qrepConfig) + + // Verify workflow completes without error + s.True(env.IsWorkflowCompleted()) + + // assert that error contains "invalid connection configs" + err = env.GetWorkflowError() + s.NoError(err) + + sel := e2e.GetOwnersSelectorString() + s.compareTableContentsSF(tblName, sel, true) + + env.AssertExpectations(s.T()) +} + func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() { env := s.NewTestWorkflowEnvironment() e2e.RegisterWorkflowsAndActivities(env)