Skip to content

Commit

Permalink
v2:
Browse files Browse the repository at this point in the history
- update to go 1.21
- minor internal tweaks
- support errors on publish attempts to workers with full input buffers
  • Loading branch information
dcarbone committed Oct 25, 2023
1 parent 8b417b8 commit 0e5cc57
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 97 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/dcarbone/seb
module github.com/dcarbone/seb/v2

go 1.15
go 1.21
203 changes: 108 additions & 95 deletions seb.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math/rand"
"slices"
"strconv"
"sync"
"time"
Expand All @@ -16,20 +17,15 @@ func init() {
GlobalBus = New()
}

var RecipientNotFoundErr = errors.New("target recipient not found")
var (
ErrRecipientNotFound = errors.New("target recipient not found")

func IsRecipientNotFoundErr(err error) bool {
for err != nil {
if err == RecipientNotFoundErr {
return true
}
err = errors.Unwrap(err)
}
return false
}
ErrWorkerClosed = errors.New("worker is closed")
ErrWorkerBufferFull = errors.New("worker input buffer is full")
)

type Reply struct {
Data interface{}
Data any
Err error
}

Expand All @@ -39,7 +35,7 @@ type Event struct {
ID string // random
Originated int64 // unixnano timestamp of when this event was created
Topic string // topic of event
Data interface{} // no attempt is made to prevent memory sharing
Data any // no attempt is made to prevent memory sharing
Reply chan<- Reply // if defined, hints to recipients that a response is desired.
}

Expand All @@ -50,7 +46,7 @@ type EventHandler func(Event)
type EventChannel chan Event

