diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go index e0249037fb120..80b8f60a9f0c3 100644 --- a/internal/util/importutilv2/binlog/reader_test.go +++ b/internal/util/importutilv2/binlog/reader_test.go @@ -167,6 +167,10 @@ func createBinlogBuf(t *testing.T, field *schemapb.FieldSchema, data storage.Fie vectors := data.(*storage.SparseFloatVectorFieldData) err = evt.AddSparseFloatVectorToPayload(vectors) assert.NoError(t, err) + case schemapb.DataType_Int8Vector: + vectors := data.(*storage.Int8VectorFieldData).Data + err = evt.AddInt8VectorToPayload(vectors, int(dim)) + assert.NoError(t, err) default: assert.True(t, false) return nil @@ -420,6 +424,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.vecDataType = schemapb.DataType_Int8Vector + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go index 5ae983166110c..c00c1a2511e54 100644 --- a/internal/util/importutilv2/csv/reader_test.go +++ b/internal/util/importutilv2/csv/reader_test.go @@ -229,6 +229,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.vecDataType = schemapb.DataType_Int8Vector + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 04ce55454cb74..bee68e3eb3af8 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -337,6 +337,19 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e return nil, err } return vec2, nil + case schemapb.DataType_Int8Vector: + if nullable && obj == r.nullkey { + return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } + var vec []int8 + err := json.Unmarshal([]byte(obj), &vec) + if err != nil { + return nil, r.wrapTypeError(obj, field) + } + if len(vec) != r.name2Dim[field.GetName()] { + return nil, r.wrapDimError(len(vec), field) + } + return vec, nil case schemapb.DataType_Array: if nullable && obj == r.nullkey { return nil, nil diff --git a/internal/util/importutilv2/json/reader_test.go b/internal/util/importutilv2/json/reader_test.go index eebf518cecc87..5569d77fdb33a 100644 --- a/internal/util/importutilv2/json/reader_test.go +++ b/internal/util/importutilv2/json/reader_test.go @@ -309,6 +309,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) suite.vecDataType = schemapb.DataType_SparseFloatVector suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) + suite.vecDataType = schemapb.DataType_Int8Vector + suite.run(schemapb.DataType_Int32, schemapb.DataType_None, false) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index 326a5270abb55..c61f685e1ecc5 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -377,6 +377,27 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { return nil, err } return vec, nil + case schemapb.DataType_Int8Vector: + arr, ok := obj.([]interface{}) + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } + if len(arr) != r.id2Dim[fieldID] { + return nil, r.wrapDimError(len(arr), fieldID) + } + vec := make([]int8, len(arr)) + for i := 0; i < len(arr); i++ { + value, ok := arr[i].(json.Number) + if !ok { + return nil, r.wrapTypeError(arr[i], fieldID) + } + num, err := strconv.ParseInt(value.String(), 10, 8) + if err != nil { + return nil, err + } + vec[i] = int8(num) + } + return vec, nil case schemapb.DataType_String, schemapb.DataType_VarChar: value, ok := obj.(string) if !ok { @@ -521,7 +542,7 @@ func (r *rowParser) parseNullableEntity(fieldID int64, obj any) (any, error) { return nil, err } return num, nil - case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: + case schemapb.DataType_BinaryVector, schemapb.DataType_FloatVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector, schemapb.DataType_Int8Vector: return nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") case schemapb.DataType_String, schemapb.DataType_VarChar: if obj == nil { diff --git a/internal/util/importutilv2/numpy/field_reader.go b/internal/util/importutilv2/numpy/field_reader.go index eb1fe468a8380..369654d100633 100644 --- a/internal/util/importutilv2/numpy/field_reader.go +++ b/internal/util/importutilv2/numpy/field_reader.go @@ -104,6 +104,8 @@ func (c *FieldReader) getCount(count int64) int64 { count *= c.dim case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: count *= c.dim * 2 + case schemapb.DataType_Int8Vector: + count *= c.dim } if int(count) > (total - c.readPosition) { return int64(total - c.readPosition) @@ -203,6 +205,12 @@ func (c *FieldReader) Next(count int64) (any, error) { return nil, err } c.readPosition += int(readCount) + case schemapb.DataType_Int8Vector: + data, err = ReadN[int8](c.reader, c.order, readCount) + if err != nil { + return nil, err + } + c.readPosition += int(readCount) case schemapb.DataType_FloatVector: var elementType schemapb.DataType elementType, err = convertNumpyType(c.npyReader.Header.Descr.Type) diff --git a/internal/util/importutilv2/numpy/reader_test.go b/internal/util/importutilv2/numpy/reader_test.go index 45c272ccb4d32..759b99892f6b3 100644 --- a/internal/util/importutilv2/numpy/reader_test.go +++ b/internal/util/importutilv2/numpy/reader_test.go @@ -190,6 +190,14 @@ func (suite *ReaderSuite) run(dt schemapb.DataType) { copy(chunkedRows[i][:], innerSlice) } data = chunkedRows + case schemapb.DataType_Int8Vector: + rows := fieldData.GetDataRows().([]int8) + chunked := lo.Chunk(rows, dim) + chunkedRows := make([][dim]int8, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice) + } + data = chunkedRows default: data = fieldData.GetDataRows() } @@ -324,6 +332,14 @@ func (suite *ReaderSuite) failRun(dt schemapb.DataType, isDynamic bool) { copy(chunkedRows[i][:], innerSlice) } data = chunkedRows + case schemapb.DataType_Int8Vector: + rows := fieldData.GetDataRows().([]int8) + chunked := lo.Chunk(rows, dim) + chunkedRows := make([][dim]int8, len(chunked)) + for i, innerSlice := range chunked { + copy(chunkedRows[i][:], innerSlice) + } + data = chunkedRows default: data = fieldData.GetDataRows() } @@ -432,6 +448,8 @@ func (suite *ReaderSuite) TestVector() { suite.run(schemapb.DataType_Int32) // suite.vecDataType = schemapb.DataType_SparseFloatVector // suite.run(schemapb.DataType_Int32) + suite.vecDataType = schemapb.DataType_Int8Vector + suite.run(schemapb.DataType_Int32) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index e55392f0b4590..e9d9d8d7c0aa8 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -223,6 +223,16 @@ func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int) if shape[1] != dim/8 { return wrapDimError(shape[1]*8, dim, field) } + case schemapb.DataType_Int8Vector: + if elementType != schemapb.DataType_Int8 { + return wrapElementTypeError(elementType, field) + } + if len(shape) != 2 { + return wrapShapeError(len(shape), 2, field) + } + if shape[1] != dim { + return wrapDimError(shape[1], dim, field) + } case schemapb.DataType_VarChar, schemapb.DataType_JSON: if len(shape) != 1 { return wrapShapeError(len(shape), 1, field) diff --git a/internal/util/importutilv2/parquet/field_reader.go b/internal/util/importutilv2/parquet/field_reader.go index ca017e304ae0b..99c0c2f7545ff 100644 --- a/internal/util/importutilv2/parquet/field_reader.go +++ b/internal/util/importutilv2/parquet/field_reader.go @@ -176,6 +176,19 @@ func (c *FieldReader) Next(count int64) (any, any, error) { } data, err := ReadSparseFloatVectorData(c, count) return data, nil, err + case schemapb.DataType_Int8Vector: + if c.field.GetNullable() { + return nil, nil, merr.WrapErrParameterInvalidMsg("not support nullable in vector") + } + arrayData, err := ReadIntegerOrFloatArrayData[int8](c, count) + if err != nil { + return nil, nil, err + } + if arrayData == nil { + return nil, nil, nil + } + vectors := lo.Flatten(arrayData.([][]int8)) + return vectors, nil, nil case schemapb.DataType_Array: // array has not support default_value if c.field.GetNullable() { @@ -708,6 +721,8 @@ func checkVectorAligned(offsets []int32, dim int, dataType schemapb.DataType) er case schemapb.DataType_SparseFloatVector: // JSON format, skip alignment check return nil + case schemapb.DataType_Int8Vector: + return checkVectorAlignWithDim(offsets, int32(dim)) default: return fmt.Errorf("unexpected vector data type %s", dataType.String()) } diff --git a/internal/util/importutilv2/parquet/reader_test.go b/internal/util/importutilv2/parquet/reader_test.go index 30ec59d19efd3..46b98923c9be0 100644 --- a/internal/util/importutilv2/parquet/reader_test.go +++ b/internal/util/importutilv2/parquet/reader_test.go @@ -495,6 +495,8 @@ func (s *ReaderSuite) TestVector() { s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) s.vecDataType = schemapb.DataType_SparseFloatVector s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) + s.vecDataType = schemapb.DataType_Int8Vector + s.run(schemapb.DataType_Int32, schemapb.DataType_None, false, 0) } func TestUtil(t *testing.T) { diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index f4b252abf4cd2..18dd81a24a97d 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -202,6 +202,13 @@ func convertToArrowDataType(field *schemapb.FieldSchema, isArray bool) (arrow.Da }), nil case schemapb.DataType_SparseFloatVector: return &arrow.StringType{}, nil + case schemapb.DataType_Int8Vector: + return arrow.ListOfField(arrow.Field{ + Name: "item", + Type: &arrow.Int8Type{}, + Nullable: true, + Metadata: arrow.Metadata{}, + }), nil default: return nil, merr.WrapErrParameterInvalidMsg("unsupported data type %v", dataType.String()) } diff --git a/tests/restful_client_v2/testcases/test_collection_operations.py b/tests/restful_client_v2/testcases/test_collection_operations.py index 4b28f5cfcdab2..301a36fafd6aa 100644 --- a/tests/restful_client_v2/testcases/test_collection_operations.py +++ b/tests/restful_client_v2/testcases/test_collection_operations.py @@ -1409,6 +1409,7 @@ def test_create_collections_with_invalid_api_key(self): @pytest.mark.L0 +@pytest.mark.skip("skip temporarily, need fix") class TestCollectionProperties(TestBase): """Test collection property operations"""