Skip to content

Commit

Permalink
enable Register() to support dynamically registering flow
Browse files Browse the repository at this point in the history
  • Loading branch information
danli001 authored and s8sg committed Jul 29, 2023
1 parent 1564016 commit d3eded4
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 123 deletions.
167 changes: 119 additions & 48 deletions runtime/flow_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type FlowRuntime struct {
EnableMonitoring bool
RetryQueueCount int
DebugEnabled bool
workerMode bool

eventHandler sdk.EventHandler

Expand Down Expand Up @@ -101,6 +102,11 @@ func (fRuntime *FlowRuntime) Init() error {
}
}

fRuntime.rmqConnection, err = OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
if err != nil {
return fmt.Errorf("failed to initiate rmq connection, error %v", err)
}

if fRuntime.Logger == nil {
fRuntime.Logger = &log2.StdErrLogger{}
}
Expand Down Expand Up @@ -133,28 +139,79 @@ func (fRuntime *FlowRuntime) CreateExecutor(req *runtime.Request) (executor.Exec
return ex, err
}

// AppendFlows appends flows to the runtime
// If the queue worker not started or the flow is already registered, it returns an error
func (fRuntime *FlowRuntime) AppendFlows(flows map[string]FlowDefinitionHandler) error {

// Register flows to the runtime
// If the flow is already registered, it returns an error
func (fRuntime *FlowRuntime) Register(flows map[string]FlowDefinitionHandler) error {
if reflect.ValueOf(fRuntime.rmqConnection).IsNil() {
return fmt.Errorf("unable to append flows, queue worker not started")
return fmt.Errorf("unable to register flows, rmq connection not initialized")
}

if len(flows) == 0 {
return nil
}

var flowNames []string
for flowName := range flows {
if _, ok := fRuntime.Flows[flowName]; ok {
return fmt.Errorf("flow %s already registered", flowName)
}

flowNames = append(flowNames, flowName)
}

// register flows to runtime
for flowName, flowHandler := range flows {
fRuntime.Flows[flowName] = flowHandler
}

err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, flows)
// initialize task queues when in worker mode
if fRuntime.workerMode {
err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, flows)
if err != nil {
return fmt.Errorf(fmt.Sprintf("failed to initialize task queues for flows %v, error %v", flowNames, err))
}
}

fRuntime.Logger.Log(fmt.Sprintf("[goflow] queue workers for flows %v started successfully", flowNames))

return nil
}

// EnterWorkerMode put the runtime into worker mode
func (fRuntime *FlowRuntime) EnterWorkerMode() error {
if reflect.ValueOf(fRuntime.rmqConnection).IsNil() {
return fmt.Errorf("unable to enter worker mode, rmq connection not initialized")
}

if fRuntime.workerMode {
// already in worker mode
return nil
}
fRuntime.workerMode = true

err := fRuntime.initializeTaskQueues(&fRuntime.rmqConnection, fRuntime.Flows)
if err != nil {
return fmt.Errorf("failed to enter worker mode, error: " + err.Error())
}

return nil
}

// ExitWorkerMode take the runtime out of worker mode
func (fRuntime *FlowRuntime) ExitWorkerMode() error {
if reflect.ValueOf(fRuntime.rmqConnection).IsNil() {
return nil
}

if !fRuntime.workerMode {
// already not in worker mode
return nil
}
fRuntime.workerMode = false

err := fRuntime.cleanTaskQueues()
if err != nil {
return fmt.Errorf("failed to initialize task queues, error %v", err)
return fmt.Errorf("failed to exit worker mode, error: " + err.Error())
}

return nil
Expand Down Expand Up @@ -289,60 +346,55 @@ func (fRuntime *FlowRuntime) StopServer() error {
return nil
}

// StartQueueWorker starts listening for request in queue
func (fRuntime *FlowRuntime) StartQueueWorker(errorChan chan error) error {
connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil)
if err != nil {
return fmt.Errorf("failed to initiate connection, error %v", err)
}
fRuntime.rmqConnection = connection

err = fRuntime.initializeTaskQueues(&connection, fRuntime.Flows)
if err != nil {
return fmt.Errorf("failed to initiate task queues, error %v", err)
}

fRuntime.Logger.Log("[goflow] queue worker started successfully")

err = <-errorChan
<-fRuntime.rmqConnection.StopAllConsuming()
return err
}

