Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(ec): send upgrade events instead of the operator #5017

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion e2e/playwright/tests/@smoke-test/test.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ test("smoke test", async ({ page }) => {
);
await page.getByRole("button", { name: "Continue" }).click();
await expect(page.locator("#app")).toContainText("Results", {
timeout: 30000,
timeout: 60 * 1000,
});
await expect(page.locator("#app")).toContainText("Sequence is 0");
await page.getByRole("button", { name: "Deploy" }).click();
Expand Down
107 changes: 107 additions & 0 deletions pkg/embeddedcluster/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package embeddedcluster

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"

embeddedclusterv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
"github.com/replicatedhq/kots/pkg/logger"
)

// UpgradeStartedEvent is send back home when the upgrade starts.
type UpgradeStartedEvent struct {
ClusterID string `json:"clusterID"`
TargetVersion string `json:"targetVersion"`
InitialVersion string `json:"initialVersion"`
AppVersion string `json:"appVersion"`
}

// UpgradeFailedEvent is send back home when the upgrade fails.
type UpgradeFailedEvent struct {
ClusterID string `json:"clusterID"`
TargetVersion string `json:"targetVersion"`
InitialVersion string `json:"initialVersion"`
Reason string `json:"reason"`
}

// UpgradeSucceededEvent event is send back home when the upgrade succeeds.
type UpgradeSucceededEvent struct {
ClusterID string `json:"clusterID"`
TargetVersion string `json:"targetVersion"`
InitialVersion string `json:"initialVersion"`
}

// NotifyUpgradeStarted notifies the metrics server that an upgrade has started.
func NotifyUpgradeStarted(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation, versionLabel string) error {
if ins.Spec.AirGap {
return nil
}
return sendEvent(ctx, "UpgradeStarted", baseURL, UpgradeStartedEvent{
ClusterID: ins.Spec.ClusterID,
TargetVersion: ins.Spec.Config.Version,
InitialVersion: prev.Spec.Config.Version,
AppVersion: versionLabel,
})
}

// NotifyUpgradeFailed notifies the metrics server that an upgrade has failed.
func NotifyUpgradeFailed(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation, reason string) error {
if ins.Spec.AirGap {
return nil
}
return sendEvent(ctx, "UpgradeFailed", baseURL, UpgradeFailedEvent{
ClusterID: ins.Spec.ClusterID,
TargetVersion: ins.Spec.Config.Version,
InitialVersion: prev.Spec.Config.Version,
Reason: reason,
})
}

// NotifyUpgradeSucceeded notifies the metrics server that an upgrade has succeeded.
func NotifyUpgradeSucceeded(ctx context.Context, baseURL string, ins, prev *embeddedclusterv1beta1.Installation) error {
if ins.Spec.AirGap {
return nil
}
return sendEvent(ctx, "UpgradeSucceeded", baseURL, UpgradeSucceededEvent{
ClusterID: ins.Spec.ClusterID,
TargetVersion: ins.Spec.Config.Version,
InitialVersion: prev.Spec.Config.Version,
})
}

// sendEvent sends the received event to the metrics server through a post request.
func sendEvent(ctx context.Context, evname, baseURL string, ev interface{}) error {
url := fmt.Sprintf("%s/embedded_cluster_metrics/%s", baseURL, evname)

logger.Infof("Sending event %s to %s", evname, url)

body := map[string]interface{}{"event": ev}
buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(body); err != nil {
return err
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, buf)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{
Timeout: 5 * time.Second,
}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send event: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to send event: %s", resp.Status)
}
return nil
}
9 changes: 9 additions & 0 deletions pkg/embeddedcluster/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
dockerregistrytypes "github.com/replicatedhq/kots/pkg/docker/registry/types"
"github.com/replicatedhq/kots/pkg/imageutil"
"github.com/replicatedhq/kots/pkg/k8sutil"
"github.com/replicatedhq/kots/pkg/logger"
registrytypes "github.com/replicatedhq/kots/pkg/registry/types"
"github.com/replicatedhq/kots/pkg/util"
kotsv1beta1 "github.com/replicatedhq/kotskinds/apis/kots/v1beta1"
Expand Down Expand Up @@ -75,8 +76,16 @@ func startClusterUpgrade(

log.Printf("Starting cluster upgrade to version %s...", newcfg.Version)

// We cannot notify the upgrade started until the new install is available
if err := NotifyUpgradeStarted(ctx, license.Spec.Endpoint, newInstall, current, versionLabel); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so if the upgrade fails prior to this point, we wouldn't report that an upgrade started, but we would report that it failed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess that's the caveat that you captured in the comment that the new installation object is required to report that the upgrade started. so nevermind.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated. im not sure if its better or worse this way

logger.Errorf("Failed to notify upgrade started: %v", err)
}

