Skip to content

Commit

Permalink
Merge pull request #5 from dispatchrun/coroutine
Browse files Browse the repository at this point in the history
Coroutines
  • Loading branch information
chriso authored Jun 23, 2024
2 parents f931dd7 + 0010d75 commit 44a121b
Show file tree
Hide file tree
Showing 39 changed files with 2,162 additions and 845 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*_durable.go
15 changes: 14 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: fmt lint test
.PHONY: clean coroc fmt lint test integration-test clean coroc

fmt:
go fmt ./...
Expand All @@ -8,3 +8,16 @@ lint:

test:
go test ./...

integration-test: clean coroc
go run ./dispatchtest/integration # volatile mode
coroc ./dispatchtest/integration
go run -tags durable ./dispatchtest/integration # durable mode

clean:
find . -name '*_durable.go' -delete

coroc:
@which coroc &>/dev/null \
|| echo "Installing coroc..." \
&& go install github.com/dispatchrun/coroutine/compiler/cmd/coroc@latest
127 changes: 69 additions & 58 deletions dispatch.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build !durable

package dispatch

import (
Expand All @@ -10,12 +12,16 @@ import (
"os"
"strings"
"sync"
_ "unsafe"

"buf.build/gen/go/stealthrocket/dispatch-proto/connectrpc/go/dispatch/sdk/v1/sdkv1connect"
sdkv1 "buf.build/gen/go/stealthrocket/dispatch-proto/protocolbuffers/go/dispatch/sdk/v1"
"connectrpc.com/connect"
"connectrpc.com/validate"
"github.com/dispatchrun/dispatch-go/dispatchclient"
"github.com/dispatchrun/dispatch-go/dispatchproto"
"github.com/dispatchrun/dispatch-go/internal/auth"
"github.com/dispatchrun/dispatch-go/internal/env"
)

// Dispatch is a Dispatch endpoint.
Expand All @@ -24,31 +30,33 @@ type Dispatch struct {
verificationKey string
serveAddr string
env []string
opts []Option

client *Client
client *dispatchclient.Client
clientErr error

path string
handler http.Handler

functions map[string]Function
functions dispatchproto.FunctionMap
mu sync.Mutex
}

// New creates a Dispatch endpoint.
func New(opts ...DispatchOption) (*Dispatch, error) {
func New(opts ...Option) (*Dispatch, error) {
d := &Dispatch{
env: os.Environ(),
functions: map[string]Function{},
opts: opts,
functions: map[string]dispatchproto.Function{},
}
for _, opt := range opts {
opt.configureDispatch(d)
opt(d)
}

// Prepare the endpoint URL.
var endpointUrlFromEnv bool
if d.endpointUrl == "" {
d.endpointUrl = getenv(d.env, "DISPATCH_ENDPOINT_URL")
d.endpointUrl = env.Get(d.env, "DISPATCH_ENDPOINT_URL")
endpointUrlFromEnv = true
}
if d.endpointUrl == "" {
Expand All @@ -64,7 +72,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {

// Prepare the address to serve on.
if d.serveAddr == "" {
d.serveAddr = getenv(d.env, "DISPATCH_ENDPOINT_ADDR")
d.serveAddr = env.Get(d.env, "DISPATCH_ENDPOINT_ADDR")
if d.serveAddr == "" {
d.serveAddr = "127.0.0.1:8000"
}
Expand All @@ -73,7 +81,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {
// Prepare the verification key.
var verificationKeyFromEnv bool
if d.verificationKey == "" {
d.verificationKey = getenv(d.env, "DISPATCH_VERIFICATION_KEY")
d.verificationKey = env.Get(d.env, "DISPATCH_VERIFICATION_KEY")
verificationKeyFromEnv = true
}
var verificationKey ed25519.PublicKey
Expand All @@ -93,8 +101,7 @@ func New(opts ...DispatchOption) (*Dispatch, error) {
if err != nil {
return nil, err
}
grpcHandler := &dispatchFunctionServiceHandler{d}
d.path, d.handler = sdkv1connect.NewFunctionServiceHandler(grpcHandler, connect.WithInterceptors(validator))
d.path, d.handler = sdkv1connect.NewFunctionServiceHandler(dispatchHandler{d}, connect.WithInterceptors(validator))

// Setup request signature validation.
if verificationKey == nil {
Expand All @@ -109,29 +116,21 @@ func New(opts ...DispatchOption) (*Dispatch, error) {

// Optionally attach a client.
if d.client == nil {
d.client, d.clientErr = NewClient(Env(d.env...))
d.client, d.clientErr = dispatchclient.New(dispatchclient.Env(d.env...))
}

return d, nil
}

// DispatchOption configures a Dispatch endpoint.
type DispatchOption interface {
configureDispatch(d *Dispatch)
}

type dispatchOptionFunc func(d *Dispatch)

func (fn dispatchOptionFunc) configureDispatch(d *Dispatch) {
fn(d)
}
// Option configures a Dispatch endpoint.
type Option func(*Dispatch)

// EndpointUrl sets the URL of the Dispatch endpoint.
//
// It defaults to the value of the DISPATCH_ENDPOINT_URL environment
// variable.
func EndpointUrl(endpointUrl string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.endpointUrl = endpointUrl })
func EndpointUrl(endpointUrl string) Option {
return func(d *Dispatch) { d.endpointUrl = endpointUrl }
}

// VerificationKey sets the verification key to use when verifying
Expand All @@ -144,8 +143,8 @@ func EndpointUrl(endpointUrl string) DispatchOption {
//
// If a verification key is not provided, request signatures will
// not be validated.
func VerificationKey(verificationKey string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.verificationKey = verificationKey })
func VerificationKey(verificationKey string) Option {
return func(d *Dispatch) { d.verificationKey = verificationKey }
}

// ServeAddress sets the address that the Dispatch endpoint
Expand All @@ -157,28 +156,41 @@ func VerificationKey(verificationKey string) DispatchOption {
// It defaults to the value of the DISPATCH_ENDPOINT_ADDR environment
// variable, which is automatically set by the Dispatch CLI. If this
// is unset, it defaults to 127.0.0.1:8000.
func ServeAddress(addr string) DispatchOption {
return dispatchOptionFunc(func(d *Dispatch) { d.serveAddr = addr })
func ServeAddress(addr string) Option {
return func(d *Dispatch) { d.serveAddr = addr }
}

// Register registers a function.
func (d *Dispatch) Register(fn Function) {
d.mu.Lock()
defer d.mu.Unlock()
// Env sets the environment variables that a Dispatch endpoint
// parses its default configuration from.
//
// It defaults to os.Environ().
func Env(env ...string) Option {
return func(d *Dispatch) { d.env = env }
}

d.functions[fn.Name()] = fn
// Client sets the client to use when dispatching calls
// from functions registered on the endpoint.
//
// By default the Dispatch endpoint will attempt to construct
// a dispatchclient.Client instance using the DISPATCH_API_KEY
// and optional DISPATCH_API_URL environment variables. If more
// control is required over client configuration, the custom
// client instance can be registered here and used instead.
func Client(client *dispatchclient.Client) Option {
return func(d *Dispatch) { d.client = client }
}

// Bind the function to this endpoint, so that the function's
// NewCall and Dispatch methods can be used to build and
// dispatch calls.
fn.bind(d)
// Register registers a function.
func (d *Dispatch) Register(fn AnyFunction) {
d.RegisterPrimitive(fn.Register(d))
}

func (d *Dispatch) lookupFunction(name string) Function {
// RegisterPrimitive registers a primitive function.
func (d *Dispatch) RegisterPrimitive(name string, fn dispatchproto.Function) {
d.mu.Lock()
defer d.mu.Unlock()

return d.functions[name]
d.functions[name] = fn
}

// URL is the URL of the Dispatch endpoint.
Expand All @@ -193,29 +205,12 @@ func (d *Dispatch) Handler() (string, http.Handler) {
}

// Client returns the Client attached to this endpoint.
func (d *Dispatch) Client() (*Client, error) {
func (d *Dispatch) Client() (*dispatchclient.Client, error) {
return d.client, d.clientErr
}

// The gRPC handler is unexported so that the http.Handler can
// be wrapped in order to validate request signatures.
type dispatchFunctionServiceHandler struct {
dispatch *Dispatch
}

func (d *dispatchFunctionServiceHandler) Run(ctx context.Context, req *connect.Request[sdkv1.RunRequest]) (*connect.Response[sdkv1.RunResponse], error) {
var res Response
fn := d.dispatch.lookupFunction(req.Msg.Function)
if fn == nil {
res = NewResponseErrorf("%w: function %q not found", ErrNotFound, req.Msg.Function)
} else {
res = fn.Run(ctx, Request{req.Msg})
}
return connect.NewResponse(res.proto), nil
}

// Serve serves the Dispatch endpoint.
func (d *Dispatch) Serve() error {
// ListenAndServe serves the Dispatch endpoint.
func (d *Dispatch) ListenAndServe() error {
mux := http.NewServeMux()
mux.Handle(d.Handler())

Expand All @@ -224,3 +219,19 @@ func (d *Dispatch) Serve() error {
server := &http.Server{Addr: d.serveAddr, Handler: mux}
return server.ListenAndServe()
}

// The gRPC handler is deliberately unexported. This forces
// the user to access it through Dispatch.Handler, and get
// a handler that has signature verification middleware attached.
type dispatchHandler struct{ dispatch *Dispatch }

func (d dispatchHandler) Run(ctx context.Context, req *connect.Request[sdkv1.RunRequest]) (*connect.Response[sdkv1.RunResponse], error) {
res := d.dispatch.functions.Run(ctx, newProtoRequest(req.Msg))
return connect.NewResponse(responseProto(res)), nil
}

//go:linkname newProtoRequest github.com/dispatchrun/dispatch-go/dispatchproto.newProtoRequest
func newProtoRequest(r *sdkv1.RunRequest) dispatchproto.Request

//go:linkname responseProto github.com/dispatchrun/dispatch-go/dispatchproto.responseProto
func responseProto(r dispatchproto.Response) *sdkv1.RunResponse
Loading

0 comments on commit 44a121b

Please sign in to comment.