Skip to content

Commit

Permalink
feat: initial workflow
Browse files Browse the repository at this point in the history
Signed-off-by: mikeee <[email protected]>
  • Loading branch information
mikeee committed Dec 17, 2023
1 parent ae8becf commit 61356f4
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 0 deletions.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,23 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.1
github.com/microsoft/durabletask-go v0.3.1
github.com/stretchr/testify v1.8.4
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/marusama/semaphore/v2 v2.5.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/otel v1.16.0 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.opentelemetry.io/otel/trace v1.16.0 // indirect
golang.org/x/net v0.15.0 // indirect
golang.org/x/sys v0.12.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5 h1:IlC2/2TemJw3dC1P8DsFZ4/ANl6IojDr50B7B8dIGIk=
github.com/dapr/dapr v1.12.1-0.20231030205344-441017b888c5/go.mod h1:zHcMel+UwYnMWfvJwpaDr43p95JteXyvBsSjXNnPU+c=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.0.10 h1:rLz5avzKpjqxrYwXNfmjkrYYXOyLJd37pz53UFHC6vk=
github.com/go-chi/chi/v5 v5.0.10/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
Expand All @@ -20,13 +27,19 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
github.com/microsoft/durabletask-go v0.3.1 h1:Y7RrPefd4cz5GMxjMx/Zvf9r5INombNlzI0DaQd994k=
github.com/microsoft/durabletask-go v0.3.1/go.mod h1:t3u0iRvIadT1y4MD5cUG0mbTOqgANT6IFcLogv7o0M0=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down
13 changes: 13 additions & 0 deletions workflow/activity_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package workflow

import (
"github.com/microsoft/durabletask-go/task"
)

type ActivityContext struct {
ctx task.ActivityContext
}

func (wfac *ActivityContext) GetInput(v interface{}) error {
return wfac.ctx.GetInput(&v)
}
60 changes: 60 additions & 0 deletions workflow/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package workflow

import (
"fmt"
"log"
"time"

"github.com/microsoft/durabletask-go/task"
)

type Context struct {
orchestrationContext *task.OrchestrationContext
}

func (wfc *Context) GetInput(v interface{}) error {
return wfc.orchestrationContext.GetInput(&v)
}

func (wfc *Context) Name() string {
return wfc.orchestrationContext.Name
}

func (wfc *Context) InstanceID() string {
return fmt.Sprintf("%v", wfc.orchestrationContext.ID)
}

func (wfc *Context) CurrentUTCDateTime() time.Time {
return wfc.orchestrationContext.CurrentTimeUtc
}

func (wfc *Context) IsReplaying() bool {
return wfc.orchestrationContext.IsReplaying
}

func (wfc *Context) CallActivity(activity interface{}) task.Task {
var inp string
if err := wfc.GetInput(&inp); err != nil {
log.Printf("unable to get activity input: %v", err)
}
// the call should continue despite being unable to obtain an input

return wfc.orchestrationContext.CallActivity(activity, task.WithActivityInput(inp))
}

func (wfc *Context) CallChildWorkflow() {
// TODO: implement
// call suborchestrator
}

func (wfc *Context) CreateTimer() {
// TODO: implement
}

func (wfc *Context) WaitForExternalEvent() {
// TODO: implement
}

func (wfc *Context) ContinueAsNew() {
// TODO: implement
}
131 changes: 131 additions & 0 deletions workflow/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package workflow

import (
"context"
"errors"
"fmt"
"log"
"reflect"
"runtime"
"strings"
"sync"
"time"

"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/client"
"github.com/microsoft/durabletask-go/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type WorkflowRuntime struct {
tasks *task.TaskRegistry
client *client.TaskHubGrpcClient

mutex sync.Mutex // TODO: implement
quit chan bool
cancel context.CancelFunc
}

type Workflow func(ctx *Context) (any, error)

type Activity func(ctx ActivityContext) (any, error)

func NewRuntime(host string, port string) (*WorkflowRuntime, error) {
ctx, canc := context.WithTimeout(context.Background(), time.Second*10)
defer canc()

address := fmt.Sprintf("%s:%s", host, port)

clientConn, err := grpc.DialContext(
ctx,
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(), // TODO: config
)
if err != nil {
return nil, fmt.Errorf("failed to create runtime - grpc connection failed: %v", err)
}

return &WorkflowRuntime{
tasks: task.NewTaskRegistry(),
client: client.NewTaskHubGrpcClient(clientConn, backend.DefaultLogger()),
quit: make(chan bool),
cancel: canc,
}, nil
}

func getDecorator(f interface{}) (string, error) {
if f == nil {
return "", errors.New("nil function name")
}

callSplit := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")

funcName := callSplit[len(callSplit)-1]

return funcName, nil
}

func (wr *WorkflowRuntime) RegisterWorkflow(w Workflow) error {
wrappedOrchestration := func(ctx *task.OrchestrationContext) (any, error) {
wfCtx := &Context{orchestrationContext: ctx}

return w(wfCtx)
}

// getdecorator for workflow
name, err := getDecorator(w)
if err != nil {
return fmt.Errorf("failed to get workflow decorator: %v", err)
}

err = wr.tasks.AddOrchestratorN(name, wrappedOrchestration)
return err
}

func (wr *WorkflowRuntime) RegisterActivity(a Activity) error {
wrappedActivity := func(ctx task.ActivityContext) (any, error) {
ac := ActivityContext{ctx: ctx}

return a(ac)
}

// getdecorator for activity
name, err := getDecorator(a)
if err != nil {
return fmt.Errorf("failed to get activity decorator: %v", err)
}

err = wr.tasks.AddActivityN(name, wrappedActivity)
return err
}

func (wr *WorkflowRuntime) Start() error {
// go func start
go func() {
err := wr.client.StartWorkItemListener(context.Background(), wr.tasks)
if err != nil {
log.Fatalf("failed to start work stream: %v", err)
}
for {
select {
case <-wr.quit:
return
default:
// continue serving
}
}
}()

return nil
}

func (wr *WorkflowRuntime) Shutdown() error {
// cancel grpc context
wr.cancel()
// send close signal
wr.quit <- true

return nil
}
28 changes: 28 additions & 0 deletions workflow/runtime_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package workflow

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestWorkflowRuntime(t *testing.T) {
// TODO: Mock grpc conn - currently requires dapr to be available
t.Run("test workflow name is correct", func(t *testing.T) {
wr, err := NewRuntime("localhost", "50001")
require.NoError(t, err)
err = wr.RegisterWorkflow(testOrchestrator)
require.NoError(t, err)
})
}

func TestGetDecorator(t *testing.T) {
name, err := getDecorator(testOrchestrator)
require.NoError(t, err)
assert.Equal(t, "testOrchestrator", name)
}

func testOrchestrator(ctx *Context) (any, error) {
return nil, nil
}

0 comments on commit 61356f4

Please sign in to comment.