Skip to content

Commit

Permalink
fix channel data sync error in different thread
Browse files Browse the repository at this point in the history
  • Loading branch information
zmh-program committed Oct 21, 2023
1 parent f744404 commit 981b7e2
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 57 deletions.
4 changes: 2 additions & 2 deletions app/src/conf.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import axios from "axios";

export const version = "3.4.3";
export const deploy: boolean = true;
export const version = "3.4.4";
export const deploy: boolean = false;
export let rest_api: string = "http://localhost:8094";
export let ws_api: string = "ws://localhost:8094";

Expand Down
3 changes: 2 additions & 1 deletion app/src/conversation/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export type StreamMessage = {
};

export type ChatProps = {
type?: string;
message: string;
model: string;
web?: boolean;
Expand Down Expand Up @@ -70,7 +71,7 @@ export class Connection {
}, 500);
}
} catch {
this.triggerCallback({
if (t !== undefined) this.triggerCallback({
message: t("request-failed"),
end: true,
});
Expand Down
6 changes: 6 additions & 0 deletions app/src/conversation/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ export class Conversation {
);
this.refer = refer;
this.load(data);

this.connection?.sendWithRetry(null, {
type: "share",
message: this.refer,
model: "gpt-3.5-turbo",
});
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion app/src/routes/Home.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ function ChatWrapper() {
): Promise<boolean> {
const message: string = formatMessage(file, data);
if (message.length > 0 && data.trim().length > 0) {
if (await manager.send(t, auth, { message, web, model })) {
if (await manager.send(t, auth, { message, web, model, type: "chat" })) {
clearFile();
return true;
}
Expand Down
89 changes: 36 additions & 53 deletions manager/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
ShareType = "share"
)

type Stack []*conversation.FormMessage
type Stack chan *conversation.FormMessage

type Connection struct {
conn *utils.WebSocket
Expand All @@ -27,14 +27,12 @@ type Connection struct {
}

func NewConnection(conn *utils.WebSocket, auth bool, hash string, bufferSize int) *Connection {
buf := &Connection{
return &Connection{
conn: conn,
auth: auth,
hash: hash,
stack: make(Stack, bufferSize),
}
buf.ReadWorker()
return buf
}

func (c *Connection) GetConn() *utils.WebSocket {
Expand All @@ -50,50 +48,40 @@ func (c *Connection) GetStack() Stack {
}

func (c *Connection) ReadWorker() {
go func() {
for {
form := utils.ReadForm[conversation.FormMessage](c.conn)
if form.Type == "" {
form.Type = ChatType
}

c.Write(form)
for {
form := utils.ReadForm[conversation.FormMessage](c.conn)
if form == nil {
break
}

if form == nil {
return
}
if form.Type == "" {
form.Type = ChatType
}
}()
}

func (c *Connection) Write(data *conversation.FormMessage) {
c.stack = append(c.stack, data)
c.Write(form)
}
}

func (c *Connection) Read() (*conversation.FormMessage, bool) {
if len(c.stack) == 0 {
return nil, false
func (c *Connection) Write(data *conversation.FormMessage) {
if len(c.stack) == cap(c.stack) {
c.Skip()
}
form := c.stack[0]
c.Skip()
return form, true
c.stack <- data
}

func (c *Connection) ReadWithBlock() *conversation.FormMessage {
// return: nil if connection is closed
for {
if form, ok := c.Read(); ok {
return form
}
}
func (c *Connection) Read() *conversation.FormMessage {
form := <-c.stack
return form
}

func (c *Connection) Peek() *conversation.FormMessage {
// return nil if no message is received
if len(c.stack) == 0 {
select {
case form := <-c.stack:
utils.InsertChannel(c.stack, form, 0)
return form
default:
return nil
}
return c.ReadWithBlock()
}

func (c *Connection) PeekWithType(t string) *conversation.FormMessage {
Expand All @@ -110,10 +98,7 @@ func (c *Connection) PeekWithType(t string) *conversation.FormMessage {
}

func (c *Connection) Skip() {
if len(c.stack) == 0 {
return
}
c.stack = c.stack[1:]
<-c.stack
}

func (c *Connection) GetDB() *sql.DB {
Expand All @@ -132,25 +117,23 @@ func (c *Connection) SendClient(message globals.ChatSegmentResponse) error {
return c.conn.SendJSON(message)
}

func (c *Connection) Handle(handler func(*conversation.FormMessage) error) {
defer c.conn.DeferClose()

func (c *Connection) Process(handler func(*conversation.FormMessage) error) {
for {
form := c.ReadWithBlock()
if form == nil {
return
}

if !c.Lock() {
if form := c.Read(); form != nil {
if err := handler(form); err != nil {
return
}
} else {
return
}
}
}

if err := handler(form); err != nil {
return
}
func (c *Connection) Handle(handler func(*conversation.FormMessage) error) {
defer c.conn.DeferClose()

c.Release()
}
go c.Process(handler)
c.ReadWorker()
}

func (c *Connection) Lock() bool {
Expand Down
11 changes: 11 additions & 0 deletions utils/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,14 @@ func MultiF[T comparable](condition bool, tval func() T, fval T) T {
return fval
}
}

func InsertChannel[T any](ch chan T, value T, index int) {
var arr []T
for i := 0; i < len(ch); i++ {
arr = append(arr, <-ch)
}
arr = Insert(arr, index, value)
for _, v := range arr {
ch <- v
}
}

0 comments on commit 981b7e2

Please sign in to comment.