Skip to content

Commit

Permalink
Merge pull request #8 from iflytek/feature/optimize/penglin2
Browse files Browse the repository at this point in the history
optimize session mode
  • Loading branch information
niclausse authored Apr 25, 2024
2 parents 447a137 + 4561400 commit fec983d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 32 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ install-goreleaser: ## check license if not exist install go-lint tools
build:
mkdir -p bin
$(GOBUILD) -v -o ./bin/xtest ./cmd
cp ./xtest.toml ./bin

clean:
rm -rf bin dist
Expand Down
1 change: 0 additions & 1 deletion cmd/xtest.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//go:build !linux
// +build !linux

package main

Expand Down
9 changes: 7 additions & 2 deletions request/fileSession.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,15 @@ func (r *Request) FileSessionCall(cli *xsfcli.Client, index int64) (info analy.E
}

select {
case r.C.AsyncDrop <- _var.OutputMeta{reqSid, outType, v.Meta.Name, v.Meta.Attribute, index, v.Data}:
case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: reqSid, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}:
default:
// 异步channel满, 同步写; key: sid-type-format-encoding, value: data
key := reqSid + "-" + outType + "-" + v.Meta.Name
key := reqSid + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10)
if outType == "image" {
key += ".jpg"
} else if outType == "text" {
key += ".txt"
}
r.downOutput(key, v.Data, cli.Log)
}
}
Expand Down
10 changes: 7 additions & 3 deletions request/oneShot.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,15 @@ func (r *Request) OneShotCall(cli *xsfcli.Client, index int64) (info analy.ErrIn
outType = "video"
}
select {
case r.C.AsyncDrop <- _var.OutputMeta{v.Meta.Name, sessId,
outType, v.Meta.Attribute, index, v.Data}:
case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: sessId, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}:
default:
// 异步channel满, 同步写; key: sid-type-name, value: data
key := sessId + "-" + outType + "-" + v.Meta.Name
key := sessId + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10)
if outType == "image" {
key += ".jpg"
} else if outType == "text" {
key += ".txt"
}
r.downOutput(key, v.Data, cli.Log)
}
}
Expand Down
9 changes: 3 additions & 6 deletions request/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package request
import (
"fmt"
"github.com/xfyun/xsf/utils"
"io/ioutil"
"os"
"path/filepath"
"strconv"
"sync"
)

