Skip to content

Commit

Permalink
Remove ExitAfterRecords, always use WaitFor (#1206)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 5, 2024
1 parent 92b752d commit e49b4c3
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 395 deletions.
3 changes: 1 addition & 2 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
}

limits := &peerflow.CDCFlowLimits{
ExitAfterRecords: -1,
MaxBatchSize: maxBatchSize,
MaxBatchSize: maxBatchSize,
}

if req.ConnectionConfigs.SoftDeleteColName == "" {
Expand Down
11 changes: 7 additions & 4 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,9 @@ func (b *BigQueryTestHelper) CheckNull(tableName string, colName []string) (bool
}

// check if NaN, Inf double values are null
func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, colName []string) (bool, error) {
csep := strings.Join(colName, ",")
command := fmt.Sprintf("SELECT %s FROM `%s.%s`",
csep, b.Config.DatasetId, tableName)
func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, c1 string, c2 string) (bool, error) {
command := fmt.Sprintf("SELECT %s, %s FROM `%s.%s`",
c1, c2, b.Config.DatasetId, tableName)
q := b.client.Query(command)
q.DisableQueryCache = true
it, err := q.Read(context.Background())
Expand All @@ -437,6 +436,10 @@ func (b *BigQueryTestHelper) CheckDoubleValues(tableName string, colName []strin
}
}

if len(row) == 0 {
return false, nil
}

floatArr, _ := row[1].([]float64)
if row[0] != nil || len(floatArr) > 0 {
return false, nil
Expand Down
Loading

0 comments on commit e49b4c3

Please sign in to comment.