Skip to content

Commit

Permalink
update server-side events
Browse files Browse the repository at this point in the history
  • Loading branch information
zmh-program committed Nov 12, 2023
1 parent c381818 commit 607df16
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
4 changes: 2 additions & 2 deletions manager/transhipment.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ func sendStreamTranshipmentResponse(c *gin.Context, form TranshipmentForm, id st

c.Stream(func(w io.Writer) bool {
if resp, ok := <-channel; ok {
c.SSEvent("message", resp)
c.Render(-1, utils.NewEvent(resp))
return true
}

w.Write([]byte("[DATA: DONE]"))
c.Render(-1, utils.NewEndEvent())
return false
})
}
74 changes: 74 additions & 0 deletions utils/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,83 @@ import (
"chat/globals"
"crypto/tls"
"fmt"
"io"
"net/http"
"strings"
)

var dataReplacer = strings.NewReplacer(
"\n", "\ndata:",
"\r", "\\r",
)

type StreamEvent struct {
Event string `json:"event"`
Id string `json:"id"`
Data interface{} `json:"data"`
}

type stringWriter interface {
io.Writer
writeString(string) (int, error)
}

type stringWrapper struct {
io.Writer
}

func (w stringWrapper) writeString(str string) (int, error) {
return w.Writer.Write([]byte(str))
}

func checkWriter(writer io.Writer) stringWriter {
if w, ok := writer.(stringWriter); ok {
return w
} else {
return stringWrapper{writer}
}
}

func encode(writer io.Writer, event StreamEvent) error {
w := checkWriter(writer)
return writeData(w, event.Data)
}

func writeData(w stringWriter, data interface{}) error {
dataReplacer.WriteString(w, fmt.Sprint(data))
if strings.HasPrefix(data.(string), "data") {
w.writeString("\n\n")
}
return nil
}

func (r StreamEvent) Render(w http.ResponseWriter) error {
r.WriteContentType(w)
return encode(w, r)
}

func (r StreamEvent) WriteContentType(w http.ResponseWriter) {
header := w.Header()
header["Content-Type"] = []string{"text/event-stream"}

if _, exist := header["Cache-Control"]; !exist {
header["Cache-Control"] = []string{"no-cache"}
}
}

func NewEvent(data interface{}) StreamEvent {
chunk := Marshal(data)
return StreamEvent{
Data: fmt.Sprintf("data: %s", chunk),
}
}

func NewEndEvent() StreamEvent {
return StreamEvent{
Data: "data: [DONE]",
}
}

func SSEClient(method string, uri string, headers map[string]string, body interface{}, callback func(string) error) error {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}

Expand Down

0 comments on commit 607df16

Please sign in to comment.