Skip to content

Commit

Permalink
Add support for stream log for model outputs with final dimension > 1.
Browse files Browse the repository at this point in the history
  • Loading branch information
dchoi-viant committed Sep 15, 2023
1 parent 5e70e40 commit fbef303
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 35 deletions.
26 changes: 26 additions & 0 deletions service/stream/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package stream

import "github.com/viant/tapper/io"

// propertyKey is the property name under which the KVStrings, KVInts and
// KVFloat64s encoders in this file write their values to a stream.
const propertyKey string = "output"

// KVStrings is a list of strings that is written to a stream as a single
// property keyed by propertyKey.
type KVStrings []string

// Encode writes the receiver's strings to m under propertyKey, which lets
// KVStrings be used as a github.com/viant/tapper/io.Encoder.
func (kv KVStrings) Encode(m io.Stream) {
	values := []string(kv)
	m.PutStrings(propertyKey, values)
}

// KVInts is a list of ints that is written to a stream as a single property
// keyed by propertyKey.
type KVInts []int

// Encode writes the receiver's ints to m under propertyKey, which lets KVInts
// be used as a github.com/viant/tapper/io.Encoder.
func (kv KVInts) Encode(m io.Stream) {
	values := []int(kv)
	m.PutInts(propertyKey, values)
}

// KVFloat64s is a list of float64 values that is written to a stream as a
// single property keyed by propertyKey.
type KVFloat64s []float64

// Encode writes the receiver's floats to m under propertyKey, which lets
// KVFloat64s be used as a github.com/viant/tapper/io.Encoder.
func (kv KVFloat64s) Encode(m io.Stream) {
	values := []float64(kv)
	m.PutFloats(propertyKey, values)
}
152 changes: 117 additions & 35 deletions service/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/viant/mly/shared/common"
"github.com/viant/mly/shared/stat"
"github.com/viant/tapper/config"
"github.com/viant/tapper/io"
tlog "github.com/viant/tapper/log"
"github.com/viant/tapper/msg"
"github.com/viant/tapper/msg/json"
Expand All @@ -22,6 +23,25 @@ import (
type dictProvider func() *common.Dictionary
type outputsProvider func() []domain.Output

// Service is used to log request inputs to model outputs without an output
// transformer, in JSON format.
//
// The input values will be directly inlined into the resulting JSON.
// The outputs will be provided as properties in the resulting JSON, with
// the keys as the output Tensor names.
//
// If the dimensions of the output from the model are [1, numOutputs, 1] (single
// request), the value in the JSON object will be a scalar.
// If the dimensions of the output from the model are [batchSize, numOutputs, 1],
// (batch request), the value in the JSON object will be a list of scalars of
// length batchSize.
// If the dimensions of the output from the model are [1, numOutputs, outDims],
// (single request), the value of the JSON object will be a list of scalars of
// length outDims.
// If the dimensions of the output from the model are [batchSize, numOutputs, outDims],
// (batch request), the value of the JSON object will be a list of objects of length
// batchSize, where each object has a property with key "output" and value a
// list of scalars of length outDims.
type Service struct {
modelID string

Expand Down Expand Up @@ -114,40 +134,85 @@ func (s *Service) Log(data []byte, output interface{}, timeTaken time.Duration)
}

outputs := s.outputsProvider()
writeObject(tmsg, hasBatchSize, output, outputs)

if err := s.logger.Log(tmsg); err != nil {
stats.Append(err)
log.Printf("[%s log] failed to log: %v\n", s.modelID, err)
}

s.logCounterComplete.Increment()
}

