Skip to content

Commit

Permalink
KUBE-554: add healthcheck endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Trojan295 committed Sep 17, 2024
1 parent 2b25120 commit 2e0b369
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 30 deletions.
14 changes: 14 additions & 0 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"cloud-proxy/internal/cloud/gcp"
"cloud-proxy/internal/cloud/gcp/gcpauth"
"cloud-proxy/internal/config"
"cloud-proxy/internal/healthz"
"cloud-proxy/internal/proxy"

"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)

Expand Down Expand Up @@ -49,6 +51,7 @@ func main() {
},
}
dialOpts = append(dialOpts, grpc.WithConnectParams(connectParams))
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{}))

logger.Infof(
"Creating grpc channel against (%s) with connection config (%+v) and TLS enabled=%v",
Expand Down Expand Up @@ -77,6 +80,17 @@ func main() {

client := proxy.New(conn, gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger,
cfg.ClusterID, GetVersion(), cfg.KeepAlive, cfg.KeepAliveTimeout)

go func() {
healthchecks := healthz.NewServer(logger, client)

logger.Infof("Starting healthcheck server on address %v", cfg.HealthAddress)

if err := healthchecks.Run(cfg.HealthAddress); err != nil {
logger.WithError(err).Errorf("Failed to run healthcheck server")
}
}()

err = client.Run(ctx)
if err != nil {
logger.Panicf("Failed to run client: %v", err)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ require (
google.golang.org/api v0.197.0
google.golang.org/grpc v1.66.2
google.golang.org/protobuf v1.34.2
sigs.k8s.io/controller-runtime v0.19.0
)

require (
Expand Down
12 changes: 0 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
Expand All @@ -62,8 +60,6 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM=
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM=
github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand All @@ -83,10 +79,6 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -198,8 +190,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -245,5 +235,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q=
sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4=
10 changes: 8 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ type Config struct {
PodMetadata PodMetadata `mapstructure:"podmetadata"`

//MetricsAddress string `mapstructure:"metricsaddress"`
//HealthAddress string `mapstructure:"healthaddress"`
Log Log `mapstructure:"log"`
HealthAddress string `mapstructure:"healthaddress"`
Log Log `mapstructure:"log"`
}

// PodMetadata stores metadata for the pod, mostly used for logging and debugging purposes.
Expand Down Expand Up @@ -85,6 +85,8 @@ func Get() Config {

_ = v.BindEnv("log.level", "LOG_LEVEL")

_ = v.BindEnv("healthaddress", "HEALTH_ADDRESS")

cfg = &Config{}
if err := v.Unmarshal(cfg); err != nil {
panic(fmt.Errorf("while parsing config: %w", err))
Expand Down Expand Up @@ -118,6 +120,10 @@ func Get() Config {
}
}

if cfg.HealthAddress == "" {
cfg.HealthAddress = ":9091"
}

return *cfg
}

Expand Down
63 changes: 63 additions & 0 deletions internal/healthz/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package healthz

import (
	"encoding/json"
	"net/http"
	"time"

	"github.com/sirupsen/logrus"
)

// ProxyClient is the dependency the health-check server queries to decide
// whether the service is healthy.
type ProxyClient interface {
// IsAlive reports whether the proxy client is currently considered alive.
// The /healthz handler answers 200 when it returns true and 503 otherwise.
IsAlive() bool
}

// Server serves the HTTP health-check endpoint for the proxy.
type Server struct {
// log receives errors raised while building or writing a health response.
log *logrus.Logger
// proxyClient is asked for liveness on every /healthz request.
proxyClient ProxyClient
}

// NewServer returns a health-check Server that logs through log and reports
// the liveness of proxyClient. The server does not listen until Run is called.
func NewServer(log *logrus.Logger, proxyClient ProxyClient) *Server {
	srv := new(Server)
	srv.log = log
	srv.proxyClient = proxyClient

	return srv
}

// Run registers the /healthz handler and serves HTTP on addr. It blocks for
// the lifetime of the listener and always returns a non-nil error once
// serving stops (for example when addr cannot be bound).
//
// The server is built explicitly instead of using http.ListenAndServe so that
// read, write and idle timeouts can be set; the package-level helper uses a
// server without any timeouts, which lets a slow or stalled client hold a
// connection (and its goroutine) open indefinitely.
func (hc *Server) Run(addr string) error {
	mux := http.NewServeMux()
	mux.HandleFunc("/healthz", hc.healthCheck)

	srv := &http.Server{
		Addr:    addr,
		Handler: mux,
		// Health probes are tiny requests with tiny responses, so short
		// limits are sufficient and bound the cost of misbehaving clients.
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
		IdleTimeout:       60 * time.Second,
	}

	return srv.ListenAndServe()
}

// healthCheck handles /healthz requests. It reports the proxy client's
// liveness as a JSON object of the form {"proxyClient": "<state>"} and
// responds with 200 when the client is alive or 503 when it is not.
// If the body cannot be encoded, it logs the error and responds with 500
// and no body.
func (hc *Server) healthCheck(w http.ResponseWriter, r *http.Request) {
	// Decide the outcome up front: assume healthy, downgrade if the client
	// reports otherwise.
	code := http.StatusOK
	state := "alive"
	if !hc.proxyClient.IsAlive() {
		code = http.StatusServiceUnavailable
		state = "not alive"
	}

	w.Header().Set("content-type", "application/json")

	payload, err := json.Marshal(map[string]string{"proxyClient": state})
	if err != nil {
		hc.log.WithError(err).Errorf("Failed to marshal readiness check response")
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// The status line must be written before the body.
	w.WriteHeader(code)

	if _, err := w.Write(payload); err != nil {
		hc.log.WithError(err).Errorf("Failed to write response body")
	}
}
24 changes: 9 additions & 15 deletions internal/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ func (c *Client) Run(ctx context.Context) error {
return ctx.Err()
case <-t.C:
c.log.Info("Starting proxy client")
stream, closeStream, err := c.getStream()
stream, closeStream, err := c.getStream(ctx)
if err != nil {
c.log.Errorf("Could not get stream, restarting proxy client in %vs: %v", time.Duration(c.keepAlive.Load()).Seconds(), err)
t.Reset(time.Duration(c.keepAlive.Load()))
continue
}

err = c.run(ctx, stream, closeStream)
if err != nil {
c.log.Errorf("Restarting proxy client in %vs: due to error: %v", time.Duration(c.keepAlive.Load()).Seconds(), err)
Expand All @@ -81,10 +82,10 @@ func (c *Client) Run(ctx context.Context) error {
}
}

func (c *Client) getStream() (cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, func(), error) {
func (c *Client) getStream(ctx context.Context) (cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, func(), error) {
c.log.Info("Connecting to castai")
apiClient := cloudproxyv1alpha.NewCloudProxyAPIClient(c.grpcConn)
stream, err := apiClient.StreamCloudProxy(context.Background())
stream, err := apiClient.StreamCloudProxy(ctx)
if err != nil {
return nil, nil, fmt.Errorf("proxyCastAIClient.StreamCloudProxy: %w", err)
}
Expand Down Expand Up @@ -114,7 +115,7 @@ func (c *Client) sendInitialRequest(stream cloudproxyv1alpha.CloudProxyAPI_Strea
if err != nil {
return fmt.Errorf("stream.Send: initial request %w", err)
}
c.lastSeen.Store(time.Now().UnixNano())

c.lastSeenError.Store(nil)

c.log.Info("Stream to castai started successfully")
Expand All @@ -140,9 +141,6 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI
case <-stream.Context().Done():
return
default:
if !c.isAlive() {
return
}
}

c.log.Debugf("Polling stream for messages")
Expand All @@ -167,7 +165,7 @@ func (c *Client) run(ctx context.Context, stream cloudproxyv1alpha.CloudProxyAPI
case <-stream.Context().Done():
return fmt.Errorf("stream closed %w", stream.Context().Err())
case <-time.After(time.Duration(c.keepAlive.Load())):
if !c.isAlive() {
if !c.IsAlive() {
if err := c.lastSeenError.Load(); err != nil {
return fmt.Errorf("recived error: %w", *err)
}
Expand Down Expand Up @@ -212,7 +210,6 @@ func (c *Client) handleMessage(in *cloudproxyv1alpha.StreamCloudProxyResponse, s
if err != nil {
c.log.Errorf("error sending response for msg_id=%v %v", in.GetMessageId(), err)
}
return
}

func (c *Client) processConfigurationRequest(in *cloudproxyv1alpha.StreamCloudProxyResponse) {
Expand Down Expand Up @@ -250,12 +247,9 @@ func (c *Client) processHttpRequest(req *cloudproxyv1alpha.HTTPRequest) *cloudpr
return c.toResponse(resp)
}

func (c *Client) isAlive() bool {
func (c *Client) IsAlive() bool {
lastSeen := c.lastSeen.Load()
if time.Now().UnixNano()-lastSeen > c.keepAliveTimeout.Load() {
return false
}
return true
return time.Now().UnixNano()-lastSeen <= c.keepAliveTimeout.Load()
}

func (c *Client) sendKeepAlive(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) {
Expand All @@ -269,7 +263,7 @@ func (c *Client) sendKeepAlive(stream cloudproxyv1alpha.CloudProxyAPI_StreamClou
c.log.Infof("Stopping keep-alive loop: stream ended with %v", stream.Context().Err())
return
case <-ticker.C:
if !c.isAlive() {
if !c.IsAlive() {
c.log.Info("Stopping keep-alive loop: client connection is not alive")
return
}
Expand Down

0 comments on commit 2e0b369

Please sign in to comment.