diff --git a/README.md b/README.md new file mode 100644 index 0000000..b32a6ce --- /dev/null +++ b/README.md @@ -0,0 +1,11 @@ +# Optable Package Lib + +`optable-pkglib` is a set of useful packages for serving and maintaing gRPC services. + +## Install + +```sh + +go get github.com/optable/optable-pkglib + +``` diff --git a/lifecycle/serve.go b/lifecycle/serve.go index fef9c97..5d1c26c 100644 --- a/lifecycle/serve.go +++ b/lifecycle/serve.go @@ -70,10 +70,10 @@ func ServeWithGracefulShutdown(ctx context.Context, listen net.Listener, server return shutdownCompleted } -// ServeGrpcAndMetrics behaves like ServeWithGracefulShutdown excepts that it -// also starts a prometheus HTTP1 service on the same Listener to expose +// ServeGRPCAndHTTP behaves like ServeWithGracefulShutdown excepts that it +// also starts an HTTP1 service on the same Listener to expose // metrics. -func ServeGrpcAndMetrics(ctx context.Context, l net.Listener, server *grpc.Server, shutdownTimeout time.Duration) <-chan error { +func ServeGRPCAndHTTP(ctx context.Context, l net.Listener, handler http.Handler, server *grpc.Server, shutdownTimeout time.Duration) <-chan error { errs := make(chan error, 1) go func() { @@ -101,7 +101,7 @@ func ServeGrpcAndMetrics(ctx context.Context, l net.Listener, server *grpc.Serve // Serve requests for the prometheus HTTP metric handler. group.Go(func() error { httpServer := &http.Server{ - Handler: promhttp.Handler(), + Handler: handler, } if err := httpServer.Serve(httpL); err != nil && !isClosedErr(err) { return fmt.Errorf("Failed serving http: %w", err) @@ -130,6 +130,13 @@ func ServeGrpcAndMetrics(ctx context.Context, l net.Listener, server *grpc.Serve return errs } +// ServeGRPCAndMetrics behaves like ServeWithGracefulShutdown excepts that it +// also starts a prometheus HTTP1 service on the same Listener to expose +// metrics. +func ServeGRPCAndMetrics(ctx context.Context, l net.Listener, server *grpc.Server, shutdownTimeout time.Duration) <-chan error { + return ServeGRPCAndHTTP(ctx, l, promhttp.Handler(), server, shutdownTimeout) +} + func isClosedErr(err error) bool { return errors.Is(err, net.ErrClosed) || errors.Is(err, cmux.ErrServerClosed) } diff --git a/service/server.go b/service/server.go index 5228ffa..df8152c 100644 --- a/service/server.go +++ b/service/server.go @@ -14,14 +14,13 @@ import ( "google.golang.org/grpc" ) -// NewGrpcService creates a grpc service with various defaults middlewares. +// NewGRPCService creates a grpc service with various defaults middlewares. // Notably, the logging and metrics are automatically registered for sane // defaults of observability. -func NewGrpcService(ctx context.Context, service interface{}, descriptors []*grpc.ServiceDesc) (*grpc.Server, error) { +func NewGRPCService(ctx context.Context, service interface{}, descriptors []*grpc.ServiceDesc, unaryIntercepts []grpc.UnaryServerInterceptor, streamIntercepts []grpc.StreamServerInterceptor) (*grpc.Server, error) { if len(descriptors) == 0 { return nil, errors.New("Missing descriptors") } - // By using prometheus.DefaultRegister we benefits from the go runtime // defaults metrics and Linux processes metrics. registry := prometheus.DefaultRegisterer @@ -34,19 +33,21 @@ func NewGrpcService(ctx context.Context, service interface{}, descriptors []*grp } logger := zerolog.Ctx(ctx) + defaultStreamInterceptors := []grpc.StreamServerInterceptor{ + logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(*logger)), + metrics.StreamServerInterceptor(m), + recovery.StreamServerInterceptor(), + } + defaultUnaryInterceptors := []grpc.UnaryServerInterceptor{ + logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(*logger)), + metrics.UnaryServerInterceptor(m), + recovery.UnaryServerInterceptor(), + } + + defaultUnaryInterceptors = append(defaultUnaryInterceptors, unaryIntercepts...) + defaultStreamInterceptors = append(defaultStreamInterceptors, streamIntercepts...) - server := grpc.NewServer( - grpc.ChainStreamInterceptor( - logging.StreamServerInterceptor(grpczerolog.InterceptorLogger(*logger)), - metrics.StreamServerInterceptor(m), - recovery.StreamServerInterceptor(), - ), - grpc.ChainUnaryInterceptor( - logging.UnaryServerInterceptor(grpczerolog.InterceptorLogger(*logger)), - metrics.UnaryServerInterceptor(m), - recovery.UnaryServerInterceptor(), - ), - ) + server := grpc.NewServer(grpc.ChainStreamInterceptor(defaultStreamInterceptors...), grpc.ChainUnaryInterceptor(defaultUnaryInterceptors...)) for _, desc := range descriptors { logger.Info().Msgf("Registering grpc service: %s", desc.ServiceName)