err = runClusterUpgrade(ctx, k8sClient, newInstall, registrySettings, license, versionLabel)
if err != nil {
if err := NotifyUpgradeFailed(ctx, license.Spec.Endpoint, newInstall, current, err.Error()); err != nil {
logger.Errorf("Failed to notify upgrade failed: %v", err)
}
return fmt.Errorf("run cluster upgrade: %w", err)
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/embeddedcluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ func GetCurrentInstallation(ctx context.Context, kbClient kbclient.Client) (*emb
return &installations[0], nil
}

// GetCurrentInstallation returns the second most recent installation object from the cluster.
func GetPreviousInstallation(ctx context.Context, kbClient kbclient.Client) (*embeddedclusterv1beta1.Installation, error) {
installations, err := ListInstallations(ctx, kbClient)
if err != nil {
return nil, fmt.Errorf("failed to list installations: %w", err)
}
if len(installations) < 2 {
return nil, nil
}
sort.SliceStable(installations, func(i, j int) bool {
return installations[j].Name < installations[i].Name
})
return &installations[1], nil
}

func ListInstallations(ctx context.Context, kbClient kbclient.Client) ([]embeddedclusterv1beta1.Installation, error) {
var installationList embeddedclusterv1beta1.InstallationList
if err := kbClient.List(ctx, &installationList, &kbclient.ListOptions{}); err != nil {
Expand Down
76 changes: 69 additions & 7 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/pkg/errors"
embeddedclusterv1beta1 "github.com/replicatedhq/embedded-cluster/kinds/apis/v1beta1"
downstreamtypes "github.com/replicatedhq/kots/pkg/api/downstream/types"
"github.com/replicatedhq/kots/pkg/app"
apptypes "github.com/replicatedhq/kots/pkg/app/types"
Expand Down Expand Up @@ -53,6 +54,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
)

var (
Expand Down Expand Up @@ -933,7 +935,7 @@ func (o *Operator) reconcileDeployment(cm *corev1.ConfigMap) (finalError error)
if cm.Data["requires-cluster-upgrade"] == "true" {
// wait for cluster upgrade even if the embedded cluster version doesn't match yet
// in order to continuously report progress to the user
if err := o.waitForClusterUpgrade(cm.Data["app-slug"]); err != nil {
if err := o.waitForClusterUpgrade(cm.Data["app-id"], cm.Data["app-slug"]); err != nil {
return errors.Wrap(err, "failed to wait for cluster upgrade")
}
}
Expand Down Expand Up @@ -1035,21 +1037,31 @@ func (o *Operator) reconcileDeployment(cm *corev1.ConfigMap) (finalError error)
return nil
}

func (o *Operator) waitForClusterUpgrade(appSlug string) error {
kbClient, err := k8sutil.GetKubeClient(context.Background())
func (o *Operator) waitForClusterUpgrade(appID string, appSlug string) error {
ctx := context.Background()

kbClient, err := k8sutil.GetKubeClient(ctx)
if err != nil {
return errors.Wrap(err, "failed to get kube client")
}
logger.Infof("waiting for cluster upgrade to finish")
logger.Infof("Waiting for cluster upgrade to finish")
for {
ins, err := embeddedcluster.GetCurrentInstallation(context.Background(), kbClient)
ins, err := embeddedcluster.GetCurrentInstallation(ctx, kbClient)
if err != nil {
return errors.Wrap(err, "failed to wait for embedded cluster installation")
}
if embeddedcluster.InstallationSucceeded(context.Background(), ins) {
if embeddedcluster.InstallationSucceeded(ctx, ins) {
logger.Infof("Cluster upgrade succeeded")
if err := o.notifyClusterUpgradeSucceeded(ctx, kbClient, ins, appID); err != nil {
logger.Errorf("Failed to notify upgrade succeeded: %v", err)
}
return nil
}
if embeddedcluster.InstallationFailed(context.Background(), ins) {
if embeddedcluster.InstallationFailed(ctx, ins) {
logger.Infof("Cluster upgrade failed")
if err := o.notifyClusterUpgradeFailed(ctx, kbClient, ins, appID); err != nil {
logger.Errorf("Failed to notify upgrade failed: %v", err)
}
if err := upgradeservicetask.SetStatusUpgradeFailed(appSlug, ins.Status.Reason); err != nil {
return errors.Wrap(err, "failed to set task status to failed")
}
Expand All @@ -1061,3 +1073,53 @@ func (o *Operator) waitForClusterUpgrade(appSlug string) error {
time.Sleep(5 * time.Second)
}
}

// notifyClusterUpgradeSucceeded sends a metrics event to the api that the upgrade succeeded.
func (o *Operator) notifyClusterUpgradeSucceeded(ctx context.Context, kbClient kbclient.Client, ins *embeddedclusterv1beta1.Installation, appID string) error {
if ins.Spec.AirGap {
return nil
}

license, err := o.store.GetLatestLicenseForApp(appID)
if err != nil {
return errors.Wrapf(err, "failed to get latest license for app %s", appID)
}

prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient)
if err != nil {
return errors.Wrap(err, "failed to get previous installation")
} else if prev == nil {
return errors.New("previous installation not found")
}

err = embeddedcluster.NotifyUpgradeSucceeded(ctx, license.Spec.Endpoint, ins, prev)
if err != nil {
return errors.Wrap(err, "failed to send event")
}
return nil
}

// notifyClusterUpgradeFailed sends a metrics event to the api that the upgrade failed.
func (o *Operator) notifyClusterUpgradeFailed(ctx context.Context, kbClient kbclient.Client, ins *embeddedclusterv1beta1.Installation, appID string) error {
if ins.Spec.AirGap {
return nil
}

license, err := o.store.GetLatestLicenseForApp(appID)
if err != nil {
return errors.Wrapf(err, "failed to get latest license for app %s", appID)
}

prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient)
if err != nil {
return errors.Wrap(err, "failed to get previous installation")
} else if prev == nil {
return errors.New("previous installation not found")
}

err = embeddedcluster.NotifyUpgradeFailed(ctx, license.Spec.Endpoint, ins, prev, ins.Status.Reason)
if err != nil {
return errors.Wrap(err, "failed to send event")
}
return nil
}
31 changes: 31 additions & 0 deletions pkg/upgradeservice/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
)

type CanDeployOptions struct {
Expand Down Expand Up @@ -141,6 +142,11 @@ func Deploy(opts DeployOptions) error {
tgzArchiveKey: tgzArchiveKey,
requiresClusterUpgrade: true,
}); err != nil {
// The operator is responsible for notifying of upgrade success/failure using the deployment.
// If we cannot create the deployment, the operator cannot take over and we need to notify of failure here.
if err := notifyClusterUpgradeFailed(context.Background(), kbClient, opts, finalError.Error()); err != nil {
logger.Errorf("Failed to notify upgrade failed: %v", err)
}
return errors.Wrap(err, "failed to create deployment")
}

Expand All @@ -150,6 +156,31 @@ func Deploy(opts DeployOptions) error {
return nil
}

// notifyClusterUpgradeFailed sends a metrics event to the api that the upgrade failed.
func notifyClusterUpgradeFailed(ctx context.Context, kbClient kbclient.Client, opts DeployOptions, reason string) error {
ins, err := embeddedcluster.GetCurrentInstallation(ctx, kbClient)
if err != nil {
return fmt.Errorf("failed to get current installation: %w", err)
}

if ins.Spec.AirGap {
return nil
}

prev, err := embeddedcluster.GetPreviousInstallation(ctx, kbClient)
if err != nil {
return errors.Wrap(err, "failed to get previous installation")
} else if prev == nil {
return errors.New("previous installation not found")
}

err = embeddedcluster.NotifyUpgradeFailed(ctx, opts.KotsKinds.License.Spec.Endpoint, ins, prev, reason)
if err != nil {
return errors.Wrap(err, "failed to send event")
}
return nil
}

type createDeploymentOptions struct {
ctx context.Context
isSkipPreflights bool
Expand Down
Loading