From d52ca92be6df8ed3a92caededcd731d8aea6b09f Mon Sep 17 00:00:00 2001 From: Shawn Poulson <92753637+Baliedge@users.noreply.github.com> Date: Mon, 25 Sep 2023 09:48:53 -0400 Subject: [PATCH] gRPC connection pooling `grpcconn` (#178) * gRPC connection pooling `grpcconn` --- .github/workflows/lint.yaml | 2 +- .github/workflows/test.yaml | 1 + .golangci.yml | 33 ++- Makefile | 11 +- callstack/callstack.go | 2 +- errors/context_map.go | 2 +- errors/errors.go | 2 +- errors/example_test.go | 2 +- go.mod | 24 +- go.sum | 55 ++-- grpcconn/grpcconn.go | 489 ++++++++++++++++++++++++++++++++++++ grpcconn/idpool.go | 61 +++++ grpcconn/idpool_test.go | 49 ++++ 13 files changed, 675 insertions(+), 58 deletions(-) create mode 100644 grpcconn/grpcconn.go create mode 100644 grpcconn/idpool.go create mode 100644 grpcconn/idpool_test.go diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index aa95815..f10ef7d 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -30,6 +30,6 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v3 with: - version: v1.52.2 + version: v1.54.2 skip-cache: true # cache/restore is done by actions/setup-go@v3 step args: -v diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 77949b5..0cd5029 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -18,6 +18,7 @@ jobs: go-version: - 1.19.x - 1.20.x + - 1.21.x os: [ ubuntu-latest ] runs-on: ${{ matrix.os }} steps: diff --git a/.golangci.yml b/.golangci.yml index dfa86c5..bf7c8cc 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -5,7 +5,7 @@ linters: disable-all: true enable: - bodyclose # https://github.com/timakin/bodyclose - - depguard + - gomodguard - errcheck # Mandatory. Do not disable. - gocritic - goimports @@ -74,23 +74,20 @@ linters-settings: - shadow - fieldalignment - depguard: - # Kind of list is passed in. - # Allowed values: allowlist|denylist - # Default: denylist - list-type: denylist - # Check the list against standard lib. - # Default: false - include-go-root: true - # A list of packages for the list type specified. - # Default: [] - packages: - - github.com/pkg/errors - # A list of packages for the list type specified. - # Specify an error message to output when a denied package is used. - # Default: [] - packages-with-error-message: - - github.com/pkg/errors: 'Deprecated: use standard "errors" or "github.com/mailgun/holster/v4/errors" instead.' + gomodguard: + blocked: + # List of blocked modules. + # Default: [] + modules: + - github.com/golang/protobuf: + recommendations: + - google.golang.org/protobuf + reason: "see https://developers.google.com/protocol-buffers/docs/reference/go/faq#modules" + - github.com/pkg/errors: + recommendations: + - errors + - github.com/mailgun/errors + reason: "Deprecated" stylecheck: # Select the Go version to target. diff --git a/Makefile b/Makefile index 411400c..739780f 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,12 @@ -GOLINT = $(GOPATH)/bin/golangci-lint +GOLANGCI_LINT = $(GOPATH)/bin/golangci-lint +GOLANGCI_LINT_VERSION = v1.54.1 .PHONY: lint -lint: $(GOLINT) - $(GOLINT) run +lint: $(GOLANGCI_LINT) + $(GOLANGCI_LINT) run -$(GOLINT): - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.52.2 +$(GOLANGCI_LINT): + curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin $(GOLANGCI_LINT_VERSION) .PHONY: test test: diff --git a/callstack/callstack.go b/callstack/callstack.go index 8dfe4c8..4a7ddba 100644 --- a/callstack/callstack.go +++ b/callstack/callstack.go @@ -7,7 +7,7 @@ import ( "strconv" "strings" - pkgerrors "github.com/pkg/errors" //nolint:depguard // Legacy code requires deprecated package. + pkgerrors "github.com/pkg/errors" //nolint:gomodguard // Legacy code requires deprecated package. ) type FrameInfo struct { diff --git a/errors/context_map.go b/errors/context_map.go index 17d476e..b6889d2 100644 --- a/errors/context_map.go +++ b/errors/context_map.go @@ -6,7 +6,7 @@ import ( "io" "github.com/mailgun/holster/v4/callstack" - pkgerrors "github.com/pkg/errors" //nolint:depguard // Legacy code requires deprecated package. + pkgerrors "github.com/pkg/errors" //nolint:gomodguard // Legacy code requires deprecated package. ) // Implements the `error` `causer` and `Contexter` interfaces diff --git a/errors/errors.go b/errors/errors.go index 9cabb26..0223b5d 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -96,7 +96,7 @@ import ( "io" stack "github.com/mailgun/holster/v4/callstack" - pkgerrors "github.com/pkg/errors" //nolint:depguard // Legacy code requires deprecated package. + pkgerrors "github.com/pkg/errors" //nolint:gomodguard // Legacy code requires deprecated package. "github.com/sirupsen/logrus" ) diff --git a/errors/example_test.go b/errors/example_test.go index 9dbf9cd..80da4ec 100644 --- a/errors/example_test.go +++ b/errors/example_test.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/mailgun/holster/v4/errors" - pkgerrors "github.com/pkg/errors" //nolint:depguard // Legacy code requires deprecated package. + pkgerrors "github.com/pkg/errors" //nolint:gomodguard // Legacy code requires deprecated package. ) func ExampleNew() { diff --git a/go.mod b/go.mod index 094f28e..c8bfaa1 100644 --- a/go.mod +++ b/go.mod @@ -11,24 +11,26 @@ require ( github.com/hashicorp/consul/api v1.15.2 github.com/hashicorp/go-hclog v1.3.1 github.com/hashicorp/memberlist v0.5.0 + github.com/mailgun/errors v0.1.5 github.com/miekg/dns v1.1.50 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 github.com/sirupsen/logrus v1.9.2 - github.com/stretchr/testify v1.8.3 + github.com/stretchr/testify v1.8.4 github.com/uptrace/opentelemetry-go-extra/otellogrus v0.2.1 go.etcd.io/etcd/api/v3 v3.5.5 go.etcd.io/etcd/client/v3 v3.5.5 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.43.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 - go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel v1.17.0 go.opentelemetry.io/otel/exporters/jaeger v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 go.opentelemetry.io/otel/sdk v1.16.0 - go.opentelemetry.io/otel/trace v1.16.0 - golang.org/x/net v0.8.0 - google.golang.org/grpc v1.55.0 + go.opentelemetry.io/otel/trace v1.17.0 + golang.org/x/net v0.14.0 + google.golang.org/grpc v1.57.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -71,16 +73,18 @@ require ( github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 // indirect - go.opentelemetry.io/otel/metric v1.16.0 // indirect + go.opentelemetry.io/otel/metric v1.17.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/mod v0.8.0 // indirect - golang.org/x/sys v0.8.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/sys v0.11.0 // indirect + golang.org/x/text v0.12.0 // indirect golang.org/x/tools v0.6.0 // indirect - google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect ) diff --git a/go.sum b/go.sum index b497500..223271f 100644 --- a/go.sum +++ b/go.sum @@ -13,12 +13,15 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= +cloud.google.com/go v0.110.0 h1:Zc8gqp3+a9/Eyph2KDmcGaPtbKRIoqq4YTlL4NMD0Ys= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg= cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= +cloud.google.com/go/compute v1.19.1 h1:am86mquDUgjGNWxiGn+5PGLbmgiWXlE/yNWpIpNvuXY= +cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= @@ -80,6 +83,7 @@ github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWH github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20230607035331-e9ce68804cb4 h1:/inchEIKaYC1Akx+H+gqO04wryn5h75LSazbRlnya1k= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= @@ -97,6 +101,7 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.m github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byAbud7miNWJ1WwEVf8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= @@ -272,6 +277,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mailgun/errors v0.1.5 h1:riRpZqfUKTdc8saXvoEg2tYkbRyZESU1KvQ3UxPbdus= +github.com/mailgun/errors v0.1.5/go.mod h1:lw+Nh4r/aoUTz6uK915FdfZJo3yq60gPiflFHNpK4NQ= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= @@ -373,8 +380,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= -github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/uptrace/opentelemetry-go-extra/otellogrus v0.2.1 h1:klz16Hi1ydcI/AkFCbgxXvqwfKNPb+/EVHMlu1PdEKo= github.com/uptrace/opentelemetry-go-extra/otellogrus v0.2.1/go.mod h1:CufwvpLoGqj/uJFKsxBy09MKEM/o9QqaWjkB4RnFVdI= @@ -396,10 +403,12 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.43.0 h1:7XZai4VhA473clBrOqqHdjHBImGfyEtv0qW4nnn/kAo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.43.0/go.mod h1:1WpsUwjQrUJSNugfMlPn0rPRJ9Do7wwBgTBPK7MLiS4= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= -go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= -go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel v1.17.0 h1:MW+phZ6WZ5/uk2nd93ANk/6yJ+dVrvNWUjGhnnFU5jM= +go.opentelemetry.io/otel v1.17.0/go.mod h1:I2vmBGtFaODIVMBSTPVDlJSzBDNf93k60E6Ft0nyjo0= go.opentelemetry.io/otel/exporters/jaeger v1.16.0 h1:YhxxmXZ011C0aDZKoNw+juVWAmEfv/0W2XBOv9aHTaA= go.opentelemetry.io/otel/exporters/jaeger v1.16.0/go.mod h1:grYbBo/5afWlPpdPZYhyn78Bk04hnvxn2+hvxQhKIQM= go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.16.0 h1:t4ZwRPU+emrcvM2e9DHd0Fsf0JTPVcbfa/BhTDF03d0= @@ -410,12 +419,12 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0 h1:TVQp/ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.16.0/go.mod h1:I33vtIe0sR96wfrUcilIzLoA3mLHhRmz9S9Te0S3gDo= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0 h1:iqjq9LAB8aK++sKVcELezzn655JnBNdsDhghU4G/So8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.16.0/go.mod h1:hGXzO5bhhSHZnKvrDaXB82Y9DRFour0Nz/KrBh7reWw= -go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= -go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/metric v1.17.0 h1:iG6LGVz5Gh+IuO0jmgvpTB6YVrCGngi8QGm+pMd8Pdc= +go.opentelemetry.io/otel/metric v1.17.0/go.mod h1:h4skoxdZI17AxwITdmdZjjYJQH5nzijUUjm+wtPph5o= go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= -go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= -go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= +go.opentelemetry.io/otel/trace v1.17.0 h1:/SWhSRHmDPOImIAetP1QAeMnZYiQXrTy4fMMYOdSKWQ= +go.opentelemetry.io/otel/trace v1.17.0/go.mod h1:I/4vKTgFclIsXRVucpH25X0mpFSczM7aHeaz0ZBLWjY= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= @@ -509,8 +518,8 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= -golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -519,6 +528,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc= +golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -587,8 +597,8 @@ golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -599,8 +609,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -679,6 +689,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -711,8 +722,12 @@ google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 h1:DdoeryqhaXp1LtT/emMP1BRJPHHKFi5akj/nbx/zNTA= -google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4/go.mod h1:NWraEVixdDnqcqQ30jipen1STv2r/n24Wb7twVTGR4s= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54 h1:9NWlQfY2ePejTmfwUH1OWwmznFa+0kKcHGPDvcPza9M= +google.golang.org/genproto v0.0.0-20230526161137-0005af68ea54/go.mod h1:zqTuNwFlFRsw5zIts5VnzLQxSRqh+CGOTVMlYbY0Eyk= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:m8v1xLLLzMe1m5P+gCTF8nJB9epwZQUBERm20Oy1poQ= +google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e h1:NumxXLPfHSndr3wBBdeKiVHjGVFzi9RX2HwwQke94iY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230526203410-71b5a4ffd15e/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -731,8 +746,8 @@ google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.41.0/go.mod h1:U3l9uK9J0sini8mHphKoXyaqDA/8VyGnDee1zzIUK6k= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag= -google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -746,8 +761,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/grpcconn/grpcconn.go b/grpcconn/grpcconn.go new file mode 100644 index 0000000..9fbe3af --- /dev/null +++ b/grpcconn/grpcconn.go @@ -0,0 +1,489 @@ +package grpcconn + +// gRPC connection pooling + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + + "github.com/mailgun/errors" + "github.com/mailgun/holster/v4/clock" + "github.com/mailgun/holster/v4/setter" + "github.com/mailgun/holster/v4/tracing" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + grpcCodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + grpcStatus "google.golang.org/grpc/status" +) + +const ( + minConnectionTTL = 10 * clock.Second + defaultConnPoolCapacity = 16 + defaultNumConnections = 1 +) + +var ( + ErrConnMgrClosed = errors.New("connection manager closed") + errConnPoolEmpty = errors.New("connection pool empty") + + MetricGRPCConnections = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "grpcconn_connections", + Help: "The number of gRPC connections used by grpcconn.", + }, []string{"remote_service", "peer"}) +) + +type Config struct { + RPCTimeout clock.Duration + BackOffTimeout clock.Duration + Zone string + OverrideHostHeader string + + // NumConnections is the number of client connections to establish + // per target endpoint + // + // NOTE: A single GRPC client opens a maximum of 100 HTTP/2 Connections + // to an endpoint. Once those connections are saturated, it will queue + // requests to be delivered once there is availability. The recommended + // method of overcoming this limitation is establishing multiple GPRC client + // connections. See https://grpc.io/docs/guides/performance/ + NumConnections int +} + +// ConnFactory creates gRPC client objects. +type ConnFactory[T any] interface { + NewGRPCClient(cc grpc.ClientConnInterface) T + GetServerListURL() string + ServiceName() string + ShouldDisposeOfConn(err error) bool +} + +// ConnMgr automates gRPC `Connection` pooling. This is necessary for use +// cases requiring frequent stream creation and high stream concurrency to +// avoid reaching the default 100 stream per connection limit. +// ConnMgr resolves gRPC instance endpoints and connects to them. Both +// resolution and connection is performed on the background allowing any number +// of concurrent AcquireConn to result in only one reconnect event. +type ConnMgr[T any] struct { + cfg *Config + getEndpointsURL string + connFactory ConnFactory[T] + httpClt *http.Client + reconnectCh chan struct{} + ctx context.Context + cancel context.CancelFunc + closeWG sync.WaitGroup + idPool *IDPool + + connPoolMu sync.RWMutex + connPool []*Conn[T] + nextConnPivot uint64 + connectedCh chan struct{} +} + +type Conn[T any] struct { + inner *grpc.ClientConn + client T + counter int + broken bool + id ID +} + +func (c *Conn[T]) Client() T { return c.client } +func (c *Conn[T]) Target() string { return c.inner.Target() } +func (c *Conn[T]) ID() string { return c.id.String() } + +// NewConnMgr instantiates a connection manager that maintains a gRPC +// connection pool. +func NewConnMgr[T any](cfg *Config, httpClient *http.Client, connFactory ConnFactory[T]) *ConnMgr[T] { + // This ensures NumConnections is always at least 1 + setter.SetDefault(&cfg.NumConnections, defaultNumConnections) + gc := ConnMgr[T]{ + cfg: cfg, + getEndpointsURL: connFactory.GetServerListURL() + "?zone=" + cfg.Zone, + connFactory: connFactory, + httpClt: httpClient, + reconnectCh: make(chan struct{}, 1), + connPool: make([]*Conn[T], 0, defaultConnPoolCapacity), + idPool: NewIDPool(), + } + gc.ctx, gc.cancel = context.WithCancel(context.Background()) + gc.closeWG.Add(1) + go gc.run() + return &gc +} + +func (cm *ConnMgr[T]) AcquireConn(ctx context.Context) (_ *Conn[T], err error) { + ctx = tracing.StartScope(ctx) + defer func() { + tracing.EndScope(ctx, err) + }() + + for { + // If the connection manager is already closed then return an error. + if cm.ctx.Err() != nil { + return nil, ErrConnMgrClosed + } + cm.connPoolMu.Lock() + // Increment the connection index pivot to ensure that we select a + // different connection every time when the load distribution is even. + cm.nextConnPivot++ + // Select the least loaded connection. + connPoolSize := len(cm.connPool) + var leastLoadedConn *Conn[T] + if connPoolSize > 0 { + currConnIdx := cm.nextConnPivot % uint64(connPoolSize) + leastLoadedConn = cm.connPool[currConnIdx] + for i := 1; i < connPoolSize; i++ { + currConnIdx = (cm.nextConnPivot + uint64(i)) % uint64(connPoolSize) + currConn := cm.connPool[currConnIdx] + if currConn.counter < leastLoadedConn.counter { + leastLoadedConn = currConn + } + } + } + // If a least loaded connection is selected, then return it. + if leastLoadedConn != nil { + leastLoadedConn.counter++ + cm.connPoolMu.Unlock() + return leastLoadedConn, nil + } + // We have got nothing to offer, let's refresh the connection pool to + // get more connections. + connectedCh := cm.ensureReconnect() + cm.connPoolMu.Unlock() + // Wait for the connection pool to be refreshed on the background or + // the operation timeout elapsing. + select { + case <-connectedCh: + continue + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +func (cm *ConnMgr[T]) ensureReconnect() chan struct{} { + if cm.connectedCh != nil { + return cm.connectedCh + } + cm.connectedCh = make(chan struct{}) + select { + case cm.reconnectCh <- struct{}{}: + default: + } + return cm.connectedCh +} + +func (cm *ConnMgr[T]) ReleaseConn(conn *Conn[T], err error) bool { + cm.connPoolMu.Lock() + removedFromPool := false + connPoolSize := len(cm.connPool) + if cm.shouldDisposeOfConn(conn, err) { + conn.broken = true + // Remove the connection from the pool. + for i, currConn := range cm.connPool { + if currConn != conn { + continue + } + copy(cm.connPool[i:], cm.connPool[i+1:]) + lastIdx := len(cm.connPool) - 1 + cm.connPool[lastIdx] = nil + cm.connPool = cm.connPool[:lastIdx] + removedFromPool = true + connPoolSize = len(cm.connPool) + cm.idPool.Release(conn.id) + MetricGRPCConnections.WithLabelValues(cm.connFactory.ServiceName(), conn.Target()).Dec() + break + } + cm.ensureReconnect() + } + conn.counter-- + closeConn := false + if conn.broken && conn.counter < 1 { + closeConn = true + } + cm.connPoolMu.Unlock() + + if removedFromPool { + logrus.WithError(err).Warnf("Server removed from %s pool: %s, poolSize=%d, reason=%s", + cm.connFactory.ServiceName(), conn.Target(), connPoolSize, err) + } + if closeConn { + _ = conn.inner.Close() + logrus.Warnf("Disconnected from %s server %s", cm.connFactory.ServiceName(), conn.Target()) + return true + } + return false +} + +func (cm *ConnMgr[T]) shouldDisposeOfConn(conn *Conn[T], err error) bool { + if conn.broken { + return false + } + if err == nil { + return false + } + + rootErr := errors.Cause(err) + if errors.Is(rootErr, context.Canceled) { + return false + } + if errors.Is(rootErr, context.DeadlineExceeded) { + return false + } + switch grpcStatus.Code(err) { + case grpcCodes.Canceled: + return false + case grpcCodes.DeadlineExceeded: + return false + } + + return cm.connFactory.ShouldDisposeOfConn(rootErr) +} + +func (cm *ConnMgr[T]) Close() { + cm.cancel() + cm.closeWG.Wait() +} + +func (cm *ConnMgr[T]) run() { + defer func() { + cm.connPoolMu.Lock() + for i, conn := range cm.connPool { + _ = conn.inner.Close() + cm.connPool[i] = nil + cm.idPool.Release(conn.id) + MetricGRPCConnections.WithLabelValues(cm.connFactory.ServiceName(), conn.Target()).Dec() + } + cm.connPool = cm.connPool[:0] + if cm.connectedCh != nil { + close(cm.connectedCh) + cm.connectedCh = nil + } + cm.connPoolMu.Unlock() + cm.closeWG.Done() + }() + var nilOrReconnectCh <-chan clock.Time + for { + select { + case <-nilOrReconnectCh: + case <-cm.reconnectCh: + logrus.Info("Force connection pool refresh") + case <-cm.ctx.Done(): + return + } + reconnectPeriod, err := cm.refreshConnPool() + if err != nil { + // If the client is closing, then return immediately. + if errors.Is(err, context.Canceled) { + return + } + logrus.WithError(err).Errorf("Failed to refresh connection pool") + reconnectPeriod = cm.cfg.BackOffTimeout + } + // If a server returns zero TTL it means that periodic server list + // refreshes should be disabled. + if reconnectPeriod > 0 { + nilOrReconnectCh = clock.After(reconnectPeriod) + } + } +} + +func (cm *ConnMgr[T]) refreshConnPool() (clock.Duration, error) { + begin := clock.Now() + ctx, cancel := context.WithTimeout(cm.ctx, cm.cfg.RPCTimeout) + defer cancel() + + getGRPCEndpointRs, err := cm.getServerEndpoints(ctx) + if err != nil { + return 0, errors.Wrap(err, "while getting gRPC endpoints") + } + + // Adjust TTL to be a reasonable value. Zero disables periodic refreshes. + ttl := clock.Duration(getGRPCEndpointRs.TTL) * clock.Second + if ttl <= 0 { + ttl = 0 + } else if ttl < minConnectionTTL { + ttl = minConnectionTTL + } + + newConnCount := 0 + crossZoneCount := 0 + logrus.Infof("Connecting to %d %s servers", len(getGRPCEndpointRs.Servers), cm.connFactory.ServiceName()) + for _, serverSpec := range getGRPCEndpointRs.Servers { + if serverSpec.Zone != cm.cfg.Zone { + crossZoneCount++ + } + // Do we have the correct number of connections for this serverSpec in the pool? + activeConnections := cm.countConnections(serverSpec.Endpoint) + if activeConnections >= cm.cfg.NumConnections { + continue + } + + for i := 0; i < (cm.cfg.NumConnections - activeConnections); i++ { + conn, err := cm.newConnection(serverSpec.Endpoint) + if err != nil { + // If the client is closing, then return immediately. + if errors.Is(err, context.Canceled) { + return 0, err + } + logrus.WithError(err).Errorf("Failed to dial %s server: %s", + cm.connFactory.ServiceName(), serverSpec.Endpoint) + break + } + + // Add the connection to the pool and notify + // goroutines waiting for a connection. + cm.connPoolMu.Lock() + cm.connPool = append(cm.connPool, conn) + if cm.connectedCh != nil { + close(cm.connectedCh) + cm.connectedCh = nil + } + MetricGRPCConnections.WithLabelValues(cm.connFactory.ServiceName(), conn.Target()).Inc() + cm.connPoolMu.Unlock() + newConnCount++ + logrus.Infof("Connected to %s server: %s, zone=%s", cm.connFactory.ServiceName(), serverSpec.Endpoint, serverSpec.Zone) + } + } + cm.connPoolMu.Lock() + connPoolSize := len(cm.connPool) + // If there has been no new connection established but the pool is not + // empty then trigger the requested connected notification anyway. + if connPoolSize > 0 && cm.connectedCh != nil { + close(cm.connectedCh) + cm.connectedCh = nil + } + cm.connPoolMu.Unlock() + took := clock.Since(begin).Truncate(clock.Millisecond) + logrus.Warnf("Connection pool refreshed: took=%s, zone=%s, poolSize=%d, newConnCount=%d, knownServerCount=%d, crossZoneCount=%d, ttl=%s", + took, cm.cfg.Zone, connPoolSize, newConnCount, len(getGRPCEndpointRs.Servers), crossZoneCount, ttl) + if connPoolSize < 1 { + return 0, errConnPoolEmpty + } + return ttl, nil +} + +// countConnections returns the total number of connections in the pool for the provided endpoint. +func (cm *ConnMgr[T]) countConnections(endpoint string) (count int) { + cm.connPoolMu.RLock() + defer cm.connPoolMu.RUnlock() + for i := 0; i < len(cm.connPool); i++ { + if cm.connPool[i].Target() == endpoint { + count++ + } + } + return count +} + +// newConnection establishes a new GRPC connection to the provided endpoint +func (cm *ConnMgr[T]) newConnection(endpoint string) (*Conn[T], error) { + // Establish a connection with the server. + ctx, cancel := context.WithTimeout(cm.ctx, cm.cfg.RPCTimeout) + opts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), + grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()), + } + grpcConn, err := grpc.DialContext(ctx, endpoint, opts...) + cancel() + if err != nil { + return nil, err + } + id := cm.idPool.Allocate() + return &Conn[T]{ + inner: grpcConn, + client: cm.connFactory.NewGRPCClient(grpcConn), + id: id, + }, nil +} + +func (cm *ConnMgr[T]) getServerEndpoints(ctx context.Context) (*GetGRPCEndpointsRs, error) { + rq, err := http.NewRequestWithContext(ctx, "GET", cm.getEndpointsURL, http.NoBody) + if err != nil { + return nil, errors.Wrap(err, "during request") + } + + // Override the host header if provided in config + if cm.cfg.OverrideHostHeader != "" { + rq.Host = cm.cfg.OverrideHostHeader + } + + rs, err := cm.httpClt.Do(rq) + if err != nil { + return nil, errors.Stack(err) + } + defer rs.Body.Close() + + if rs.StatusCode != http.StatusOK { + return nil, errFromResponse(rs) + } + var rsBody GetGRPCEndpointsRs + if err := readResponseBody(rs, &rsBody); err != nil { + return nil, errors.Wrap(err, "while unmarshalling response") + } + return &rsBody, nil +} + +type GenericResponse struct { + Msg string `json:"message"` +} + +func errFromResponse(rs *http.Response) error { + body, err := io.ReadAll(rs.Body) + if err != nil { + return fmt.Errorf("HTTP request error, status=%s", rs.Status) + } + defer rs.Body.Close() + var rsBody GenericResponse + if err := json.Unmarshal(body, &rsBody); err != nil { + return errors.Wrapf(err, "HTTP request error, status=%s, body=%s", rs.Status, body) + } + return fmt.Errorf("HTTP request error, status=%s, message=%s", rs.Status, rsBody.Msg) +} + +// TransCountInTests returns the total number of pending read/write operations. +// It is only supposed to be used in tests, hence it is not exposed in Client +// interface. +func (cm *ConnMgr[T]) TransCountInTests() int { + transCount := 0 + cm.connPoolMu.RLock() + for _, serverConn := range cm.connPool { + transCount += serverConn.counter + } + cm.connPoolMu.RUnlock() + return transCount +} + +type GetGRPCEndpointsRs struct { + Servers []ServerSpec `json:"servers"` + TTL int `json:"ttl"` + // FIXME: Remove the following fields once all clients are upgraded. + Endpoint string `json:"grpc_endpoint"` + Zone string `json:"zone"` +} + +type ServerSpec struct { + Endpoint string `json:"endpoint"` + Zone string `json:"zone"` + Timestamp clock.Time `json:"timestamp"` +} + +func readResponseBody(rs *http.Response, body interface{}) error { + bodyBytes, err := io.ReadAll(rs.Body) + if err != nil { + return errors.Wrap(err, "while reading response") + } + if err := json.Unmarshal(bodyBytes, &body); err != nil { + return errors.Wrapf(err, "while parsing response %s", bodyBytes) + } + return nil +} diff --git a/grpcconn/idpool.go b/grpcconn/idpool.go new file mode 100644 index 0000000..5c5f2c9 --- /dev/null +++ b/grpcconn/idpool.go @@ -0,0 +1,61 @@ +package grpcconn + +import ( + "strconv" + "sync" +) + +// IDPool maintains a pool of ID values that can be released and reused. +// This is handy to keep cardinality low as gRPC connections are periodically +// released and reconnected and require an id. +// This solves the problem of infinitely incrementing ids used in Prometheus +// metric labels causing infinite growth of historical metric values. +type IDPool struct { + pool map[ID]bool + allocated int + allocMu sync.RWMutex +} + +type ID int + +func NewIDPool() *IDPool { + return &IDPool{ + pool: make(map[ID]bool), + } +} + +func (i *IDPool) Allocate() ID { + i.allocMu.Lock() + defer i.allocMu.Unlock() + + for id, allocFlag := range i.pool { + if allocFlag { + continue + } + + i.allocated++ + i.pool[id] = true + return id + } + + // Dynamically expand pool. + newID := ID(len(i.pool) + 1) + i.pool[newID] = true + return newID +} + +func (i *IDPool) Release(id ID) { + i.allocMu.Lock() + defer i.allocMu.Unlock() + + if allocFlag, ok := i.pool[id]; !ok || !allocFlag { + return + } + + i.allocated-- + i.pool[id] = false +} + +func (id ID) String() string { + return strconv.Itoa(int(id)) +} diff --git a/grpcconn/idpool_test.go b/grpcconn/idpool_test.go new file mode 100644 index 0000000..3366f26 --- /dev/null +++ b/grpcconn/idpool_test.go @@ -0,0 +1,49 @@ +package grpcconn_test + +import ( + "testing" + + "github.com/mailgun/holster/v4/grpcconn" + "github.com/stretchr/testify/assert" +) + +func TestIDPool(t *testing.T) { + t.Run("Allocate and release", func(t *testing.T) { + pool := grpcconn.NewIDPool() + id := pool.Allocate() + assert.Equal(t, 1, int(id)) + pool.Release(id) + }) + + t.Run("Redundant release id ignored", func(t *testing.T) { + pool := grpcconn.NewIDPool() + id := pool.Allocate() + pool.Release(id) + pool.Release(id) + }) + + t.Run("Allocate and release all", func(t *testing.T) { + const poolSize = 10 + pool := grpcconn.NewIDPool() + var ids []grpcconn.ID + + // Allocate all. + for i := 0; i < poolSize; i++ { + id := pool.Allocate() + assert.Equal(t, i+1, int(id)) + ids = append(ids, id) + } + + // Release all. + for _, id := range ids { + pool.Release(id) + } + + // Allocate all again. + for i := 0; i < poolSize; i++ { + id := pool.Allocate() + // Expect id to reuse original pool of ids. + assert.LessOrEqual(t, int(id), poolSize) + } + }) +}