From 1a01a7600503e0374668fde9752f1df7f2423f42 Mon Sep 17 00:00:00 2001 From: kirk Date: Thu, 28 Nov 2019 15:56:15 +0800 Subject: [PATCH 1/4] discovery: fix stream hang (#12) * fix stream hang * enable keepalive --- config/dynamic.go | 182 ++++++++++++++++++++++------------------- config/dynamic_test.go | 161 +++++++++++++++++++++++------------- 2 files changed, 202 insertions(+), 141 deletions(-) diff --git a/config/dynamic.go b/config/dynamic.go index 22d7e68..ba8966e 100644 --- a/config/dynamic.go +++ b/config/dynamic.go @@ -25,6 +25,7 @@ import ( "time" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" "github.com/samaritan-proxy/samaritan/logger" "github.com/samaritan-proxy/samaritan/pb/api" @@ -84,7 +85,14 @@ type dynamicSource struct { var newDiscoveryServiceClient = func(cfg *bootstrap.ConfigSource) (c api.DiscoveryServiceClient, shutdown func() error, err error) { target := cfg.Endpoint // TODO: support Authentication - cc, err := grpc.Dial(target, grpc.WithInsecure()) + options := []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 30, + Timeout: time.Second * 10, + }), + } + cc, err := grpc.Dial(target, options...) if err != nil { return nil, nil, err } @@ -195,7 +203,6 @@ func (d *dynamicSource) streamSvcs(ctx context.Context) { } // TODO: validate resp - if d.svcHook != nil { d.svcHook(resp.Added, resp.Removed) } @@ -230,11 +237,14 @@ func (d *dynamicSource) unsubscribeSvc(svc *service.Service) { } func (d *dynamicSource) StreamSvcConfigs() { + defer func() { + logger.Debugf("StreamSvcConfigs done") + }() + for { d.streamSvcConfigs(context.Background()) select { case <-d.quit: - logger.Debugf("StreamSvcConfigs done") return default: } @@ -251,80 +261,86 @@ func (d *dynamicSource) StreamSvcConfigs() { } func (d *dynamicSource) streamSvcConfigs(ctx context.Context) { + // create stream client stream, err := d.c.StreamSvcConfigs(ctx) if err != nil { logger.Warnf("Fail to create stream client: %v", err) return } - sendDone := make(chan struct{}) + recvDone := make(chan struct{}) defer func() { - <-sendDone + // wait recv goroutine done + <-recvDone }() go func() { - defer close(sendDone) + defer close(recvDone) for { - var subscribe, unsubscribe []string - select { - case svc := <-d.svcCfgSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcCfgUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-d.quit: + resp, err := stream.Recv() + if err != nil { + logger.Warnf("Recv failed: %v", err) return } - // batch - // TODO: limit the batch size - for { - select { - case svc := <-d.svcCfgSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcCfgUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - default: - goto SEND - } - } - - SEND: - req := &api.SvcConfigDiscoveryRequest{ - SvcNamesSubscribe: subscribe, - SvcNamesUnsubscribe: unsubscribe, + // TODO: validate resp + if d.svcCfgHook == nil { + continue } - err := stream.Send(req) - if err != nil { - logger.Warnf("Send failed: %v", err) - return + for svcName, svcConfig := range resp.Updated { + d.svcCfgHook(svcName, svcConfig) } } }() for { - resp, err := stream.Recv() - if err != nil { - logger.Warnf("Recv failed: %v", err) + var subscribe, unsubscribe []string + select { + case svc := <-d.svcCfgSubCh: + subscribe = append(subscribe, svc.Name) + case svc := <-d.svcCfgUnsubCh: + unsubscribe = append(unsubscribe, svc.Name) + case <-recvDone: return } - // TODO: validate resp + // batch + // TODO: limit the size + for { + select { + case svc := <-d.svcCfgSubCh: + subscribe = append(subscribe, svc.Name) + case svc := <-d.svcCfgUnsubCh: + unsubscribe = append(unsubscribe, svc.Name) + case <-recvDone: + return + default: + goto SEND + } + } - if d.svcCfgHook == nil { - continue + SEND: + req := &api.SvcConfigDiscoveryRequest{ + SvcNamesSubscribe: subscribe, + SvcNamesUnsubscribe: unsubscribe, } - for svcName, svcConfig := range resp.Updated { - d.svcCfgHook(svcName, svcConfig) + err := stream.Send(req) + if err != nil { + logger.Warnf("Send failed: %v", err) + return } } } func (d *dynamicSource) StreamSvcEndpoints() { + defer func() { + logger.Debugf("StreamSvcEndpoints done") + }() + for { d.streamSvcEndpoints(context.Background()) select { case <-d.quit: - logger.Debugf("StreamSvcEndpoints done") return default: } @@ -341,68 +357,70 @@ func (d *dynamicSource) StreamSvcEndpoints() { } func (d *dynamicSource) streamSvcEndpoints(ctx context.Context) { + // make the stream client stream, err := d.c.StreamSvcEndpoints(ctx) if err != nil { logger.Warnf("Fail to create stream client: %v", err) return } - sendDone := make(chan struct{}) + recvDone := make(chan struct{}) defer func() { - <-sendDone + // wait recv goroutine done + <-recvDone }() go func() { - defer close(sendDone) + defer close(recvDone) for { - var subscribe, unsubscribe []string - select { - case svc := <-d.svcEtSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcEtUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-d.quit: + resp, err := stream.Recv() + if err != nil { + logger.Warnf("Recv failed: %v", err) return } - // batch - // TODO: limit the batch size - for { - select { - case svc := <-d.svcEtSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcEtUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - default: - goto SEND - } - } - - SEND: - req := &api.SvcEndpointDiscoveryRequest{ - SvcNamesSubscribe: subscribe, - SvcNamesUnsubscribe: unsubscribe, - } - err := stream.Send(req) - if err != nil { - logger.Warnf("Send failed: %v", err) - return + // TODO: validate resp + if d.svcEtHook != nil { + d.svcEtHook(resp.SvcName, resp.Added, resp.Removed) } } }() for { - resp, err := stream.Recv() - if err != nil { - logger.Warnf("Recv failed: %v", err) + var subscribe, unsubscribe []string + select { + case svc := <-d.svcEtSubCh: + subscribe = append(subscribe, svc.Name) + case svc := <-d.svcEtUnsubCh: + unsubscribe = append(unsubscribe, svc.Name) + case <-recvDone: return } - // TODO: validate resp + // batch + // TODO: limit the size + for { + select { + case svc := <-d.svcEtSubCh: + subscribe = append(subscribe, svc.Name) + case svc := <-d.svcEtUnsubCh: + unsubscribe = append(unsubscribe, svc.Name) + case <-recvDone: + return + default: + goto SEND + } + } - if d.svcEtHook == nil { - continue + SEND: + req := &api.SvcEndpointDiscoveryRequest{ + SvcNamesSubscribe: subscribe, + SvcNamesUnsubscribe: unsubscribe, + } + err := stream.Send(req) + if err != nil { + logger.Warnf("Send failed: %v", err) + return } - d.svcEtHook(resp.SvcName, resp.Added, resp.Removed) } } diff --git a/config/dynamic_test.go b/config/dynamic_test.go index 701d42d..2ae02be 100644 --- a/config/dynamic_test.go +++ b/config/dynamic_test.go @@ -17,7 +17,6 @@ package config import ( "context" "errors" - "fmt" "io" "testing" "time" @@ -66,7 +65,7 @@ func (r *rpcMsg) Matches(msg interface{}) bool { } func (r *rpcMsg) String() string { - return fmt.Sprintf("is %s", r.msg) + return r.msg.String() } func newTestInstance() *common.Instance { @@ -77,35 +76,50 @@ func newTestInstance() *common.Instance { } } +func makeDependencyDiscoveryResponse(added, removed []*service.Service) *api.DependencyDiscoveryResponse { + return &api.DependencyDiscoveryResponse{ + Added: added, + Removed: removed, + } +} + +func makeService(name string) *service.Service { + return &service.Service{ + Name: name, + } +} + func TestDynamicSourceStreamSvcs(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() b := new(bootstrap.Bootstrap) b.Instance = newTestInstance() - quit := make(chan struct{}) - // mock discovery service client and stream - req := &api.DependencyDiscoveryRequest{Instance: b.Instance} + addedSvcs := []*service.Service{makeService("foo")} + removedSvcs := []*service.Service{makeService("bar")} + + // mock stream stream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) - addedSvcs := []*service.Service{ - {Name: "foo"}, - } - removedSvcs := []*service.Service{ - {Name: "bar"}, - } - stream.EXPECT().Recv().Return(&api.DependencyDiscoveryResponse{ - Added: addedSvcs, - Removed: removedSvcs, - }, nil) + streamQuitCh := make(chan struct{}) + abortStream := func() { close(streamQuitCh) } + recvTimes := 0 stream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { - <-quit + recvTimes++ + if recvTimes < 2 { + return makeDependencyDiscoveryResponse(addedSvcs, removedSvcs), nil + } + // wait the stream closed + <-streamQuitCh return nil, io.EOF - }) + }).Times(2) + + // mock client c := NewMockDiscoveryServiceClient(ctrl) + req := &api.DependencyDiscoveryRequest{Instance: b.Instance} c.EXPECT().StreamDependencies(gomock.Any(), &rpcMsg{msg: req}).Return(stream, nil) - // mock discovery service client facotory + // mock client facotory factory := newDiscoveryServiceClientFacotry(c, nil) rollback := mockNewDiscoveryServiceClient(factory) defer rollback() @@ -132,10 +146,8 @@ func TestDynamicSourceStreamSvcs(t *testing.T) { assert.Contains(t, removedSvcs, svc) } - // close the discovery service client - time.AfterFunc(time.Millisecond*100, func() { - close(quit) - }) + // abort stream + time.AfterFunc(time.Millisecond*100, abortStream) d.streamSvcs(context.Background()) // assert hooks @@ -144,29 +156,40 @@ func TestDynamicSourceStreamSvcs(t *testing.T) { assert.True(t, svcUnsubHookCalled) } +func makeSvcConfigDiscoveryResponse(configs map[string]*service.Config) *api.SvcConfigDiscoveryResponse { + return &api.SvcConfigDiscoveryResponse{ + Updated: configs, + } +} + func TestDynamicSourceStreamSvcConfigs(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - b := new(bootstrap.Bootstrap) - quit := make(chan struct{}) - - // mock discovery service client and stream + // mock stream stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + streamQuitCh := make(chan struct{}) + abortStream := func() { + close(streamQuitCh) + } + // mock send method req := &api.SvcConfigDiscoveryRequest{ SvcNamesSubscribe: []string{"foo"}, SvcNamesUnsubscribe: []string{"bar", "zoo"}, } stream.EXPECT().Send(&rpcMsg{msg: req}).Return(nil) - stream.EXPECT().Recv().Return(&api.SvcConfigDiscoveryResponse{ - Updated: map[string]*service.Config{ - "foo": nil, - }, - }, nil) + // mock recv method + recvTimes := 0 stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { - <-quit + recvTimes++ + if recvTimes < 2 { + return makeSvcConfigDiscoveryResponse(map[string]*service.Config{"foo": nil}), nil + } + <-streamQuitCh return nil, io.EOF - }) + }).Times(2) + + // mock client c := NewMockDiscoveryServiceClient(ctrl) c.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) @@ -175,6 +198,7 @@ func TestDynamicSourceStreamSvcConfigs(t *testing.T) { rollback := mockNewDiscoveryServiceClient(factory) defer rollback() + b := new(bootstrap.Bootstrap) d, err := newDynamicSource(b) assert.NoError(t, err) @@ -188,49 +212,70 @@ func TestDynamicSourceStreamSvcConfigs(t *testing.T) { d.unsubscribeSvc(&service.Service{Name: "zoo"}) // close the discovery service client - time.AfterFunc(time.Millisecond*100, func() { - close(quit) - close(d.quit) - }) + time.AfterFunc(time.Millisecond*100, abortStream) d.streamSvcConfigs(context.Background()) assert.True(t, hookCalled) } +func makeEndpoint(ip string, port uint32) *service.Endpoint { + return &service.Endpoint{ + Address: &common.Address{ + Ip: ip, + Port: port, + }, + } +} + +func makeSvcEndpointDiscoveryResponse(svcName string, added, removed []*service.Endpoint) *api.SvcEndpointDiscoveryResponse { + return &api.SvcEndpointDiscoveryResponse{ + SvcName: svcName, + Added: added, + Removed: removed, + } +} + func TestDynamicSourceStreamSvcEndpoints(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() b := new(bootstrap.Bootstrap) - quit := make(chan struct{}) + addedEndpoints := []*service.Endpoint{ + makeEndpoint("127.0.0.1", 8888), + makeEndpoint("127.0.0.1", 8889), + } + removedEndpoints := []*service.Endpoint{ + makeEndpoint("127.0.0.1", 9000), + makeEndpoint("127.0.0.1", 9001), + } - // mock discovery service client and stream + // mock stream stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + streamQuitCh := make(chan struct{}) + abortStream := func() { + close(streamQuitCh) + } + // mock send method req := &api.SvcEndpointDiscoveryRequest{ SvcNamesSubscribe: []string{"foo"}, SvcNamesUnsubscribe: []string{"bar", "zoo"}, } stream.EXPECT().Send(&rpcMsg{msg: req}).Return(nil) - addedEndpoints := []*service.Endpoint{ - {Address: &common.Address{Ip: "127.0.0.1", Port: 8888}}, - {Address: &common.Address{Ip: "127.0.0.1", Port: 8889}}, - } - removedEndpoints := []*service.Endpoint{ - {Address: &common.Address{Ip: "127.0.0.1", Port: 9000}}, - {Address: &common.Address{Ip: "127.0.0.1", Port: 9001}}, - } - stream.EXPECT().Recv().Return(&api.SvcEndpointDiscoveryResponse{ - SvcName: "foo", - Added: addedEndpoints, - Removed: removedEndpoints, - }, nil) + // mock recv method + recvTimes := 0 stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { - <-quit + recvTimes++ + if recvTimes < 2 { + return makeSvcEndpointDiscoveryResponse("foo", addedEndpoints, removedEndpoints), nil + } + <-streamQuitCh return nil, io.EOF - }) + }).Times(2) + + // mock client c := NewMockDiscoveryServiceClient(ctrl) c.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) - // mock discovery service client facotory + // mock client factory factory := newDiscoveryServiceClientFacotry(c, nil) rollback := mockNewDiscoveryServiceClient(factory) defer rollback() @@ -238,6 +283,7 @@ func TestDynamicSourceStreamSvcEndpoints(t *testing.T) { d, err := newDynamicSource(b) assert.NoError(t, err) + // register hook hookCalled := false d.SetSvcEndpointHook(func(svcName string, added, removed []*service.Endpoint) { hookCalled = true @@ -250,10 +296,7 @@ func TestDynamicSourceStreamSvcEndpoints(t *testing.T) { d.unsubscribeSvc(&service.Service{Name: "zoo"}) // close the discovery service client - time.AfterFunc(time.Millisecond*100, func() { - close(quit) - close(d.quit) - }) + time.AfterFunc(time.Millisecond*100, abortStream) d.streamSvcEndpoints(context.Background()) assert.True(t, hookCalled) } From 6de6fd8bfbf5e4f009202ba5cb2fc15f4a4078b6 Mon Sep 17 00:00:00 2001 From: kirk Date: Fri, 6 Dec 2019 10:38:03 +0800 Subject: [PATCH 2/4] examples: add tcp example (#15) * add tcp proxy example --- docs/src/start.md | 26 ++++++++++++++++++++++++++ examples/tcp/Dockerfile-httpserver | 6 ++++++ examples/tcp/README.md | 2 ++ examples/tcp/docker-compose.yaml | 20 ++++++++++++++++++++ examples/tcp/samaritan.yaml | 20 ++++++++++++++++++++ 5 files changed, 74 insertions(+) create mode 100644 examples/tcp/Dockerfile-httpserver create mode 100644 examples/tcp/docker-compose.yaml create mode 100644 examples/tcp/samaritan.yaml diff --git a/docs/src/start.md b/docs/src/start.md index c3e0caa..e802479 100644 --- a/docs/src/start.md +++ b/docs/src/start.md @@ -232,6 +232,30 @@ The communication way between them is grpc, and [here][discovery service message ## Examples +We have created some sandboxes using [Docker] and [Docker Compose] to show how to use Samaritan. To run them, please make sure you have docker and docker-compose installed. + +### TCP + +In this example, we show how to use Samaritan as a L4 proxy. + +**Step 1: Start all containers including http-server and proxy** + +```sh +$ git clone --depth 1 https://github.com/samaritan-proxy/samaritan.git /tmp/samaritan +$ cd /tmp/samaritan/examples/tcp +$ docker-compose up +``` + +**Step 2: View http://127.0.0.1 using curl or browser** + +```sh +$ curl http://127.0.0.1 +``` + +You could see the response: `hello, world!` + +### Redis + TBD @@ -240,6 +264,8 @@ TBD [example.com]: http://example.com [Prometheus]: https://prometheus.io [gRPC]: https://grpc.io +[Docker]: https://docs.docker.com/ +[Docker Compose]: https://docs.docker.com/compose/ [admin api]: diff --git a/examples/tcp/Dockerfile-httpserver b/examples/tcp/Dockerfile-httpserver new file mode 100644 index 0000000..b9a212d --- /dev/null +++ b/examples/tcp/Dockerfile-httpserver @@ -0,0 +1,6 @@ +FROM python:3-alpine + +WORKDIR ~/html +RUN echo "hello, world!" > index.html + +CMD ["python", "-m", "http.server"] diff --git a/examples/tcp/README.md b/examples/tcp/README.md index ca1cf52..1424d0c 100644 --- a/examples/tcp/README.md +++ b/examples/tcp/README.md @@ -1 +1,3 @@ # TCP example + +To learn how to run it, please read the [docs](https://samaritan-proxy.github.io/docs/start/#tcp) first. diff --git a/examples/tcp/docker-compose.yaml b/examples/tcp/docker-compose.yaml new file mode 100644 index 0000000..56b7ffa --- /dev/null +++ b/examples/tcp/docker-compose.yaml @@ -0,0 +1,20 @@ +version: "3.7" +services: + proxy: + image: samaritanproxy/samaritan:latest + volumes: + - ./samaritan.yaml:/etc/samaritan.yaml + ports: + - "80:80" + - "12345:12345" + networks: + - samaritan + + http-server: + build: + context: . + dockerfile: Dockerfile-httpserver + network_mode: "service:proxy" + +networks: + samaritan: {} diff --git a/examples/tcp/samaritan.yaml b/examples/tcp/samaritan.yaml new file mode 100644 index 0000000..85402eb --- /dev/null +++ b/examples/tcp/samaritan.yaml @@ -0,0 +1,20 @@ +admin: + bind: + ip: 0.0.0.0 + port: 12345 + +log: + level: INFO + +static_services: + - name: tcp-demo + config: + listener: + address: + ip: 0.0.0.0 + port: 80 + protocol: TCP + endpoints: + - address: + ip: 127.0.0.1 + port: 8000 From cb093afd48b54554c0072879ef2d75d60b539041 Mon Sep 17 00:00:00 2001 From: kirk Date: Fri, 6 Dec 2019 14:05:22 +0800 Subject: [PATCH 3/4] examples: add redis cluster example (#16) * add redis cluster exmaple --- docs/src/start.md | 31 +++++++++++++++- examples/redis/README.md | 2 + examples/redis/docker-compose.yaml | 59 ++++++++++++++++++++++++++++++ examples/redis/redis.conf | 5 +++ examples/redis/samaritan.yaml | 26 +++++++++++++ 5 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 examples/redis/docker-compose.yaml create mode 100644 examples/redis/redis.conf create mode 100644 examples/redis/samaritan.yaml diff --git a/docs/src/start.md b/docs/src/start.md index e802479..9023515 100644 --- a/docs/src/start.md +++ b/docs/src/start.md @@ -256,7 +256,33 @@ You could see the response: `hello, world!` ### Redis -TBD +In this example, we show how to use Samaritan as a redis cluster proxy. + +**Step1: Start all containers including redis-cluster and proxy** + +```sh +$ git clone --depth 1 https://github.com/samaritan-proxy/samaritan.git /tmp/samaritan +$ cd /tmp/samaritan/examples/redis +$ docker-compose up +``` + +**Step2: Issue commands using [redis-cli]** + +```sh +$ redis-cli set a 1 +OK +$ redis-cli get a +"1" +$ redis-cli mset a 1 b 2 +OK +$ redis-cli mget a b +1) "1" +2) "2" +$ redis-cli --scan +b +a +``` +All supported commands could found in [here][redis supported commands]. [release page]: https://github.com/samaritan-proxy/samaritan/releases @@ -266,6 +292,7 @@ TBD [gRPC]: https://grpc.io [Docker]: https://docs.docker.com/ [Docker Compose]: https://docs.docker.com/compose/ +[redis-cli]: https://redis.io/topics/rediscli [admin api]: @@ -276,3 +303,5 @@ TBD [static service message]: proto-ref.md#staticservice [dynamic source message]: proto-ref.md#bootstrap.ConfigSource [discovery service message]: proto-ref.md#discoveryservice + +[redis supported commands]: https://samaritan-proxy.github.io/docs/arch/protocol/redis/redis/#commands diff --git a/examples/redis/README.md b/examples/redis/README.md index f9e462d..9ae4bdd 100644 --- a/examples/redis/README.md +++ b/examples/redis/README.md @@ -1 +1,3 @@ # Redis example + +If want to run it, please read the [docs](https://samaritan-proxy.github.io/docs/start/#redis) first. diff --git a/examples/redis/docker-compose.yaml b/examples/redis/docker-compose.yaml new file mode 100644 index 0000000..c3fc0a3 --- /dev/null +++ b/examples/redis/docker-compose.yaml @@ -0,0 +1,59 @@ +version: "3.7" +services: + redis1: + image: redis:5-alpine + volumes: + - ./redis.conf:/etc/redis.conf + command: redis-server /etc/redis.conf + networks: + samaritan: + ipv4_address: 176.17.0.2 + redis2: + image: redis:5-alpine + volumes: + - ./redis.conf:/etc/redis.conf + command: redis-server /etc/redis.conf + networks: + samaritan: + ipv4_address: 176.17.0.3 + redis3: + image: redis:5-alpine + volumes: + - ./redis.conf:/etc/redis.conf + command: redis-server /etc/redis.conf + networks: + samaritan: + ipv4_address: 176.17.0.4 + redis-cluster-setup: + image: redis:5-alpine + command: sh -c " + sleep 5 + && yes yes | redis-cli + --cluster create 176.17.0.2:7000 176.17.0.3:7000 176.17.0.4:7000 + --cluster-replicas 0 + && redis-cli --cluster check 176.17.0.2:7000" + networks: + - samaritan + depends_on: + - redis1 + - redis2 + - redis3 + + proxy: + image: samaritanproxy/samaritan:latest + volumes: + - ./samaritan.yaml:/etc/samaritan.yaml + networks: + samaritan: + ipv4_address: 176.17.0.8 + ports: + - "12345:12345" + - "6379:6379" + +networks: + samaritan: + driver: bridge + ipam: + driver: default + config: + - subnet: 176.17.0.0/16 diff --git a/examples/redis/redis.conf b/examples/redis/redis.conf new file mode 100644 index 0000000..31937a5 --- /dev/null +++ b/examples/redis/redis.conf @@ -0,0 +1,5 @@ +port 7000 +cluster-enabled yes +cluster-config-file nodes.conf +cluster-node-timeout 5000 +appendonly yes diff --git a/examples/redis/samaritan.yaml b/examples/redis/samaritan.yaml new file mode 100644 index 0000000..99e90ef --- /dev/null +++ b/examples/redis/samaritan.yaml @@ -0,0 +1,26 @@ +admin: + bind: + ip: 0.0.0.0 + port: 12345 + +log: + level: INFO + +static_services: + - name: redis-demo + config: + listener: + address: + ip: 0.0.0.0 + port: 6379 + protocol: Redis + endpoints: + - address: + ip: 176.17.0.2 + port: 7000 + - address: + ip: 176.17.0.3 + port: 7000 + - address: + ip: 176.17.0.4 + port: 7000 From 1ea5b1801a2fcdc7fb9354fefcb58768188e90ba Mon Sep 17 00:00:00 2001 From: kirk Date: Mon, 9 Dec 2019 11:34:04 +0800 Subject: [PATCH 4/4] Resubscribe services when the gRPC stream terminated unexpectedly (#14) * add discovery client * refactor discovery * refactor unit tests --- config/config.go | 6 +- config/config_test.go | 18 +- config/discovery.go | 434 +++++++++++++++++++++++++++ config/discovery_test.go | 574 ++++++++++++++++++++++++++++++++++++ config/dynamic.go | 349 +++------------------- config/dynamic_test.go | 327 +++----------------- config/mock_dynamic_test.go | 24 +- go.sum | 2 - pb/api/discovery.pb.json.go | 44 +-- 9 files changed, 1112 insertions(+), 666 deletions(-) create mode 100644 config/discovery.go create mode 100644 config/discovery_test.go diff --git a/config/config.go b/config/config.go index 8d58803..755c091 100644 --- a/config/config.go +++ b/config/config.go @@ -18,9 +18,9 @@ import ( "encoding/json" "sync" + "github.com/samaritan-proxy/samaritan/logger" "github.com/samaritan-proxy/samaritan/pb/config/bootstrap" "github.com/samaritan-proxy/samaritan/pb/config/service" - "github.com/samaritan-proxy/samaritan/logger" ) type serviceWrapper struct { @@ -115,7 +115,7 @@ func (c *Config) initDynamic() error { return err } - d.SetSvcHook(c.handleSvcUpdate) + d.SetDependencyHook(c.handleDependencyUpdate) d.SetSvcConfigHook(c.handleSvcConfigUpdate) d.SetSvcEndpointHook(c.handleSvcEndpointUpdate) c.d = d @@ -144,7 +144,7 @@ func (c *Config) MarshalJSON() ([]byte, error) { return json.Marshal(res) } -func (c *Config) handleSvcUpdate(added, removed []*service.Service) { +func (c *Config) handleDependencyUpdate(added, removed []*service.Service) { c.Lock() defer c.Unlock() diff --git a/config/config_test.go b/config/config_test.go index 6665e99..3306577 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -24,8 +24,8 @@ import ( gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - "github.com/samaritan-proxy/samaritan/pb/config/bootstrap" "github.com/samaritan-proxy/samaritan/pb/common" + "github.com/samaritan-proxy/samaritan/pb/config/bootstrap" "github.com/samaritan-proxy/samaritan/pb/config/hc" "github.com/samaritan-proxy/samaritan/pb/config/protocol" "github.com/samaritan-proxy/samaritan/pb/config/service" @@ -166,7 +166,7 @@ func TestInitDynamic(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() d := NewMockDynamicSource(ctrl) - d.EXPECT().SetSvcHook(gomock.Any()) + d.EXPECT().SetDependencyHook(gomock.Any()) d.EXPECT().SetSvcConfigHook(gomock.Any()) d.EXPECT().SetSvcEndpointHook(gomock.Any()) d.EXPECT().Serve() @@ -180,7 +180,7 @@ func TestInitDynamic(t *testing.T) { time.Sleep(time.Millisecond * 100) // wait dynamic source serving. } -func TestHandleSvcUpdate(t *testing.T) { +func TestHandleDependencyUpdate(t *testing.T) { b := &bootstrap.Bootstrap{ Admin: &bootstrap.Admin{ Bind: &common.Address{ @@ -198,14 +198,14 @@ func TestHandleSvcUpdate(t *testing.T) { {Name: "foo"}, {Name: "bar"}, } - c.handleSvcUpdate(added, nil) + c.handleDependencyUpdate(added, nil) assert.Equal(t, 2, len(c.sws)) removed := []*service.Service{ {Name: "bar"}, {Name: "zoo"}, } - c.handleSvcUpdate(nil, removed) + c.handleDependencyUpdate(nil, removed) assert.Equal(t, 1, len(c.sws)) e := <-evtCh @@ -239,7 +239,7 @@ func TestHandleSvcConfigUpdate(t *testing.T) { addedSvcs := []*service.Service{ {Name: "foo"}, } - c.handleSvcUpdate(addedSvcs, nil) + c.handleDependencyUpdate(addedSvcs, nil) newCfg := new(service.Config) c.handleSvcConfigUpdate("foo", newCfg) @@ -252,7 +252,7 @@ func TestHandleSvcConfigUpdate(t *testing.T) { assert.NoError(t, err) evtCh := c.Subscribe() - c.handleSvcUpdate( + c.handleDependencyUpdate( []*service.Service{{Name: "foo"}}, nil, ) @@ -308,7 +308,7 @@ func TestHandleSvEndpointUpdate(t *testing.T) { addedSvcs := []*service.Service{ {Name: "foo"}, } - c.handleSvcUpdate(addedSvcs, nil) + c.handleDependencyUpdate(addedSvcs, nil) added := []*service.Endpoint{ {Address: &common.Address{Ip: "127.0.0.1", Port: 8888}}, @@ -323,7 +323,7 @@ func TestHandleSvEndpointUpdate(t *testing.T) { assert.NoError(t, err) evtCh := c.Subscribe() - c.handleSvcUpdate( + c.handleDependencyUpdate( []*service.Service{{Name: "foo"}}, nil, ) diff --git a/config/discovery.go b/config/discovery.go new file mode 100644 index 0000000..7c3c599 --- /dev/null +++ b/config/discovery.go @@ -0,0 +1,434 @@ +// Copyright 2019 Samaritan Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "context" + "math/rand" + "sync" + "time" + + "github.com/samaritan-proxy/samaritan/logger" + "github.com/samaritan-proxy/samaritan/pb/api" + "github.com/samaritan-proxy/samaritan/pb/common" + "github.com/samaritan-proxy/samaritan/pb/config/service" +) + +//go:generate mockgen -package $GOPACKAGE --destination ./mock_discovery_test.go $REPO_URI/pb/api DiscoveryServiceClient,DiscoveryService_StreamDependenciesClient,DiscoveryService_StreamSvcConfigsClient,DiscoveryService_StreamSvcEndpointsClient + +type discoveryClient struct { + dependency *dependencyDiscoveryClient + svcConfig *svcConfigDiscoveryClient + svcEndpoint *svcEndpointDiscoveryClient +} + +func newDiscoveryClient(stub api.DiscoveryServiceClient) *discoveryClient { + return &discoveryClient{ + dependency: newDependencyDiscoveryClient(stub), + svcConfig: newSvcConfigDiscoveryClient(stub), + svcEndpoint: newSvcEndpointDiscoveryClient(stub), + } +} + +func (c *discoveryClient) StreamDependencies(ctx context.Context, inst *common.Instance, hook dependencyHook) { + wrappedHook := func(added, removed []*service.Service) { + if hook != nil { + hook(added, removed) + } + // subscribe the added services + for _, svc := range added { + c.svcConfig.Subscribe(svc.Name) + c.svcEndpoint.Subscribe(svc.Name) + } + // unsubscribe the removed services + for _, svc := range removed { + c.svcConfig.Unsubscribe(svc.Name) + c.svcEndpoint.Unsubscribe(svc.Name) + } + } + impl := c.dependency + impl.SetHook(wrappedHook) + impl.Run(ctx, inst) +} + +func (c *discoveryClient) StreamSvcConfigs(ctx context.Context, hook svcConfigHook) { + impl := c.svcConfig + impl.SetHook(hook) + impl.Run(ctx) +} + +func (c *discoveryClient) StreamSvcEndpoints(ctx context.Context, hook svcEndpointHook) { + impl := c.svcEndpoint + impl.SetHook(hook) + impl.Run(ctx) +} + +type dependencyDiscoveryClient struct { + api.DiscoveryServiceClient + + hook func(added, removed []*service.Service) +} + +func newDependencyDiscoveryClient(client api.DiscoveryServiceClient) *dependencyDiscoveryClient { + return &dependencyDiscoveryClient{ + DiscoveryServiceClient: client, + } +} + +// SetHook must be called before Run. +func (c *dependencyDiscoveryClient) SetHook(hook dependencyHook) { + c.hook = hook +} + +func (c *dependencyDiscoveryClient) Run(ctx context.Context, inst *common.Instance) { + jitter := 0.2 + baseInterval := time.Second + for { + c.run(ctx, inst) + select { + case <-ctx.Done(): + return + default: + } + + interval := time.Duration((1 + (2*rand.Float64()-1)*jitter) * float64(baseInterval)) + logger.Warnf("The discovery loop of dependencies terminated unexpectedly, retry after %s", interval) + t := time.NewTimer(interval) + select { + case <-t.C: + case <-ctx.Done(): + return + } + } +} + +func (c *dependencyDiscoveryClient) run(ctx context.Context, inst *common.Instance) { + req := &api.DependencyDiscoveryRequest{Instance: inst} + stream, err := c.StreamDependencies(ctx, req) + if err != nil { + logger.Warnf("Fail to create dependencies discovery stream: %v", err) + return + } + + for { + resp, err := stream.Recv() + if err != nil { + logger.Warnf("Recv from dependencies discovery stream failed: %v", err) + return + } + + if c.hook != nil { + c.hook(resp.Added, resp.Removed) + } + } +} + +type svcDiscoveryStream interface { + Send(subscribed, unsubscribed []string) error + Recv() error +} + +type svcDiscoveryStreamMaker func(ctx context.Context) (svcDiscoveryStream, error) + +type svcConfigDiscoveryStream struct { + c *svcConfigDiscoveryClient + raw api.DiscoveryService_StreamSvcConfigsClient +} + +func (stream *svcConfigDiscoveryStream) Send(subscribed, unsubscribed []string) error { + req := &api.SvcConfigDiscoveryRequest{ + SvcNamesSubscribe: subscribed, + SvcNamesUnsubscribe: unsubscribed, + } + return stream.raw.Send(req) +} + +func (stream *svcConfigDiscoveryStream) Recv() error { + resp, err := stream.raw.Recv() + if err != nil { + return err + } + + if stream.c.hook == nil { + return nil + } + for svcName, svcConfig := range resp.Updated { + stream.c.hook(svcName, svcConfig) + } + return nil +} + +type svcConfigDiscoveryClient struct { + api.DiscoveryServiceClient + *svcDiscoveryClient + hook svcConfigHook +} + +func newSvcConfigDiscoveryClient(raw api.DiscoveryServiceClient) *svcConfigDiscoveryClient { + c := &svcConfigDiscoveryClient{ + DiscoveryServiceClient: raw, + } + impl := newSvcDiscoveryClient("config", c.newStream) + c.svcDiscoveryClient = impl + return c +} + +func (c *svcConfigDiscoveryClient) newStream(ctx context.Context) (svcDiscoveryStream, error) { + raw, err := c.StreamSvcConfigs(ctx) + if err != nil { + return nil, err + } + stream := &svcConfigDiscoveryStream{ + c: c, + raw: raw, + } + return stream, nil +} + +// SetHook must be called before Run. +func (c *svcConfigDiscoveryClient) SetHook(hook svcConfigHook) { + c.hook = hook +} + +type svcEndpointDiscoveryStream struct { + c *svcEndpointDiscoveryClient + raw api.DiscoveryService_StreamSvcEndpointsClient +} + +func (stream *svcEndpointDiscoveryStream) Send(subscribed, unsubscribed []string) error { + req := &api.SvcEndpointDiscoveryRequest{ + SvcNamesSubscribe: subscribed, + SvcNamesUnsubscribe: unsubscribed, + } + return stream.raw.Send(req) +} + +func (stream *svcEndpointDiscoveryStream) Recv() error { + resp, err := stream.raw.Recv() + if err != nil { + return err + } + if stream.c.hook != nil { + stream.c.hook(resp.SvcName, resp.Added, resp.Removed) + } + return nil +} + +type svcEndpointDiscoveryClient struct { + api.DiscoveryServiceClient + *svcDiscoveryClient + + hook svcEndpointHook +} + +func newSvcEndpointDiscoveryClient(raw api.DiscoveryServiceClient) *svcEndpointDiscoveryClient { + c := &svcEndpointDiscoveryClient{ + DiscoveryServiceClient: raw, + } + impl := newSvcDiscoveryClient("endpoint", c.newStream) + c.svcDiscoveryClient = impl + return c +} + +func (c *svcEndpointDiscoveryClient) newStream(ctx context.Context) (svcDiscoveryStream, error) { + raw, err := c.StreamSvcEndpoints(ctx) + if err != nil { + return nil, err + } + stream := &svcEndpointDiscoveryStream{ + c: c, + raw: raw, + } + return stream, nil +} + +// SetHook must be called before Run. +func (c *svcEndpointDiscoveryClient) SetHook(hook svcEndpointHook) { + c.hook = hook +} + +type svcDiscoveryClient struct { + sync.RWMutex + scope string + + subscribed map[string]struct{} + subCh chan string + unsubCh chan string + + newStream svcDiscoveryStreamMaker +} + +func newSvcDiscoveryClient(scope string, streamMaker svcDiscoveryStreamMaker) *svcDiscoveryClient { + return &svcDiscoveryClient{ + scope: scope, + subscribed: make(map[string]struct{}, 16), + subCh: make(chan string, 16), + unsubCh: make(chan string, 16), + newStream: streamMaker, + } +} + +func (c *svcDiscoveryClient) Subscribe(svcName string) { + c.Lock() + defer c.Unlock() + _, ok := c.subscribed[svcName] + if ok { + return + } + c.subscribed[svcName] = struct{}{} + c.subCh <- svcName +} + +func (c *svcDiscoveryClient) Unsubscribe(svcName string) { + c.Lock() + defer c.Unlock() + _, ok := c.subscribed[svcName] + if !ok { + return + } + delete(c.subscribed, svcName) + c.unsubCh <- svcName +} + +func (c *svcDiscoveryClient) Run(ctx context.Context) { + jitter := 0.2 + baseInterval := time.Second + for { + c.run(ctx) + select { + case <-ctx.Done(): + return + default: + } + + interval := time.Duration((1 + (2*rand.Float64()-1)*jitter) * float64(baseInterval)) + logger.Warnf("The discovery loop of service %s terminated unexpectedly, retry after %s", c.scope, interval) + t := time.NewTimer(interval) + select { + case <-t.C: + case <-ctx.Done(): + return + } + } +} + +func (c *svcDiscoveryClient) run(ctx context.Context) { + stream, err := c.newStream(ctx) + if err != nil { + logger.Warnf("Fail to create service %s discovery stream: %v", c.scope, err) + return + } + + // resubscribe the services. + if err := c.resubscribe(stream); err != nil { + logger.Warnf("Resubscribe services on %s discovery stream failed: %v", c.scope, err) + return + } + + recvDone := make(chan struct{}) + defer func() { + <-recvDone + }() + + go func() { + defer close(recvDone) + c.loopRecv(stream) + }() + c.loopSend(stream, recvDone) +} + +func (c *svcDiscoveryClient) resubscribe(stream svcDiscoveryStream) error { + c.RLock() + // load all subscribed services. + svcNames := make([]string, 0, len(c.subscribed)) + for svcName := range c.subscribed { + svcNames = append(svcNames, svcName) + } + // clean sub/unsub channel + c.cleanSubChLocked() + c.cleanUnsubChLocked() + c.RUnlock() + + // skip if no subscribed services. + if len(svcNames) == 0 { + return nil + } + + return stream.Send(svcNames, nil) +} + +func (c *svcDiscoveryClient) cleanSubChLocked() { + for { + select { + case <-c.subCh: + default: + return + } + } +} + +func (c *svcDiscoveryClient) cleanUnsubChLocked() { + for { + select { + case <-c.unsubCh: + default: + return + } + } +} + +func (c *svcDiscoveryClient) loopRecv(stream svcDiscoveryStream) { + for { + if err := stream.Recv(); err != nil { + logger.Warnf("Recv from service %s stream failed: %v", c.scope, err) + return + } + } +} + +func (c *svcDiscoveryClient) loopSend(stream svcDiscoveryStream, stop <-chan struct{}) { + for { + var subscribed, unsubscribed []string + select { + case svcName := <-c.subCh: + subscribed = append(subscribed, svcName) + case svcName := <-c.unsubCh: + unsubscribed = append(unsubscribed, svcName) + case <-stop: + return + } + + // batch + for { + select { + case svcName := <-c.subCh: + subscribed = append(subscribed, svcName) + case svcName := <-c.unsubCh: + unsubscribed = append(unsubscribed, svcName) + case <-stop: + return + default: + goto SEND + } + } + + SEND: + err := stream.Send(subscribed, unsubscribed) + if err != nil { + logger.Warnf("Send to service %s discovery stream failed: %v", c.scope, err) + return + } + } +} diff --git a/config/discovery_test.go b/config/discovery_test.go new file mode 100644 index 0000000..d3aee7c --- /dev/null +++ b/config/discovery_test.go @@ -0,0 +1,574 @@ +// Copyright 2019 Samaritan Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + gomock "github.com/golang/mock/gomock" + "github.com/samaritan-proxy/samaritan/pb/api" + "github.com/samaritan-proxy/samaritan/pb/common" + "github.com/samaritan-proxy/samaritan/pb/config/service" + "github.com/stretchr/testify/assert" +) + +type rpcMsg struct { + msg proto.Message +} + +func (r *rpcMsg) Matches(msg interface{}) bool { + m, ok := msg.(proto.Message) + if !ok { + return false + } + return proto.Equal(m, r.msg) +} + +func (r *rpcMsg) String() string { + return r.msg.String() +} + +func mockInstance() *common.Instance { + return &common.Instance{ + Id: "test", + Version: "1.0.0", + Belong: "sash", + } +} + +func TestDependencyDiscoveryClientCreateStreamFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamDependencies(gomock.Any(), gomock.Any()). + Return(nil, errors.New("internal error")) + c := newDependencyDiscoveryClient(stub) + + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx, mockInstance()) +} + +func TestDependencyDiscoveryClientRecvFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamDependencies(gomock.Any(), gomock.Any()). + Return(stream, nil) + + c := newDependencyDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx, mockInstance()) +} + +func makeDependencyDiscoveryResponse(addedSvcNames, removedSvcNames []string) *api.DependencyDiscoveryResponse { + var added, removed []*service.Service + for _, svcName := range addedSvcNames { + added = append(added, &service.Service{Name: svcName}) + } + for _, svcName := range removedSvcNames { + removed = append(removed, &service.Service{Name: svcName}) + } + return &api.DependencyDiscoveryResponse{ + Added: added, + Removed: removed, + } +} + +func TestDependencyDiscoveryClientHook(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) + recvCallTimes := 0 + stream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { + recvCallTimes++ + // first call + if recvCallTimes == 1 { + return makeDependencyDiscoveryResponse([]string{"foo"}, []string{"bar"}), nil + } + // second call + <-ctx.Done() + return nil, io.EOF + }).Times(2) + + stub := NewMockDiscoveryServiceClient(ctrl) + req := &api.DependencyDiscoveryRequest{Instance: mockInstance()} + stub.EXPECT().StreamDependencies(gomock.Any(), &rpcMsg{req}). + Return(stream, nil) + + added := make(map[string]struct{}) + removed := make(map[string]struct{}) + hook := func(addedSvcs, removedSvcs []*service.Service) { + for _, svc := range addedSvcs { + added[svc.Name] = struct{}{} + } + for _, svc := range removedSvcs { + removed[svc.Name] = struct{}{} + } + } + + c := newDependencyDiscoveryClient(stub) + c.SetHook(hook) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx, mockInstance()) + + // assert hook execution + assert.Contains(t, added, "foo") + assert.Contains(t, removed, "bar") +} + +func TestDependencyDiscoveryClientAutoRetry(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) + recvCallTimes := 0 + stream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { + recvCallTimes++ + // first call + if recvCallTimes == 1 { + return nil, io.EOF + } + <-ctx.Done() + return nil, io.EOF + }).Times(2) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamDependencies(gomock.Any(), gomock.Any()). + Return(stream, nil).Times(2) + + c := newDependencyDiscoveryClient(stub) + time.AfterFunc(time.Second*2, cancel) + c.Run(ctx, mockInstance()) +} + +func TestSvcConfigDiscoveryClientCreateStreamFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any(), gomock.Any()). + Return(nil, errors.New("internal error")) + c := newSvcConfigDiscoveryClient(stub) + + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcConfigDiscoveryClientRecvFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) + + c := newSvcConfigDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcConfigDiscoveryClientSendFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(*api.SvcConfigDiscoveryRequest) error { + cancel() + return io.ErrShortWrite + }) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) + + c := newSvcConfigDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, func() { + c.Subscribe("foo") + }) + c.Run(ctx) +} + +func makeSvcConfigDiscoveryRequest(subscribed, unsubscribed []string) *api.SvcConfigDiscoveryRequest { + return &api.SvcConfigDiscoveryRequest{ + SvcNamesSubscribe: subscribed, + SvcNamesUnsubscribe: unsubscribed, + } +} + +func makeSvcConfigDiscoveryResponse(configs map[string]*service.Config) *api.SvcConfigDiscoveryResponse { + return &api.SvcConfigDiscoveryResponse{ + Updated: configs, + } +} + +func TestSvcConfigDiscoveryClientAutoResubscribe(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + stream.EXPECT().Send(&rpcMsg{makeSvcConfigDiscoveryRequest([]string{"foo"}, nil)}).Return(io.ErrShortWrite) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) + + c := newSvcConfigDiscoveryClient(stub) + c.Subscribe("foo") + c.Unsubscribe("bar") + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcConfigDiscoveryClientHook(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + recvCallTimes := 0 + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { + recvCallTimes++ + if recvCallTimes == 1 { + return makeSvcConfigDiscoveryResponse(map[string]*service.Config{"foo": nil}), nil + } + <-ctx.Done() + return nil, io.EOF + }).Times(2) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) + + updated := make(map[string]*service.Config) + hook := func(svcName string, svcConfig *service.Config) { + updated[svcName] = svcConfig + } + + c := newSvcConfigDiscoveryClient(stub) + c.SetHook(hook) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) + + // assert hook execution + assert.Contains(t, updated, "foo") +} + +func TestSvcConfigDiscoveryClientSubAndUnsub(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + stream.EXPECT().Send(&rpcMsg{makeSvcConfigDiscoveryRequest([]string{"foo"}, nil)}).Return(nil) + stream.EXPECT().Send(&rpcMsg{makeSvcConfigDiscoveryRequest(nil, []string{"foo"})}).Return(nil) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) + + c := newSvcConfigDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, func() { + c.Subscribe("foo") + time.Sleep(time.Millisecond * 100) + c.Unsubscribe("foo") + time.Sleep(time.Millisecond * 100) + cancel() + }) + c.Run(ctx) +} +func TestSvcEndpointDiscoveryClientCreateStreamFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any(), gomock.Any()). + Return(nil, io.ErrUnexpectedEOF) + c := newSvcEndpointDiscoveryClient(stub) + + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcEndpointDiscoveryClientRecvFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) + + c := newSvcEndpointDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcEndpointDiscoveryClientSendFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(*api.SvcEndpointDiscoveryRequest) error { + cancel() + return io.ErrShortWrite + }) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) + + c := newSvcEndpointDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, func() { + c.Subscribe("foo") + }) + c.Run(ctx) +} + +func makeSvcEndpointDiscoveryRequest(subscribed, unsubscribed []string) *api.SvcEndpointDiscoveryRequest { + return &api.SvcEndpointDiscoveryRequest{ + SvcNamesSubscribe: subscribed, + SvcNamesUnsubscribe: unsubscribed, + } +} + +func makeEndpoint(ip string, port uint32) *service.Endpoint { + return &service.Endpoint{ + Address: &common.Address{ + Ip: ip, + Port: port, + }, + } +} + +func makeSvcEndpointDiscoveryResponse(svcName string, added, removed []*service.Endpoint) *api.SvcEndpointDiscoveryResponse { + return &api.SvcEndpointDiscoveryResponse{ + SvcName: svcName, + Added: added, + Removed: removed, + } +} + +func TestSvcEndpointDiscoveryClientAutoResubscribe(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + stream.EXPECT().Send(&rpcMsg{makeSvcEndpointDiscoveryRequest([]string{"foo"}, nil)}).Return(io.ErrShortWrite) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) + + c := newSvcEndpointDiscoveryClient(stub) + c.Subscribe("foo") + c.Unsubscribe("bar") + ctx, cancel := context.WithCancel(context.Background()) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) +} + +func TestSvcEndpointDiscoveryClientHook(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + recvCallTimes := 0 + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { + recvCallTimes++ + if recvCallTimes == 1 { + resp := makeSvcEndpointDiscoveryResponse( + "foo", + []*service.Endpoint{makeEndpoint("127.0.0.1", 8888)}, + []*service.Endpoint{makeEndpoint("127.0.0.1", 9999)}, + ) + return resp, nil + } + <-ctx.Done() + return nil, io.EOF + }).Times(2) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) + + var added, removed map[string]struct{} + hook := func(svcName string, addedEts, removedEts []*service.Endpoint) { + toMap := func(ets []*service.Endpoint) map[string]struct{} { + m := make(map[string]struct{}) + for _, et := range ets { + addr := fmt.Sprintf("%s:%d", et.Address.Ip, et.Address.Port) + m[addr] = struct{}{} + } + return m + } + added = toMap(addedEts) + removed = toMap(removedEts) + } + + c := newSvcEndpointDiscoveryClient(stub) + c.SetHook(hook) + time.AfterFunc(time.Millisecond*100, cancel) + c.Run(ctx) + + // assert hook execution + assert.Equal(t, 1, len(added)) + assert.Contains(t, added, "127.0.0.1:8888") + assert.Equal(t, 1, len(removed)) + assert.Contains(t, removed, "127.0.0.1:9999") +} + +func TestSvcEndpointDiscoveryClientSubAndUnsub(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + + stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + stream.EXPECT().Send(&rpcMsg{makeSvcEndpointDiscoveryRequest([]string{"foo"}, nil)}).Return(nil) + stream.EXPECT().Send(&rpcMsg{makeSvcEndpointDiscoveryRequest(nil, []string{"foo"})}).Return(nil) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) + + c := newSvcEndpointDiscoveryClient(stub) + time.AfterFunc(time.Millisecond*100, func() { + c.Subscribe("foo") + time.Sleep(time.Millisecond * 100) + c.Unsubscribe("foo") + time.Sleep(time.Millisecond * 100) + cancel() + }) + c.Run(ctx) +} + +func TestDiscoveryClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancel := context.WithCancel(context.Background()) + + dependencyStream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) + recvCallTimes := 0 + dependencyStream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { + recvCallTimes++ + switch recvCallTimes { + case 1: + time.Sleep(time.Millisecond * 50) + return makeDependencyDiscoveryResponse([]string{"foo"}, nil), nil + case 2: + time.Sleep(time.Millisecond * 50) + return makeDependencyDiscoveryResponse(nil, []string{"foo"}), nil + default: + <-ctx.Done() + return nil, io.EOF + } + }).Times(3) + + svcConfigStream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) + svcConfigStream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + svcConfigStream.EXPECT().Send(&rpcMsg{makeSvcConfigDiscoveryRequest([]string{"foo"}, nil)}).Return(nil) + svcConfigStream.EXPECT().Send(&rpcMsg{makeSvcConfigDiscoveryRequest(nil, []string{"foo"})}).Return(nil) + + svcEndpointStream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) + svcEndpointStream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { + <-ctx.Done() + return nil, io.EOF + }) + svcEndpointStream.EXPECT().Send(&rpcMsg{makeSvcEndpointDiscoveryRequest([]string{"foo"}, nil)}).Return(nil) + svcEndpointStream.EXPECT().Send(&rpcMsg{makeSvcEndpointDiscoveryRequest(nil, []string{"foo"})}).Return(nil) + + stub := NewMockDiscoveryServiceClient(ctrl) + stub.EXPECT().StreamDependencies(gomock.Any(), gomock.Any()).Return(dependencyStream, nil) + stub.EXPECT().StreamSvcConfigs(gomock.Any()).Return(svcConfigStream, nil) + stub.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(svcEndpointStream, nil) + + var dependHookCalled bool + dependHook := func(added, removed []*service.Service) { dependHookCalled = true } + + c := newDiscoveryClient(stub) + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + c.StreamDependencies(ctx, mockInstance(), dependHook) + }() + go func() { + defer wg.Done() + c.StreamSvcConfigs(ctx, nil) + }() + go func() { + defer wg.Done() + c.StreamSvcEndpoints(ctx, nil) + }() + + time.AfterFunc(time.Second, cancel) + wg.Wait() + + // assert hook execution + assert.True(t, dependHookCalled) +} diff --git a/config/dynamic.go b/config/dynamic.go index ba8966e..3d519f6 100644 --- a/config/dynamic.go +++ b/config/dynamic.go @@ -17,34 +17,31 @@ package config // FIXME: mockgen can't handle cycle imports in reflect mode when outside of GOPATH currently, // so add self_package parameter temporarily. Refer to: https://github.com/golang/mock/issues/310 //go:generate mockgen -package $GOPACKAGE -self_package $REPO_URI/$GOPACKAGE --destination ./mock_dynamic_test.go $REPO_URI/$GOPACKAGE DynamicSource -//go:generate mockgen -package $GOPACKAGE --destination ./mock_discovery_test.go $REPO_URI/pb/api DiscoveryServiceClient,DiscoveryService_StreamDependenciesClient,DiscoveryService_StreamSvcConfigsClient,DiscoveryService_StreamSvcEndpointsClient import ( "context" "sync" "time" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - - "github.com/samaritan-proxy/samaritan/logger" "github.com/samaritan-proxy/samaritan/pb/api" "github.com/samaritan-proxy/samaritan/pb/config/bootstrap" "github.com/samaritan-proxy/samaritan/pb/config/service" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) var _ DynamicSource = new(dynamicSource) -type svcHook func(added, removed []*service.Service) +type dependencyHook func(added, removed []*service.Service) type svcConfigHook func(svcName string, newCfg *service.Config) type svcEndpointHook func(svcName string, added, removed []*service.Endpoint) // DynamicSource represents the dynamic config source. type DynamicSource interface { - // SetSvcHook sets a hook which will be called - // when a service is added or removed. It must be + // SetDependencyHook sets a hook which will be called + // when a dependency is added or removed. It must be // called before Serve. - SetSvcHook(hook svcHook) + SetDependencyHook(hook dependencyHook) // SetSvcConfigHook sets a hook which wil be called // when the proxy config of subscribed service update. // It must be called before Serve. @@ -62,19 +59,12 @@ type DynamicSource interface { } type dynamicSource struct { - b *bootstrap.Bootstrap - c api.DiscoveryServiceClient - cShutdown func() error + b *bootstrap.Bootstrap - svcCfgSubCh chan *service.Service - svcEtSubCh chan *service.Service - svcSubHook func(*service.Service) // it's only used for testing. + conn *grpc.ClientConn + c *discoveryClient - svcCfgUnsubCh chan *service.Service - svcEtUnsubCh chan *service.Service - svcUnsubHook func(*service.Service) // it's only used for testing. - - svcHook svcHook + dependHook dependencyHook svcCfgHook svcConfigHook svcEtHook svcEndpointHook @@ -82,8 +72,18 @@ type dynamicSource struct { done chan struct{} } -var newDiscoveryServiceClient = func(cfg *bootstrap.ConfigSource) (c api.DiscoveryServiceClient, shutdown func() error, err error) { - target := cfg.Endpoint +func newDynamicSource(b *bootstrap.Bootstrap) (*dynamicSource, error) { + d := &dynamicSource{ + b: b, + quit: make(chan struct{}), + done: make(chan struct{}), + } + err := d.initDiscoveryClient() + return d, err +} + +func (d *dynamicSource) initDiscoveryClient() error { + target := d.b.DynamicSourceConfig.Endpoint // TODO: support Authentication options := []grpc.DialOption{ grpc.WithInsecure(), @@ -92,35 +92,19 @@ var newDiscoveryServiceClient = func(cfg *bootstrap.ConfigSource) (c api.Discove Timeout: time.Second * 10, }), } - cc, err := grpc.Dial(target, options...) + conn, err := grpc.Dial(target, options...) if err != nil { - return nil, nil, err + return err } - c = api.NewDiscoveryServiceClient(cc) - return c, cc.Close, nil -} -func newDynamicSource(b *bootstrap.Bootstrap) (*dynamicSource, error) { - c, shutdown, err := newDiscoveryServiceClient(b.DynamicSourceConfig) - if err != nil { - return nil, err - } - d := &dynamicSource{ - b: b, - c: c, - cShutdown: shutdown, - svcCfgSubCh: make(chan *service.Service, 16), - svcEtSubCh: make(chan *service.Service, 16), - svcCfgUnsubCh: make(chan *service.Service, 16), - svcEtUnsubCh: make(chan *service.Service, 16), - quit: make(chan struct{}), - done: make(chan struct{}), - } - return d, nil + d.conn = conn + stub := api.NewDiscoveryServiceClient(conn) + d.c = newDiscoveryClient(stub) + return nil } -func (d *dynamicSource) SetSvcHook(hook svcHook) { - d.svcHook = hook +func (d *dynamicSource) SetDependencyHook(hook dependencyHook) { + d.dependHook = hook } func (d *dynamicSource) SetSvcConfigHook(hook svcConfigHook) { @@ -134,27 +118,29 @@ func (d *dynamicSource) SetSvcEndpointHook(hook svcEndpointHook) { func (d *dynamicSource) Serve() { var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) wg.Add(1) go func() { defer wg.Done() - d.StreamSvcs() + d.c.StreamDependencies(ctx, d.b.Instance, d.dependHook) }() wg.Add(1) go func() { defer wg.Done() - d.StreamSvcConfigs() + d.c.StreamSvcConfigs(ctx, d.svcCfgHook) }() wg.Add(1) go func() { defer wg.Done() - d.StreamSvcEndpoints() + d.c.StreamSvcEndpoints(ctx, d.svcEtHook) }() <-d.quit - // shutdown the discovery service client - d.cShutdown() + cancel() + // close grpc client conn + d.conn.Close() wg.Wait() close(d.done) } @@ -163,264 +149,3 @@ func (d *dynamicSource) Stop() { close(d.quit) <-d.done } - -func (d *dynamicSource) StreamSvcs() { - defer logger.Debugf("StreamSvcs done") - for { - d.streamSvcs(context.Background()) - select { - case <-d.quit: - return - default: - } - logger.Warnf("StreamSvcs failed, retrying...") - - // TODO: exponential backoff - t := time.NewTimer(time.Millisecond * 100) - select { - case <-t.C: - case <-d.quit: - return - } - } -} - -func (d *dynamicSource) streamSvcs(ctx context.Context) { - req := &api.DependencyDiscoveryRequest{ - Instance: d.b.Instance, - } - stream, err := d.c.StreamDependencies(ctx, req) - if err != nil { - logger.Warnf("Fail to create stream client: %v", err) - return - } - - for { - resp, err := stream.Recv() - if err != nil { - logger.Warnf("Recv failed: %v", err) - return - } - - // TODO: validate resp - if d.svcHook != nil { - d.svcHook(resp.Added, resp.Removed) - } - - // subscribe - for _, svc := range resp.Added { - d.subscribeSvc(svc) - } - // unsubscribe - for _, svc := range resp.Removed { - d.unsubscribeSvc(svc) - } - } -} - -// subscribeSvc subscribes the config and endpoint change of specified service. -func (d *dynamicSource) subscribeSvc(svc *service.Service) { - d.svcCfgSubCh <- svc - d.svcEtSubCh <- svc - if d.svcSubHook != nil { - d.svcSubHook(svc) - } -} - -// unsubscribeSvc unsubscribes the config and endpoint change of specified service. -func (d *dynamicSource) unsubscribeSvc(svc *service.Service) { - d.svcCfgUnsubCh <- svc - d.svcEtUnsubCh <- svc - if d.svcUnsubHook != nil { - d.svcUnsubHook(svc) - } -} - -func (d *dynamicSource) StreamSvcConfigs() { - defer func() { - logger.Debugf("StreamSvcConfigs done") - }() - - for { - d.streamSvcConfigs(context.Background()) - select { - case <-d.quit: - return - default: - } - logger.Warnf("StreamSvcConfigs failed, retrying...") - - // TODO: exponential backoff - t := time.NewTimer(time.Millisecond * 100) - select { - case <-t.C: - case <-d.quit: - return - } - } -} - -func (d *dynamicSource) streamSvcConfigs(ctx context.Context) { - // create stream client - stream, err := d.c.StreamSvcConfigs(ctx) - if err != nil { - logger.Warnf("Fail to create stream client: %v", err) - return - } - - recvDone := make(chan struct{}) - defer func() { - // wait recv goroutine done - <-recvDone - }() - - go func() { - defer close(recvDone) - for { - resp, err := stream.Recv() - if err != nil { - logger.Warnf("Recv failed: %v", err) - return - } - - // TODO: validate resp - if d.svcCfgHook == nil { - continue - } - for svcName, svcConfig := range resp.Updated { - d.svcCfgHook(svcName, svcConfig) - } - } - }() - - for { - var subscribe, unsubscribe []string - select { - case svc := <-d.svcCfgSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcCfgUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-recvDone: - return - } - - // batch - // TODO: limit the size - for { - select { - case svc := <-d.svcCfgSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcCfgUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-recvDone: - return - default: - goto SEND - } - } - - SEND: - req := &api.SvcConfigDiscoveryRequest{ - SvcNamesSubscribe: subscribe, - SvcNamesUnsubscribe: unsubscribe, - } - err := stream.Send(req) - if err != nil { - logger.Warnf("Send failed: %v", err) - return - } - } -} - -func (d *dynamicSource) StreamSvcEndpoints() { - defer func() { - logger.Debugf("StreamSvcEndpoints done") - }() - - for { - d.streamSvcEndpoints(context.Background()) - select { - case <-d.quit: - return - default: - } - logger.Warnf("StreamSvcEndpoints failed, retrying...") - - // TODO: exponential backoff - t := time.NewTimer(time.Millisecond * 100) - select { - case <-t.C: - case <-d.quit: - return - } - } -} - -func (d *dynamicSource) streamSvcEndpoints(ctx context.Context) { - // make the stream client - stream, err := d.c.StreamSvcEndpoints(ctx) - if err != nil { - logger.Warnf("Fail to create stream client: %v", err) - return - } - - recvDone := make(chan struct{}) - defer func() { - // wait recv goroutine done - <-recvDone - }() - - go func() { - defer close(recvDone) - for { - resp, err := stream.Recv() - if err != nil { - logger.Warnf("Recv failed: %v", err) - return - } - - // TODO: validate resp - if d.svcEtHook != nil { - d.svcEtHook(resp.SvcName, resp.Added, resp.Removed) - } - } - }() - - for { - var subscribe, unsubscribe []string - select { - case svc := <-d.svcEtSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcEtUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-recvDone: - return - } - - // batch - // TODO: limit the size - for { - select { - case svc := <-d.svcEtSubCh: - subscribe = append(subscribe, svc.Name) - case svc := <-d.svcEtUnsubCh: - unsubscribe = append(unsubscribe, svc.Name) - case <-recvDone: - return - default: - goto SEND - } - } - - SEND: - req := &api.SvcEndpointDiscoveryRequest{ - SvcNamesSubscribe: subscribe, - SvcNamesUnsubscribe: unsubscribe, - } - err := stream.Send(req) - if err != nil { - logger.Warnf("Send failed: %v", err) - return - } - } -} diff --git a/config/dynamic_test.go b/config/dynamic_test.go index 2ae02be..2b069cf 100644 --- a/config/dynamic_test.go +++ b/config/dynamic_test.go @@ -15,325 +15,70 @@ package config import ( - "context" - "errors" - "io" + "net" "testing" "time" - "github.com/gogo/protobuf/proto" - gomock "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" + "google.golang.org/grpc" "github.com/samaritan-proxy/samaritan/pb/api" - "github.com/samaritan-proxy/samaritan/pb/common" "github.com/samaritan-proxy/samaritan/pb/config/bootstrap" - "github.com/samaritan-proxy/samaritan/pb/config/service" ) -type discoveryServiceClientFactory func(*bootstrap.ConfigSource) (api.DiscoveryServiceClient, func() error, error) +type mockDiscoveryServiceServer struct{} -func newDiscoveryServiceClientFacotry(c api.DiscoveryServiceClient, shutdown func() error) discoveryServiceClientFactory { - if shutdown == nil { - shutdown = func() error { - return nil - } - } - return func(*bootstrap.ConfigSource) (api.DiscoveryServiceClient, func() error, error) { - return c, shutdown, nil - } +func newMockDiscoveryServiceServer() api.DiscoveryServiceServer { + return &mockDiscoveryServiceServer{} } -func mockNewDiscoveryServiceClient(factory discoveryServiceClientFactory) (rollback func()) { - oldFactory := newDiscoveryServiceClient - newDiscoveryServiceClient = factory - return func() { - newDiscoveryServiceClient = oldFactory - } +func (s *mockDiscoveryServiceServer) StreamDependencies(_ *api.DependencyDiscoveryRequest, stream api.DiscoveryService_StreamDependenciesServer) error { + // make it block + err := stream.RecvMsg(new(api.DependencyDiscoveryRequest)) + return err } -type rpcMsg struct { - msg proto.Message +func (s *mockDiscoveryServiceServer) StreamSvcConfigs(stream api.DiscoveryService_StreamSvcConfigsServer) error { + // make it block + _, err := stream.Recv() + return err } -func (r *rpcMsg) Matches(msg interface{}) bool { - m, ok := msg.(proto.Message) - if !ok { - return false - } - return proto.Equal(m, r.msg) -} - -func (r *rpcMsg) String() string { - return r.msg.String() -} - -func newTestInstance() *common.Instance { - return &common.Instance{ - Id: "test", - Version: "1.0.0", - Belong: "sam", - } +func (s *mockDiscoveryServiceServer) StreamSvcEndpoints(stream api.DiscoveryService_StreamSvcEndpointsServer) error { + // make it block + _, err := stream.Recv() + return err } -func makeDependencyDiscoveryResponse(added, removed []*service.Service) *api.DependencyDiscoveryResponse { - return &api.DependencyDiscoveryResponse{ - Added: added, - Removed: removed, +func TestDynamicSourceServe(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) } -} - -func makeService(name string) *service.Service { - return &service.Service{ - Name: name, - } -} - -func TestDynamicSourceStreamSvcs(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - b := new(bootstrap.Bootstrap) - b.Instance = newTestInstance() - - addedSvcs := []*service.Service{makeService("foo")} - removedSvcs := []*service.Service{makeService("bar")} - - // mock stream - stream := NewMockDiscoveryService_StreamDependenciesClient(ctrl) - streamQuitCh := make(chan struct{}) - abortStream := func() { close(streamQuitCh) } - recvTimes := 0 - stream.EXPECT().Recv().DoAndReturn(func() (*api.DependencyDiscoveryResponse, error) { - recvTimes++ - if recvTimes < 2 { - return makeDependencyDiscoveryResponse(addedSvcs, removedSvcs), nil - } - // wait the stream closed - <-streamQuitCh - return nil, io.EOF - }).Times(2) + defer l.Close() - // mock client - c := NewMockDiscoveryServiceClient(ctrl) - req := &api.DependencyDiscoveryRequest{Instance: b.Instance} - c.EXPECT().StreamDependencies(gomock.Any(), &rpcMsg{msg: req}).Return(stream, nil) - - // mock client facotory - factory := newDiscoveryServiceClientFacotry(c, nil) - rollback := mockNewDiscoveryServiceClient(factory) - defer rollback() - - d, err := newDynamicSource(b) - assert.NoError(t, err) - - // register hooks - svcHookCalled := false - d.SetSvcHook(func(added, removed []*service.Service) { - assert.Equal(t, added, addedSvcs) - assert.Equal(t, removed, removedSvcs) - svcHookCalled = true - }) - - svcSubHookCalled := false - svcUnsubHookCalled := false - d.svcSubHook = func(svc *service.Service) { - svcSubHookCalled = true - assert.Contains(t, addedSvcs, svc) - } - d.svcUnsubHook = func(svc *service.Service) { - svcUnsubHookCalled = true - assert.Contains(t, removedSvcs, svc) - } - - // abort stream - time.AfterFunc(time.Millisecond*100, abortStream) - d.streamSvcs(context.Background()) - - // assert hooks - assert.True(t, svcHookCalled) - assert.True(t, svcSubHookCalled) - assert.True(t, svcUnsubHookCalled) -} - -func makeSvcConfigDiscoveryResponse(configs map[string]*service.Config) *api.SvcConfigDiscoveryResponse { - return &api.SvcConfigDiscoveryResponse{ - Updated: configs, - } -} - -func TestDynamicSourceStreamSvcConfigs(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() + s := grpc.NewServer() + api.RegisterDiscoveryServiceServer(s, newMockDiscoveryServiceServer()) + go func() { + s.Serve(l) //nolint:errcheck + }() - // mock stream - stream := NewMockDiscoveryService_StreamSvcConfigsClient(ctrl) - streamQuitCh := make(chan struct{}) - abortStream := func() { - close(streamQuitCh) + config := &bootstrap.ConfigSource{} + config.Endpoint = l.Addr().String() + b := &bootstrap.Bootstrap{ + Instance: mockInstance(), + DynamicSourceConfig: config, } - // mock send method - req := &api.SvcConfigDiscoveryRequest{ - SvcNamesSubscribe: []string{"foo"}, - SvcNamesUnsubscribe: []string{"bar", "zoo"}, - } - stream.EXPECT().Send(&rpcMsg{msg: req}).Return(nil) - // mock recv method - recvTimes := 0 - stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcConfigDiscoveryResponse, error) { - recvTimes++ - if recvTimes < 2 { - return makeSvcConfigDiscoveryResponse(map[string]*service.Config{"foo": nil}), nil - } - <-streamQuitCh - return nil, io.EOF - }).Times(2) - - // mock client - c := NewMockDiscoveryServiceClient(ctrl) - c.EXPECT().StreamSvcConfigs(gomock.Any()).Return(stream, nil) - - // mock discovery service client facotory - factory := newDiscoveryServiceClientFacotry(c, nil) - rollback := mockNewDiscoveryServiceClient(factory) - defer rollback() - b := new(bootstrap.Bootstrap) d, err := newDynamicSource(b) - assert.NoError(t, err) - - hookCalled := false - d.SetSvcConfigHook(func(svcName string, newCfg *service.Config) { - hookCalled = true - assert.Equal(t, "foo", svcName) - }) - d.subscribeSvc(&service.Service{Name: "foo"}) - d.unsubscribeSvc(&service.Service{Name: "bar"}) - d.unsubscribeSvc(&service.Service{Name: "zoo"}) - - // close the discovery service client - time.AfterFunc(time.Millisecond*100, abortStream) - d.streamSvcConfigs(context.Background()) - assert.True(t, hookCalled) -} - -func makeEndpoint(ip string, port uint32) *service.Endpoint { - return &service.Endpoint{ - Address: &common.Address{ - Ip: ip, - Port: port, - }, + if err != nil { + t.Fatal(err) } -} - -func makeSvcEndpointDiscoveryResponse(svcName string, added, removed []*service.Endpoint) *api.SvcEndpointDiscoveryResponse { - return &api.SvcEndpointDiscoveryResponse{ - SvcName: svcName, - Added: added, - Removed: removed, - } -} - -func TestDynamicSourceStreamSvcEndpoints(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - b := new(bootstrap.Bootstrap) - addedEndpoints := []*service.Endpoint{ - makeEndpoint("127.0.0.1", 8888), - makeEndpoint("127.0.0.1", 8889), - } - removedEndpoints := []*service.Endpoint{ - makeEndpoint("127.0.0.1", 9000), - makeEndpoint("127.0.0.1", 9001), - } - - // mock stream - stream := NewMockDiscoveryService_StreamSvcEndpointsClient(ctrl) - streamQuitCh := make(chan struct{}) - abortStream := func() { - close(streamQuitCh) - } - // mock send method - req := &api.SvcEndpointDiscoveryRequest{ - SvcNamesSubscribe: []string{"foo"}, - SvcNamesUnsubscribe: []string{"bar", "zoo"}, - } - stream.EXPECT().Send(&rpcMsg{msg: req}).Return(nil) - // mock recv method - recvTimes := 0 - stream.EXPECT().Recv().DoAndReturn(func() (*api.SvcEndpointDiscoveryResponse, error) { - recvTimes++ - if recvTimes < 2 { - return makeSvcEndpointDiscoveryResponse("foo", addedEndpoints, removedEndpoints), nil - } - <-streamQuitCh - return nil, io.EOF - }).Times(2) - - // mock client - c := NewMockDiscoveryServiceClient(ctrl) - c.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(stream, nil) - - // mock client factory - factory := newDiscoveryServiceClientFacotry(c, nil) - rollback := mockNewDiscoveryServiceClient(factory) - defer rollback() - - d, err := newDynamicSource(b) - assert.NoError(t, err) - - // register hook - hookCalled := false - d.SetSvcEndpointHook(func(svcName string, added, removed []*service.Endpoint) { - hookCalled = true - assert.Equal(t, "foo", svcName) - assert.Equal(t, addedEndpoints, added) - assert.Equal(t, removedEndpoints, removed) - }) - d.subscribeSvc(&service.Service{Name: "foo"}) - d.unsubscribeSvc(&service.Service{Name: "bar"}) - d.unsubscribeSvc(&service.Service{Name: "zoo"}) - - // close the discovery service client - time.AfterFunc(time.Millisecond*100, abortStream) - d.streamSvcEndpoints(context.Background()) - assert.True(t, hookCalled) -} - -func TestStopDynamicSource(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - // mock - c := NewMockDiscoveryServiceClient(ctrl) - err := errors.New("internal error") - c.EXPECT().StreamDependencies(gomock.Any(), gomock.Any()).Return(nil, err).AnyTimes() - c.EXPECT().StreamSvcConfigs(gomock.Any()).Return(nil, err).AnyTimes() - c.EXPECT().StreamSvcEndpoints(gomock.Any()).Return(nil, err).AnyTimes() - - factory := newDiscoveryServiceClientFacotry(c, nil) - rollback := mockNewDiscoveryServiceClient(factory) - defer rollback() - - b := new(bootstrap.Bootstrap) - d, err := newDynamicSource(b) - assert.NoError(t, err) done := make(chan struct{}) - defer func() { - <-done - }() go func() { defer close(done) d.Serve() }() - - select { - case <-done: - t.Error("expect serving, but stopped") - return - case <-time.After(time.Millisecond * 500): - } - d.Stop() + time.AfterFunc(time.Millisecond*500, d.Stop) + <-done } diff --git a/config/mock_dynamic_test.go b/config/mock_dynamic_test.go index abd746b..ee3ec0a 100644 --- a/config/mock_dynamic_test.go +++ b/config/mock_dynamic_test.go @@ -44,6 +44,18 @@ func (mr *MockDynamicSourceMockRecorder) Serve() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockDynamicSource)(nil).Serve)) } +// SetDependencyHook mocks base method +func (m *MockDynamicSource) SetDependencyHook(arg0 dependencyHook) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetDependencyHook", arg0) +} + +// SetDependencyHook indicates an expected call of SetDependencyHook +func (mr *MockDynamicSourceMockRecorder) SetDependencyHook(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetDependencyHook", reflect.TypeOf((*MockDynamicSource)(nil).SetDependencyHook), arg0) +} + // SetSvcConfigHook mocks base method func (m *MockDynamicSource) SetSvcConfigHook(arg0 svcConfigHook) { m.ctrl.T.Helper() @@ -68,18 +80,6 @@ func (mr *MockDynamicSourceMockRecorder) SetSvcEndpointHook(arg0 interface{}) *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSvcEndpointHook", reflect.TypeOf((*MockDynamicSource)(nil).SetSvcEndpointHook), arg0) } -// SetSvcHook mocks base method -func (m *MockDynamicSource) SetSvcHook(arg0 svcHook) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetSvcHook", arg0) -} - -// SetSvcHook indicates an expected call of SetSvcHook -func (mr *MockDynamicSourceMockRecorder) SetSvcHook(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetSvcHook", reflect.TypeOf((*MockDynamicSource)(nil).SetSvcHook), arg0) -} - // Stop mocks base method func (m *MockDynamicSource) Stop() { m.ctrl.T.Helper() diff --git a/go.sum b/go.sum index a9a65e6..14d43ab 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,6 @@ github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8 h1:UUHMLvzt/31azWTN/ifG github.com/juju/loggo v0.0.0-20190526231331-6e530bcce5d8/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= github.com/kavu/go_reuseport v1.4.0 h1:YIp/96RZ3sJfn0LN+FFkkXIq3H3dfVOdRUtNejhDcxc= github.com/kavu/go_reuseport v1.4.0/go.mod h1:CG8Ee7ceMFSMnx/xr25Vm0qXaj2Z4i5PWoUx+JZ5/CU= -github.com/kirk91/stats v0.0.5-0.20191121063046-9621d5a6d574 h1:L8yhfuKEBD3YVoEFU+noZTPR4XZ7zQ4ox8v9q/G3FFM= -github.com/kirk91/stats v0.0.5-0.20191121063046-9621d5a6d574/go.mod h1:nEi2z+QVx+VFBqiOv8kFvz7W1QysihMd7denyKSaGlQ= github.com/kirk91/stats v0.0.5-0.20191121064423-8a4d70fadb55 h1:457zA1ptnacVw0/vXjJoi4z+NMrDgTQoRCs4z5w1/Jc= github.com/kirk91/stats v0.0.5-0.20191121064423-8a4d70fadb55/go.mod h1:nEi2z+QVx+VFBqiOv8kFvz7W1QysihMd7denyKSaGlQ= github.com/kirk91/statsd v0.0.0-20190123111500-eca5e2a6416c h1:xiy939HvpMPG4J5Qg6gTrtSEz1xQAUHlvS8m/jRURSE= diff --git a/pb/api/discovery.pb.json.go b/pb/api/discovery.pb.json.go index 9b7873e..1d688df 100644 --- a/pb/api/discovery.pb.json.go +++ b/pb/api/discovery.pb.json.go @@ -2,7 +2,6 @@ // source: api/discovery.proto package api - import ( "bytes" "encoding/json" @@ -13,7 +12,6 @@ import ( // instances of DependencyDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var DependencyDiscoveryRequestJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *DependencyDiscoveryRequest) MarshalJSON() ([]byte, error) { @@ -22,31 +20,26 @@ func (m *DependencyDiscoveryRequest) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := DependencyDiscoveryRequestJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*DependencyDiscoveryRequest)(nil) - // DependencyDiscoveryRequestJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of DependencyDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var DependencyDiscoveryRequestJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *DependencyDiscoveryRequest) UnmarshalJSON(b []byte) error { return DependencyDiscoveryRequestJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*DependencyDiscoveryRequest)(nil) // DependencyDiscoveryResponseJSONMarshaler describes the default jsonpb.Marshaler used by all // instances of DependencyDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var DependencyDiscoveryResponseJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *DependencyDiscoveryResponse) MarshalJSON() ([]byte, error) { @@ -55,31 +48,26 @@ func (m *DependencyDiscoveryResponse) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := DependencyDiscoveryResponseJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*DependencyDiscoveryResponse)(nil) - // DependencyDiscoveryResponseJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of DependencyDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var DependencyDiscoveryResponseJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *DependencyDiscoveryResponse) UnmarshalJSON(b []byte) error { return DependencyDiscoveryResponseJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*DependencyDiscoveryResponse)(nil) // SvcConfigDiscoveryRequestJSONMarshaler describes the default jsonpb.Marshaler used by all // instances of SvcConfigDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var SvcConfigDiscoveryRequestJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *SvcConfigDiscoveryRequest) MarshalJSON() ([]byte, error) { @@ -88,31 +76,26 @@ func (m *SvcConfigDiscoveryRequest) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := SvcConfigDiscoveryRequestJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*SvcConfigDiscoveryRequest)(nil) - // SvcConfigDiscoveryRequestJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of SvcConfigDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var SvcConfigDiscoveryRequestJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *SvcConfigDiscoveryRequest) UnmarshalJSON(b []byte) error { return SvcConfigDiscoveryRequestJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*SvcConfigDiscoveryRequest)(nil) // SvcConfigDiscoveryResponseJSONMarshaler describes the default jsonpb.Marshaler used by all // instances of SvcConfigDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var SvcConfigDiscoveryResponseJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *SvcConfigDiscoveryResponse) MarshalJSON() ([]byte, error) { @@ -121,31 +104,26 @@ func (m *SvcConfigDiscoveryResponse) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := SvcConfigDiscoveryResponseJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*SvcConfigDiscoveryResponse)(nil) - // SvcConfigDiscoveryResponseJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of SvcConfigDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var SvcConfigDiscoveryResponseJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *SvcConfigDiscoveryResponse) UnmarshalJSON(b []byte) error { return SvcConfigDiscoveryResponseJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*SvcConfigDiscoveryResponse)(nil) // SvcEndpointDiscoveryRequestJSONMarshaler describes the default jsonpb.Marshaler used by all // instances of SvcEndpointDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var SvcEndpointDiscoveryRequestJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *SvcEndpointDiscoveryRequest) MarshalJSON() ([]byte, error) { @@ -154,31 +132,26 @@ func (m *SvcEndpointDiscoveryRequest) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := SvcEndpointDiscoveryRequestJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*SvcEndpointDiscoveryRequest)(nil) - // SvcEndpointDiscoveryRequestJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of SvcEndpointDiscoveryRequest. This struct is safe to replace or modify but // should not be done so concurrently. var SvcEndpointDiscoveryRequestJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *SvcEndpointDiscoveryRequest) UnmarshalJSON(b []byte) error { return SvcEndpointDiscoveryRequestJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*SvcEndpointDiscoveryRequest)(nil) // SvcEndpointDiscoveryResponseJSONMarshaler describes the default jsonpb.Marshaler used by all // instances of SvcEndpointDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var SvcEndpointDiscoveryResponseJSONMarshaler = new(jsonpb.Marshaler) - // MarshalJSON satisfies the encoding/json Marshaler interface. This method // uses the more correct jsonpb package to correctly marshal the message. func (m *SvcEndpointDiscoveryResponse) MarshalJSON() ([]byte, error) { @@ -187,22 +160,19 @@ func (m *SvcEndpointDiscoveryResponse) MarshalJSON() ([]byte, error) { } buf := &bytes.Buffer{} if err := SvcEndpointDiscoveryResponseJSONMarshaler.Marshal(buf, m); err != nil { - return nil, err + return nil, err } return buf.Bytes(), nil } - var _ json.Marshaler = (*SvcEndpointDiscoveryResponse)(nil) - // SvcEndpointDiscoveryResponseJSONUnmarshaler describes the default jsonpb.Unmarshaler used by all // instances of SvcEndpointDiscoveryResponse. This struct is safe to replace or modify but // should not be done so concurrently. var SvcEndpointDiscoveryResponseJSONUnmarshaler = new(jsonpb.Unmarshaler) - // UnmarshalJSON satisfies the encoding/json Unmarshaler interface. This method // uses the more correct jsonpb package to correctly unmarshal the message. func (m *SvcEndpointDiscoveryResponse) UnmarshalJSON(b []byte) error { return SvcEndpointDiscoveryResponseJSONUnmarshaler.Unmarshal(bytes.NewReader(b), m) } - var _ json.Unmarshaler = (*SvcEndpointDiscoveryResponse)(nil) +