From a566f89ac8e5c5c0ebde054481afd725784288ee Mon Sep 17 00:00:00 2001 From: Saman Hosseini Date: Sat, 17 Sep 2022 15:02:27 +0200 Subject: [PATCH] feat(goes/esgallery): add `Debug` option --- goes/esgallery/processor.go | 58 ++++++++++++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/goes/esgallery/processor.go b/goes/esgallery/processor.go index a4e12b5..458e341 100644 --- a/goes/esgallery/processor.go +++ b/goes/esgallery/processor.go @@ -6,6 +6,7 @@ import ( "fmt" stdimage "image" "io" + "log" "net/http" "strings" "sync" @@ -343,6 +344,12 @@ func NewPostProcessor[ // RunProcessorOption is an option for [*PostProcessor.Run]. type RunProcessorOption func(*runProcessorConfig) +type runProcessorConfig struct { + workers int + discardResults bool + debug bool +} + // Workers returns a [RunProcessorOption] that sets the number of workers for // [PostProcessor.Run]. Defaults to 1. func Workers(workers int) RunProcessorOption { @@ -363,9 +370,11 @@ func DiscardResults(discard bool) RunProcessorOption { } } -type runProcessorConfig struct { - workers int - discardResults bool +// Debug returns a [RunProcessorOption] that enables debug logs. +func Debug(debug bool) RunProcessorOption { + return func(cfg *runProcessorConfig) { + cfg.debug = debug + } } // Run runs the post-processor in the background and returns a channel of @@ -382,6 +391,13 @@ func (pp *PostProcessor[Gallery, StackID, ImageID]) Run(ctx context.Context, pip opt(&cfg) } + cfg.debugLog( + "starting post-processor with options:\n\tWorkers:\t%d\n\tDiscard results:\t%v", + cfg.workers, cfg.discardResults, + ) + + cfg.debugLog("subscribing to %v events ...", ProcessorTriggerEvents) + events, errs, err := pp.bus.Subscribe(ctx, ProcessorTriggerEvents...) if err != nil { return nil, nil, fmt.Errorf("subscribe to %v events: %w", ProcessorTriggerEvents, err) @@ -440,6 +456,8 @@ func (q *processorQueue[Gallery, StackID, ImageID]) work() { shouldPush = true ) + q.cfg.debugLog("handling %q event ...", evt.Name()) + switch evt.Name() { case StackAdded: result, err = q.stackAdded(event.Cast[gallery.Stack[StackID, ImageID]](evt)) @@ -454,31 +472,43 @@ func (q *processorQueue[Gallery, StackID, ImageID]) work() { result.Trigger = evt + galleryID := pick.AggregateID(evt) + if q.processor.autoApply { - if err := q.apply(&result, pick.AggregateID(evt)); err != nil { + if err := q.apply(&result, galleryID); err != nil { q.fail(fmt.Errorf("apply result: %w", err)) } } - if !q.cfg.discardResults && shouldPush { + if q.cfg.discardResults { + q.cfg.debugLog("discarding processing result [galleryId=%s, stackId=%s]", galleryID, result.StackID) + continue + } + + if shouldPush { q.push(result) } } } func (q *processorQueue[Gallery, StackID, ImageID]) apply(result *ProcessorResult[StackID, ImageID], galleryID uuid.UUID) error { + q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID) + g, err := q.processor.fetchGallery(q.ctx, galleryID) if err != nil { return fmt.Errorf("fetch gallery: %w", err) } + + q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID) if err := result.Apply(g); err != nil { return err } result.Applied = true if q.processor.autoSave != nil { + q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID) if err := q.processor.autoSave(q.ctx, g); err != nil { - return fmt.Errorf("autosave gallery: %w", err) + return fmt.Errorf("auto-save gallery: %w", err) } result.Saved = true } @@ -507,9 +537,15 @@ func (q *processorQueue[Gallery, StackID, ImageID]) stackAdded(evt event.Of[gall return zero, fmt.Errorf("fetch gallery: %w", err) } - data := evt.Data() + stack := evt.Data() - result, err := q.processor.processor.Process(q.ctx, q.pipeline, g, data.ID) + if _, ok := g.Stack(stack.ID); !ok { + return zero, fmt.Errorf("%w [galleryId=%s, stackId=%s]", gallery.ErrStackNotFound, galleryID, stack.ID) + } + + q.cfg.debugLog("running processor on stack ... [galleryId=%s, stackId=%s]", galleryID, stack.ID) + + result, err := q.processor.processor.Process(q.ctx, q.pipeline, g, stack.ID) if err != nil { return result, fmt.Errorf("run processor: %w", err) } @@ -544,6 +580,12 @@ func (q *processorQueue[Gallery, StackID, ImageID]) stackAdded(evt event.Of[gall // return result, true, nil // } +func (cfg runProcessorConfig) debugLog(format string, args ...any) { + if cfg.debug { + log.Printf("[DEBUG] %s", fmt.Sprintf(format, args...)) + } +} + func zeroResult[StackID, ImageID ID]() (zero ProcessorResult[StackID, ImageID]) { return zero }