diff --git a/src/services/comment/main.go b/src/services/comment/main.go index a1dbced..f11ce9f 100644 --- a/src/services/comment/main.go +++ b/src/services/comment/main.go @@ -9,11 +9,18 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" + "net/http" + "os" + "syscall" ) func main() { @@ -35,10 +42,6 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.CommentService") - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.CommentRpcServerName) lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.CommentRpcServerPort) @@ -46,6 +49,25 @@ func main() { log.Panicf("Rpc %s listen happens error: %v", config.CommentRpcServerName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.CommentRpcServerName, config.CommentRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.CommentRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.CommentRpcServerName, config.CommentRpcServerPort) + var srv CommentServiceImpl var probe healthImpl.ProbeImpl comment.RegisterCommentServiceServer(s, srv) @@ -55,8 +77,41 @@ func main() { log.Panicf("Rpc %s register consul happens error for: %v", config.CommentRpcServerName, err) } srv.New() - log.Infof("Rpc %s is running at %s now", config.CommentRpcServerName, config.CommentRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.CommentRpcServerName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.CommentRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.CommentRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/feed/main.go b/src/services/feed/main.go index 9066b6e..6ef19cc 100644 --- a/src/services/feed/main.go +++ b/src/services/feed/main.go @@ -9,11 +9,18 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" + "net/http" + "os" + "syscall" ) func main() { @@ -35,10 +42,6 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.FeedService") - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.FeedRpcServerName) lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.FeedRpcServerPort) @@ -46,17 +49,69 @@ func main() { log.Panicf("Rpc %s listen happens error: %v", config.FeedRpcServerName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.FeedRpcServerName, config.FeedRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.FeedRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.FeedRpcServerName, config.FeedRpcServerPort) + var srv FeedServiceImpl var probe healthImpl.ProbeImpl feed.RegisterFeedServiceServer(s, srv) health.RegisterHealthServer(s, &probe) defer CloseMQConn() if err := consul.RegisterConsul(config.FeedRpcServerName, config.FeedRpcServerPort); err != nil { - log.Panicf("Rpc %s register consul hanpens error for: %v", config.FeedRpcServerName, err) + log.Panicf("Rpc %s register consul happens error for: %v", config.FeedRpcServerName, err) } srv.New() - log.Infof("Rpc %s is running at %s now", config.FeedRpcServerName, config.FeedRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen hanpens error for: %v", config.FeedRpcServerName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.FeedRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.FeedRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/user/main.go b/src/services/user/main.go index 9807506..0ce697e 100644 --- a/src/services/user/main.go +++ b/src/services/user/main.go @@ -68,7 +68,7 @@ func main() { ) if err := consul.RegisterConsul(config.UserRpcServerName, config.UserRpcServerPort); err != nil { - log.Panicf("Rpc %s register consul hanpens error for: %v", config.UserRpcServerName, err) + log.Panicf("Rpc %s register consul happens error for: %v", config.UserRpcServerName, err) } log.Infof("Rpc %s is running at %s now", config.UserRpcServerName, config.UserRpcServerPort)