Skip to content

Commit

Permalink
Fix inconsistent log output when in batch mode with multi-dimensional…
Browse files Browse the repository at this point in the history
… outputs when there is only 1 element in the batch.

(cherry picked from commit 185c315)
  • Loading branch information
dchoi-viant committed Dec 7, 2023
1 parent eac9407 commit a6a3adc
Showing 1 changed file with 92 additions and 102 deletions.
194 changes: 92 additions & 102 deletions service/stream/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,121 +157,111 @@ func writeObject(tmsg msg.Message, hasBatchSize bool, output interface{}, output

switch actual := v.(type) {
case [][]string:
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
if hasBatchSize || len(outVec) > 1 {
// single request, multi output
// batched request of batch 1, single output
tmsg.PutStrings(outputName, outVec)
} else {
// single request, single output
tmsg.PutString(outputName, outVec[0])
lenActual := len(actual)
if lenActual == 0 {
break
}

outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize && lenOutVec > 1 {
// batched request of size 1, multi output
c := make([]io.Encoder, lenActual)
for i, v := range actual {
c[i] = KVStrings(v)
}
default:
lAct := len(actual[0])
multiOutDims := lAct > 1
if multiOutDims {
// batch request, multi output
c := make([]io.Encoder, lAct)
for i, v := range actual {
c[i] = KVStrings(v)
}
tmsg.PutObjects(outputName, c)
} else {
// batch request, single output
var stringSlice = make([]string, len(actual))
for i, vec := range actual {
stringSlice[i] = vec[0]
}

tmsg.PutStrings(outputName, stringSlice)
tmsg.PutObjects(outputName, c)
} else if hasBatchSize {
// batch request, single output
var stringSlice = make([]string, lenActual)
for i, vec := range actual {
stringSlice[i] = vec[0]
}

tmsg.PutStrings(outputName, stringSlice)
} else if lenOutVec > 1 {
// single request, multi output
tmsg.PutStrings(outputName, outVec)
} else if lenActual == 1 {
// single request, single output
tmsg.PutString(outputName, outVec[0])
} else {
return fmt.Errorf("batch logging (%d) string missing batch_size", lenActual)
}

case [][]int64:
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize || lenOutVec > 1 {
var c []int
if lenOutVec > 1 {
c = make([]int, lenOutVec)
for i, v := range outVec {
c[i] = int(v)
}
} else {
c = []int{int(outVec[0])}
}
lenActual := len(actual)
if lenActual == 0 {
break
}

tmsg.PutInts(outputName, c)
} else {
tmsg.PutInt(outputName, int(actual[0][0]))
}
default:
lOutput := len(actual[0])
multiOutDims := lOutput > 1
if multiOutDims {
c := make([]io.Encoder, len(actual))
for i, v := range actual {
t := make([]int, lOutput)
for ii, vv := range v {
t[ii] = int(vv)
}
c[i] = KVInts(t)
}
tmsg.PutObjects(outputName, c)
} else {
var ints = make([]int, len(actual))
for i, vec := range actual {
ints[i] = int(vec[0])
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize && lenOutVec > 1 {
c := make([]io.Encoder, lenActual)
for i, v := range actual {
t := make([]int, lenOutVec)
for ii, vv := range v {
t[ii] = int(vv)
}
tmsg.PutInts(outputName, ints)
c[i] = KVInts(t)
}
tmsg.PutObjects(outputName, c)
} else if hasBatchSize {
// hasBatchSize && lenOutVec <= 1
var ints = make([]int, lenActual)
for i, vec := range actual {
ints[i] = int(vec[0])
}
tmsg.PutInts(outputName, ints)
} else if lenOutVec > 1 {
// !hasBatchSize && lenOutVec > 1
c := make([]int, lenOutVec)
for i, v := range outVec {
c[i] = int(v)
}
tmsg.PutInts(outputName, c)
} else if lenActual == 1 {
tmsg.PutInt(outputName, int(outVec[0]))
} else {
return fmt.Errorf("batch logging (%d) int64 missing batch_size", lenActual)
}
case [][]float32:
switch len(actual) {
case 0:
case 1:
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize || lenOutVec > 1 {
var c []float64
if lenOutVec > 1 {
c = make([]float64, lenOutVec)
for i, v := range outVec {
c[i] = float64(v)
}
} else {
c = []float64{float64(outVec[0])}
}
lenActual := len(actual)
if lenActual == 0 {
break
}

tmsg.PutFloats(outputName, c)
} else {
tmsg.PutFloat(outputName, float64(actual[0][0]))
}
default:
lOutput := len(actual[0])
multiOutDims := lOutput > 1
if multiOutDims {
c := make([]io.Encoder, len(actual))
for i, v := range actual {
t := make([]float64, lOutput)
for ii, vv := range v {
t[ii] = float64(vv)
}
c[i] = KVFloat64s(t)
outVec := actual[0]
lenOutVec := len(outVec)
if hasBatchSize && lenOutVec > 1 {
c := make([]io.Encoder, lenActual)
for i, v := range actual {
t := make([]float64, lenOutVec)
for ii, vv := range v {
t[ii] = float64(vv)
}
tmsg.PutObjects(outputName, c)
} else {
var ints = make([]float64, len(actual))
for i, vec := range actual {
ints[i] = float64(vec[0])
}
tmsg.PutFloats(outputName, ints)
c[i] = KVFloat64s(t)
}
tmsg.PutObjects(outputName, c)
} else if hasBatchSize {
c := make([]float64, lenActual)
for i, vec := range actual {
c[i] = float64(vec[0])
}
tmsg.PutFloats(outputName, c)
} else if lenOutVec > 1 {
c := make([]float64, lenOutVec)
for i, v := range outVec {
c[i] = float64(v)
}

tmsg.PutFloats(outputName, c)
} else if lenActual == 1 {
tmsg.PutFloat(outputName, float64(outVec[0]))
} else {
return fmt.Errorf("batch logging (%d) float32 missing batch_size", lenActual)
}
default:
return fmt.Errorf("no handler for %T", actual)
Expand Down

0 comments on commit a6a3adc

Please sign in to comment.