Skip to content

Commit

Permalink
Merge branch 'main' into wait-for
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 8, 2024
2 parents 446df16 + 311a65d commit 6b411e7
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 7 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (m *mergeStmtGenerator) generateUpdateStatements(
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ", ")
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestGenerateUpdateStatement_WithUnchangedToastCols(t *testing.T) {
},
}
allCols := []string{"col1", "col2", "col3"}
unchangedToastCols := []string{"", "col2, col3", "col2", "col3"}
unchangedToastCols := []string{"", "col2,col3", "col2", "col3"}

expected := []string{
"WHEN MATCHED AND _rt!=2 AND _ut=''" +
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro

// PullRecords pulls records from the source.
func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error {
defer req.RecordStream.Close()
defer func() {
req.RecordStream.Close()
}()

// Slotname would be the job name prefixed with "peerflow_slot_"
slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName)
Expand Down
8 changes: 5 additions & 3 deletions flow/connectors/utils/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,24 @@ func HeartbeatRoutine(
ctx context.Context,
interval time.Duration,
message func() string,
) context.CancelFunc {
ctx, cancel := context.WithCancel(ctx)
) func() {
shutdown := make(chan struct{})
go func() {
counter := 0
for {
counter += 1
msg := fmt.Sprintf("heartbeat #%d: %s", counter, message())
RecordHeartbeatWithRecover(ctx, msg)
select {
case <-shutdown:
return
case <-ctx.Done():
return
case <-time.After(interval):
}
}
}()
return cancel
return func() { shutdown <- struct{}{} }
}

// if the functions are being called outside the context of a Temporal workflow,
Expand Down
1 change: 0 additions & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func EnvWaitForEqualTables(
cols string,
) {
suite.T().Helper()

EnvWaitForEqualTablesWithNames(env, suite, reason, table, table, cols)
}

Expand Down

0 comments on commit 6b411e7

Please sign in to comment.