diff --git a/core/redis-statestore/redis.go b/core/redis-statestore/redis.go index a8a6e60..9f0d707 100644 --- a/core/redis-statestore/redis.go +++ b/core/redis-statestore/redis.go @@ -14,11 +14,6 @@ type RedisStateStore struct { RetryCount int } -// Update Compare and Update a valuer -type Incrementer interface { - Incr(key string, value int64) (int64, error) -} - func GetRedisStateStore(redisUri string, password string) (sdk.StateStore, error) { stateStore := &RedisStateStore{} @@ -74,7 +69,7 @@ func (this *RedisStateStore) Update(key string, oldValue string, newValue string } // Update Compare and Update a valuer -func (this *RedisStateStore) Incr(key string, value int64) (int64, error) { +func (this *RedisStateStore) IncrementBy(key string, value int64) (int64, error) { key = this.KeyPath + "." + key client := this.rds return client.IncrBy(key, value).Result() diff --git a/core/sdk/executor/executor.go b/core/sdk/executor/executor.go index 7a98cd7..eddc7c5 100644 --- a/core/sdk/executor/executor.go +++ b/core/sdk/executor/executor.go @@ -187,7 +187,7 @@ func (fexec *FlowExecutor) getDynamicBranchOptions(nodeUniqueId string) ([]strin func (fexec *FlowExecutor) incrementCounter(counter string, incrementBy int) (int, error) { var serr error for i := 0; i < counterUpdateRetryCount; i++ { - count, err := fexec.stateStore.Incr(counter, int64(incrementBy)) + count, err := fexec.stateStore.IncrementBy(counter, int64(incrementBy)) if err != nil { serr = fmt.Errorf("failed to update counter %s, error %v", counter, err) continue diff --git a/core/sdk/types.go b/core/sdk/types.go index 56b569a..750a0b0 100644 --- a/core/sdk/types.go +++ b/core/sdk/types.go @@ -29,7 +29,7 @@ type StateStore interface { // Get a value Get(key string) (string, error) // Increase the value of key with a given increment - Incr(key string, value int64) (int64, error) + IncrementBy(key string, value int64) (int64, error) // Compare and Update a value Update(key string, oldValue string, newValue string) error // Cleanup all the resources in StateStore (called only once in a request span) diff --git a/dashboard/service.go b/dashboard/service.go index 0a41c03..80bdc34 100644 --- a/dashboard/service.go +++ b/dashboard/service.go @@ -14,10 +14,14 @@ import ( var rdb *redis.Client +const ( + FlowKeyInitial = "goflow.flow" +) + // listGoFLows get list of go-flows func listGoFLows() ([]*Flow, error) { rdb = getRDB() - command := rdb.Keys("goflow-flow:*") + command := rdb.Keys(FlowKeyInitial + ".*") rdb.Process(command) flowKeys, err := command.Result() if err != nil { @@ -41,7 +45,7 @@ func listGoFLows() ([]*Flow, error) { // getDot request to dot-generator for the dag dot graph func getDot(flowName string) (string, error) { rdb = getRDB() - command := rdb.Get("goflow-flow:" + flowName) + command := rdb.Get(FlowKeyInitial + "." + flowName) rdb.Process(command) definition, err := command.Result() if err != nil { diff --git a/operation/operation.go b/operation/operation.go index 559d11e..8e65fb2 100644 --- a/operation/operation.go +++ b/operation/operation.go @@ -99,4 +99,3 @@ func (operation *GoFlowOperation) GetProperties() map[string][]string { return result } - diff --git a/runtime/flow_runtime.go b/runtime/flow_runtime.go index ed5134a..e1dbc2c 100644 --- a/runtime/flow_runtime.go +++ b/runtime/flow_runtime.go @@ -19,11 +19,13 @@ import ( "github.com/s8sg/goflow/core/sdk/exporter" "github.com/s8sg/goflow/eventhandler" log2 "github.com/s8sg/goflow/log" + "github.com/s8sg/goflow/operation" "gopkg.in/redis.v5" ) type FlowRuntime struct { Flows map[string]FlowDefinitionHandler + Workloads map[string]operation.Modifier OpenTracingUrl string RedisURL string RedisPassword string @@ -42,9 +44,10 @@ type FlowRuntime struct { eventHandler sdk.EventHandler - taskQueues map[string]rmq.Queue - srv *http.Server - rdb *redis.Client + taskQueues map[string]rmq.Queue + workloadQueues map[string]rmq.Queue + srv *http.Server + rdb *redis.Client } type Worker struct { @@ -61,12 +64,17 @@ type Task struct { RawQuery string `json:"raw_query"` Query map[string][]string `json:"query"` RequestType string `json:"request_type"` + + WorkLoadName string `json:"workload_name"` + } const ( InternalRequestQueueInitial = "goflow-internal-request" - FlowKeyInitial = "goflow-flow" - WorkerKeyInitial = "goflow-worker" + WorkloadRequestQueueInitial = "goflow-workload-request" + FlowKeyInitial = "goflow.flow" + WorkerKeyInitial = "goflow.worker" + WorkloadKeyInitial = "goflow.workload" GoFlowRegisterInterval = 4 RDBKeyTimeOut = 10 @@ -329,6 +337,43 @@ func (fRuntime *FlowRuntime) StartQueueWorker(errorChan chan error) error { return err } +// StartWorkloadQueueWorker starts listening for request in queue +func (fRuntime *FlowRuntime) StartWorkloadQueueWorker(errorChan chan error) error { + connection, err := OpenConnectionV2("goflow", "tcp", fRuntime.RedisURL, fRuntime.RedisPassword, 0, nil) + if err != nil { + return fmt.Errorf("failed to initiate connection, error %v", err) + } + + fRuntime.taskQueues = make(map[string]rmq.Queue) + for workloadName := range fRuntime.Workloads { + workloadQueue, err := connection.OpenQueue(fRuntime.workloadRequestQueueId(workloadName)) + if err != nil { + return fmt.Errorf("failed to open queue, error %v", err) + } + + err = workloadQueue.StartConsuming(10, time.Second) + if err != nil { + return fmt.Errorf("failed to start workload consumer taskQueue, error %v", err) + } + fRuntime.workloadQueues[workloadName] = workloadQueue + + index := 0 + for index < fRuntime.Concurrency { + _, err := workloadQueue.AddConsumer(fmt.Sprintf("workload-request-consumer-%d", index), fRuntime) + if err != nil { + return fmt.Errorf("failed to add consumer, error %v", err) + } + index++ + } + } + + fRuntime.Logger.Log("[goflow] workload queue worker started successfully") + + err = <-errorChan + <-connection.StopAllConsuming() + return err +} + // StartRuntime starts the runtime func (fRuntime *FlowRuntime) StartRuntime() error { worker := &Worker{ @@ -374,6 +419,28 @@ func (fRuntime *FlowRuntime) StartRuntime() error { return fmt.Errorf("[goflow] runtime stopped") } +// StartWorkloadRuntime starts the runtime +func (fRuntime *FlowRuntime) StartWorkloadRuntime() error { + err := fRuntime.saveWorkLoadDetails(fRuntime.Workloads) + if err != nil { + return fmt.Errorf("failed to register worker details, %v", err) + } + err = gocron.Every(GoFlowRegisterInterval).Second().Do(func() { + var err error + err = fRuntime.saveWorkLoadDetails(fRuntime.Workloads) + if err != nil { + log.Printf("failed to register worker details, %v", err) + } + }) + if err != nil { + return fmt.Errorf("failed to start runtime, %v", err) + } + + <-gocron.Start() + + return fmt.Errorf("[goflow] workload runtime stopped") +} + func (fRuntime *FlowRuntime) EnqueuePartialRequest(pr *runtime.Request) error { data, _ := json.Marshal(&Task{ FlowName: pr.FlowName, @@ -524,13 +591,26 @@ func (fRuntime *FlowRuntime) internalRequestQueueId(flowName string) string { return fmt.Sprintf("%s:%s", InternalRequestQueueInitial, flowName) } +func (fRuntime *FlowRuntime) workloadRequestQueueId(workloadName string) string { + return fmt.Sprintf("%s:%s", WorkloadRequestQueueInitial, workloadName) +} + func (fRuntime *FlowRuntime) requestQueueId(flowName string) string { return flowName } +func (fRuntime *FlowRuntime) saveWorkLoadDetails(workloads map[string]operation.Modifier) error { + rdb := fRuntime.rdb + for workloadName, _ := range workloads { + key := fmt.Sprintf(WorkloadKeyInitial+".%s", workloadName) + rdb.Set(key, workloadName, time.Second*RDBKeyTimeOut) + } + return nil +} + func (fRuntime *FlowRuntime) saveWorkerDetails(worker *Worker) error { rdb := fRuntime.rdb - key := fmt.Sprintf("%s:%s", WorkerKeyInitial, worker.ID) + key := fmt.Sprintf(WorkerKeyInitial+".%s", worker.ID) value := marshalWorker(worker) rdb.Set(key, value, time.Second*RDBKeyTimeOut) return nil @@ -539,7 +619,7 @@ func (fRuntime *FlowRuntime) saveWorkerDetails(worker *Worker) error { func (fRuntime *FlowRuntime) saveFlowDetails(flows map[string]string) error { rdb := fRuntime.rdb for flowId, definition := range flows { - key := fmt.Sprintf("%s:%s", FlowKeyInitial, flowId) + key := fmt.Sprintf(FlowKeyInitial+".%s", flowId) rdb.Set(key, definition, time.Second*RDBKeyTimeOut) } return nil diff --git a/v1/goflow.go b/v1/goflow.go index 03b4dea..e7bd347 100644 --- a/v1/goflow.go +++ b/v1/goflow.go @@ -7,6 +7,7 @@ import ( runtimePkg "github.com/s8sg/goflow/core/runtime" "github.com/s8sg/goflow/core/sdk" "github.com/s8sg/goflow/runtime" + "github.com/s8sg/goflow/operation" ) type FlowService struct { @@ -18,6 +19,7 @@ type FlowService struct { WorkerConcurrency int RetryCount int Flows map[string]runtime.FlowDefinitionHandler + Workloads map[string]operation.Modifier RequestReadTimeout time.Duration RequestWriteTimeout time.Duration OpenTraceUrl string @@ -160,6 +162,28 @@ func (fs *FlowService) Stop(flowName string, requestId string) error { return nil } + +func (fs *FlowService) RegisterWorkload(workloadName string, handler operation.Modifier) error { + if workloadName == "" { + return fmt.Errorf("workload-name must not be empty") + } + if handler == nil { + return fmt.Errorf("handler must not be nil") + } + + if fs.Workloads == nil { + fs.Workloads = make(map[string]operation.Modifier) + } + + if fs.Workloads[workloadName] != nil { + return fmt.Errorf("duplicate workload registration, workload-name must be unique for each workload") + } + + fs.Workloads[workloadName] = handler + + return nil +} + func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionHandler) error { if flowName == "" { return fmt.Errorf("flow-name must not be empty") @@ -173,7 +197,7 @@ func (fs *FlowService) Register(flowName string, handler runtime.FlowDefinitionH } if fs.Flows[flowName] != nil { - return fmt.Errorf("flow-name must be unique for each flow") + return fmt.Errorf("duplicate flow registration, flow-name must be unique for each flow") } fs.Flows[flowName] = handler @@ -271,6 +295,33 @@ func (fs *FlowService) StartWorker() error { return fmt.Errorf("worker has stopped, error: %v", err) } +func (fs *FlowService) StartWorkLoad() error { + fs.ConfigureDefault() + fs.runtime = &runtime.FlowRuntime{ + Workloads: fs.Workloads, + OpenTracingUrl: fs.OpenTraceUrl, + RedisURL: fs.RedisURL, + RedisPassword: fs.RedisPassword, + DataStore: fs.DataStore, + Logger: fs.Logger, + Concurrency: fs.WorkerConcurrency, + RequestAuthSharedSecret: fs.RequestAuthSharedSecret, + RequestAuthEnabled: fs.RequestAuthEnabled, + EnableMonitoring: fs.EnableMonitoring, + RetryQueueCount: fs.RetryCount, + DebugEnabled: fs.DebugEnabled, + } + errorChan := make(chan error) + defer close(errorChan) + if err := fs.initRuntime(); err != nil { + return err + } + go fs.workloadRuntimeWorker(errorChan) + go fs.workloadQueueWorker(errorChan) + err := <-errorChan + return fmt.Errorf("workload has stopped, error: %v", err) +} + func (fs *FlowService) ConfigureDefault() { if fs.OpenTraceUrl == "" { fs.OpenTraceUrl = DefaultTraceUrl @@ -305,11 +356,21 @@ func (fs *FlowService) runtimeWorker(errorChan chan error) { errorChan <- fmt.Errorf("runtime has stopped, error: %v", err) } +func (fs *FlowService) workloadRuntimeWorker(errorChan chan error) { + err := fs.runtime.StartWorkloadRuntime() + errorChan <- fmt.Errorf("runtime has stopped, error: %v", err) +} + func (fs *FlowService) queueWorker(errorChan chan error) { err := fs.runtime.StartQueueWorker(errorChan) errorChan <- fmt.Errorf("worker has stopped, error: %v", err) } +func (fs *FlowService) workloadQueueWorker(errorChan chan error) { + err := fs.runtime.StartWorkloadQueueWorker(errorChan) + errorChan <- fmt.Errorf("workload has stopped, error: %v", err) +} + func (fs *FlowService) server(errorChan chan error) { err := fs.runtime.StartServer() errorChan <- fmt.Errorf("server has stopped, error: %v", err)