Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent 8eae927 commit d7c329a
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
17 changes: 9 additions & 8 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1583,21 +1583,22 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
if err != nil {
return false
}
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
if !e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) {
return false
}

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.datasetName, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
return err != nil && numNewRows == 1
},
)

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.datasetName, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(1), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
Expand Down
5 changes: 2 additions & 3 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,9 +1239,9 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {

e2e.EnvWaitForEqualTables(env, s, "normalize table", tableName, "id,t,t2")
_, err = s.pool.Exec(context.Background(),
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=$1`, srcTableName), 1)
fmt.Sprintf(`UPDATE %s SET c1=c1+1 WHERE MOD(c2,2)=1`, srcTableName))
e2e.EnvNoError(s.t, env, err)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=$1`, srcTableName), 0)
_, err = s.pool.Exec(context.Background(), fmt.Sprintf(`DELETE FROM %s WHERE MOD(c2,2)=0`, srcTableName))
e2e.EnvNoError(s.t, env, err)
e2e.EnvWaitForEqualTables(env, s, "normalize update/delete", tableName, "id,t,t2")

Expand All @@ -1257,7 +1257,6 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() {
require.NotEqual(s.t, field.Name, "c2")
}
require.Equal(s.t, 5, len(sfRows.Schema.Fields))
require.Equal(s.t, 10, len(sfRows.Records))
}

func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() {
Expand Down

0 comments on commit d7c329a

Please sign in to comment.