Skip to content

Commit

Permalink
adds test
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 20, 2023
1 parent 6d357f5 commit b3008e1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func (c *PostgresConnector) getMinMaxValues(
) (interface{}, interface{}, error) {
var minValue, maxValue interface{}
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)
if config.WatermarkColumn == "xmin" {
quotedWatermarkColumn = fmt.Sprintf("%s::text::bigint", quotedWatermarkColumn)
}
// Get the maximum value from the database
maxQuery := fmt.Sprintf("SELECT MAX(%[1]s) FROM %[2]s", quotedWatermarkColumn, config.WatermarkTable)
row := tx.QueryRow(c.ctx, maxQuery)
Expand Down
43 changes: 43 additions & 0 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,49 @@ func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_XMIN_SF_Append() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)

numRows := 10

tblName := "test_qrep_flow_avro_sf_xmin_append"
s.setupSourceTable(tblName, numRows)
s.setupSFDestinationTable(tblName)

dstSchemaQualified := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tblName)

query := fmt.Sprintf("SELECT * FROM e2e_test_%s.%s WHERE xmin::text::bigint BETWEEN {{.start}} AND {{.end}}",
snowflakeSuffix, tblName)

qrepConfig, err := e2e.CreateQRepWorkflowConfig(
"test_qrep_flow_avro_sf_xmin",
fmt.Sprintf("e2e_test_%s.%s", snowflakeSuffix, tblName),
dstSchemaQualified,
query,
protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO,
s.sfHelper.Peer,
"",
)

qrepConfig.WatermarkColumn = "xmin"
s.NoError(err)

e2e.RunQrepFlowWorkflow(env, qrepConfig)

// Verify workflow completes without error
s.True(env.IsWorkflowCompleted())

// assert that error contains "invalid connection configs"
err = env.GetWorkflowError()
s.NoError(err)

sel := e2e.GetOwnersSelectorString()
s.compareTableContentsSF(tblName, sel, true)

env.AssertExpectations(s.T())
}

func (s *PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration() {
env := s.NewTestWorkflowEnvironment()
e2e.RegisterWorkflowsAndActivities(env)
Expand Down

0 comments on commit b3008e1

Please sign in to comment.