diff --git a/cmd/broker/broker_suite_test.go b/cmd/broker/broker_suite_test.go index 0832c929f3..d4dbab021e 100644 --- a/cmd/broker/broker_suite_test.go +++ b/cmd/broker/broker_suite_test.go @@ -216,20 +216,20 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string) k8sClientProvider := kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient) provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, log.With("provisioning", "manager")) provisioningQueue := NewProvisioningProcessingQueue(context.Background(), provisionManager, workersAmount, cfg, db, provisionerClient, inputFactory, - edpClient, accountProvider, k8sClientProvider, cli, defaultOIDCValues(), logs) + edpClient, accountProvider, k8sClientProvider, cli, defaultOIDCValues(), log) provisioningQueue.SpeedUp(10000) provisionManager.SpeedUp(10000) updateManager := process.NewStagedManager(db.Operations(), eventBroker, time.Hour, cfg.Update, log.With("update", "manager")) updateQueue := NewUpdateProcessingQueue(context.Background(), updateManager, 1, db, inputFactory, provisionerClient, - eventBroker, *cfg, k8sClientProvider, cli, logs) + eventBroker, *cfg, k8sClientProvider, cli, log) updateQueue.SpeedUp(10000) updateManager.SpeedUp(10000) deprovisionManager := process.NewStagedManager(db.Operations(), eventBroker, time.Hour, cfg.Deprovisioning, log.With("deprovisioning", "manager")) deprovisioningQueue := NewDeprovisioningProcessingQueue(ctx, workersAmount, deprovisionManager, cfg, db, eventBroker, - provisionerClient, edpClient, accountProvider, k8sClientProvider, cli, configProvider, logs, + provisionerClient, edpClient, accountProvider, k8sClientProvider, cli, configProvider, log, ) deprovisionManager.SpeedUp(10000) @@ -249,7 +249,7 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string) } ts.poller = &broker.TimerPoller{PollInterval: 3 * time.Millisecond, PollTimeout: 3 * time.Second, Log: ts.t.Log} - ts.CreateAPI(inputFactory, cfg, db, provisioningQueue, deprovisioningQueue, updateQueue, logs, k8sClientProvider, gardener.NewFakeClient(), eventBroker) + ts.CreateAPI(inputFactory, cfg, db, provisioningQueue, deprovisioningQueue, updateQueue, logs, log, k8sClientProvider, gardener.NewFakeClient(), eventBroker) notificationFakeClient := notification.NewFakeClient() notificationBundleBuilder := notification.NewBundleBuilder(notificationFakeClient, cfg.Notification) @@ -261,7 +261,7 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string) Retry: 10 * time.Millisecond, StatusCheck: 100 * time.Millisecond, UpgradeClusterTimeout: 3 * time.Second, - }, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, logs, cli, *cfg, 1000) + }, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, log, cli, *cfg, 1000) clusterQueue.SpeedUp(1000) @@ -366,7 +366,7 @@ func (s *BrokerSuiteTest) CallAPI(method string, path string, body string) *http return resp } -func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisioningQueue *process.Queue, deprovisionQueue *process.Queue, updateQueue *process.Queue, logs logrus.FieldLogger, skrK8sClientProvider *kubeconfig.FakeProvider, gardenerClient client.Client, eventBroker *event.PubSub) { +func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisioningQueue *process.Queue, deprovisionQueue *process.Queue, updateQueue *process.Queue, logs 
logrus.FieldLogger, log *slog.Logger, skrK8sClientProvider *kubeconfig.FakeProvider, gardenerClient client.Client, eventBroker *event.PubSub) { servicesConfig := map[string]broker.Service{ broker.KymaServiceName: { Description: "", @@ -397,7 +397,7 @@ func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Conf kcBuilder := &kcMock.KcBuilder{} kcBuilder.On("Build", nil).Return("--kubeconfig file", nil) createAPI(s.router, servicesConfig, inputFactory, cfg, db, provisioningQueue, deprovisionQueue, updateQueue, - lager.NewLogger("api"), logs, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, fakeKcpK8sClient, eventBroker) + lager.NewLogger("api"), logs, log, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, fakeKcpK8sClient, eventBroker) s.httpServer = httptest.NewServer(s.router) } diff --git a/cmd/broker/deprovisioning.go b/cmd/broker/deprovisioning.go index 4ccd1e24fb..a72b9cbc99 100644 --- a/cmd/broker/deprovisioning.go +++ b/cmd/broker/deprovisioning.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "time" "github.com/kyma-project/kyma-environment-broker/common/hyperscaler" @@ -12,7 +13,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process/steps" "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -20,7 +20,7 @@ func NewDeprovisioningProcessingQueue(ctx context.Context, workersAmount int, de cfg *Config, db storage.BrokerStorage, pub event.Publisher, provisionerClient provisioner.Client, edpClient deprovisioning.EDPClient, accountProvider hyperscaler.AccountProvider, - k8sClientProvider K8sClientProvider, cli client.Client, configProvider input.ConfigurationProvider, logs logrus.FieldLogger) *process.Queue { + k8sClientProvider K8sClientProvider, cli client.Client, configProvider input.ConfigurationProvider, logs *slog.Logger) *process.Queue { deprovisioningSteps := []struct { disabled bool diff --git a/cmd/broker/deprovisioning_suite_test.go b/cmd/broker/deprovisioning_suite_test.go index dbd7652d01..7b5d4eec35 100644 --- a/cmd/broker/deprovisioning_suite_test.go +++ b/cmd/broker/deprovisioning_suite_test.go @@ -31,7 +31,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/util/wait" @@ -73,9 +72,6 @@ func NewDeprovisioningSuite(t *testing.T) *DeprovisioningSuite { Level: slog.LevelInfo, })) - logs := logrus.New() - logs.Formatter.(*logrus.TextFormatter).TimestampFormat = "15:04:05.000" - cfg := fixConfig() cfg.EDP.Environment = edpEnvironment @@ -116,7 +112,7 @@ func NewDeprovisioningSuite(t *testing.T) *DeprovisioningSuite { deprovisioningQueue := NewDeprovisioningProcessingQueue(ctx, workersAmount, deprovisionManager, cfg, db, eventBroker, provisionerClient, - edpClient, accountProvider, kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient), fakeK8sSKRClient, configProvider, logs, + edpClient, accountProvider, kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient), fakeK8sSKRClient, configProvider, log, ) deprovisioningQueue.SpeedUp(10000) diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 33969e66d7..42f3e2b335 
100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -236,7 +236,7 @@ func main() { // create and fill config var cfg Config err = envconfig.InitWithPrefix(&cfg, "APP") - fatalOnError(err, logs) + fatalOnError(err, log) if cfg.LogLevel != "" { l, _ := logrus.ParseLevel(cfg.LogLevel) @@ -262,9 +262,9 @@ func main() { // create kubernetes client kcpK8sConfig, err := config.GetConfig() - fatalOnError(err, logs) + fatalOnError(err, log) kcpK8sClient, err := initClient(kcpK8sConfig) - fatalOnError(err, logs) + fatalOnError(err, log) skrK8sClientProvider := kubeconfig.NewK8sClientFromSecretProvider(kcpK8sClient) // create storage @@ -274,7 +274,7 @@ func main() { db = storage.NewMemoryStorage() } else { store, conn, err := storage.NewFromConfig(cfg.Database, cfg.Events, cipher) - fatalOnError(err, logs) + fatalOnError(err, log) db = store dbStatsCollector := sqlstats.NewStatsCollector("broker", conn) prometheus.MustRegister(dbStatsCollector) @@ -293,13 +293,13 @@ func main() { kebConfig.NewConfigMapKeysValidator(), kebConfig.NewConfigMapConverter()) gardenerClusterConfig, err := gardener.NewGardenerClusterConfig(cfg.Gardener.KubeconfigPath) - fatalOnError(err, logs) + fatalOnError(err, log) cfg.Gardener.DNSProviders, err = gardener.ReadDNSProvidersValuesFromYAML(cfg.SkrDnsProvidersValuesYAMLFilePath) - fatalOnError(err, logs) + fatalOnError(err, log) dynamicGardener, err := dynamic.NewForConfig(gardenerClusterConfig) - fatalOnError(err, logs) + fatalOnError(err, log) gardenerClient, err := initClient(gardenerClusterConfig) - fatalOnError(err, logs) + fatalOnError(err, log) gardenerNamespace := fmt.Sprintf("garden-%v", cfg.Gardener.Project) gardenerAccountPool := hyperscaler.NewAccountPool(dynamicGardener, gardenerNamespace) @@ -307,13 +307,13 @@ func main() { accountProvider := hyperscaler.NewAccountProvider(gardenerAccountPool, gardenerSharedPool) regions, err := provider.ReadPlatformRegionMappingFromFile(cfg.TrialRegionMappingFilePath) - fatalOnError(err, logs) + fatalOnError(err, log) log.Info(fmt.Sprintf("Platform region mapping for trial: %v", regions)) oidcDefaultValues, err := runtime.ReadOIDCDefaultValuesFromYAML(cfg.SkrOidcDefaultValuesYAMLFilePath) - fatalOnError(err, logs) + fatalOnError(err, log) inputFactory, err := input.NewInputBuilderFactory(configProvider, cfg.Provisioner, regions, cfg.FreemiumProviders, oidcDefaultValues, cfg.Broker.UseSmallerMachineTypes) - fatalOnError(err, logs) + fatalOnError(err, log) edpClient := edp.NewClient(cfg.EDP) @@ -326,25 +326,25 @@ func main() { // run queues provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, log.With("provisioning", "manager")) provisionQueue := NewProvisioningProcessingQueue(ctx, provisionManager, cfg.Provisioning.WorkersAmount, &cfg, db, provisionerClient, inputFactory, - edpClient, accountProvider, skrK8sClientProvider, kcpK8sClient, oidcDefaultValues, logs) + edpClient, accountProvider, skrK8sClientProvider, kcpK8sClient, oidcDefaultValues, log) deprovisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Deprovisioning, log.With("deprovisioning", "manager")) deprovisionQueue := NewDeprovisioningProcessingQueue(ctx, cfg.Deprovisioning.WorkersAmount, deprovisionManager, &cfg, db, eventBroker, provisionerClient, edpClient, accountProvider, - skrK8sClientProvider, kcpK8sClient, configProvider, logs) + skrK8sClientProvider, kcpK8sClient, configProvider, log) updateManager := process.NewStagedManager(db.Operations(), 
eventBroker, cfg.OperationTimeout, cfg.Update, log.With("update", "manager")) updateQueue := NewUpdateProcessingQueue(ctx, updateManager, cfg.Update.WorkersAmount, db, inputFactory, provisionerClient, eventBroker, - cfg, skrK8sClientProvider, kcpK8sClient, logs) + cfg, skrK8sClientProvider, kcpK8sClient, log) /***/ servicesConfig, err := broker.NewServicesConfigFromFile(cfg.CatalogFilePath) - fatalOnError(err, logs) + fatalOnError(err, log) // create kubeconfig builder kcBuilder := kubeconfig.NewBuilder(provisionerClient, kcpK8sClient, skrK8sClientProvider) // create server router := mux.NewRouter() - createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs, + createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs, log, inputFactory.GetPlanDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, kcpK8sClient, eventBroker) // create metrics endpoint @@ -358,20 +358,20 @@ func main() { runtimeResolver := orchestrationExt.NewGardenerRuntimeResolver(dynamicGardener, gardenerNamespace, runtimeLister, log) clusterQueue := NewClusterOrchestrationProcessingQueue(ctx, db, provisionerClient, eventBroker, inputFactory, - nil, time.Minute, runtimeResolver, notificationBuilder, logs, kcpK8sClient, cfg, 1) + nil, time.Minute, runtimeResolver, notificationBuilder, log, kcpK8sClient, cfg, 1) // TODO: in case of cluster upgrade the same Azure Zones must be send to the Provisioner orchestrationHandler := orchestrate.NewOrchestrationHandler(db, clusterQueue, cfg.MaxPaginationPage, log) if !cfg.DisableProcessOperationsInProgress { - err = processOperationsInProgressByType(internal.OperationTypeProvision, db.Operations(), provisionQueue, logs) - fatalOnError(err, logs) - err = processOperationsInProgressByType(internal.OperationTypeDeprovision, db.Operations(), deprovisionQueue, logs) - fatalOnError(err, logs) - err = processOperationsInProgressByType(internal.OperationTypeUpdate, db.Operations(), updateQueue, logs) - fatalOnError(err, logs) - err = reprocessOrchestrations(orchestrationExt.UpgradeClusterOrchestration, db.Orchestrations(), db.Operations(), clusterQueue, logs) - fatalOnError(err, logs) + err = processOperationsInProgressByType(internal.OperationTypeProvision, db.Operations(), provisionQueue, log) + fatalOnError(err, log) + err = processOperationsInProgressByType(internal.OperationTypeDeprovision, db.Operations(), deprovisionQueue, log) + fatalOnError(err, log) + err = processOperationsInProgressByType(internal.OperationTypeUpdate, db.Operations(), updateQueue, log) + fatalOnError(err, log) + err = reprocessOrchestrations(orchestrationExt.UpgradeClusterOrchestration, db.Orchestrations(), db.Operations(), clusterQueue, log) + fatalOnError(err, log) } else { log.Info("Skipping processing operation in progress on start") } @@ -381,7 +381,7 @@ func main() { "domain": cfg.DomainName, } err = swagger.NewTemplate("/swagger", swaggerTemplates).Execute() - fatalOnError(err, logs) + fatalOnError(err, log) // create /orchestration orchestrationHandler.AttachRoutes(router) @@ -403,7 +403,7 @@ func main() { log.Info(fmt.Sprintf("Call handled: method=%s url=%s statusCode=%d size=%d", params.Request.Method, params.URL.Path, params.StatusCode, params.Size)) }) - fatalOnError(http.ListenAndServe(cfg.Host+":"+cfg.Port, svr), logs) + fatalOnError(http.ListenAndServe(cfg.Host+":"+cfg.Port, svr), log) } func logConfiguration(logs *slog.Logger, cfg Config) { @@ -423,27 
+423,27 @@ func logConfiguration(logs *slog.Logger, cfg Config) { } func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planValidator broker.PlanValidator, cfg *Config, db storage.BrokerStorage, - provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient, kcpK8sClient client.Client, publisher event.Publisher) { + provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, log *slog.Logger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient, kcpK8sClient client.Client, publisher event.Publisher) { suspensionCtxHandler := suspension.NewContextUpdateHandler(db.Operations(), provisionQueue, deprovisionQueue, logs) defaultPlansConfig, err := servicesConfig.DefaultPlansConfig() - fatalOnError(err, logs) + fatalOnError(err, log) debugSink, err := lager.NewRedactingSink(lager.NewWriterSink(os.Stdout, lager.DEBUG), []string{"instance-details"}, []string{}) - fatalOnError(err, logs) + fatalOnError(err, log) logger.RegisterSink(debugSink) errorSink, err := lager.NewRedactingSink(lager.NewWriterSink(os.Stderr, lager.ERROR), []string{"instance-details"}, []string{}) - fatalOnError(err, logs) + fatalOnError(err, log) logger.RegisterSink(errorSink) freemiumGlobalAccountIds, err := whitelist.ReadWhitelistedGlobalAccountIdsFromFile(cfg.FreemiumWhitelistedGlobalAccountsFilePath) - fatalOnError(err, logs) + fatalOnError(err, log) logs.Infof("Number of globalAccountIds for unlimited freeemium: %d\n", len(freemiumGlobalAccountIds)) // backward compatibility for tests convergedCloudRegionProvider, err := broker.NewDefaultConvergedCloudRegionsProvider(cfg.SapConvergedCloudRegionMappingsFilePath, &broker.YamlRegionReader{}) - fatalOnError(err, logs) + fatalOnError(err, log) logs.Infof("%s plan region mappings loaded", broker.SapConvergedCloudPlanName) // create KymaEnvironmentBroker endpoints @@ -482,19 +482,19 @@ func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planVal } // queues all in progress operations by type -func processOperationsInProgressByType(opType internal.OperationType, op storage.Operations, queue *process.Queue, log logrus.FieldLogger) error { +func processOperationsInProgressByType(opType internal.OperationType, op storage.Operations, queue *process.Queue, log *slog.Logger) error { operations, err := op.GetNotFinishedOperationsByType(opType) if err != nil { return fmt.Errorf("while getting in progress operations from storage: %w", err) } for _, operation := range operations { queue.Add(operation.ID) - log.Infof("Resuming the processing of %s operation ID: %s", opType, operation.ID) + log.Info(fmt.Sprintf("Resuming the processing of %s operation ID: %s", opType, operation.ID)) } return nil } -func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log logrus.FieldLogger) error { +func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log *slog.Logger) error { if err := processCancelingOrchestrations(orchestrationType, orchestrationsStorage, operationsStorage, queue, 
log); err != nil { return fmt.Errorf("while processing canceled %s orchestrations: %w", orchestrationType, err) } @@ -510,7 +510,7 @@ func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrat return nil } -func processOrchestration(orchestrationType orchestrationExt.Type, state string, orchestrationsStorage storage.Orchestrations, queue *process.Queue, log logrus.FieldLogger) error { +func processOrchestration(orchestrationType orchestrationExt.Type, state string, orchestrationsStorage storage.Orchestrations, queue *process.Queue, log *slog.Logger) error { filter := dbmodel.OrchestrationFilter{ Types: []string{string(orchestrationType)}, States: []string{state}, @@ -525,14 +525,14 @@ func processOrchestration(orchestrationType orchestrationExt.Type, state string, for _, o := range orchestrations { queue.Add(o.OrchestrationID) - log.Infof("Resuming the processing of %s %s orchestration ID: %s", state, orchestrationType, o.OrchestrationID) + log.Info(fmt.Sprintf("Resuming the processing of %s %s orchestration ID: %s", state, orchestrationType, o.OrchestrationID)) } return nil } // processCancelingOrchestrations reprocess orchestrations with canceling state only when some in progress operations exists // reprocess only one orchestration to not clog up the orchestration queue on start -func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log logrus.FieldLogger) error { +func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log *slog.Logger) error { filter := dbmodel.OrchestrationFilter{ Types: []string{string(orchestrationType)}, States: []string{orchestrationExt.Canceling}, @@ -556,7 +556,7 @@ func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orc } if count > 0 { - log.Infof("Resuming the processing of %s %s orchestration ID: %s", orchestrationExt.Canceling, orchestrationType, o.OrchestrationID) + log.Info(fmt.Sprintf("Resuming the processing of %s %s orchestration ID: %s", orchestrationExt.Canceling, orchestrationType, o.OrchestrationID)) queue.Add(o.OrchestrationID) return nil } @@ -585,9 +585,10 @@ func initClient(cfg *rest.Config) (client.Client, error) { return cli, nil } -func fatalOnError(err error, log logrus.FieldLogger) { +func fatalOnError(err error, log *slog.Logger) { if err != nil { - log.Fatal(err) + log.Error(err.Error()) + os.Exit(1) } } diff --git a/cmd/broker/provisioning.go b/cmd/broker/provisioning.go index 3935190994..15fbfac5a3 100644 --- a/cmd/broker/provisioning.go +++ b/cmd/broker/provisioning.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "github.com/kyma-project/kyma-environment-broker/internal/provider" @@ -13,14 +14,13 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process/steps" "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) func NewProvisioningProcessingQueue(ctx context.Context, provisionManager *process.StagedManager, workersAmount int, cfg *Config, db storage.BrokerStorage, provisionerClient provisioner.Client, inputFactory input.CreatorForPlan, edpClient provisioning.EDPClient, accountProvider hyperscaler.AccountProvider, - k8sClientProvider 
provisioning.K8sClientProvider, cli client.Client, defaultOIDC pkg.OIDCConfigDTO, logs logrus.FieldLogger) *process.Queue { + k8sClientProvider provisioning.K8sClientProvider, cli client.Client, defaultOIDC pkg.OIDCConfigDTO, logs *slog.Logger) *process.Queue { trialRegionsMapping, err := provider.ReadPlatformRegionMappingFromFile(cfg.TrialRegionMappingFilePath) if err != nil { diff --git a/cmd/broker/suite_test.go b/cmd/broker/suite_test.go index cd9106f536..3c518136db 100644 --- a/cmd/broker/suite_test.go +++ b/cmd/broker/suite_test.go @@ -41,7 +41,6 @@ import ( kebRuntime "github.com/kyma-project/kyma-environment-broker/internal/runtime" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -107,9 +106,6 @@ func NewOrchestrationSuite(t *testing.T, additionalKymaVersions []string) *Orche Level: slog.LevelInfo, })) - logs := logrus.New() - logs.Formatter.(*logrus.TextFormatter).TimestampFormat = "15:04:05.000" - var cfg Config cfg.OrchestrationConfig = kebOrchestration.Config{ KubernetesVersion: "", @@ -166,7 +162,7 @@ func NewOrchestrationSuite(t *testing.T, additionalKymaVersions []string) *Orche Retry: 2 * time.Millisecond, StatusCheck: 20 * time.Millisecond, UpgradeClusterTimeout: 4 * time.Second, - }, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, logs, cli, cfg, 1000) + }, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, log, cli, cfg, 1000) clusterQueue.SpeedUp(1000) @@ -546,7 +542,6 @@ func NewProvisioningSuite(t *testing.T, multiZoneCluster bool, controlPlaneFailu log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) - logs := logrus.New() storageCleanup, db, err := GetStorageForE2ETests() assert.NoError(t, err) t.Cleanup(func() { @@ -593,7 +588,7 @@ func NewProvisioningSuite(t *testing.T, multiZoneCluster bool, controlPlaneFailu provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, log.With("provisioning", "manager")) provisioningQueue := NewProvisioningProcessingQueue(ctx, provisionManager, workersAmount, cfg, db, provisionerClient, inputFactory, edpClient, accountProvider, - kubeconfig.NewFakeK8sClientProvider(cli), cli, defaultOIDCValues(), logs) + kubeconfig.NewFakeK8sClientProvider(cli), cli, defaultOIDCValues(), log) provisioningQueue.SpeedUp(10000) provisionManager.SpeedUp(10000) diff --git a/cmd/broker/update.go b/cmd/broker/update.go index 8d72610bb3..68b890941d 100644 --- a/cmd/broker/update.go +++ b/cmd/broker/update.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "github.com/kyma-project/kyma-environment-broker/internal/process/steps" @@ -11,13 +12,12 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process/update" "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) func NewUpdateProcessingQueue(ctx context.Context, manager *process.StagedManager, workersAmount int, db storage.BrokerStorage, inputFactory input.CreatorForPlan, provisionerClient provisioner.Client, publisher event.Publisher, - cfg Config, k8sClientProvider K8sClientProvider, cli client.Client, logs logrus.FieldLogger) *process.Queue { + cfg Config, k8sClientProvider 
K8sClientProvider, cli client.Client, logs *slog.Logger) *process.Queue { manager.DefineStages([]string{"cluster", "btp-operator", "btp-operator-check", "check", "runtime_resource", "check_runtime_resource"}) updateSteps := []struct { diff --git a/cmd/broker/upgrade_cluster.go b/cmd/broker/upgrade_cluster.go index 39dcc0e2d8..0ef17258f4 100644 --- a/cmd/broker/upgrade_cluster.go +++ b/cmd/broker/upgrade_cluster.go @@ -2,6 +2,7 @@ package main import ( "context" + "log/slog" "time" orchestrationExt "github.com/kyma-project/kyma-environment-broker/common/orchestration" @@ -14,16 +15,15 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process/upgrade_cluster" "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" "sigs.k8s.io/controller-runtime/pkg/client" ) func NewClusterOrchestrationProcessingQueue(ctx context.Context, db storage.BrokerStorage, provisionerClient provisioner.Client, pub event.Publisher, inputFactory input.CreatorForPlan, icfg *upgrade_cluster.TimeSchedule, pollingInterval time.Duration, - runtimeResolver orchestrationExt.RuntimeResolver, notificationBuilder notification.BundleBuilder, logs logrus.FieldLogger, + runtimeResolver orchestrationExt.RuntimeResolver, notificationBuilder notification.BundleBuilder, logs *slog.Logger, cli client.Client, cfg Config, speedFactor int) *process.Queue { - upgradeClusterManager := upgrade_cluster.NewManager(db.Operations(), pub, logs.WithField("upgradeCluster", "manager")) + upgradeClusterManager := upgrade_cluster.NewManager(db.Operations(), pub, logs.With("upgradeCluster", "manager")) upgradeClusterInit := upgrade_cluster.NewInitialisationStep(db.Operations(), db.Orchestrations(), provisionerClient, inputFactory, icfg, notificationBuilder) upgradeClusterManager.InitStep(upgradeClusterInit) @@ -57,7 +57,7 @@ func NewClusterOrchestrationProcessingQueue(ctx context.Context, db storage.Brok } orchestrateClusterManager := manager.NewUpgradeClusterManager(db.Orchestrations(), db.Operations(), db.Instances(), - upgradeClusterManager, runtimeResolver, pollingInterval, logs.WithField("upgradeCluster", "orchestration"), + upgradeClusterManager, runtimeResolver, pollingInterval, logs.With("upgradeCluster", "orchestration"), cli, cfg.OrchestrationConfig, notificationBuilder, speedFactor) queue := process.NewQueue(orchestrateClusterManager, logs, "cluster-orchestration-processing", cfg.Broker.WorkerHealthCheckWarnInterval, cfg.Broker.WorkerHealthCheckInterval) diff --git a/common/orchestration/strategies/parallel.go b/common/orchestration/strategies/parallel.go index 3cebbddbf3..e5a2da2df5 100644 --- a/common/orchestration/strategies/parallel.go +++ b/common/orchestration/strategies/parallel.go @@ -2,13 +2,13 @@ package strategies import ( "fmt" + "log/slog" "runtime/debug" "sync" "time" "github.com/google/uuid" "github.com/kyma-project/kyma-environment-broker/common/orchestration" - "github.com/sirupsen/logrus" "k8s.io/client-go/util/workqueue" ) @@ -18,7 +18,7 @@ type ParallelOrchestrationStrategy struct { pq map[string]workqueue.DelayingInterface // processing queue, delaying queue for the in progress ops wg map[string]*sync.WaitGroup mux sync.RWMutex - log logrus.FieldLogger + log *slog.Logger rescheduleDelay time.Duration scheduleNum map[string]int speedFactor int @@ -26,7 +26,7 @@ type ParallelOrchestrationStrategy struct { // NewParallelOrchestrationStrategy returns a new parallel orchestration strategy, which 
// executes operations in parallel using a pool of workers and a delaying queue to support time-based scheduling. -func NewParallelOrchestrationStrategy(executor orchestration.OperationExecutor, log logrus.FieldLogger, rescheduleDelay time.Duration) orchestration.Strategy { +func NewParallelOrchestrationStrategy(executor orchestration.OperationExecutor, log *slog.Logger, rescheduleDelay time.Duration) orchestration.Strategy { strategy := &ParallelOrchestrationStrategy{ executor: executor, dq: map[string]workqueue.DelayingInterface{}, @@ -80,7 +80,7 @@ func (p *ParallelOrchestrationStrategy) Insert(execID string, operations []orche if err != nil { //error when read from storage or update to storage during maintenance window reschedule p.handleRescheduleErrorOperation(execID, &operations[i]) - p.log.Errorf("while processing operation %s: %v, will reschedule it", op.ID, err) + p.log.Error(fmt.Sprintf("while processing operation %s: %v, will reschedule it", op.ID, err)) } else { if p.dq[execID].ShuttingDown() { return fmt.Errorf("the execution ID %s is shutdown", execID) @@ -128,7 +128,7 @@ func (p *ParallelOrchestrationStrategy) scheduleOperationsLoop(execID string, st item, shutdown := dq.Get() if shutdown { - p.log.Infof("scheduling queue is shutdown") + p.log.Info("scheduling queue is shutdown") break } @@ -143,9 +143,9 @@ func (p *ParallelOrchestrationStrategy) scheduleOperationsLoop(execID string, st continue } - log := p.log.WithField("operationID", op.ID) + log := p.log.With("operationID", op.ID) if duration <= 0 { - log.Infof("operation is scheduled now") + log.Info("operation is scheduled now") pq.Add(item) p.processOperation(execID) @@ -154,7 +154,7 @@ func (p *ParallelOrchestrationStrategy) scheduleOperationsLoop(execID string, st p.scheduleNum[execID]-- p.mux.Unlock() } else { - log.Infof("operation will be scheduled in %v", duration) + log.Info(fmt.Sprintf("operation will be scheduled in %v", duration)) dq.AddAfter(item, duration) dq.Done(item) } @@ -169,32 +169,32 @@ func (p *ParallelOrchestrationStrategy) processOperation(execID string) { exit = func() bool { item, quit := p.pq[execID].Get() if quit { - p.log.Infof("processing queue is shutdown") + p.log.Info("processing queue is shutdown") return true } op := item.(*orchestration.RuntimeOperation) id := op.ID - log := p.log.WithField("operationID", id) + log := p.log.With("operationID", id) defer func() { if err := recover(); err != nil { - log.Errorf("panic error from process: %v. Stacktrace: %s", err, debug.Stack()) + log.Error(fmt.Sprintf("panic error from process: %v. 
Stacktrace: %s", err, debug.Stack())) } p.pq[execID].Done(item) }() when, err := p.executor.Execute(id) if err == nil && when != 0 { - log.Infof("Adding %q item after %v", id, when) + log.Info(fmt.Sprintf("Adding %q item after %v", id, when)) p.pq[execID].AddAfter(item, time.Duration(int64(when)/int64(p.speedFactor))) return false } if err != nil { - log.Errorf("Error from process: %v", err) + log.Error(fmt.Sprintf("Error from process: %v", err)) } - log.Infof("Finishing processing operation") + log.Info("Finishing processing operation") p.dq[execID].Done(item) return true @@ -249,7 +249,7 @@ func (p *ParallelOrchestrationStrategy) Cancel(executionID string) { if executionID == "" { return } - p.log.Infof("Cancelling strategy execution %s", executionID) + p.log.Info(fmt.Sprintf("Cancelling strategy execution %s", executionID)) p.mux.Lock() defer p.mux.Unlock() diff --git a/common/orchestration/strategies/parallel_test.go b/common/orchestration/strategies/parallel_test.go index 150329d4f4..fea03b4a09 100644 --- a/common/orchestration/strategies/parallel_test.go +++ b/common/orchestration/strategies/parallel_test.go @@ -1,13 +1,14 @@ package strategies import ( + "log/slog" + "os" "sync" "testing" "time" "github.com/kyma-project/kyma-environment-broker/common/orchestration" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/rand" ) @@ -38,7 +39,7 @@ func (t *testExecutor) Reschedule(operationID string, maintenanceWindowBegin, ma func TestNewParallelOrchestrationStrategy_Immediate(t *testing.T) { // given executor := &testExecutor{opCalled: map[string]bool{}} - s := NewParallelOrchestrationStrategy(executor, logrus.New(), 0) + s := NewParallelOrchestrationStrategy(executor, fixLogger(), 0) ops := make([]orchestration.RuntimeOperation, 3) for i := range ops { @@ -58,7 +59,7 @@ func TestNewParallelOrchestrationStrategy_Immediate(t *testing.T) { func TestNewParallelOrchestrationStrategy_MaintenanceWindow(t *testing.T) { // given executor := &testExecutor{opCalled: map[string]bool{}} - s := NewParallelOrchestrationStrategy(executor, logrus.New(), 0) + s := NewParallelOrchestrationStrategy(executor, fixLogger(), 0) start := time.Now().Add(3 * time.Second) @@ -84,7 +85,7 @@ func TestNewParallelOrchestrationStrategy_MaintenanceWindow(t *testing.T) { func TestNewParallelOrchestrationStrategy_Reschedule(t *testing.T) { // given executor := &testExecutor{opCalled: map[string]bool{}} - s := NewParallelOrchestrationStrategy(executor, logrus.New(), 5*time.Second) + s := NewParallelOrchestrationStrategy(executor, fixLogger(), 5*time.Second) start := time.Now().Add(-5 * time.Second) @@ -106,3 +107,9 @@ func TestNewParallelOrchestrationStrategy_Reschedule(t *testing.T) { assert.NoError(t, err) s.Wait(id) } + +func fixLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) +} diff --git a/internal/orchestration/handlers/cluster_handler_test.go b/internal/orchestration/handlers/cluster_handler_test.go index 7c6e4445d3..93aea6850d 100644 --- a/internal/orchestration/handlers/cluster_handler_test.go +++ b/internal/orchestration/handlers/cluster_handler_test.go @@ -16,7 +16,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/gorilla/mux" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) @@ -66,8 +65,7 @@ func fixClusterHandler(t *testing.T) *clusterHandler { log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) - logs := 
logrus.New() - q := process.NewQueue(&testExecutor{}, logs, "test-handler", 10*time.Second, 10*time.Second) + q := process.NewQueue(&testExecutor{}, log, "test-handler", 10*time.Second, 10*time.Second) handler := NewClusterHandler(db.Orchestrations(), q, log) return handler diff --git a/internal/orchestration/handlers/orchestration_handler_test.go b/internal/orchestration/handlers/orchestration_handler_test.go index ffb07a706a..ae2610b38d 100644 --- a/internal/orchestration/handlers/orchestration_handler_test.go +++ b/internal/orchestration/handlers/orchestration_handler_test.go @@ -19,7 +19,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -251,8 +250,7 @@ func TestStatusRetryHandler_AttachRoutes(t *testing.T) { err = db.Operations().InsertUpgradeClusterOperation(sameInstOp) require.NoError(t, err) - logs := logrus.New() - clusterQueue := process.NewQueue(&testExecutor{}, logs, "orchestration-test", 10*time.Second, 10*time.Second) + clusterQueue := process.NewQueue(&testExecutor{}, log, "orchestration-test", 10*time.Second, 10*time.Second) kymaHandler := NewOrchestrationStatusHandler(db.Operations(), db.Orchestrations(), db.RuntimeStates(), clusterQueue, 100, log) for i, id := range operationIDs { @@ -334,8 +332,7 @@ func TestStatusRetryHandler_AttachRoutes(t *testing.T) { err = db.Operations().InsertUpgradeClusterOperation(sameInstOp) require.NoError(t, err) - logs := logrus.New() - clusterQueue := process.NewQueue(&testExecutor{}, logs, "status-retry", 10*time.Second, 10*time.Second) + clusterQueue := process.NewQueue(&testExecutor{}, log, "status-retry", 10*time.Second, 10*time.Second) kymaHandler := NewOrchestrationStatusHandler(db.Operations(), db.Orchestrations(), db.RuntimeStates(), clusterQueue, 100, log) req, err := http.NewRequest("POST", fmt.Sprintf("/orchestrations/%s/retry", orchestrationID), nil) @@ -417,8 +414,7 @@ func TestStatusRetryHandler_AttachRoutes(t *testing.T) { err = db.Operations().InsertDeprovisioningOperation(deprovisioningOperation) require.NoError(t, err) - logs := logrus.New() - clusterQueue := process.NewQueue(&testExecutor{}, logs, "status-retry", 10*time.Second, 10*time.Second) + clusterQueue := process.NewQueue(&testExecutor{}, log, "status-retry", 10*time.Second, 10*time.Second) kymaHandler := NewOrchestrationStatusHandler(db.Operations(), db.Orchestrations(), db.RuntimeStates(), clusterQueue, 100, log) req, err := http.NewRequest("POST", fmt.Sprintf("/orchestrations/%s/retry", orchestrationID), nil) @@ -480,8 +476,7 @@ func TestStatusRetryHandler_AttachRoutes(t *testing.T) { err = fixInProgressOrchestrationOperations(db, orchestrationID) require.NoError(t, err) - logs := logrus.New() - clusterQueue := process.NewQueue(&testExecutor{}, logs, "status-retry", 10*time.Second, 10*time.Second) + clusterQueue := process.NewQueue(&testExecutor{}, log, "status-retry", 10*time.Second, 10*time.Second) kymaHandler := NewOrchestrationStatusHandler(db.Operations(), db.Orchestrations(), db.RuntimeStates(), clusterQueue, 100, log) req, err := http.NewRequest("POST", fmt.Sprintf("/orchestrations/%s/retry", orchestrationID), nil) diff --git a/internal/orchestration/manager/manager.go b/internal/orchestration/manager/manager.go index 2eaa9ecdd5..6d410e4e2d 100644 --- a/internal/orchestration/manager/manager.go +++ 
b/internal/orchestration/manager/manager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "regexp" "strings" "time" @@ -20,7 +21,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/kyma-project/kyma-environment-broker/internal/storage/dberr" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" ) type OperationFactory interface { @@ -41,7 +41,7 @@ type orchestrationManager struct { resolver orchestration.RuntimeResolver factory OperationFactory executor orchestration.OperationExecutor - log logrus.FieldLogger + log *slog.Logger pollingInterval time.Duration k8sClient client.Client configNamespace string @@ -59,12 +59,12 @@ func (m *orchestrationManager) SpeedUp(factor int) { } func (m *orchestrationManager) Execute(orchestrationID string) (time.Duration, error) { - logger := m.log.WithField("orchestrationID", orchestrationID) - m.log.Infof("Processing orchestration %s", orchestrationID) + logger := m.log.With("orchestrationID", orchestrationID) + m.log.Info(fmt.Sprintf("Processing orchestration %s", orchestrationID)) o, err := m.orchestrationStorage.GetByID(orchestrationID) if err != nil { if o == nil { - m.log.Errorf("orchestration %s failed: %s", orchestrationID, err) + m.log.Error(fmt.Sprintf("orchestration %s failed: %s", orchestrationID, err)) return time.Minute, nil } return m.failOrchestration(o, fmt.Errorf("failed to get orchestration: %w", err)) @@ -89,12 +89,12 @@ func (m *orchestrationManager) Execute(orchestrationID string) (time.Duration, e err = m.orchestrationStorage.Update(*o) if err != nil { - logger.Errorf("while updating orchestration: %v", err) + logger.Error(fmt.Sprintf("while updating orchestration: %v", err)) return m.pollingInterval, nil } // do not perform any action if the orchestration is finished if o.IsFinished() { - m.log.Infof("Orchestration was already finished, state: %s", o.State) + m.log.Info(fmt.Sprintf("Orchestration was already finished, state: %s", o.State)) return 0, nil } @@ -115,11 +115,11 @@ func (m *orchestrationManager) Execute(orchestrationID string) (time.Duration, e o.UpdatedAt = time.Now() err = m.orchestrationStorage.Update(*o) if err != nil { - logger.Errorf("while updating orchestration: %v", err) + logger.Error(fmt.Sprintf("while updating orchestration: %v", err)) return m.pollingInterval, nil } - logger.Infof("Finished processing orchestration, state: %s", o.State) + logger.Info(fmt.Sprintf("Finished processing orchestration, state: %s", o.State)) return 0, nil } @@ -296,7 +296,7 @@ func (m *orchestrationManager) resolveOperations(o *internal.Orchestration, poli o.Description = updateRetryingDescription(o.Description, fmt.Sprintf("retried %d operations", len(filterRuntimes))) o.Parameters.RetryOperation.RetryOperations = nil o.Parameters.RetryOperation.Immediate = false - m.log.Infof("Resuming %d operations for orchestration %s", len(result), o.OrchestrationID) + m.log.Info(fmt.Sprintf("Resuming %d operations for orchestration %s", len(result), o.OrchestrationID)) } else { // Resume processing of not finished upgrade operations after restart var err error @@ -305,13 +305,13 @@ func (m *orchestrationManager) resolveOperations(o *internal.Orchestration, poli return result, filterRuntimes, fmt.Errorf("while resuming operation: %w", err) } - m.log.Infof("Resuming %d operations for orchestration %s", len(result), o.OrchestrationID) + m.log.Info(fmt.Sprintf("Resuming %d operations for orchestration %s", len(result), o.OrchestrationID)) } return result, 
filterRuntimes, nil } -func (m *orchestrationManager) resolveStrategy(sType orchestration.StrategyType, executor orchestration.OperationExecutor, log logrus.FieldLogger) orchestration.Strategy { +func (m *orchestrationManager) resolveStrategy(sType orchestration.StrategyType, executor orchestration.OperationExecutor, log *slog.Logger) orchestration.Strategy { switch sType { case orchestration.ParallelStrategy: s := strategies.NewParallelOrchestrationStrategy(executor, log, 0) @@ -324,7 +324,7 @@ func (m *orchestrationManager) resolveStrategy(sType orchestration.StrategyType, } // waitForCompletion waits until processing of given orchestration ends or if it's canceled -func (m *orchestrationManager) waitForCompletion(o *internal.Orchestration, strategy orchestration.Strategy, execID string, log logrus.FieldLogger) (*internal.Orchestration, error) { +func (m *orchestrationManager) waitForCompletion(o *internal.Orchestration, strategy orchestration.Strategy, execID string, log *slog.Logger) (*internal.Orchestration, error) { orchestrationID := o.OrchestrationID canceled := false var err error @@ -341,15 +341,15 @@ func (m *orchestrationManager) waitForCompletion(o *internal.Orchestration, stra canceled = true } case dberr.IsNotFound(err): - log.Errorf("while getting orchestration: %v", err) + log.Error(fmt.Sprintf("while getting orchestration: %v", err)) return false, err default: - log.Errorf("while getting orchestration: %v", err) + log.Error(fmt.Sprintf("while getting orchestration: %v", err)) return false, nil } s, err := m.operationStorage.GetOperationStatsForOrchestration(o.OrchestrationID) if err != nil { - log.Errorf("while getting operations: %v", err) + log.Error(fmt.Sprintf("while getting operations: %v", err)) return false, nil } stats = s @@ -372,12 +372,12 @@ func (m *orchestrationManager) waitForCompletion(o *internal.Orchestration, stra ops, err := m.factory.RetryOperations(o.Parameters.RetryOperation.RetryOperations) if err != nil { // don't block the polling and cancel signal - log.Errorf("PollUntilContextCancel() while handling retrying operations: %v", err) + log.Error(fmt.Sprintf("PollUntilContextCancel() while handling retrying operations: %v", err)) } result, o, _, err := m.NewOperationForPendingRetrying(o, orchestration.MaintenancePolicy{}, ops, false) if err != nil { - log.Errorf("PollUntilContextCancel() while new operation for retrying instanceid : %v", err) + log.Error(fmt.Sprintf("PollUntilContextCancel() while new operation for retrying instanceid : %v", err)) } err = strategy.Insert(execID, result, o.Parameters.Strategy) @@ -395,10 +395,10 @@ func (m *orchestrationManager) waitForCompletion(o *internal.Orchestration, stra err = m.orchestrationStorage.Update(*o) if err != nil { - log.Errorf("PollUntilContextCancel() while updating orchestration: %v", err) + log.Error(fmt.Sprintf("PollUntilContextCancel() while updating orchestration: %v", err)) return false, nil } - m.log.Infof("PollUntilContextCancel() while resuming %d operations for orchestration %s", len(result), o.OrchestrationID) + m.log.Info(fmt.Sprintf("PollUntilContextCancel() while resuming %d operations for orchestration %s", len(result), o.OrchestrationID)) } // don't wait for pending operations if orchestration was canceled @@ -545,7 +545,7 @@ func resolveMaintenanceWindowTime(r orchestration.Runtime, policy orchestration. 
} func (m *orchestrationManager) failOrchestration(o *internal.Orchestration, err error) (time.Duration, error) { - m.log.Errorf("orchestration %s failed: %s", o.OrchestrationID, err) + m.log.Error(fmt.Sprintf("orchestration %s failed: %s", o.OrchestrationID, err)) return m.updateOrchestration(o, orchestration.Failed, err.Error()), nil } @@ -556,7 +556,7 @@ func (m *orchestrationManager) updateOrchestration(o *internal.Orchestration, st err := m.orchestrationStorage.Update(*o) if err != nil { if !dberr.IsNotFound(err) { - m.log.Errorf("while updating orchestration: %v", err) + m.log.Error(fmt.Sprintf("while updating orchestration: %v", err)) return time.Minute } } @@ -597,12 +597,12 @@ func (m *orchestrationManager) sendNotificationCreate(o *internal.Orchestration, m.log.Info("Start to create notification") notificationBundle, err := m.bundleBuilder.NewBundle(o.OrchestrationID, notificationParams) if err != nil { - m.log.Errorf("%s: %s", "failed to create Notification Bundle", err) + m.log.Error(fmt.Sprintf("%s: %s", "failed to create Notification Bundle", err)) return err } err = notificationBundle.CreateNotificationEvent() if err != nil { - m.log.Errorf("%s: %s", "cannot send notification", err) + m.log.Error(fmt.Sprintf("%s: %s", "cannot send notification", err)) return err } m.log.Info("Creating notification succedded") @@ -634,12 +634,12 @@ func (m *orchestrationManager) sendNotificationCancel(o *internal.Orchestration, m.log.Info("Start to cancel notification") notificationBundle, err := m.bundleBuilder.NewBundle(o.OrchestrationID, notificationParams) if err != nil { - m.log.Errorf("%s: %s", "failed to create Notification Bundle", err) + m.log.Error(fmt.Sprintf("%s: %s", "failed to create Notification Bundle", err)) return err } err = notificationBundle.CancelNotificationEvent() if err != nil { - m.log.Errorf("%s: %s", "cannot cancel notification", err) + m.log.Error(fmt.Sprintf("%s: %s", "cannot cancel notification", err)) return err } m.log.Info("Cancelling notification succedded") @@ -658,7 +658,7 @@ func updateRetryingDescription(desc string, newDesc string) string { func (m *orchestrationManager) waitForStart(o *internal.Orchestration) ([]orchestration.RuntimeOperation, int, error) { maintenancePolicy, err := m.getMaintenancePolicy() if err != nil { - m.log.Warnf("while getting maintenance policy: %s", err) + m.log.Warn(fmt.Sprintf("while getting maintenance policy: %s", err)) } //polling every 5 min until ochestration start diff --git a/internal/orchestration/manager/upgrade_cluster.go b/internal/orchestration/manager/upgrade_cluster.go index 909078d5fd..a3bc08a6f6 100644 --- a/internal/orchestration/manager/upgrade_cluster.go +++ b/internal/orchestration/manager/upgrade_cluster.go @@ -2,6 +2,7 @@ package manager import ( "fmt" + "log/slog" "time" internalOrchestration "github.com/kyma-project/kyma-environment-broker/internal/orchestration" @@ -17,7 +18,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/kyma-project/kyma-environment-broker/internal/storage/dbmodel" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" ) type upgradeClusterFactory struct { @@ -26,7 +26,7 @@ type upgradeClusterFactory struct { func NewUpgradeClusterManager(orchestrationStorage storage.Orchestrations, operationStorage storage.Operations, instanceStorage storage.Instances, kymaClusterExecutor orchestration.OperationExecutor, resolver orchestration.RuntimeResolver, pollingInterval time.Duration, - log logrus.FieldLogger, cli 
client.Client, cfg internalOrchestration.Config, bundleBuilder notification.BundleBuilder, speedFactor int) process.Executor { + log *slog.Logger, cli client.Client, cfg internalOrchestration.Config, bundleBuilder notification.BundleBuilder, speedFactor int) process.Executor { return &orchestrationManager{ orchestrationStorage: orchestrationStorage, operationStorage: operationStorage, diff --git a/internal/orchestration/manager/upgrade_cluster_test.go b/internal/orchestration/manager/upgrade_cluster_test.go index f8facd028a..0c0d7c19df 100644 --- a/internal/orchestration/manager/upgrade_cluster_test.go +++ b/internal/orchestration/manager/upgrade_cluster_test.go @@ -2,6 +2,8 @@ package manager_test import ( "fmt" + "log/slog" + "os" "testing" "time" @@ -14,7 +16,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/orchestration/manager" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/kyma-project/kyma-environment-broker/internal/storage/dbmodel" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -32,6 +33,9 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { Namespace: "default", Name: "policyConfig", } + log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) t.Run("Empty", func(t *testing.T) { // given @@ -72,7 +76,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { bundle.On("CreateNotificationEvent").Return(nil).Once() svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), nil, - resolver, 20*time.Millisecond, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + resolver, 20*time.Millisecond, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) @@ -118,7 +122,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { bundle.On("CreateNotificationEvent").Return(nil).Once() svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &testExecutor{}, - resolver, poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + resolver, poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) @@ -166,7 +170,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { bundle.On("CreateNotificationEvent").Return(nil).Once() svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), nil, - resolver, poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + resolver, poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) @@ -241,7 +245,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { bundle.On("CreateNotificationEvent").Return(nil).Once() svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &testExecutor{}, - resolver, poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + resolver, poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) @@ -294,7 +298,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { bundle.On("CancelNotificationEvent").Return(nil).Once() svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &testExecutor{}, resolver, - 
poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) @@ -370,7 +374,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { upgradeType: orchestration.UpgradeClusterOrchestration, } svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &executor, resolver, - poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) _, err = store.Orchestrations().GetByID(id) require.NoError(t, err) @@ -474,7 +478,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { upgradeType: orchestration.UpgradeClusterOrchestration, } svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &executor, resolver, - poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) _, err = store.Operations().GetUpgradeClusterOperationByID(opId) require.NoError(t, err) @@ -566,7 +570,7 @@ func TestUpgradeClusterManager_Execute(t *testing.T) { upgradeType: orchestration.UpgradeClusterOrchestration, } svc := manager.NewUpgradeClusterManager(store.Orchestrations(), store.Operations(), store.Instances(), &executor, resolver, - poolingInterval, logrus.New(), k8sClient, orchestrationConfig, notificationBuilder, 1000) + poolingInterval, log, k8sClient, orchestrationConfig, notificationBuilder, 1000) // when _, err = svc.Execute(id) diff --git a/internal/process/queue.go b/internal/process/queue.go index 16ae5a6929..2ce604a1f2 100644 --- a/internal/process/queue.go +++ b/internal/process/queue.go @@ -3,11 +3,11 @@ package process import ( "bytes" "fmt" + "log/slog" "runtime/debug" "sync" "time" - "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" ) @@ -20,7 +20,7 @@ type Queue struct { queue workqueue.RateLimitingInterface executor Executor waitGroup sync.WaitGroup - log logrus.FieldLogger + log *slog.Logger name string workerExecutionTimes map[string]time.Time warnAfterTime time.Duration @@ -29,13 +29,13 @@ type Queue struct { speedFactor int64 } -func NewQueue(executor Executor, log logrus.FieldLogger, name string, warnAfterTime, healthCheckIntervalTime time.Duration) *Queue { +func NewQueue(executor Executor, log *slog.Logger, name string, warnAfterTime, healthCheckIntervalTime time.Duration) *Queue { // add queue name field that could be logged later on return &Queue{ queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "operations"}), executor: executor, waitGroup: sync.WaitGroup{}, - log: log.WithField("queueName", name), + log: log.With("queueName", name), speedFactor: 1, name: name, workerExecutionTimes: make(map[string]time.Time), @@ -46,26 +46,26 @@ func NewQueue(executor Executor, log logrus.FieldLogger, name string, warnAfterT func (q *Queue) Add(processId string) { q.queue.Add(processId) - q.log.Infof("added item %s to the queue %s, queue length is %d", processId, q.name, q.queue.Len()) + q.log.Info(fmt.Sprintf("added item %s to the queue %s, queue length is %d", processId, q.name, q.queue.Len())) } func (q *Queue) AddAfter(processId string, duration time.Duration) { q.queue.AddAfter(processId, duration) - q.log.Infof("item 
%s will be added to the queue %s after duration of %d, queue length is %d", processId, q.name, duration, q.queue.Len()) + q.log.Info(fmt.Sprintf("item %s will be added to the queue %s after duration of %d, queue length is %d", processId, q.name, duration, q.queue.Len())) } func (q *Queue) ShutDown() { - q.log.Infof("shutting down the queue, queue length is %d", q.queue.Len()) + q.log.Info(fmt.Sprintf("shutting down the queue, queue length is %d", q.queue.Len())) q.queue.ShutDown() } func (q *Queue) Run(stop <-chan struct{}, workersAmount int) { - q.log.Infof("starting %d worker(s), queue length is %d", workersAmount, q.queue.Len()) + q.log.Info(fmt.Sprintf("starting %d worker(s), queue length is %d", workersAmount, q.queue.Len())) for i := 0; i < workersAmount; i++ { q.waitGroup.Add(1) - workerLogger := q.log.WithField("workerId", i) - workerLogger.Infof("starting worker with id %d", i) + workerLogger := q.log.With("workerId", i) + workerLogger.Info(fmt.Sprintf("starting worker with id %d", i)) q.createWorker(q.queue, q.executor.Execute, stop, &q.waitGroup, workerLogger, fmt.Sprintf("%s-%d", q.name, i)) } @@ -83,10 +83,10 @@ func (q *Queue) Run(stop <-chan struct{}, workersAmount int) { // This method should only be used for testing purposes func (q *Queue) SpeedUp(speedFactor int64) { q.speedFactor = speedFactor - q.log.Infof("queue speed factor set to %d", speedFactor) + q.log.Info(fmt.Sprintf("queue speed factor set to %d", speedFactor)) } -func (q *Queue) createWorker(queue workqueue.RateLimitingInterface, process func(id string) (time.Duration, error), stopCh <-chan struct{}, waitGroup *sync.WaitGroup, log logrus.FieldLogger, nameId string) { +func (q *Queue) createWorker(queue workqueue.RateLimitingInterface, process func(id string) (time.Duration, error), stopCh <-chan struct{}, waitGroup *sync.WaitGroup, log *slog.Logger, nameId string) { go func() { log.Info("worker routine - starting") wait.Until(q.worker(queue, process, log, nameId), time.Second, stopCh) @@ -95,25 +95,25 @@ func (q *Queue) createWorker(queue workqueue.RateLimitingInterface, process func }() } -func (q *Queue) worker(queue workqueue.RateLimitingInterface, process func(key string) (time.Duration, error), log logrus.FieldLogger, nameId string) func() { +func (q *Queue) worker(queue workqueue.RateLimitingInterface, process func(key string) (time.Duration, error), log *slog.Logger, nameId string) func() { return func() { exit := false for !exit { exit = func() bool { key, shutdown := queue.Get() if shutdown { - log.Infof("shutting down") + log.Info("shutting down") return true } id := key.(string) - log = log.WithField("operationID", id) - log.Infof("about to process item %s, queue length is %d", id, q.queue.Len()) + log = log.With("operationID", id) + log.Info(fmt.Sprintf("about to process item %s, queue length is %d", id, q.queue.Len())) q.logAndUpdateWorkerTimes(key.(string), nameId, log) defer func() { if err := recover(); err != nil { - log.Errorf("panic error from process: %v. Stacktrace: %s", err, debug.Stack()) + log.Error(fmt.Sprintf("panic error from process: %v. 
Stacktrace: %s", err, debug.Stack())) } queue.Done(key) log.Info("queue done processing") @@ -121,17 +121,17 @@ func (q *Queue) worker(queue workqueue.RateLimitingInterface, process func(key s when, err := process(id) if err == nil && when != 0 { - log.Infof("Adding %q item after %s, queue length %d", id, when, q.queue.Len()) + log.Info(fmt.Sprintf("Adding %q item after %s, queue length %d", id, when, q.queue.Len())) afterDuration := time.Duration(int64(when) / q.speedFactor) queue.AddAfter(key, afterDuration) return false } if err != nil { - log.Errorf("Error from process: %v", err) + log.Error(fmt.Sprintf("Error from process: %v", err)) } queue.Forget(key) - log.Infof("item for %s has been processed, no retry, element forgotten", id) + log.Info(fmt.Sprintf("item for %s has been processed, no retry, element forgotten", id)) return false }() @@ -139,20 +139,20 @@ func (q *Queue) worker(queue workqueue.RateLimitingInterface, process func(key s } } -func (q *Queue) logAndUpdateWorkerTimes(key string, name string, log logrus.FieldLogger) { +func (q *Queue) logAndUpdateWorkerTimes(key string, name string, log *slog.Logger) { // log time now := time.Now() lastTime, ok := q.workerExecutionTimes[name] if ok { - log.Infof("execution - worker %s last execution time %s, executed after %s", name, lastTime, now.Sub(lastTime)) + log.Info(fmt.Sprintf("execution - worker %s last execution time %s, executed after %s", name, lastTime, now.Sub(lastTime))) } q.workerExecutionTimes[name] = now - log.Infof("processing item %s, queue length is %d", key, q.queue.Len()) + log.Info(fmt.Sprintf("processing item %s, queue length is %d", key, q.queue.Len())) } func (q *Queue) logWorkersSummary() { - healthCheckLog := q.log.WithField("summary", q.name) + healthCheckLog := q.log.With("summary", q.name) var buffer bytes.Buffer buffer.WriteString(fmt.Sprintf("health - queue length %d", q.queue.Len())) @@ -167,7 +167,7 @@ func (q *Queue) logWorkersSummary() { for name, lastTime := range q.workerExecutionTimes { timeSinceLastExecution := time.Since(lastTime) if timeSinceLastExecution > q.warnAfterTime { - healthCheckLog.Infof("worker %s exceeded allowed limit of %s since last execution, its last execution is %s, time since last execution %s", name, q.warnAfterTime, lastTime, timeSinceLastExecution) + healthCheckLog.Info(fmt.Sprintf("worker %s exceeded allowed limit of %s since last execution, its last execution is %s, time since last execution %s", name, q.warnAfterTime, lastTime, timeSinceLastExecution)) } } } diff --git a/internal/process/queue_test.go b/internal/process/queue_test.go index 6af2ba47bb..1a9ae3c0db 100644 --- a/internal/process/queue_test.go +++ b/internal/process/queue_test.go @@ -4,12 +4,12 @@ import ( "bytes" "context" "fmt" + "log/slog" "strings" "sync" "testing" "time" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) @@ -26,10 +26,9 @@ func TestWorkerLogging(t *testing.T) { t.Run("should log basic worker information", func(t *testing.T) { // given - logger := logrus.New() - - var logs bytes.Buffer - logger.SetOutput(&logs) + cw := &captureWriter{buf: &bytes.Buffer{}} + handler := slog.NewTextHandler(cw, nil) + logger := slog.New(handler) cancelContext, cancel := context.WithCancel(context.Background()) var waitForProcessing sync.WaitGroup @@ -52,23 +51,31 @@ func TestWorkerLogging(t *testing.T) { queue.waitGroup.Wait() // then - stringLogs := logs.String() + stringLogs := cw.buf.String() t.Log(stringLogs) require.True(t, strings.Contains(stringLogs, "msg=\"starting 1 worker(s), 
queue length is 2\" queueName=test")) require.True(t, strings.Contains(stringLogs, "msg=\"starting worker with id 0\" queueName=test workerId=0")) require.True(t, strings.Contains(stringLogs, "msg=\"item processId2 will be added to the queue test after duration of 0, queue length is 1\" queueName=test")) require.True(t, strings.Contains(stringLogs, "msg=\"added item processId to the queue test, queue length is 2\" queueName=test")) - require.True(t, strings.Contains(stringLogs, "msg=\"processing item processId2, queue length is 1\" operationID=processId2 queueName=test")) - require.True(t, strings.Contains(stringLogs, "msg=\"processing item processId, queue length is 0\" operationID=processId queueName=test")) + require.True(t, strings.Contains(stringLogs, "msg=\"processing item processId2, queue length is 1\" queueName=test workerId=0 operationID=processId2")) + require.True(t, strings.Contains(stringLogs, "msg=\"processing item processId, queue length is 0\" queueName=test workerId=0 operationID=processId")) require.True(t, strings.Contains(stringLogs, "msg=\"shutting down the queue, queue length is 0\" queueName=test")) require.True(t, strings.Contains(stringLogs, "msg=\"queue speed factor set to 1\" queueName=test")) require.True(t, strings.Contains(stringLogs, "msg=\"worker routine - starting\" queueName=test workerId=0")) require.True(t, strings.Contains(stringLogs, "msg=\"worker done\" queueName=test workerId=0")) - require.True(t, strings.Contains(stringLogs, "msg=\"shutting down\" operationID=processId queueName=test workerId=0")) - require.True(t, strings.Contains(stringLogs, "msg=\"item for processId has been processed, no retry, element forgotten\" operationID=processId queueName=test workerId=0")) - require.True(t, strings.Contains(stringLogs, "msg=\"about to process item processId, queue length is 0\" operationID=processId queueName=test workerId=0")) + require.True(t, strings.Contains(stringLogs, "msg=\"shutting down\" queueName=test workerId=0 operationID=processId")) + require.True(t, strings.Contains(stringLogs, "msg=\"item for processId has been processed, no retry, element forgotten\" queueName=test workerId=0 operationID=processId")) + require.True(t, strings.Contains(stringLogs, "msg=\"about to process item processId, queue length is 0\" queueName=test workerId=0 operationID=processId")) require.True(t, strings.Contains(stringLogs, "msg=\"execution - worker test-0 last execution time")) require.True(t, strings.Contains(stringLogs, "executed after")) }) } + +type captureWriter struct { + buf *bytes.Buffer +} + +func (c *captureWriter) Write(p []byte) (n int, err error) { + return c.buf.Write(p) +} diff --git a/internal/process/upgrade_cluster/initialisation.go b/internal/process/upgrade_cluster/initialisation.go index 9c33c2f801..16c1ae4af8 100644 --- a/internal/process/upgrade_cluster/initialisation.go +++ b/internal/process/upgrade_cluster/initialisation.go @@ -2,6 +2,7 @@ package upgrade_cluster import ( "fmt" + "log/slog" "time" "github.com/kyma-project/control-plane/components/provisioner/pkg/gqlschema" @@ -14,7 +15,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" ) const ( @@ -64,7 +64,7 @@ func (s *InitialisationStep) Name() string { return "Upgrade_Cluster_Initialisation" } -func (s *InitialisationStep) Run(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) 
(internal.UpgradeClusterOperation, time.Duration, error) { +func (s *InitialisationStep) Run(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { // Check concurrent deprovisioning (or suspension) operation (launched after target resolution) // Terminate (preempt) upgrade immediately with succeeded lastOp, err := s.operationStorage.GetLastOperation(operation.InstanceID) @@ -82,7 +82,7 @@ func (s *InitialisationStep) Run(operation internal.UpgradeClusterOperation, log return operation, s.timeSchedule.Retry, nil } if orchestration.IsCanceled() { - log.Infof("Skipping processing because orchestration %s was canceled", operation.OrchestrationID) + log.Info(fmt.Sprintf("Skipping processing because orchestration %s was canceled", operation.OrchestrationID)) return s.operationManager.OperationCanceled(operation, fmt.Sprintf("orchestration %s was canceled", operation.OrchestrationID), log) } @@ -111,22 +111,22 @@ func (s *InitialisationStep) Run(operation internal.UpgradeClusterOperation, log return s.initializeUpgradeShootRequest(operation, log) } - log.Infof("runtime being upgraded, check operation status for provisioner operation id: %v", operation.ProvisionerOperationID) - return s.checkRuntimeStatus(operation, log.WithField("runtimeID", operation.RuntimeOperation.RuntimeID)) + log.Info(fmt.Sprintf("runtime being upgraded, check operation status for provisioner operation id: %v", operation.ProvisionerOperationID)) + return s.checkRuntimeStatus(operation, log.With("runtimeID", operation.RuntimeOperation.RuntimeID)) } -func (s *InitialisationStep) initializeUpgradeShootRequest(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { - log.Infof("create provisioner input creator for plan ID %q", operation.ProvisioningParameters) +func (s *InitialisationStep) initializeUpgradeShootRequest(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { + log.Info(fmt.Sprintf("create provisioner input creator for provisioning parameters %+v", operation.ProvisioningParameters)) creator, err := s.inputBuilder.CreateUpgradeShootInput(operation.ProvisioningParameters) switch { case err == nil: operation.InputCreator = creator return operation, 0, nil // go to next step case kebError.IsTemporaryError(err): - log.Errorf("cannot create upgrade shoot input creator at the moment for plan %s: %s", operation.ProvisioningParameters.PlanID, err) + log.Error(fmt.Sprintf("cannot create upgrade shoot input creator at the moment for plan %s: %s", operation.ProvisioningParameters.PlanID, err)) return s.operationManager.RetryOperation(operation, "error while creating upgrade shoot input creator", err, 5*time.Second, 5*time.Minute, log) default: - log.Errorf("cannot create input creator for plan %s: %s", operation.ProvisioningParameters.PlanID, err) + log.Error(fmt.Sprintf("cannot create input creator for plan %s: %s", operation.ProvisioningParameters.PlanID, err)) return s.operationManager.OperationFailed(operation, "cannot create provisioning input creator", err, log) } } @@ -134,9 +134,9 @@ func (s *InitialisationStep) initializeUpgradeShootRequest(operation internal.Up // checkRuntimeStatus will check operation runtime status // It will also trigger performRuntimeTasks upgrade steps to ensure // all the required dependencies have been fulfilled for upgrade operation. 
-func (s *InitialisationStep) checkRuntimeStatus(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (s *InitialisationStep) checkRuntimeStatus(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { if time.Since(operation.UpdatedAt) > CheckStatusTimeout { - log.Infof("operation has reached the time limit: updated operation time: %s", operation.UpdatedAt) + log.Info(fmt.Sprintf("operation has reached the time limit: updated operation time: %s", operation.UpdatedAt)) //send customer notification if operation.RuntimeOperation.Notification { err := s.sendNotificationComplete(operation, log) @@ -152,7 +152,7 @@ func (s *InitialisationStep) checkRuntimeStatus(operation internal.UpgradeCluste if err != nil { return operation, s.timeSchedule.StatusCheck, nil } - log.Infof("call to provisioner returned %s status", status.State.String()) + log.Info(fmt.Sprintf("call to provisioner returned %s status", status.State.String())) var msg string if status.Message != nil { @@ -194,7 +194,7 @@ func (s *InitialisationStep) checkRuntimeStatus(operation internal.UpgradeCluste return s.operationManager.OperationFailed(operation, fmt.Sprintf("unsupported provisioner client status: %s", status.State.String()), nil, log) } -func (s *InitialisationStep) sendNotificationComplete(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) error { +func (s *InitialisationStep) sendNotificationComplete(operation internal.UpgradeClusterOperation, log *slog.Logger) error { tenants := []notification.NotificationTenant{ { InstanceID: operation.InstanceID, @@ -208,13 +208,13 @@ func (s *InitialisationStep) sendNotificationComplete(operation internal.Upgrade } notificationBundle, err := s.bundleBuilder.NewBundle(operation.OrchestrationID, notificationParams) if err != nil { - log.Errorf("%s: %s", "Failed to create Notification Bundle", err) + log.Error(fmt.Sprintf("%s: %s", "Failed to create Notification Bundle", err)) return err } err2 := notificationBundle.UpdateNotificationEvent() if err2 != nil { msg := fmt.Sprintf("cannot update notification for orchestration %s", operation.OrchestrationID) - log.Errorf("%s: %s", msg, err) + log.Error(fmt.Sprintf("%s: %s", msg, err2)) return err } return nil diff --git a/internal/process/upgrade_cluster/initialisation_test.go b/internal/process/upgrade_cluster/initialisation_test.go index d8d2256ac9..11c45515ff 100644 --- a/internal/process/upgrade_cluster/initialisation_test.go +++ b/internal/process/upgrade_cluster/initialisation_test.go @@ -1,6 +1,8 @@ package upgrade_cluster import ( + "log/slog" + "os" "testing" "time" @@ -23,7 +25,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal" "github.com/kyma-project/kyma-environment-broker/internal/broker" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" ) const ( @@ -43,9 +44,11 @@ type fixHyperscalerInputProvider interface { } func TestInitialisationStep_Run(t *testing.T) { + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) t.Run("should mark operation as Succeeded when upgrade was successful", func(t *testing.T) { // given - log := logrus.New() memoryStorage := storage.NewMemoryStorage() orch := internal.Orchestration{ @@ -114,7 +117,6 @@ func TestInitialisationStep_Run(t *testing.T) { t.Run("should initialize UpgradeRuntimeInput request when run", func(t *testing.T) 
{ // given - log := logrus.New() memoryStorage := storage.NewMemoryStorage() err := memoryStorage.Orchestrations().Insert(fixOrchestrationWithKymaVer()) @@ -178,7 +180,6 @@ func TestInitialisationStep_Run(t *testing.T) { t.Run("should mark finish if orchestration was canceled", func(t *testing.T) { // given - log := logrus.New() memoryStorage := storage.NewMemoryStorage() err := memoryStorage.Orchestrations().Insert(internal.Orchestration{ diff --git a/internal/process/upgrade_cluster/log_skipping_upgrade.go b/internal/process/upgrade_cluster/log_skipping_upgrade.go index eebfe73209..07f71b336a 100644 --- a/internal/process/upgrade_cluster/log_skipping_upgrade.go +++ b/internal/process/upgrade_cluster/log_skipping_upgrade.go @@ -1,13 +1,12 @@ package upgrade_cluster import ( + "log/slog" "time" "github.com/kyma-project/kyma-environment-broker/internal/process" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" - "github.com/kyma-project/kyma-environment-broker/internal" ) @@ -25,7 +24,7 @@ func NewLogSkippingUpgradeStep(os storage.Operations) *LogSkippingUpgradeStep { } } -func (s *LogSkippingUpgradeStep) Run(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (s *LogSkippingUpgradeStep) Run(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { log.Info("Skipping cluster upgrade due to step condition not met") return s.operationManager.OperationSucceeded(operation, "upgrade cluster skipped due to step condition", log) diff --git a/internal/process/upgrade_cluster/manager.go b/internal/process/upgrade_cluster/manager.go index d7cbdd2bfe..069bb1fc6f 100644 --- a/internal/process/upgrade_cluster/manager.go +++ b/internal/process/upgrade_cluster/manager.go @@ -3,6 +3,7 @@ package upgrade_cluster import ( "context" "fmt" + "log/slog" "sort" "time" @@ -12,12 +13,11 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/event" "github.com/kyma-project/kyma-environment-broker/internal/process" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" ) type Step interface { Name() string - Run(operation internal.UpgradeClusterOperation, logger logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) + Run(operation internal.UpgradeClusterOperation, logger *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) } type StepCondition func(operation internal.Operation) bool @@ -28,14 +28,14 @@ type StepWithCondition struct { } type Manager struct { - log logrus.FieldLogger + log *slog.Logger steps map[int][]StepWithCondition operationStorage storage.Operations publisher event.Publisher } -func NewManager(storage storage.Operations, pub event.Publisher, logger logrus.FieldLogger) *Manager { +func NewManager(storage storage.Operations, pub event.Publisher, logger *slog.Logger) *Manager { return &Manager{ log: logger, steps: make(map[int][]StepWithCondition, 0), @@ -55,10 +55,10 @@ func (m *Manager) AddStep(weight int, step Step, condition StepCondition) { m.steps[weight] = append(m.steps[weight], StepWithCondition{Step: step, condition: condition}) } -func (m *Manager) runStep(step Step, operation internal.UpgradeClusterOperation, logger logrus.FieldLogger) (processedOperation internal.UpgradeClusterOperation, when time.Duration, err error) { +func (m *Manager) runStep(step Step, operation internal.UpgradeClusterOperation, 
logger *slog.Logger) (processedOperation internal.UpgradeClusterOperation, when time.Duration, err error) { defer func() { if pErr := recover(); pErr != nil { - logger.Println("panic in RunStep during cluster upgrade: ", pErr) + logger.Error(fmt.Sprintf("panic in RunStep during cluster upgrade: %v", pErr)) err = errors.New(fmt.Sprintf("%v", pErr)) om := process.NewUpgradeClusterOperationManager(m.operationStorage) processedOperation, _, _ = om.OperationFailed(operation, "recovered from panic", err, m.log) @@ -93,7 +93,7 @@ func (m *Manager) sortWeight() []int { func (m *Manager) Execute(operationID string) (time.Duration, error) { op, err := m.operationStorage.GetUpgradeClusterOperationByID(operationID) if err != nil { - m.log.Errorf("Cannot fetch operation from storage: %s", err) + m.log.Error(fmt.Sprintf("Cannot fetch operation from storage: %s", err)) return 3 * time.Second, nil } operation := *op @@ -102,27 +102,27 @@ func (m *Manager) Execute(operationID string) (time.Duration, error) { } var when time.Duration - logOperation := m.log.WithFields(logrus.Fields{"operation": operationID, "instanceID": operation.InstanceID}) + logOperation := m.log.With("operation", operationID, "instanceID", operation.InstanceID) logOperation.Info("Start process operation steps") for _, weightStep := range m.sortWeight() { steps := m.steps[weightStep] for _, step := range steps { - logStep := logOperation.WithField("step", step.Name()) + logStep := logOperation.With("step", step.Name()) if step.condition != nil && !step.condition(operation.Operation) { - logStep.Debugf("Skipping due to not met condition") + logStep.Debug("Skipping step: condition not met") continue } - logStep.Infof("Start step") + logStep.Info("Start step") operation, when, err = m.runStep(step, operation, logStep) if err != nil { - logStep.Errorf("Process operation failed: %s", err) + logStep.Error(fmt.Sprintf("Process operation failed: %s", err)) return 0, err } if operation.IsFinished() { - logStep.Infof("Operation %q got status %s. Process finished.", operation.Operation.ID, operation.State) + logStep.Info(fmt.Sprintf("Operation %q got status %s. Process finished.", operation.Operation.ID, operation.State)) return 0, nil } if when == 0 { @@ -130,26 +130,26 @@ func (m *Manager) Execute(operationID string) (time.Duration, error) { continue } - logStep.Infof("Process operation will be repeated in %s ...", when) + logStep.Info(fmt.Sprintf("Process operation will be repeated in %s ...", when)) return when, nil } } - logOperation.Infof("Operation %q got status %s. All steps finished.", operation.Operation.ID, operation.State) + logOperation.Info(fmt.Sprintf("Operation %q got status %s. 
All steps finished.", operation.Operation.ID, operation.State)) return 0, nil } func (m Manager) Reschedule(operationID string, maintenanceWindowBegin, maintenanceWindowEnd time.Time) error { op, err := m.operationStorage.GetUpgradeClusterOperationByID(operationID) if err != nil { - m.log.Errorf("Cannot fetch operation %s from storage: %s", operationID, err) + m.log.Error(fmt.Sprintf("Cannot fetch operation %s from storage: %s", operationID, err)) return err } op.MaintenanceWindowBegin = maintenanceWindowBegin op.MaintenanceWindowEnd = maintenanceWindowEnd op, err = m.operationStorage.UpdateUpgradeClusterOperation(*op) if err != nil { - m.log.Errorf("Cannot update (reschedule) operation %s in storage: %s", operationID, err) + m.log.Error(fmt.Sprintf("Cannot update (reschedule) operation %s in storage: %s", operationID, err)) } return err diff --git a/internal/process/upgrade_cluster/manager_test.go b/internal/process/upgrade_cluster/manager_test.go index b576642d24..762a53853a 100644 --- a/internal/process/upgrade_cluster/manager_test.go +++ b/internal/process/upgrade_cluster/manager_test.go @@ -18,7 +18,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/event" "github.com/kyma-project/kyma-environment-broker/internal/process" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/util/wait" ) @@ -68,7 +67,6 @@ func TestManager_Execute(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, })) - log := logrus.New() memoryStorage := storage.NewMemoryStorage() operations := memoryStorage.Operations() err := operations.InsertUpgradeClusterOperation(fixOperation(tc.operationID)) @@ -84,7 +82,7 @@ func TestManager_Execute(t *testing.T) { eventCollector := &collectingEventHandler{} eventBroker.Subscribe(process.UpgradeClusterStepProcessed{}, eventCollector.OnEvent) - manager := NewManager(operations, eventBroker, log) + manager := NewManager(operations, eventBroker, logger) manager.InitStep(&sInit) manager.AddStep(2, &sFinal, nil) @@ -135,8 +133,8 @@ func (ts *testStep) Name() string { return ts.name } -func (ts *testStep) Run(operation internal.UpgradeClusterOperation, logger logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { - logger.Infof("inside %s step", ts.name) +func (ts *testStep) Run(operation internal.UpgradeClusterOperation, logger *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { + logger.Info(fmt.Sprintf("inside %s step", ts.name)) operation.Description = fmt.Sprintf("%s %s", operation.Description, ts.name) updated, err := ts.storage.UpdateUpgradeClusterOperation(operation) diff --git a/internal/process/upgrade_cluster/send_notification.go b/internal/process/upgrade_cluster/send_notification.go index 42f9782bfb..87a8f75267 100644 --- a/internal/process/upgrade_cluster/send_notification.go +++ b/internal/process/upgrade_cluster/send_notification.go @@ -2,10 +2,9 @@ package upgrade_cluster import ( "fmt" + "log/slog" "time" - "github.com/sirupsen/logrus" - "github.com/kyma-project/kyma-environment-broker/internal" kebError "github.com/kyma-project/kyma-environment-broker/internal/error" "github.com/kyma-project/kyma-environment-broker/internal/notification" @@ -29,7 +28,7 @@ func NewSendNotificationStep(os storage.Operations, bundleBuilder notification.B } } -func (s *SendNotificationStep) Run(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) 
(internal.UpgradeClusterOperation, time.Duration, error) { +func (s *SendNotificationStep) Run(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { if operation.RuntimeOperation.Notification { tenants := []notification.NotificationTenant{ { @@ -44,16 +43,16 @@ func (s *SendNotificationStep) Run(operation internal.UpgradeClusterOperation, l } notificationBundle, err := s.bundleBuilder.NewBundle(operation.OrchestrationID, notificationParams) if err != nil { - log.Errorf("%s: %s", "Failed to create Notification Bundle", err) + log.Error(fmt.Sprintf("%s: %s", "Failed to create Notification Bundle", err)) return operation, 5 * time.Second, nil } - log.Infof("Sending http request to customer notification service") + log.Info("Sending http request to customer notification service") err = notificationBundle.UpdateNotificationEvent() //currently notification error can only be temporary error if err != nil && kebError.IsTemporaryError(err) { msg := fmt.Sprintf("cannot update notification for orchestration %s", operation.OrchestrationID) - log.Errorf("%s: %s", msg, err) + log.Error(fmt.Sprintf("%s: %s", msg, err)) return operation, 5 * time.Second, nil } } diff --git a/internal/process/upgrade_cluster/send_notification_test.go b/internal/process/upgrade_cluster/send_notification_test.go index 843b09c114..0f629f72a7 100644 --- a/internal/process/upgrade_cluster/send_notification_test.go +++ b/internal/process/upgrade_cluster/send_notification_test.go @@ -1,11 +1,12 @@ package upgrade_cluster import ( + "log/slog" + "os" "testing" "time" "github.com/kyma-project/kyma-environment-broker/internal" - "github.com/kyma-project/kyma-environment-broker/internal/logger" "github.com/kyma-project/kyma-environment-broker/internal/notification" "github.com/kyma-project/kyma-environment-broker/internal/notification/mocks" "github.com/kyma-project/kyma-environment-broker/internal/storage" @@ -16,6 +17,9 @@ import ( func TestSendNotificationStep_Run(t *testing.T) { // given memoryStorage := storage.NewMemoryStorage() + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) tenants := []notification.NotificationTenant{ { InstanceID: notification.FakeInstanceID, @@ -42,7 +46,7 @@ func TestSendNotificationStep_Run(t *testing.T) { step := NewSendNotificationStep(memoryStorage.Operations(), bundleBuilder) // when - _, repeat, err := step.Run(operation, logger.NewLogDummy()) + _, repeat, err := step.Run(operation, log) // then assert.NoError(t, err) diff --git a/internal/process/upgrade_cluster/upgrade_cluster_step.go b/internal/process/upgrade_cluster/upgrade_cluster_step.go index c819638c8c..013dee01d2 100644 --- a/internal/process/upgrade_cluster/upgrade_cluster_step.go +++ b/internal/process/upgrade_cluster/upgrade_cluster_step.go @@ -2,6 +2,7 @@ package upgrade_cluster import ( "fmt" + "log/slog" "time" "github.com/kyma-project/control-plane/components/provisioner/pkg/gqlschema" @@ -9,7 +10,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/process" "github.com/kyma-project/kyma-environment-broker/internal/provisioner" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" ) const DryRunPrefix = "dry_run-" @@ -47,9 +47,9 @@ func (s *UpgradeClusterStep) Name() string { return "Upgrade_Cluster" } -func (s *UpgradeClusterStep) Run(operation internal.UpgradeClusterOperation, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, 
error) { +func (s *UpgradeClusterStep) Run(operation internal.UpgradeClusterOperation, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { if time.Since(operation.UpdatedAt) > s.timeSchedule.UpgradeClusterTimeout { - log.Infof("operation has reached the time limit: updated operation time: %s", operation.UpdatedAt) + log.Info(fmt.Sprintf("operation has reached the time limit: updated operation time: %s", operation.UpdatedAt)) return s.operationManager.OperationFailed(operation, fmt.Sprintf("operation has reached the time limit: %s", s.timeSchedule.UpgradeClusterTimeout), nil, log) } @@ -79,7 +79,7 @@ func (s *UpgradeClusterStep) Run(operation internal.UpgradeClusterOperation, log // trigger upgradeRuntime mutation provisionerResponse, err = s.provisionerClient.UpgradeShoot(operation.ProvisioningParameters.ErsContext.GlobalAccountID, operation.RuntimeOperation.RuntimeID, input) if err != nil { - log.Errorf("call to provisioner failed: %s", err) + log.Error(fmt.Sprintf("call to provisioner failed: %s", err)) return operation, s.timeSchedule.Retry, nil } @@ -89,7 +89,7 @@ func (s *UpgradeClusterStep) Run(operation internal.UpgradeClusterOperation, log op.Description = "cluster upgrade in progress" }, log) if repeat != 0 { - log.Errorf("cannot save operation ID from provisioner") + log.Error("cannot save operation ID from provisioner") return operation, s.timeSchedule.Retry, nil } } @@ -97,24 +97,24 @@ func (s *UpgradeClusterStep) Run(operation internal.UpgradeClusterOperation, log if provisionerResponse.RuntimeID == nil { provisionerResponse, err = s.provisionerClient.RuntimeOperationStatus(operation.ProvisioningParameters.ErsContext.GlobalAccountID, operation.ProvisionerOperationID) if err != nil { - log.Errorf("call to provisioner about operation status failed: %s", err) + log.Error(fmt.Sprintf("call to provisioner about operation status failed: %s", err)) return operation, s.timeSchedule.Retry, nil } } if provisionerResponse.RuntimeID == nil { return operation, s.timeSchedule.StatusCheck, nil } - log = log.WithField("runtimeID", *provisionerResponse.RuntimeID) - log.Infof("call to provisioner for upgrade succeeded, got operation ID %q", *provisionerResponse.ID) + log = log.With("runtimeID", *provisionerResponse.RuntimeID) + log.Info(fmt.Sprintf("call to provisioner for upgrade succeeded, got operation ID %q", *provisionerResponse.ID)) rs := internal.NewRuntimeState(*provisionerResponse.RuntimeID, operation.Operation.ID, nil, gardenerUpgradeInputToConfigInput(input)) err = s.runtimeStateStorage.Insert(rs) if err != nil { - log.Errorf("cannot insert runtimeState: %s", err) + log.Error(fmt.Sprintf("cannot insert runtimeState: %s", err)) return operation, 10 * time.Second, nil } - log.Infof("cluster upgrade process initiated successfully") + log.Info("cluster upgrade process initiated successfully") // return repeat mode to start the initialization step which will now check the runtime status return operation, s.timeSchedule.Retry, nil diff --git a/internal/process/upgrade_cluster/upgrade_cluster_step_test.go b/internal/process/upgrade_cluster/upgrade_cluster_step_test.go index aa544013f6..f4dd761a16 100644 --- a/internal/process/upgrade_cluster/upgrade_cluster_step_test.go +++ b/internal/process/upgrade_cluster/upgrade_cluster_step_test.go @@ -1,6 +1,8 @@ package upgrade_cluster import ( + "log/slog" + "os" "testing" "time" @@ -12,7 +14,6 @@ import ( provisionerAutomock "github.com/kyma-project/kyma-environment-broker/internal/provisioner/automock" 
"github.com/kyma-project/kyma-environment-broker/internal/ptr" "github.com/kyma-project/kyma-environment-broker/internal/storage" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -29,7 +30,9 @@ const ( func TestUpgradeClusterStep_Run(t *testing.T) { // given expectedOIDC := fixture.FixOIDCConfigDTO() - log := logrus.New() + log := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) memoryStorage := storage.NewMemoryStorage() operation := fixUpgradeClusterOperationWithInputCreator(t) @@ -97,7 +100,7 @@ func TestUpgradeClusterStep_Run(t *testing.T) { // when - operation, repeat, err := step.Run(operation, log.WithFields(logrus.Fields{"step": "TEST"})) + operation, repeat, err := step.Run(operation, log.With("step", "TEST")) // then assert.NoError(t, err) diff --git a/internal/process/upgrade_cluster_operation.go b/internal/process/upgrade_cluster_operation.go index e3d84abc6a..fdc967bfd7 100644 --- a/internal/process/upgrade_cluster_operation.go +++ b/internal/process/upgrade_cluster_operation.go @@ -2,6 +2,7 @@ package process import ( "fmt" + "log/slog" "time" "github.com/kyma-project/kyma-environment-broker/common/orchestration" @@ -9,7 +10,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/kyma-project/kyma-environment-broker/internal/storage/dberr" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" ) type UpgradeClusterOperationManager struct { @@ -21,7 +21,7 @@ func NewUpgradeClusterOperationManager(storage storage.Operations) *UpgradeClust } // OperationSucceeded marks the operation as succeeded and only repeats it if there is a storage error -func (om *UpgradeClusterOperationManager) OperationSucceeded(operation internal.UpgradeClusterOperation, description string, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) OperationSucceeded(operation internal.UpgradeClusterOperation, description string, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { updatedOperation, repeat, _ := om.update(operation, orchestration.Succeeded, description, log) // repeat in case of storage error if repeat != 0 { @@ -32,7 +32,7 @@ func (om *UpgradeClusterOperationManager) OperationSucceeded(operation internal. 
} // OperationFailed marks the operation as failed and only repeats it if there is a storage error -func (om *UpgradeClusterOperationManager) OperationFailed(operation internal.UpgradeClusterOperation, description string, err error, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) OperationFailed(operation internal.UpgradeClusterOperation, description string, err error, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { updatedOperation, repeat, _ := om.update(operation, orchestration.Failed, description, log) // repeat in case of storage error if repeat != 0 { @@ -52,7 +52,7 @@ func (om *UpgradeClusterOperationManager) OperationFailed(operation internal.Upg } // OperationSucceeded marks the operation as succeeded and only repeats it if there is a storage error -func (om *UpgradeClusterOperationManager) OperationCanceled(operation internal.UpgradeClusterOperation, description string, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) OperationCanceled(operation internal.UpgradeClusterOperation, description string, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { updatedOperation, repeat, _ := om.update(operation, orchestration.Canceled, description, log) if repeat != 0 { return updatedOperation, repeat, nil @@ -62,20 +62,20 @@ func (om *UpgradeClusterOperationManager) OperationCanceled(operation internal.U } // RetryOperation retries an operation for at maxTime in retryInterval steps and fails the operation if retrying failed -func (om *UpgradeClusterOperationManager) RetryOperation(operation internal.UpgradeClusterOperation, errorMessage string, err error, retryInterval time.Duration, maxTime time.Duration, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) RetryOperation(operation internal.UpgradeClusterOperation, errorMessage string, err error, retryInterval time.Duration, maxTime time.Duration, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { since := time.Since(operation.UpdatedAt) - log.Infof("Retry Operation was triggered with message: %s", errorMessage) - log.Infof("Retrying for %s in %s steps", maxTime.String(), retryInterval.String()) + log.Info(fmt.Sprintf("Retry Operation was triggered with message: %s", errorMessage)) + log.Info(fmt.Sprintf("Retrying for %s in %s steps", maxTime.String(), retryInterval.String())) if since < maxTime { return operation, retryInterval, nil } - log.Errorf("Aborting after %s of failing retries", maxTime.String()) + log.Error(fmt.Sprintf("Aborting after %s of failing retries", maxTime.String())) return om.OperationFailed(operation, errorMessage, err, log) } // UpdateOperation updates a given operation -func (om *UpgradeClusterOperationManager) UpdateOperation(operation internal.UpgradeClusterOperation, update func(operation *internal.UpgradeClusterOperation), log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) UpdateOperation(operation internal.UpgradeClusterOperation, update func(operation *internal.UpgradeClusterOperation), log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { update(&operation) updatedOperation, err := om.storage.UpdateUpgradeClusterOperation(operation) switch { @@ -83,19 +83,19 @@ func (om *UpgradeClusterOperationManager) 
UpdateOperation(operation internal.Upg { op, err := om.storage.GetUpgradeClusterOperationByID(operation.Operation.ID) if err != nil { - log.Errorf("while getting operation: %v", err) + log.Error(fmt.Sprintf("while getting operation: %v", err)) return operation, 1 * time.Minute, err } op.Merge(&operation.Operation) update(op) updatedOperation, err = om.storage.UpdateUpgradeClusterOperation(*op) if err != nil { - log.Errorf("while updating operation after conflict: %v", err) + log.Error(fmt.Sprintf("while updating operation after conflict: %v", err)) return operation, 1 * time.Minute, err } } case err != nil: - log.Errorf("while updating operation: %v", err) + log.Error(fmt.Sprintf("while updating operation: %v", err)) return operation, 1 * time.Minute, err } return *updatedOperation, 0, nil @@ -105,20 +105,20 @@ func (om *UpgradeClusterOperationManager) UpdateOperation(operation internal.Upg func (om *UpgradeClusterOperationManager) SimpleUpdateOperation(operation internal.UpgradeClusterOperation) (internal.UpgradeClusterOperation, time.Duration) { updatedOperation, err := om.storage.UpdateUpgradeClusterOperation(operation) if err != nil { - logrus.WithField("orchestrationID", operation.OrchestrationID). - WithField("instanceID", operation.InstanceID). - Errorf("Update upgradeCluster operation failed: %s", err.Error()) + slog.With("orchestrationID", operation.OrchestrationID). + With("instanceID", operation.InstanceID). + Error(fmt.Sprintf("Update upgradeCluster operation failed: %s", err.Error())) return operation, 1 * time.Minute } return *updatedOperation, 0 } // RetryOperationWithoutFail retries an operation for at maxTime in retryInterval steps and omits the operation if retrying failed -func (om *UpgradeClusterOperationManager) RetryOperationWithoutFail(operation internal.UpgradeClusterOperation, description string, retryInterval, maxTime time.Duration, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) RetryOperationWithoutFail(operation internal.UpgradeClusterOperation, description string, retryInterval, maxTime time.Duration, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { since := time.Since(operation.UpdatedAt) - log.Infof("Retry Operation was triggered with message: %s", description) - log.Infof("Retrying for %s in %s steps", maxTime.String(), retryInterval.String()) + log.Info(fmt.Sprintf("Retry Operation was triggered with message: %s", description)) + log.Info(fmt.Sprintf("Retrying for %s in %s steps", maxTime.String(), retryInterval.String())) if since < maxTime { return operation, retryInterval, nil } @@ -128,11 +128,11 @@ func (om *UpgradeClusterOperationManager) RetryOperationWithoutFail(operation in return updatedOperation, repeat, nil } - log.Errorf("Omitting after %s of failing retries", maxTime.String()) + log.Error(fmt.Sprintf("Omitting after %s of failing retries", maxTime.String())) return updatedOperation, 0, nil } -func (om *UpgradeClusterOperationManager) update(operation internal.UpgradeClusterOperation, state domain.LastOperationState, description string, log logrus.FieldLogger) (internal.UpgradeClusterOperation, time.Duration, error) { +func (om *UpgradeClusterOperationManager) update(operation internal.UpgradeClusterOperation, state domain.LastOperationState, description string, log *slog.Logger) (internal.UpgradeClusterOperation, time.Duration, error) { return om.UpdateOperation(operation, func(operation *internal.UpgradeClusterOperation) { 
operation.State = state operation.Description = description diff --git a/internal/process/upgrade_cluster_operation_test.go b/internal/process/upgrade_cluster_operation_test.go index 7a3e75fd3d..2924dc1725 100644 --- a/internal/process/upgrade_cluster_operation_test.go +++ b/internal/process/upgrade_cluster_operation_test.go @@ -9,7 +9,6 @@ import ( "github.com/kyma-project/kyma-environment-broker/internal/fixture" "github.com/kyma-project/kyma-environment-broker/internal/storage" "github.com/pivotal-cf/brokerapi/v8/domain" - "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -24,7 +23,7 @@ func TestUpgradeClusterOperationManager_OperationSucceeded(t *testing.T) { require.NoError(t, err) // when - op, when, err := opManager.OperationSucceeded(op, "task succeeded", logrus.New()) + op, when, err := opManager.OperationSucceeded(op, "task succeeded", fixLogger()) // then assert.NoError(t, err) @@ -45,7 +44,7 @@ func TestUpgradeClusterOperationManager_OperationFailed(t *testing.T) { errOut := fmt.Errorf("error occurred") // when - op, when, err := opManager.OperationFailed(op, errMsg, errOut, logrus.New()) + op, when, err := opManager.OperationFailed(op, errMsg, errOut, fixLogger()) // then assert.Error(t, err) @@ -54,7 +53,7 @@ func TestUpgradeClusterOperationManager_OperationFailed(t *testing.T) { assert.Equal(t, time.Duration(0), when) // when - _, _, err = opManager.OperationFailed(op, errMsg, nil, logrus.New()) + _, _, err = opManager.OperationFailed(op, errMsg, nil, fixLogger()) // then assert.Error(t, err) @@ -78,7 +77,7 @@ func TestUpgradeClusterOperationManager_RetryOperation(t *testing.T) { require.NoError(t, err) // then - first call - op, when, err := opManager.RetryOperation(op, errorMessage, errOut, retryInterval, maxtime, logrus.New()) + op, when, err := opManager.RetryOperation(op, errorMessage, errOut, retryInterval, maxtime, fixLogger()) // when - first retry assert.True(t, when > 0) @@ -88,7 +87,7 @@ func TestUpgradeClusterOperationManager_RetryOperation(t *testing.T) { t.Log(op.UpdatedAt.String()) op.UpdatedAt = op.UpdatedAt.Add(-retryInterval - time.Second) // simulate wait of first retry t.Log(op.UpdatedAt.String()) - op, when, err = opManager.RetryOperation(op, errorMessage, errOut, retryInterval, maxtime, logrus.New()) + op, when, err = opManager.RetryOperation(op, errorMessage, errOut, retryInterval, maxtime, fixLogger()) // when - second call => retry assert.True(t, when > 0)