diff --git a/.github/workflows/flow.yml b/.github/workflows/flow.yml index 777b23039c..3e3a4a3c77 100644 --- a/.github/workflows/flow.yml +++ b/.github/workflows/flow.yml @@ -126,4 +126,3 @@ jobs: PEERDB_CATALOG_USER: postgres PEERDB_CATALOG_PASSWORD: postgres PEERDB_CATALOG_DATABASE: postgres - PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10 diff --git a/flow/e2e/congen.go b/flow/e2e/congen.go index c6300301cd..10f91c162f 100644 --- a/flow/e2e/congen.go +++ b/flow/e2e/congen.go @@ -139,18 +139,16 @@ func TearDownPostgres(pool *pgxpool.Pool, suffix string) error { // drop the e2e_test schema if pool != nil { deadline := time.Now().Add(time.Minute) - var err error for { - err = cleanPostgres(pool, suffix) - if time.Now().Compare(deadline) > 0 { - break + err := cleanPostgres(pool, suffix) + if err == nil { + pool.Close() + return nil + } else if time.Now().After(deadline) { + return err } time.Sleep(time.Second) } - if err != nil { - return err - } - pool.Close() } return nil } @@ -196,7 +194,7 @@ func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Pee return ret, nil } -func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*protos.FlowConnectionConfigs, error) { +func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs { tblMappings := []*protos.TableMapping{} for k, v := range c.TableNameMapping { tblMappings = append(tblMappings, &protos.TableMapping{ @@ -205,18 +203,20 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto }) } - ret := &protos.FlowConnectionConfigs{} - ret.FlowJobName = c.FlowJobName - ret.TableMappings = tblMappings - ret.Source = GeneratePostgresPeer(c.PostgresPort) - ret.Destination = c.Destination - ret.CdcStagingPath = c.CdcStagingPath - ret.SoftDelete = c.SoftDelete + ret := &protos.FlowConnectionConfigs{ + FlowJobName: c.FlowJobName, + TableMappings: tblMappings, + Source: GeneratePostgresPeer(c.PostgresPort), + Destination: c.Destination, + CdcStagingPath: c.CdcStagingPath, + SoftDelete: c.SoftDelete, + SyncedAtColName: "_PEERDB_SYNCED_AT", + IdleTimeoutSeconds: 30, + } if ret.SoftDelete { ret.SoftDeleteColName = "_PEERDB_IS_DELETED" } - ret.SyncedAtColName = "_PEERDB_SYNCED_AT" - return ret, nil + return ret } type QRepFlowConnectionGenerationConfig struct { diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 62636dd526..4aa058c06f 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -543,7 +543,7 @@ func EnvWaitFor(t *testing.T, env *testsuite.TestWorkflowEnvironment, timeout ti deadline, _ := ctx.Deadline() t.Log("WaitFor", reason) for !f(ctx) { - if time.Now().Compare(deadline) >= 0 { + if time.Now().After(deadline) { t.Error("UNEXPECTED TIMEOUT", reason) env.CancelWorkflow() runtime.Goexit()