From 51760c99ffd217a51c4bb12940cfebe5468e3dd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Thu, 4 Jan 2024 06:17:25 +0000 Subject: [PATCH] idea: need to sleep to avoid test ending before slot closed --- flow/connectors/postgres/postgres.go | 4 +--- flow/e2e/postgres/peer_flow_pg_test.go | 2 ++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index 43a11a2e72..7fef9d26c2 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -196,9 +196,7 @@ func (c *PostgresConnector) SetLastOffset(jobName string, lastOffset int64) erro // PullRecords pulls records from the source. func (c *PostgresConnector) PullRecords(catalogPool *pgxpool.Pool, req *model.PullRecordsRequest) error { - defer func() { - req.RecordStream.Close() - }() + defer req.RecordStream.Close() // Slotname would be the job name prefixed with "peerflow_slot_" slotName := fmt.Sprintf("peerflow_slot_%s", req.FlowJobName) diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index db23a27175..182d628bf7 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -267,6 +267,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + time.Sleep(10 * time.Second) wg.Wait() env.AssertExpectations(s.t) } @@ -337,6 +338,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + time.Sleep(10 * time.Second) wg.Wait() env.AssertExpectations(s.t) }