diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 078b89b9..b226803e 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -28,6 +28,16 @@ expected_stdout_lines: - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - '== APP == workflow terminated' - '== APP == workflow purged' + - '== APP == workflow client test' + - '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' + - '== APP == [wfclient] workflow running' + - '== APP == [wfclient] stage: 1' + - '== APP == [wfclient] event raised' + - '== APP == [wfclient] stage: 2' + - '== APP == [wfclient] workflow terminated' + - '== APP == [wfclient] workflow purged' + - '== APP == workflow runtime successfully shutdown' + background: true sleep: 60 --> @@ -61,4 +71,13 @@ dapr run --app-id workflow \ - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - '== APP == workflow terminated' - '== APP == workflow purged' + - '== APP == workflow client test' + - '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' + - '== APP == [wfclient] workflow running' + - '== APP == [wfclient] stage: 1' + - '== APP == [wfclient] event raised' + - '== APP == [wfclient] stage: 2' + - '== APP == [wfclient] workflow terminated' + - '== APP == [wfclient] workflow purged' + - '== APP == workflow runtime successfully shutdown' ``` \ No newline at end of file diff --git a/examples/workflow/main.go b/examples/workflow/main.go index aa04ec8c..96bd1cd8 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -222,11 +222,77 @@ func main() { fmt.Println("workflow purged") + // WFClient + // TODO: Expand client validation + + stage = 0 + fmt.Println("workflow client test") + + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("[wfclient] faield to initialize: %v", err) + } + + id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) + if err != nil { + log.Fatalf("[wfclient] failed to start workflow: %v", err) + } + + fmt.Printf("[wfclient] started workflow with id: %s\n", id) + + metadata, err := wfClient.FetchWorkflowMetadata(ctx, id) + if err != nil { + log.Fatalf("[wfclient] failed to get worfklow: %v", err) + } + + fmt.Printf("[wfclient] workflow running: %v\n", metadata.IsRunning()) + + if stage != 1 { + log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage) + } + + fmt.Printf("[wfclient] stage: %d\n", stage) + + // TODO: WaitForWorkflowStart + // TODO: WaitForWorkflowCompletion + + // raise event + + if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil { + log.Fatalf("[wfclient] failed to raise event: %v", err) + } + + fmt.Println("[wfclient] event raised") + + // Sleep to allow the workflow to advance + time.Sleep(time.Second) + + if stage != 2 { + log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage) + } + + fmt.Printf("[wfclient] stage: %d\n", stage) + + // stop workflow + if err := wfClient.TerminateWorkflow(ctx, id); err != nil { + log.Fatalf("[wfclient] failed to terminate workflow: %v", err) + } + + fmt.Println("[wfclient] workflow terminated") + + if err := wfClient.PurgeWorkflow(ctx, id); err != nil { + log.Fatalf("[wfclient] failed to purge workflow: %v", err) + } + + fmt.Println("[wfclient] workflow purged") + // stop workflow runtime if err := wr.Shutdown(); err != nil { log.Fatalf("failed to shutdown runtime: %v", err) } + fmt.Println("workflow runtime successfully shutdown") + time.Sleep(time.Second * 5) } diff --git a/workflow/client.go b/workflow/client.go new file mode 100644 index 00000000..43734806 --- /dev/null +++ b/workflow/client.go @@ -0,0 +1,162 @@ +package workflow + +import ( + "context" + "errors" + "time" + + "github.com/microsoft/durabletask-go/api" + "github.com/microsoft/durabletask-go/backend" + durabletaskclient "github.com/microsoft/durabletask-go/client" + + dapr "github.com/dapr/go-sdk/client" +) + +type Client interface { + ScheduleNewWorkflow(ctx context.Context) (string, error) + FetchWorkflowMetadata(ctx context.Context) (string, error) + WaitForWorkflowStart(ctx context.Context) (string, error) + WaitForWorkflowCompletion(ctx context.Context) (string, error) + TerminateWorkflow(ctx context.Context) error + RaiseEvent(ctx context.Context) error + SuspendWorkflow(ctx context.Context) error + ResumeWorkflow(ctx context.Context) error + PurgeWorkflow(ctx context.Context) error +} + +type client struct { + taskHubClient *durabletaskclient.TaskHubGrpcClient +} + +func WithInstanceID(id string) api.NewOrchestrationOptions { + return api.WithInstanceID(api.InstanceID(id)) +} + +// TODO: Implement WithOrchestrationIdReusePolicy + +func WithInput(input any) api.NewOrchestrationOptions { + return api.WithInput(input) +} + +func WithRawInput(input string) api.NewOrchestrationOptions { + return api.WithRawInput(input) +} + +func WithStartTime(time time.Time) api.NewOrchestrationOptions { + return api.WithStartTime(time) +} + +func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions { + return api.WithFetchPayloads(fetchPayloads) +} + +func WithEventPayload(data any) api.RaiseEventOptions { + return api.WithEventPayload(data) +} + +func WithRawEventData(data string) api.RaiseEventOptions { + return api.WithRawEventData(data) +} + +func WithOutput(data any) api.TerminateOptions { + return api.WithOutput(data) +} + +func WithRawOutput(data string) api.TerminateOptions { + return api.WithRawOutput(data) +} + +// TODO: Implement mocks + +func NewClient() (client, error) { // TODO: Implement custom connection + daprClient, err := dapr.NewClient() + if err != nil { + return client{}, err + } + + taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()) + + return client{ + taskHubClient: taskHubClient, + }, nil +} + +func (c *client) ScheduleNewWorkflow(ctx context.Context, workflow string, opts ...api.NewOrchestrationOptions) (id string, err error) { + if workflow == "" { + return "", errors.New("no workflow specified") + } + workflowID, err := c.taskHubClient.ScheduleNewOrchestration(ctx, workflow, opts...) + if err != nil { + return "", err + } + return string(workflowID), nil +} + +func (c *client) FetchWorkflowMetadata(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) { + if id == "" { + return nil, errors.New("no workflow id specified") + } + return c.taskHubClient.FetchOrchestrationMetadata(ctx, api.InstanceID(id), opts...) +} + +func (c *client) WaitForWorkflowStart(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) { + if id == "" { + return nil, errors.New("no workflow id specified") + } + return c.taskHubClient.WaitForOrchestrationStart(ctx, api.InstanceID(id), opts...) +} + +func (c *client) WaitForWorkflowCompletion(ctx context.Context, id string, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) { + if id == "" { + return nil, errors.New("no workflow id specified") + } + return c.taskHubClient.WaitForOrchestrationCompletion(ctx, api.InstanceID(id), opts...) +} + +func (c *client) TerminateWorkflow(ctx context.Context, id string, opts ...api.TerminateOptions) error { + if id == "" { + return errors.New("no workflow id specified") + } + return c.taskHubClient.TerminateOrchestration(ctx, api.InstanceID(id), opts...) +} + +func (c *client) RaiseEvent(ctx context.Context, id, eventName string, opts ...api.RaiseEventOptions) error { + if id == " " { + return errors.New("no workflow id specified") + } + if eventName == "" { + return errors.New("no event name specified") + } + return c.taskHubClient.RaiseEvent(ctx, api.InstanceID(id), eventName, opts...) +} + +func (c *client) SuspendWorkflow(ctx context.Context, id, reason string) error { + if id == "" { + return errors.New("no workflow id specified") + } + if reason == "" { + return errors.New("no reason specified") + } + return c.taskHubClient.SuspendOrchestration(ctx, api.InstanceID(id), reason) +} + +func (c *client) ResumeWorkflow(ctx context.Context, id, reason string) error { + if id == "" { + return errors.New("no workflow id specified") + } + if reason == "" { + return errors.New("no reason specified") + } + return c.taskHubClient.ResumeOrchestration(ctx, api.InstanceID(id), reason) +} + +func (c *client) PurgeWorkflow(ctx context.Context, id string) error { + if id == "" { + return errors.New("no workflow id specified") + } + return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id)) +} + +func (c *client) Close() error { + return nil +} diff --git a/workflow/client_test.go b/workflow/client_test.go new file mode 100644 index 00000000..69c1378e --- /dev/null +++ b/workflow/client_test.go @@ -0,0 +1,15 @@ +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewClient(t *testing.T) { + // Currently will always fail if no dapr connection available + client, err := NewClient() + assert.Empty(t, client) + require.Error(t, err) +} diff --git a/workflow/runtime.go b/workflow/runtime.go index 1841f7b1..9b7e09d5 100644 --- a/workflow/runtime.go +++ b/workflow/runtime.go @@ -13,13 +13,13 @@ import ( dapr "github.com/dapr/go-sdk/client" "github.com/microsoft/durabletask-go/backend" - "github.com/microsoft/durabletask-go/client" + durabletaskclient "github.com/microsoft/durabletask-go/client" "github.com/microsoft/durabletask-go/task" ) type WorkflowRuntime struct { tasks *task.TaskRegistry - client *client.TaskHubGrpcClient + client *durabletaskclient.TaskHubGrpcClient mutex sync.Mutex // TODO: implement quit chan bool @@ -38,7 +38,7 @@ func NewRuntime() (*WorkflowRuntime, error) { return &WorkflowRuntime{ tasks: task.NewTaskRegistry(), - client: client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()), + client: durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()), quit: make(chan bool), close: daprClient.Close, }, nil diff --git a/workflow/workflow.go b/workflow/workflow.go new file mode 100644 index 00000000..76d4cc43 --- /dev/null +++ b/workflow/workflow.go @@ -0,0 +1,22 @@ +package workflow + +import "time" + +type Metadata struct { + InstanceID string `json:"id"` + Name string `json:"name"` + RuntimeStatus Status `json:"status"` + CreatedAt time.Time `json:"createdAt"` + LastUpdatedAt time.Time `json:"lastUpdatedAt"` + SerializedInput string `json:"serializedInput"` + SerializedOutput string `json:"serializedOutput"` + SerializedCustomStatus string `json:"serializedCustomStatus"` + FailureDetails *FailureDetails `json:"failureDetails"` +} + +type FailureDetails struct { + Type string `json:"type"` + Message string `json:"message"` + StackTrace string `json:"stackTrace"` + InnerFailure *FailureDetails `json:"innerFailure"` +}