Skip to content

Commit

Permalink
feat: add kubernetes event watcher (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
colesnodgrass authored May 16, 2024
1 parent 9665569 commit 4e7c057
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 22 deletions.
27 changes: 27 additions & 0 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package k8s
import (
"context"
"fmt"
"io"
coreV1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"strings"
)

// Client primarily for testing purposes
Expand All @@ -32,8 +35,14 @@ type Client interface {

// ServerVersionGet returns the kubernetes version.
ServerVersionGet() (string, error)

EventsWatch(ctx context.Context, namespace string) (watch.Interface, error)

LogsGet(ctx context.Context, namespace string, name string) (string, error)
}

var _ Client = (*DefaultK8sClient)(nil)

// DefaultK8sClient converts the official kubernetes client to our more manageable (and testable) interface
type DefaultK8sClient struct {
ClientSet *kubernetes.Clientset
Expand Down Expand Up @@ -113,3 +122,21 @@ func (d *DefaultK8sClient) ServerVersionGet() (string, error) {
func (d *DefaultK8sClient) ServiceGet(ctx context.Context, namespace string, name string) (*coreV1.Service, error) {
return d.ClientSet.CoreV1().Services(namespace).Get(ctx, name, metav1.GetOptions{})
}

func (d *DefaultK8sClient) EventsWatch(ctx context.Context, namespace string) (watch.Interface, error) {
return d.ClientSet.EventsV1().Events(namespace).Watch(ctx, metav1.ListOptions{})
}

func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name string) (string, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(name, &coreV1.PodLogOptions{Previous: true})
reader, err := req.Stream(ctx)
if err != nil {
return "", fmt.Errorf("could not get logs for pod %s: %w", name, err)
}
defer reader.Close()
buf := new(strings.Builder)
if _, err := io.Copy(buf, reader); err != nil {
return "", fmt.Errorf("could not copy logs from pod %s: %w", name, err)
}
return buf.String(), nil
}
2 changes: 1 addition & 1 deletion internal/cmd/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ func NewCmdLocal(provider k8s.Provider) *cobra.Command {
func printProviderDetails(p k8s.Provider) {
userHome, _ := os.UserHomeDir()
configPath := filepath.Join(userHome, p.Kubeconfig)
pterm.Info.Printfln("Using Kubernetes provider:\n\tProvider: %s\n\tKubeconfig: %s\n\tContext: %s", p.Name, configPath, p.Context)
pterm.Info.Printfln("Using Kubernetes provider:\n Provider: %s\n Kubeconfig: %s\n Context: %s", p.Name, configPath, p.Context)
}
80 changes: 79 additions & 1 deletion internal/cmd/local/local/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"helm.sh/helm/v3/pkg/release"
"helm.sh/helm/v3/pkg/repo"
"helm.sh/helm/v3/pkg/storage/driver"
v1events "k8s.io/api/events/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -220,6 +221,8 @@ func New(provider k8s.Provider, opts ...Option) (*Command, error) {

// Install handles the installation of Airbyte
func (c *Command) Install(ctx context.Context, user, pass string) error {
go c.watchEvents(ctx)

if err := c.handleChart(ctx, chartRequest{
name: "airbyte",
repoName: airbyteRepoName,
Expand Down Expand Up @@ -294,6 +297,81 @@ func (c *Command) Install(ctx context.Context, user, pass string) error {
return nil
}

func (c *Command) watchEvents(ctx context.Context) {
watcher, err := c.k8s.EventsWatch(ctx, airbyteNamespace)
if err != nil {
pterm.Warning.Printfln("Unable to watch airbyte events\n %s", err)
return
}
defer watcher.Stop()

for {
select {
case event, ok := <-watcher.ResultChan():
if !ok {
pterm.Debug.Println("Event watcher completed.")
return
}
if convertedEvent, ok := event.Object.(*v1events.Event); ok {
c.handleEvent(ctx, convertedEvent)
} else {
pterm.Debug.Printfln("Received unexpected event: %T", event.Object)
}
case <-ctx.Done():
pterm.Debug.Printfln("Event watcher context completed:\n %s", ctx.Err())
return
}
}
}

// now is used to filter out kubernetes events that happened in the past.
// Kubernetes wants to use the ResourceVersion on the event watch request itself, but that approach
// is more complicated as it requires determining which ResourceVersion to initially provide.
var now = func() *v1.Time {
t := v1.Now()
return &t
}()

// handleEvent converts a kubernetes event into a console log message
func (c *Command) handleEvent(ctx context.Context, e *v1events.Event) {
// TODO: replace DeprecatedLastTimestamp,
// this is supposed to be replaced with series.lastObservedTime, however that field is always nil...
if e.DeprecatedLastTimestamp.Before(now) {
return
}

switch {
case strings.EqualFold(e.Type, "normal"):
pterm.Debug.Println(e.Note)
case strings.EqualFold(e.Type, "warning"):
var logs = ""
if strings.EqualFold(e.Reason, "backoff") {
var err error
logs, err = c.k8s.LogsGet(ctx, e.Regarding.Namespace, e.Regarding.Name)
if err != nil {
pterm.Debug.Printfln("Unable to retrieve logs for %s:%s\n %s", e.Regarding.Namespace, e.Regarding.Name, err)
}
}

// TODO: replace DeprecatedCount
// Similar issue to DeprecatedLastTimestamp, the series attribute is always nil
if logs != "" {
pterm.Warning.Printfln(
"Encountered an issue deploying Airbyte:\n Pod: %s\n Reason: %s\n Message: %s\n Count: %d\n Logs: %s",
e.Name, e.Reason, e.Note, e.DeprecatedCount, logs,
)
} else {
pterm.Warning.Printfln(
"Encountered an issue deploying Airbyte:\n Pod: %s\n Reason: %s\n Message: %s\n Count: %d",
e.Name, e.Reason, e.Note, e.DeprecatedCount,
)
}

default:
pterm.Debug.Printfln("Received an unsupported event type: %s", e.Type)
}
}

// handleBasicAuthSecret creates or updates the appropriate basic auth credentials for ingress.
func (c *Command) handleBasicAuthSecret(ctx context.Context, user, pass string) error {
hashedPass, err := bcrypt.GenerateFromPassword([]byte(pass), bcrypt.DefaultCost)
Expand Down Expand Up @@ -465,7 +543,7 @@ func (c *Command) handleChart(
c.tel.Attr(fmt.Sprintf("helm_%s_release_version", req.name), strconv.Itoa(helmRelease.Version))

pterm.Success.Printfln(
"Installed Helm Chart %s:\n\tname: %s\n\tnamespace: %s\n\tversion: %s\n\trelease: %d",
"Installed Helm Chart %s:\n name: %s\n namespace: %s\n version: %s\n release: %d",
req.chartName, helmRelease.Name, helmRelease.Namespace, helmRelease.Chart.Metadata.Version, helmRelease.Version)
return nil
}
Expand Down
57 changes: 37 additions & 20 deletions internal/cmd/local/local/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"helm.sh/helm/v3/pkg/repo"
coreV1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/watch"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -108,16 +109,16 @@ func TestCommand_Install(t *testing.T) {
}

k8sClient := mockK8sClient{
getServerVersion: func() (string, error) {
serverVersionGet: func() (string, error) {
return "test", nil
},
createOrUpdateSecret: func(ctx context.Context, namespace, name string, data map[string][]byte) error {
secretCreateOrUpdate: func(ctx context.Context, namespace, name string, data map[string][]byte) error {
return nil
},
existsIngress: func(ctx context.Context, namespace string, ingress string) bool {
ingressExists: func(ctx context.Context, namespace string, ingress string) bool {
return false
},
createIngress: func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error {
ingressCreate: func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error {
return nil
},
}
Expand Down Expand Up @@ -190,46 +191,62 @@ func (m *mockHelmClient) UninstallReleaseByName(s string) error {
var _ k8s.Client = (*mockK8sClient)(nil)

type mockK8sClient struct {
createIngress func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error
existsIngress func(ctx context.Context, namespace string, ingress string) bool
updateIngress func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error
existsNamespace func(ctx context.Context, namespace string) bool
deleteNamespace func(ctx context.Context, namespace string) error
createOrUpdateSecret func(ctx context.Context, namespace, name string, data map[string][]byte) error
getService func(ctx context.Context, namespace, name string) (*coreV1.Service, error)
getServerVersion func() (string, error)
ingressCreate func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error
ingressExists func(ctx context.Context, namespace string, ingress string) bool
ingressUpdate func(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error
namespaceExists func(ctx context.Context, namespace string) bool
namespaceDelete func(ctx context.Context, namespace string) error
secretCreateOrUpdate func(ctx context.Context, namespace, name string, data map[string][]byte) error
serviceGet func(ctx context.Context, namespace, name string) (*coreV1.Service, error)
serverVersionGet func() (string, error)
eventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
logsGet func(ctx context.Context, namespace string, name string) (string, error)
}

func (m *mockK8sClient) IngressCreate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error {
return m.createIngress(ctx, namespace, ingress)
return m.ingressCreate(ctx, namespace, ingress)
}

func (m *mockK8sClient) IngressExists(ctx context.Context, namespace string, ingress string) bool {
return m.existsIngress(ctx, namespace, ingress)
return m.ingressExists(ctx, namespace, ingress)
}

func (m *mockK8sClient) IngressUpdate(ctx context.Context, namespace string, ingress *networkingv1.Ingress) error {
return m.updateIngress(ctx, namespace, ingress)
return m.ingressUpdate(ctx, namespace, ingress)
}

func (m *mockK8sClient) NamespaceExists(ctx context.Context, namespace string) bool {
return m.existsNamespace(ctx, namespace)
return m.namespaceExists(ctx, namespace)
}

func (m *mockK8sClient) NamespaceDelete(ctx context.Context, namespace string) error {
return m.deleteNamespace(ctx, namespace)
return m.namespaceDelete(ctx, namespace)
}

func (m *mockK8sClient) SecretCreateOrUpdate(ctx context.Context, namespace, name string, data map[string][]byte) error {
return m.createOrUpdateSecret(ctx, namespace, name, data)
return m.secretCreateOrUpdate(ctx, namespace, name, data)
}

func (m *mockK8sClient) ServiceGet(ctx context.Context, namespace, name string) (*coreV1.Service, error) {
return m.getService(ctx, namespace, name)
return m.serviceGet(ctx, namespace, name)
}

func (m *mockK8sClient) ServerVersionGet() (string, error) {
return m.getServerVersion()
return m.serverVersionGet()
}

func (m *mockK8sClient) EventsWatch(ctx context.Context, namespace string) (watch.Interface, error) {
if m.eventsWatch == nil {
return watch.NewFake(), nil
}
return m.eventsWatch(ctx, namespace)
}

func (m *mockK8sClient) LogsGet(ctx context.Context, namespace string, name string) (string, error) {
if m.logsGet == nil {
return "LogsGet called", nil
}
return m.logsGet(ctx, namespace, name)
}

var _ telemetry.Client = (*mockTelemetryClient)(nil)
Expand Down
3 changes: 3 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func main() {
cancel()
}()

// ensure the pterm info width matches the other printers
pterm.Info.Prefix.Text = " INFO "

root := cmd.NewCmd()
cmd.Execute(ctx, root)

Expand Down

0 comments on commit 4e7c057

Please sign in to comment.