From 011b13870eef9c169f920f89ead524c9585ce9e7 Mon Sep 17 00:00:00 2001 From: raghu0891 Date: Tue, 12 Nov 2024 17:53:03 +0530 Subject: [PATCH] first commit --- .editorconfig | 17 ++ .gitignore | 26 +++ .travis.yml | 12 ++ LICENSE.txt | 174 ++++++++++++++++++++ Makefile | 11 ++ README.md | 83 ++++++++++ go.mod | 16 ++ go.sum | 137 ++++++++++++++++ proxy/codec.go | 69 ++++++++ proxy/codec_test.go | 23 +++ proxy/director.go | 25 +++ proxy/doc.go | 15 ++ proxy/examples_test.go | 70 +++++++++ proxy/handler.go | 160 +++++++++++++++++++ proxy/handler_test.go | 262 ++++++++++++++++++++++++++++++ proxy/proxy.go | 33 ++++ proxy/proxy_test.go | 211 +++++++++++++++++++++++++ testservice/Makefile | 9 ++ testservice/ping.go | 168 ++++++++++++++++++++ testservice/server/main.go | 54 +++++++ testservice/test.pb.go | 255 ++++++++++++++++++++++++++++++ testservice/test.proto | 30 ++++ testservice/test_grpc.pb.go | 306 ++++++++++++++++++++++++++++++++++++ testservice/testping.go | 158 +++++++++++++++++++ tools.go | 7 + 25 files changed, 2331 insertions(+) create mode 100644 .editorconfig create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 LICENSE.txt create mode 100644 Makefile create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 proxy/codec.go create mode 100644 proxy/codec_test.go create mode 100644 proxy/director.go create mode 100644 proxy/doc.go create mode 100644 proxy/examples_test.go create mode 100644 proxy/handler.go create mode 100644 proxy/handler_test.go create mode 100644 proxy/proxy.go create mode 100644 proxy/proxy_test.go create mode 100644 testservice/Makefile create mode 100644 testservice/ping.go create mode 100644 testservice/server/main.go create mode 100644 testservice/test.pb.go create mode 100644 testservice/test.proto create mode 100644 testservice/test_grpc.pb.go create mode 100644 testservice/testping.go create mode 100644 tools.go diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..ff53e6f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,17 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +indent_style = space +insert_final_newline = true +max_line_length = 120 +tab_width = 4 + +[*.proto] +indent_size = 2 +tab_width = 2 + +[{*.go,*.go2}] +indent_style = tab diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0152f2c --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +.idea + +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof \ No newline at end of file diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..6bff9e4 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,12 @@ +sudo: false +language: go +go: + - "1.14" + - "1.15" + - "1.16" + +install: + - go get ./... + +script: + - make test-ci diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..cbfdef8 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,174 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..30b1052 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +.PHONY: test test-ci install-staticcheck + +test: + go vet ./... + go test -race -v -timeout 5m ./... + CGO_ENABLED=0 staticcheck -f stylish --go 1.14 ./... + +install-staticcheck: + go get honnef.co/go/tools/cmd/staticcheck + +test-ci: install-staticcheck | test diff --git a/README.md b/README.md new file mode 100644 index 0000000..0b3f876 --- /dev/null +++ b/README.md @@ -0,0 +1,83 @@ +# gRPC Proxy + +[![Travis Build](https://travis-ci.org/mwitkow/grpc-proxy.svg?branch=master)](https://travis-ci.org/mwitkow/grpc-proxy) +[![Go Report Card](https://goreportcard.com/badge/github.com/mwitkow/grpc-proxy)](https://goreportcard.com/report/github.com/mwitkow/grpc-proxy) +[![Go Reference](https://pkg.go.dev/badge/github.com/mwitkow/grpc-proxy.svg)](https://pkg.go.dev/github.com/mwitkow/grpc-proxy) +[![Apache 2.0 License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE) + +[gRPC Go](https://github.com/grpc/grpc-go) Proxy server + +## Project Goal + +Build a transparent reverse proxy for gRPC targets that will make it easy to expose gRPC services +over the internet. This includes: + * no needed knowledge of the semantics of requests exchanged in the call (independent rollouts) + * easy, declarative definition of backends and their mappings to frontends + * simple round-robin load balancing of inbound requests from a single connection to multiple backends + +The project now exists as a **proof of concept**, with the key piece being the `proxy` package that +is a generic gRPC reverse proxy handler. + +## Proxy Handler + +The package [`proxy`](proxy/) contains a generic gRPC reverse proxy handler that allows a gRPC server to +not know about registered handlers or their data types. Please consult the docs, here's an exaple usage. + +You can call `proxy.NewProxy` to create a `*grpc.Server` that proxies requests. +```go +proxy := proxy.NewProxy(clientConn) +``` + +More advanced users will want to define a `StreamDirector` that can make more complex decisions on what +to do with the request. +```go +director = func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + md, _ := metadata.FromIncomingContext(ctx) + outCtx = metadata.NewOutgoingContext(ctx, md.Copy()) + return outCtx, cc, nil + + // Make sure we never forward internal services. + if strings.HasPrefix(fullMethodName, "/com.example.internal.") { + return outCtx, nil, status.Errorf(codes.Unimplemented, "Unknown method") + } + + if ok { + // Decide on which backend to dial + if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { + // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. + return outCtx, grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithCodec(proxy.Codec())), nil + } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { + return outCtx, grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithCodec(proxy.Codec())), nil + } + } + return outCtx, nil, status.Errorf(codes.Unimplemented, "Unknown method") +} +``` + +Then you need to register it with a `grpc.Server`. The server may have other handlers that will be served +locally. + +```go +server := grpc.NewServer( + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) +pb_test.RegisterTestServiceServer(server, &testImpl{}) +``` + +## Testing +To make debugging a bit simpler, there are some helpers. + +`testservice` contains a method `TestTestServiceServerImpl` which performs a complete test against +the reference implementation of the `TestServiceServer`. + +In `proxy_test.go`, the test framework spins up a `TestServiceServer` that it tests the proxy +against. To make debugging a bit simpler (eg. if the developer needs to step into +`google.golang.org/grpc` methods), this `TestServiceServer` can be provided by a server by +passing `-test-backend=addr` to `go test`. A simple, local-only implementation of +`TestServiceServer` exists in [`testservice/server`](./testservice/server). + + +## License + +`grpc-proxy` is released under the Apache 2.0 license. See [LICENSE.txt](LICENSE.txt). + diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..be52bea --- /dev/null +++ b/go.mod @@ -0,0 +1,16 @@ +module github.com/goplugin/grpc-proxy + +go 1.14 + +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/stretchr/testify v1.7.0 + golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54 // indirect + golang.org/x/text v0.3.6 // indirect + google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2 // indirect + google.golang.org/grpc v1.36.1 + google.golang.org/protobuf v1.26.0 + honnef.co/go/tools v0.1.3 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9b632d6 --- /dev/null +++ b/go.sum @@ -0,0 +1,137 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c h1:KHUzaHIpjWVlVVNh65G3hhuj3KB1HnjY6Cq5cTvRQT8= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54 h1:rF3Ohx8DRyl8h2zw9qojyLHLhrJpEMgyPOImREEryf0= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2 h1:vAXehPdSJwEE7hh/kh9WnAVuJsfl6IUYudW5IOnwpvw= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.36.1 h1:cmUfbeGKnz9+2DD/UYsMQXeqbHZqZDs4eQwW0sFOpBY= +google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= diff --git a/proxy/codec.go b/proxy/codec.go new file mode 100644 index 0000000..cd1a4be --- /dev/null +++ b/proxy/codec.go @@ -0,0 +1,69 @@ +package proxy + +import ( + "fmt" + + "google.golang.org/grpc" + "google.golang.org/protobuf/proto" +) + +// Codec returns a proxying grpc.Codec with the default protobuf codec as parent. +// +// See CodecWithParent. +// +// Deprecated: No longer necessary. +func Codec() grpc.Codec { + return CodecWithParent(&protoCodec{}) +} + +// CodecWithParent returns a proxying grpc.Codec with a user provided codec as parent. +// +// Deprecated: No longer necessary. +func CodecWithParent(fallback grpc.Codec) grpc.Codec { + return &rawCodec{fallback} +} + +type rawCodec struct { + parentCodec grpc.Codec +} + +type frame struct { + payload []byte +} + +func (c *rawCodec) Marshal(v interface{}) ([]byte, error) { + out, ok := v.(*frame) + if !ok { + return c.parentCodec.Marshal(v) + } + return out.payload, nil + +} + +func (c *rawCodec) Unmarshal(data []byte, v interface{}) error { + dst, ok := v.(*frame) + if !ok { + return c.parentCodec.Unmarshal(data, v) + } + dst.payload = data + return nil +} + +func (c *rawCodec) String() string { + return fmt.Sprintf("proxy>%s", c.parentCodec.String()) +} + +// protoCodec is a Codec implementation with protobuf. It is the default rawCodec for gRPC. +type protoCodec struct{} + +func (protoCodec) Marshal(v interface{}) ([]byte, error) { + return proto.Marshal(v.(proto.Message)) +} + +func (protoCodec) Unmarshal(data []byte, v interface{}) error { + return proto.Unmarshal(data, v.(proto.Message)) +} + +func (protoCodec) String() string { + return "proto" +} diff --git a/proxy/codec_test.go b/proxy/codec_test.go new file mode 100644 index 0000000..99569ac --- /dev/null +++ b/proxy/codec_test.go @@ -0,0 +1,23 @@ +package proxy + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCodec_ReadYourWrites(t *testing.T) { + framePtr := &frame{} + data := []byte{0xDE, 0xAD, 0xBE, 0xEF} + codec := rawCodec{} + require.NoError(t, codec.Unmarshal(data, framePtr), "unmarshalling must go ok") + out, err := codec.Marshal(framePtr) + require.NoError(t, err, "no marshal error") + require.Equal(t, data, out, "output and data must be the same") + + // reuse + require.NoError(t, codec.Unmarshal([]byte{0x55}, framePtr), "unmarshalling must go ok") + out, err = codec.Marshal(framePtr) + require.NoError(t, err, "no marshal error") + require.Equal(t, []byte{0x55}, out, "output and data must be the same") +} diff --git a/proxy/director.go b/proxy/director.go new file mode 100644 index 0000000..6104cbc --- /dev/null +++ b/proxy/director.go @@ -0,0 +1,25 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "context" + + "google.golang.org/grpc" +) + +// StreamDirector returns a gRPC ClientConn to be used to forward the call to. +// +// The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). +// If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned. +// +// The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want +// to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you +// *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned. +// +// It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors +// are invoked. So decisions around authorization, monitoring etc. are better to be handled there. +// +// See the rather rich example. +type StreamDirector func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) diff --git a/proxy/doc.go b/proxy/doc.go new file mode 100644 index 0000000..613b5c4 --- /dev/null +++ b/proxy/doc.go @@ -0,0 +1,15 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +/* +Package proxy provides a reverse proxy handler for gRPC. + +The implementation allows a grpc.Server to pass a received ServerStream to a ClientStream without understanding +the semantics of the messages exchanged. It basically provides a transparent reverse-proxy. + +This package is intentionally generic, exposing a StreamDirector function that allows users of this package +to implement whatever logic of backend-picking, dialing and service verification to perform. + +See examples on documented functions. +*/ +package proxy diff --git a/proxy/examples_test.go b/proxy/examples_test.go new file mode 100644 index 0000000..d98a414 --- /dev/null +++ b/proxy/examples_test.go @@ -0,0 +1,70 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy_test + +import ( + "context" + "log" + "strings" + + "github.com/goplugin/grpc-proxy/proxy" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +var ( + director proxy.StreamDirector +) + +func ExampleNewProxy() { + dst, err := grpc.Dial("example.com") + if err != nil { + log.Fatalf("dialing example.org: %v", err) + } + proxy := proxy.NewProxy(dst) + _ = proxy +} + +func ExampleRegisterService() { + // A gRPC server with the proxying codec enabled. + server := grpc.NewServer() + // Register a TestService with 4 of its methods explicitly. + proxy.RegisterService(server, director, + "mwitkow.testproto.TestService", + "PingEmpty", "Ping", "PingError", "PingList") +} + +func ExampleTransparentHandler() { + grpc.NewServer( + grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) +} + +// Provides a simple example of a director that shields internal services and dials a staging or production backend. +// This is a *very naive* implementation that creates a new connection on every request. Consider using pooling. +func ExampleStreamDirector() { + director = func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) { + // Make sure we never forward internal services. + if strings.HasPrefix(fullMethodName, "/com.example.internal.") { + return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method") + } + md, ok := metadata.FromIncomingContext(ctx) + // Copy the inbound metadata explicitly. + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) + if ok { + // Decide on which backend to dial + if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" { + // Make sure we use DialContext so the dialing can be cancelled/time out together with the context. + conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local") + return outCtx, conn, err + } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" { + conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local") + return outCtx, conn, err + } + } + return nil, nil, status.Errorf(codes.Unimplemented, "Unknown method") + } +} diff --git a/proxy/handler.go b/proxy/handler.go new file mode 100644 index 0000000..e4edbb9 --- /dev/null +++ b/proxy/handler.go @@ -0,0 +1,160 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "context" + "io" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +var ( + clientStreamDescForProxying = &grpc.StreamDesc{ + ServerStreams: true, + ClientStreams: true, + } +) + +// RegisterService sets up a proxy handler for a particular gRPC service and method. +// The behaviour is the same as if you were registering a handler method, e.g. from a generated pb.go file. +func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string) { + streamer := &handler{director} + fakeDesc := &grpc.ServiceDesc{ + ServiceName: serviceName, + HandlerType: (*interface{})(nil), + } + for _, m := range methodNames { + streamDesc := grpc.StreamDesc{ + StreamName: m, + Handler: streamer.handler, + ServerStreams: true, + ClientStreams: true, + } + fakeDesc.Streams = append(fakeDesc.Streams, streamDesc) + } + server.RegisterService(fakeDesc, streamer) +} + +// TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. +// The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the +// backends. It should be used as a `grpc.UnknownServiceHandler`. +func TransparentHandler(director StreamDirector) grpc.StreamHandler { + streamer := &handler{director: director} + return streamer.handler +} + +type handler struct { + director StreamDirector +} + +// handler is where the real magic of proxying happens. +// It is invoked like any gRPC server stream and uses the emptypb.Empty type server +// to proxy calls between the input and output streams. +func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error { + // little bit of gRPC internals never hurt anyone + fullMethodName, ok := grpc.MethodFromServerStream(serverStream) + if !ok { + return status.Errorf(codes.Internal, "lowLevelServerStream not exists in context") + } + // We require that the director's returned context inherits from the serverStream.Context(). + outgoingCtx, backendConn, err := s.director(serverStream.Context(), fullMethodName) + if err != nil { + return err + } + + clientCtx, clientCancel := context.WithCancel(outgoingCtx) + defer clientCancel() + // TODO(mwitkow): Add a `forwarded` header to metadata, https://en.wikipedia.org/wiki/X-Forwarded-For. + clientStream, err := backendConn.NewStream(clientCtx, clientStreamDescForProxying, fullMethodName) + if err != nil { + return err + } + // Explicitly *do not close* s2cErrChan and c2sErrChan, otherwise the select below will not terminate. + // Channels do not have to be closed, it is just a control flow mechanism, see + // https://groups.google.com/forum/#!msg/golang-nuts/pZwdYRGxCIk/qpbHxRRPJdUJ + s2cErrChan := s.forwardServerToClient(serverStream, clientStream) + c2sErrChan := s.forwardClientToServer(clientStream, serverStream) + // We don't know which side is going to stop sending first, so we need a select between the two. + for i := 0; i < 2; i++ { + select { + case s2cErr := <-s2cErrChan: + if s2cErr == io.EOF { + // this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./ + // the clientStream>serverStream may continue pumping though. + clientStream.CloseSend() + } else { + // however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need + // to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and + // exit with an error to the stack + clientCancel() + return status.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr) + } + case c2sErr := <-c2sErrChan: + // This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two + // cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers + // will be nil. + serverStream.SetTrailer(clientStream.Trailer()) + // c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error. + if c2sErr != io.EOF { + return c2sErr + } + return nil + } + } + return status.Errorf(codes.Internal, "gRPC proxying should never reach this stage.") +} + +func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error { + ret := make(chan error, 1) + go func() { + f := &emptypb.Empty{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if i == 0 { + // This is a bit of a hack, but client to server headers are only readable after first client msg is + // received but must be written to server stream before the first msg is flushed. + // This is the only place to do it nicely. + md, err := src.Header() + if err != nil { + ret <- err + break + } + if err := dst.SendHeader(md); err != nil { + ret <- err + break + } + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} + +func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientStream) chan error { + ret := make(chan error, 1) + go func() { + f := &emptypb.Empty{} + for i := 0; ; i++ { + if err := src.RecvMsg(f); err != nil { + ret <- err // this can be io.EOF which is happy case + break + } + if err := dst.SendMsg(f); err != nil { + ret <- err + break + } + } + }() + return ret +} diff --git a/proxy/handler_test.go b/proxy/handler_test.go new file mode 100644 index 0000000..af60d79 --- /dev/null +++ b/proxy/handler_test.go @@ -0,0 +1,262 @@ +// Copyright 2017 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy_test + +import ( + "context" + "fmt" + "io" + "net" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/goplugin/grpc-proxy/proxy" + pb "github.com/goplugin/grpc-proxy/testservice" +) + +const ( + pingDefaultValue = "I like kittens." + clientMdKey = "test-client-header" + serverHeaderMdKey = "test-client-header" + serverTrailerMdKey = "test-client-trailer" + + rejectingMdKey = "test-reject-rpc-if-in-context" + + countListResponses = 20 +) + +// asserting service is implemented on the server side and serves as a handler for stuff +type assertingService struct { + t *testing.T + pb.UnsafeTestServiceServer +} + +var _ pb.TestServiceServer = (*assertingService)(nil) + +func (s *assertingService) PingEmpty(ctx context.Context, _ *emptypb.Empty) (*pb.PingResponse, error) { + // Check that this call has client's metadata. + md, ok := metadata.FromIncomingContext(ctx) + assert.True(s.t, ok, "PingEmpty call must have metadata in context") + _, ok = md[clientMdKey] + assert.True(s.t, ok, "PingEmpty call must have clients's custom headers in metadata") + return &pb.PingResponse{Value: pingDefaultValue, Counter: 42}, nil +} + +func (s *assertingService) Ping(ctx context.Context, ping *pb.PingRequest) (*pb.PingResponse, error) { + // Send user trailers and headers. + grpc.SendHeader(ctx, metadata.Pairs(serverHeaderMdKey, "I like turtles.")) + grpc.SetTrailer(ctx, metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return &pb.PingResponse{Value: ping.Value, Counter: 42}, nil +} + +func (s *assertingService) PingError(ctx context.Context, ping *pb.PingRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.FailedPrecondition, "Userspace error.") +} + +func (s *assertingService) PingList(ping *pb.PingRequest, stream pb.TestService_PingListServer) error { + // Send user trailers and headers. + stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) + for i := 0; i < countListResponses; i++ { + stream.Send(&pb.PingResponse{Value: ping.Value, Counter: int32(i)}) + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return nil +} + +func (s *assertingService) PingStream(stream pb.TestService_PingStreamServer) error { + stream.SendHeader(metadata.Pairs(serverHeaderMdKey, "I like turtles.")) + counter := int32(0) + for { + ping, err := stream.Recv() + if err == io.EOF { + break + } else if err != nil { + require.NoError(s.t, err, "can't fail reading stream") + return err + } + pong := &pb.PingResponse{Value: ping.Value, Counter: counter} + if err := stream.Send(pong); err != nil { + require.NoError(s.t, err, "can't fail sending back a pong") + } + counter += 1 + } + stream.SetTrailer(metadata.Pairs(serverTrailerMdKey, "I like ending turtles.")) + return nil +} + +// ProxyHappySuite tests the "happy" path of handling: that everything works in absence of connection issues. +type ProxyHappySuite struct { + suite.Suite + + serverListener net.Listener + server *grpc.Server + proxyListener net.Listener + proxy *grpc.Server + serverClientConn *grpc.ClientConn + + client *grpc.ClientConn + testClient pb.TestServiceClient +} + +func (s *ProxyHappySuite) TestPingEmptyCarriesClientMetadata() { + ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(clientMdKey, "true")) + out, err := s.testClient.PingEmpty(ctx, &emptypb.Empty{}) + require.NoError(s.T(), err, "PingEmpty should succeed without errors") + want := &pb.PingResponse{Value: pingDefaultValue, Counter: 42} + require.True(s.T(), proto.Equal(want, out)) +} + +func (s *ProxyHappySuite) TestPingEmpty_StressTest() { + for i := 0; i < 50; i++ { + s.TestPingEmptyCarriesClientMetadata() + } +} + +func (s *ProxyHappySuite) TestPingCarriesServerHeadersAndTrailers() { + headerMd := make(metadata.MD) + trailerMd := make(metadata.MD) + // This is an awkward calling convention... but meh. + out, err := s.testClient.Ping(context.Background(), &pb.PingRequest{Value: "foo"}, grpc.Header(&headerMd), grpc.Trailer(&trailerMd)) + want := &pb.PingResponse{Value: "foo", Counter: 42} + require.NoError(s.T(), err, "Ping should succeed without errors") + require.True(s.T(), proto.Equal(want, out)) + assert.Contains(s.T(), headerMd, serverHeaderMdKey, "server response headers must contain server data") + assert.Len(s.T(), trailerMd, 1, "server response trailers must contain server data") +} + +func (s *ProxyHappySuite) TestPingErrorPropagatesAppError() { + _, err := s.testClient.PingError(context.Background(), &pb.PingRequest{Value: "foo"}) + require.Error(s.T(), err, "PingError should never succeed") + assert.Equal(s.T(), codes.FailedPrecondition, status.Code(err)) + assert.Equal(s.T(), "Userspace error.", status.Convert(err).Message()) +} + +func (s *ProxyHappySuite) TestDirectorErrorIsPropagated() { + // See SetupSuite where the StreamDirector has a special case. + ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(rejectingMdKey, "true")) + _, err := s.testClient.Ping(ctx, &pb.PingRequest{Value: "foo"}) + require.Error(s.T(), err, "Director should reject this RPC") + assert.Equal(s.T(), codes.PermissionDenied, status.Code(err)) + assert.Equal(s.T(), "testing rejection", status.Convert(err).Message()) +} + +func (s *ProxyHappySuite) TestPingStream_FullDuplexWorks() { + stream, err := s.testClient.PingStream(context.Background()) + require.NoError(s.T(), err, "PingStream request should be successful.") + + for i := 0; i < countListResponses; i++ { + ping := &pb.PingRequest{Value: fmt.Sprintf("foo:%d", i)} + require.NoError(s.T(), stream.Send(ping), "sending to PingStream must not fail") + resp, err := stream.Recv() + if err == io.EOF { + break + } + if i == 0 { + // Check that the header arrives before all entries. + headerMd, err := stream.Header() + require.NoError(s.T(), err, "PingStream headers should not error.") + assert.Contains(s.T(), headerMd, serverHeaderMdKey, "PingStream response headers user contain metadata") + } + assert.EqualValues(s.T(), i, resp.Counter, "ping roundtrip must succeed with the correct id") + } + require.NoError(s.T(), stream.CloseSend(), "no error on close send") + _, err = stream.Recv() + require.Equal(s.T(), io.EOF, err, "stream should close with io.EOF, meaining OK") + // Check that the trailer headers are here. + trailerMd := stream.Trailer() + assert.Len(s.T(), trailerMd, 1, "PingList trailer headers user contain metadata") +} + +func (s *ProxyHappySuite) TestPingStream_StressTest() { + for i := 0; i < 50; i++ { + s.TestPingStream_FullDuplexWorks() + } +} + +func (s *ProxyHappySuite) SetupSuite() { + var err error + + s.proxyListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for proxyListener") + s.serverListener, err = net.Listen("tcp", "127.0.0.1:0") + require.NoError(s.T(), err, "must be able to allocate a port for serverListener") + + s.server = grpc.NewServer() + pb.RegisterTestServiceServer(s.server, &assertingService{t: s.T()}) + + // Setup of the proxy's Director. + //lint:ignore SA1019 regression test + s.serverClientConn, err = grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure(), grpc.WithCodec(proxy.Codec())) + require.NoError(s.T(), err, "must not error on deferred client Dial") + director := func(ctx context.Context, fullName string) (context.Context, grpc.ClientConnInterface, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok { + if _, exists := md[rejectingMdKey]; exists { + return ctx, nil, status.Errorf(codes.PermissionDenied, "testing rejection") + } + } + // Explicitly copy the metadata, otherwise the tests will fail. + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) + return outCtx, s.serverClientConn, nil + } + s.proxy = grpc.NewServer( + //lint:ignore SA1019 regression test + grpc.CustomCodec(proxy.Codec()), + grpc.UnknownServiceHandler(proxy.TransparentHandler(director)), + ) + // Ping handler is handled as an explicit registration and not as a TransparentHandler. + proxy.RegisterService(s.proxy, director, + "mwitkow.testproto.TestService", + "Ping") + + // Start the serving loops. + s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String()) + go func() { + s.server.Serve(s.serverListener) + }() + s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String()) + go func() { + s.proxy.Serve(s.proxyListener) + }() + + dCtx, ccl := context.WithTimeout(context.Background(), time.Second) + defer ccl() + clientConn, err := grpc.DialContext(dCtx, strings.Replace(s.proxyListener.Addr().String(), "127.0.0.1", "localhost", 1), grpc.WithInsecure()) + require.NoError(s.T(), err, "must not error on deferred client Dial") + s.testClient = pb.NewTestServiceClient(clientConn) +} + +func (s *ProxyHappySuite) TearDownSuite() { + if s.client != nil { + s.client.Close() + } + if s.serverClientConn != nil { + s.serverClientConn.Close() + } + // Close all transports so the logs don't get spammy. + time.Sleep(10 * time.Millisecond) + if s.proxy != nil { + s.proxy.Stop() + s.proxyListener.Close() + } + if s.serverListener != nil { + s.server.Stop() + s.serverListener.Close() + } +} + +func TestProxyHappySuite(t *testing.T) { + suite.Run(t, &ProxyHappySuite{}) +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..946fdcb --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,33 @@ +// Copyright 2021 Michal Witkowski. All Rights Reserved. +// See LICENSE for licensing terms. + +package proxy + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// NewProxy sets up a simple proxy that forwards all requests to dst. +func NewProxy(dst grpc.ClientConnInterface, opts ...grpc.ServerOption) *grpc.Server { + opts = append(opts, DefaultProxyOpt(dst)) + // Set up the proxy server and then serve from it like in step one. + return grpc.NewServer(opts...) +} + +// DefaultProxyOpt returns an grpc.UnknownServiceHandler with a DefaultDirector. +func DefaultProxyOpt(cc grpc.ClientConnInterface) grpc.ServerOption { + return grpc.UnknownServiceHandler(TransparentHandler(DefaultDirector(cc))) +} + +// DefaultDirector returns a very simple forwarding StreamDirector that forwards all +// calls. +func DefaultDirector(cc grpc.ClientConnInterface) StreamDirector { + return func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) { + md, _ := metadata.FromIncomingContext(ctx) + ctx = metadata.NewOutgoingContext(ctx, md.Copy()) + return ctx, cc, nil + } +} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go new file mode 100644 index 0000000..b0af965 --- /dev/null +++ b/proxy/proxy_test.go @@ -0,0 +1,211 @@ +package proxy_test + +import ( + "context" + "flag" + "fmt" + "net" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/test/bufconn" + + "github.com/goplugin/grpc-proxy/proxy" + "github.com/goplugin/grpc-proxy/testservice" +) + +var testBackend = flag.String("test-backend", "", "Service providing TestServiceServer") + +// TestIntegrationV1 is a regression test of the proxy. +func TestLegacyBehaviour(t *testing.T) { + // These bufconns are test listeners used to make connections between our + // services. This test actually starts two fully functional grpc services. + proxyBc := bufconn.Listen(10) + + // Setup is a little thorough, but here's the gist of it: + // 1. Create the test backend using testservice.DefaultTestServiceServer + // 2. Create the proxy backend using this package + // 3. Make calls to 1 via 2. + + // 1. + //lint:ignore SA1019 regression test + testCC, err := backendDialer(t, grpc.WithCodec(proxy.Codec())) + if err != nil { + t.Fatal(err) + } + + // 2. + go func() { + // Second, we need to implement the SteamDirector. + directorFn := func(ctx context.Context, fullMethodName string) (context.Context, grpc.ClientConnInterface, error) { + md, _ := metadata.FromIncomingContext(ctx) + outCtx := metadata.NewOutgoingContext(ctx, md.Copy()) + return outCtx, testCC, nil + } + + // Set up the proxy server and then serve from it like in step one. + proxySrv := grpc.NewServer( + //lint:ignore SA1019 regression test + grpc.CustomCodec(proxy.Codec()), // was previously needed for proxy to function. + grpc.UnknownServiceHandler(proxy.TransparentHandler(directorFn)), + ) + // run the proxy backend + go func() { + t.Log("Running proxySrv") + if err := proxySrv.Serve(proxyBc); err != nil { + if err == grpc.ErrServerStopped { + return + } + t.Logf("running proxy server: %v", err) + } + }() + t.Cleanup(func() { + t.Log("Gracefully stopping proxySrv") + proxySrv.GracefulStop() + }) + }() + + // 3. + // Connect to the proxy. We should not need to do anything special here - + // users do not need to know they're talking to a proxy. + proxyCC, err := grpc.Dial( + "bufnet", + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return proxyBc.Dial() + }), + ) + if err != nil { + t.Fatalf("dialing proxy: %v", err) + } + proxyClient := testservice.NewTestServiceClient(proxyCC) + + // 4. Run the tests! + testservice.TestTestServiceServerImpl(t, proxyClient) +} + +func TestNewProxy(t *testing.T) { + proxyBc := bufconn.Listen(10) + + // Setup is a little thorough, but here's the gist of it: + // 1. Create the test backend using testservice.DefaultTestServiceServer + // 2. Create the proxy backend using this package + // 3. Make calls to 1 via 2. + + // 1. + // First, we need to create a client connection to this backend. + testCC, err := backendDialer(t) + if err != nil { + t.Fatal(err) + } + + // 2. + go func() { + t.Helper() + + // First, we need to create a client connection to this backend. + proxySrv := proxy.NewProxy(testCC) + + // run the proxy backend + go func() { + t.Log("Running proxySrv") + if err := proxySrv.Serve(proxyBc); err != nil { + if err == grpc.ErrServerStopped { + return + } + t.Logf("running proxy server: %v", err) + } + }() + t.Cleanup(func() { + t.Log("Gracefully stopping proxySrv") + proxySrv.GracefulStop() + }) + }() + + // 3. + // Connect to the proxy. We should not need to do anything special here - + // users do not need to know they're talking to a proxy. + t.Logf("dialing %s", proxyBc.Addr()) + proxyCC, err := grpc.Dial( + proxyBc.Addr().String(), + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return proxyBc.Dial() + }), + ) + if err != nil { + t.Fatalf("dialing proxy: %v", err) + } + proxyClient := testservice.NewTestServiceClient(proxyCC) + + // 4. Run the tests! + testservice.TestTestServiceServerImpl(t, proxyClient) +} + +// backendDialer dials the testservice.TestServiceServer either by connecting +// to the user-supplied server, or by creating a mock server using bufconn. +func backendDialer(t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + t.Helper() + + if *testBackend != "" { + return backendSvcDialer(t, *testBackend, opts...) + } + + backendBc := bufconn.Listen(10) + // set up the backend using a "real" server over a bufconn + testSrv := grpc.NewServer() + testservice.RegisterTestServiceServer(testSrv, testservice.DefaultTestServiceServer) + + // run the test backend + go func() { + t.Log("Running testSrv") + if err := testSrv.Serve(backendBc); err != nil { + if err == grpc.ErrServerStopped { + return + } + t.Logf("running test server: %v", err) + } + }() + t.Cleanup(func() { + t.Log("Gracefully stopping testSrv") + testSrv.GracefulStop() + }) + + opts = append(opts, + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) { + return backendBc.Dial() + }), + ) + + backendCC, err := grpc.Dial( + "bufnet", + opts..., + ) + if err != nil { + return nil, fmt.Errorf("dialing backend: %v", err) + } + return backendCC, nil +} + +func backendSvcDialer(t *testing.T, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { + opts = append(opts, + grpc.WithInsecure(), + grpc.WithBlock(), + ) + + t.Logf("connecting to %s", addr) + cc, err := grpc.Dial( + addr, + opts..., + ) + if err != nil { + return nil, fmt.Errorf("dialing backend: %v", err) + } + + return cc, nil +} diff --git a/testservice/Makefile b/testservice/Makefile new file mode 100644 index 0000000..6812a00 --- /dev/null +++ b/testservice/Makefile @@ -0,0 +1,9 @@ + +all: test_go + +test_go: test.proto + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + test.proto + + diff --git a/testservice/ping.go b/testservice/ping.go new file mode 100644 index 0000000..9638781 --- /dev/null +++ b/testservice/ping.go @@ -0,0 +1,168 @@ +package testservice + +import ( + "context" + "fmt" + "io" + + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" +) + +var DefaultTestServiceServer = defaultPingServer{} + +const ( + PingHeader = "ping-header" + PingHeaderCts = "Arbitrary header text" + PingTrailer = "ping-trailer" + PingTrailerCts = "Arbitrary trailer text" + PingEchoHeader = "ping-echo-header" + PingEchoTrailer = "ping-echo-trailer" +) + +// defaultPingServer is the canonical implementation of a TestServiceServer. +type defaultPingServer struct { + UnsafeTestServiceServer +} + +func (s defaultPingServer) PingEmpty(ctx context.Context, empty *emptypb.Empty) (*PingResponse, error) { + if err := s.sendHeader(ctx); err != nil { + return nil, err + } + if err := s.setTrailer(ctx); err != nil { + return nil, err + } + return &PingResponse{}, nil +} + +func (s defaultPingServer) Ping(ctx context.Context, request *PingRequest) (*PingResponse, error) { + if err := s.sendHeader(ctx); err != nil { + return nil, err + } + if err := s.setTrailer(ctx); err != nil { + return nil, err + } + + return &PingResponse{Value: request.Value}, nil +} + +func (s defaultPingServer) PingError(ctx context.Context, request *PingRequest) (*emptypb.Empty, error) { + if err := s.sendHeader(ctx); err != nil { + return nil, err + } + if err := s.setTrailer(ctx); err != nil { + return nil, err + } + return nil, status.Error(codes.Unknown, "Something is wrong and this is a message that describes it") +} + +func (s defaultPingServer) PingList(request *PingRequest, server TestService_PingListServer) error { + if err := s.sendHeader(server.Context()); err != nil { + return err + } + s.setStreamTrailer(server) + for i := 0; i < 10; i++ { + if err := server.Send(&PingResponse{ + Value: request.Value, + Counter: int32(i), + }); err != nil { + return err + } + } + return nil +} + +func (s defaultPingServer) PingStream(server TestService_PingStreamServer) error { + g, ctx := errgroup.WithContext(context.Background()) + + if err := s.sendHeader(server.Context()); err != nil { + return err + } + + pings := make(chan *PingRequest) + g.Go(func() error { + defer close(pings) + for { + m, err := server.Recv() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + select { + case pings <- m: + case <-ctx.Done(): + return ctx.Err() + } + } + }) + g.Go(func() error { + var i int32 + for m := range pings { + if err := server.Send(&PingResponse{ + Value: m.Value, + Counter: i, + }); err != nil { + return err + } + i++ + } + return nil + }) + + return g.Wait() +} + +func (s *defaultPingServer) sendHeader(ctx context.Context) error { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(nil) + } + + if tvs := md.Get(PingEchoHeader); len(tvs) > 0 { + md.Append(PingEchoHeader, tvs...) + } + + md.Append(PingHeader, PingHeaderCts) + + if err := grpc.SendHeader(ctx, md); err != nil { + return fmt.Errorf("setting header: %w", err) + } + return nil +} + +func (s *defaultPingServer) setTrailer(ctx context.Context) error { + md := s.buildTrailer(ctx) + + if err := grpc.SetTrailer(ctx, md); err != nil { + return fmt.Errorf("setting trailer: %w", err) + } + + return nil +} + +func (s *defaultPingServer) buildTrailer(ctx context.Context) metadata.MD { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + md = metadata.New(nil) + } + + if tvs := md.Get(PingEchoTrailer); len(tvs) > 0 { + md.Append(PingEchoTrailer, tvs...) + } + + md.Append(PingTrailer, PingTrailerCts) + + return md +} + +func (s defaultPingServer) setStreamTrailer(server grpc.ServerStream) { + server.SetTrailer(s.buildTrailer(server.Context())) +} + +var _ TestServiceServer = (*defaultPingServer)(nil) diff --git a/testservice/server/main.go b/testservice/server/main.go new file mode 100644 index 0000000..adbb3ed --- /dev/null +++ b/testservice/server/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "flag" + "fmt" + "log" + "net" + "os" + "os/signal" + "syscall" + + "google.golang.org/grpc" + + "github.com/goplugin/grpc-proxy/testservice" +) + +var ( + port = flag.Uint("port", 8080, "Port to listen to") +) + +func main() { + srv := grpc.NewServer() + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port)) + if err != nil { + panic(err) + } + testservice.RegisterTestServiceServer(srv, testservice.DefaultTestServiceServer) + + errs := make(chan error) + + go func() { + log.Printf("listening on %s", lis.Addr().String()) + errs <- srv.Serve(lis) + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + log.Printf("shutdown due to %s", sig) + srv.GracefulStop() + }() + + if err := <-errs; err != nil { + log.Println(err) + os.Exit(1) + } + os.Exit(0) +} + +func init() { + flag.Parse() +} diff --git a/testservice/test.pb.go b/testservice/test.pb.go new file mode 100644 index 0000000..ff4b0d8 --- /dev/null +++ b/testservice/test.pb.go @@ -0,0 +1,255 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0-devel +// protoc v3.15.5 +// source: test.proto + +package testservice + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PingRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` +} + +func (x *PingRequest) Reset() { + *x = PingRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingRequest) ProtoMessage() {} + +func (x *PingRequest) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingRequest.ProtoReflect.Descriptor instead. +func (*PingRequest) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{0} +} + +func (x *PingRequest) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +type PingResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"` + Counter int32 `protobuf:"varint,2,opt,name=counter,proto3" json:"counter,omitempty"` +} + +func (x *PingResponse) Reset() { + *x = PingResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_test_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PingResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PingResponse) ProtoMessage() {} + +func (x *PingResponse) ProtoReflect() protoreflect.Message { + mi := &file_test_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PingResponse.ProtoReflect.Descriptor instead. +func (*PingResponse) Descriptor() ([]byte, []int) { + return file_test_proto_rawDescGZIP(), []int{1} +} + +func (x *PingResponse) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *PingResponse) GetCounter() int32 { + if x != nil { + return x.Counter + } + return 0 +} + +var File_test_proto protoreflect.FileDescriptor + +var file_test_proto_rawDesc = []byte{ + 0x0a, 0x0a, 0x74, 0x65, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x11, 0x6d, 0x77, + 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, + 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x23, 0x0a, 0x0b, + 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, + 0x65, 0x22, 0x3e, 0x0a, 0x0c, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x65, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x65, + 0x72, 0x32, 0x8d, 0x03, 0x0a, 0x0b, 0x54, 0x65, 0x73, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x46, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x12, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, + 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x49, 0x0a, 0x04, 0x50, 0x69, 0x6e, + 0x67, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x00, 0x12, 0x45, 0x0a, 0x09, 0x50, 0x69, 0x6e, 0x67, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x4f, 0x0a, 0x08, 0x50, + 0x69, 0x6e, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, + 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, + 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, 0x69, 0x6e, 0x67, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x12, 0x53, 0x0a, 0x0a, + 0x50, 0x69, 0x6e, 0x67, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x1e, 0x2e, 0x6d, 0x77, 0x69, + 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, + 0x69, 0x6e, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x77, 0x69, + 0x74, 0x6b, 0x6f, 0x77, 0x2e, 0x74, 0x65, 0x73, 0x74, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x50, + 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, + 0x01, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6d, 0x77, 0x69, 0x74, 0x6b, 0x6f, 0x77, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2d, 0x70, 0x72, 0x6f, + 0x78, 0x79, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_test_proto_rawDescOnce sync.Once + file_test_proto_rawDescData = file_test_proto_rawDesc +) + +func file_test_proto_rawDescGZIP() []byte { + file_test_proto_rawDescOnce.Do(func() { + file_test_proto_rawDescData = protoimpl.X.CompressGZIP(file_test_proto_rawDescData) + }) + return file_test_proto_rawDescData +} + +var file_test_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_test_proto_goTypes = []interface{}{ + (*PingRequest)(nil), // 0: mwitkow.testproto.PingRequest + (*PingResponse)(nil), // 1: mwitkow.testproto.PingResponse + (*emptypb.Empty)(nil), // 2: google.protobuf.Empty +} +var file_test_proto_depIdxs = []int32{ + 2, // 0: mwitkow.testproto.TestService.PingEmpty:input_type -> google.protobuf.Empty + 0, // 1: mwitkow.testproto.TestService.Ping:input_type -> mwitkow.testproto.PingRequest + 0, // 2: mwitkow.testproto.TestService.PingError:input_type -> mwitkow.testproto.PingRequest + 0, // 3: mwitkow.testproto.TestService.PingList:input_type -> mwitkow.testproto.PingRequest + 0, // 4: mwitkow.testproto.TestService.PingStream:input_type -> mwitkow.testproto.PingRequest + 1, // 5: mwitkow.testproto.TestService.PingEmpty:output_type -> mwitkow.testproto.PingResponse + 1, // 6: mwitkow.testproto.TestService.Ping:output_type -> mwitkow.testproto.PingResponse + 2, // 7: mwitkow.testproto.TestService.PingError:output_type -> google.protobuf.Empty + 1, // 8: mwitkow.testproto.TestService.PingList:output_type -> mwitkow.testproto.PingResponse + 1, // 9: mwitkow.testproto.TestService.PingStream:output_type -> mwitkow.testproto.PingResponse + 5, // [5:10] is the sub-list for method output_type + 0, // [0:5] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_test_proto_init() } +func file_test_proto_init() { + if File_test_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_test_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_test_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PingResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_test_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_test_proto_goTypes, + DependencyIndexes: file_test_proto_depIdxs, + MessageInfos: file_test_proto_msgTypes, + }.Build() + File_test_proto = out.File + file_test_proto_rawDesc = nil + file_test_proto_goTypes = nil + file_test_proto_depIdxs = nil +} diff --git a/testservice/test.proto b/testservice/test.proto new file mode 100644 index 0000000..a999832 --- /dev/null +++ b/testservice/test.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package mwitkow.testproto; + +option go_package="github.com/mwitkow/grpc-proxy/testservice"; + +import "google/protobuf/empty.proto"; + +message PingRequest { + string value = 1; +} + +message PingResponse { + string value = 1; + int32 counter = 2; +} + +service TestService { + rpc PingEmpty(google.protobuf.Empty) returns (PingResponse) {} + + rpc Ping(PingRequest) returns (PingResponse) {} + + rpc PingError(PingRequest) returns (google.protobuf.Empty) {} + + rpc PingList(PingRequest) returns (stream PingResponse) {} + + rpc PingStream(stream PingRequest) returns (stream PingResponse) {} + +} + diff --git a/testservice/test_grpc.pb.go b/testservice/test_grpc.pb.go new file mode 100644 index 0000000..473c361 --- /dev/null +++ b/testservice/test_grpc.pb.go @@ -0,0 +1,306 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package testservice + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TestServiceClient is the client API for TestService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TestServiceClient interface { + PingEmpty(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PingResponse, error) + Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) + PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error) + PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error) +} + +type testServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTestServiceClient(cc grpc.ClientConnInterface) TestServiceClient { + return &testServiceClient{cc} +} + +func (c *testServiceClient) PingEmpty(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PingResponse, error) { + out := new(PingResponse) + err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingEmpty", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *testServiceClient) Ping(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*PingResponse, error) { + out := new(PingResponse) + err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/Ping", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *testServiceClient) PingError(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, "/mwitkow.testproto.TestService/PingError", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *testServiceClient) PingList(ctx context.Context, in *PingRequest, opts ...grpc.CallOption) (TestService_PingListClient, error) { + stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[0], "/mwitkow.testproto.TestService/PingList", opts...) + if err != nil { + return nil, err + } + x := &testServicePingListClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type TestService_PingListClient interface { + Recv() (*PingResponse, error) + grpc.ClientStream +} + +type testServicePingListClient struct { + grpc.ClientStream +} + +func (x *testServicePingListClient) Recv() (*PingResponse, error) { + m := new(PingResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *testServiceClient) PingStream(ctx context.Context, opts ...grpc.CallOption) (TestService_PingStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &TestService_ServiceDesc.Streams[1], "/mwitkow.testproto.TestService/PingStream", opts...) + if err != nil { + return nil, err + } + x := &testServicePingStreamClient{stream} + return x, nil +} + +type TestService_PingStreamClient interface { + Send(*PingRequest) error + Recv() (*PingResponse, error) + grpc.ClientStream +} + +type testServicePingStreamClient struct { + grpc.ClientStream +} + +func (x *testServicePingStreamClient) Send(m *PingRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *testServicePingStreamClient) Recv() (*PingResponse, error) { + m := new(PingResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TestServiceServer is the server API for TestService service. +// All implementations must embed UnimplementedTestServiceServer +// for forward compatibility +type TestServiceServer interface { + PingEmpty(context.Context, *emptypb.Empty) (*PingResponse, error) + Ping(context.Context, *PingRequest) (*PingResponse, error) + PingError(context.Context, *PingRequest) (*emptypb.Empty, error) + PingList(*PingRequest, TestService_PingListServer) error + PingStream(TestService_PingStreamServer) error + mustEmbedUnimplementedTestServiceServer() +} + +// UnimplementedTestServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTestServiceServer struct { +} + +func (UnimplementedTestServiceServer) PingEmpty(context.Context, *emptypb.Empty) (*PingResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PingEmpty not implemented") +} +func (UnimplementedTestServiceServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") +} +func (UnimplementedTestServiceServer) PingError(context.Context, *PingRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method PingError not implemented") +} +func (UnimplementedTestServiceServer) PingList(*PingRequest, TestService_PingListServer) error { + return status.Errorf(codes.Unimplemented, "method PingList not implemented") +} +func (UnimplementedTestServiceServer) PingStream(TestService_PingStreamServer) error { + return status.Errorf(codes.Unimplemented, "method PingStream not implemented") +} +func (UnimplementedTestServiceServer) mustEmbedUnimplementedTestServiceServer() {} + +// UnsafeTestServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TestServiceServer will +// result in compilation errors. +type UnsafeTestServiceServer interface { + mustEmbedUnimplementedTestServiceServer() +} + +func RegisterTestServiceServer(s grpc.ServiceRegistrar, srv TestServiceServer) { + s.RegisterService(&TestService_ServiceDesc, srv) +} + +func _TestService_PingEmpty_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestServiceServer).PingEmpty(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mwitkow.testproto.TestService/PingEmpty", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestServiceServer).PingEmpty(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _TestService_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestServiceServer).Ping(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mwitkow.testproto.TestService/Ping", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestServiceServer).Ping(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TestService_PingError_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(PingRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TestServiceServer).PingError(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mwitkow.testproto.TestService/PingError", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TestServiceServer).PingError(ctx, req.(*PingRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TestService_PingList_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(PingRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TestServiceServer).PingList(m, &testServicePingListServer{stream}) +} + +type TestService_PingListServer interface { + Send(*PingResponse) error + grpc.ServerStream +} + +type testServicePingListServer struct { + grpc.ServerStream +} + +func (x *testServicePingListServer) Send(m *PingResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _TestService_PingStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(TestServiceServer).PingStream(&testServicePingStreamServer{stream}) +} + +type TestService_PingStreamServer interface { + Send(*PingResponse) error + Recv() (*PingRequest, error) + grpc.ServerStream +} + +type testServicePingStreamServer struct { + grpc.ServerStream +} + +func (x *testServicePingStreamServer) Send(m *PingResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *testServicePingStreamServer) Recv() (*PingRequest, error) { + m := new(PingRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// TestService_ServiceDesc is the grpc.ServiceDesc for TestService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TestService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "mwitkow.testproto.TestService", + HandlerType: (*TestServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "PingEmpty", + Handler: _TestService_PingEmpty_Handler, + }, + { + MethodName: "Ping", + Handler: _TestService_Ping_Handler, + }, + { + MethodName: "PingError", + Handler: _TestService_PingError_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "PingList", + Handler: _TestService_PingList_Handler, + ServerStreams: true, + }, + { + StreamName: "PingStream", + Handler: _TestService_PingStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "test.proto", +} diff --git a/testservice/testping.go b/testservice/testping.go new file mode 100644 index 0000000..ef9cd8b --- /dev/null +++ b/testservice/testping.go @@ -0,0 +1,158 @@ +package testservice + +import ( + "context" + "io" + "reflect" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +const ( + returnHeader = "test-client-header" +) + +// TestTestServiceServerImpl can be called to test the underlying TestServiceServer. +func TestTestServiceServerImpl(t *testing.T, client TestServiceClient) { + t.Run("Unary ping", func(t *testing.T) { + want := "hello, world" + hdr := metadata.MD{} + res, err := client.Ping(context.TODO(), &PingRequest{Value: want}, grpc.Header(&hdr)) + if err != nil { + t.Errorf("want no err; got %v", err) + return + } + checkHeaders(t, hdr) + t.Logf("got %v (%d)", res.Value, res.Counter) + if got := res.Value; got != want { + t.Errorf("res.Value = %q; want %q", got, want) + } + }) + + t.Run("Error ping", func(t *testing.T) { + _, err := client.PingError(context.TODO(), &PingRequest{}) + if err == nil { + t.Errorf("want err; got %v", err) + } + }) + + t.Run("Server streaming ping", func(t *testing.T) { + want := "hello, world" + stream, err := client.PingList(context.TODO(), &PingRequest{Value: want}) + if err != nil { + t.Errorf("want no err; got %v", err) + if err := stream.CloseSend(); err != nil { + t.Fatalf("closing send channel: %v", err) + } + return + } + hdr, err := stream.Header() + if err != nil { + t.Errorf("reading headers: %v", err) + } + checkHeaders(t, hdr) + + for { + res, err := stream.Recv() + if err != nil { + if err == io.EOF { + checkTrailers(t, stream.Trailer()) + return + } + t.Errorf("want no err; got %v", err) + return + } + t.Logf("got %v (%d)", res.Value, res.Counter) + if got := res.Value; got != want { + t.Errorf("res.Value = %q; want %q", got, want) + } + } + }) + + t.Run("Bidirectional pinging", func(t *testing.T) { + want := "hello, world" + stream, err := client.PingStream(context.TODO()) + if err != nil { + t.Errorf("want no err; got %v", err) + if err := stream.CloseSend(); err != nil { + t.Fatalf("closing send channel: %v", err) + } + return + } + + d := make(chan struct{}) + go func() { + hdr, err := stream.Header() + if err != nil { + t.Errorf("reading headers: %v", err) + } + checkHeaders(t, hdr) + close(d) + }() + + for i := 0; i < 25; i++ { + if err := stream.Send(&PingRequest{Value: want}); err != nil { + t.Errorf("want no err; got %v", err) + return + } + res, err := stream.Recv() + if err != nil { + t.Errorf("receiving full duplex stream: %v", err) + return + } + t.Logf("got %v (%d)", res.Value, res.Counter) + if got := res.Value; got != want { + t.Errorf("res.Value = %q; want %q", got, want) + } + if got, want := res.Counter, int32(i); got != want { + t.Errorf("res.Counter = %d; want %d", got, want) + } + } + if err := stream.CloseSend(); err != nil { + t.Errorf("closing full duplex stream: %v", err) + } + <-d + }) + + t.Run("Unary ping with headers", func(t *testing.T) { + want := "hello, world" + req := &PingRequest{Value: want} + + ctx := metadata.AppendToOutgoingContext(context.Background(), returnHeader, "I like turtles.") + inHeader := make(metadata.MD) + + res, err := client.Ping(ctx, req, grpc.Header(&inHeader)) + if err != nil { + t.Errorf("want no err; got %v", err) + return + } + t.Logf("got %v (%d)", res.Value, res.Counter) + if !reflect.DeepEqual(inHeader.Get(returnHeader), []string{"I like turtles."}) { + t.Errorf("did not receive correct return headers") + } + }) +} + +func checkTrailers(t *testing.T, md metadata.MD) { + vs := md.Get(PingTrailer) + if want, got := 1, len(vs); want != got { + t.Errorf("trailer %q not present", PingTrailer) + return + } + if want, got := []string{PingTrailerCts}, vs; !reflect.DeepEqual(got, want) { + t.Errorf("trailer mismatch; want %q, got %q", want, got) + } +} + +func checkHeaders(t *testing.T, md metadata.MD) { + vs := md.Get(PingHeader) + if want, got := 1, len(vs); want != got { + t.Errorf("header %q not present", PingHeader) + return + } + if want, got := []string{PingHeaderCts}, vs; !reflect.DeepEqual(got, want) { + t.Errorf("header mismatch; want %q, got %q", want, got) + } +} diff --git a/tools.go b/tools.go new file mode 100644 index 0000000..eaba55b --- /dev/null +++ b/tools.go @@ -0,0 +1,7 @@ +//+build tools + +package grpc_proxy + +import ( + _ "honnef.co/go/tools/cmd/staticcheck" +) \ No newline at end of file