func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, outputs []domain.Output) {
if value, ok := output.([]interface{}); ok {
for outputIdx, v := range value {
outputName := outputs[outputIdx].Name

switch actual := v.(type) {
case [][]string:
if len(actual) > 0 {
switch len(actual) {
case 0:
case 1:
if hasBatchSize {
tmsg.PutStrings(outputName, []string{actual[0][0]})
} else {
tmsg.PutString(outputName, actual[0][0])
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
if hasBatchSize || len(outVec) > 1 {
tmsg.PutStrings(outputName, outVec)
} else {
tmsg.PutString(outputName, outVec[0])
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
if multiOutDims {
c := make([]io.Encoder, lAct)
for i, v := range actual {
c[i] = KVStrings(v)
}
default:
tmsg.PutObjects(outputName, c)
} else {
var stringSlice = make([]string, len(actual))
for i, vec := range actual {
stringSlice[i] = vec[0]
}

tmsg.PutStrings(outputName, stringSlice)
}
}
case [][]int64:
if len(actual) > 0 {
switch len(actual) {
case 0:
case 1:
if hasBatchSize {
tmsg.PutInts(outputName, []int{int(actual[0][0])})
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize || lenOutVec > 1 {
var c []int
if lenOutVec > 1 {
c = make([]int, lenOutVec)
for i, v := range outVec {
c[i] = int(v)
}
} else {
tmsg.PutInt(outputName, int(actual[0][0]))
c = []int{int(outVec[0])}
}
default:

tmsg.PutInts(outputName, c)
} else {
tmsg.PutInt(outputName, int(actual[0][0]))
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
if multiOutDims {
c := make([]io.Encoder, lAct)
for i, v := range actual {
t := make([]int, lAct)
for ii, vv := range v {
t[ii] = int(vv)
}
c[i] = KVInts(t)
}
tmsg.PutObjects(outputName, c)
} else {
var ints = make([]int, len(actual))
for i, vec := range actual {
ints[i] = int(vec[0])
Expand All @@ -156,33 +221,50 @@ func (s *Service) Log(data []byte, output interface{}, timeTaken time.Duration)
}
}
case [][]float32:
if len(actual) > 0 {
switch len(actual) {
case 0:
case 1:
if hasBatchSize {
tmsg.PutFloats(outputName, []float64{float64(actual[0][0])})
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize || lenOutVec > 1 {
var c []float64
if lenOutVec > 1 {
c = make([]float64, lenOutVec)
for i, v := range outVec {
c[i] = float64(v)
}
} else {
tmsg.PutFloat(outputName, float64(actual[0][0]))
c = []float64{float64(outVec[0])}
}

tmsg.PutFloats(outputName, c)
} else {
tmsg.PutFloat(outputName, float64(actual[0][0]))
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
if multiOutDims {
c := make([]io.Encoder, lAct)
for i, v := range actual {
t := make([]float64, lAct)
for ii, vv := range v {
t[ii] = float64(vv)
}
c[i] = KVFloat64s(t)
}
default:
var floats = make([]float64, len(actual))
tmsg.PutObjects(outputName, c)
} else {
var ints = make([]float64, len(actual))
for i, vec := range actual {
floats[i] = float64(vec[0])
ints[i] = float64(vec[0])
}
tmsg.PutFloats(outputName, floats)
tmsg.PutFloats(outputName, ints)
}
}
}
}
}

if err := s.logger.Log(tmsg); err != nil {
stats.Append(err)
log.Printf("[%s log] failed to log: %v\n", s.modelID, err)
}

s.logCounterComplete.Increment()
}

func getStreamID() string {
Expand Down
93 changes: 93 additions & 0 deletions service/stream/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package stream

import (
"bytes"
ejson "encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/viant/mly/service/domain"
"github.com/viant/tapper/msg"
"github.com/viant/tapper/msg/json"
)

// siP is the decode target for a message whose "out1" property is a single
// string.
type siP struct {
O1 string `json:"out1"`
}

// muP is the decode target for a message whose "out1" property is a list of
// strings.
type muP struct {
O1 []string `json:"out1"`
}

// mbP is the decode target for a message whose "out1" property is a list of
// objects, each carrying a list of strings under the "output" property.
type mbP struct {
O1 []struct {
Ov []string `json:"output"`
} `json:"out1"`
}

// TestWriteObject checks the JSON that writeObject produces for a single
// string output named "out1" across four input shapes: one row or several
// rows, each with one value or several values per row.
//
// Every case decodes the serialized message into the struct matching the
// expected shape and then compares the decoded values, so a payload that is
// well-formed JSON but carries the wrong values fails the test. Each case runs
// as its own subtest so a failure names the shape that broke.
//
// NOTE(review): the batch-multi-dim case uses as many rows as values per row
// (2x2); a non-square case (e.g. 3 rows of 2 values) would be a useful
// addition once writeObject is confirmed to handle it.
func TestWriteObject(t *testing.T) {
	provider := msg.NewProvider(2048, 32, json.New)
	outputs := []domain.Output{{Name: "out1"}}

	testCases := []struct {
		name   string
		out    []interface{}
		verify func(t *testing.T, b []byte)
	}{
		{
			name: "single-single-dim",
			out: []interface{}{[][]string{
				{"a"},
			}},
			verify: func(t *testing.T, b []byte) {
				p := new(siP)
				if !assert.NoError(t, ejson.Unmarshal(b, p)) {
					return
				}
				assert.Equal(t, "a", p.O1)
			},
		},
		{
			name: "single-multi-dim",
			out: []interface{}{[][]string{
				{"a", "b"},
			}},
			verify: func(t *testing.T, b []byte) {
				p := new(muP)
				if !assert.NoError(t, ejson.Unmarshal(b, p)) {
					return
				}
				assert.Equal(t, []string{"a", "b"}, p.O1)
			},
		},
		{
			name: "batch-single-dim",
			out: []interface{}{[][]string{
				{"a"},
				{"b"},
			}},
			verify: func(t *testing.T, b []byte) {
				p := new(muP)
				if !assert.NoError(t, ejson.Unmarshal(b, p)) {
					return
				}
				assert.Equal(t, []string{"a", "b"}, p.O1)
			},
		},
		{
			name: "batch-multi-dim",
			out: []interface{}{[][]string{
				{"a", "b"},
				{"c", "d"},
			}},
			verify: func(t *testing.T, b []byte) {
				p := new(mbP)
				if !assert.NoError(t, ejson.Unmarshal(b, p)) {
					return
				}
				// Guard the index accesses below so a wrong length is
				// reported as a failure instead of a panic.
				if !assert.Len(t, p.O1, 2) {
					return
				}
				assert.Equal(t, []string{"a", "b"}, p.O1[0].Ov)
				assert.Equal(t, []string{"c", "d"}, p.O1[1].Ov)
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			m := provider.NewMessage()
			// Release the message even when an assertion fails.
			defer m.Free()

			writeObject(m, false, tc.out, outputs)

			buf := new(bytes.Buffer)
			m.WriteTo(buf)
			tc.verify(t, buf.Bytes())
		})
	}
}

0 comments on commit fbef303

Please sign in to comment.