Skip to content

Commit

Permalink
Use slog in queues (#1564)
Browse files Browse the repository at this point in the history
  • Loading branch information
KsaweryZietara authored Dec 11, 2024
1 parent ab95bcb commit 0e9e359
Show file tree
Hide file tree
Showing 28 changed files with 273 additions and 267 deletions.
14 changes: 7 additions & 7 deletions cmd/broker/broker_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,20 +216,20 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string)
k8sClientProvider := kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient)
provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, log.With("provisioning", "manager"))
provisioningQueue := NewProvisioningProcessingQueue(context.Background(), provisionManager, workersAmount, cfg, db, provisionerClient, inputFactory,
edpClient, accountProvider, k8sClientProvider, cli, defaultOIDCValues(), logs)
edpClient, accountProvider, k8sClientProvider, cli, defaultOIDCValues(), log)

provisioningQueue.SpeedUp(10000)
provisionManager.SpeedUp(10000)

updateManager := process.NewStagedManager(db.Operations(), eventBroker, time.Hour, cfg.Update, log.With("update", "manager"))
updateQueue := NewUpdateProcessingQueue(context.Background(), updateManager, 1, db, inputFactory, provisionerClient,
eventBroker, *cfg, k8sClientProvider, cli, logs)
eventBroker, *cfg, k8sClientProvider, cli, log)
updateQueue.SpeedUp(10000)
updateManager.SpeedUp(10000)

deprovisionManager := process.NewStagedManager(db.Operations(), eventBroker, time.Hour, cfg.Deprovisioning, log.With("deprovisioning", "manager"))
deprovisioningQueue := NewDeprovisioningProcessingQueue(ctx, workersAmount, deprovisionManager, cfg, db, eventBroker,
provisionerClient, edpClient, accountProvider, k8sClientProvider, cli, configProvider, logs,
provisionerClient, edpClient, accountProvider, k8sClientProvider, cli, configProvider, log,
)
deprovisionManager.SpeedUp(10000)

Expand All @@ -249,7 +249,7 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string)
}
ts.poller = &broker.TimerPoller{PollInterval: 3 * time.Millisecond, PollTimeout: 3 * time.Second, Log: ts.t.Log}

ts.CreateAPI(inputFactory, cfg, db, provisioningQueue, deprovisioningQueue, updateQueue, logs, k8sClientProvider, gardener.NewFakeClient(), eventBroker)
ts.CreateAPI(inputFactory, cfg, db, provisioningQueue, deprovisioningQueue, updateQueue, logs, log, k8sClientProvider, gardener.NewFakeClient(), eventBroker)

notificationFakeClient := notification.NewFakeClient()
notificationBundleBuilder := notification.NewBundleBuilder(notificationFakeClient, cfg.Notification)
Expand All @@ -261,7 +261,7 @@ func NewBrokerSuiteTestWithConfig(t *testing.T, cfg *Config, version ...string)
Retry: 10 * time.Millisecond,
StatusCheck: 100 * time.Millisecond,
UpgradeClusterTimeout: 3 * time.Second,
}, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, logs, cli, *cfg, 1000)
}, 250*time.Millisecond, runtimeResolver, notificationBundleBuilder, log, cli, *cfg, 1000)

clusterQueue.SpeedUp(1000)

Expand Down Expand Up @@ -366,7 +366,7 @@ func (s *BrokerSuiteTest) CallAPI(method string, path string, body string) *http
return resp
}

