Skip to content

Commit

Permalink
remove logging, move count check out of wait for
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent 4ce1f80 commit 21fd480
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,8 +1480,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
// verify our updates and delete happened
e2e.RequireEqualTables(s, "test_softdel_iud", "id,c1,c2,t")

newerSyncedAtQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM`+"`%s.%s`"+`WHERE _PEERDB_IS_DELETED`,
newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.Config.DatasetId, dstTableName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
Expand Down Expand Up @@ -1555,32 +1555,26 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
e2e.EnvNoError(s.t, env, insertTx.Commit(context.Background()))

e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize transaction", func() bool {
rows2, _ := s.GetRows(dstName, "id,c1,c2,_PEERDB_IS_DELETED")
s.t.Log("ROWS", rows2)

pgRows, err := e2e.GetPgRows(s.pool, s.bqSuffix, srcName, "id,c1,c2,t")
e2e.EnvNoError(s.t, env, err)
rows, err := s.GetRowsWhere(dstName, "id,c1,c2,t", "NOT _PEERDB_IS_DELETED")
if err != nil {
return false
}
if !e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows) {
return false
}

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.Config.DatasetId, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
e2e.EnvNoError(s.t, env, err)
s.t.Log("waiting on _PEERDB_IS_DELETED to be 1, currently", numNewRows)
return err == nil && numNewRows == 1
return e2eshared.CheckEqualRecordBatches(s.t, pgRows, rows)
})

env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
s.bqHelper.Config.DatasetId, dstName)
numNewRows, err := s.bqHelper.RunInt64Query(newerSyncedAtQuery)
require.NoError(s.t, err)
require.Equal(s.t, int64(0), numNewRows)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
Expand Down

0 comments on commit 21fd480

Please sign in to comment.