diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index d4dab82..678105a 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -11,6 +11,7 @@ import ( "cloud-proxy/internal/cloud/gcp" "cloud-proxy/internal/cloud/gcp/gcpauth" "cloud-proxy/internal/config" + "cloud-proxy/internal/healthz" "cloud-proxy/internal/proxy" "github.com/sirupsen/logrus" @@ -77,6 +78,9 @@ func main() { client := proxy.New(conn, gcp.New(gcpauth.NewCredentialsSource(), http.DefaultClient), logger, cfg.ClusterID, GetVersion(), cfg.KeepAlive, cfg.KeepAliveTimeout) + + go startHealthServer(logger, cfg.HealthAddress) + err = client.Run(ctx) if err != nil { logger.Panicf("Failed to run client: %v", err) @@ -109,3 +113,13 @@ func setupLogger(cfg config.Config) *logrus.Logger { return logger } + +func startHealthServer(logger *logrus.Logger, addr string) { + healthchecks := healthz.NewServer(logger) + + logger.Infof("Starting healthcheck server on address %v", addr) + + if err := healthchecks.Run(addr); err != nil { + logger.WithError(err).Errorf("Failed to run healthcheck server") + } +} diff --git a/go.mod b/go.mod index 640be46..be4da2e 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( google.golang.org/api v0.197.0 google.golang.org/grpc v1.66.2 google.golang.org/protobuf v1.34.2 - sigs.k8s.io/controller-runtime v0.19.0 ) require ( diff --git a/go.sum b/go.sum index f621643..d5f294e 100644 --- a/go.sum +++ b/go.sum @@ -34,8 +34,6 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= -github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -62,8 +60,6 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM= -github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM= github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -83,10 +79,6 @@ github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0V github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= -github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= -github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= -github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -198,8 +190,6 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -245,5 +235,3 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -sigs.k8s.io/controller-runtime v0.19.0 h1:nWVM7aq+Il2ABxwiCizrVDSlmDcshi9llbaFbC0ji/Q= -sigs.k8s.io/controller-runtime v0.19.0/go.mod h1:iRmWllt8IlaLjvTTDLhRBXIEtkCK6hwVBJJsYS9Ajf4= diff --git a/internal/config/config.go b/internal/config/config.go index 6a5af2d..ee6b306 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -11,6 +11,7 @@ import ( const ( KeepAliveDefault = 10 * time.Second KeepAliveTimeoutDefault = time.Minute + HealthAddressDefault = ":9091" ) type Config struct { @@ -22,8 +23,8 @@ type Config struct { PodMetadata PodMetadata `mapstructure:"podmetadata"` //MetricsAddress string `mapstructure:"metricsaddress"` - //HealthAddress string `mapstructure:"healthaddress"` - Log Log `mapstructure:"log"` + HealthAddress string `mapstructure:"healthaddress"` + Log Log `mapstructure:"log"` } // PodMetadata stores metadata for the pod, mostly used for logging and debugging purposes. @@ -85,6 +86,8 @@ func Get() Config { _ = v.BindEnv("log.level", "LOG_LEVEL") + _ = v.BindEnv("healthaddress", "HEALTH_ADDRESS") + cfg = &Config{} if err := v.Unmarshal(cfg); err != nil { panic(fmt.Errorf("while parsing config: %w", err)) @@ -118,6 +121,10 @@ func Get() Config { } } + if cfg.HealthAddress == "" { + cfg.HealthAddress = HealthAddressDefault + } + return *cfg } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 69df649..d58bcf7 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -47,6 +47,7 @@ func TestConfig(t *testing.T) { }, KeepAlive: KeepAliveDefault, KeepAliveTimeout: KeepAliveTimeoutDefault, + HealthAddress: HealthAddressDefault, } got := Get() diff --git a/internal/healthz/healthz.go b/internal/healthz/healthz.go new file mode 100644 index 0000000..641c115 --- /dev/null +++ b/internal/healthz/healthz.go @@ -0,0 +1,73 @@ +package healthz + +import ( + "encoding/json" + "net/http" + + "github.com/sirupsen/logrus" +) + +type Server struct { + log *logrus.Logger +} + +func NewServer(log *logrus.Logger) *Server { + return &Server{ + log: log, + } +} + +func (hc *Server) Run(addr string) error { + mux := http.NewServeMux() + + mux.HandleFunc("/readyz", hc.readyCheck) + mux.HandleFunc("/livez", hc.liveCheck) + + return http.ListenAndServe(addr, mux) +} + +func (hc *Server) readyCheck(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + + // TODO: Implement proper readiness checks. + + response := map[string]string{ + "status": "ok", + } + + body, err := json.Marshal(response) + if err != nil { + hc.log.WithError(err).Errorf("Failed to marshal readiness check response") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + + if _, err := w.Write(body); err != nil { + hc.log.WithError(err).Errorf("Failed to write response body") + } +} + +func (hc *Server) liveCheck(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + + // TODO: Implement proper liveness checks. + + response := map[string]string{ + "status": "ok", + } + + body, err := json.Marshal(response) + if err != nil { + hc.log.WithError(err).Errorf("Failed to marshal readiness check response") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + + if _, err := w.Write(body); err != nil { + hc.log.WithError(err).Errorf("Failed to write response body") + } +} diff --git a/internal/proxy/client.go b/internal/proxy/client.go index 2309028..20a0098 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -66,12 +66,13 @@ func (c *Client) Run(ctx context.Context) error { return ctx.Err() case <-t.C: c.log.Info("Starting proxy client") - stream, closeStream, err := c.getStream() + stream, closeStream, err := c.getStream(ctx) if err != nil { c.log.Errorf("Could not get stream, restarting proxy client in %vs: %v", time.Duration(c.keepAlive.Load()).Seconds(), err) t.Reset(time.Duration(c.keepAlive.Load())) continue } + err = c.run(ctx, stream, closeStream) if err != nil { c.log.Errorf("Restarting proxy client in %vs: due to error: %v", time.Duration(c.keepAlive.Load()).Seconds(), err) @@ -81,10 +82,10 @@ func (c *Client) Run(ctx context.Context) error { } } -func (c *Client) getStream() (cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, func(), error) { +func (c *Client) getStream(ctx context.Context) (cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient, func(), error) { c.log.Info("Connecting to castai") apiClient := cloudproxyv1alpha.NewCloudProxyAPIClient(c.grpcConn) - stream, err := apiClient.StreamCloudProxy(context.Background()) + stream, err := apiClient.StreamCloudProxy(ctx) if err != nil { return nil, nil, fmt.Errorf("proxyCastAIClient.StreamCloudProxy: %w", err) } @@ -212,7 +213,6 @@ func (c *Client) handleMessage(in *cloudproxyv1alpha.StreamCloudProxyResponse, s if err != nil { c.log.Errorf("error sending response for msg_id=%v %v", in.GetMessageId(), err) } - return } func (c *Client) processConfigurationRequest(in *cloudproxyv1alpha.StreamCloudProxyResponse) { @@ -252,10 +252,7 @@ func (c *Client) processHttpRequest(req *cloudproxyv1alpha.HTTPRequest) *cloudpr func (c *Client) isAlive() bool { lastSeen := c.lastSeen.Load() - if time.Now().UnixNano()-lastSeen > c.keepAliveTimeout.Load() { - return false - } - return true + return time.Now().UnixNano()-lastSeen <= c.keepAliveTimeout.Load() } func (c *Client) sendKeepAlive(stream cloudproxyv1alpha.CloudProxyAPI_StreamCloudProxyClient) {