Skip to content

Commit

Permalink
Try waiting a long time for cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 4, 2024
1 parent 56f0752 commit a98d6e4
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
7 changes: 6 additions & 1 deletion flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,14 @@ func (p *PostgresCDCSource) consumeStream(
} else {
ctx, cancel = context.WithDeadline(p.ctx, nextStandbyMessageDeadline)
}

rawMsg, err := conn.ReceiveMessage(ctx)
cancel()

ctxErr := p.ctx.Err()
if ctxErr != nil {
return fmt.Errorf("consumeStream preempted: %w", ctxErr)
}

if err != nil && !p.commitLock {
if pgconn.Timeout(err) {
p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
time.Sleep(10 * time.Second)
time.Sleep(2 * time.Minute)
wg.Wait()
env.AssertExpectations(s.t)
}
Expand Down

0 comments on commit a98d6e4

Please sign in to comment.