Skip to content

Commit

Permalink
refactored PeerFlow -> CDCFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Sep 12, 2023
1 parent 9529f86 commit f7947e0
Show file tree
Hide file tree
Showing 21 changed files with 353 additions and 504 deletions.
121 changes: 0 additions & 121 deletions flow/activities/fetch_config.go

This file was deleted.

24 changes: 12 additions & 12 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func NewFlowRequestHandler(temporalClient client.Client) *FlowRequestHandler {
}
}

func (h *FlowRequestHandler) CreatePeerFlow(
ctx context.Context, req *protos.CreatePeerFlowRequest) (*protos.CreatePeerFlowResponse, error) {
func (h *FlowRequestHandler) CreateCDCFlow(
ctx context.Context, req *protos.CreateCDCFlowRequest) (*protos.CreateCDCFlowResponse, error) {
cfg := req.ConnectionConfigs
workflowID := fmt.Sprintf("%s-peerflow-%s", cfg.FlowJobName, uuid.New())
workflowOptions := client.StartWorkflowOptions{
Expand All @@ -38,26 +38,26 @@ func (h *FlowRequestHandler) CreatePeerFlow(
cfg.MaxBatchSize = uint32(maxBatchSize)
}

limits := &peerflow.PeerFlowLimits{
limits := &peerflow.CDCFlowLimits{
TotalSyncFlows: 0,
TotalNormalizeFlows: 0,
MaxBatchSize: maxBatchSize,
}

state := peerflow.NewStartedPeerFlowState()
state := peerflow.NewCDCFlowState()
_, err := h.temporalClient.ExecuteWorkflow(
ctx, // context
workflowOptions, // workflow start options
peerflow.PeerFlowWorkflowWithConfig, // workflow function
cfg, // workflow input
limits, // workflow limits
state, // workflow state
ctx, // context
workflowOptions, // workflow start options
peerflow.CDCFlowWorkflowWithConfig, // workflow function
cfg, // workflow input
limits, // workflow limits
state, // workflow state
)
if err != nil {
return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err)
}

return &protos.CreatePeerFlowResponse{
return &protos.CreateCDCFlowResponse{
WorflowId: workflowID,
}, nil
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func (h *FlowRequestHandler) ShutdownFlow(
ctx,
req.WorkflowId,
"",
shared.PeerFlowSignalName,
shared.CDCFlowSignalName,
shared.ShutdownSignal,
)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions flow/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,13 @@ func WorkerMain(opts *WorkerOptions) error {
defer c.Close()

w := worker.New(c, shared.PeerFlowTaskQueue, worker.Options{})
w.RegisterWorkflow(peerflow.PeerFlowWorkflow)
w.RegisterWorkflow(peerflow.PeerFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.CDCFlowWorkflowWithConfig)
w.RegisterWorkflow(peerflow.SyncFlowWorkflow)
w.RegisterWorkflow(peerflow.SetupFlowWorkflow)
w.RegisterWorkflow(peerflow.NormalizeFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepFlowWorkflow)
w.RegisterWorkflow(peerflow.QRepPartitionWorkflow)
w.RegisterWorkflow(peerflow.DropFlowWorkflow)
w.RegisterActivity(&activities.FetchConfigActivity{})
w.RegisterActivity(&activities.FlowableActivity{
EnableMetrics: opts.EnableMetrics,
CatalogMirrorMonitor: &catalogMirrorMonitor,
Expand Down
Loading

0 comments on commit f7947e0

Please sign in to comment.