diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index ec9c987247..97ecc65fac 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -1480,8 +1480,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { // verify our updates and delete happened e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t") - newerSyncedAtQuery := fmt.Sprintf(` - SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED`, + newerSyncedAtQuery := fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", s.bqHelper.Config.DatasetId, dstTableName) numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) require.NoError(s.t, err) @@ -1555,32 +1555,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background())) e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize transaction", func() bool { - rows2, _ := s.GetRows(dstName, "id,c1,c2,_PEERDB_IS_DELETED") - s.t.Log("ROWS", rows2) - pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t") e2e.EnvNoError(s.t, env, err) rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED") if err != nil { return false } - if !e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) { - return false - } - - newerSyncedAtQuery := fmt.Sprintf( - "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", - s.bqHelper.Config.DatasetId, dstName) - numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) - e2e.EnvNoError(s.t, env, err) - s.t.Log("waiting on _PEERDB_IS_DELETED to be 1, currently", numNewRows) - return err == nil && numNewRows == 1 + return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) }) env.CancelWorkflow() }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + + newerSyncedAtQuery := fmt.Sprintf( + "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", + s.bqHelper.Config.DatasetId, dstName) + numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery) + require.NoError(s.t, err) + require.Equal(s.t, int64(0), numNewRows) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {