Skip to content

Commit

Permalink
Enable MultiProcessor in processor.go (#217)
Browse files Browse the repository at this point in the history
* add multiprocessor feature and 1 ut

* more ut

* linter

* address comments

* small refractors

* change receiver to receiverimpl

* address comments and rename ReceiverImpl to ReceiverEx

* small fix

* revert

* panic handle in Start()

* linter
  • Loading branch information
karenychen authored Apr 22, 2024
1 parent 1438c81 commit 1fd7ed8
Show file tree
Hide file tree
Showing 7 changed files with 350 additions and 48 deletions.
1 change: 1 addition & 0 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
golang.org/x/sync v0.7.0
google.golang.org/protobuf v1.33.0
)

Expand Down
4 changes: 4 additions & 0 deletions v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
34 changes: 20 additions & 14 deletions v2/metrics/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
subsystem = "goshuttle_handler"
receiverNameLabel = "receiverName"
messageTypeLabel = "messageType"
deliveryCountLabel = "deliveryCount"
successLabel = "success"
Expand All @@ -29,12 +30,12 @@ func NewRegistry() *Registry {
Name: "message_received_total",
Help: "total number of messages received by the processor",
Subsystem: subsystem,
}, []string{}),
}, []string{receiverNameLabel}),
MessageHandledCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_handled_total",
Help: "total number of messages handled by this handler",
Subsystem: subsystem,
}, []string{messageTypeLabel, deliveryCountLabel}),
}, []string{receiverNameLabel, messageTypeLabel, deliveryCountLabel}),
MessageLockRenewedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_lock_renewed_total",
Help: "total number of message lock renewal",
Expand All @@ -49,7 +50,7 @@ func NewRegistry() *Registry {
Name: "concurrent_message_count",
Help: "number of messages being handled concurrently",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
}, []string{receiverNameLabel, messageTypeLabel}),
}
}

Expand Down Expand Up @@ -85,10 +86,10 @@ type Recorder interface {
IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(msg *azservicebus.ReceivedMessage)
IncMessageReceived(float64)
IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage)
IncMessageReceived(receiverName string, count float64)
IncConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage)
DecConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage)
}

// IncMessageLockRenewedSuccess increase the message lock renewal success counter
Expand All @@ -106,20 +107,25 @@ func (m *Registry) IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessag
}

// IncMessageHandled increase the message Handled
func (m *Registry) IncMessageHandled(msg *azservicebus.ReceivedMessage) {
func (m *Registry) IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
labels[deliveryCountLabel] = strconv.FormatUint(uint64(msg.DeliveryCount), 10)
m.MessageHandledCount.With(labels).Inc()
}

// IncConcurrentMessageCount increases the concurrent message counter
func (m *Registry) IncConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Inc()
func (m *Registry) IncConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
m.ConcurrentMessageCount.With(labels).Inc()
}

