Skip to content

Commit

Permalink
feat: initial wfclient implementation
Browse files Browse the repository at this point in the history
Signed-off-by: mikeee <[email protected]>
  • Loading branch information
mikeee committed Jan 14, 2024
1 parent a55d81d commit 947414d
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 3 deletions.
19 changes: 19 additions & 0 deletions examples/workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ expected_stdout_lines:
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow terminated'
- '== APP == workflow purged'
- '== APP == workflow client test'
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == [wfclient] workflow running'
- '== APP == [wfclient] stage: 1'
- '== APP == [wfclient] event raised'
- '== APP == [wfclient] stage: 2'
- '== APP == [wfclient] workflow terminated'
- '== APP == [wfclient] workflow purged'
- '== APP == workflow runtime successfully shutdown'
background: true
sleep: 60
-->
Expand Down Expand Up @@ -61,4 +71,13 @@ dapr run --app-id workflow \
- '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == workflow terminated'
- '== APP == workflow purged'
- '== APP == workflow client test'
- '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9'
- '== APP == [wfclient] workflow running'
- '== APP == [wfclient] stage: 1'
- '== APP == [wfclient] event raised'
- '== APP == [wfclient] stage: 2'
- '== APP == [wfclient] workflow terminated'
- '== APP == [wfclient] workflow purged'
- '== APP == workflow runtime successfully shutdown'
```
66 changes: 66 additions & 0 deletions examples/workflow/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,77 @@ func main() {

fmt.Println("workflow purged")

// WFClient
// TODO: Expand client validation

stage = 0
fmt.Println("workflow client test")

wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}

id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}

fmt.Printf("[wfclient] started workflow with id: %s\n", id)

metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}

fmt.Printf("[wfclient] workflow running: %v\n", metadata.IsRunning())

if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// TODO: WaitForWorkflowStart
// TODO: WaitForWorkflowCompletion

// raise event

if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}

fmt.Println("[wfclient] event raised")

// Sleep to allow the workflow to advance
time.Sleep(time.Second)

if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}

fmt.Printf("[wfclient] stage: %d\n", stage)

// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
}

fmt.Println("[wfclient] workflow terminated")

if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
}

fmt.Println("[wfclient] workflow purged")

// stop workflow runtime
if err := wr.Shutdown(); err != nil {
log.Fatalf("failed to shutdown runtime: %v", err)
}

fmt.Println("workflow runtime successfully shutdown")

time.Sleep(time.Second * 5)
}

Expand Down
162 changes: 162 additions & 0 deletions workflow/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package workflow

import (
"context"
"errors"
"time"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
durabletaskclient "github.com/microsoft/durabletask-go/client"

dapr "github.com/dapr/go-sdk/client"
)

type Client interface {
ScheduleNewWorkflow(ctx context.Context) (string, error)
FetchWorkflowMetadata(ctx context.Context) (string, error)
WaitForWorkflowStart(ctx context.Context) (string, error)
WaitForWorkflowCompletion(ctx context.Context) (string, error)
TerminateWorkflow(ctx context.Context) error
RaiseEvent(ctx context.Context) error
SuspendWorkflow(ctx context.Context) error
ResumeWorkflow(ctx context.Context) error
PurgeWorkflow(ctx context.Context) error
}

type client struct {
taskHubClient *durabletaskclient.TaskHubGrpcClient
}

func WithInstanceID(id string) api.NewOrchestrationOptions {
return api.WithInstanceID(api.InstanceID(id))

Check warning on line 32 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L31-L32

Added lines #L31 - L32 were not covered by tests
}

// TODO: Implement WithOrchestrationIdReusePolicy

func WithInput(input any) api.NewOrchestrationOptions {
return api.WithInput(input)

Check warning on line 38 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L37-L38

Added lines #L37 - L38 were not covered by tests
}

func WithRawInput(input string) api.NewOrchestrationOptions {
return api.WithRawInput(input)

Check warning on line 42 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L41-L42

Added lines #L41 - L42 were not covered by tests
}

func WithStartTime(time time.Time) api.NewOrchestrationOptions {
return api.WithStartTime(time)

Check warning on line 46 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L45-L46

Added lines #L45 - L46 were not covered by tests
}

func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions {
return api.WithFetchPayloads(fetchPayloads)

Check warning on line 50 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L49-L50

Added lines #L49 - L50 were not covered by tests
}

func WithEventPayload(data any) api.RaiseEventOptions {
return api.WithEventPayload(data)

Check warning on line 54 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L53-L54

Added lines #L53 - L54 were not covered by tests
}

func WithRawEventData(data string) api.RaiseEventOptions {
return api.WithRawEventData(data)

Check warning on line 58 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

func WithOutput(data any) api.TerminateOptions {
return api.WithOutput(data)

Check warning on line 62 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L61-L62

Added lines #L61 - L62 were not covered by tests
}

func WithRawOutput(data string) api.TerminateOptions {
return api.WithRawOutput(data)

Check warning on line 66 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L65-L66

Added lines #L65 - L66 were not covered by tests
}

// TODO: Implement mocks

func NewClient() (client, error) { // TODO: Implement custom connection
daprClient, err := dapr.NewClient()
if err != nil {
return client{}, err
}

taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())

return client{
taskHubClient: taskHubClient,
}, nil

Check warning on line 81 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L77-L81

Added lines #L77 - L81 were not covered by tests
}

func (c *client) ScheduleNewWorkflow(ctx context.Context, workflow string, opts ...api.NewOrchestrationOptions) (id string, err error) {
if workflow == "" {
return "", errors.New("no workflow specified")
}
workflowID, err := c.taskHubClient.ScheduleNewOrchestration(ctx, workflow, opts...)
if err != nil {
return "", err
}
return string(workflowID), nil

Check warning on line 92 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L84-L92

Added lines #L84 - L92 were not covered by tests
}

func (c *client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
if id == "" {
return nil, errors.New("no workflow id specified")
}
return c.taskHubClient.FetchOrchestrationMetadata(ctx, api.InstanceID(id), opts...)

Check warning on line 99 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L95-L99

Added lines #L95 - L99 were not covered by tests
}

func (c *client) WaitForWorkflowStart(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
if id == "" {
return nil, errors.New("no workflow id specified")
}
return c.taskHubClient.WaitForOrchestrationStart(ctx, api.InstanceID(id), opts...)

Check warning on line 106 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L102-L106

Added lines #L102 - L106 were not covered by tests
}

func (c *client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) {
if id == "" {
return nil, errors.New("no workflow id specified")
}
return c.taskHubClient.WaitForOrchestrationCompletion(ctx, api.InstanceID(id), opts...)

Check warning on line 113 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L109-L113

Added lines #L109 - L113 were not covered by tests
}

func (c *client) TerminateWorkflow(ctx context.Context, id string, opts ...api.TerminateOptions) error {
if id == "" {
return errors.New("no workflow id specified")
}
return c.taskHubClient.TerminateOrchestration(ctx, api.InstanceID(id), opts...)

Check warning on line 120 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L116-L120

Added lines #L116 - L120 were not covered by tests
}

func (c *client) RaiseEvent(ctx context.Context, id, eventName string, opts ...api.RaiseEventOptions) error {
if id == " " {
return errors.New("no workflow id specified")
}
if eventName == "" {
return errors.New("no event name specified")
}
return c.taskHubClient.RaiseEvent(ctx, api.InstanceID(id), eventName, opts...)

Check warning on line 130 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L123-L130

Added lines #L123 - L130 were not covered by tests
}

func (c *client) SuspendWorkflow(ctx context.Context, id, reason string) error {
if id == "" {
return errors.New("no workflow id specified")
}
if reason == "" {
return errors.New("no reason specified")
}
return c.taskHubClient.SuspendOrchestration(ctx, api.InstanceID(id), reason)

Check warning on line 140 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L133-L140

Added lines #L133 - L140 were not covered by tests
}

func (c *client) ResumeWorkflow(ctx context.Context, id, reason string) error {
if id == "" {
return errors.New("no workflow id specified")
}
if reason == "" {
return errors.New("no reason specified")
}
return c.taskHubClient.ResumeOrchestration(ctx, api.InstanceID(id), reason)

Check warning on line 150 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L143-L150

Added lines #L143 - L150 were not covered by tests
}

func (c *client) PurgeWorkflow(ctx context.Context, id string) error {
if id == "" {
return errors.New("no workflow id specified")
}
return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id))

Check warning on line 157 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L153-L157

Added lines #L153 - L157 were not covered by tests
}

func (c *client) Close() error {
return nil

Check warning on line 161 in workflow/client.go

View check run for this annotation

Codecov / codecov/patch

workflow/client.go#L160-L161

Added lines #L160 - L161 were not covered by tests
}
15 changes: 15 additions & 0 deletions workflow/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package workflow

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewClient(t *testing.T) {
// Currently will always fail if no dapr connection available
client, err := NewClient()
assert.Empty(t, client)
require.Error(t, err)
}
6 changes: 3 additions & 3 deletions workflow/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
dapr "github.com/dapr/go-sdk/client"

"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/client"
durabletaskclient "github.com/microsoft/durabletask-go/client"
"github.com/microsoft/durabletask-go/task"
)

type WorkflowRuntime struct {
tasks *task.TaskRegistry
client *client.TaskHubGrpcClient
client *durabletaskclient.TaskHubGrpcClient

mutex sync.Mutex // TODO: implement
quit chan bool
Expand All @@ -38,7 +38,7 @@ func NewRuntime() (*WorkflowRuntime, error) {

return &WorkflowRuntime{
tasks: task.NewTaskRegistry(),
client: client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()),
client: durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()),
quit: make(chan bool),
close: daprClient.Close,
}, nil

Check warning on line 44 in workflow/runtime.go

View check run for this annotation

Codecov / codecov/patch

workflow/runtime.go#L39-L44

Added lines #L39 - L44 were not covered by tests
Expand Down
22 changes: 22 additions & 0 deletions workflow/workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package workflow

import "time"

type Metadata struct {
InstanceID string `json:"id"`
Name string `json:"name"`
RuntimeStatus Status `json:"status"`
CreatedAt time.Time `json:"createdAt"`
LastUpdatedAt time.Time `json:"lastUpdatedAt"`
SerializedInput string `json:"serializedInput"`
SerializedOutput string `json:"serializedOutput"`
SerializedCustomStatus string `json:"serializedCustomStatus"`
FailureDetails *FailureDetails `json:"failureDetails"`
}

type FailureDetails struct {
Type string `json:"type"`
Message string `json:"message"`
StackTrace string `json:"stackTrace"`
InnerFailure *FailureDetails `json:"innerFailure"`
}

0 comments on commit 947414d

Please sign in to comment.