From 6e491acba070d164fe7c9ceb729309712cf0df50 Mon Sep 17 00:00:00 2001 From: Saman Hosseini Date: Mon, 6 Nov 2023 23:04:44 +0100 Subject: [PATCH] fix(command/cmdbus): track dispatch before publishing event fix(builtin_test.go): prevent panic on context.Canceled error to avoid unnecessary panic when context is cancelled fix(handler_test.go): replace context.WithTimeout with context.WithCancel to avoid unnecessary timeout fix(bus.go): change WithArtificialDelay return type to Option for better type safety --- command/builtin/builtin_test.go | 5 ++++- command/cmdbus/bus.go | 12 ++++++------ command/handler_test.go | 6 ++---- event/eventbus/bus.go | 2 +- 4 files changed, 13 insertions(+), 12 deletions(-) diff --git a/command/builtin/builtin_test.go b/command/builtin/builtin_test.go index f8a16a6..b3fe7d3 100644 --- a/command/builtin/builtin_test.go +++ b/command/builtin/builtin_test.go @@ -2,6 +2,7 @@ package builtin_test import ( "context" + "errors" "testing" "time" @@ -297,7 +298,9 @@ func TestDeleteAggregate_CustomEvent_MatchAll(t *testing.T) { func panicOn(errs <-chan error) { for err := range errs { - panic(err) + if !errors.Is(err, context.Canceled) { + panic(err) + } } } diff --git a/command/cmdbus/bus.go b/command/cmdbus/bus.go index 04801a3..803bf0f 100644 --- a/command/cmdbus/bus.go +++ b/command/cmdbus/bus.go @@ -290,12 +290,6 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts Payload: load, }) - b.debugLog("publishing %q event ...", evt.Name()) - - if err := b.bus.Publish(ctx, evt.Any()); err != nil { - return fmt.Errorf("publish %q event: %w", evt.Name(), err) - } - out := make(chan error) accepted := make(chan struct{}) aborted := make(chan struct{}) @@ -313,6 +307,12 @@ func (b *Bus[ErrorCode]) Dispatch(ctx context.Context, cmd command.Command, opts defer b.cleanupDispatch(cmd.ID()) + b.debugLog("publishing %q event ...", evt.Name()) + + if err := b.bus.Publish(ctx, evt.Any()); err != nil { + return fmt.Errorf("publish %q event: %w", evt.Name(), err) + } + var timeout <-chan time.Time if b.assignTimeout > 0 { timer := time.NewTimer(b.assignTimeout) diff --git a/command/handler_test.go b/command/handler_test.go index 2997736..8b7a804 100644 --- a/command/handler_test.go +++ b/command/handler_test.go @@ -20,10 +20,10 @@ func TestHandler_Handle(t *testing.T) { enc := newEncoder() ebus := eventbus.New() subBus := cmdbus.New[int](enc, ebus) - pubBus := cmdbus.New[int](enc, ebus) + pubBus := cmdbus.New[int](enc, ebus, cmdbus.AssignTimeout(0)) h := command.NewHandler[any](subBus) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() handled := make(chan command.Command) @@ -52,9 +52,7 @@ func TestHandler_Handle(t *testing.T) { case err, ok := <-errs: if ok { t.Fatal(err) - break } - break case h := <-handled: if h.ID() != cmd.ID() || h.Name() != cmd.Name() || !reflect.DeepEqual(h.Payload(), cmd.Payload()) { t.Fatalf("handled Command differs from dispatched Command. want=%v got=%v", cmd, h) diff --git a/event/eventbus/bus.go b/event/eventbus/bus.go index 2b6aa55..c295d48 100644 --- a/event/eventbus/bus.go +++ b/event/eventbus/bus.go @@ -53,7 +53,7 @@ type Option func(*chanbus) // the rate of event publishing. The delay duration is specified by the provided // time.Duration value. The function returns an Option that can be used to // configure a chanbus instance. -func WithArtificialDelay(delay time.Duration) func(*chanbus) { +func WithArtificialDelay(delay time.Duration) Option { return func(c *chanbus) { c.artificialDelay = delay }