From 8cfe2fe9164d2ad8750c79b8deb508b94417b7c5 Mon Sep 17 00:00:00 2001 From: Felipe Martinez Date: Wed, 2 Dec 2020 16:37:36 +0100 Subject: [PATCH] Use discovery interface for client calls --- client/client.go | 20 ++++++++++++++++---- discovery/discovery.go | 6 ++++-- options/options.go | 6 +++++- service.go | 2 +- 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/client/client.go b/client/client.go index 6daaff0..e3c52fe 100644 --- a/client/client.go +++ b/client/client.go @@ -19,7 +19,7 @@ var ErrInvalidInput = errors.New("func must have 1 input") var ErrInputPointer = errors.New("the func must take a pointer as an input") type Client interface { - Call(ctx context.Context, service string, path string, req interface{}, resp interface{}, opts ...CallOption) error + Call(service string, path string, req interface{}, resp interface{}, opts ...CallOption) error Subscribe(topic string, callback interface{}) } @@ -29,6 +29,7 @@ type client struct { broker broker.Broker log logger.Logger disc discovery.Discovery + port int16 } type CallError struct { @@ -45,10 +46,12 @@ func NewClient(opts *options.Options) Client { trans: opts.Transport, broker: opts.Broker, log: opts.Logger, + disc: opts.Discovery, + port: opts.RPCPort, } } -func (c *client) Call(ctx context.Context, service string, path string, reqval interface{}, respval interface{}, opts ...CallOption) error { +func (c *client) Call(service string, path string, reqval interface{}, respval interface{}, opts ...CallOption) error { var callopts CallOptions for _, o := range opts { @@ -59,7 +62,16 @@ func (c *client) Call(ctx context.Context, service string, path string, reqval i callopts.Context = context.Background() } - s, err := c.trans.Dial(ctx, service) + if c.disc == nil { + panic("no discovery has been set up") + } + + host, err := c.disc.Find(service) + if err != nil { + return fmt.Errorf("discover service: %w", err) + } + + s, err := c.trans.Dial(callopts.Context, fmt.Sprintf("%s:%d", host, c.port)) if err != nil { return fmt.Errorf("dial: %w", err) } @@ -80,7 +92,7 @@ func (c *client) Call(ctx context.Context, service string, path string, reqval i } var respmsg transport.Message - if err := s.Receive(ctx, &respmsg); err != nil { + if err := s.Receive(callopts.Context, &respmsg); err != nil { return fmt.Errorf("receive message: %w", err) } diff --git a/discovery/discovery.go b/discovery/discovery.go index 31e860f..ea2a687 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -1,7 +1,9 @@ package discovery -import "net" +import "errors" type Discovery interface { - Find(svc string) (net.IP, error) + Find(svc string) (host string, err error) } + +var ErrServiceNotRegistered = errors.New("service not registered") diff --git a/options/options.go b/options/options.go index 5060fe1..cfdc3c6 100644 --- a/options/options.go +++ b/options/options.go @@ -24,6 +24,9 @@ type Options struct { Discovery discovery.Discovery } +// DefaultRPCPort is the port that will be used for RPC connections if no other is specified +const DefaultRPCPort = 7070 + // Option represents a function that can be used to mutate an Options object type Option func(*Options) @@ -34,7 +37,8 @@ func Name(name string) Option { } } -// RPCPort sets the port in which this service's RPC will listen on, as well as the port in which other services' RPC servers are listening on +// RPCPort sets the port in which this service's RPC will listen on, as well as the port in which other services' RPC servers are listening on. +// Defaults to DefaultRPCPort func RPCPort(port int16) Option { return func(o *Options) { o.RPCPort = port diff --git a/service.go b/service.go index d51120e..2f5b410 100644 --- a/service.go +++ b/service.go @@ -39,7 +39,7 @@ type service struct { func NewService(opts ...options.Option) Service { svc := &service{} svc.options.Logger = logger.NewStdoutLogger() - svc.options.RPCPort = 7070 + svc.options.RPCPort = options.DefaultRPCPort svc.Apply(opts...)