diff --git a/src/services/favorite/main.go b/src/services/favorite/main.go index 0796673..bda348d 100644 --- a/src/services/favorite/main.go +++ b/src/services/favorite/main.go @@ -9,8 +9,15 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "net" + "net/http" + "os" + "syscall" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -34,34 +41,78 @@ func main() { }() // Configure Pyroscope - profiling.InitPyroscope("GuGoTik.LikeService") - - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) + profiling.InitPyroscope("GuGoTik.FavoriteService") log := logging.LogService(config.FavoriteRpcServerName) - lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.FavoriteRpcServerPort) if err != nil { log.Panicf("Rpc %s listen happens error: %v", config.FavoriteRpcServerName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort) + var srv FavoriteServiceServerImpl var probe healthImpl.ProbeImpl - favorite.RegisterFavoriteServiceServer(s, srv) - health.RegisterHealthServer(s, &probe) defer CloseMQConn() if err := consul.RegisterConsul(config.FavoriteRpcServerName, config.FavoriteRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.FavoriteRpcServerName, err) } - srv.New() - log.Infof("Rpc %s is running at %s now", config.FavoriteRpcServerName, config.FavoriteRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.FavoriteRpcServerName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.FavoriteRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.FavoriteRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/message/main.go b/src/services/message/main.go index 7470a0f..235631e 100644 --- a/src/services/message/main.go +++ b/src/services/message/main.go @@ -9,8 +9,15 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "net" + "net/http" + "os" + "syscall" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -36,31 +43,74 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.ChatService") - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.MessageRpcServerName) - lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.MessageRpcServerPort) if err != nil { log.Panicf("Rpc %s listen happens error: %v", config.MessageRpcServerName, err) } - var srv MessageServiceImpl - var probe healthImpl.ProbeImpl + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) - chat.RegisterChatServiceServer(s, srv) + reg := prom.Client + reg.MustRegister(srvMetrics) - health.RegisterHealthServer(s, &probe) + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) if err := consul.RegisterConsul(config.MessageRpcServerName, config.MessageRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.MessageRpcServerName, err) } - srv.New() log.Infof("Rpc %s is running at %s now", config.MessageRpcServerName, config.MessageRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err) + + var srv MessageServiceImpl + var probe healthImpl.ProbeImpl + chat.RegisterChatServiceServer(s, srv) + health.RegisterHealthServer(s, &probe) + + srv.New() + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.MessageRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.MessageRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/publish/main.go b/src/services/publish/main.go index 9648fb9..d8a9ab9 100644 --- a/src/services/publish/main.go +++ b/src/services/publish/main.go @@ -9,11 +9,18 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" + "net/http" + "os" + "syscall" ) func main() { @@ -35,13 +42,6 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.PublishService") - maxSize := 500 * 1024 * 1024 - s := grpc.NewServer( - grpc.MaxRecvMsgSize(maxSize), - grpc.MaxSendMsgSize(maxSize), - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.PublishRpcServerName) lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.PublishRpcServerPort) @@ -49,17 +49,69 @@ func main() { log.Panicf("Rpc %s listen happens error: %v", config.PublishRpcServerName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort) + var srv PublishServiceImpl var probe healthImpl.ProbeImpl - defer CloseMQConn() publish.RegisterPublishServiceServer(s, srv) health.RegisterHealthServer(s, &probe) + defer CloseMQConn() if err := consul.RegisterConsul(config.PublishRpcServerName, config.PublishRpcServerPort); err != nil { log.Panicf("Rpc %s register consul happens error for: %v", config.PublishRpcServerName, err) } srv.New() - log.Infof("Rpc %s is running at %s now", config.PublishRpcServerName, config.PublishRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.PublishRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.PublishRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/recommend/main.go b/src/services/recommend/main.go index 98eaaeb..1311824 100644 --- a/src/services/recommend/main.go +++ b/src/services/recommend/main.go @@ -9,11 +9,18 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" + "net/http" + "os" + "syscall" ) func main() { @@ -35,10 +42,6 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.RecommendService") - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.RecommendRpcServiceName) lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.RecommendRpcServicePort) @@ -46,16 +49,67 @@ func main() { log.Panicf("Rpc %s listen happens error: %v", config.RecommendRpcServiceName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.RecommendRpcServiceName, config.RecommendRpcServicePort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.RecommendRpcServiceName, err) + } + log.Infof("Rpc %s is running at %s now", config.RecommendRpcServiceName, config.RecommendRpcServicePort) + var srv RecommendServiceImpl var probe healthImpl.ProbeImpl recommend.RegisterRecommendServiceServer(s, srv) health.RegisterHealthServer(s, &probe) - if err := consul.RegisterConsul(config.RecommendRpcServiceName, config.RecommendRpcServicePort); err != nil { - log.Panicf("Rpc %s register consul happens error for: %v", config.RecommendRpcServiceName, err) - } + srv.New() - log.Infof("Rpc %s is running at %s now", config.RecommendRpcServiceName, config.RecommendRpcServicePort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.RecommendRpcServiceName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.RecommendRpcServiceName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.RecommendRpcServiceName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } } diff --git a/src/services/relation/main.go b/src/services/relation/main.go index e640a8e..b761256 100644 --- a/src/services/relation/main.go +++ b/src/services/relation/main.go @@ -9,11 +9,18 @@ import ( healthImpl "GuGoTik/src/services/health" "GuGoTik/src/utils/consul" "GuGoTik/src/utils/logging" + "GuGoTik/src/utils/prom" "context" + grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "net" + "net/http" + "os" + "syscall" ) func main() { @@ -35,10 +42,6 @@ func main() { // Configure Pyroscope profiling.InitPyroscope("GuGoTik.RelationService") - s := grpc.NewServer( - grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - ) - log := logging.LogService(config.RelationRpcServerName) lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.RelationRpcServerPort) @@ -46,16 +49,67 @@ func main() { log.Panicf("Rpc %s listen happens error: %v", config.RelationRpcServerName, err) } + srvMetrics := grpcprom.NewServerMetrics( + grpcprom.WithServerHandlingTimeHistogram( + grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}), + ), + ) + + reg := prom.Client + reg.MustRegister(srvMetrics) + + s := grpc.NewServer( + grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()), + grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))), + ) + + if err := consul.RegisterConsul(config.RelationRpcServerName, config.RelationRpcServerPort); err != nil { + log.Panicf("Rpc %s register consul happens error for: %v", config.RelationRpcServerName, err) + } + log.Infof("Rpc %s is running at %s now", config.RelationRpcServerName, config.RelationRpcServerPort) + var srv RelationServiceImpl var probe healthImpl.ProbeImpl relation.RegisterRelationServiceServer(s, srv) health.RegisterHealthServer(s, &probe) - if err := consul.RegisterConsul(config.RelationRpcServerName, config.RelationRpcServerPort); err != nil { - log.Panicf("Rpc %s register consul happens error for: %v", config.RelationRpcServerName, err) - } + srv.New() - log.Infof("Rpc %s is running at %s now", config.RelationRpcServerName, config.RelationRpcServerPort) - if err := s.Serve(lis); err != nil { - log.Panicf("Rpc %s listen happens error for: %v", config.RelationRpcServerName, err) + srvMetrics.InitializeMetrics(s) + + g := &run.Group{} + g.Add(func() error { + return s.Serve(lis) + }, func(err error) { + s.GracefulStop() + s.Stop() + log.Errorf("Rpc %s listen happens error for: %v", config.RelationRpcServerName, err) + }) + + httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics} + g.Add(func() error { + m := http.NewServeMux() + m.Handle("/metrics", promhttp.HandlerFor( + reg, + promhttp.HandlerOpts{ + EnableOpenMetrics: true, + }, + )) + httpSrv.Handler = m + log.Infof("Promethus now running") + return httpSrv.ListenAndServe() + }, func(error) { + if err := httpSrv.Close(); err != nil { + log.Errorf("Prometheus %s listen happens error for: %v", config.RelationRpcServerName, err) + } + }) + + g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM)) + + if err := g.Run(); err != nil { + log.WithFields(logrus.Fields{ + "err": err, + }).Errorf("Error when runing http server") + os.Exit(1) } }