diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index 448abbca2c..d33f9a3f5c 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -241,6 +241,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { @@ -279,6 +280,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } // Test_Complete_Simple_Flow_BQ tests a complete flow with data in the source table. @@ -331,6 +333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { @@ -389,6 +392,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { @@ -453,6 +457,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { @@ -509,6 +514,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { @@ -565,6 +571,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { @@ -664,6 +671,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { @@ -708,6 +716,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { @@ -792,6 +801,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { @@ -851,6 +861,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } // TODO: not checking schema exactly, add later @@ -937,6 +948,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { @@ -999,6 +1011,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { @@ -1063,6 +1076,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { @@ -1124,6 +1138,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { @@ -1177,6 +1192,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { @@ -1239,6 +1255,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) require.NoError(s.t, s.bqHelper.DropDataset(secondDataset)) } @@ -1316,6 +1333,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", @@ -1403,6 +1421,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { @@ -1483,6 +1502,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", @@ -1564,6 +1584,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf( "SELECT COUNT(*) FROM `%s.%s` WHERE _PEERDB_IS_DELETED", diff --git a/flow/e2e/postgres/peer_flow_pg_test.go b/flow/e2e/postgres/peer_flow_pg_test.go index 74f49f04f9..74f6a39c6d 100644 --- a/flow/e2e/postgres/peer_flow_pg_test.go +++ b/flow/e2e/postgres/peer_flow_pg_test.go @@ -126,6 +126,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { @@ -172,6 +173,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { @@ -238,6 +240,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { @@ -288,6 +291,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { @@ -474,6 +478,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { @@ -536,6 +541,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { @@ -606,6 +612,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { @@ -675,6 +682,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { @@ -730,6 +738,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { @@ -806,6 +815,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) // verify our updates and delete happened err = s.comparePGTables(cmpTableName, dstTableName, "id,c1,c2,t") @@ -899,6 +909,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { @@ -975,6 +986,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) // verify our updates and delete happened require.NoError(s.t, err) @@ -1055,6 +1067,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) softDeleteQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE "_PEERDB_IS_DELETED"`, diff --git a/flow/e2e/s3/cdc_s3_test.go b/flow/e2e/s3/cdc_s3_test.go index c968bc2dfb..c9a537bdf0 100644 --- a/flow/e2e/s3/cdc_s3_test.go +++ b/flow/e2e/s3/cdc_s3_test.go @@ -72,4 +72,5 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 211a387634..64dd662268 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -170,6 +170,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Complete_Simple_Flow_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) // check the number of rows where _PEERDB_SYNCED_AT is newer than 5 mins ago // it should match the count. @@ -236,6 +237,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Flow_ReplicaIdentity_Index_No_Pkey() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { @@ -319,6 +321,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Invalid_Geo_SF_Avro_CDC() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { @@ -375,6 +378,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { @@ -437,6 +441,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_1_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { @@ -492,6 +497,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_2_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { @@ -548,6 +554,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Toast_Advance_3_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { @@ -646,6 +653,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { @@ -703,6 +711,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Multi_Table_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { @@ -920,6 +929,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Simple_Schema_Changes_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { @@ -979,6 +989,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { @@ -1043,6 +1054,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_1_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { @@ -1104,6 +1116,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Composite_PKey_Toast_2_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { @@ -1175,6 +1188,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Column_Exclusion() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) sfRows, err := s.GetRows(tableName, "*") require.NoError(s.t, err) @@ -1256,6 +1270,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Basic() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) @@ -1342,6 +1357,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_IUD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { @@ -1429,6 +1445,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_UD_Same_Batch() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { @@ -1502,6 +1519,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Soft_Delete_Insert_After_Delete() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) newerSyncedAtQuery := fmt.Sprintf(` SELECT COUNT(*) FROM %s WHERE _PEERDB_IS_DELETED`, dstTableName) @@ -1566,4 +1584,5 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() { }() env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, &limits, nil) + e2e.RequireEnvCanceled(s.t, env) } diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 857072a2a0..3baa05b3ee 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -18,6 +18,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/require" + "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" "github.com/PeerDB-io/peer-flow/activities" @@ -164,6 +165,16 @@ func EnvWaitForEqualTablesWithNames( }) } +func RequireEnvCanceled(t *testing.T, env *testsuite.TestWorkflowEnvironment) { + t.Helper() + err := env.GetWorkflowError() + if err == nil { + t.Fatal("Expected workflow to be canceled, not completed") + } else if _, ok := errors.Unwrap(err).(*temporal.CanceledError); !ok { + t.Fatalf("Expected workflow to be canceled, not %v", err) + } +} + func SetupCDCFlowStatusQuery(t *testing.T, env *testsuite.TestWorkflowEnvironment, connectionGen FlowConnectionGenerationConfig, ) { @@ -410,7 +421,6 @@ func RunQrepFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos. func RunXminFlowWorkflow(env *testsuite.TestWorkflowEnvironment, config *protos.QRepConfig) { state := peerflow.NewQRepFlowState() state.LastPartition.PartitionId = uuid.New().String() - time.Sleep(5 * time.Second) env.ExecuteWorkflow(peerflow.XminFlowWorkflow, config, state) }