// 下行数据异步落盘或打印
func (r *Request) DownStreamWrite(wg *sync.WaitGroup, log *utils.Logger) {

for {
output, alive := <-r.C.AsyncDrop
if !alive {
Expand All @@ -24,6 +23,7 @@ func (r *Request) DownStreamWrite(wg *sync.WaitGroup, log *utils.Logger) {
} else if output.Type == "text" {
key += ".txt"
}

r.downOutput(key, output.Data, log)
}
wg.Done()
Expand All @@ -38,9 +38,6 @@ func (r *Request) downOutput(key string, data []byte, log *utils.Logger) {
return
}

//tmp := []byte(key + ":")
//tmp = append(tmp, data...)
//tmp = append(tmp, byte('\n'))
wlen, err := fi.Write(data)
if err != nil || wlen != len(data) {
log.Errorw("downOutput Sync AppendFile fail", "err", err.Error(), "wlen", wlen, "key", key)
Expand All @@ -49,7 +46,7 @@ func (r *Request) downOutput(key string, data []byte, log *utils.Logger) {
}
_ = fi.Close()
case 1: // 输出至目录OutputDst
err := ioutil.WriteFile(r.C.OutputDst+"/"+key, data, os.ModePerm)
err := os.WriteFile(filepath.Join(r.C.OutputDst, key), data, os.ModePerm)
if err != nil {
log.Errorw("downOutput Sync WriteFile fail", "err", err.Error(), "key", key)
return
Expand Down
33 changes: 16 additions & 17 deletions request/session.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package request

import (
"encoding/json"
"errors"
"github.com/golang/protobuf/proto"
xsfcli "github.com/xfyun/xsf/client"
Expand All @@ -18,9 +17,6 @@ import (

func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrInfo) {
// 下行结果缓存

data, _ := json.Marshal(r.C.UpStreams)
println(string(data))
var indexs []int = make([]int, 0, len(r.C.UpStreams))
for _, v := range r.C.UpStreams {
streamIndex := index % int64(len(v.DataList))
Expand All @@ -45,21 +41,21 @@ func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrIn
}
_ = r.sessAIExcp(cli, hdl, reqSid)
// 结果落盘
tmpMerge := make(map[int] /*streamId*/ *protocol.Payload)
tmpMerge := make(map[string] /*streamId*/ *protocol.Payload)
cli.Log.Debugw("length of thrRslt", "length", len(thrRslt))
for k, _ := range thrRslt {
for k := range thrRslt {
for _, d := range thrRslt[k].Pl {
meta, exist := tmpMerge[k]
meta, exist := tmpMerge[d.Meta.Name]
if exist {
tmpMerge[k].Data = append(meta.Data, d.Data...)
meta.Data = append(meta.Data, d.Data...)
} else {
tmpMerge[k] = d
tmpMerge[d.Meta.Name] = d
}
}
}

for seq, v := range tmpMerge {
var outType string = "invalidType"
for _, v := range tmpMerge {
var outType = "invalidType"
switch v.Meta.DataType {
case protocol.MetaDesc_TEXT:
outType = "text"
Expand All @@ -72,10 +68,15 @@ func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrIn
}

select {
case r.C.AsyncDrop <- _var.OutputMeta{v.Meta.Name, reqSid, outType, v.Meta.Attribute, int64(seq), v.Data}:
case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: reqSid, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}:
default:
// 异步channel满, 同步写; key: sid-type-format-encoding, value: data
key := reqSid + "-" + outType + "-" + v.Meta.Name + strconv.FormatInt(index, 10)
key := reqSid + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10)
if outType == "image" {
key += ".jpg"
} else if outType == "text" {
key += ".txt"
}
r.downOutput(key, v.Data, cli.Log)
}
}
Expand Down Expand Up @@ -130,7 +131,7 @@ func (r *Request) sessAIIn(cli *xsfcli.Client, indexs []int, thrRslt *[]protocol
resp, ecode, err := caller.SessionCall(xsfcli.CREATE, r.C.SvcName, "AIIn", req, time.Duration(r.C.TimeOut+r.C.LossDeviation)*time.Millisecond)
if err != nil {
cli.Log.Errorw("sessAIIn Create request fail", "err", err.Error(), "code", ecode, "params", dataIn.Params)
analy.Perf.Record(reqSid, resp.Handle(), analy.DataBegin, analy.SessBegin, analy.DOWN, int(ecode), err.Error())
analy.Perf.Record(reqSid, "", analy.DataBegin, analy.SessBegin, analy.DOWN, int(ecode), err.Error())
return hdl, status, analy.ErrInfo{ErrCode: int(ecode), ErrStr: err}
}
hdl = resp.Session()
Expand Down Expand Up @@ -215,9 +216,7 @@ func (r *Request) multiUpStream(cli *xsfcli.Client, swg *sync.WaitGroup, session
if dataSendLen[streamId] >= len(r.C.UpStreams[streamId].DataList[fileId]) {
continue // 该上行数据流已发送完毕
}
//if dataSendLen[streamId] == 0 {
// upStatus = protocol.EngInputData_BEGIN
//}

if dataSendLen[streamId]+size >= len(r.C.UpStreams[streamId].DataList[fileId]) {
size = len(r.C.UpStreams[streamId].DataList[fileId]) - dataSendLen[streamId]
upStatus = protocol.EngInputData_END
Expand Down
4 changes: 2 additions & 2 deletions var/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (c *Conf) secParseSvc(conf *utils.Configure) error {
}
c.DropThr = c.MultiThr
if cnt, err := conf.GetInt64(secTmp, "loopCnt"); err == nil {
c.LoopCnt.Store(int64(cnt))
c.LoopCnt.Store(cnt)
}

if perfOn, err := conf.GetBool(secTmp, "perfOn"); err == nil {
Expand Down Expand Up @@ -539,7 +539,7 @@ func (c *Conf) secParseDStream(conf *utils.Configure) error {
return err
}
}
err = os.MkdirAll(c.OutputDst, os.ModeDir)
err = os.MkdirAll(c.OutputDst, os.ModePerm)
if err != nil {
return err
}
Expand Down

0 comments on commit fec983d

Please sign in to comment.