Skip to content

Commit

Permalink
Fix panic bug for log.
Browse files Browse the repository at this point in the history
  • Loading branch information
dchoi-viant committed Sep 18, 2023
1 parent fbef303 commit fd554f5
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 64 deletions.
47 changes: 31 additions & 16 deletions service/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stream

import (
"bytes"
"fmt"
"log"
"os"
"reflect"
Expand Down Expand Up @@ -52,11 +53,12 @@ type Service struct {
dictProvider dictProvider
outputsProvider outputsProvider

logMetric *gmetric.Operation
logCounterNoData *gmetric.Counter
logCounterNoSample *gmetric.Counter
logCounterNoEnd *gmetric.Counter
logCounterComplete *gmetric.Counter
logMetric *gmetric.Operation
logCounterNoData *gmetric.Counter
logCounterNoSample *gmetric.Counter
logCounterNoEnd *gmetric.Counter
logCounterComplete *gmetric.Counter
logCounterNoHandler *gmetric.Counter
}

func NewService(modelID string, streamCfg *config.Stream, afsv afs.Service, dp dictProvider, op outputsProvider, m *gmetric.Service) (*Service, error) {
Expand Down Expand Up @@ -134,17 +136,21 @@ func (s *Service) Log(data []byte, output interface{}, timeTaken time.Duration)
}

outputs := s.outputsProvider()
writeObject(tmsg, hasBatchSize, output, outputs)
err := writeObject(tmsg, hasBatchSize, output, outputs)
if err != nil {
s.logCounterNoHandler.Increment()
return
}

if err := s.logger.Log(tmsg); err != nil {
if err = s.logger.Log(tmsg); err != nil {
stats.Append(err)
log.Printf("[%s log] failed to log: %v\n", s.modelID, err)
}

s.logCounterComplete.Increment()
}

func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, outputs []domain.Output) {
func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, outputs []domain.Output) error {
if value, ok := output.([]interface{}); ok {
for outputIdx, v := range value {
outputName := outputs[outputIdx].Name
Expand All @@ -156,20 +162,25 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output
case 1:
outVec := actual[0]
if hasBatchSize || len(outVec) > 1 {
// single request, multi output
// batched request of batch 1, single output
tmsg.PutStrings(outputName, outVec)
} else {
// single request, single output
tmsg.PutString(outputName, outVec[0])
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
if multiOutDims {
// batch request, multi output
c := make([]io.Encoder, lAct)
for i, v := range actual {
c[i] = KVStrings(v)
}
tmsg.PutObjects(outputName, c)
} else {
// batch request, single output
var stringSlice = make([]string, len(actual))
for i, vec := range actual {
stringSlice[i] = vec[0]
Expand Down Expand Up @@ -200,12 +211,12 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output
tmsg.PutInt(outputName, int(actual[0][0]))
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
lOutput := len(actual[0])
multiOutDims := lOutput > 1
if multiOutDims {
c := make([]io.Encoder, lAct)
c := make([]io.Encoder, len(actual))
for i, v := range actual {
t := make([]int, lAct)
t := make([]int, lOutput)
for ii, vv := range v {
t[ii] = int(vv)
}
Expand Down Expand Up @@ -242,12 +253,12 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output
tmsg.PutFloat(outputName, float64(actual[0][0]))
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
lOutput := len(actual[0])
multiOutDims := lOutput > 1
if multiOutDims {
c := make([]io.Encoder, lAct)
c := make([]io.Encoder, len(actual))
for i, v := range actual {
t := make([]float64, lAct)
t := make([]float64, lOutput)
for ii, vv := range v {
t[ii] = float64(vv)
}
Expand All @@ -262,9 +273,13 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output
tmsg.PutFloats(outputName, ints)
}
}
default:
return fmt.Errorf("no handler for %T", actual)
}
}
}

return nil
}

func getStreamID() string {
Expand Down
146 changes: 98 additions & 48 deletions service/stream/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,142 @@ package stream

import (
"bytes"
ejson "encoding/json"
stdjson "encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/viant/mly/service/domain"
"github.com/viant/tapper/msg"
"github.com/viant/tapper/msg/json"
)

type siP struct {
// batch:single output:single payload
type sssp struct {
O1 string `json:"out1"`
}

type muP struct {
type ssip struct {
O1 int `json:"out1"`
}

type ssfp struct {
O1 float64 `json:"out1"`
}

// batch:multi, output:single or batch:single output:multi string payload
// xx = mixed
type mxsp struct {
O1 []string `json:"out1"`
}

type mbP struct {
type mxip struct {
O1 []int `json:"out1"`
}

type mxfp struct {
O1 []float64 `json:"out1"`
}

// batch:multi, output:multi string payload
type mmsp struct {
O1 []struct {
Ov []string `json:"output"`
} `json:"out1"`
}

type mmip struct {
O1 []struct {
Ov []int `json:"output"`
} `json:"out1"`
}

type mmfp struct {
O1 []struct {
Ov []float64 `json:"output"`
} `json:"out1"`
}

func TestWriteObject(t *testing.T) {
p := msg.NewProvider(2048, 32, json.New)
os := []domain.Output{{Name: "out1"}}

testCases := []struct {
name string
out []interface{}
verify func([]byte)
name string
out []interface{}
// expected instance
ei func() interface{}
}{
{
name: "single-single-dim",
out: []interface{}{[][]string{
[]string{"a"},
}},
verify: func(b []byte) {
p := new(siP)
err := ejson.Unmarshal(b, &p)
assert.Nil(t, err)
},
name: "single-single-string",
out: []interface{}{[][]string{[]string{"a"}}},
ei: func() interface{} { return new(sssp) },
},
{
name: "single-multi-string",
out: []interface{}{[][]string{[]string{"a", "b"}}},
ei: func() interface{} { return new(mxsp) },
},
{
name: "batch-single-string",
out: []interface{}{[][]string{[]string{"a"}, []string{"b"}}},
ei: func() interface{} { return new(mxsp) },
},
{
name: "batch-multi-string",
out: []interface{}{[][]string{[]string{"a", "b"}, []string{"c", "d"}}},
ei: func() interface{} { return new(mmsp) },
},
{
name: "single-single-int",
out: []interface{}{[][]int64{[]int64{1}}},
ei: func() interface{} { return new(ssip) },
},
{
name: "single-multi-int",
out: []interface{}{[][]int64{[]int64{1, 2, 3}}},
ei: func() interface{} { return new(mxip) },
},
{
name: "batch-single-int",
out: []interface{}{[][]int64{[]int64{1}, []int64{2}}},
ei: func() interface{} { return new(mxip) },
},
{
name: "batch-multi-int",
out: []interface{}{[][]int64{[]int64{1, 2, 5}, []int64{3, 4, 6}}},
ei: func() interface{} { return new(mmip) },
},
{
name: "single-single-float",
out: []interface{}{[][]float32{[]float32{1}}},
ei: func() interface{} { return new(ssfp) },
},
{
name: "single-multi-dim",
out: []interface{}{[][]string{
[]string{"a", "b"},
}},
verify: func(b []byte) {
p := new(muP)
err := ejson.Unmarshal(b, &p)
assert.Nil(t, err)
},
name: "single-multi-float",
out: []interface{}{[][]float32{[]float32{1, 2, 3}}},
ei: func() interface{} { return new(mxfp) },
},
{
name: "batch-single-dim",
out: []interface{}{[][]string{
[]string{"a"},
[]string{"b"},
}},
verify: func(b []byte) {
p := new(muP)
err := ejson.Unmarshal(b, &p)
assert.Nil(t, err)
},
name: "batch-single-float",
out: []interface{}{[][]float32{[]float32{1}, []float32{2}}},
ei: func() interface{} { return new(mxfp) },
},
{
name: "batch-multi-dim",
out: []interface{}{[][]string{
[]string{"a", "b"},
[]string{"c", "d"},
}},
verify: func(b []byte) {
mbp := new(mbP)
err := ejson.Unmarshal(b, &mbp)
assert.Nil(t, err)
},
name: "batch-multi-float",
out: []interface{}{[][]float32{[]float32{1, 2, 5}, []float32{3, 4, 6}}},
ei: func() interface{} { return new(mmfp) },
},
}

for _, tc := range testCases {
m := p.NewMessage()
writeObject(m, false, tc.out, os)
err := writeObject(m, false, tc.out, os)
require.Nil(t, err)
b := new(bytes.Buffer)
m.WriteTo(b)
tc.verify(b.Bytes())
ei := tc.ei()
err = stdjson.Unmarshal(b.Bytes(), ei)
require.Nil(t, err)
m.Free()
}
}

0 comments on commit fd554f5

Please sign in to comment.