// DecConcurrentMessageCount decreases the concurrent message counter
func (m *Registry) DecConcurrentMessageCount(msg *azservicebus.ReceivedMessage) {
m.ConcurrentMessageCount.With(getMessageTypeLabel(msg)).Dec()
func (m *Registry) DecConcurrentMessageCount(receiverName string, msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
labels[receiverNameLabel] = receiverName
m.ConcurrentMessageCount.With(labels).Dec()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
Expand All @@ -129,8 +135,8 @@ func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMess
}

// IncMessageReceived increases the message received counter
func (m *Registry) IncMessageReceived(count float64) {
m.MessageReceivedCount.With(map[string]string{}).Add(count)
func (m *Registry) IncMessageReceived(receiverName string, count float64) {
m.MessageReceivedCount.WithLabelValues(receiverName).Add(count)
}

// Informer allows to inspect metrics value stored in the registry at runtime
Expand Down
2 changes: 1 addition & 1 deletion v2/metrics/processor/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestRegistry_Init(t *testing.T) {
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(5))
Metric.IncMessageReceived(10)
Metric.IncMessageReceived("testReceiverName", 10)
}

func TestNewInformerDefault(t *testing.T) {
Expand Down
130 changes: 100 additions & 30 deletions v2/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
Expand All @@ -25,6 +26,18 @@ type MessageSettler interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

type ReceiverEx struct { // shuttle.Receiver is already an exported interface
name string
sbReceiver Receiver
}

func NewReceiverEx(name string, sbReceiver Receiver) *ReceiverEx {
return &ReceiverEx{
name: name,
sbReceiver: sbReceiver,
}
}

type Handler interface {
Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
Expand All @@ -39,10 +52,10 @@ func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message
// Processor encapsulates the message pump and concurrency handling of servicebus.
// it exposes a handler API to provides a middleware based message processing pipeline.
type Processor struct {
receiver Receiver
receivers map[string]*ReceiverEx
options ProcessorOptions
handle Handler
concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor
concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor, shared across all receivers
}

// ProcessorOptions configures the processor
Expand All @@ -60,8 +73,8 @@ type ProcessorOptions struct {
StartRetryDelayStrategy RetryDelayStrategy
}

func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := ProcessorOptions{
func applyProcessorOptions(options *ProcessorOptions) *ProcessorOptions {
opts := &ProcessorOptions{
MaxConcurrency: 1,
ReceiveInterval: to.Ptr(1 * time.Second),
StartMaxAttempt: 1,
Expand All @@ -81,24 +94,78 @@ func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOpti
opts.StartRetryDelayStrategy = options.StartRetryDelayStrategy
}
}
return opts
}

func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := applyProcessorOptions(options)
receiverEx := NewReceiverEx("receiver", receiver)
return &Processor{
receivers: map[string]*ReceiverEx{receiverEx.name: receiverEx},
handle: handler,
options: *opts,
concurrencyTokens: make(chan struct{}, opts.MaxConcurrency),
}
}

func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor {
opts := applyProcessorOptions(options)
var receivers = make(map[string]*ReceiverEx)
for _, receiver := range receiversEx {
receivers[receiver.name] = receiver
}
return &Processor{
receiver: receiver,
receivers: receivers,
handle: handler,
options: opts,
options: *opts,
concurrencyTokens: make(chan struct{}, opts.MaxConcurrency),
}
}

// Start starts the processor and blocks until an error occurs or the context is canceled.
// Start starts processing on all the receivers of the processor and blocks until all processors are stopped or the context is canceled.
// It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy.
// Returns a combined list of errors encountered during each processor start.
func (p *Processor) Start(ctx context.Context) error {
wg := sync.WaitGroup{}
errChan := make(chan error, len(p.receivers))
for name := range p.receivers {
wg.Add(1)
go func(receiverName string) {
defer func() {
if rec := recover(); rec != nil {
errChan <- fmt.Errorf("panic recovered from processor %s: %v", receiverName, rec)
}
wg.Done()
}()
err := p.startOne(ctx, receiverName)
if err != nil {
errChan <- err
}
}(name)
}
wg.Wait()
close(errChan)
var allErrs []error
for err := range errChan {
allErrs = append(allErrs, err)
}
return errors.Join(allErrs...)
}

// startOne starts a processor with the receiverName and blocks until an error occurs or the context is canceled.
// It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy.
// Returns a combined list of errors during the start attempts or ctx.Err() if the context
// is cancelled during the retries.
func (p *Processor) Start(ctx context.Context) error {
func (p *Processor) startOne(ctx context.Context, receiverName string) error {
receiverEx, ok := p.receivers[receiverName]
if !ok {
return fmt.Errorf("processor %s not found", receiverName)
}
var savedError error
for attempt := 0; attempt < p.options.StartMaxAttempt; attempt++ {
if err := p.start(ctx); err != nil {
if err := p.start(ctx, receiverEx); err != nil {
savedError = errors.Join(savedError, err)
log(ctx, fmt.Sprintf("processor start attempt %d failed: %v", attempt, err))
log(ctx, fmt.Sprintf("processor %s start attempt %d failed: %v", receiverName, attempt, err))
if attempt+1 == p.options.StartMaxAttempt { // last attempt, return early
break
}
Expand All @@ -115,16 +182,18 @@ func (p *Processor) Start(ctx context.Context) error {
}

// start starts the processor and blocks until an error occurs or the context is canceled.
func (p *Processor) start(ctx context.Context) error {
log(ctx, "starting processor")
messages, err := p.receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil)
func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error {
receiverName := receiverEx.name
receiver := receiverEx.sbReceiver
log(ctx, fmt.Sprintf("starting processor %s", receiverName))
messages, err := receiver.ReceiveMessages(ctx, p.options.MaxConcurrency, nil)
if err != nil {
return err
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
log(ctx, fmt.Sprintf("received %d messages - initial", len(messages)))
processor.Metric.IncMessageReceived(float64(len(messages)))
log(ctx, fmt.Sprintf("processor %s received %d messages - initial", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, msg)
p.process(ctx, receiverEx, msg)
}
for ctx.Err() == nil {
select {
Expand All @@ -133,37 +202,38 @@ func (p *Processor) start(ctx context.Context) error {
if ctx.Err() != nil || maxMessages == 0 {
break
}
messages, err := p.receiver.ReceiveMessages(ctx, maxMessages, nil)
messages, err := receiver.ReceiveMessages(ctx, maxMessages, nil)
if err != nil {
return err
return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err)
}
log(ctx, fmt.Sprintf("received %d messages from processor loop", len(messages)))
processor.Metric.IncMessageReceived(float64(len(messages)))
log(ctx, fmt.Sprintf("processor %s received %d messages from processor loop", receiverName, len(messages)))
processor.Metric.IncMessageReceived(receiverName, float64(len(messages)))
for _, msg := range messages {
p.process(ctx, msg)
p.process(ctx, receiverEx, msg)
}
case <-ctx.Done():
log(ctx, "context done, stop receiving")
log(ctx, fmt.Sprintf("context done, stop receiving from processor %s", receiverName))
break
}
}
log(ctx, "exiting processor")
return ctx.Err()
log(ctx, fmt.Sprintf("exiting processor %s", receiverName))
return fmt.Errorf("processor %s stopped: %w", receiverName, ctx.Err())
}

func (p *Processor) process(ctx context.Context, message *azservicebus.ReceivedMessage) {
func (p *Processor) process(ctx context.Context, receiverEx *ReceiverEx, message *azservicebus.ReceivedMessage) {
receiverName := receiverEx.name
p.concurrencyTokens <- struct{}{}
go func() {
msgContext, cancel := context.WithCancel(ctx)
// cancel messageContext when we get out of this goroutine
defer cancel()
defer func() {
<-p.concurrencyTokens
processor.Metric.IncMessageHandled(message)
processor.Metric.DecConcurrentMessageCount(message)
processor.Metric.IncMessageHandled(receiverName, message)
processor.Metric.DecConcurrentMessageCount(receiverName, message)
}()
processor.Metric.IncConcurrentMessageCount(message)
p.handle.Handle(msgContext, p.receiver, message)
processor.Metric.IncConcurrentMessageCount(receiverName, message)
p.handle.Handle(msgContext, receiverEx.sbReceiver, message)
}()
}

Expand Down
5 changes: 5 additions & 0 deletions v2/processor_fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type fakeReceiver struct {
SetupReceivedMessages chan *azservicebus.ReceivedMessage
*fakeSettler
SetupMaxReceiveCalls int
SetupReceivePanic string
}

func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) {
Expand All @@ -69,6 +70,10 @@ func (f *fakeReceiver) ReceiveMessages(_ context.Context, maxMessages int, _ *az
}
}

if f.SetupReceivePanic != "" {
panic(f.SetupReceivePanic)
}

// return an error if we request more messages than there are available.
if len(f.ReceiveCalls) >= f.SetupMaxReceiveCalls {
return result, fmt.Errorf("max receive calls exceeded")
Expand Down
Loading

0 comments on commit 1fd7ed8

Please sign in to comment.