// StartRuntime starts the runtime
func (fRuntime *FlowRuntime) StartRuntime() error {
worker := &Worker{
ID: getNewId(),
Flows: make([]string, 0, len(fRuntime.Flows)),
Concurrency: fRuntime.Concurrency,
}
// Get the flow details for each flow
flowDetails := make(map[string]string)
for flowID, defHandler := range fRuntime.Flows {
worker.Flows = append(worker.Flows, flowID)
dag, err := getFlowDefinition(defHandler)

registerDetails := func() error {
// Get the flow details for each flow
flowDetails := make(map[string]string)
for flowID, defHandler := range fRuntime.Flows {
worker.Flows = append(worker.Flows, flowID)
dag, err := getFlowDefinition(defHandler)
if err != nil {
return fmt.Errorf("failed to start runtime, dag export failed, error %v", err)
}
flowDetails[flowID] = dag
}

if fRuntime.workerMode {
err := fRuntime.saveWorkerDetails(worker)
if err != nil {
return fmt.Errorf("failed to register worker details, %v", err)
}
} else {
err := fRuntime.deleteWorkerDetails(worker)
if err != nil {
return fmt.Errorf("failed to deregister worker details, %v", err)
}
}

err := fRuntime.saveFlowDetails(flowDetails)
if err != nil {
return fmt.Errorf("failed to start runtime, dag export failed, error %v", err)
return fmt.Errorf("failed to register flow details, %v", err)
}
flowDetails[flowID] = dag
}
err := fRuntime.saveWorkerDetails(worker)
if err != nil {
return fmt.Errorf("failed to register worker details, %v", err)

return nil
}
err = fRuntime.saveFlowDetails(flowDetails)

err := registerDetails()
if err != nil {
return fmt.Errorf("failed to register worker details, %v", err)
log.Printf("failed to register details, %v", err)
return err
}

err = gocron.Every(GoFlowRegisterInterval).Second().Do(func() {
var err error
err = fRuntime.saveWorkerDetails(worker)
err := registerDetails()
if err != nil {
log.Printf("failed to register worker details, %v", err)
}
err = fRuntime.saveFlowDetails(flowDetails)
if err != nil {
log.Printf("failed to register worker details, %v", err)
log.Printf("failed to register details, %v", err)
}
})
if err != nil {
Expand Down Expand Up @@ -563,6 +615,18 @@ func (fRuntime *FlowRuntime) initializeTaskQueues(connection *rmq.Connection, fl
return nil
}

func (fRuntime *FlowRuntime) cleanTaskQueues() error {

if !reflect.ValueOf(fRuntime.rmqConnection).IsNil() {
endChan := fRuntime.rmqConnection.StopAllConsuming()
<-endChan
}

fRuntime.taskQueues = map[string]rmq.Queue{}

return nil
}

func (fRuntime *FlowRuntime) internalRequestQueueId(flowName string) string {
return fmt.Sprintf("%s:%s", InternalRequestQueueInitial, flowName)
}
Expand All @@ -579,6 +643,13 @@ func (fRuntime *FlowRuntime) saveWorkerDetails(worker *Worker) error {
return nil
}

func (fRuntime *FlowRuntime) deleteWorkerDetails(worker *Worker) error {
rdb := fRuntime.rdb
key := fmt.Sprintf("%s:%s", WorkerKeyInitial, worker.ID)
rdb.Del(key)
return nil
}

func (fRuntime *FlowRuntime) saveFlowDetails(flows map[string]string) error {
rdb := fRuntime.rdb
for flowId, definition := range flows {
Expand Down
Loading

0 comments on commit d3eded4

Please sign in to comment.