diff --git a/sda/cmd/reencrypt/reencrypt.go b/sda/cmd/reencrypt/reencrypt.go index 6d0815c27..383c77340 100644 --- a/sda/cmd/reencrypt/reencrypt.go +++ b/sda/cmd/reencrypt/reencrypt.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/neicnordic/crypt4gh/keys" "github.com/neicnordic/crypt4gh/model/headers" @@ -19,7 +20,9 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/crypto/chacha20poly1305" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" @@ -32,6 +35,15 @@ type server struct { c4ghPrivateKey *[32]byte } +// hServer struct is used to implement the proxy grpc health.HealthServer. +type hServer struct { + healthgrpc.UnimplementedHealthServer + srvCert tls.Certificate + srvCACert *x509.CertPool + srvHost string + srvPort int +} + // ReencryptHeader implements reencrypt.ReEncryptHeader // called with a crypt4gh header and a public key along with an optional dataeditlist, // returns a new crypt4gh header using the same symmetric key as the original header @@ -84,6 +96,56 @@ func (s *server) ReencryptHeader(_ context.Context, in *re.ReencryptRequest) (*r return &re.ReencryptResponse{Header: newheader}, nil } +// Check implements the healthgrpc.HealthServer Check method for the proxy grpc Health server. +// This method probes internally the health of reencrypt's server and returns the service or +// server status. The corresponding grpc health server serves as a proxy to the internal health +// service of the reencrypt server so that k8s grpc probes can be used when TLS is enabled. +func (p *hServer) Check(ctx context.Context, in *healthgrpc.HealthCheckRequest) (*healthgrpc.HealthCheckResponse, error) { + + rpcCtx, rpcCancel := context.WithTimeout(ctx, time.Second*2) + defer rpcCancel() + + var opts []grpc.DialOption + if p.srvCert.Certificate != nil { + creds := credentials.NewTLS( + &tls.Config{ + ServerName: p.srvHost, + Certificates: []tls.Certificate{p.srvCert}, + MinVersion: tls.VersionTLS13, + RootCAs: p.srvCACert, + }, + ) + opts = append(opts, grpc.WithTransportCredentials(creds)) + } else { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + + conn, err := grpc.NewClient(fmt.Sprintf("%s:%d", p.srvHost, p.srvPort), opts...) + if err != nil { + log.Printf("failed to dial: %v", err) + + return nil, status.Error(codes.NotFound, "unknown service") + } + defer conn.Close() + + resp, err := healthgrpc.NewHealthClient(conn).Check(rpcCtx, + &healthgrpc.HealthCheckRequest{ + Service: in.Service}) + if err != nil { + log.Printf("failed to check: %v", err) + + return nil, status.Error(codes.NotFound, "unknown service") + } + + if resp.GetStatus() != healthgrpc.HealthCheckResponse_SERVING { + log.Debugf("service unhealthy (responded with %q)", resp.GetStatus().String()) + } + + return &healthgrpc.HealthCheckResponse{ + Status: resp.GetStatus(), + }, nil +} + func main() { conf, err := config.NewConfig("reencrypt") if err != nil { @@ -105,7 +167,11 @@ func main() { panic(err) } - var opts []grpc.ServerOption + var ( + opts []grpc.ServerOption + serverCert tls.Certificate + caCert *x509.CertPool + ) if conf.ReEncrypt.ServerCert != "" && conf.ReEncrypt.ServerKey != "" { switch { case conf.ReEncrypt.CACert != "": @@ -116,13 +182,13 @@ func main() { panic(err) } - caCert := x509.NewCertPool() + caCert = x509.NewCertPool() if !caCert.AppendCertsFromPEM(caFile) { sigc <- syscall.SIGINT panic("Failed to append ca certificate") } - serverCert, err := tls.LoadX509KeyPair(conf.ReEncrypt.ServerCert, conf.ReEncrypt.ServerKey) + serverCert, err = tls.LoadX509KeyPair(conf.ReEncrypt.ServerCert, conf.ReEncrypt.ServerKey) if err != nil { log.Errorf("Failed to parse certificates: %v", err) sigc <- syscall.SIGINT @@ -153,12 +219,32 @@ func main() { re.RegisterReencryptServer(s, &server{c4ghPrivateKey: conf.ReEncrypt.Crypt4GHKey}) reflection.Register(s) - // Add health check + // Add health service for reencrypt server healthServer := health.NewServer() healthServer.SetServingStatus("", healthgrpc.HealthCheckResponse_SERVING) healthServer.SetServingStatus(re.Reencrypt_ServiceDesc.ServiceName, healthgrpc.HealthCheckResponse_SERVING) healthgrpc.RegisterHealthServer(s, healthServer) + // Start proxy health server + p := grpc.NewServer() + healthgrpc.RegisterHealthServer(p, &hServer{srvCert: serverCert, srvCACert: caCert, srvHost: conf.ReEncrypt.Host, srvPort: conf.ReEncrypt.Port}) + + healthServerListener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", conf.ReEncrypt.Host, conf.ReEncrypt.Port+1)) + if err != nil { + log.Errorf("failed to listen: %v", err) + sigc <- syscall.SIGINT + panic(err) + } + go func() { + log.Debugf("health server listening at %v", healthServerListener.Addr()) + if err := p.Serve(healthServerListener); err != nil { + log.Errorf("failed to serve: %v", err) + sigc <- syscall.SIGINT + panic(err) + } + }() + + // Start reencrypt server log.Printf("server listening at %v", lis.Addr()) if err := s.Serve(lis); err != nil { log.Errorf("failed to serve: %v", err)