From fd554f56b9ae3e533f309dc7b0bd7f0dae2b0b4c Mon Sep 17 00:00:00 2001 From: David Choi Date: Mon, 18 Sep 2023 11:15:51 -0700 Subject: [PATCH] Fix panic bug for log. --- service/stream/service.go | 47 +++++++---- service/stream/service_test.go | 146 ++++++++++++++++++++++----------- 2 files changed, 129 insertions(+), 64 deletions(-) diff --git a/service/stream/service.go b/service/stream/service.go index cf0dcd1..fbee46d 100644 --- a/service/stream/service.go +++ b/service/stream/service.go @@ -2,6 +2,7 @@ package stream import ( "bytes" + "fmt" "log" "os" "reflect" @@ -52,11 +53,12 @@ type Service struct { dictProvider dictProvider outputsProvider outputsProvider - logMetric *gmetric.Operation - logCounterNoData *gmetric.Counter - logCounterNoSample *gmetric.Counter - logCounterNoEnd *gmetric.Counter - logCounterComplete *gmetric.Counter + logMetric *gmetric.Operation + logCounterNoData *gmetric.Counter + logCounterNoSample *gmetric.Counter + logCounterNoEnd *gmetric.Counter + logCounterComplete *gmetric.Counter + logCounterNoHandler *gmetric.Counter } func NewService(modelID string, streamCfg *config.Stream, afsv afs.Service, dp dictProvider, op outputsProvider, m *gmetric.Service) (*Service, error) { @@ -134,9 +136,13 @@ func (s *Service) Log(data []byte, output interface{}, timeTaken time.Duration) } outputs := s.outputsProvider() - writeObject(tmsg, hasBatchSize, output, outputs) + err := writeObject(tmsg, hasBatchSize, output, outputs) + if err != nil { + s.logCounterNoHandler.Increment() + return + } - if err := s.logger.Log(tmsg); err != nil { + if err = s.logger.Log(tmsg); err != nil { stats.Append(err) log.Printf("[%s log] failed to log: %v\n", s.modelID, err) } @@ -144,7 +150,7 @@ func (s *Service) Log(data []byte, output interface{}, timeTaken time.Duration) s.logCounterComplete.Increment() } -func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, outputs []domain.Output) { +func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, outputs []domain.Output) error { if value, ok := output.([]interface{}); ok { for outputIdx, v := range value { outputName := outputs[outputIdx].Name @@ -156,20 +162,25 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output case 1: outVec := actual[0] if hasBatchSize || len(outVec) > 1 { + // single request, multi output + // batched request of batch 1, single output tmsg.PutStrings(outputName, outVec) } else { + // single request, single output tmsg.PutString(outputName, outVec[0]) } default: lAct := len(actual[0]) multiOutDims := lAct > 1 if multiOutDims { + // batch request, multi output c := make([]io.Encoder, lAct) for i, v := range actual { c[i] = KVStrings(v) } tmsg.PutObjects(outputName, c) } else { + // batch request, single output var stringSlice = make([]string, len(actual)) for i, vec := range actual { stringSlice[i] = vec[0] @@ -200,12 +211,12 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output tmsg.PutInt(outputName, int(actual[0][0])) } default: - lAct := len(actual[0]) - multiOutDims := lAct > 1 + lOutput := len(actual[0]) + multiOutDims := lOutput > 1 if multiOutDims { - c := make([]io.Encoder, lAct) + c := make([]io.Encoder, len(actual)) for i, v := range actual { - t := make([]int, lAct) + t := make([]int, lOutput) for ii, vv := range v { t[ii] = int(vv) } @@ -242,12 +253,12 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output tmsg.PutFloat(outputName, float64(actual[0][0])) } default: - lAct := len(actual[0]) - multiOutDims := lAct > 1 + lOutput := len(actual[0]) + multiOutDims := lOutput > 1 if multiOutDims { - c := make([]io.Encoder, lAct) + c := make([]io.Encoder, len(actual)) for i, v := range actual { - t := make([]float64, lAct) + t := make([]float64, lOutput) for ii, vv := range v { t[ii] = float64(vv) } @@ -262,9 +273,13 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output tmsg.PutFloats(outputName, ints) } } + default: + return fmt.Errorf("no handler for %T", actual) } } } + + return nil } func getStreamID() string { diff --git a/service/stream/service_test.go b/service/stream/service_test.go index 4244e76..28de6ed 100644 --- a/service/stream/service_test.go +++ b/service/stream/service_test.go @@ -2,92 +2,142 @@ package stream import ( "bytes" - ejson "encoding/json" + stdjson "encoding/json" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/viant/mly/service/domain" "github.com/viant/tapper/msg" "github.com/viant/tapper/msg/json" ) -type siP struct { +// batch:single output:single payload +type sssp struct { O1 string `json:"out1"` } -type muP struct { +type ssip struct { + O1 int `json:"out1"` +} + +type ssfp struct { + O1 float64 `json:"out1"` +} + +// batch:multi, output:single or batch:single output:multi string payload +// xx = mixed +type mxsp struct { O1 []string `json:"out1"` } -type mbP struct { +type mxip struct { + O1 []int `json:"out1"` +} + +type mxfp struct { + O1 []float64 `json:"out1"` +} + +// batch:multi, output:multi string payload +type mmsp struct { O1 []struct { Ov []string `json:"output"` } `json:"out1"` } +type mmip struct { + O1 []struct { + Ov []int `json:"output"` + } `json:"out1"` +} + +type mmfp struct { + O1 []struct { + Ov []float64 `json:"output"` + } `json:"out1"` +} + func TestWriteObject(t *testing.T) { p := msg.NewProvider(2048, 32, json.New) os := []domain.Output{{Name: "out1"}} testCases := []struct { - name string - out []interface{} - verify func([]byte) + name string + out []interface{} + // expected instance + ei func() interface{} }{ { - name: "single-single-dim", - out: []interface{}{[][]string{ - []string{"a"}, - }}, - verify: func(b []byte) { - p := new(siP) - err := ejson.Unmarshal(b, &p) - assert.Nil(t, err) - }, + name: "single-single-string", + out: []interface{}{[][]string{[]string{"a"}}}, + ei: func() interface{} { return new(sssp) }, + }, + { + name: "single-multi-string", + out: []interface{}{[][]string{[]string{"a", "b"}}}, + ei: func() interface{} { return new(mxsp) }, + }, + { + name: "batch-single-string", + out: []interface{}{[][]string{[]string{"a"}, []string{"b"}}}, + ei: func() interface{} { return new(mxsp) }, + }, + { + name: "batch-multi-string", + out: []interface{}{[][]string{[]string{"a", "b"}, []string{"c", "d"}}}, + ei: func() interface{} { return new(mmsp) }, + }, + { + name: "single-single-int", + out: []interface{}{[][]int64{[]int64{1}}}, + ei: func() interface{} { return new(ssip) }, + }, + { + name: "single-multi-int", + out: []interface{}{[][]int64{[]int64{1, 2, 3}}}, + ei: func() interface{} { return new(mxip) }, + }, + { + name: "batch-single-int", + out: []interface{}{[][]int64{[]int64{1}, []int64{2}}}, + ei: func() interface{} { return new(mxip) }, + }, + { + name: "batch-multi-int", + out: []interface{}{[][]int64{[]int64{1, 2, 5}, []int64{3, 4, 6}}}, + ei: func() interface{} { return new(mmip) }, + }, + { + name: "single-single-float", + out: []interface{}{[][]float32{[]float32{1}}}, + ei: func() interface{} { return new(ssfp) }, }, { - name: "single-multi-dim", - out: []interface{}{[][]string{ - []string{"a", "b"}, - }}, - verify: func(b []byte) { - p := new(muP) - err := ejson.Unmarshal(b, &p) - assert.Nil(t, err) - }, + name: "single-multi-float", + out: []interface{}{[][]float32{[]float32{1, 2, 3}}}, + ei: func() interface{} { return new(mxfp) }, }, { - name: "batch-single-dim", - out: []interface{}{[][]string{ - []string{"a"}, - []string{"b"}, - }}, - verify: func(b []byte) { - p := new(muP) - err := ejson.Unmarshal(b, &p) - assert.Nil(t, err) - }, + name: "batch-single-float", + out: []interface{}{[][]float32{[]float32{1}, []float32{2}}}, + ei: func() interface{} { return new(mxfp) }, }, { - name: "batch-multi-dim", - out: []interface{}{[][]string{ - []string{"a", "b"}, - []string{"c", "d"}, - }}, - verify: func(b []byte) { - mbp := new(mbP) - err := ejson.Unmarshal(b, &mbp) - assert.Nil(t, err) - }, + name: "batch-multi-float", + out: []interface{}{[][]float32{[]float32{1, 2, 5}, []float32{3, 4, 6}}}, + ei: func() interface{} { return new(mmfp) }, }, } for _, tc := range testCases { m := p.NewMessage() - writeObject(m, false, tc.out, os) + err := writeObject(m, false, tc.out, os) + require.Nil(t, err) b := new(bytes.Buffer) m.WriteTo(b) - tc.verify(b.Bytes()) + ei := tc.ei() + err = stdjson.Unmarshal(b.Bytes(), ei) + require.Nil(t, err) m.Free() } }