Skip to content

Commit

Permalink
add proxy grpc health server
Browse files Browse the repository at this point in the history
  • Loading branch information
aaperis committed Aug 19, 2024
1 parent c11e8c6 commit c80a501
Showing 1 changed file with 90 additions and 4 deletions.
94 changes: 90 additions & 4 deletions sda/cmd/reencrypt/reencrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/neicnordic/crypt4gh/keys"
"github.com/neicnordic/crypt4gh/model/headers"
Expand All @@ -19,7 +20,9 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/chacha20poly1305"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
Expand All @@ -32,6 +35,15 @@ type server struct {
c4ghPrivateKey *[32]byte
}

// hServer struct is used to implement the proxy grpc health.HealthServer.
type hServer struct {
healthgrpc.UnimplementedHealthServer
srvCert tls.Certificate
srvCACert *x509.CertPool
srvHost string
srvPort int
}

// ReencryptHeader implements reencrypt.ReEncryptHeader
// called with a crypt4gh header and a public key along with an optional dataeditlist,
// returns a new crypt4gh header using the same symmetric key as the original header
Expand Down Expand Up @@ -84,6 +96,56 @@ func (s *server) ReencryptHeader(_ context.Context, in *re.ReencryptRequest) (*r
return &re.ReencryptResponse{Header: newheader}, nil
}

// Check implements the healthgrpc.HealthServer Check method for the proxy grpc Health server.
// This method probes internally the health of reencrypt's server and returns the service or
// server status. The corresponding grpc health server serves as a proxy to the internal health
// service of the reencrypt server so that k8s grpc probes can be used when TLS is enabled.
func (p *hServer) Check(ctx context.Context, in *healthgrpc.HealthCheckRequest) (*healthgrpc.HealthCheckResponse, error) {

rpcCtx, rpcCancel := context.WithTimeout(ctx, time.Second*2)
defer rpcCancel()

var opts []grpc.DialOption
if p.srvCert.Certificate != nil {
creds := credentials.NewTLS(
&tls.Config{
ServerName: p.srvHost,
Certificates: []tls.Certificate{p.srvCert},
MinVersion: tls.VersionTLS13,
RootCAs: p.srvCACert,
},
)
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", p.srvHost, p.srvPort), opts...)
if err != nil {
log.Printf("failed to dial: %v", err)

return nil, status.Error(codes.NotFound, "unknown service")
}
defer conn.Close()

resp, err := healthgrpc.NewHealthClient(conn).Check(rpcCtx,
&healthgrpc.HealthCheckRequest{
Service: in.Service})
if err != nil {
log.Printf("failed to check: %v", err)

return nil, status.Error(codes.NotFound, "unknown service")
}

if resp.GetStatus() != healthgrpc.HealthCheckResponse_SERVING {
log.Debugf("service unhealthy (responded with %q)", resp.GetStatus().String())
}

return &healthgrpc.HealthCheckResponse{
Status: resp.GetStatus(),
}, nil
}

func main() {
conf, err := config.NewConfig("reencrypt")
if err != nil {
Expand All @@ -105,7 +167,11 @@ func main() {
panic(err)
}

var opts []grpc.ServerOption
var (
opts []grpc.ServerOption
serverCert tls.Certificate
caCert *x509.CertPool
)
if conf.ReEncrypt.ServerCert != "" && conf.ReEncrypt.ServerKey != "" {
switch {
case conf.ReEncrypt.CACert != "":
Expand All @@ -116,13 +182,13 @@ func main() {
panic(err)
}

caCert := x509.NewCertPool()
caCert = x509.NewCertPool()
if !caCert.AppendCertsFromPEM(caFile) {
sigc <- syscall.SIGINT
panic("Failed to append ca certificate")
}

serverCert, err := tls.LoadX509KeyPair(conf.ReEncrypt.ServerCert, conf.ReEncrypt.ServerKey)
serverCert, err = tls.LoadX509KeyPair(conf.ReEncrypt.ServerCert, conf.ReEncrypt.ServerKey)
if err != nil {
log.Errorf("Failed to parse certificates: %v", err)
sigc <- syscall.SIGINT
Expand Down Expand Up @@ -153,12 +219,32 @@ func main() {
re.RegisterReencryptServer(s, &server{c4ghPrivateKey: conf.ReEncrypt.Crypt4GHKey})
reflection.Register(s)

// Add health check
// Add health service for reencrypt server
healthServer := health.NewServer()
healthServer.SetServingStatus("", healthgrpc.HealthCheckResponse_SERVING)
healthServer.SetServingStatus(re.Reencrypt_ServiceDesc.ServiceName, healthgrpc.HealthCheckResponse_SERVING)
healthgrpc.RegisterHealthServer(s, healthServer)

// Start proxy health server
p := grpc.NewServer()
healthgrpc.RegisterHealthServer(p, &hServer{srvCert: serverCert, srvCACert: caCert, srvHost: conf.ReEncrypt.Host, srvPort: conf.ReEncrypt.Port})

healthServerListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.ReEncrypt.Host, conf.ReEncrypt.Port+1))
if err != nil {
log.Errorf("failed to listen: %v", err)
sigc <- syscall.SIGINT
panic(err)
}
go func() {
log.Debugf("health server listening at %v", healthServerListener.Addr())
if err := p.Serve(healthServerListener); err != nil {
log.Errorf("failed to serve: %v", err)
sigc <- syscall.SIGINT
panic(err)
}
}()

// Start reencrypt server
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Errorf("failed to serve: %v", err)
Expand Down

0 comments on commit c80a501

Please sign in to comment.