Skip to content

Commit

Permalink
cleanup code, move cdc timeout into code, raise to 30, fix teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 6, 2024
1 parent 8a696b0 commit a77938b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 20 deletions.
1 change: 0 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,3 @@ jobs:
PEERDB_CATALOG_USER: postgres
PEERDB_CATALOG_PASSWORD: postgres
PEERDB_CATALOG_DATABASE: postgres
PEERDB_CDC_IDLE_TIMEOUT_SECONDS: 10
36 changes: 18 additions & 18 deletions flow/e2e/congen.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,16 @@ func TearDownPostgres(pool *pgxpool.Pool, suffix string) error {
// drop the e2e_test schema
if pool != nil {
deadline := time.Now().Add(time.Minute)
var err error
for {
err = cleanPostgres(pool, suffix)
if time.Now().Compare(deadline) > 0 {
break
err := cleanPostgres(pool, suffix)
if err == nil {
pool.Close()
return nil
} else if time.Now().After(deadline) {
return err
}
time.Sleep(time.Second)
}
if err != nil {
return err
}
pool.Close()
}
return nil
}
Expand Down Expand Up @@ -196,7 +194,7 @@ func GenerateSnowflakePeer(snowflakeConfig *protos.SnowflakeConfig) (*protos.Pee
return ret, nil
}

func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*protos.FlowConnectionConfigs, error) {
func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() *protos.FlowConnectionConfigs {
tblMappings := []*protos.TableMapping{}
for k, v := range c.TableNameMapping {
tblMappings = append(tblMappings, &protos.TableMapping{
Expand All @@ -205,18 +203,20 @@ func (c *FlowConnectionGenerationConfig) GenerateFlowConnectionConfigs() (*proto
})
}

ret := &protos.FlowConnectionConfigs{}
ret.FlowJobName = c.FlowJobName
ret.TableMappings = tblMappings
ret.Source = GeneratePostgresPeer(c.PostgresPort)
ret.Destination = c.Destination
ret.CdcStagingPath = c.CdcStagingPath
ret.SoftDelete = c.SoftDelete
ret := &protos.FlowConnectionConfigs{
FlowJobName: c.FlowJobName,
TableMappings: tblMappings,
Source: GeneratePostgresPeer(c.PostgresPort),
Destination: c.Destination,
CdcStagingPath: c.CdcStagingPath,
SoftDelete: c.SoftDelete,
SyncedAtColName: "_PEERDB_SYNCED_AT",
IdleTimeoutSeconds: 30,
}
if ret.SoftDelete {
ret.SoftDeleteColName = "_PEERDB_IS_DELETED"
}
ret.SyncedAtColName = "_PEERDB_SYNCED_AT"
return ret, nil
return ret
}

type QRepFlowConnectionGenerationConfig struct {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func EnvWaitFor(t *testing.T, env *testsuite.TestWorkflowEnvironment, timeout ti
deadline, _ := ctx.Deadline()
t.Log("WaitFor", reason)
for !f(ctx) {
if time.Now().Compare(deadline) >= 0 {
if time.Now().After(deadline) {
t.Error("UNEXPECTED TIMEOUT", reason)
env.CancelWorkflow()
runtime.Goexit()
Expand Down

0 comments on commit a77938b

Please sign in to comment.