Skip to content

Commit

Permalink
feat(goes/esgallery): add Debug option
Browse files Browse the repository at this point in the history
  • Loading branch information
bounoable committed Sep 17, 2022
1 parent 6f2b3ff commit a566f89
Showing 1 changed file with 50 additions and 8 deletions.
58 changes: 50 additions & 8 deletions goes/esgallery/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
stdimage "image"
"io"
"log"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -343,6 +344,12 @@ func NewPostProcessor[
// RunProcessorOption is an option for [*PostProcessor.Run].
type RunProcessorOption func(*runProcessorConfig)

type runProcessorConfig struct {
workers int
discardResults bool
debug bool
}

// Workers returns a [RunProcessorOption] that sets the number of workers for
// [PostProcessor.Run]. Defaults to 1.
func Workers(workers int) RunProcessorOption {
Expand All @@ -363,9 +370,11 @@ func DiscardResults(discard bool) RunProcessorOption {
}
}

type runProcessorConfig struct {
workers int
discardResults bool
// Debug returns a [RunProcessorOption] that enables debug logs.
func Debug(debug bool) RunProcessorOption {
return func(cfg *runProcessorConfig) {
cfg.debug = debug
}
}

// Run runs the post-processor in the background and returns a channel of
Expand All @@ -382,6 +391,13 @@ func (pp *PostProcessor[Gallery, StackID, ImageID]) Run(ctx context.Context, pip
opt(&cfg)
}

cfg.debugLog(
"starting post-processor with options:\n\tWorkers:\t%d\n\tDiscard results:\t%v",
cfg.workers, cfg.discardResults,
)

cfg.debugLog("subscribing to %v events ...", ProcessorTriggerEvents)

events, errs, err := pp.bus.Subscribe(ctx, ProcessorTriggerEvents...)
if err != nil {
return nil, nil, fmt.Errorf("subscribe to %v events: %w", ProcessorTriggerEvents, err)
Expand Down Expand Up @@ -440,6 +456,8 @@ func (q *processorQueue[Gallery, StackID, ImageID]) work() {
shouldPush = true
)

q.cfg.debugLog("handling %q event ...", evt.Name())

switch evt.Name() {
case StackAdded:
result, err = q.stackAdded(event.Cast[gallery.Stack[StackID, ImageID]](evt))
Expand All @@ -454,31 +472,43 @@ func (q *processorQueue[Gallery, StackID, ImageID]) work() {

result.Trigger = evt

galleryID := pick.AggregateID(evt)

if q.processor.autoApply {
if err := q.apply(&result, pick.AggregateID(evt)); err != nil {
if err := q.apply(&result, galleryID); err != nil {
q.fail(fmt.Errorf("apply result: %w", err))
}
}

if !q.cfg.discardResults && shouldPush {
if q.cfg.discardResults {
q.cfg.debugLog("discarding processing result [galleryId=%s, stackId=%s]", galleryID, result.StackID)
continue
}

if shouldPush {
q.push(result)
}
}
}

func (q *processorQueue[Gallery, StackID, ImageID]) apply(result *ProcessorResult[StackID, ImageID], galleryID uuid.UUID) error {
q.cfg.debugLog("fetching gallery to apply result to ... [id=%s]", galleryID)

g, err := q.processor.fetchGallery(q.ctx, galleryID)
if err != nil {
return fmt.Errorf("fetch gallery: %w", err)
}

q.cfg.debugLog("applying processing result ... [galleryId=%s, stackId=%s]", galleryID, result.StackID)
if err := result.Apply(g); err != nil {
return err
}
result.Applied = true

if q.processor.autoSave != nil {
q.cfg.debugLog("auto-saving gallery ... [id=%s]", galleryID)
if err := q.processor.autoSave(q.ctx, g); err != nil {
return fmt.Errorf("autosave gallery: %w", err)
return fmt.Errorf("auto-save gallery: %w", err)
}
result.Saved = true
}
Expand Down Expand Up @@ -507,9 +537,15 @@ func (q *processorQueue[Gallery, StackID, ImageID]) stackAdded(evt event.Of[gall
return zero, fmt.Errorf("fetch gallery: %w", err)
}

data := evt.Data()
stack := evt.Data()

result, err := q.processor.processor.Process(q.ctx, q.pipeline, g, data.ID)
if _, ok := g.Stack(stack.ID); !ok {
return zero, fmt.Errorf("%w [galleryId=%s, stackId=%s]", gallery.ErrStackNotFound, galleryID, stack.ID)
}

q.cfg.debugLog("running processor on stack ... [galleryId=%s, stackId=%s]", galleryID, stack.ID)

result, err := q.processor.processor.Process(q.ctx, q.pipeline, g, stack.ID)
if err != nil {
return result, fmt.Errorf("run processor: %w", err)
}
Expand Down Expand Up @@ -544,6 +580,12 @@ func (q *processorQueue[Gallery, StackID, ImageID]) stackAdded(evt event.Of[gall
// return result, true, nil
// }

func (cfg runProcessorConfig) debugLog(format string, args ...any) {
if cfg.debug {
log.Printf("[DEBUG] %s", fmt.Sprintf(format, args...))
}
}

func zeroResult[StackID, ImageID ID]() (zero ProcessorResult[StackID, ImageID]) {
return zero
}
Expand Down

0 comments on commit a566f89

Please sign in to comment.