Skip to content

Commit

Permalink
util: move sse encoding and typed sync/Pool wrapped into util pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
Wessie committed Jan 21, 2024
1 parent 0b777d9 commit 4e91d93
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 49 deletions.
35 changes: 5 additions & 30 deletions templates/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ package templates
import (
"bytes"
"io"
"sync"

"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/util/pool"
)

type Executor struct {
site *Site
pool *Pool[*bytes.Buffer]
pool *pool.ResetPool[*bytes.Buffer]
}

func NewExecutor(site *Site) *Executor {
return &Executor{
site: site,
pool: NewPool(func() *bytes.Buffer { return new(bytes.Buffer) }),
pool: pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) }),
}
}

Expand Down Expand Up @@ -49,6 +49,8 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i
}

b := e.pool.Get()
defer e.pool.Put(b)

err = tmpl.ExecuteTemplate(b, template, input)
if err != nil {
return errors.E(op, err)
Expand All @@ -60,30 +62,3 @@ func (e *Executor) ExecuteTemplate(theme, page string, template string, output i
}
return nil
}

type Resetable interface {
Reset()
}

// Pool is a sync.Pool wrapped with a generic Resetable interface, the pool calls
// Reset before returning an item to the pool.
type Pool[T Resetable] struct {
p sync.Pool
}

func NewPool[T Resetable](newFn func() T) *Pool[T] {
return &Pool[T]{
sync.Pool{
New: func() interface{} { return newFn() },
},
}
}

func (p *Pool[T]) Get() T {
return p.p.Get().(T)
}

func (p *Pool[T]) Put(v T) {
v.Reset()
p.p.Put(v)
}
43 changes: 43 additions & 0 deletions util/pool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package pool

import "sync"

// Pool is a sync.Pool wrapped for type-safety
type Pool[T any] struct {
p sync.Pool
}

func NewPool[T any](newFn func() T) *Pool[T] {
return &Pool[T]{
sync.Pool{
New: func() any { return newFn() },
},
}
}

func (p *Pool[T]) Get() T {
return p.p.Get().(T)
}

func (p *Pool[T]) Put(v T) {
p.p.Put(v)
}

type Resetable interface {
Reset()
}

// ResetPool is a sync.Pool wrapped with a generic Resetable interface, the pool calls
// Reset before returning an item to the pool.
type ResetPool[T Resetable] struct {
*Pool[T]
}

func NewResetPool[T Resetable](newFn func() T) *ResetPool[T] {
return &ResetPool[T]{NewPool(newFn)}
}

func (p *ResetPool[T]) Put(v T) {
v.Reset()
p.p.Put(v)
}
53 changes: 53 additions & 0 deletions util/sse/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package sse

import (
"bytes"
"strconv"
"time"

"github.com/R-a-dio/valkyrie/util/pool"
)

type Event struct {
ID []byte
Name string
Retry time.Duration
Data []byte
}

var bufferPool = pool.NewResetPool(func() *bytes.Buffer { return new(bytes.Buffer) })

func (e Event) Encode() []byte {
b := bufferPool.Get()
defer bufferPool.Put(b)

if e.ID != nil {
b.WriteString("id: ")
b.Write(e.ID)
b.WriteString("\n")
}
if e.Name != "" {
b.WriteString("event: ")
b.WriteString(e.Name)
b.WriteString("\n")
}
if e.Retry != 0 {
b.WriteString("retry: ")
b.WriteString(strconv.Itoa(int(e.Retry.Milliseconds())))
b.WriteString("\n")
}
if e.Data != nil {
for _, line := range bytes.Split(e.Data, []byte("\n")) {
if len(line) == 0 {
b.WriteString("data\n")
} else {
b.WriteString("data: ")
b.Write(line)
b.WriteString("\n")
}
}
}
b.WriteString("\n")

return b.Bytes()
}
68 changes: 68 additions & 0 deletions util/sse/sse_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package v1

import (
"bytes"
"testing"
"time"
)

type eventCase struct {
desc string
e Event
expect string
}

var eventEncodeCases = []eventCase{
{
"simple message",
Event{Name: EventQueue, Data: []byte("queue information")},
"event: queue\ndata: queue information\n\n",
}, {
"simple newlines",
Event{Name: EventThread, Data: []byte("some data\nwith\nnewlines")},
"event: thread\ndata: some data\ndata: with\ndata: newlines\n\n",
}, {
"double newline",
Event{Name: EventMetadata, Data: []byte("some data\n\nwith newlines")},
"event: metadata\ndata: some data\ndata\ndata: with newlines\n\n",
}, {
"encode id",
Event{ID: []byte("50")},
"id: 50\n\n",
}, {
"encode retry",
Event{Retry: time.Second * 10},
"retry: 10000\n\n",
}, {
"encode everything",
Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")},
"id: 100\nevent: metadata\nretry: 50000\ndata: some data\ndata: and a newline\n\n",
},
}

func TestEventEncode(t *testing.T) {
for _, c := range eventEncodeCases {
t.Run(c.desc, func(t *testing.T) {
result := c.e.Encode()
expect := []byte(c.expect)

if !bytes.Equal(result, expect) {
t.Errorf("%#v != %#v\n", string(result), string(expect))
}
})
}
}

func BenchmarkEventEncodeSimple(b *testing.B) {
e := Event{Name: EventMetadata, Data: []byte("some data\nand a newline")}
for i := 0; i < b.N; i++ {
e.Encode()
}
}

func BenchmarkEventEncodeEverything(b *testing.B) {
e := Event{ID: []byte("100"), Name: EventMetadata, Retry: time.Second * 50, Data: []byte("some data\nand a newline")}
for i := 0; i < b.N; i++ {
e.Encode()
}
}
20 changes: 1 addition & 19 deletions website/api/v1/sse.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package v1

import (
"fmt"
"log"
"maps"
"net/http"
"sync"
"time"
)

const (
Expand Down Expand Up @@ -48,7 +46,6 @@ func NewStream() *Stream {
shutdownCh: make(chan struct{}),
}
go s.run()
go s.ping()
return s
}

Expand Down Expand Up @@ -99,20 +96,6 @@ func (s *Stream) SendEvent(event EventName, data []byte) {
}
}

func (s *Stream) ping() {
t := time.NewTicker(time.Second * 30)
defer t.Stop()

for range t.C {
s.SendEvent(EventPing, []byte("ping"))
select {
case <-s.shutdownCh:
return
default:
}
}
}

func (s *Stream) run() {
for req := range s.reqs {
switch req.cmd {
Expand Down Expand Up @@ -182,8 +165,7 @@ type request struct {
}

func newMessage(event EventName, data []byte) message {
// TODO: handle newlines in data
return message(fmt.Sprintf("event: %s\ndata: %s\n\n", event, data))
return message(sse.Event{Name: event, Data: data}.Encode())
}

type message []byte

0 comments on commit 4e91d93

Please sign in to comment.