type worker struct {
mu sync.RWMutex
mu sync.Mutex
id string
closed bool
in chan Event
Expand All @@ -59,37 +55,36 @@ type worker struct {
}

func newWorker(id string, fn EventHandler) *worker {
nw := new(worker)
nw.id = id
nw.in = make(chan Event, 100)
nw.out = make(chan Event)
nw.fn = fn
go nw.publish()
go nw.process()
return nw
w := &worker{
id: id,
in: make(chan Event, 100),
out: make(chan Event),
fn: fn,
}
go w.publish()
go w.process()
return w
}

func (nw *worker) close() {
nw.mu.Lock()
func (w *worker) close() {
w.mu.Lock()
defer w.mu.Unlock()

if nw.closed {
nw.mu.Unlock()
if w.closed {
return
}

nw.closed = true
w.closed = true

close(nw.in)
close(nw.out)
if len(nw.in) > 0 {
for range nw.in {
close(w.in)
close(w.out)
if len(w.in) > 0 {
for range w.in {
}
}

nw.mu.Unlock()
}

func (nw *worker) publish() {
func (w *worker) publish() {
var wait *time.Timer

defer func() {
Expand All @@ -98,13 +93,13 @@ func (nw *worker) publish() {
}
}()

for n := range nw.in {
// todo: it is probably not necessary to test here as if the worker is closed between this event
// being processed and the next event in, it is removed from the map of available workers to push
// to before close == true, meaning it cannot have new messages pushed to it.
nw.mu.RLock()
if nw.closed {
nw.mu.RUnlock()
for n := range w.in {
// acquire lock to prevent closure while attempting to push a message
w.mu.Lock()

// test if worker was closed between message push request and now.
if w.closed {
w.mu.Unlock()
return
}

Expand All @@ -118,39 +113,41 @@ func (nw *worker) publish() {
// attempt to push message to consumer, allowing for up to 5 seconds of blocking
// if block window passes, drop on floor
select {
case nw.out <- n:
case w.out <- n:
if !wait.Stop() {
<-wait.C
}
case <-wait.C:
}

nw.mu.RUnlock()
w.mu.Unlock()
}
}

func (nw *worker) process() {
// nw.out is an unbuffered channel. it blocks until any preceding event has been handled by the registered
func (w *worker) process() {
// w.out is an unbuffered channel. it blocks until any preceding event has been handled by the registered
// handler. it is closed once the publish() loop breaks.
for n := range nw.out {
nw.fn(n)
for n := range w.out {
w.fn(n)
}
}

func (nw *worker) push(n Event) {
// hold an rlock for the duration of the push attempt to ensure that, at a minimum, the message is added to the
func (w *worker) push(n Event) error {
// hold a lock for the duration of the push attempt to ensure that, at a minimum, the message is added to the
// channel before it can be closed.
nw.mu.RLock()
defer nw.mu.RUnlock()
w.mu.Lock()
defer w.mu.Unlock()

if nw.closed {
return
if w.closed {
return ErrWorkerClosed
}

// attempt to push message to ingest chan. if chan is full, drop on floor
select {
case nw.in <- n:
case w.in <- n:
return nil
default:
return ErrWorkerBufferFull
}
}

Expand All @@ -172,44 +169,39 @@ func (s *lockableRandSource) Int63() int64 {
return v
}

func (s *lockableRandSource) Seed(v int64) {
s.mu.Lock()
s.src.Seed(v)
s.mu.Unlock()
}

type Bus struct {
mu sync.RWMutex
ws map[string]*worker
rs rand.Source
mu sync.RWMutex
rand *lockableRandSource
topics map[string]*worker
}

// New creates a new Bus for immediate use
func New() *Bus {
b := new(Bus)
b.ws = make(map[string]*worker)
b.rs = newLockableRandSource()
return b
b := Bus{
topics: make(map[string]*worker),
rand: newLockableRandSource(),
}
return &b
}

// Push will immediately send a new event to all currently registered recipients
func (b *Bus) Push(topic string, data interface{}) {
b.sendEvent(b.buildEvent(topic, data))
func (b *Bus) Push(topic string, data any) error {
return b.sendEvent(b.buildEvent(topic, data))
}

// PushTo attempts to push an even to a specific recipient
func (b *Bus) PushTo(to, topic string, data interface{}) error {
func (b *Bus) PushTo(to, topic string, data any) error {
return b.sendEventTo(to, b.buildEvent(topic, data))
}

// Request will publish a new event with the Reply chan defined, blocking until a single response has been received
// or the provided context expires
func (b *Bus) Request(ctx context.Context, topic string, data interface{}) (Reply, error) {
func (b *Bus) Request(ctx context.Context, topic string, data any) (Reply, error) {
return b.doRequest(ctx, "", topic, data)
}

// RequestFrom attempts to request a response from a specific recipient
func (b *Bus) RequestFrom(ctx context.Context, to, topic string, data interface{}) (Reply, error) {
func (b *Bus) RequestFrom(ctx context.Context, to, topic string, data any) (Reply, error) {
return b.doRequest(ctx, to, topic, data)
}

Expand All @@ -232,14 +224,14 @@ func (b *Bus) AttachHandler(id string, fn EventHandler) (string, bool) {
defer b.mu.Unlock()

if id == "" {
id = strconv.FormatInt(b.rs.Int63(), 10)
id = strconv.FormatInt(b.rand.Int63(), 10)
}

if w, replaced = b.ws[id]; replaced {
if w, replaced = b.topics[id]; replaced {
w.close()
}

b.ws[id] = newWorker(id, fn)
b.topics[id] = newWorker(id, fn)

return id, replaced
}
Expand Down Expand Up @@ -281,9 +273,9 @@ func (b *Bus) DetachRecipient(id string) bool {
b.mu.Lock()
defer b.mu.Unlock()

if w, ok := b.ws[id]; ok {
if w, ok := b.topics[id]; ok {
go w.close()
delete(b.ws, id)
delete(b.topics, id)
return ok
}

Expand All @@ -297,21 +289,21 @@ func (b *Bus) DetachAllRecipients() int {
defer b.mu.Unlock()

// count how many are in there right now
cnt := len(b.ws)
cnt := len(b.topics)

// close all current in separate goroutine
for _, w := range b.ws {
for _, w := range b.topics {
go w.close()
}

b.ws = make(map[string]*worker)
b.topics = make(map[string]*worker)

return cnt
}

func (b *Bus) buildEvent(t string, d interface{}) Event {
func (b *Bus) buildEvent(t string, d any) Event {
n := Event{
ID: strconv.FormatInt(b.rs.Int63(), 10),
ID: strconv.FormatInt(b.rand.Int63(), 10),
Originated: time.Now().UnixNano(),
Topic: t,
Data: d,
Expand All @@ -320,26 +312,39 @@ func (b *Bus) buildEvent(t string, d interface{}) Event {
}

// sendEvent immediately calls each handler with the new event
func (b *Bus) sendEvent(ev Event) {
func (b *Bus) sendEvent(ev Event) error {
b.mu.RLock()
defer b.mu.RUnlock()

for _, w := range b.ws {
w.push(ev)
var (
err error
finalErr []error
)

for i := range b.topics {
if err = b.topics[i].push(ev); err != nil {
finalErr = append(finalErr, fmt.Errorf("worker %q produced error on push: %w", b.topics[i].id, err))
}
}

b.mu.RUnlock()

if len(finalErr) > 0 {
return errors.Join(finalErr...)
}

return nil
}

func (b *Bus) sendEventTo(to string, ev Event) error {
b.mu.RLock()
defer b.mu.RUnlock()
if w, ok := b.ws[to]; ok {
w.push(ev)
return nil
if w, ok := b.topics[to]; ok {
return w.push(ev)
}
return RecipientNotFoundErr
return fmt.Errorf("%w: %s", ErrRecipientNotFound, to)
}

func (b *Bus) doRequest(ctx context.Context, to, topic string, data interface{}) (Reply, error) {
func (b *Bus) doRequest(ctx context.Context, to, topic string, data any) (Reply, error) {
ch := make(chan Reply)
defer close(ch)

Expand All @@ -351,7 +356,9 @@ func (b *Bus) doRequest(ctx context.Context, to, topic string, data interface{})
}

if to == "" {
b.sendEvent(ev)
if err := b.sendEvent(ev); err != nil {
return Reply{}, err
}
} else if err := b.sendEventTo(to, ev); err != nil {
return Reply{}, err
}
Expand Down Expand Up @@ -380,9 +387,12 @@ func eventFilterFunc(topics []string, fn EventHandler) EventHandler {
}
}
}

// create local copy of topics
tps := slices.Clone(topics)
return func(event Event) {
for _, topic := range topics {
if topic == event.Topic {
for i := range tps {
if tps[i] == event.Topic {
fn(event)
return
}
Expand All @@ -400,9 +410,12 @@ func eventChanFilterFunc(topics []string, ch EventChannel) EventHandler {
}
}
}

// create local copy of topics
tps := slices.Clone(topics)
return func(event Event) {
for _, topic := range topics {
if topic == event.Topic {
for i := range tps {
if tps[i] == event.Topic {
ch <- event
return
}
Expand Down

0 comments on commit 0e5cc57

Please sign in to comment.