func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisioningQueue *process.Queue, deprovisionQueue *process.Queue, updateQueue *process.Queue, logs logrus.FieldLogger, skrK8sClientProvider *kubeconfig.FakeProvider, gardenerClient client.Client, eventBroker *event.PubSub) {
func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Config, db storage.BrokerStorage, provisioningQueue *process.Queue, deprovisionQueue *process.Queue, updateQueue *process.Queue, logs logrus.FieldLogger, log *slog.Logger, skrK8sClientProvider *kubeconfig.FakeProvider, gardenerClient client.Client, eventBroker *event.PubSub) {
servicesConfig := map[string]broker.Service{
broker.KymaServiceName: {
Description: "",
Expand Down Expand Up @@ -397,7 +397,7 @@ func (s *BrokerSuiteTest) CreateAPI(inputFactory broker.PlanValidator, cfg *Conf
kcBuilder := &kcMock.KcBuilder{}
kcBuilder.On("Build", nil).Return("--kubeconfig file", nil)
createAPI(s.router, servicesConfig, inputFactory, cfg, db, provisioningQueue, deprovisionQueue, updateQueue,
lager.NewLogger("api"), logs, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, fakeKcpK8sClient, eventBroker)
lager.NewLogger("api"), logs, log, planDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, fakeKcpK8sClient, eventBroker)

s.httpServer = httptest.NewServer(s.router)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/broker/deprovisioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"log/slog"
"time"

"github.com/kyma-project/kyma-environment-broker/common/hyperscaler"
Expand All @@ -12,15 +13,14 @@ import (
"github.com/kyma-project/kyma-environment-broker/internal/process/steps"
"github.com/kyma-project/kyma-environment-broker/internal/provisioner"
"github.com/kyma-project/kyma-environment-broker/internal/storage"
"github.com/sirupsen/logrus"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func NewDeprovisioningProcessingQueue(ctx context.Context, workersAmount int, deprovisionManager *process.StagedManager,
cfg *Config, db storage.BrokerStorage, pub event.Publisher,
provisionerClient provisioner.Client,
edpClient deprovisioning.EDPClient, accountProvider hyperscaler.AccountProvider,
k8sClientProvider K8sClientProvider, cli client.Client, configProvider input.ConfigurationProvider, logs logrus.FieldLogger) *process.Queue {
k8sClientProvider K8sClientProvider, cli client.Client, configProvider input.ConfigurationProvider, logs *slog.Logger) *process.Queue {

deprovisioningSteps := []struct {
disabled bool
Expand Down
6 changes: 1 addition & 5 deletions cmd/broker/deprovisioning_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/kyma-project/kyma-environment-broker/internal/provisioner"
"github.com/kyma-project/kyma-environment-broker/internal/storage"
"github.com/pivotal-cf/brokerapi/v8/domain"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -73,9 +72,6 @@ func NewDeprovisioningSuite(t *testing.T) *DeprovisioningSuite {
Level: slog.LevelInfo,
}))

logs := logrus.New()
logs.Formatter.(*logrus.TextFormatter).TimestampFormat = "15:04:05.000"

cfg := fixConfig()
cfg.EDP.Environment = edpEnvironment

Expand Down Expand Up @@ -116,7 +112,7 @@ func NewDeprovisioningSuite(t *testing.T) *DeprovisioningSuite {

deprovisioningQueue := NewDeprovisioningProcessingQueue(ctx, workersAmount, deprovisionManager, cfg, db, eventBroker,
provisionerClient,
edpClient, accountProvider, kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient), fakeK8sSKRClient, configProvider, logs,
edpClient, accountProvider, kubeconfig.NewFakeK8sClientProvider(fakeK8sSKRClient), fakeK8sSKRClient, configProvider, log,
)

deprovisioningQueue.SpeedUp(10000)
Expand Down
85 changes: 43 additions & 42 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func main() {
// create and fill config
var cfg Config
err = envconfig.InitWithPrefix(&cfg, "APP")
fatalOnError(err, logs)
fatalOnError(err, log)

if cfg.LogLevel != "" {
l, _ := logrus.ParseLevel(cfg.LogLevel)
Expand All @@ -262,9 +262,9 @@ func main() {

// create kubernetes client
kcpK8sConfig, err := config.GetConfig()
fatalOnError(err, logs)
fatalOnError(err, log)
kcpK8sClient, err := initClient(kcpK8sConfig)
fatalOnError(err, logs)
fatalOnError(err, log)
skrK8sClientProvider := kubeconfig.NewK8sClientFromSecretProvider(kcpK8sClient)

// create storage
Expand All @@ -274,7 +274,7 @@ func main() {
db = storage.NewMemoryStorage()
} else {
store, conn, err := storage.NewFromConfig(cfg.Database, cfg.Events, cipher)
fatalOnError(err, logs)
fatalOnError(err, log)
db = store
dbStatsCollector := sqlstats.NewStatsCollector("broker", conn)
prometheus.MustRegister(dbStatsCollector)
Expand All @@ -293,27 +293,27 @@ func main() {
kebConfig.NewConfigMapKeysValidator(),
kebConfig.NewConfigMapConverter())
gardenerClusterConfig, err := gardener.NewGardenerClusterConfig(cfg.Gardener.KubeconfigPath)
fatalOnError(err, logs)
fatalOnError(err, log)
cfg.Gardener.DNSProviders, err = gardener.ReadDNSProvidersValuesFromYAML(cfg.SkrDnsProvidersValuesYAMLFilePath)
fatalOnError(err, logs)
fatalOnError(err, log)
dynamicGardener, err := dynamic.NewForConfig(gardenerClusterConfig)
fatalOnError(err, logs)
fatalOnError(err, log)
gardenerClient, err := initClient(gardenerClusterConfig)
fatalOnError(err, logs)
fatalOnError(err, log)

gardenerNamespace := fmt.Sprintf("garden-%v", cfg.Gardener.Project)
gardenerAccountPool := hyperscaler.NewAccountPool(dynamicGardener, gardenerNamespace)
gardenerSharedPool := hyperscaler.NewSharedGardenerAccountPool(dynamicGardener, gardenerNamespace)
accountProvider := hyperscaler.NewAccountProvider(gardenerAccountPool, gardenerSharedPool)

regions, err := provider.ReadPlatformRegionMappingFromFile(cfg.TrialRegionMappingFilePath)
fatalOnError(err, logs)
fatalOnError(err, log)
log.Info(fmt.Sprintf("Platform region mapping for trial: %v", regions))

oidcDefaultValues, err := runtime.ReadOIDCDefaultValuesFromYAML(cfg.SkrOidcDefaultValuesYAMLFilePath)
fatalOnError(err, logs)
fatalOnError(err, log)
inputFactory, err := input.NewInputBuilderFactory(configProvider, cfg.Provisioner, regions, cfg.FreemiumProviders, oidcDefaultValues, cfg.Broker.UseSmallerMachineTypes)
fatalOnError(err, logs)
fatalOnError(err, log)

edpClient := edp.NewClient(cfg.EDP)

Expand All @@ -326,25 +326,25 @@ func main() {
// run queues
provisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Provisioning, log.With("provisioning", "manager"))
provisionQueue := NewProvisioningProcessingQueue(ctx, provisionManager, cfg.Provisioning.WorkersAmount, &cfg, db, provisionerClient, inputFactory,
edpClient, accountProvider, skrK8sClientProvider, kcpK8sClient, oidcDefaultValues, logs)
edpClient, accountProvider, skrK8sClientProvider, kcpK8sClient, oidcDefaultValues, log)

deprovisionManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Deprovisioning, log.With("deprovisioning", "manager"))
deprovisionQueue := NewDeprovisioningProcessingQueue(ctx, cfg.Deprovisioning.WorkersAmount, deprovisionManager, &cfg, db, eventBroker, provisionerClient, edpClient, accountProvider,
skrK8sClientProvider, kcpK8sClient, configProvider, logs)
skrK8sClientProvider, kcpK8sClient, configProvider, log)

updateManager := process.NewStagedManager(db.Operations(), eventBroker, cfg.OperationTimeout, cfg.Update, log.With("update", "manager"))
updateQueue := NewUpdateProcessingQueue(ctx, updateManager, cfg.Update.WorkersAmount, db, inputFactory, provisionerClient, eventBroker,
cfg, skrK8sClientProvider, kcpK8sClient, logs)
cfg, skrK8sClientProvider, kcpK8sClient, log)
/***/
servicesConfig, err := broker.NewServicesConfigFromFile(cfg.CatalogFilePath)
fatalOnError(err, logs)
fatalOnError(err, log)

// create kubeconfig builder
kcBuilder := kubeconfig.NewBuilder(provisionerClient, kcpK8sClient, skrK8sClientProvider)

// create server
router := mux.NewRouter()
createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs,
createAPI(router, servicesConfig, inputFactory, &cfg, db, provisionQueue, deprovisionQueue, updateQueue, logger, logs, log,
inputFactory.GetPlanDefaults, kcBuilder, skrK8sClientProvider, skrK8sClientProvider, gardenerClient, kcpK8sClient, eventBroker)

// create metrics endpoint
Expand All @@ -358,20 +358,20 @@ func main() {
runtimeResolver := orchestrationExt.NewGardenerRuntimeResolver(dynamicGardener, gardenerNamespace, runtimeLister, log)

clusterQueue := NewClusterOrchestrationProcessingQueue(ctx, db, provisionerClient, eventBroker, inputFactory,
nil, time.Minute, runtimeResolver, notificationBuilder, logs, kcpK8sClient, cfg, 1)
nil, time.Minute, runtimeResolver, notificationBuilder, log, kcpK8sClient, cfg, 1)

// TODO: in case of cluster upgrade the same Azure Zones must be send to the Provisioner
orchestrationHandler := orchestrate.NewOrchestrationHandler(db, clusterQueue, cfg.MaxPaginationPage, log)

if !cfg.DisableProcessOperationsInProgress {
err = processOperationsInProgressByType(internal.OperationTypeProvision, db.Operations(), provisionQueue, logs)
fatalOnError(err, logs)
err = processOperationsInProgressByType(internal.OperationTypeDeprovision, db.Operations(), deprovisionQueue, logs)
fatalOnError(err, logs)
err = processOperationsInProgressByType(internal.OperationTypeUpdate, db.Operations(), updateQueue, logs)
fatalOnError(err, logs)
err = reprocessOrchestrations(orchestrationExt.UpgradeClusterOrchestration, db.Orchestrations(), db.Operations(), clusterQueue, logs)
fatalOnError(err, logs)
err = processOperationsInProgressByType(internal.OperationTypeProvision, db.Operations(), provisionQueue, log)
fatalOnError(err, log)
err = processOperationsInProgressByType(internal.OperationTypeDeprovision, db.Operations(), deprovisionQueue, log)
fatalOnError(err, log)
err = processOperationsInProgressByType(internal.OperationTypeUpdate, db.Operations(), updateQueue, log)
fatalOnError(err, log)
err = reprocessOrchestrations(orchestrationExt.UpgradeClusterOrchestration, db.Orchestrations(), db.Operations(), clusterQueue, log)
fatalOnError(err, log)
} else {
log.Info("Skipping processing operation in progress on start")
}
Expand All @@ -381,7 +381,7 @@ func main() {
"domain": cfg.DomainName,
}
err = swagger.NewTemplate("/swagger", swaggerTemplates).Execute()
fatalOnError(err, logs)
fatalOnError(err, log)

// create /orchestration
orchestrationHandler.AttachRoutes(router)
Expand All @@ -403,7 +403,7 @@ func main() {
log.Info(fmt.Sprintf("Call handled: method=%s url=%s statusCode=%d size=%d", params.Request.Method, params.URL.Path, params.StatusCode, params.Size))
})

fatalOnError(http.ListenAndServe(cfg.Host+":"+cfg.Port, svr), logs)
fatalOnError(http.ListenAndServe(cfg.Host+":"+cfg.Port, svr), log)
}

func logConfiguration(logs *slog.Logger, cfg Config) {
Expand All @@ -423,27 +423,27 @@ func logConfiguration(logs *slog.Logger, cfg Config) {
}

func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planValidator broker.PlanValidator, cfg *Config, db storage.BrokerStorage,
provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient, kcpK8sClient client.Client, publisher event.Publisher) {
provisionQueue, deprovisionQueue, updateQueue *process.Queue, logger lager.Logger, logs logrus.FieldLogger, log *slog.Logger, planDefaults broker.PlanDefaults, kcBuilder kubeconfig.KcBuilder, clientProvider K8sClientProvider, kubeconfigProvider KubeconfigProvider, gardenerClient, kcpK8sClient client.Client, publisher event.Publisher) {

suspensionCtxHandler := suspension.NewContextUpdateHandler(db.Operations(), provisionQueue, deprovisionQueue, logs)

defaultPlansConfig, err := servicesConfig.DefaultPlansConfig()
fatalOnError(err, logs)
fatalOnError(err, log)

debugSink, err := lager.NewRedactingSink(lager.NewWriterSink(os.Stdout, lager.DEBUG), []string{"instance-details"}, []string{})
fatalOnError(err, logs)
fatalOnError(err, log)
logger.RegisterSink(debugSink)
errorSink, err := lager.NewRedactingSink(lager.NewWriterSink(os.Stderr, lager.ERROR), []string{"instance-details"}, []string{})
fatalOnError(err, logs)
fatalOnError(err, log)
logger.RegisterSink(errorSink)

freemiumGlobalAccountIds, err := whitelist.ReadWhitelistedGlobalAccountIdsFromFile(cfg.FreemiumWhitelistedGlobalAccountsFilePath)
fatalOnError(err, logs)
fatalOnError(err, log)
logs.Infof("Number of globalAccountIds for unlimited freeemium: %d\n", len(freemiumGlobalAccountIds))

// backward compatibility for tests
convergedCloudRegionProvider, err := broker.NewDefaultConvergedCloudRegionsProvider(cfg.SapConvergedCloudRegionMappingsFilePath, &broker.YamlRegionReader{})
fatalOnError(err, logs)
fatalOnError(err, log)
logs.Infof("%s plan region mappings loaded", broker.SapConvergedCloudPlanName)

// create KymaEnvironmentBroker endpoints
Expand Down Expand Up @@ -482,19 +482,19 @@ func createAPI(router *mux.Router, servicesConfig broker.ServicesConfig, planVal
}

// queues all in progress operations by type
func processOperationsInProgressByType(opType internal.OperationType, op storage.Operations, queue *process.Queue, log logrus.FieldLogger) error {
func processOperationsInProgressByType(opType internal.OperationType, op storage.Operations, queue *process.Queue, log *slog.Logger) error {
operations, err := op.GetNotFinishedOperationsByType(opType)
if err != nil {
return fmt.Errorf("while getting in progress operations from storage: %w", err)
}
for _, operation := range operations {
queue.Add(operation.ID)
log.Infof("Resuming the processing of %s operation ID: %s", opType, operation.ID)
log.Info(fmt.Sprintf("Resuming the processing of %s operation ID: %s", opType, operation.ID))
}
return nil
}

func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log logrus.FieldLogger) error {
func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log *slog.Logger) error {
if err := processCancelingOrchestrations(orchestrationType, orchestrationsStorage, operationsStorage, queue, log); err != nil {
return fmt.Errorf("while processing canceled %s orchestrations: %w", orchestrationType, err)
}
Expand All @@ -510,7 +510,7 @@ func reprocessOrchestrations(orchestrationType orchestrationExt.Type, orchestrat
return nil
}

func processOrchestration(orchestrationType orchestrationExt.Type, state string, orchestrationsStorage storage.Orchestrations, queue *process.Queue, log logrus.FieldLogger) error {
func processOrchestration(orchestrationType orchestrationExt.Type, state string, orchestrationsStorage storage.Orchestrations, queue *process.Queue, log *slog.Logger) error {
filter := dbmodel.OrchestrationFilter{
Types: []string{string(orchestrationType)},
States: []string{state},
Expand All @@ -525,14 +525,14 @@ func processOrchestration(orchestrationType orchestrationExt.Type, state string,

for _, o := range orchestrations {
queue.Add(o.OrchestrationID)
log.Infof("Resuming the processing of %s %s orchestration ID: %s", state, orchestrationType, o.OrchestrationID)
log.Info(fmt.Sprintf("Resuming the processing of %s %s orchestration ID: %s", state, orchestrationType, o.OrchestrationID))
}
return nil
}

// processCancelingOrchestrations reprocess orchestrations with canceling state only when some in progress operations exists
// reprocess only one orchestration to not clog up the orchestration queue on start
func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log logrus.FieldLogger) error {
func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orchestrationsStorage storage.Orchestrations, operationsStorage storage.Operations, queue *process.Queue, log *slog.Logger) error {
filter := dbmodel.OrchestrationFilter{
Types: []string{string(orchestrationType)},
States: []string{orchestrationExt.Canceling},
Expand All @@ -556,7 +556,7 @@ func processCancelingOrchestrations(orchestrationType orchestrationExt.Type, orc
}

if count > 0 {
log.Infof("Resuming the processing of %s %s orchestration ID: %s", orchestrationExt.Canceling, orchestrationType, o.OrchestrationID)
log.Info(fmt.Sprintf("Resuming the processing of %s %s orchestration ID: %s", orchestrationExt.Canceling, orchestrationType, o.OrchestrationID))
queue.Add(o.OrchestrationID)
return nil
}
Expand Down Expand Up @@ -585,9 +585,10 @@ func initClient(cfg *rest.Config) (client.Client, error) {
return cli, nil
}

func fatalOnError(err error, log logrus.FieldLogger) {
func fatalOnError(err error, log *slog.Logger) {
if err != nil {
log.Fatal(err)
log.Error(err.Error())
os.Exit(1)
}
}

Expand Down
Loading

0 comments on commit 0e9e359

Please sign in to comment.