From d0a8f7b0df0871aca80b6908e48c83ede0c16795 Mon Sep 17 00:00:00 2001 From: Saman Hosseini Date: Wed, 9 Nov 2022 17:00:48 +0100 Subject: [PATCH] fix(goes/esgallery): fix auto-save concurrency error --- goes/esgallery/processor.go | 57 ++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/goes/esgallery/processor.go b/goes/esgallery/processor.go index 4341639..ebfe9a2 100644 --- a/goes/esgallery/processor.go +++ b/goes/esgallery/processor.go @@ -25,6 +25,8 @@ import ( // ProcessedTag is added to [gallery.Stack]s that were processed by a [*PostProcessor]. const ProcessedTag = "processed" +var autoSaveMaxTries = 5 + // Processor post-processes [gallery.Stack]s and uploads the processed images // to (cloud) storage. type Processor[StackID, ImageID ID] struct { @@ -537,28 +539,49 @@ func (q *processorQueue[Gallery, StackID, ImageID]) shouldProcess(evt event.Even } func (q *processorQueue[Gallery, StackID, ImageID]) apply(result *ProcessorResult[StackID, ImageID], galleryID uuid.UUID) error { - q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID) + // The following is true only if the [WithAutoApply] option is provided with + // the `autoSave` parameter set to `true`: + // + // Between the time of fetching and saving the gallery, the gallery might + // have raised other aggregate events. To avoid optimistic concurrency + // issues, we re-try the whole process a few times. + var tries int + for { + tries++ + + q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID) + g, err := q.processor.fetchGallery(q.ctx, galleryID) + if err != nil { + return fmt.Errorf("fetch gallery: %w", err) + } - g, err := q.processor.fetchGallery(q.ctx, galleryID) - if err != nil { - return fmt.Errorf("fetch gallery: %w", err) - } + q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID) + if err := result.Apply(g); err != nil { + return err + } + result.Applied = true - q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID) - if err := result.Apply(g); err != nil { - return err - } - result.Applied = true + if q.processor.autoSave != nil { + q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID) + if err := q.processor.autoSave(q.ctx, g); err != nil { + if aggregate.IsConsistencyError(err) { + q.cfg.debugLog("optimistic concurrency error while auto-saving: %v", err) + + if tries >= autoSaveMaxTries { + return fmt.Errorf("optimistic concurrency error while auto-saving: %w", err) + } - if q.processor.autoSave != nil { - q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID) - if err := q.processor.autoSave(q.ctx, g); err != nil { - return fmt.Errorf("auto-save gallery: %w", err) + q.cfg.debugLog("retrying auto-save because of optimistic concurrency error ...") + continue + } + + return fmt.Errorf("auto-save gallery: %w", err) + } + result.Saved = true } - result.Saved = true - } - return nil + return nil + } } func (q *processorQueue[Gallery, StackID, ImageID]) fail(err error) {