Skip to content

Commit

Permalink
Rename CDCFlowWorkflowWithConfig to CDCFlowWorkflow (#1380)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Feb 26, 2024
1 parent 89eee70 commit 8ff37bf
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 76 deletions.
15 changes: 2 additions & 13 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,7 @@ func (h *FlowRequestHandler) CreateCDCFlow(
return nil, fmt.Errorf("unable to update flow config in catalog: %w", err)
}

_, err = h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
cfg, // workflow input
nil, // workflow state
)
_, err = h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, cfg, nil)
if err != nil {
slog.Error("unable to start PeerFlow workflow", slog.Any("error", err))
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
Expand Down Expand Up @@ -333,12 +327,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
shared.MirrorNameSearchAttribute: req.FlowJobName,
},
}
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.DropFlowWorkflow, // workflow function
req, // workflow input
)
dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, req)
if err != nil {
slog.Error("unable to start DropFlow workflow",
logs,
Expand Down
44 changes: 22 additions & 22 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func setupSuite(t *testing.T) PeerFlowE2ETestSuiteBQ {
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Connection_Config() {
env := e2e.NewTemporalTestWorkflowEnvironment(s.t)

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, nil, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, nil, nil)

// Verify workflow completes
require.True(s.t, env.IsWorkflowCompleted())
Expand Down Expand Up @@ -238,7 +238,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -274,7 +274,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -324,7 +324,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Simple_Flow_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -380,7 +380,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -442,7 +442,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -496,7 +496,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -550,7 +550,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -647,7 +647,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -689,7 +689,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -771,7 +771,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -828,7 +828,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -912,7 +912,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -972,7 +972,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -1034,7 +1034,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -1093,7 +1093,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -1144,7 +1144,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -1204,7 +1204,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)

require.NoError(s.t, s.bqHelper.DropDataset(secondDataset))
Expand Down Expand Up @@ -1279,7 +1279,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Basic() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
Expand Down Expand Up @@ -1364,7 +1364,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_IUD_Same_Batch() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -1442,7 +1442,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_UD_Same_Batch() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
Expand Down Expand Up @@ -1521,7 +1521,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

newerSyncedAtQuery := fmt.Sprintf(
Expand Down
30 changes: 15 additions & 15 deletions flow/e2e/postgres/peer_flow_pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Flow_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Geospatial_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -234,7 +234,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Types_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -282,7 +282,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Enums_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -466,7 +466,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Simple_Schema_Changes_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -526,7 +526,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -594,7 +594,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -661,7 +661,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -714,7 +714,7 @@ func (s PeerFlowE2ETestSuitePG) Test_PeerDB_Columns() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -788,7 +788,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Basic() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

// verify our updates and delete happened
Expand Down Expand Up @@ -879,7 +879,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_IUD_Same_Batch() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)
}

Expand Down Expand Up @@ -954,7 +954,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_UD_Same_Batch() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

// verify our updates and delete happened
Expand Down Expand Up @@ -1032,7 +1032,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Soft_Delete_Insert_After_Delete() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)

softDeleteQuery := fmt.Sprintf(`
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Supported_Mixed_Case_Table() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
}

// test don't work, make it work later
Expand Down Expand Up @@ -1302,6 +1302,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Dynamic_Mirror_Config_Via_Signals() {
}
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, config, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, config, nil)
e2e.RequireEnvCanceled(s.t, env)
}
2 changes: 1 addition & 1 deletion flow/e2e/s3/cdc_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ func (s PeerFlowE2ETestSuiteS3) Test_Complete_Simple_Flow_S3() {
env.CancelWorkflow()
}()

env.ExecuteWorkflow(peerflow.CDCFlowWorkflowWithConfig, flowConnConfig, nil)
env.ExecuteWorkflow(peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.RequireEnvCanceled(s.t, env)
}
Loading

0 comments on commit 8ff37bf

Please sign in to comment.