Skip to content

Commit

Permalink
fix(goes/esgallery): fix auto-save concurrency error
Browse files Browse the repository at this point in the history
  • Loading branch information
bounoable committed Nov 9, 2022
1 parent b1092c8 commit d0a8f7b
Showing 1 changed file with 40 additions and 17 deletions.
57 changes: 40 additions & 17 deletions goes/esgallery/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
// ProcessedTag is added to [gallery.Stack]s that were processed by a [*PostProcessor].
const ProcessedTag = "processed"

var autoSaveMaxTries = 5

// Processor post-processes [gallery.Stack]s and uploads the processed images
// to (cloud) storage.
type Processor[StackID, ImageID ID] struct {
Expand Down Expand Up @@ -537,28 +539,49 @@ func (q *processorQueue[Gallery, StackID, ImageID]) shouldProcess(evt event.Even
}

func (q *processorQueue[Gallery, StackID, ImageID]) apply(result *ProcessorResult[StackID, ImageID], galleryID uuid.UUID) error {
q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID)
// The following is true only if the [WithAutoApply] option is provided with
// the `autoSave` parameter set to `true`:
//
// Between the time of fetching and saving the gallery, the gallery might
// have raised other aggregate events. To avoid optimistic concurrency
// issues, we re-try the whole process a few times.
var tries int
for {
tries++

q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID)
g, err := q.processor.fetchGallery(q.ctx, galleryID)
if err != nil {
return fmt.Errorf("fetch gallery: %w", err)
}

g, err := q.processor.fetchGallery(q.ctx, galleryID)
if err != nil {
return fmt.Errorf("fetch gallery: %w", err)
}
q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID)
if err := result.Apply(g); err != nil {
return err
}
result.Applied = true

q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID)
if err := result.Apply(g); err != nil {
return err
}
result.Applied = true
if q.processor.autoSave != nil {
q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID)
if err := q.processor.autoSave(q.ctx, g); err != nil {
if aggregate.IsConsistencyError(err) {
q.cfg.debugLog("optimistic concurrency error while auto-saving: %v", err)

if tries >= autoSaveMaxTries {
return fmt.Errorf("optimistic concurrency error while auto-saving: %w", err)
}

if q.processor.autoSave != nil {
q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID)
if err := q.processor.autoSave(q.ctx, g); err != nil {
return fmt.Errorf("auto-save gallery: %w", err)
q.cfg.debugLog("retrying auto-save because of optimistic concurrency error ...")
continue
}

return fmt.Errorf("auto-save gallery: %w", err)
}
result.Saved = true
}
result.Saved = true
}

return nil
return nil
}
}

func (q *processorQueue[Gallery, StackID, ImageID]) fail(err error) {
Expand Down

0 comments on commit d0a8f7b

Please sign in to comment.