From 3f890ddf9cb315b7d8f50d1b528e7c7b168814c5 Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Fri, 16 Aug 2024 15:24:16 +0200 Subject: [PATCH 1/5] RHINENG-9452: add fatal error which should restart pod --- base/errors.go | 27 ++++++++++++++++++++++++++- base/mqueue/mqueue.go | 8 ++++++-- manager/middlewares/authentication.go | 4 ++++ 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/base/errors.go b/base/errors.go index 8cb6f1430..014a3b3cd 100644 --- a/base/errors.go +++ b/base/errors.go @@ -1,11 +1,36 @@ package base import ( - "errors" + stdErrors "errors" + + "github.com/pkg/errors" ) var ( ErrDatabase = errors.New("database error") + ErrKafka = errors.New("kafka error") ErrBadRequest = errors.New("bad request") ErrNotFound = errors.New("not found") + ErrFatal = errors.New("fatal error restarting pod") ) + +func WrapFatalError(err error, message string) error { + return wrapErrors(err, message) +} + +func WrapFatalDBError(err error, message string) error { + return wrapErrors(err, message, ErrFatal, ErrDatabase) +} + +func WrapFatalKafkaError(err error, message string) error { + return wrapErrors(err, message, ErrFatal, ErrKafka) +} + +func wrapErrors(err error, message string, errs ...error) error { + if err == nil { + return nil + } + errsJoined := stdErrors.Join(errs...) + err = stdErrors.Join(errsJoined, err) + return errors.Wrap(err, message) +} diff --git a/base/mqueue/mqueue.go b/base/mqueue/mqueue.go index 870d20fca..63d79450d 100644 --- a/base/mqueue/mqueue.go +++ b/base/mqueue/mqueue.go @@ -5,6 +5,7 @@ import ( "app/base" "app/base/utils" "context" + "errors" format "fmt" "io" "strings" @@ -55,13 +56,16 @@ func MakeRetryingHandler(handler MessageHandler) MessageHandler { backoffState, cancel := policy.Start(base.Context) defer cancel() for backoff.Continue(backoffState) { - if err = handler(message); err == nil { + if err = handler(message); err == nil || !errors.Is(err, base.ErrFatal) { return nil } utils.LogError("err", err.Error(), "attempt", attempt, "Try failed") attempt++ } - return err + if err != nil && errors.Is(err, base.ErrFatal) { + return err + } + return nil } } diff --git a/manager/middlewares/authentication.go b/manager/middlewares/authentication.go index f68feb5bc..62a9084fe 100644 --- a/manager/middlewares/authentication.go +++ b/manager/middlewares/authentication.go @@ -1,6 +1,7 @@ package middlewares import ( + "app/base" "app/base/database" "app/base/models" "app/base/utils" @@ -10,6 +11,8 @@ import ( "strings" "sync" + stdErrors "errors" + "github.com/pkg/errors" "github.com/redhatinsights/identity" @@ -50,6 +53,7 @@ func GetOrCreateAccount(orgID string) (int, error) { err = database.OnConflictUpdate(database.DB, "org_id", "org_id").Select("org_id").Create(&rhAccount).Error if err != nil { utils.LogWarn("err", err, "org_id", *rhAccount.OrgID, "Error creating account") + err = stdErrors.Join(base.ErrFatal, base.ErrDatabase, err) } return rhAccount.ID, err } From e5a3f8061f4600c51919c1183b27e5b3fc0be709 Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Fri, 16 Aug 2024 15:25:16 +0200 Subject: [PATCH 2/5] RHINENG-9452: refactor listener error handling --- listener/upload.go | 168 ++++++++++++++++++++++++++------------------- 1 file changed, 98 insertions(+), 70 deletions(-) diff --git a/listener/upload.go b/listener/upload.go index 47427ef17..b56647e84 100644 --- a/listener/upload.go +++ b/listener/upload.go @@ -22,26 +22,27 @@ import ( "sync" "time" + stdErrors "errors" + "github.com/pkg/errors" "gorm.io/gorm" "gorm.io/gorm/clause" ) const ( - WarnSkippingNoPackages = "skipping profile with no packages" - WarnSkippingReporter = "skipping excluded reporter" - WarnSkippingHostType = "skipping excluded host type" - WarnSkippingBadPackages = "skipping profile with malformed packages" WarnPayloadTracker = "unable to send message to payload tracker" ErrorNoAccountProvided = "no account provided in host message" ErrorKafkaSend = "unable to send evaluation message" ErrorProcessUpload = "unable to process upload" UploadSuccessNoEval = "upload event handled successfully, no eval required" UploadSuccess = "upload event handled successfully" + DeleteSuccess = "delete event handled successfully" FlushedFullBuffer = "flushing full eval event buffer" FlushedTimeoutBuffer = "flushing eval event buffer after timeout" ErrorUnmarshalMetadata = "unable to unmarshall platform metadata value" ErrorStatus = "error" + ProcessingStatus = "processing" + ReceivedStatus = "received" SuccessStatus = "success" RhuiPathPart = "/rhui/" RepoPathPattern = "(/content/.*)" @@ -49,10 +50,28 @@ const ( RepoReleaseverPlaceholder = "$releasever" ) +var ( + ErrNoPackages = errors.New("skipping profile with no packages") + ErrReporter = errors.New("skipping excluded reporter") + ErrHostType = errors.New("skipping excluded host type") + ErrBadPackages = errors.New("skipping profile with malformed packages") + ErrNoAccountProvided = errors.New("no account provided in host message") + ErrKafkaSend = errors.New("unable to send evaluation message") + ErrProcessUpload = errors.New("unable to process upload") +) + var ( repoPathRegex = regexp.MustCompile(RepoPathPattern) spacesRegex = regexp.MustCompile(`^\s*$`) httpClient *api.Client + metricByErr = map[error]string{ + ErrNoPackages: ReceivedWarnNoPackages, + ErrReporter: ReceivedWarnExcludedReporter, + ErrHostType: ReceivedWarnExcludedHostType, + ErrBadPackages: ReceivedWarnBadPackages, + ErrNoAccountProvided: ReceivedErrorIdentity, + ErrProcessUpload: ReceivedErrorProcessing, + } ) type Host struct { @@ -64,6 +83,7 @@ type Host struct { CulledTimestamp *types.Rfc3339Timestamp `json:"culled_timestamp,omitempty"` Reporter string `json:"reporter,omitempty"` SystemProfile inventory.SystemProfile `json:"system_profile,omitempty"` + ParsedYumUpdates *YumUpdates `json:"-"` } type HostMetadata struct { @@ -106,7 +126,6 @@ func (y *YumUpdates) GetBuiltPkgcache() bool { return y.BuiltPkgcache } -//nolint:funlen func HandleUpload(event HostEvent) error { tStart := time.Now() defer utils.ObserveSecondsSince(tStart, messageHandlingDuration.WithLabelValues(EventUpload)) @@ -117,99 +136,107 @@ func HandleUpload(event HostEvent) error { } updateReporterCounter(event.Host.Reporter) - payloadTrackerEvent := mqueue.PayloadTrackerEvent{ + ptEvent := mqueue.PayloadTrackerEvent{ OrgID: event.Host.OrgID, RequestID: &event.Metadata.RequestID, InventoryID: event.Host.ID, - Status: "received", + Status: ReceivedStatus, } - if _, ok := allowedReporters[event.Host.Reporter]; !ok { - utils.LogWarn("inventoryID", event.Host.ID, "reporter", event.Host.Reporter, WarnSkippingReporter) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedWarnExcludedReporter).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("message-skip")) - sendPayloadStatus(ptWriter, payloadTrackerEvent, "", WarnSkippingReporter) - return nil + if err := validateHost(&event.Host); err != nil { + return handleListenerErrors(err, &event, &ptEvent, tStart, ReceivedStatus) } - if _, ok := excludedHostTypes[event.Host.SystemProfile.HostType]; ok { - utils.LogWarn("inventoryID", event.Host.ID, "hostType", event.Host.SystemProfile.HostType, WarnSkippingHostType) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedWarnExcludedHostType).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("message-skip")) - sendPayloadStatus(ptWriter, payloadTrackerEvent, "", WarnSkippingHostType) - return nil - } - - if event.Host.OrgID == nil || *event.Host.OrgID == "" { - utils.LogError("inventoryID", event.Host.ID, ErrorNoAccountProvided) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedErrorIdentity).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("message-skip")) - sendPayloadStatus(ptWriter, payloadTrackerEvent, "", ErrorNoAccountProvided) - return nil - } - - installedPackages := event.Host.SystemProfile.GetInstalledPackages() - if len(installedPackages) > 0 { - // parse first package from list and skip upload if pkg is malformed, e.g. without epoch - _, err := utils.ParseNevra(installedPackages[0]) - if err != nil { - utils.LogError("inventoryID", event.Host.ID, "err", err.Error(), WarnSkippingBadPackages) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedWarnBadPackages).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues("message-skip")) - sendPayloadStatus(ptWriter, payloadTrackerEvent, ErrorStatus, WarnSkippingBadPackages) - return nil - } - } - - sendPayloadStatus(ptWriter, payloadTrackerEvent, "", "") + sendPayloadStatus(ptWriter, ptEvent, "", "Received by listener") yumUpdates, err := getYumUpdates(event, httpClient) if err != nil { // don't fail, use vmaas evaluation utils.LogError("err", err, "Could not get yum updates") } - utils.LogTrace("inventoryID", event.Host.ID, "yum_updates", string(yumUpdates.GetRawParsed())) - - if len(installedPackages) == 0 && yumUpdates == nil { - utils.LogWarn("inventoryID", event.Host.ID, WarnSkippingNoPackages) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedWarnNoPackages).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(ReceivedWarnNoPackages)) - sendPayloadStatus(ptWriter, payloadTrackerEvent, ErrorStatus, WarnSkippingNoPackages) - return nil - } sys, err := processUpload(&event.Host, yumUpdates) - if err != nil { - utils.LogError("inventoryID", event.Host.ID, "err", err.Error(), ErrorProcessUpload) - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedErrorProcessing).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(ReceivedErrorProcessing)) - sendPayloadStatus(ptWriter, payloadTrackerEvent, ErrorStatus, ErrorProcessUpload) - return errors.Wrap(err, "Could not process upload") + return handleListenerErrors(stdErrors.Join(ErrProcessUpload, err), &event, &ptEvent, tStart, ErrorStatus) } - // Deleted system, return nil if sys == nil { - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedDeleted).Inc() - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(ReceivedDeleted)) - sendPayloadStatus(ptWriter, payloadTrackerEvent, SuccessStatus, ReceivedDeleted) + logAndObserve(DeleteSuccess, ReceivedDeleted, &event, &ptEvent, tStart, SuccessStatus, true) return nil } if sys.UnchangedSince != nil && sys.LastEvaluation != nil { if sys.UnchangedSince.Before(*sys.LastEvaluation) { - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedSuccessNoEval).Inc() - utils.LogInfo("inventoryID", event.Host.ID, UploadSuccessNoEval) - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(ReceivedSuccessNoEval)) - sendPayloadStatus(ptWriter, payloadTrackerEvent, SuccessStatus, ReceivedSuccessNoEval) + logAndObserve(UploadSuccessNoEval, ReceivedSuccessNoEval, &event, &ptEvent, tStart, SuccessStatus, true) return nil } } - bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &payloadTrackerEvent) + ptEvent.StatusMsg = ProcessingStatus + bufferEvalEvents(sys.InventoryID, sys.RhAccountID, &ptEvent) + logAndObserve(UploadSuccess, ReceivedSuccess, &event, &ptEvent, tStart, SuccessStatus, false) + return nil +} - eventMsgsReceivedCnt.WithLabelValues(EventUpload, ReceivedSuccess).Inc() - utils.LogInfo("inventoryID", event.Host.ID, UploadSuccess) - utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(ReceivedSuccess)) +func handleListenerErrors(err error, event *HostEvent, ptEvent *mqueue.PayloadTrackerEvent, + tStart time.Time, status string) error { + if err != nil { + if errors.Is(err, base.ErrFatal) { + // fatal error which should restart the pod + utils.LogError("inventoryID", event.Host.ID, "err", err.Error()) + } else { + utils.LogWarn("inventoryID", event.Host.ID, "reporter", event.Host.Reporter, + "hostType", event.Host.SystemProfile.HostType, "err", err.Error()) + } + } + metric, ok := metricByErr[err] + if !ok { + metric = "internal-error" + } + logAndObserve(err.Error(), metric, event, ptEvent, tStart, status, true) + + return err +} + +func logAndObserve(msg, metric string, event *HostEvent, ptEvent *mqueue.PayloadTrackerEvent, + tStart time.Time, status string, withPayloadTracker bool) { + eventMsgsReceivedCnt.WithLabelValues(EventUpload, metric).Inc() + utils.LogInfo("inventoryID", event.Host.ID, msg) + utils.ObserveSecondsSince(tStart, messagePartDuration.WithLabelValues(metric)) + if withPayloadTracker { + sendPayloadStatus(ptWriter, *ptEvent, status, metric) + } +} + +func validateHost(host *Host) error { + if _, ok := allowedReporters[host.Reporter]; !ok { + return ErrReporter + } + if host.OrgID == nil || *host.OrgID == "" { + return ErrNoAccountProvided + } + + if _, ok := excludedHostTypes[host.SystemProfile.HostType]; ok { + return ErrHostType + } + + installedPackages := host.SystemProfile.GetInstalledPackages() + if len(installedPackages) == 0 { + return ErrNoPackages + } + if err := checkPackagesEpoch(installedPackages); err != nil { + err = stdErrors.Join(ErrBadPackages, err) + return err + } + return nil +} + +func checkPackagesEpoch(packages []string) error { + if len(packages) > 0 { + // parse first package from list and skip upload if pkg is malformed, e.g. without epoch + if _, err := utils.ParseNevra(packages[0]); err != nil { + return err + } + } return nil } @@ -741,6 +768,7 @@ func getYumUpdates(event HostEvent, client *api.Client) (*YumUpdates, error) { res.RawParsed = yumUpdates res.BuiltPkgcache = parsed.GetBuildPkgcache() + utils.LogTrace("inventoryID", event.Host.ID, "yum_updates", string(res.GetRawParsed())) return res, nil } From dc9e51c92769bed59f9733fe1b00399f9d05155c Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Fri, 16 Aug 2024 15:30:13 +0200 Subject: [PATCH 3/5] RHINENG-9452: update listener tests --- listener/upload_test.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/listener/upload_test.go b/listener/upload_test.go index c23ab9647..83bcfbc48 100644 --- a/listener/upload_test.go +++ b/listener/upload_test.go @@ -133,8 +133,10 @@ func TestUploadHandlerWarn(t *testing.T) { log.AddHook(logHook) noPkgsEvent := createTestUploadEvent("1", id, "puptoo", false, false) err := HandleUpload(noPkgsEvent) - assert.NoError(t, err) - assertInLogs(t, WarnSkippingNoPackages, logHook.LogEntries...) + if assert.Error(t, err) { + assert.ErrorIs(t, err, ErrNoPackages) + } + assertInLogs(t, ErrNoPackages.Error(), logHook.LogEntries...) } func TestUploadHandlerWarnSkipReporter(t *testing.T) { @@ -144,8 +146,10 @@ func TestUploadHandlerWarnSkipReporter(t *testing.T) { log.AddHook(logHook) noPkgsEvent := createTestUploadEvent("1", id, "yupana", false, false) err := HandleUpload(noPkgsEvent) - assert.NoError(t, err) - assertInLogs(t, WarnSkippingReporter, logHook.LogEntries...) + if assert.Error(t, err) { + assert.ErrorIs(t, err, ErrReporter) + } + assertInLogs(t, ErrReporter.Error(), logHook.LogEntries...) } func TestUploadHandlerWarnSkipHostType(t *testing.T) { @@ -156,8 +160,10 @@ func TestUploadHandlerWarnSkipHostType(t *testing.T) { event := createTestUploadEvent("1", id, "puptoo", true, false) event.Host.SystemProfile.HostType = "edge" err := HandleUpload(event) - assert.NoError(t, err) - assertInLogs(t, WarnSkippingHostType, logHook.LogEntries...) + if assert.Error(t, err) { + assert.ErrorIs(t, err, ErrHostType) + } + assertInLogs(t, ErrHostType.Error(), logHook.LogEntries...) } // error when parsing identity @@ -169,8 +175,10 @@ func TestUploadHandlerError1(t *testing.T) { event := createTestUploadEvent("1", id, "puptoo", true, false) *event.Host.OrgID = "" err := HandleUpload(event) - assert.NoError(t, err) - assertInLogs(t, ErrorNoAccountProvided, logHook.LogEntries...) + if assert.Error(t, err) { + assert.ErrorIs(t, err, ErrNoAccountProvided) + } + assertInLogs(t, ErrNoAccountProvided.Error(), logHook.LogEntries...) } type erroringWriter struct{} From 155b6e6c964ec143610ef27ee64b7205552ea109 Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Fri, 16 Aug 2024 16:16:33 +0200 Subject: [PATCH 4/5] RHINENG-9452: add fatal errors for listener --- listener/upload.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/listener/upload.go b/listener/upload.go index b56647e84..6d50a9e9f 100644 --- a/listener/upload.go +++ b/listener/upload.go @@ -449,12 +449,12 @@ func storeOrUpdateSysPlatform(tx *gorm.DB, system *models.SystemPlatform, colsTo if system.ID != 0 { // update system err := tx.Select(colsToUpdate).Updates(system).Error - return errors.Wrap(err, "unable to update system_platform") + return base.WrapFatalDBError(err, "unable to update system_platform") } // insert system err := database.OnConflictUpdateMulti(tx, []string{"rh_account_id", "inventory_id"}, colsToUpdate...). Save(system).Error - return errors.Wrap(err, "unable to insert to system_platform") + return base.WrapFatalDBError(err, "unable to insert to system_platform") } func getReporterID(reporter string) *int { @@ -506,7 +506,10 @@ func ensureReposInDB(tx *gorm.DB, repos []string) (repoIDs []int64, added int64, var existingRepos models.RepoSlice err = tx.Model(&models.Repo{}).Where("name IN (?)", repos).Find(&existingRepos).Error if err != nil { - return nil, 0, errors.Wrap(err, "unable to load repos") + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, 0, errors.Wrapf(err, "couldn't find repos: %s", repos) + } + return nil, 0, base.WrapFatalDBError(err, "unable to load repos") } inDBIDs := make(map[string]int64) @@ -529,7 +532,7 @@ func ensureReposInDB(tx *gorm.DB, repos []string) (repoIDs []int64, added int64, }) err = txOnConflict.Create(&toStore).Error if err != nil { - return nil, 0, errors.Wrap(err, "unable to update repos") + return nil, 0, base.WrapFatalDBError(err, "unable to update repos") } added = txOnConflict.RowsAffected for _, repo := range toStore { @@ -553,13 +556,13 @@ func updateSystemRepos(tx *gorm.DB, rhAccountID int, systemID int64, repoIDs []i }) err = database.BulkInsert(txOnConflict, repoSystemObjs) if err != nil { - return 0, 0, errors.Wrap(err, "unable to update system repos") + return 0, 0, base.WrapFatalDBError(err, "unable to update system repos") } nAdded = txOnConflict.RowsAffected nDeleted, err = deleteOtherSystemRepos(tx, rhAccountID, systemID, repoIDs) if err != nil { - return nAdded, 0, errors.Wrap(err, "unable to delete out-of-date system repos") + return nAdded, 0, base.WrapFatalDBError(err, "unable to delete out-of-date system repos") } return nAdded, nDeleted, nil @@ -702,7 +705,7 @@ func processUpload(host *Host, yumUpdates *YumUpdates) (*models.SystemPlatform, var deleted models.DeletedSystem if err := tx.Find(&deleted, "inventory_id = ?", host.ID).Error; err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { - return nil, errors.Wrap(err, "Checking deleted systems") + return nil, base.WrapFatalDBError(err, "checking deleted systems") } // If the system was deleted in last hour, don't register this upload @@ -716,7 +719,7 @@ func processUpload(host *Host, yumUpdates *YumUpdates) (*models.SystemPlatform, } err = tx.Commit().Error if err != nil { - return nil, errors.Wrap(err, "Committing changes") + return nil, base.WrapFatalDBError(err, "committing changes") } return sys, nil } From bbb08c0694c3aab0322f679ebcb964b5f661f088 Mon Sep 17 00:00:00 2001 From: Patrik Segedy Date: Fri, 16 Aug 2024 17:50:38 +0200 Subject: [PATCH 5/5] RHINENG-9452: add fatal errors for evaluator --- evaluator/evaluate.go | 6 +++--- evaluator/evaluate_baseline.go | 3 ++- evaluator/remediations.go | 5 +++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/evaluator/evaluate.go b/evaluator/evaluate.go index bcd0079b5..8af76c84c 100644 --- a/evaluator/evaluate.go +++ b/evaluator/evaluate.go @@ -214,7 +214,7 @@ func evaluateInDatabase(ctx context.Context, event *mqueue.PlatformEvent, invent thirdParty, err := analyzeRepos(system) if err != nil { - return nil, nil, errors.Wrap(err, "Repo analysis failed") + return nil, nil, errors.Wrap(err, "repo analysis failed") } system.ThirdParty = thirdParty // to set "system_platform.third_party" column @@ -414,7 +414,7 @@ func tryGetSystem(accountID int, inventoryID string, system, err := loadSystemData(accountID, inventoryID) if err != nil { evaluationCnt.WithLabelValues("error-db-read-inventory-data").Inc() - return nil, errors.Wrap(err, "error loading system from DB") + return nil, base.WrapFatalDBError(err, "error loading system from DB") } if system.ID == 0 { evaluationCnt.WithLabelValues("error-db-read-inventory-data").Inc() @@ -496,7 +496,7 @@ func evaluateAndStore(system *models.SystemPlatform, err = commitWithObserve(tx) if err != nil { evaluationCnt.WithLabelValues("error-database-commit").Inc() - return errors.New("database commit failed") + return base.WrapFatalDBError(err, "database commit failed") } return nil diff --git a/evaluator/evaluate_baseline.go b/evaluator/evaluate_baseline.go index 193d9cee1..91fc1dd1e 100644 --- a/evaluator/evaluate_baseline.go +++ b/evaluator/evaluate_baseline.go @@ -1,6 +1,7 @@ package evaluator import ( + "app/base" "app/base/database" "app/base/models" "app/base/vmaas" @@ -24,7 +25,7 @@ func limitVmaasToBaseline(system *models.SystemPlatform, vmaasData *vmaas.Update Where("public_date >= ?", baselineConfig.ToTime.Truncate(24*time.Hour)). Pluck("name", &filterOutNames).Error if err != nil { - return err + return base.WrapFatalDBError(err, "load reported advisories") } // create map of advisories we need to filter out diff --git a/evaluator/remediations.go b/evaluator/remediations.go index a73f1d21a..a170fa216 100644 --- a/evaluator/remediations.go +++ b/evaluator/remediations.go @@ -82,7 +82,8 @@ func publishRemediationsState(system *models.SystemPlatform, response *vmaas.Upd state := createRemediationsStateMsg(system.InventoryID, response) msg, err := mqueue.MessageFromJSON(system.InventoryID, state) if err != nil { - return errors.Wrap(err, "Formatting message") + return errors.Wrap(err, "formatting message") } - return remediationsPublisher.WriteMessages(base.Context, msg) + err = remediationsPublisher.WriteMessages(base.Context, msg) + return base.WrapFatalKafkaError(err, "write message") }