Skip to content

Commit

Permalink
e2e: require workflows exit with canceled error when expected to be c…
Browse files Browse the repository at this point in the history
…anceled (#1214)

Thinking in #1211 the workflow is exiting with an error
which we're then ignoring & letting test pass but cleanup fail
  • Loading branch information
serprex authored Feb 7, 2024
1 parent 199ce12 commit 202ca2b
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 1 deletion.
21 changes: 21 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
Expand Down Expand Up @@ -279,6 +280,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

// Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table.
Expand Down Expand Up @@ -331,6 +333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
Expand Down Expand Up @@ -389,6 +392,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
Expand Down Expand Up @@ -453,6 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
Expand Down Expand Up @@ -509,6 +514,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
Expand Down Expand Up @@ -565,6 +571,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
Expand Down Expand Up @@ -664,6 +671,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
Expand Down Expand Up @@ -708,6 +716,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
Expand Down Expand Up @@ -792,6 +801,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
Expand Down Expand Up @@ -851,6 +861,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

// TODO: not checking schema exactly, add later
Expand Down Expand Up @@ -937,6 +948,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
Expand Down Expand Up @@ -999,6 +1011,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
Expand Down Expand Up @@ -1063,6 +1076,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
Expand Down Expand Up @@ -1124,6 +1138,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
Expand Down Expand Up @@ -1177,6 +1192,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
Expand Down Expand Up @@ -1239,6 +1255,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

require.NoError(s.t, s.bqHelper.DropDataset(secondDataset))
}
Expand Down Expand Up @@ -1316,6 +1333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
Expand Down Expand Up @@ -1403,6 +1421,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
Expand Down Expand Up @@ -1483,6 +1502,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
Expand Down Expand Up @@ -1564,6 +1584,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
"SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED",
Expand Down
13 changes: 13 additions & 0 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
Expand Down Expand Up @@ -172,6 +173,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
Expand Down Expand Up @@ -238,6 +240,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
Expand Down Expand Up @@ -288,6 +291,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
Expand Down Expand Up @@ -474,6 +478,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
Expand Down Expand Up @@ -536,6 +541,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
Expand Down Expand Up @@ -606,6 +612,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
Expand Down Expand Up @@ -675,6 +682,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
Expand Down Expand Up @@ -730,6 +738,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
Expand Down Expand Up @@ -806,6 +815,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

// verify our updates and delete happened
err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t")
Expand Down Expand Up @@ -899,6 +909,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}

func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
Expand Down Expand Up @@ -975,6 +986,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

// verify our updates and delete happened
require.NoError(s.t, err)
Expand Down Expand Up @@ -1055,6 +1067,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)

softDeleteQuery := fmt.Sprintf(`
SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`,
Expand Down
1 change: 1 addition & 0 deletions flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil)
e2e.RequireEnvCanceled(s.t, env)
}
Loading

0 comments on commit 202ca2b

Please sign in to comment.