diff --git a/templates/executor.go b/templates/executor.go index 9309ed85..c5d16a1c 100644 --- a/templates/executor.go +++ b/templates/executor.go @@ -3,20 +3,20 @@ package templates import ( "bytes" "io" - "sync" "github.com/R-a-dio/valkyrie/errors" + "github.com/R-a-dio/valkyrie/util/pool" ) type Executor struct { site *Site - pool *Pool[*bytes.Buffer] + pool *pool.ResetPool[*bytes.Buffer] } func NewExecutor(site *Site) *Executor { return &Executor{ site: site, - pool: NewPool(func() *bytes.Buffer { return new(bytes.Buffer) }), + pool: pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }), } } @@ -49,6 +49,8 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i } b := e.pool.Get() + defer e.pool.Put(b) + err = tmpl.ExecuteTemplate(b, template, input) if err != nil { return errors.E(op, err) @@ -60,30 +62,3 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i } return nil } - -type Resetable interface { - Reset() -} - -// Pool is a sync.Pool wrapped with a generic Resetable interface, the pool calls -// Reset before returning an item to the pool. -type Pool[T Resetable] struct { - p sync.Pool -} - -func NewPool[T Resetable](newFn func() T) *Pool[T] { - return &Pool[T]{ - sync.Pool{ - New: func() interface{} { return newFn() }, - }, - } -} - -func (p *Pool[T]) Get() T { - return p.p.Get().(T) -} - -func (p *Pool[T]) Put(v T) { - v.Reset() - p.p.Put(v) -} diff --git a/util/pool/pool.go b/util/pool/pool.go new file mode 100644 index 00000000..fe2de021 --- /dev/null +++ b/util/pool/pool.go @@ -0,0 +1,43 @@ +package pool + +import "sync" + +// Pool is a sync.Pool wrapped for type-safety +type Pool[T any] struct { + p sync.Pool +} + +func NewPool[T any](newFn func() T) *Pool[T] { + return &Pool[T]{ + sync.Pool{ + New: func() any { return newFn() }, + }, + } +} + +func (p *Pool[T]) Get() T { + return p.p.Get().(T) +} + +func (p *Pool[T]) Put(v T) { + p.p.Put(v) +} + +type Resetable interface { + Reset() +} + +// ResetPool is a sync.Pool wrapped with a generic Resetable interface, the pool calls +// Reset before returning an item to the pool. +type ResetPool[T Resetable] struct { + *Pool[T] +} + +func NewResetPool[T Resetable](newFn func() T) *ResetPool[T] { + return &ResetPool[T]{NewPool(newFn)} +} + +func (p *ResetPool[T]) Put(v T) { + v.Reset() + p.p.Put(v) +} diff --git a/util/sse/sse.go b/util/sse/sse.go new file mode 100644 index 00000000..3abdee85 --- /dev/null +++ b/util/sse/sse.go @@ -0,0 +1,53 @@ +package sse + +import ( + "bytes" + "strconv" + "time" + + "github.com/R-a-dio/valkyrie/util/pool" +) + +type Event struct { + ID []byte + Name string + Retry time.Duration + Data []byte +} + +var bufferPool = pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }) + +func (e Event) Encode() []byte { + b := bufferPool.Get() + defer bufferPool.Put(b) + + if e.ID != nil { + b.WriteString("id: ") + b.Write(e.ID) + b.WriteString("\n") + } + if e.Name != "" { + b.WriteString("event: ") + b.WriteString(e.Name) + b.WriteString("\n") + } + if e.Retry != 0 { + b.WriteString("retry: ") + b.WriteString(strconv.Itoa(int(e.Retry.Milliseconds()))) + b.WriteString("\n") + } + if e.Data != nil { + for _, line := range bytes.Split(e.Data, []byte("\n")) { + if len(line) == 0 { + b.WriteString("data\n") + } else { + b.WriteString("data: ") + b.Write(line) + b.WriteString("\n") + } + } + } + b.WriteString("\n") + + return b.Bytes() +} diff --git a/util/sse/sse_test.go b/util/sse/sse_test.go new file mode 100644 index 00000000..f5265dc7 --- /dev/null +++ b/util/sse/sse_test.go @@ -0,0 +1,68 @@ +package v1 + +import ( + "bytes" + "testing" + "time" +) + +type eventCase struct { + desc string + e Event + expect string +} + +var eventEncodeCases = []eventCase{ + { + "simple message", + Event{Name: EventQueue, Data: []byte("queue information")}, + "event: queue\ndata: queue information\n\n", + }, { + "simple newlines", + Event{Name: EventThread, Data: []byte("some data\nwith\nnewlines")}, + "event: thread\ndata: some data\ndata: with\ndata: newlines\n\n", + }, { + "double newline", + Event{Name: EventMetadata, Data: []byte("some data\n\nwith newlines")}, + "event: metadata\ndata: some data\ndata\ndata: with newlines\n\n", + }, { + "encode id", + Event{ID: []byte("50")}, + "id: 50\n\n", + }, { + "encode retry", + Event{Retry: time.Second * 10}, + "retry: 10000\n\n", + }, { + "encode everything", + Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")}, + "id: 100\nevent: metadata\nretry: 50000\ndata: some data\ndata: and a newline\n\n", + }, +} + +func TestEventEncode(t *testing.T) { + for _, c := range eventEncodeCases { + t.Run(c.desc, func(t *testing.T) { + result := c.e.Encode() + expect := []byte(c.expect) + + if !bytes.Equal(result, expect) { + t.Errorf("%#v != %#v\n", string(result), string(expect)) + } + }) + } +} + +func BenchmarkEventEncodeSimple(b *testing.B) { + e := Event{Name: EventMetadata, Data: []byte("some data\nand a newline")} + for i := 0; i < b.N; i++ { + e.Encode() + } +} + +func BenchmarkEventEncodeEverything(b *testing.B) { + e := Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")} + for i := 0; i < b.N; i++ { + e.Encode() + } +} diff --git a/website/api/v1/sse.go b/website/api/v1/sse.go index 290625ba..78c58511 100644 --- a/website/api/v1/sse.go +++ b/website/api/v1/sse.go @@ -1,12 +1,10 @@ package v1 import ( - "fmt" "log" "maps" "net/http" "sync" - "time" ) const ( @@ -48,7 +46,6 @@ func NewStream() *Stream { shutdownCh: make(chan struct{}), } go s.run() - go s.ping() return s } @@ -99,20 +96,6 @@ func (s *Stream) SendEvent(event EventName, data []byte) { } } -func (s *Stream) ping() { - t := time.NewTicker(time.Second * 30) - defer t.Stop() - - for range t.C { - s.SendEvent(EventPing, []byte("ping")) - select { - case <-s.shutdownCh: - return - default: - } - } -} - func (s *Stream) run() { for req := range s.reqs { switch req.cmd { @@ -182,8 +165,7 @@ type request struct { } func newMessage(event EventName, data []byte) message { - // TODO: handle newlines in data - return message(fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)) + return message(sse.Event{Name: event, Data: data}.Encode()) } type message []byte