Skip to content

Commit

Permalink
Try fix temporal dynamic typing errors
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Dec 28, 2023
1 parent 896b0f9 commit bbb9134
Showing 1 changed file with 5 additions and 36 deletions.
41 changes: 5 additions & 36 deletions flow/workflows/normalize_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,13 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"go.temporal.io/sdk/log"
"go.temporal.io/sdk/workflow"
)

type NormalizeFlowState struct {
CDCFlowName string
Progress []string
}

type NormalizeFlowExecution struct {
NormalizeFlowState
executionID string
logger log.Logger
}

func NewNormalizeFlowExecution(ctx workflow.Context, state *NormalizeFlowState) *NormalizeFlowExecution {
return &NormalizeFlowExecution{
NormalizeFlowState: *state,
executionID: workflow.GetInfo(ctx).WorkflowExecution.ID,
logger: workflow.GetLogger(ctx),
}
}

func NormalizeFlowWorkflow(ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) model.NormalizeFlowResponse {
s := NewNormalizeFlowExecution(ctx, &NormalizeFlowState{
CDCFlowName: config.FlowJobName,
Progress: []string{},
})

return s.executeNormalizeFlow(ctx, config)
}

func (s *NormalizeFlowExecution) executeNormalizeFlow(
ctx workflow.Context,
config *protos.FlowConnectionConfigs,
) model.NormalizeFlowResponse {
s.logger.Info("executing normalize flow - ", s.CDCFlowName)
) (*model.NormalizeFlowResponse, error) {
logger := workflow.GetLogger(ctx)

normalizeFlowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
StartToCloseTimeout: 7 * 24 * time.Hour,
Expand All @@ -58,6 +26,7 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
needSync := true
for {
if needSync {
logger.Info("executing normalize - ", config.FlowJobName)
startNormalizeInput := &protos.StartNormalizeInput{
FlowConnectionConfigs: config,
}
Expand Down Expand Up @@ -91,8 +60,8 @@ func (s *NormalizeFlowExecution) executeNormalizeFlow(
}
}

return model.NormalizeFlowResponse{
return &model.NormalizeFlowResponse{
Results: results,
Errors: errors,
}
}, nil
}

0 comments on commit bbb9134

Please sign in to comment.