Skip to content

Commit

Permalink
Resubscribe services when the gRPC stream terminated unexpectedly (#14)
Browse files Browse the repository at this point in the history
* add discovery client

* refactor discovery

* refactor unit tests
  • Loading branch information
kirk91 authored Dec 9, 2019
1 parent cb093af commit 1ea5b18
Show file tree
Hide file tree
Showing 9 changed files with 1,112 additions and 666 deletions.
6 changes: 3 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"encoding/json"
"sync"

"github.com/samaritan-proxy/samaritan/logger"
"github.com/samaritan-proxy/samaritan/pb/config/bootstrap"
"github.com/samaritan-proxy/samaritan/pb/config/service"
"github.com/samaritan-proxy/samaritan/logger"
)

type serviceWrapper struct {
Expand Down Expand Up @@ -115,7 +115,7 @@ func (c *Config) initDynamic() error {
return err
}

d.SetSvcHook(c.handleSvcUpdate)
d.SetDependencyHook(c.handleDependencyUpdate)
d.SetSvcConfigHook(c.handleSvcConfigUpdate)
d.SetSvcEndpointHook(c.handleSvcEndpointUpdate)
c.d = d
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *Config) MarshalJSON() ([]byte, error) {
return json.Marshal(res)
}

func (c *Config) handleSvcUpdate(added, removed []*service.Service) {
func (c *Config) handleDependencyUpdate(added, removed []*service.Service) {
c.Lock()
defer c.Unlock()

Expand Down
18 changes: 9 additions & 9 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
gomock "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/samaritan-proxy/samaritan/pb/config/bootstrap"
"github.com/samaritan-proxy/samaritan/pb/common"
"github.com/samaritan-proxy/samaritan/pb/config/bootstrap"
"github.com/samaritan-proxy/samaritan/pb/config/hc"
"github.com/samaritan-proxy/samaritan/pb/config/protocol"
"github.com/samaritan-proxy/samaritan/pb/config/service"
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestInitDynamic(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
d := NewMockDynamicSource(ctrl)
d.EXPECT().SetSvcHook(gomock.Any())
d.EXPECT().SetDependencyHook(gomock.Any())
d.EXPECT().SetSvcConfigHook(gomock.Any())
d.EXPECT().SetSvcEndpointHook(gomock.Any())
d.EXPECT().Serve()
Expand All @@ -180,7 +180,7 @@ func TestInitDynamic(t *testing.T) {
time.Sleep(time.Millisecond * 100) // wait dynamic source serving.
}

func TestHandleSvcUpdate(t *testing.T) {
func TestHandleDependencyUpdate(t *testing.T) {
b := &bootstrap.Bootstrap{
Admin: &bootstrap.Admin{
Bind: &common.Address{
Expand All @@ -198,14 +198,14 @@ func TestHandleSvcUpdate(t *testing.T) {
{Name: "foo"},
{Name: "bar"},
}
c.handleSvcUpdate(added, nil)
c.handleDependencyUpdate(added, nil)
assert.Equal(t, 2, len(c.sws))

removed := []*service.Service{
{Name: "bar"},
{Name: "zoo"},
}
c.handleSvcUpdate(nil, removed)
c.handleDependencyUpdate(nil, removed)
assert.Equal(t, 1, len(c.sws))

e := <-evtCh
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestHandleSvcConfigUpdate(t *testing.T) {
addedSvcs := []*service.Service{
{Name: "foo"},
}
c.handleSvcUpdate(addedSvcs, nil)
c.handleDependencyUpdate(addedSvcs, nil)

newCfg := new(service.Config)
c.handleSvcConfigUpdate("foo", newCfg)
Expand All @@ -252,7 +252,7 @@ func TestHandleSvcConfigUpdate(t *testing.T) {
assert.NoError(t, err)
evtCh := c.Subscribe()

c.handleSvcUpdate(
c.handleDependencyUpdate(
[]*service.Service{{Name: "foo"}},
nil,
)
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestHandleSvEndpointUpdate(t *testing.T) {
addedSvcs := []*service.Service{
{Name: "foo"},
}
c.handleSvcUpdate(addedSvcs, nil)
c.handleDependencyUpdate(addedSvcs, nil)

added := []*service.Endpoint{
{Address: &common.Address{Ip: "127.0.0.1", Port: 8888}},
Expand All @@ -323,7 +323,7 @@ func TestHandleSvEndpointUpdate(t *testing.T) {
assert.NoError(t, err)
evtCh := c.Subscribe()

c.handleSvcUpdate(
c.handleDependencyUpdate(
[]*service.Service{{Name: "foo"}},
nil,
)
Expand Down
Loading

0 comments on commit 1ea5b18

Please sign in to comment.