Skip to content

Commit

Permalink
Log workflow not being finished
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 29, 2024
1 parent b04ca20 commit ddcdfb1
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 22 deletions.
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
tc := e2e.NewTemporalClient(s.t)

env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, nil, nil)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.Error(s.t, env.Error())

// assert that error contains "invalid connection configs"
Expand Down
6 changes: 3 additions & 3 deletions flow/e2e/bigquery/qrep_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_QRep_Flow_Avro() {
"")
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

e2e.RequireEqualTables(s, tblName, "*")
Expand All @@ -103,7 +103,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Timestamps_And_Date_QRep() {
qrepConfig.WatermarkColumn = "watermark_ts"
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

goodValues := []string{"watermark_ts", "mydate", "medieval"}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_PeerDB_Columns_QRep_BQ() {
"_PEERDB_SYNCED_AT")
require.NoError(s.t, err)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.checkPeerdbColumns(tblName, false)
Expand Down
15 changes: 6 additions & 9 deletions flow/e2e/postgres/qrep_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ func (s PeerFlowE2ETestSuitePG) TestSimpleSlotCreation() {
}

func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 10

srcTable := "test_qrep_flow_avro_pg_1"
Expand Down Expand Up @@ -247,17 +245,16 @@ func (s PeerFlowE2ETestSuitePG) Test_Complete_QRep_Flow_Multi_Insert_PG() {
)
require.NoError(s.t, err)

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.comparePGTables(srcSchemaQualified, dstSchemaQualified, "*")
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 10

srcTable := "test_qrep_columns_pg_1"
Expand Down Expand Up @@ -285,17 +282,16 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns_QRep_PG() {
)
require.NoError(s.t, err)

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.checkSyncedAt(dstSchemaQualified)
require.NoError(s.t, err)
}

func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
tc := e2e.NewTemporalClient(s.t)

numRows := 0

srcTable := "test_no_rows_qrep_pg_1"
Expand Down Expand Up @@ -323,7 +319,8 @@ func (s PeerFlowE2ETestSuitePG) Test_No_Rows_QRep_PG() {
)
require.NoError(s.t, err)

tc := e2e.NewTemporalClient(s.t)
env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())
}
4 changes: 2 additions & 2 deletions flow/e2e/s3/qrep_flow_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3() {
qrepConfig.StagingPath = s.s3Helper.s3Config.Url

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

// Verify destination has 1 file
Expand Down Expand Up @@ -170,7 +170,7 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_QRep_Flow_S3_CTID() {
qrepConfig.WatermarkColumn = "ctid"

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

// Verify destination has 1 file
Expand Down
12 changes: 6 additions & 6 deletions flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF() {
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_Simple()
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
Expand Down Expand Up @@ -149,7 +149,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3() {
qrepConfig.SetupWatermarkTableOnDestination = true

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
Expand Down Expand Up @@ -188,7 +188,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_Upsert_XMIN() {
require.NoError(s.t, err)

env := e2e.RunXminFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
Expand Down Expand Up @@ -226,7 +226,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_QRep_Flow_Avro_SF_S3_Integration()
qrepConfig.SetupWatermarkTableOnDestination = true

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

sel := e2e.GetOwnersSelectorStringsSF()
Expand Down Expand Up @@ -264,7 +264,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_PeerDB_Columns_QRep_SF() {
require.NoError(s.t, err)

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

err = s.sfHelper.checkSyncedAt(fmt.Sprintf(`SELECT "_PEERDB_SYNCED_AT" FROM %s.%s`,
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/sqlserver/qrep_flow_sqlserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (s PeerFlowE2ETestSuiteSQLServer) Test_Complete_QRep_Flow_SqlServer_Append(
}

env := e2e.RunQrepFlowWorkflow(tc, qrepConfig)
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "finish", env.Finished)
e2e.EnvWaitForFinished(s.t, env, 3*time.Minute)
require.NoError(s.t, env.Error())

// Verify that the destination table has the same number of rows as the source table
Expand Down
17 changes: 17 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,20 @@ func EnvWaitFor(t *testing.T, env WorkflowRun, timeout time.Duration, reason str
time.Sleep(time.Second)
}
}

func EnvWaitForFinished(t *testing.T, env WorkflowRun, timeout time.Duration) {
t.Helper()

EnvWaitFor(t, env, timeout, "finish", func() bool {
desc, err := env.c.DescribeWorkflowExecution(context.Background(), env.GetID(), "")
if err != nil {
t.Log("Not finished", err)
return false
}
status := desc.GetWorkflowExecutionInfo().GetStatus()
if status != enums.WORKFLOW_EXECUTION_STATUS_RUNNING {
t.Log("Not finished", status)
}
return status != enums.WORKFLOW_EXECUTION_STATUS_RUNNING
})
}

0 comments on commit ddcdfb1

Please sign in to comment.