Skip to content

Commit

Permalink
Use discovery interface for client calls
Browse files Browse the repository at this point in the history
  • Loading branch information
pipe01 committed Dec 2, 2020
1 parent b2d6c16 commit 8cfe2fe
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 8 deletions.
20 changes: 16 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var ErrInvalidInput = errors.New("func must have 1 input")
var ErrInputPointer = errors.New("the func must take a pointer as an input")

type Client interface {
Call(ctx context.Context, service string, path string, req interface{}, resp interface{}, opts ...CallOption) error
Call(service string, path string, req interface{}, resp interface{}, opts ...CallOption) error
Subscribe(topic string, callback interface{})
}

Expand All @@ -29,6 +29,7 @@ type client struct {
broker broker.Broker
log logger.Logger
disc discovery.Discovery
port int16
}

type CallError struct {
Expand All @@ -45,10 +46,12 @@ func NewClient(opts *options.Options) Client {
trans: opts.Transport,
broker: opts.Broker,
log: opts.Logger,
disc: opts.Discovery,
port: opts.RPCPort,
}
}

func (c *client) Call(ctx context.Context, service string, path string, reqval interface{}, respval interface{}, opts ...CallOption) error {
func (c *client) Call(service string, path string, reqval interface{}, respval interface{}, opts ...CallOption) error {
var callopts CallOptions

for _, o := range opts {
Expand All @@ -59,7 +62,16 @@ func (c *client) Call(ctx context.Context, service string, path string, reqval i
callopts.Context = context.Background()
}

s, err := c.trans.Dial(ctx, service)
if c.disc == nil {
panic("no discovery has been set up")
}

host, err := c.disc.Find(service)
if err != nil {
return fmt.Errorf("discover service: %w", err)
}

s, err := c.trans.Dial(callopts.Context, fmt.Sprintf("%s:%d", host, c.port))
if err != nil {
return fmt.Errorf("dial: %w", err)
}
Expand All @@ -80,7 +92,7 @@ func (c *client) Call(ctx context.Context, service string, path string, reqval i
}

var respmsg transport.Message
if err := s.Receive(ctx, &respmsg); err != nil {
if err := s.Receive(callopts.Context, &respmsg); err != nil {
return fmt.Errorf("receive message: %w", err)
}

Expand Down
6 changes: 4 additions & 2 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package discovery

import "net"
import "errors"

type Discovery interface {
Find(svc string) (net.IP, error)
Find(svc string) (host string, err error)
}

var ErrServiceNotRegistered = errors.New("service not registered")
6 changes: 5 additions & 1 deletion options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Options struct {
Discovery discovery.Discovery
}

// DefaultRPCPort is the port that will be used for RPC connections if no other is specified
const DefaultRPCPort = 7070

// Option represents a function that can be used to mutate an Options object
type Option func(*Options)

Expand All @@ -34,7 +37,8 @@ func Name(name string) Option {
}
}

// RPCPort sets the port in which this service's RPC will listen on, as well as the port in which other services' RPC servers are listening on
// RPCPort sets the port in which this service's RPC will listen on, as well as the port in which other services' RPC servers are listening on.
// Defaults to DefaultRPCPort
func RPCPort(port int16) Option {
return func(o *Options) {
o.RPCPort = port
Expand Down
2 changes: 1 addition & 1 deletion service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type service struct {
func NewService(opts ...options.Option) Service {
svc := &service{}
svc.options.Logger = logger.NewStdoutLogger()
svc.options.RPCPort = 7070
svc.options.RPCPort = options.DefaultRPCPort

svc.Apply(opts...)

Expand Down

0 comments on commit 8cfe2fe

Please sign in to comment.