diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go index 47dd2b3936..3582f5a2f6 100644 --- a/flow/connectors/bigquery/merge_statement_generator.go +++ b/flow/connectors/bigquery/merge_statement_generator.go @@ -204,7 +204,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements( updateStmts := make([]string, 0, len(unchangedToastCols)) for _, cols := range unchangedToastCols { - unchangedColsArray := strings.Split(cols, ", ") + unchangedColsArray := strings.Split(cols, ",") otherCols := utils.ArrayMinus(allCols, unchangedColsArray) tmpArray := make([]string, 0, len(otherCols)) for _, colName := range otherCols { diff --git a/flow/connectors/bigquery/merge_stmt_generator_test.go b/flow/connectors/bigquery/merge_stmt_generator_test.go index 0818caed56..141b3999b7 100644 --- a/flow/connectors/bigquery/merge_stmt_generator_test.go +++ b/flow/connectors/bigquery/merge_stmt_generator_test.go @@ -17,7 +17,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) { }, } allCols := []string{"col1", "col2", "col3"} - unchangedToastCols := []string{"", "col2, col3", "col2", "col3"} + unchangedToastCols := []string{"", "col2,col3", "col2", "col3"} expected := []string{ "WHEN MATCHED AND _rt!=2 AND _ut=''" + diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 7fef9d26c2..43a11a2e72 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro // PullRecords pulls records from the source. func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { - defer req.RecordStream.Close() + defer func() { + req.RecordStream.Close() + }() // Slotname would be the job name prefixed with "peerflow_slot_" slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName) diff --git a/flow/connectors/utils/heartbeat.go b/flow/connectors/utils/heartbeat.go index 37f00bc72f..270680ded1 100644 --- a/flow/connectors/utils/heartbeat.go +++ b/flow/connectors/utils/heartbeat.go @@ -13,8 +13,8 @@ func HeartbeatRoutine( ctx context.Context, interval time.Duration, message func() string, -) context.CancelFunc { - ctx, cancel := context.WithCancel(ctx) +) func() { + shutdown := make(chan struct{}) go func() { counter := 0 for { @@ -22,13 +22,15 @@ func HeartbeatRoutine( msg := fmt.Sprintf("heartbeat #%d: %s", counter, message()) RecordHeartbeatWithRecover(ctx, msg) select { + case <-shutdown: + return case <-ctx.Done(): return case <-time.After(interval): } } }() - return cancel + return func() { shutdown <- struct{}{} } } // if the functions are being called outside the context of a Temporal workflow, diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 6db3d5c6fc..7dfc5441ad 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -133,7 +133,6 @@ func EnvWaitForEqualTables( cols string, ) { suite.T().Helper() - EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols) }