Skip to content

Commit

Permalink
Merge pull request #203 from Attack825/dev
Browse files Browse the repository at this point in the history
feature: add feed and comment metrics
  • Loading branch information
liaosunny123 authored Aug 30, 2023
2 parents f4588e1 + e40c105 commit 0a659a2
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 16 deletions.
69 changes: 62 additions & 7 deletions src/services/comment/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ import (
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
"context"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"net"
"net/http"
"os"
"syscall"
)

func main() {
Expand All @@ -35,17 +42,32 @@ func main() {
// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.CommentService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.CommentRpcServerName)
lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.CommentRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.CommentRpcServerName, err)
}

srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg := prom.Client
reg.MustRegister(srvMetrics)

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
)

if err := consul.RegisterConsul(config.CommentRpcServerName, config.CommentRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.CommentRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.CommentRpcServerName, config.CommentRpcServerPort)

var srv CommentServiceImpl
var probe healthImpl.ProbeImpl
comment.RegisterCommentServiceServer(s, srv)
Expand All @@ -55,8 +77,41 @@ func main() {
log.Panicf("Rpc %s register consul happens error for: %v", config.CommentRpcServerName, err)
}
srv.New()
log.Infof("Rpc %s is running at %s now", config.CommentRpcServerName, config.CommentRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen happens error for: %v", config.CommentRpcServerName, err)
srvMetrics.InitializeMetrics(s)

g := &run.Group{}
g.Add(func() error {
return s.Serve(lis)
}, func(err error) {
s.GracefulStop()
s.Stop()
log.Errorf("Rpc %s listen happens error for: %v", config.CommentRpcServerName, err)
})

httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
httpSrv.Handler = m
log.Infof("Promethus now running")
return httpSrv.ListenAndServe()
}, func(error) {
if err := httpSrv.Close(); err != nil {
log.Errorf("Prometheus %s listen happens error for: %v", config.CommentRpcServerName, err)
}
})

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

if err := g.Run(); err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when runing http server")
os.Exit(1)
}
}
71 changes: 63 additions & 8 deletions src/services/feed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,18 @@ import (
healthImpl "GuGoTik/src/services/health"
"GuGoTik/src/utils/consul"
"GuGoTik/src/utils/logging"
"GuGoTik/src/utils/prom"
"context"
grpcprom "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"net"
"net/http"
"os"
"syscall"
)

func main() {
Expand All @@ -35,28 +42,76 @@ func main() {
// Configure Pyroscope
profiling.InitPyroscope("GuGoTik.FeedService")

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
)

log := logging.LogService(config.FeedRpcServerName)
lis, err := net.Listen("tcp", config.EnvCfg.PodIpAddr+config.FeedRpcServerPort)

if err != nil {
log.Panicf("Rpc %s listen happens error: %v", config.FeedRpcServerName, err)
}

srvMetrics := grpcprom.NewServerMetrics(
grpcprom.WithServerHandlingTimeHistogram(
grpcprom.WithHistogramBuckets([]float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}),
),
)
reg := prom.Client
reg.MustRegister(srvMetrics)

s := grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.ChainUnaryInterceptor(srvMetrics.UnaryServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
grpc.ChainStreamInterceptor(srvMetrics.StreamServerInterceptor(grpcprom.WithExemplarFromContext(prom.ExtractContext))),
)

if err := consul.RegisterConsul(config.FeedRpcServerName, config.FeedRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul happens error for: %v", config.FeedRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.FeedRpcServerName, config.FeedRpcServerPort)

var srv FeedServiceImpl
var probe healthImpl.ProbeImpl
feed.RegisterFeedServiceServer(s, srv)
health.RegisterHealthServer(s, &probe)
defer CloseMQConn()
if err := consul.RegisterConsul(config.FeedRpcServerName, config.FeedRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul hanpens error for: %v", config.FeedRpcServerName, err)
log.Panicf("Rpc %s register consul happens error for: %v", config.FeedRpcServerName, err)
}
srv.New()
log.Infof("Rpc %s is running at %s now", config.FeedRpcServerName, config.FeedRpcServerPort)
if err := s.Serve(lis); err != nil {
log.Panicf("Rpc %s listen hanpens error for: %v", config.FeedRpcServerName, err)
srvMetrics.InitializeMetrics(s)

g := &run.Group{}
g.Add(func() error {
return s.Serve(lis)
}, func(err error) {
s.GracefulStop()
s.Stop()
log.Errorf("Rpc %s listen happens error for: %v", config.FeedRpcServerName, err)
})

httpSrv := &http.Server{Addr: config.EnvCfg.PodIpAddr + config.Metrics}
g.Add(func() error {
m := http.NewServeMux()
m.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
httpSrv.Handler = m
log.Infof("Promethus now running")
return httpSrv.ListenAndServe()
}, func(error) {
if err := httpSrv.Close(); err != nil {
log.Errorf("Prometheus %s listen happens error for: %v", config.FeedRpcServerName, err)
}
})

g.Add(run.SignalHandler(context.Background(), syscall.SIGINT, syscall.SIGTERM))

if err := g.Run(); err != nil {
log.WithFields(logrus.Fields{
"err": err,
}).Errorf("Error when runing http server")
os.Exit(1)
}
}
2 changes: 1 addition & 1 deletion src/services/user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
)

if err := consul.RegisterConsul(config.UserRpcServerName, config.UserRpcServerPort); err != nil {
log.Panicf("Rpc %s register consul hanpens error for: %v", config.UserRpcServerName, err)
log.Panicf("Rpc %s register consul happens error for: %v", config.UserRpcServerName, err)
}
log.Infof("Rpc %s is running at %s now", config.UserRpcServerName, config.UserRpcServerPort)

Expand Down

0 comments on commit 0a659a2

Please sign in to comment.