feat: Support Int8Vector in go (#38990)

Issue: #38666

Signed-off-by: Cai Yudong <yudong.cai@zilliz.com>
pull/39260/head
Cai Yudong 2025-01-14 20:43:06 +08:00 committed by GitHub
parent d89768f9e0
commit 5bf1b2b929
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
50 changed files with 1331 additions and 150 deletions

View File

@ -136,6 +136,8 @@ func (t FieldType) PbFieldType() (string, string) {
return "[]byte", ""
case FieldTypeBFloat16Vector:
return "[]byte", ""
case FieldTypeInt8Vector:
return "[]int8", ""
default:
return "undefined", ""
}
@ -177,6 +179,8 @@ const (
FieldTypeBFloat16Vector FieldType = 103
// FieldTypeBinaryVector field type sparse vector
FieldTypeSparseVector FieldType = 104
// FieldTypeInt8Vector field type int8 vector
FieldTypeInt8Vector FieldType = 105
)
// Field represent field schema in milvus

2
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0

4
go.sum
View File

@ -630,8 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=

View File

@ -58,6 +58,7 @@ using distance_t = float;
using float16 = knowhere::fp16;
using bfloat16 = knowhere::bf16;
using bin1 = knowhere::bin1;
using int8 = knowhere::int8;
// See also: https://github.com/milvus-io/milvus-proto/blob/master/proto/schema.proto
enum class DataType {
@ -85,6 +86,7 @@ enum class DataType {
VECTOR_FLOAT16 = 102,
VECTOR_BFLOAT16 = 103,
VECTOR_SPARSE_FLOAT = 104,
VECTOR_INT8 = 105,
};
using Timestamp = uint64_t; // TODO: use TiKV-like timestamp
@ -322,6 +324,11 @@ IsSparseFloatVectorDataType(DataType data_type) {
return data_type == DataType::VECTOR_SPARSE_FLOAT;
}
inline bool
IsInt8VectorDataType(DataType data_type) {
return data_type == DataType::VECTOR_INT8;
}
inline bool
IsFloatVectorDataType(DataType data_type) {
return IsDenseFloatVectorDataType(data_type) ||
@ -331,7 +338,7 @@ IsFloatVectorDataType(DataType data_type) {
inline bool
IsVectorDataType(DataType data_type) {
return IsBinaryVectorDataType(data_type) ||
IsFloatVectorDataType(data_type);
IsFloatVectorDataType(data_type) || IsInt8VectorDataType(data_type);
}
inline bool
@ -418,7 +425,17 @@ IsFloatVectorMetricType(const MetricType& metric_type) {
inline bool
IsBinaryVectorMetricType(const MetricType& metric_type) {
return !IsFloatVectorMetricType(metric_type);
return metric_type == knowhere::metric::HAMMING ||
metric_type == knowhere::metric::JACCARD ||
metric_type == knowhere::metric::SUPERSTRUCTURE ||
metric_type == knowhere::metric::SUBSTRUCTURE;
}
inline bool
IsIntVectorMetricType(const MetricType& metric_type) {
return metric_type == knowhere::metric::L2 ||
metric_type == knowhere::metric::IP ||
metric_type == knowhere::metric::COSINE;
}
// Plus 1 because we can't use greater(>) symbol

View File

@ -443,6 +443,10 @@ IndexFactory::CreateVectorIndex(
return std::make_unique<VectorMemIndex<bfloat16>>(
index_type, metric_type, version, file_manager_context);
}
case DataType::VECTOR_INT8: {
return std::make_unique<VectorMemIndex<int8>>(
index_type, metric_type, version, file_manager_context);
}
default:
PanicInfo(
DataTypeInvalid,

View File

@ -63,6 +63,7 @@ BIN_List() {
return ret;
}
// TODO caiyd: should list supported list
std::vector<std::tuple<IndexType, MetricType>>
unsupported_index_combinations() {
static std::vector<std::tuple<IndexType, MetricType>> ret{

View File

@ -105,11 +105,13 @@ CheckMetricTypeSupport(const MetricType& metric_type) {
if constexpr (std::is_same_v<T, bin1>) {
AssertInfo(
IsBinaryVectorMetricType(metric_type),
"binary vector does not float vector metric type: " + metric_type);
"binary vector does not support metric type: " + metric_type);
} else if constexpr (std::is_same_v<T, int8>) {
AssertInfo(IsIntVectorMetricType(metric_type),
"int vector does not support metric type: " + metric_type);
} else {
AssertInfo(
IsFloatVectorMetricType(metric_type),
"float vector does not binary vector metric type: " + metric_type);
AssertInfo(IsFloatVectorMetricType(metric_type),
"float vector does not support metric type: " + metric_type);
}
}

View File

@ -632,5 +632,6 @@ template class VectorMemIndex<float>;
template class VectorMemIndex<bin1>;
template class VectorMemIndex<float16>;
template class VectorMemIndex<bfloat16>;
template class VectorMemIndex<int8>;
} // namespace milvus::index

View File

@ -67,6 +67,7 @@ class IndexFactory {
case DataType::VECTOR_BFLOAT16:
case DataType::VECTOR_BINARY:
case DataType::VECTOR_SPARSE_FLOAT:
case DataType::VECTOR_INT8:
return std::make_unique<VecIndexCreator>(type, config, context);
default:
PanicInfo(DataTypeInvalid,

View File

@ -450,6 +450,29 @@ BuildSparseFloatVecIndex(CIndex index,
return status;
}
CStatus
BuildInt8VecIndex(CIndex index, int64_t int8_value_num, const int8_t* vectors) {
auto status = CStatus();
try {
AssertInfo(index,
"failed to build int8 vector index, passed index was null");
auto real_index =
reinterpret_cast<milvus::indexbuilder::IndexCreatorBase*>(index);
auto cIndex =
dynamic_cast<milvus::indexbuilder::VecIndexCreator*>(real_index);
auto dim = cIndex->dim();
auto row_nums = int8_value_num / dim;
auto ds = knowhere::GenDataSet(row_nums, dim, vectors);
cIndex->Build(ds);
status.error_code = Success;
status.error_msg = "";
} catch (std::exception& e) {
status.error_code = UnexpectedError;
status.error_msg = strdup(e.what());
}
return status;
}
// field_data:
// 1, serialized proto::schema::BoolArray, if type is bool;
// 2, serialized proto::schema::StringArray, if type is string;

View File

@ -59,6 +59,9 @@ BuildSparseFloatVecIndex(CIndex index,
int64_t dim,
const uint8_t* vectors);
CStatus
BuildInt8VecIndex(CIndex index, int64_t data_size, const int8_t* vectors);
// field_data:
// 1, serialized proto::schema::BoolArray, if type is bool;
// 2, serialized proto::schema::StringArray, if type is string;

View File

@ -73,6 +73,12 @@ ValidateIndexParams(const char* index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else if (dataType == milvus::DataType::VECTOR_INT8) {
status = knowhere::IndexStaticFaced<knowhere::int8>::ConfigCheck(
index_type,
knowhere::Version::GetCurrentVersion().VersionNumber(),
json,
error_msg);
} else {
status = knowhere::Status::invalid_args;
}

View File

@ -261,6 +261,11 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
for k, v := range Params.AutoIndexConfig.BinaryIndexParams.GetAsJSONMap() {
indexParamsMap[k] = v
}
} else if typeutil.IsIntVectorType(cit.fieldSchema.DataType) {
// override int vector index params by autoindex
for k, v := range Params.AutoIndexConfig.IndexParams.GetAsJSONMap() {
indexParamsMap[k] = v
}
}
if metricTypeExist {
@ -320,6 +325,9 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
} else if typeutil.IsBinaryVectorType(cit.fieldSchema.DataType) {
// override binary vector index params by autoindex
config = Params.AutoIndexConfig.BinaryIndexParams.GetAsJSONMap()
} else if typeutil.IsIntVectorType(cit.fieldSchema.DataType) {
// override int vector index params by autoindex
config = Params.AutoIndexConfig.IndexParams.GetAsJSONMap()
}
if !exist {
if err := handle(0, config); err != nil {
@ -364,10 +372,9 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
return merr.WrapErrParameterInvalid("valid index params", "invalid index params", "float vector index does not support metric type: "+metricType)
}
} else if typeutil.IsSparseFloatVectorType(cit.fieldSchema.DataType) {
if metricType != metric.IP && metricType != metric.BM25 {
if !funcutil.SliceContain(indexparamcheck.SparseFloatVectorMetrics, metricType) {
return merr.WrapErrParameterInvalid("valid index params", "invalid index params", "only IP&BM25 is the supported metric type for sparse index")
}
if metricType == metric.BM25 && cit.functionSchema.GetType() != schemapb.FunctionType_BM25 {
return merr.WrapErrParameterInvalid("valid index params", "invalid index params", "only BM25 Function output field support BM25 metric type")
}
@ -375,6 +382,10 @@ func (cit *createIndexTask) parseIndexParams(ctx context.Context) error {
if !funcutil.SliceContain(indexparamcheck.BinaryVectorMetrics, metricType) {
return merr.WrapErrParameterInvalid("valid index params", "invalid index params", "binary vector index does not support metric type: "+metricType)
}
} else if typeutil.IsIntVectorType(cit.fieldSchema.DataType) {
if !funcutil.SliceContain(indexparamcheck.IntVectorMetrics, metricType) {
return merr.WrapErrParameterInvalid("valid index params", "invalid index params", "int vector index does not support metric type: "+metricType)
}
}
}

View File

@ -88,6 +88,10 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, helper *typeutil.Sch
if err := v.checkSparseFloatFieldData(field, fieldSchema); err != nil {
return err
}
case schemapb.DataType_Int8Vector:
if err := v.checkInt8VectorFieldData(field, fieldSchema); err != nil {
return err
}
case schemapb.DataType_VarChar:
if err := v.checkVarCharFieldData(field, fieldSchema); err != nil {
return err
@ -246,6 +250,29 @@ func (v *validateUtil) checkAligned(data []*schemapb.FieldData, schema *typeutil
return errNumRowsMismatch(field.GetFieldName(), n)
}
case schemapb.DataType_Int8Vector:
f, err := schema.GetFieldFromName(field.GetFieldName())
if err != nil {
return err
}
dim, err := typeutil.GetDim(f)
if err != nil {
return err
}
n, err := funcutil.GetNumRowsOfInt8VectorField(field.GetVectors().GetInt8Vector(), dim)
if err != nil {
return err
}
dataDim := field.GetVectors().Dim
if dataDim != dim {
return errDimMismatch(field.GetFieldName(), dataDim, dim)
}
if n != numRows {
return errNumRowsMismatch(field.GetFieldName(), n)
}
default:
// error won't happen here.
n, err := funcutil.GetNumRowOfFieldDataWithSchema(field, schema)
@ -609,6 +636,15 @@ func (v *validateUtil) checkSparseFloatFieldData(field *schemapb.FieldData, fiel
return typeutil.ValidateSparseFloatRows(sparseRows...)
}
func (v *validateUtil) checkInt8VectorFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
int8VecArray := field.GetVectors().GetInt8Vector()
if int8VecArray == nil {
msg := fmt.Sprintf("int8 vector field '%v' is illegal, nil Vector_Int8 type", field.GetFieldName())
return merr.WrapErrParameterInvalid("need vector_int8 array", "got nil", msg)
}
return nil
}
func (v *validateUtil) checkVarCharFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error {
strArr := field.GetScalars().GetStringData().GetData()
if strArr == nil && fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {

View File

@ -542,7 +542,7 @@ func Test_validateUtil_checkAligned(t *testing.T) {
Fields: []*schemapb.FieldSchema{
{
Name: "test",
DataType: schemapb.DataType_Float16Vector,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -1147,6 +1147,172 @@ func Test_validateUtil_checkAligned(t *testing.T) {
assert.Error(t, err)
})
//////////////////////////////////////////////////////////////////
t.Run("int8 vector column not found", func(t *testing.T) {
data := []*schemapb.FieldData{
{
FieldName: "test",
Type: schemapb.DataType_Int8Vector,
},
}
schema := &schemapb.CollectionSchema{}
h, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
v := newValidateUtil()
err = v.checkAligned(data, h, 100)
assert.Error(t, err)
})
t.Run("int8 vector column dimension not found", func(t *testing.T) {
data := []*schemapb.FieldData{
{
FieldName: "test",
Type: schemapb.DataType_Int8Vector,
},
}
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Name: "test",
DataType: schemapb.DataType_Int8Vector,
},
},
}
h, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
v := newValidateUtil()
err = v.checkAligned(data, h, 100)
assert.Error(t, err)
})
t.Run("field_data dim not match schema dim", func(t *testing.T) {
data := []*schemapb.FieldData{
{
FieldName: "test",
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: []byte{1, 2, 3, 4, 5, 6, 7, 8},
},
Dim: 16,
},
},
},
}
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Name: "test",
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
},
}
h, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
v := newValidateUtil()
err = v.checkAligned(data, h, 1)
assert.Error(t, err)
})
t.Run("invalid num rows", func(t *testing.T) {
data := []*schemapb.FieldData{
{
FieldName: "test",
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: []byte{1, 2},
},
Dim: 8,
},
},
},
}
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Name: "test",
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
},
}
h, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
v := newValidateUtil()
err = v.checkAligned(data, h, 100)
assert.Error(t, err)
})
t.Run("num rows mismatch", func(t *testing.T) {
data := []*schemapb.FieldData{
{
FieldName: "test",
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: []byte{1, 2, 3, 4, 5, 6, 7, 8},
},
Dim: 8,
},
},
},
}
schema := &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
Name: "test",
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
},
}
h, err := typeutil.CreateSchemaHelper(schema)
assert.NoError(t, err)
v := newValidateUtil()
err = v.checkAligned(data, h, 100)
assert.Error(t, err)
})
//////////////////////////////////////////////////////////////////
t.Run("column not found", func(t *testing.T) {
@ -1500,6 +1666,17 @@ func Test_validateUtil_Validate(t *testing.T) {
},
},
},
{
FieldName: "test6",
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: typeutil.Int8ArrayToBytes(testutils.GenerateInt8Vectors(2, 8)),
},
},
},
},
}
schema := &schemapb.CollectionSchema{
@ -1559,6 +1736,17 @@ func Test_validateUtil_Validate(t *testing.T) {
},
},
},
{
Name: "test6",
FieldID: 106,
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
},
}
@ -1632,6 +1820,17 @@ func Test_validateUtil_Validate(t *testing.T) {
},
},
},
{
FieldName: "test6",
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: typeutil.Int8ArrayToBytes(testutils.GenerateInt8Vectors(2, 8)),
},
},
},
},
}
schema := &schemapb.CollectionSchema{
@ -1691,6 +1890,17 @@ func Test_validateUtil_Validate(t *testing.T) {
},
},
},
{
Name: "test6",
FieldID: 106,
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "8",
},
},
},
},
}

View File

@ -387,6 +387,10 @@ func AddFieldDataToPayload(eventWriter *insertEventWriter, dataType schemapb.Dat
if err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)); err != nil {
return err
}
case schemapb.DataType_Int8Vector:
if err = eventWriter.AddInt8VectorToPayload(singleData.(*Int8VectorFieldData).Data, singleData.(*Int8VectorFieldData).Dim); err != nil {
return err
}
default:
return fmt.Errorf("undefined data type %d", dataType)
}
@ -675,6 +679,22 @@ func AddInsertData(dataType schemapb.DataType, data interface{}, insertData *Ins
insertData.Data[fieldID] = vec
return singleData.RowNum(), nil
case schemapb.DataType_Int8Vector:
singleData := data.([]int8)
if fieldData == nil {
fieldData = &Int8VectorFieldData{Data: make([]int8, 0, rowNum*dim)}
}
int8VectorFieldData := fieldData.(*Int8VectorFieldData)
int8VectorFieldData.Data = append(int8VectorFieldData.Data, singleData...)
length, err := eventReader.GetPayloadLengthFromReader()
if err != nil {
return 0, err
}
int8VectorFieldData.Dim = dim
insertData.Data[fieldID] = int8VectorFieldData
return length, nil
default:
return 0, fmt.Errorf("undefined data type %d", dataType)
}

View File

@ -54,6 +54,7 @@ const (
Float16VectorField = 112
BFloat16VectorField = 113
SparseFloatVectorField = 114
Int8VectorField = 115
)
func genTestCollectionMeta() *etcdpb.CollectionMeta {
@ -196,6 +197,18 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta {
DataType: schemapb.DataType_SparseFloatVector,
TypeParams: []*commonpb.KeyValuePair{},
},
{
FieldID: Int8VectorField,
Name: "field_int8_vector",
Description: "int8_vector",
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "4",
},
},
},
},
},
}
@ -212,6 +225,7 @@ func TestInsertCodecFailed(t *testing.T) {
{"nullable BinaryVector field", schemapb.DataType_BinaryVector},
{"nullable BFloat16Vector field", schemapb.DataType_BFloat16Vector},
{"nullable SparseFloatVector field", schemapb.DataType_SparseFloatVector},
{"nullable Int8Vector field", schemapb.DataType_Int8Vector},
}
for _, test := range tests {
@ -341,6 +355,10 @@ func TestInsertCodec(t *testing.T) {
},
},
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{-4, -5, -6, -7, -4, -5, -6, -7},
Dim: 4,
},
},
}
@ -404,6 +422,10 @@ func TestInsertCodec(t *testing.T) {
},
},
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
},
ArrayField: &ArrayFieldData{
ElementType: schemapb.DataType_Int32,
Data: []*schemapb.ScalarField{
@ -450,8 +472,9 @@ func TestInsertCodec(t *testing.T) {
Contents: [][]byte{},
},
},
ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}, nil, false},
JSONField: &JSONFieldData{[][]byte{}, nil, false},
Int8VectorField: &Int8VectorFieldData{[]int8{}, 4},
ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}, nil, false},
JSONField: &JSONFieldData{[][]byte{}, nil, false},
},
}
b, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty)
@ -517,6 +540,7 @@ func TestInsertCodec(t *testing.T) {
typeutil.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}),
},
}, &resultData.Data[SparseFloatVectorField].(*SparseFloatVectorFieldData).SparseFloatArray)
assert.Equal(t, []int8{0, 1, 2, 3, 0, 1, 2, 3, -4, -5, -6, -7, -4, -5, -6, -7}, resultData.Data[Int8VectorField].(*Int8VectorFieldData).Data)
int32ArrayList := [][]int32{{1, 2, 3}, {4, 5, 6}, {3, 2, 1}, {6, 5, 4}}
resultArrayList := [][]int32{}
@ -817,6 +841,18 @@ func TestMemorySize(t *testing.T) {
Data: []float32{4, 5, 6, 7},
Dim: 4,
},
Float16VectorField: &Float16VectorFieldData{
Data: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
Dim: 4,
},
BFloat16VectorField: &BFloat16VectorFieldData{
Data: []byte{0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7},
Dim: 4,
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{4, 5, 6, 7},
Dim: 4,
},
ArrayField: &ArrayFieldData{
ElementType: schemapb.DataType_Int32,
Data: []*schemapb.ScalarField{
@ -845,7 +881,10 @@ func TestMemorySize(t *testing.T) {
assert.Equal(t, insertData1.Data[DoubleField].GetMemorySize(), 9)
assert.Equal(t, insertData1.Data[StringField].GetMemorySize(), 18)
assert.Equal(t, insertData1.Data[BinaryVectorField].GetMemorySize(), 5)
assert.Equal(t, insertData1.Data[FloatField].GetMemorySize(), 5)
assert.Equal(t, insertData1.Data[FloatVectorField].GetMemorySize(), 20)
assert.Equal(t, insertData1.Data[Float16VectorField].GetMemorySize(), 12)
assert.Equal(t, insertData1.Data[BFloat16VectorField].GetMemorySize(), 12)
assert.Equal(t, insertData1.Data[Int8VectorField].GetMemorySize(), 8)
assert.Equal(t, insertData1.Data[ArrayField].GetMemorySize(), 3*4+1)
assert.Equal(t, insertData1.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16+1)
@ -889,6 +928,18 @@ func TestMemorySize(t *testing.T) {
Data: []float32{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
},
Float16VectorField: &Float16VectorFieldData{
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 4,
},
BFloat16VectorField: &BFloat16VectorFieldData{
Data: []byte{0, 1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7},
Dim: 4,
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{0, 1, 2, 3, 0, 1, 2, 3},
Dim: 4,
},
},
}
@ -903,22 +954,28 @@ func TestMemorySize(t *testing.T) {
assert.Equal(t, insertData2.Data[DoubleField].GetMemorySize(), 17)
assert.Equal(t, insertData2.Data[StringField].GetMemorySize(), 36)
assert.Equal(t, insertData2.Data[BinaryVectorField].GetMemorySize(), 6)
assert.Equal(t, insertData2.Data[FloatField].GetMemorySize(), 9)
assert.Equal(t, insertData2.Data[FloatVectorField].GetMemorySize(), 36)
assert.Equal(t, insertData2.Data[Float16VectorField].GetMemorySize(), 20)
assert.Equal(t, insertData2.Data[BFloat16VectorField].GetMemorySize(), 20)
assert.Equal(t, insertData2.Data[Int8VectorField].GetMemorySize(), 12)
insertDataEmpty := &InsertData{
Data: map[int64]FieldData{
RowIDField: &Int64FieldData{[]int64{}, nil, false},
TimestampField: &Int64FieldData{[]int64{}, nil, false},
BoolField: &BoolFieldData{[]bool{}, nil, false},
Int8Field: &Int8FieldData{[]int8{}, nil, false},
Int16Field: &Int16FieldData{[]int16{}, nil, false},
Int32Field: &Int32FieldData{[]int32{}, nil, false},
Int64Field: &Int64FieldData{[]int64{}, nil, false},
FloatField: &FloatFieldData{[]float32{}, nil, false},
DoubleField: &DoubleFieldData{[]float64{}, nil, false},
StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil, false},
BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
FloatVectorField: &FloatVectorFieldData{[]float32{}, 4},
RowIDField: &Int64FieldData{[]int64{}, nil, false},
TimestampField: &Int64FieldData{[]int64{}, nil, false},
BoolField: &BoolFieldData{[]bool{}, nil, false},
Int8Field: &Int8FieldData{[]int8{}, nil, false},
Int16Field: &Int16FieldData{[]int16{}, nil, false},
Int32Field: &Int32FieldData{[]int32{}, nil, false},
Int64Field: &Int64FieldData{[]int64{}, nil, false},
FloatField: &FloatFieldData{[]float32{}, nil, false},
DoubleField: &DoubleFieldData{[]float64{}, nil, false},
StringField: &StringFieldData{[]string{}, schemapb.DataType_VarChar, nil, false},
BinaryVectorField: &BinaryVectorFieldData{[]byte{}, 8},
FloatVectorField: &FloatVectorFieldData{[]float32{}, 4},
Float16VectorField: &Float16VectorFieldData{[]byte{}, 4},
BFloat16VectorField: &BFloat16VectorFieldData{[]byte{}, 4},
Int8VectorField: &Int8VectorFieldData{[]int8{}, 4},
},
}
@ -934,6 +991,9 @@ func TestMemorySize(t *testing.T) {
assert.Equal(t, insertDataEmpty.Data[StringField].GetMemorySize(), 1)
assert.Equal(t, insertDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4)
assert.Equal(t, insertDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
assert.Equal(t, insertDataEmpty.Data[Float16VectorField].GetMemorySize(), 4)
assert.Equal(t, insertDataEmpty.Data[BFloat16VectorField].GetMemorySize(), 4)
assert.Equal(t, insertDataEmpty.Data[Int8VectorField].GetMemorySize(), 4)
}
func TestDeleteData(t *testing.T) {
@ -1027,4 +1087,6 @@ func TestAddFieldDataToPayload(t *testing.T) {
},
})
assert.Error(t, err)
err = AddFieldDataToPayload(e, schemapb.DataType_Int8Vector, &Int8VectorFieldData{[]int8{}, 4})
assert.Error(t, err)
}

View File

@ -232,6 +232,18 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema,
return nil, merr.WrapErrParameterInvalidMsg("vector not support null")
}
return &SparseFloatVectorFieldData{}, nil
case schemapb.DataType_Int8Vector:
if fieldSchema.GetNullable() {
return nil, merr.WrapErrParameterInvalidMsg("vector not support null")
}
dim, err := GetDimFromParams(typeParams)
if err != nil {
return nil, err
}
return &Int8VectorFieldData{
Data: make([]int8, 0, cap),
Dim: dim,
}, nil
case schemapb.DataType_Bool:
data := &BoolFieldData{
Data: make([]bool, 0, cap),
@ -407,6 +419,11 @@ type SparseFloatVectorFieldData struct {
schemapb.SparseFloatArray
}
type Int8VectorFieldData struct {
Data []int8
Dim int
}
func (dst *SparseFloatVectorFieldData) AppendAllRows(src *SparseFloatVectorFieldData) {
if len(src.Contents) == 0 {
return
@ -435,6 +452,7 @@ func (data *BFloat16VectorFieldData) RowNum() int {
return len(data.Data) / 2 / data.Dim
}
func (data *SparseFloatVectorFieldData) RowNum() int { return len(data.Contents) }
func (data *Int8VectorFieldData) RowNum() int { return len(data.Data) / data.Dim }
// GetRow implements FieldData.GetRow
func (data *BoolFieldData) GetRow(i int) any {
@ -527,6 +545,10 @@ func (data *BFloat16VectorFieldData) GetRow(i int) interface{} {
return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2]
}
func (data *Int8VectorFieldData) GetRow(i int) interface{} {
return data.Data[i*data.Dim : (i+1)*data.Dim]
}
func (data *BoolFieldData) GetDataRows() any { return data.Data }
func (data *Int8FieldData) GetDataRows() any { return data.Data }
func (data *Int16FieldData) GetDataRows() any { return data.Data }
@ -542,6 +564,7 @@ func (data *FloatVectorFieldData) GetDataRows() any { return data.Data }
func (data *Float16VectorFieldData) GetDataRows() any { return data.Data }
func (data *BFloat16VectorFieldData) GetDataRows() any { return data.Data }
func (data *SparseFloatVectorFieldData) GetDataRows() any { return data.Contents }
func (data *Int8VectorFieldData) GetDataRows() any { return data.Data }
// AppendRow implements FieldData.AppendRow
func (data *BoolFieldData) AppendRow(row interface{}) error {
@ -766,6 +789,15 @@ func (data *SparseFloatVectorFieldData) AppendRow(row interface{}) error {
return nil
}
func (data *Int8VectorFieldData) AppendRow(row interface{}) error {
v, ok := row.([]int8)
if !ok || len(v) != data.Dim {
return merr.WrapErrParameterInvalid("[]int8", row, "Wrong row type")
}
data.Data = append(data.Data, v...)
return nil
}
func (data *BoolFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error {
err := data.AppendDataRows(dataRows)
if err != nil {
@ -890,6 +922,14 @@ func (data *SparseFloatVectorFieldData) AppendRows(dataRows interface{}, validDa
return data.AppendValidDataRows(validDataRows)
}
func (data *Int8VectorFieldData) AppendRows(dataRows interface{}, validDataRows interface{}) error {
err := data.AppendDataRows(dataRows)
if err != nil {
return err
}
return data.AppendValidDataRows(validDataRows)
}
func (data *BoolFieldData) AppendDataRows(rows interface{}) error {
v, ok := rows.([]bool)
if !ok {
@ -1044,6 +1084,18 @@ func (data *SparseFloatVectorFieldData) AppendDataRows(rows interface{}) error {
return nil
}
func (data *Int8VectorFieldData) AppendDataRows(rows interface{}) error {
v, ok := rows.([]int8)
if !ok {
return merr.WrapErrParameterInvalid("[]int8", rows, "Wrong rows type")
}
if len(v)%(data.Dim) != 0 {
return merr.WrapErrParameterInvalid(data.Dim, len(v), "Wrong vector size")
}
data.Data = append(data.Data, v...)
return nil
}
func (data *BoolFieldData) AppendValidDataRows(rows interface{}) error {
if rows == nil {
return nil
@ -1233,6 +1285,19 @@ func (data *SparseFloatVectorFieldData) AppendValidDataRows(rows interface{}) er
return nil
}
func (data *Int8VectorFieldData) AppendValidDataRows(rows interface{}) error {
if rows != nil {
v, ok := rows.([]bool)
if !ok {
return merr.WrapErrParameterInvalid("[]bool", rows, "Wrong rows type")
}
if len(v) != 0 {
return merr.WrapErrParameterInvalidMsg("not support Nullable in vector")
}
}
return nil
}
// GetMemorySize implements FieldData.GetMemorySize
func (data *BoolFieldData) GetMemorySize() int {
return binary.Size(data.Data) + binary.Size(data.ValidData) + binary.Size(data.Nullable)
@ -1271,6 +1336,8 @@ func (data *SparseFloatVectorFieldData) GetMemorySize() int {
return proto.Size(&data.SparseFloatArray)
}
func (data *Int8VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
// GetDataType implements FieldData.GetDataType
func (data *BoolFieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Bool }
func (data *Int8FieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Int8 }
@ -1302,6 +1369,10 @@ func (data *SparseFloatVectorFieldData) GetDataType() schemapb.DataType {
return schemapb.DataType_SparseFloatVector
}
func (data *Int8VectorFieldData) GetDataType() schemapb.DataType {
return schemapb.DataType_Int8Vector
}
// why not binary.Size(data) directly? binary.Size(data) return -1
// binary.Size returns how many bytes Write would generate to encode the value v, which
// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data.
@ -1358,6 +1429,7 @@ func (data *BinaryVectorFieldData) GetRowSize(i int) int { return data.Dim / 8
func (data *FloatVectorFieldData) GetRowSize(i int) int { return data.Dim * 4 }
func (data *Float16VectorFieldData) GetRowSize(i int) int { return data.Dim * 2 }
func (data *BFloat16VectorFieldData) GetRowSize(i int) int { return data.Dim * 2 }
func (data *Int8VectorFieldData) GetRowSize(i int) int { return data.Dim }
func (data *StringFieldData) GetRowSize(i int) int { return len(data.Data[i]) + 16 }
func (data *JSONFieldData) GetRowSize(i int) int { return len(data.Data[i]) + 16 }
func (data *ArrayFieldData) GetRowSize(i int) int {
@ -1434,6 +1506,10 @@ func (data *Float16VectorFieldData) GetNullable() bool {
return false
}
func (data *Int8VectorFieldData) GetNullable() bool {
return false
}
func (data *StringFieldData) GetNullable() bool {
return data.Nullable
}

View File

@ -83,6 +83,7 @@ func (s *InsertDataSuite) TestInsertData() {
{"float vector without dim", schemapb.DataType_FloatVector},
{"float16 vector without dim", schemapb.DataType_Float16Vector},
{"bfloat16 vector without dim", schemapb.DataType_BFloat16Vector},
{"int8 vector without dim", schemapb.DataType_Int8Vector},
}
for _, test := range tests {
@ -114,15 +115,15 @@ func (s *InsertDataSuite) TestInsertData() {
s.Run("init by New", func() {
s.True(s.iDataEmpty.IsEmpty())
s.Equal(0, s.iDataEmpty.GetRowNum())
s.Equal(28, s.iDataEmpty.GetMemorySize())
s.Equal(32, s.iDataEmpty.GetMemorySize())
s.False(s.iDataOneRow.IsEmpty())
s.Equal(1, s.iDataOneRow.GetRowNum())
s.Equal(191, s.iDataOneRow.GetMemorySize())
s.Equal(199, s.iDataOneRow.GetMemorySize())
s.False(s.iDataTwoRows.IsEmpty())
s.Equal(2, s.iDataTwoRows.GetRowNum())
s.Equal(352, s.iDataTwoRows.GetMemorySize())
s.Equal(364, s.iDataTwoRows.GetMemorySize())
for _, field := range s.iDataTwoRows.Data {
s.Equal(2, field.RowNum())
@ -151,6 +152,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4)
s.Equal(s.iDataEmpty.Data[BFloat16VectorField].GetMemorySize(), 4)
s.Equal(s.iDataEmpty.Data[SparseFloatVectorField].GetMemorySize(), 0)
s.Equal(s.iDataEmpty.Data[Int8VectorField].GetMemorySize(), 4)
s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 9)
s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 9)
@ -169,6 +171,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12)
s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetMemorySize(), 12)
s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetMemorySize(), 28)
s.Equal(s.iDataOneRow.Data[Int8VectorField].GetMemorySize(), 8)
s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 17)
s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 17)
@ -186,6 +189,7 @@ func (s *InsertDataSuite) TestMemorySize() {
s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20)
s.Equal(s.iDataTwoRows.Data[BFloat16VectorField].GetMemorySize(), 20)
s.Equal(s.iDataTwoRows.Data[SparseFloatVectorField].GetMemorySize(), 54)
s.Equal(s.iDataTwoRows.Data[Int8VectorField].GetMemorySize(), 12)
}
func (s *InsertDataSuite) TestGetRowSize() {
@ -206,6 +210,7 @@ func (s *InsertDataSuite) TestGetRowSize() {
s.Equal(s.iDataOneRow.Data[Float16VectorField].GetRowSize(0), 8)
s.Equal(s.iDataOneRow.Data[BFloat16VectorField].GetRowSize(0), 8)
s.Equal(s.iDataOneRow.Data[SparseFloatVectorField].GetRowSize(0), 24)
s.Equal(s.iDataOneRow.Data[Int8VectorField].GetRowSize(0), 4)
}
func (s *InsertDataSuite) TestGetDataType() {
@ -230,7 +235,7 @@ func (s *InsertDataSuite) SetupTest() {
s.Require().NoError(err)
s.True(s.iDataEmpty.IsEmpty())
s.Equal(0, s.iDataEmpty.GetRowNum())
s.Equal(28, s.iDataEmpty.GetMemorySize())
s.Equal(32, s.iDataEmpty.GetMemorySize())
row1 := map[FieldID]interface{}{
RowIDField: int64(3),
@ -248,6 +253,7 @@ func (s *InsertDataSuite) SetupTest() {
Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255},
BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255},
SparseFloatVectorField: typeutil.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{4, 5, 6}),
Int8VectorField: []int8{-4, -5, 6, 7},
ArrayField: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
@ -281,6 +287,7 @@ func (s *InsertDataSuite) SetupTest() {
Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8},
BFloat16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8},
SparseFloatVectorField: typeutil.CreateSparseFloatRow([]uint32{2, 3, 4}, []float32{4, 5, 6}),
Int8VectorField: []int8{-128, -5, 6, 127},
ArrayField: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},

View File

@ -26,28 +26,29 @@ import (
// PayloadWriterInterface abstracts PayloadWriter
type PayloadWriterInterface interface {
AddDataToPayload(msgs any, valids []bool) error
AddBoolToPayload(msgs []bool, valids []bool) error
AddByteToPayload(msgs []byte, valids []bool) error
AddInt8ToPayload(msgs []int8, valids []bool) error
AddInt16ToPayload(msgs []int16, valids []bool) error
AddInt32ToPayload(msgs []int32, valids []bool) error
AddInt64ToPayload(msgs []int64, valids []bool) error
AddFloatToPayload(msgs []float32, valids []bool) error
AddDoubleToPayload(msgs []float64, valids []bool) error
AddOneStringToPayload(msgs string, isValid bool) error
AddOneArrayToPayload(msg *schemapb.ScalarField, isValid bool) error
AddOneJSONToPayload(msg []byte, isValid bool) error
AddBinaryVectorToPayload(binVec []byte, dim int) error
AddFloatVectorToPayload(binVec []float32, dim int) error
AddFloat16VectorToPayload(binVec []byte, dim int) error
AddBFloat16VectorToPayload(binVec []byte, dim int) error
AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error
AddDataToPayload(any, []bool) error
AddBoolToPayload([]bool, []bool) error
AddByteToPayload([]byte, []bool) error
AddInt8ToPayload([]int8, []bool) error
AddInt16ToPayload([]int16, []bool) error
AddInt32ToPayload([]int32, []bool) error
AddInt64ToPayload([]int64, []bool) error
AddFloatToPayload([]float32, []bool) error
AddDoubleToPayload([]float64, []bool) error
AddOneStringToPayload(string, bool) error
AddOneArrayToPayload(*schemapb.ScalarField, bool) error
AddOneJSONToPayload([]byte, bool) error
AddBinaryVectorToPayload([]byte, int) error
AddFloatVectorToPayload([]float32, int) error
AddFloat16VectorToPayload([]byte, int) error
AddBFloat16VectorToPayload([]byte, int) error
AddSparseFloatVectorToPayload(*SparseFloatVectorFieldData) error
AddInt8VectorToPayload([]int8, int) error
FinishPayloadWriter() error
GetPayloadBufferFromWriter() ([]byte, error)
GetPayloadLengthFromWriter() (int, error)
ReleasePayloadWriter()
Reserve(size int)
Reserve(int)
Close()
}
@ -70,6 +71,7 @@ type PayloadReaderInterface interface {
GetBFloat16VectorFromPayload() ([]byte, int, error)
GetFloatVectorFromPayload() ([]float32, int, error)
GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error)
GetInt8VectorFromPayload() ([]int8, int, error)
GetPayloadLengthFromReader() (int, error)
GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error)

View File

@ -89,6 +89,9 @@ func (r *PayloadReader) GetDataFromPayload() (interface{}, []bool, int, error) {
case schemapb.DataType_SparseFloatVector:
val, dim, err := r.GetSparseFloatVectorFromPayload()
return val, nil, dim, err
case schemapb.DataType_Int8Vector:
val, dim, err := r.GetInt8VectorFromPayload()
return val, nil, dim, err
case schemapb.DataType_String, schemapb.DataType_VarChar:
val, validData, err := r.GetStringFromPayload()
return val, validData, 0, err
@ -631,6 +634,36 @@ func (r *PayloadReader) GetSparseFloatVectorFromPayload() (*SparseFloatVectorFie
return fieldData, int(fieldData.Dim), nil
}
// GetInt8VectorFromPayload returns vector, dimension, error
func (r *PayloadReader) GetInt8VectorFromPayload() ([]int8, int, error) {
if r.colType != schemapb.DataType_Int8Vector {
return nil, -1, fmt.Errorf("failed to get int8 vector from datatype %v", r.colType.String())
}
col, err := r.reader.RowGroup(0).Column(0)
if err != nil {
return nil, -1, err
}
dim := col.Descriptor().TypeLength()
values := make([]parquet.FixedLenByteArray, r.numRows)
valuesRead, err := ReadDataFromAllRowGroups[parquet.FixedLenByteArray, *file.FixedLenByteArrayColumnChunkReader](r.reader, values, 0, r.numRows)
if err != nil {
return nil, -1, err
}
if valuesRead != r.numRows {
return nil, -1, fmt.Errorf("expect %d rows, but got valuesRead = %d", r.numRows, valuesRead)
}
ret := make([]int8, int64(dim)*r.numRows)
for i := 0; i < int(r.numRows); i++ {
int8Vals := arrow.Int8Traits.CastFromBytes(values[i])
copy(ret[i*dim:(i+1)*dim], int8Vals)
}
return ret, dim, nil
}
func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) {
return int(r.numRows), nil
}

View File

@ -236,6 +236,12 @@ func (w *NativePayloadWriter) AddDataToPayload(data interface{}, validData []boo
return merr.WrapErrParameterInvalidMsg("incorrect data type")
}
return w.AddSparseFloatVectorToPayload(val)
case schemapb.DataType_Int8Vector:
val, ok := data.([]int8)
if !ok {
return merr.WrapErrParameterInvalidMsg("incorrect data type")
}
return w.AddInt8VectorToPayload(val, w.dim.GetValue())
default:
return errors.New("unsupported datatype")
}
@ -668,6 +674,33 @@ func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVec
return nil
}
func (w *NativePayloadWriter) AddInt8VectorToPayload(data []int8, dim int) error {
if w.finished {
return errors.New("can't append data to finished int8 vector payload")
}
if len(data) == 0 {
return errors.New("can't add empty msgs into int8 vector payload")
}
builder, ok := w.builder.(*array.FixedSizeBinaryBuilder)
if !ok {
return errors.New("failed to cast Int8VectorBuilder")
}
byteLength := dim
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
vec := data[i*dim : (i+1)*dim]
vecBytes := arrow.Int8Traits.CastToBytes(vec)
builder.Append(vecBytes)
}
return nil
}
func (w *NativePayloadWriter) FinishPayloadWriter() error {
if w.finished {
return errors.New("can't reuse a finished writer")
@ -770,6 +803,10 @@ func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy
}
case schemapb.DataType_SparseFloatVector:
return &arrow.BinaryType{}
case schemapb.DataType_Int8Vector:
return &arrow.FixedSizeBinaryType{
ByteWidth: dim,
}
default:
panic("unsupported data type")
}

View File

@ -27,6 +27,7 @@ import (
"sort"
"strconv"
"github.com/apache/arrow/go/v12/arrow"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
@ -288,6 +289,16 @@ func readBFloat16Vectors(blobReaders []io.Reader, dim int) []byte {
return ret
}
func readInt8Vectors(blobReaders []io.Reader, dim int) []int8 {
ret := make([]int8, 0)
for _, r := range blobReaders {
v := make([]int8, dim)
ReadBinary(r, &v, schemapb.DataType_Int8Vector)
ret = append(ret, v...)
}
return ret
}
func readBoolArray(blobReaders []io.Reader) []bool {
ret := make([]bool, 0)
for _, r := range blobReaders {
@ -431,6 +442,19 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap
case schemapb.DataType_SparseFloatVector:
return nil, fmt.Errorf("Sparse Float Vector is not supported in row based data")
case schemapb.DataType_Int8Vector:
dim, err := GetDimFromParams(field.TypeParams)
if err != nil {
log.Error("failed to get dim", zap.Error(err))
return nil, err
}
vecs := readInt8Vectors(blobReaders, dim)
idata.Data[field.FieldID] = &Int8VectorFieldData{
Data: vecs,
Dim: dim,
}
case schemapb.DataType_Bool:
idata.Data[field.FieldID] = &BoolFieldData{
Data: readBoolArray(blobReaders),
@ -573,6 +597,19 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche
SparseFloatArray: *srcFields[field.FieldID].GetVectors().GetSparseFloatVector(),
}
case schemapb.DataType_Int8Vector:
dim, err := GetDimFromParams(field.TypeParams)
if err != nil {
log.Error("failed to get dim", zap.Error(err))
return nil, err
}
srcData := srcField.GetVectors().GetInt8Vector()
fieldData = &Int8VectorFieldData{
Data: lo.Map(srcData, func(v byte, _ int) int8 { return int8(v) }),
Dim: dim,
}
case schemapb.DataType_Bool:
srcData := srcField.GetScalars().GetBoolData().GetData()
validData := srcField.GetValidData()
@ -888,6 +925,18 @@ func mergeSparseFloatVectorField(data *InsertData, fid FieldID, field *SparseFlo
fieldData.AppendAllRows(field)
}
func mergeInt8VectorField(data *InsertData, fid FieldID, field *Int8VectorFieldData) {
if _, ok := data.Data[fid]; !ok {
fieldData := &Int8VectorFieldData{
Data: nil,
Dim: field.Dim,
}
data.Data[fid] = fieldData
}
fieldData := data.Data[fid].(*Int8VectorFieldData)
fieldData.Data = append(fieldData.Data, field.Data...)
}
// MergeFieldData merge field into data.
func MergeFieldData(data *InsertData, fid FieldID, field FieldData) {
if field == nil {
@ -924,6 +973,8 @@ func MergeFieldData(data *InsertData, fid FieldID, field FieldData) {
mergeBFloat16VectorField(data, fid, field)
case *SparseFloatVectorFieldData:
mergeSparseFloatVectorField(data, fid, field)
case *Int8VectorFieldData:
mergeInt8VectorField(data, fid, field)
}
}
@ -1259,6 +1310,20 @@ func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.Insert
},
},
}
case *Int8VectorFieldData:
dataBytes := arrow.Int8Traits.CastToBytes(rawData.Data)
fieldData = &schemapb.FieldData{
Type: schemapb.DataType_Int8Vector,
FieldId: fieldID,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: dataBytes,
},
Dim: int64(rawData.Dim),
},
},
}
default:
return insertRecord, fmt.Errorf("unsupported data type when transter storage.InsertData to internalpb.InsertRecord")
}

View File

@ -179,6 +179,10 @@ func TestTransferColumnBasedInsertDataToRowBased(t *testing.T) {
Dim: 1,
Data: []byte{1, 1, 2, 2, 3, 3},
}
f13 := &Int8VectorFieldData{
Dim: 1,
Data: []int8{1, 2, 3},
}
data.Data[101] = f1
data.Data[102] = f2
@ -192,6 +196,7 @@ func TestTransferColumnBasedInsertDataToRowBased(t *testing.T) {
data.Data[110] = f10
data.Data[111] = f11
data.Data[112] = f12
data.Data[113] = f13
utss, rowIDs, rows, err := TransferColumnBasedInsertDataToRowBased(data)
assert.NoError(t, err)
@ -216,6 +221,7 @@ func TestTransferColumnBasedInsertDataToRowBased(t *testing.T) {
0, 0, 0, 0, // 0
1, 1,
1, 1,
1,
},
rows[0].Value)
assert.ElementsMatch(t,
@ -232,6 +238,7 @@ func TestTransferColumnBasedInsertDataToRowBased(t *testing.T) {
0, 0, 0, 0, // 0
2, 2,
2, 2,
2,
},
rows[1].Value)
assert.ElementsMatch(t,
@ -248,6 +255,7 @@ func TestTransferColumnBasedInsertDataToRowBased(t *testing.T) {
0, 0, 0, 0, // 0
3, 3,
3, 3,
3,
},
rows[2].Value)
}
@ -331,7 +339,7 @@ func TestReadBinary(t *testing.T) {
}
}
func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse bool) (schema *schemapb.CollectionSchema, pkFieldID UniqueID, fieldIDs []UniqueID) {
func genAllFieldsSchema(dim int, withSparse bool) (schema *schemapb.CollectionSchema, pkFieldID UniqueID, fieldIDs []UniqueID) {
schema = &schemapb.CollectionSchema{
Name: "all_fields_schema",
Description: "all_fields_schema",
@ -364,7 +372,7 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(fVecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -373,7 +381,7 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(bVecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -382,7 +390,7 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(f16VecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -391,7 +399,16 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(bf16VecDim),
Value: strconv.Itoa(dim),
},
},
},
{
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
},
@ -434,7 +451,7 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse
return schema, pkFieldID, fieldIDs
}
func genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim int, withSparse bool) (schema *schemapb.CollectionSchema, pkFieldID UniqueID, fieldIDs []UniqueID) {
func genAllFieldsSchemaNullable(dim int, withSparse bool) (schema *schemapb.CollectionSchema, pkFieldID UniqueID, fieldIDs []UniqueID) {
schema = &schemapb.CollectionSchema{
Name: "all_fields_schema_nullable",
Description: "all_fields_schema_nullable",
@ -477,7 +494,7 @@ func genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim int, wit
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(fVecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -486,7 +503,7 @@ func genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim int, wit
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(bVecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -495,7 +512,7 @@ func genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim int, wit
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(f16VecDim),
Value: strconv.Itoa(dim),
},
},
},
@ -504,7 +521,16 @@ func genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim int, wit
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(bf16VecDim),
Value: strconv.Itoa(dim),
},
},
},
{
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: strconv.Itoa(dim),
},
},
},
@ -563,8 +589,8 @@ func generateInt32ArrayList(numRows int) []*schemapb.ScalarField {
return ret
}
func genRowWithAllFields(fVecDim, bVecDim, f16VecDim, bf16VecDim int) (blob *commonpb.Blob, pk int64, row []interface{}) {
schema, _, _ := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, true)
func genRowWithAllFields(dim int) (blob *commonpb.Blob, pk int64, row []interface{}) {
schema, _, _ := genAllFieldsSchema(dim, true)
ret := &commonpb.Blob{
Value: nil,
}
@ -573,25 +599,30 @@ func genRowWithAllFields(fVecDim, bVecDim, f16VecDim, bf16VecDim int) (blob *com
var buffer bytes.Buffer
switch field.DataType {
case schemapb.DataType_FloatVector:
fVec := testutils.GenerateFloatVectors(1, fVecDim)
fVec := testutils.GenerateFloatVectors(1, dim)
_ = binary.Write(&buffer, common.Endian, fVec)
ret.Value = append(ret.Value, buffer.Bytes()...)
row = append(row, fVec)
case schemapb.DataType_BinaryVector:
bVec := testutils.GenerateBinaryVectors(1, bVecDim)
bVec := testutils.GenerateBinaryVectors(1, dim)
_ = binary.Write(&buffer, common.Endian, bVec)
ret.Value = append(ret.Value, buffer.Bytes()...)
row = append(row, bVec)
case schemapb.DataType_Float16Vector:
f16Vec := testutils.GenerateFloat16Vectors(1, f16VecDim)
f16Vec := testutils.GenerateFloat16Vectors(1, dim)
_ = binary.Write(&buffer, common.Endian, f16Vec)
ret.Value = append(ret.Value, buffer.Bytes()...)
row = append(row, f16Vec)
case schemapb.DataType_BFloat16Vector:
bf16Vec := testutils.GenerateBFloat16Vectors(1, bf16VecDim)
bf16Vec := testutils.GenerateBFloat16Vectors(1, dim)
_ = binary.Write(&buffer, common.Endian, bf16Vec)
ret.Value = append(ret.Value, buffer.Bytes()...)
row = append(row, bf16Vec)
case schemapb.DataType_Int8Vector:
iVec := testutils.GenerateInt8Vectors(1, dim)
_ = binary.Write(&buffer, common.Endian, iVec)
ret.Value = append(ret.Value, buffer.Bytes()...)
row = append(row, iVec)
case schemapb.DataType_Bool:
data := rand.Int()%2 == 0
_ = binary.Write(&buffer, common.Endian, data)
@ -649,7 +680,7 @@ func genRowWithAllFields(fVecDim, bVecDim, f16VecDim, bf16VecDim int) (blob *com
return ret, pk, row
}
func genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim int) (msg *msgstream.InsertMsg, pks []int64, columns [][]interface{}) {
func genRowBasedInsertMsg(numRows, dim int) (msg *msgstream.InsertMsg, pks []int64, columns [][]interface{}) {
msg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: nil,
@ -672,7 +703,7 @@ func genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim int)
pks = make([]int64, 0)
raws := make([][]interface{}, 0)
for i := 0; i < numRows; i++ {
row, pk, raw := genRowWithAllFields(fVecDim, bVecDim, f16VecDim, bf16VecDim)
row, pk, raw := genRowWithAllFields(dim)
msg.InsertRequest.RowData = append(msg.InsertRequest.RowData, row)
pks = append(pks, pk)
raws = append(raws, raw)
@ -687,7 +718,7 @@ func genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim int)
return msg, pks, columns
}
func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim int) (msg *msgstream.InsertMsg, pks []int64, columns [][]interface{}) {
func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, dim int) (msg *msgstream.InsertMsg, pks []int64, columns [][]interface{}) {
msg = &msgstream.InsertMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: nil,
@ -878,13 +909,13 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
columns[idx] = append(columns[idx], d)
}
case schemapb.DataType_FloatVector:
data := testutils.GenerateFloatVectors(numRows, fVecDim)
data := testutils.GenerateFloatVectors(numRows, dim)
f := &schemapb.FieldData{
Type: schemapb.DataType_FloatVector,
FieldName: field.Name,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(fVecDim),
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: data,
@ -896,16 +927,16 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
}
msg.FieldsData = append(msg.FieldsData, f)
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data[nrows*fVecDim:(nrows+1)*fVecDim])
columns[idx] = append(columns[idx], data[nrows*dim:(nrows+1)*dim])
}
case schemapb.DataType_BinaryVector:
data := testutils.GenerateBinaryVectors(numRows, bVecDim)
data := testutils.GenerateBinaryVectors(numRows, dim)
f := &schemapb.FieldData{
Type: schemapb.DataType_BinaryVector,
FieldName: field.Name,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(bVecDim),
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: data,
},
@ -915,16 +946,16 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
}
msg.FieldsData = append(msg.FieldsData, f)
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data[nrows*bVecDim/8:(nrows+1)*bVecDim/8])
columns[idx] = append(columns[idx], data[nrows*dim/8:(nrows+1)*dim/8])
}
case schemapb.DataType_Float16Vector:
data := testutils.GenerateFloat16Vectors(numRows, f16VecDim)
data := testutils.GenerateFloat16Vectors(numRows, dim)
f := &schemapb.FieldData{
Type: schemapb.DataType_Float16Vector,
FieldName: field.Name,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(f16VecDim),
Dim: int64(dim),
Data: &schemapb.VectorField_Float16Vector{
Float16Vector: data,
},
@ -934,16 +965,16 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
}
msg.FieldsData = append(msg.FieldsData, f)
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data[nrows*f16VecDim*2:(nrows+1)*f16VecDim*2])
columns[idx] = append(columns[idx], data[nrows*dim*2:(nrows+1)*dim*2])
}
case schemapb.DataType_BFloat16Vector:
data := testutils.GenerateBFloat16Vectors(numRows, bf16VecDim)
data := testutils.GenerateBFloat16Vectors(numRows, dim)
f := &schemapb.FieldData{
Type: schemapb.DataType_BFloat16Vector,
FieldName: field.Name,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(bf16VecDim),
Dim: int64(dim),
Data: &schemapb.VectorField_Bfloat16Vector{
Bfloat16Vector: data,
},
@ -953,7 +984,7 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
}
msg.FieldsData = append(msg.FieldsData, f)
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data[nrows*bf16VecDim*2:(nrows+1)*bf16VecDim*2])
columns[idx] = append(columns[idx], data[nrows*dim*2:(nrows+1)*dim*2])
}
case schemapb.DataType_SparseFloatVector:
data := testutils.GenerateSparseFloatVectors(numRows)
@ -974,7 +1005,25 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data.Contents[nrows])
}
case schemapb.DataType_Int8Vector:
data := testutils.GenerateInt8Vectors(numRows, dim)
f := &schemapb.FieldData{
Type: schemapb.DataType_Int8Vector,
FieldName: field.Name,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: typeutil.Int8ArrayToBytes(data),
},
},
},
FieldId: field.FieldID,
}
msg.FieldsData = append(msg.FieldsData, f)
for nrows := 0; nrows < numRows; nrows++ {
columns[idx] = append(columns[idx], data[nrows*dim:(nrows+1)*dim])
}
case schemapb.DataType_Array:
data := generateInt32ArrayList(numRows)
f := &schemapb.FieldData{
@ -1030,10 +1079,10 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim
}
func TestRowBasedInsertMsgToInsertData(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 10, 8, 8, 8, 8
schema, _, fieldIDs := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, false)
numRows, dim := 10, 8
schema, _, fieldIDs := genAllFieldsSchema(dim, false)
fieldIDs = fieldIDs[:len(fieldIDs)-2]
msg, _, columns := genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
msg, _, columns := genRowBasedInsertMsg(numRows, dim)
idata, err := RowBasedInsertMsgToInsertData(msg, schema, false)
assert.NoError(t, err)
@ -1049,9 +1098,9 @@ func TestRowBasedInsertMsgToInsertData(t *testing.T) {
}
func TestRowBasedTransferInsertMsgToInsertRecord(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 10, 8, 8, 8, 8
schema, _, _ := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, false)
msg, _, _ := genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
numRows, dim := 10, 8
schema, _, _ := genAllFieldsSchema(dim, false)
msg, _, _ := genRowBasedInsertMsg(numRows, dim)
_, err := TransferInsertMsgToInsertRecord(schema, msg)
assert.NoError(t, err)
@ -1144,9 +1193,9 @@ func TestRowBasedInsertMsgToInsertBFloat16VectorDataError(t *testing.T) {
}
func TestColumnBasedInsertMsgToInsertData(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 2, 2, 8, 2, 2
schema, _, fieldIDs := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
numRows, dim := 2, 8
schema, _, fieldIDs := genAllFieldsSchema(dim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, dim)
idata, err := ColumnBasedInsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
@ -1162,9 +1211,9 @@ func TestColumnBasedInsertMsgToInsertData(t *testing.T) {
}
func TestColumnBasedInsertMsgToInsertDataNullable(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 2, 2, 8, 2, 2
schema, _, fieldIDs := genAllFieldsSchemaNullable(fVecDim, bVecDim, f16VecDim, bf16VecDim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
numRows, dim := 2, 8
schema, _, fieldIDs := genAllFieldsSchemaNullable(dim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, dim)
idata, err := ColumnBasedInsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
@ -1268,10 +1317,10 @@ func TestColumnBasedInsertMsgToInsertBFloat16VectorDataError(t *testing.T) {
}
func TestInsertMsgToInsertData(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 10, 8, 8, 8, 8
schema, _, fieldIDs := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, false)
numRows, dim := 10, 8
schema, _, fieldIDs := genAllFieldsSchema(dim, false)
fieldIDs = fieldIDs[:len(fieldIDs)-2]
msg, _, columns := genRowBasedInsertMsg(numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
msg, _, columns := genRowBasedInsertMsg(numRows, dim)
idata, err := InsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
@ -1287,9 +1336,9 @@ func TestInsertMsgToInsertData(t *testing.T) {
}
func TestInsertMsgToInsertData2(t *testing.T) {
numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim := 2, 2, 8, 2, 2
schema, _, fieldIDs := genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, fVecDim, bVecDim, f16VecDim, bf16VecDim)
numRows, dim := 2, 8
schema, _, fieldIDs := genAllFieldsSchema(dim, true)
msg, _, columns := genColumnBasedInsertMsg(schema, numRows, dim)
idata, err := InsertMsgToInsertData(msg, schema)
assert.NoError(t, err)
@ -1366,6 +1415,10 @@ func TestMergeInsertData(t *testing.T) {
},
},
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{0},
Dim: 1,
},
ArrayField: &ArrayFieldData{
Data: []*schemapb.ScalarField{
{
@ -1453,6 +1506,10 @@ func TestMergeInsertData(t *testing.T) {
},
}, f.(*SparseFloatVectorFieldData))
f, ok = d1.Data[Int8VectorField]
assert.True(t, ok)
assert.Equal(t, []int8{0}, f.(*Int8VectorFieldData).Data)
f, ok = d1.Data[ArrayField]
assert.True(t, ok)
assert.Equal(t, []int32{4, 5, 6}, f.(*ArrayFieldData).Data[0].GetIntData().GetData())
@ -1519,6 +1576,10 @@ func TestMergeInsertData(t *testing.T) {
},
},
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{0},
Dim: 1,
},
ArrayField: &ArrayFieldData{
Data: []*schemapb.ScalarField{
{
@ -1592,6 +1653,10 @@ func TestMergeInsertData(t *testing.T) {
},
},
},
Int8VectorField: &Int8VectorFieldData{
Data: []int8{1},
Dim: 1,
},
ArrayField: &ArrayFieldData{
Data: []*schemapb.ScalarField{
{
@ -1681,6 +1746,10 @@ func TestMergeInsertData(t *testing.T) {
},
}, f.(*SparseFloatVectorFieldData))
f, ok = d1.Data[Int8VectorField]
assert.True(t, ok)
assert.Equal(t, []int8{0, 1}, f.(*Int8VectorFieldData).Data)
f, ok = d1.Data[ArrayField]
assert.True(t, ok)
assert.Equal(t, []int32{1, 2, 3}, f.(*ArrayFieldData).Data[0].GetIntData().GetData())

View File

@ -23,6 +23,7 @@ func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, righ
case schemapb.DataType_BinaryVector:
case schemapb.DataType_Float16Vector:
case schemapb.DataType_BFloat16Vector:
case schemapb.DataType_Int8Vector:
default:
return nil, merr.ErrParameterInvalid
}

View File

@ -191,6 +191,15 @@ func generateBinaryVectors(numRows, dim int) []byte {
return ret
}
func generateInt8Vectors(numRows, dim int) []int8 {
total := numRows * dim
ret := make([]int8, 0, total)
for i := 0; i < total; i++ {
ret = append(ret, int8(rand.Intn(256)-128))
}
return ret
}
func genFieldData(dtype schemapb.DataType, numRows, dim int) storage.FieldData {
switch dtype {
case schemapb.DataType_Bool:
@ -249,6 +258,11 @@ func genFieldData(dtype schemapb.DataType, numRows, dim int) storage.FieldData {
Data: generateBFloat16Vectors(numRows, dim),
Dim: dim,
}
case schemapb.DataType_Int8Vector:
return &storage.Int8VectorFieldData{
Data: generateInt8Vectors(numRows, dim),
Dim: dim,
}
default:
return nil
}
@ -369,6 +383,22 @@ func genBFloat16VecIndexCases(dtype schemapb.DataType) []indexTestCase {
}
}
func genInt8VecIndexCases(dtype schemapb.DataType) []indexTestCase {
return []indexTestCase{
{
dtype: dtype,
typeParams: nil,
indexParams: map[string]string{
common.IndexTypeKey: IndexHNSW,
common.MetricTypeKey: metric.L2,
common.DimKey: strconv.Itoa(dim),
"M": strconv.Itoa(16),
"efConstruction": strconv.Itoa(efConstruction),
},
},
}
}
func genTypedIndexCase(dtype schemapb.DataType) []indexTestCase {
switch dtype {
case schemapb.DataType_Bool:
@ -397,6 +427,8 @@ func genTypedIndexCase(dtype schemapb.DataType) []indexTestCase {
return genFloat16VecIndexCases(dtype)
case schemapb.DataType_BFloat16Vector:
return genBFloat16VecIndexCases(dtype)
case schemapb.DataType_Int8Vector:
return genInt8VecIndexCases(dtype)
default:
return nil
}
@ -417,6 +449,7 @@ func genIndexCase() []indexTestCase {
schemapb.DataType_FloatVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector,
schemapb.DataType_Int8Vector,
}
var ret []indexTestCase
for _, dtype := range dtypes {

View File

@ -60,6 +60,15 @@ func GenBinaryVecDataset(vectors []byte) *Dataset {
}
}
func GenInt8VecDataset(vectors []int8) *Dataset {
return &Dataset{
DType: schemapb.DataType_Int8Vector,
Data: map[string]interface{}{
keyRawArr: vectors,
},
}
}
func GenDataset(data storage.FieldData) *Dataset {
switch f := data.(type) {
case *storage.BoolFieldData:
@ -128,6 +137,8 @@ func GenDataset(data storage.FieldData) *Dataset {
return GenBFloat16VecDataset(f.Data)
case *storage.SparseFloatVectorFieldData:
return GenSparseFloatVecDataset(f)
case *storage.Int8VectorFieldData:
return GenInt8VecDataset(f.Data)
default:
return &Dataset{
DType: schemapb.DataType_None,

View File

@ -170,6 +170,8 @@ func (index *CgoIndex) Build(dataset *Dataset) error {
return index.buildBFloat16VecIndex(dataset)
case schemapb.DataType_BinaryVector:
return index.buildBinaryVecIndex(dataset)
case schemapb.DataType_Int8Vector:
return index.buildInt8VecIndex(dataset)
case schemapb.DataType_Bool:
return index.buildBoolIndex(dataset)
case schemapb.DataType_Int8:
@ -223,6 +225,12 @@ func (index *CgoIndex) buildBinaryVecIndex(dataset *Dataset) error {
return HandleCStatus(&status, "failed to build binary vector index")
}
func (index *CgoIndex) buildInt8VecIndex(dataset *Dataset) error {
vectors := dataset.Data[keyRawArr].([]int8)
status := C.BuildInt8VecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.int8_t)(&vectors[0]))
return HandleCStatus(&status, "failed to build int8 vector index")
}
// TODO: investigate if we can pass an bool array to cgo.
func (index *CgoIndex) buildBoolIndex(dataset *Dataset) error {
arr := dataset.Data[keyRawArr].([]bool)

View File

@ -92,6 +92,13 @@ func generateBFloat16VectorTestCases() []vecTestCase {
}
}
func generateInt8VectorTestCases() []vecTestCase {
return []vecTestCase{
{IndexHNSW, metric.L2, false, schemapb.DataType_Int8Vector},
{IndexHNSW, metric.IP, false, schemapb.DataType_Int8Vector},
}
}
func generateTestCases() []vecTestCase {
return append(generateFloatVectorTestCases(), generateBinaryVectorTestCases()...)
}
@ -218,6 +225,23 @@ func TestCIndex_BuildBinaryVecIndex(t *testing.T) {
}
}
func TestCIndex_BuildInt8VecIndex(t *testing.T) {
for _, c := range generateInt8VectorTestCases() {
typeParams, indexParams := generateParams(c.indexType, c.metricType)
index, err := NewCgoIndex(c.dtype, typeParams, indexParams)
assert.Equal(t, err, nil)
assert.NotEqual(t, index, nil)
vectors := generateInt8Vectors(nb, dim)
err = index.Build(GenInt8VecDataset(vectors))
assert.Equal(t, err, nil)
err = index.Delete()
assert.Equal(t, err, nil)
}
}
func TestCIndex_Codec(t *testing.T) {
for _, c := range generateTestCases() {
typeParams, indexParams := generateParams(c.indexType, c.metricType)
@ -305,4 +329,10 @@ func TestCIndex_Error(t *testing.T) {
err = indexPtr.Build(GenBinaryVecDataset(binaryVectors))
assert.Error(t, err)
})
t.Run("BuildInt8VecIndexWithoutIds error", func(t *testing.T) {
int8Vectors := []int8{11, 22, 33, 44}
err = indexPtr.Build(GenInt8VecDataset(int8Vectors))
assert.Error(t, err)
})
}

View File

@ -52,15 +52,17 @@ const (
)
var (
FloatVectorMetrics = []string{metric.L2, metric.IP, metric.COSINE} // const
BinaryVectorMetrics = []string{metric.HAMMING, metric.JACCARD, metric.SUBSTRUCTURE, metric.SUPERSTRUCTURE} // const
FloatVectorMetrics = []string{metric.L2, metric.IP, metric.COSINE} // const
SparseFloatVectorMetrics = []string{metric.IP, metric.BM25} // const
BinaryVectorMetrics = []string{metric.HAMMING, metric.JACCARD, metric.SUBSTRUCTURE, metric.SUPERSTRUCTURE} // const
IntVectorMetrics = []string{metric.L2, metric.IP, metric.COSINE} // const
)
// BinIDMapMetrics is a set of all metric types supported for binary vector.
var (
BinIDMapMetrics = []string{metric.HAMMING, metric.JACCARD, metric.SUBSTRUCTURE, metric.SUPERSTRUCTURE} // const
BinIvfMetrics = []string{metric.HAMMING, metric.JACCARD} // const
HnswMetrics = []string{metric.L2, metric.IP, metric.COSINE, metric.HAMMING, metric.JACCARD} // const
HnswMetrics = []string{metric.L2, metric.IP, metric.COSINE} // const
RaftMetrics = []string{metric.L2, metric.IP}
CagraBuildAlgoTypes = []string{CargaBuildAlgoIVFPQ, CargaBuildAlgoNNDESCENT}
supportDimPerSubQuantizer = []int{32, 28, 24, 20, 16, 12, 10, 8, 6, 4, 3, 2, 1} // const
@ -72,4 +74,5 @@ const (
FloatVectorDefaultMetricType = metric.COSINE
SparseFloatVectorDefaultMetricType = metric.IP
BinaryVectorDefaultMetricType = metric.HAMMING
IntVectorDefaultMetricType = metric.COSINE
)

View File

@ -52,6 +52,10 @@ func (c vecIndexChecker) StaticCheck(dataType schemapb.DataType, params map[stri
if !CheckStrByValues(params, Metric, BinaryVectorMetrics) {
return fmt.Errorf("metric type %s not found or not supported, supported: %v", params[Metric], BinaryVectorMetrics)
}
} else if typeutil.IsIntVectorType(dataType) {
if !CheckStrByValues(params, Metric, IntVectorMetrics) {
return fmt.Errorf("metric type %s not found or not supported, supported: %v", params[Metric], IntVectorMetrics)
}
}
indexType, exist := params[common.IndexTypeKey]

View File

@ -115,6 +115,12 @@ func TestVecIndexChecker_SetDefaultMetricTypeIfNotExist(t *testing.T) {
params: map[string]string{},
expectedType: BinaryVectorDefaultMetricType,
},
{
name: "int vector",
dataType: schemapb.DataType_Int8Vector,
params: map[string]string{},
expectedType: IntVectorDefaultMetricType,
},
{
name: "Existing metric type",
dataType: schemapb.DataType_FloatVector,

View File

@ -171,6 +171,15 @@ func CreateInsertData(schema *schemapb.CollectionSchema, rows int, nullPercent .
Dim: dim,
},
}
case schemapb.DataType_Int8Vector:
dim, err := typeutil.GetDim(f)
if err != nil {
return nil, err
}
insertData.Data[f.FieldID] = &storage.Int8VectorFieldData{
Data: testutils.GenerateInt8Vectors(rows, int(dim)),
Dim: int(dim),
}
case schemapb.DataType_String, schemapb.DataType_VarChar:
insertData.Data[f.FieldID].AppendDataRows(testutils.GenerateStringArray(rows))
case schemapb.DataType_JSON:
@ -419,6 +428,20 @@ func BuildArrayData(schema *schemapb.CollectionSchema, insertData *storage.Inser
return string(bs)
}), nil)
columns = append(columns, builder.NewStringArray())
case schemapb.DataType_Int8Vector:
builder := array.NewListBuilder(mem, &arrow.Int8Type{})
dim := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Dim
int8VecData := insertData.Data[fieldID].(*storage.Int8VectorFieldData).Data
rows := len(int8VecData) / dim
offsets := make([]int32, 0, rows)
valid := make([]bool, 0, rows)
for i := 0; i < rows; i++ {
offsets = append(offsets, int32(i*dim))
valid = append(valid, true)
}
builder.ValueBuilder().(*array.Int8Builder).AppendValues(int8VecData, nil)
builder.AppendValues(offsets, valid)
columns = append(columns, builder.NewListArray())
case schemapb.DataType_JSON:
builder := array.NewStringBuilder(mem)
jsonData := insertData.Data[fieldID].(*storage.JSONFieldData).Data
@ -746,6 +769,13 @@ func CreateInsertDataForCSV(schema *schemapb.CollectionSchema, insertData *stora
return nil, err
}
data = append(data, string(j))
case schemapb.DataType_Int8Vector:
vec := value.GetRow(i).([]int8)
j, err := json.Marshal(vec)
if err != nil {
return nil, err
}
data = append(data, string(j))
default:
str := fmt.Sprintf("%v", value.GetRow(i))
data = append(data, str)

View File

@ -65,6 +65,7 @@ func TestGenEmptyFieldData(t *testing.T) {
schemapb.DataType_FloatVector,
schemapb.DataType_Float16Vector,
schemapb.DataType_BFloat16Vector,
schemapb.DataType_Int8Vector,
}
field := &schemapb.FieldSchema{Name: "field_name", FieldID: 100}

View File

@ -98,6 +98,15 @@ func ConvertToArrowSchema(fields []*schemapb.FieldSchema) (*arrow.Schema, error)
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim * 2},
})
case schemapb.DataType_Int8Vector:
dim, err := storage.GetDimFromParams(field.TypeParams)
if err != nil {
return nil, err
}
arrowFields = append(arrowFields, arrow.Field{
Name: field.Name,
Type: &arrow.FixedSizeBinaryType{ByteWidth: dim},
})
default:
return nil, merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}

View File

@ -42,6 +42,7 @@ func TestConvertArrowSchema(t *testing.T) {
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON},
{FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}},
{FieldID: 15, Name: "field14", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}},
{FieldID: 16, Name: "field15", DataType: schemapb.DataType_Int8Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "128"}}},
}
schema, err := ConvertToArrowSchema(fieldSchemas)
@ -66,6 +67,7 @@ func TestConvertArrowSchemaWithoutDim(t *testing.T) {
{FieldID: 13, Name: "field12", DataType: schemapb.DataType_JSON},
{FieldID: 14, Name: "field13", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{}},
{FieldID: 15, Name: "field14", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{}},
{FieldID: 16, Name: "field15", DataType: schemapb.DataType_Int8Vector, TypeParams: []*commonpb.KeyValuePair{}},
}
_, err := ConvertToArrowSchema(fieldSchemas)

View File

@ -5,6 +5,7 @@ import (
"math"
"path"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"google.golang.org/protobuf/proto"
@ -124,7 +125,18 @@ func BuildRecord(b *array.RecordBuilder, data *storage.InsertData, fields []*sch
for i := 0; i < length; i++ {
builder.Append(data[i*byteLength : (i+1)*byteLength])
}
case schemapb.DataType_Int8Vector:
vecData := data.Data[field.FieldID].(*storage.Int8VectorFieldData)
builder := fBuilder.(*array.FixedSizeBinaryBuilder)
dim := vecData.Dim
data := vecData.Data
byteLength := dim
length := len(data) / byteLength
builder.Reserve(length)
for i := 0; i < length; i++ {
builder.Append(arrow.Int8Traits.CastToBytes(data[i*dim : (i+1)*dim]))
}
default:
return merr.WrapErrParameterInvalidMsg("unknown type %v", field.DataType.String())
}

View File

@ -40,6 +40,7 @@ const (
Float16Flag uint64 = 1 << 2
BFloat16Flag uint64 = 1 << 3
SparseFloat32Flag uint64 = 1 << 4
Int8Flag uint64 = 1 << 5
// NOTrainFlag This flag indicates that there is no need to create any index structure
NOTrainFlag uint64 = 1 << 16
@ -62,11 +63,12 @@ type VecIndexMgr interface {
GetFeature(indexType IndexType) (uint64, bool)
IsBinarySupport(indexType IndexType) bool
IsFlat32Support(indexType IndexType) bool
IsFlat16Support(indexType IndexType) bool
IsBFlat16Support(indexType IndexType) bool
IsSparseFloat32Support(indexType IndexType) bool
IsBinaryVectorSupport(indexType IndexType) bool
IsFloat32VectorSupport(indexType IndexType) bool
IsFloat16VectorSupport(indexType IndexType) bool
IsBFloat16VectorSupport(indexType IndexType) bool
IsSparseFloat32VectorSupport(indexType IndexType) bool
IsInt8VectorSupport(indexType IndexType) bool
IsDataTypeSupport(indexType IndexType, dataType schemapb.DataType) bool
IsFlatVecIndex(indexType IndexType) bool
@ -124,7 +126,7 @@ func (mgr *vecIndexMgrImpl) init() {
log.Info("init vector indexes with features : " + featureLog.String())
}
func (mgr *vecIndexMgrImpl) IsBinarySupport(indexType IndexType) bool {
func (mgr *vecIndexMgrImpl) IsBinaryVectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
@ -132,7 +134,7 @@ func (mgr *vecIndexMgrImpl) IsBinarySupport(indexType IndexType) bool {
return (feature & BinaryFlag) == BinaryFlag
}
func (mgr *vecIndexMgrImpl) IsFlat32Support(indexType IndexType) bool {
func (mgr *vecIndexMgrImpl) IsFloat32VectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
@ -140,7 +142,7 @@ func (mgr *vecIndexMgrImpl) IsFlat32Support(indexType IndexType) bool {
return (feature & Float32Flag) == Float32Flag
}
func (mgr *vecIndexMgrImpl) IsFlat16Support(indexType IndexType) bool {
func (mgr *vecIndexMgrImpl) IsFloat16VectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
@ -148,7 +150,7 @@ func (mgr *vecIndexMgrImpl) IsFlat16Support(indexType IndexType) bool {
return (feature & Float16Flag) == Float16Flag
}
func (mgr *vecIndexMgrImpl) IsBFlat16Support(indexType IndexType) bool {
func (mgr *vecIndexMgrImpl) IsBFloat16VectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
@ -156,7 +158,7 @@ func (mgr *vecIndexMgrImpl) IsBFlat16Support(indexType IndexType) bool {
return (feature & BFloat16Flag) == BFloat16Flag
}
func (mgr *vecIndexMgrImpl) IsSparseFloat32Support(indexType IndexType) bool {
func (mgr *vecIndexMgrImpl) IsSparseFloat32VectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
@ -164,17 +166,27 @@ func (mgr *vecIndexMgrImpl) IsSparseFloat32Support(indexType IndexType) bool {
return (feature & SparseFloat32Flag) == SparseFloat32Flag
}
func (mgr *vecIndexMgrImpl) IsInt8VectorSupport(indexType IndexType) bool {
feature, ok := mgr.GetFeature(indexType)
if !ok {
return false
}
return (feature & Int8Flag) == Int8Flag
}
func (mgr *vecIndexMgrImpl) IsDataTypeSupport(indexType IndexType, dataType schemapb.DataType) bool {
if dataType == schemapb.DataType_BinaryVector {
return mgr.IsBinarySupport(indexType)
return mgr.IsBinaryVectorSupport(indexType)
} else if dataType == schemapb.DataType_FloatVector {
return mgr.IsFlat32Support(indexType)
return mgr.IsFloat32VectorSupport(indexType)
} else if dataType == schemapb.DataType_BFloat16Vector {
return mgr.IsBFlat16Support(indexType)
return mgr.IsBFloat16VectorSupport(indexType)
} else if dataType == schemapb.DataType_Float16Vector {
return mgr.IsFlat16Support(indexType)
return mgr.IsFloat16VectorSupport(indexType)
} else if dataType == schemapb.DataType_SparseFloatVector {
return mgr.IsSparseFloat32Support(indexType)
return mgr.IsSparseFloat32VectorSupport(indexType)
} else if dataType == schemapb.DataType_Int8Vector {
return mgr.IsInt8VectorSupport(indexType)
}
return false
}

View File

@ -15,7 +15,7 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2

View File

@ -492,8 +492,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f h1:So6RKU5wqP/8EaKogicJP8gZ2SrzzS/JprusBaE3RKc=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250102080446-c3ba3d26a90f/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=

View File

@ -354,6 +354,17 @@ func GetNumRowsOfBFloat16VectorField(bf16Datas []byte, dim int64) (uint64, error
return uint64((int64(l)) / dim / 2), nil
}
func GetNumRowsOfInt8VectorField(iDatas []byte, dim int64) (uint64, error) {
if dim <= 0 {
return 0, fmt.Errorf("dim(%d) should be greater than 0", dim)
}
l := len(iDatas)
if int64(l)%dim != 0 {
return 0, fmt.Errorf("the length(%d) of int8 data should divide the dim(%d)", l, dim)
}
return uint64(int64(l) / dim), nil
}
// GetNumRowOfFieldDataWithSchema returns num of rows with schema specification.
func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeutil.SchemaHelper) (uint64, error) {
var fieldNumRows uint64
@ -405,6 +416,12 @@ func GetNumRowOfFieldDataWithSchema(fieldData *schemapb.FieldData, helper *typeu
}
case schemapb.DataType_SparseFloatVector:
fieldNumRows = uint64(len(fieldData.GetVectors().GetSparseFloatVector().GetContents()))
case schemapb.DataType_Int8Vector:
dim := fieldData.GetVectors().GetDim()
fieldNumRows, err = GetNumRowsOfInt8VectorField(fieldData.GetVectors().GetInt8Vector(), dim)
if err != nil {
return 0, err
}
default:
return 0, fmt.Errorf("%s is not supported now", fieldSchema.GetDataType())
}
@ -468,6 +485,12 @@ func GetNumRowOfFieldData(fieldData *schemapb.FieldData) (uint64, error) {
}
case *schemapb.VectorField_SparseFloatVector:
fieldNumRows = uint64(len(vectorField.GetSparseFloatVector().GetContents()))
case *schemapb.VectorField_Int8Vector:
dim := vectorField.GetDim()
fieldNumRows, err = GetNumRowsOfInt8VectorField(vectorField.GetInt8Vector(), dim)
if err != nil {
return 0, err
}
default:
return 0, fmt.Errorf("%s is not supported now", vectorFieldType)
}

View File

@ -412,6 +412,34 @@ func TestGetNumRowsOfBinaryVectorField(t *testing.T) {
}
}
func TestGetNumRowsOfInt8VectorField(t *testing.T) {
cases := []struct {
iDatas []byte
dim int64
want uint64
errIsNil bool
}{
{[]byte{}, -1, 0, false}, // dim <= 0
{[]byte{}, 0, 0, false}, // dim <= 0
{[]byte{1}, 128, 0, false}, // length % dim != 0
{[]byte{}, 128, 0, true},
{[]byte{1, 2}, 2, 1, true},
{[]byte{1, 2, 3, 4}, 2, 2, true},
}
for _, test := range cases {
got, err := GetNumRowsOfInt8VectorField(test.iDatas, test.dim)
if test.errIsNil {
assert.Equal(t, nil, err)
if got != test.want {
t.Errorf("GetNumRowsOfInt8VectorField(%v, %v) = %v, %v", test.iDatas, test.dim, test.want, nil)
}
} else {
assert.NotEqual(t, nil, err)
}
}
}
func Test_ReadBinary(t *testing.T) {
// TODO: test big endian.
// low byte in high address, high byte in low address.
@ -596,6 +624,7 @@ func (s *NumRowsWithSchemaSuite) SetupSuite() {
{FieldID: 112, Name: "float16_vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
{FieldID: 113, Name: "bfloat16_vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
{FieldID: 114, Name: "sparse_vector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
{FieldID: 115, Name: "int8_vector", DataType: schemapb.DataType_Int8Vector, TypeParams: []*commonpb.KeyValuePair{{Key: "dim", Value: "8"}}},
{FieldID: 999, Name: "unknown", DataType: schemapb.DataType_None},
},
}
@ -777,6 +806,19 @@ func (s *NumRowsWithSchemaSuite) TestNormalCases() {
},
expect: 6,
},
{
tag: "int8_vector",
input: &schemapb.FieldData{
FieldName: "int8_vector",
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: 8,
Data: &schemapb.VectorField_Int8Vector{Int8Vector: make([]byte, 7*8)},
},
},
},
expect: 7,
},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
@ -855,6 +897,18 @@ func (s *NumRowsWithSchemaSuite) TestErrorCases() {
},
},
},
{
tag: "int8_vector",
input: &schemapb.FieldData{
FieldName: "int8_vector",
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: 13,
Data: &schemapb.VectorField_Int8Vector{Int8Vector: make([]byte, 8*5)},
},
},
},
},
}
for _, tc := range cases {

View File

@ -102,6 +102,19 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place
Values: vec.Contents,
}
return placeholderValue, nil
case schemapb.DataType_Int8Vector:
vectors := fieldData.GetVectors()
x, ok := vectors.GetData().(*schemapb.VectorField_Int8Vector)
if !ok {
return nil, errors.New("vector data is not schemapb.VectorField_Int8Vector")
}
placeholderValue := &commonpb.PlaceholderValue{
Tag: "$0",
Type: commonpb.PlaceholderType_Int8Vector,
Values: flattenedInt8VectorsToByteVectors(x.Int8Vector, int(vectors.Dim)),
}
return placeholderValue, nil
case schemapb.DataType_VarChar:
strs := fieldData.GetScalars().GetStringData().GetData()
placeholderValue := &commonpb.PlaceholderValue{
@ -121,7 +134,6 @@ func flattenedFloatVectorsToByteVectors(flattenedVectors []float32, dimension in
for _, floatVector := range floatVectors {
result = append(result, floatVectorToByteVector(floatVector))
}
return result
}
@ -153,25 +165,28 @@ func flattenedByteVectorsToByteVectors(flattenedVectors []byte, dimension int) [
func flattenedFloat16VectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte {
result := make([][]byte, 0)
vectorBytes := 2 * dimension
for i := 0; i < len(flattenedVectors); i += vectorBytes {
result = append(result, flattenedVectors[i:i+vectorBytes])
}
return result
}
func flattenedBFloat16VectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte {
result := make([][]byte, 0)
vectorBytes := 2 * dimension
for i := 0; i < len(flattenedVectors); i += vectorBytes {
result = append(result, flattenedVectors[i:i+vectorBytes])
}
return result
}
func flattenedInt8VectorsToByteVectors(flattenedVectors []byte, dimension int) [][]byte {
result := make([][]byte, 0)
vectorBytes := dimension
for i := 0; i < len(flattenedVectors); i += vectorBytes {
result = append(result, flattenedVectors[i:i+vectorBytes])
}
return result
}

View File

@ -44,3 +44,16 @@ func Test_flattenedBFloat16VectorsToByteVectors(t *testing.T) {
assert.Equal(t, expected, actual)
}
func Test_flattenedInt8VectorsToByteVectors(t *testing.T) {
flattenedVectors := []byte{0, 1, 2, 3, 4, 5, 6, 7}
dimension := 4
actual := flattenedInt8VectorsToByteVectors(flattenedVectors, dimension)
expected := [][]byte{
{0, 1, 2, 3},
{4, 5, 6, 7},
}
assert.Equal(t, expected, actual)
}

View File

@ -263,6 +263,7 @@ const (
FloatVectorDefaultMetricType = metric.COSINE
SparseFloatVectorDefaultMetricType = metric.IP
BinaryVectorDefaultMetricType = metric.HAMMING
IntVectorDefaultMetricType = metric.COSINE
)
func SetDefaultMetricTypeIfNotExist(dType schemapb.DataType, params map[string]string) {
@ -272,6 +273,8 @@ func SetDefaultMetricTypeIfNotExist(dType schemapb.DataType, params map[string]s
setDefaultIfNotExist(params, common.MetricTypeKey, SparseFloatVectorDefaultMetricType)
} else if typeutil.IsBinaryVectorType(dType) {
setDefaultIfNotExist(params, common.MetricTypeKey, BinaryVectorDefaultMetricType)
} else if typeutil.IsIntVectorType(dType) {
setDefaultIfNotExist(params, common.MetricTypeKey, IntVectorDefaultMetricType)
}
}

View File

@ -280,6 +280,15 @@ func GenerateBFloat16Vectors(numRows, dim int) []byte {
return ret
}
func GenerateInt8Vectors(numRows, dim int) []int8 {
total := numRows * dim
ret := make([]int8, 0, total)
for i := 0; i < total; i++ {
ret = append(ret, int8(rand.Intn(256)-128))
}
return ret
}
func GenerateBFloat16VectorsWithInvalidData(numRows, dim int) []byte {
total := numRows * dim
ret16 := make([]uint16, 0, total)
@ -858,6 +867,36 @@ func NewSparseFloatVectorFieldData(fieldName string, numRows int) *schemapb.Fiel
}
}
func NewInt8VectorFieldData(fieldName string, numRows, dim int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_Int8Vector,
FieldName: fieldName,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: typeutil.Int8ArrayToBytes(GenerateInt8Vectors(numRows, dim)),
},
},
},
}
}
func NewInt8VectorFieldDataWithValue(fieldName string, fieldValue interface{}, dim int) *schemapb.FieldData {
return &schemapb.FieldData{
Type: schemapb.DataType_Int8Vector,
FieldName: fieldName,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: fieldValue.([]byte),
},
},
},
}
}
func GenerateScalarFieldData(dType schemapb.DataType, fieldName string, numRows int) *schemapb.FieldData {
switch dType {
case schemapb.DataType_Bool:
@ -931,6 +970,8 @@ func GenerateVectorFieldData(dType schemapb.DataType, fieldName string, numRows
return NewBFloat16VectorFieldData(fieldName, numRows, dim)
case schemapb.DataType_SparseFloatVector:
return NewSparseFloatVectorFieldData(fieldName, numRows)
case schemapb.DataType_Int8Vector:
return NewInt8VectorFieldData(fieldName, numRows, dim)
default:
panic("unsupported data type")
}
@ -953,6 +994,8 @@ func GenerateVectorFieldDataWithValue(dType schemapb.DataType, fieldName string,
fieldData = NewFloat16VectorFieldDataWithValue(fieldName, fieldValue, dim)
case schemapb.DataType_BFloat16Vector:
fieldData = NewBFloat16VectorFieldDataWithValue(fieldName, fieldValue, dim)
case schemapb.DataType_Int8Vector:
fieldData = NewInt8VectorFieldDataWithValue(fieldName, fieldValue, dim)
default:
panic("unsupported data type")
}

View File

@ -195,3 +195,13 @@ func Float32ArrayToBFloat16Bytes(fv []float32) []byte {
}
return data
}
// Int8ArrayToBytes serialize vector into byte slice, used in search placeholder
// LittleEndian is used for convention
func Int8ArrayToBytes(iv []int8) []byte {
data := make([]byte, 0, len(iv))
for _, i := range iv {
data = append(data, byte(i))
}
return data
}

View File

@ -227,6 +227,27 @@ func genEmptySparseFloatVectorFieldData(field *schemapb.FieldSchema) (*schemapb.
}, nil
}
func genEmptyInt8VectorFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) {
dim, err := GetDim(field)
if err != nil {
return nil, err
}
return &schemapb.FieldData{
Type: field.GetDataType(),
FieldName: field.GetName(),
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: dim,
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: nil,
},
},
},
FieldId: field.GetFieldID(),
IsDynamic: field.GetIsDynamic(),
}, nil
}
func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) {
dataType := field.GetDataType()
switch dataType {
@ -256,6 +277,8 @@ func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error)
return genEmptyBFloat16VectorFieldData(field)
case schemapb.DataType_SparseFloatVector:
return genEmptySparseFloatVectorFieldData(field)
case schemapb.DataType_Int8Vector:
return genEmptyInt8VectorFieldData(field)
default:
return nil, fmt.Errorf("unsupported data type: %s", dataType.String())
}

View File

@ -159,6 +159,17 @@ func estimateSizeBy(schema *schemapb.CollectionSchema, policy getVariableFieldLe
// generated by SPLADE as reference and returning size of a sparse
// vector with 150 non-zeros.
res += 1200
case schemapb.DataType_Int8Vector:
for _, kv := range fs.TypeParams {
if kv.Key == common.DimKey {
v, err := strconv.Atoi(kv.Value)
if err != nil {
return -1, err
}
res += v
break
}
}
}
}
return res, nil
@ -246,6 +257,8 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e
// counting only the size of the vector data, ignoring other
// bytes used in proto.
res += len(vec.Contents[rowOffset])
case schemapb.DataType_Int8Vector:
res += int(fs.GetVectors().GetDim())
default:
panic("Unknown data type:" + fs.GetType().String())
}
@ -474,6 +487,10 @@ func IsBinaryVectorType(dataType schemapb.DataType) bool {
return dataType == schemapb.DataType_BinaryVector
}
func IsIntVectorType(dataType schemapb.DataType) bool {
return dataType == schemapb.DataType_Int8Vector
}
func IsDenseFloatVectorType(dataType schemapb.DataType) bool {
switch dataType {
case schemapb.DataType_FloatVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
@ -492,6 +509,8 @@ func VectorTypeSize(dataType schemapb.DataType) float64 {
return 0.125
case schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector:
return 2.0
case schemapb.DataType_Int8Vector:
return 1.0
default:
return 0.0
}
@ -506,12 +525,12 @@ func IsFloatVectorType(dataType schemapb.DataType) bool {
}
func IsFixDimVectorType(dataType schemapb.DataType) bool {
return IsBinaryVectorType(dataType) || IsDenseFloatVectorType(dataType)
return IsBinaryVectorType(dataType) || IsDenseFloatVectorType(dataType) || IsIntVectorType(dataType)
}
// IsVectorType returns true if input is a vector type, otherwise false
func IsVectorType(dataType schemapb.DataType) bool {
return IsBinaryVectorType(dataType) || IsFloatVectorType(dataType)
return IsBinaryVectorType(dataType) || IsFloatVectorType(dataType) || IsIntVectorType(dataType)
}
// IsIntegerType returns true if input is an integer type, otherwise false
@ -690,6 +709,10 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap
},
}
vectors.Vectors.Dim = 0
case *schemapb.VectorField_Int8Vector:
vectors.Vectors.Data = &schemapb.VectorField_Int8Vector{
Int8Vector: make([]byte, 0, topK*dim),
}
}
fd.Field = vectors
}
@ -900,6 +923,19 @@ func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int6
}
vec := dstVector.Data.(*schemapb.VectorField_SparseFloatVector).SparseFloatVector
appendSize += appendSparseFloatArraySingleRow(vec, srcVector.SparseFloatVector, idx)
case *schemapb.VectorField_Int8Vector:
if dstVector.GetInt8Vector() == nil {
srcToCopy := srcVector.Int8Vector[idx*dim : (idx+1)*dim]
dstVector.Data = &schemapb.VectorField_Int8Vector{
Int8Vector: make([]byte, len(srcToCopy)),
}
copy(dstVector.Data.(*schemapb.VectorField_Int8Vector).Int8Vector, srcToCopy)
} else {
dstInt8Vector := dstVector.Data.(*schemapb.VectorField_Int8Vector)
dstInt8Vector.Int8Vector = append(dstInt8Vector.Int8Vector, srcVector.Int8Vector[idx*dim:(idx+1)*dim]...)
}
/* #nosec G103 */
appendSize += int64(unsafe.Sizeof(srcVector.Int8Vector[idx*dim : (idx+1)*dim]))
default:
log.Error("Not supported field type", zap.String("field type", fieldData.Type.String()))
}
@ -958,6 +994,9 @@ func DeleteFieldData(dst []*schemapb.FieldData) {
dstBfloat16Vector.Bfloat16Vector = dstBfloat16Vector.Bfloat16Vector[:len(dstBfloat16Vector.Bfloat16Vector)-int(dim*2)]
case *schemapb.VectorField_SparseFloatVector:
trimSparseFloatArray(dstVector.GetSparseFloatVector())
case *schemapb.VectorField_Int8Vector:
dstInt8Vector := dstVector.Data.(*schemapb.VectorField_Int8Vector)
dstInt8Vector.Int8Vector = dstInt8Vector.Int8Vector[:len(dstInt8Vector.Int8Vector)-int(dim)]
default:
log.Error("wrong field type added", zap.String("field type", fieldData.Type.String()))
}
@ -1148,6 +1187,15 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error
} else {
appendSparseFloatArray(dstVector.GetSparseFloatVector(), srcVector.SparseFloatVector)
}
case *schemapb.VectorField_Int8Vector:
if dstVector.GetInt8Vector() == nil {
dstVector.Data = &schemapb.VectorField_Int8Vector{
Int8Vector: srcVector.Int8Vector,
}
} else {
dstInt8Vector := dstVector.Data.(*schemapb.VectorField_Int8Vector)
dstInt8Vector.Int8Vector = append(dstInt8Vector.Int8Vector, srcVector.Int8Vector...)
}
default:
log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String()))
return errors.New("unsupported data type: " + srcFieldData.Type.String())
@ -1449,6 +1497,10 @@ func GetData(field *schemapb.FieldData, idx int) interface{} {
return field.GetVectors().GetBfloat16Vector()[idx*dataBytes : (idx+1)*dataBytes]
case schemapb.DataType_SparseFloatVector:
return field.GetVectors().GetSparseFloatVector().Contents[idx]
case schemapb.DataType_Int8Vector:
dim := int(field.GetVectors().GetDim())
dataBytes := dim
return field.GetVectors().GetInt8Vector()[idx*dataBytes : (idx+1)*dataBytes]
}
return nil
}

View File

@ -47,49 +47,49 @@ func TestSchema(t *testing.T) {
Name: "field_int8",
IsPrimaryKey: false,
Description: "",
DataType: 2,
DataType: schemapb.DataType_Int8,
},
{
FieldID: 101,
Name: "field_int16",
IsPrimaryKey: false,
Description: "",
DataType: 3,
DataType: schemapb.DataType_Int16,
},
{
FieldID: 102,
Name: "field_int32",
IsPrimaryKey: false,
Description: "",
DataType: 4,
DataType: schemapb.DataType_Int32,
},
{
FieldID: 103,
Name: "field_int64",
IsPrimaryKey: true,
Description: "",
DataType: 5,
DataType: schemapb.DataType_Int64,
},
{
FieldID: 104,
Name: "field_float",
IsPrimaryKey: false,
Description: "",
DataType: 10,
DataType: schemapb.DataType_Float,
},
{
FieldID: 105,
Name: "field_double",
IsPrimaryKey: false,
Description: "",
DataType: 11,
DataType: schemapb.DataType_Double,
},
{
FieldID: 106,
Name: "field_string",
IsPrimaryKey: false,
Description: "",
DataType: 21,
DataType: schemapb.DataType_String,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.MaxLengthKey,
@ -102,7 +102,7 @@ func TestSchema(t *testing.T) {
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "",
DataType: 101,
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -121,7 +121,7 @@ func TestSchema(t *testing.T) {
Name: "field_binary_vector",
IsPrimaryKey: false,
Description: "",
DataType: 100,
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -145,7 +145,7 @@ func TestSchema(t *testing.T) {
Name: "field_float16_vector",
IsPrimaryKey: false,
Description: "",
DataType: 102,
DataType: schemapb.DataType_Float16Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -158,7 +158,7 @@ func TestSchema(t *testing.T) {
Name: "field_bfloat16_vector",
IsPrimaryKey: false,
Description: "",
DataType: 103,
DataType: schemapb.DataType_BFloat16Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
@ -167,12 +167,25 @@ func TestSchema(t *testing.T) {
},
},
// Do not test on sparse float vector field.
{
FieldID: 113,
Name: "field_int8_vector",
IsPrimaryKey: false,
Description: "",
DataType: schemapb.DataType_Int8Vector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}
t.Run("EstimateSizePerRecord", func(t *testing.T) {
size, err := EstimateSizePerRecord(schema)
assert.Equal(t, 680+DynamicFieldMaxLength*3, size)
assert.Equal(t, 2219, size)
assert.NoError(t, err)
})
@ -198,19 +211,25 @@ func TestSchema(t *testing.T) {
dim, err := helper.GetVectorDimFromID(107)
assert.NoError(t, err)
assert.Equal(t, 128, dim)
dim1, err := helper.GetVectorDimFromID(108)
dim, err = helper.GetVectorDimFromID(108)
assert.NoError(t, err)
assert.Equal(t, 128, dim1)
assert.Equal(t, 128, dim)
_, err = helper.GetVectorDimFromID(103)
assert.Error(t, err)
dim2, err := helper.GetVectorDimFromID(111)
dim, err = helper.GetVectorDimFromID(111)
assert.NoError(t, err)
assert.Equal(t, 128, dim2)
assert.Equal(t, 128, dim)
dim3, err := helper.GetVectorDimFromID(112)
dim, err = helper.GetVectorDimFromID(112)
assert.NoError(t, err)
assert.Equal(t, 128, dim3)
assert.Equal(t, 128, dim)
dim, err = helper.GetVectorDimFromID(113)
assert.NoError(t, err)
assert.Equal(t, 128, dim)
})
t.Run("Type", func(t *testing.T) {
@ -227,6 +246,7 @@ func TestSchema(t *testing.T) {
assert.True(t, IsVectorType(schemapb.DataType_Float16Vector))
assert.True(t, IsVectorType(schemapb.DataType_BFloat16Vector))
assert.True(t, IsVectorType(schemapb.DataType_SparseFloatVector))
assert.True(t, IsVectorType(schemapb.DataType_Int8Vector))
assert.False(t, IsIntegerType(schemapb.DataType_Bool))
assert.True(t, IsIntegerType(schemapb.DataType_Int8))
@ -241,6 +261,7 @@ func TestSchema(t *testing.T) {
assert.False(t, IsIntegerType(schemapb.DataType_Float16Vector))
assert.False(t, IsIntegerType(schemapb.DataType_BFloat16Vector))
assert.False(t, IsIntegerType(schemapb.DataType_SparseFloatVector))
assert.False(t, IsIntegerType(schemapb.DataType_Int8Vector))
assert.False(t, IsFloatingType(schemapb.DataType_Bool))
assert.False(t, IsFloatingType(schemapb.DataType_Int8))
@ -255,6 +276,7 @@ func TestSchema(t *testing.T) {
assert.False(t, IsFloatingType(schemapb.DataType_Float16Vector))
assert.False(t, IsFloatingType(schemapb.DataType_BFloat16Vector))
assert.False(t, IsFloatingType(schemapb.DataType_SparseFloatVector))
assert.False(t, IsFloatingType(schemapb.DataType_Int8Vector))
assert.False(t, IsSparseFloatVectorType(schemapb.DataType_Bool))
assert.False(t, IsSparseFloatVectorType(schemapb.DataType_Int8))
@ -269,6 +291,7 @@ func TestSchema(t *testing.T) {
assert.False(t, IsSparseFloatVectorType(schemapb.DataType_Float16Vector))
assert.False(t, IsSparseFloatVectorType(schemapb.DataType_BFloat16Vector))
assert.True(t, IsSparseFloatVectorType(schemapb.DataType_SparseFloatVector))
assert.False(t, IsSparseFloatVectorType(schemapb.DataType_Int8Vector))
})
}
@ -959,6 +982,20 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType,
},
FieldId: fieldID,
}
case schemapb.DataType_Int8Vector:
fieldData = &schemapb.FieldData{
Type: schemapb.DataType_Int8Vector,
FieldName: fieldName,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: dim,
Data: &schemapb.VectorField_Int8Vector{
Int8Vector: fieldValue.([]byte),
},
},
},
FieldId: fieldID,
}
case schemapb.DataType_Array:
fieldData = &schemapb.FieldData{
Type: schemapb.DataType_Array,
@ -1012,6 +1049,7 @@ func TestAppendFieldData(t *testing.T) {
BFloat16VectorFieldName = "BFloat16VectorField"
ArrayFieldName = "ArrayField"
SparseFloatVectorFieldName = "SparseFloatVectorField"
Int8VectorFieldName = "Int8VectorField"
BoolFieldID = common.StartOfUserFieldID + 1
Int32FieldID = common.StartOfUserFieldID + 2
Int64FieldID = common.StartOfUserFieldID + 3
@ -1023,6 +1061,7 @@ func TestAppendFieldData(t *testing.T) {
BFloat16VectorFieldID = common.StartOfUserFieldID + 9
ArrayFieldID = common.StartOfUserFieldID + 10
SparseFloatVectorFieldID = common.StartOfUserFieldID + 11
Int8VectorFieldID = common.StartOfUserFieldID + 12
)
BoolArray := []bool{true, false}
Int32Array := []int32{1, 2}
@ -1039,6 +1078,9 @@ func TestAppendFieldData(t *testing.T) {
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff,
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff,
}
Int8Vector := []byte{
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff,
}
ArrayArray := []*schemapb.ScalarField{
{
Data: &schemapb.ScalarField_IntData{
@ -1063,7 +1105,7 @@ func TestAppendFieldData(t *testing.T) {
},
}
result := make([]*schemapb.FieldData, 11)
result := make([]*schemapb.FieldData, 12)
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1))
@ -1076,6 +1118,7 @@ func TestAppendFieldData(t *testing.T) {
fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:Dim*2], Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int8VectorFieldName, Int8VectorFieldID, schemapb.DataType_Int8Vector, Int8Vector[0:Dim], Dim))
var fieldDataArray2 []*schemapb.FieldData
fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[1:2], 1))
@ -1089,6 +1132,7 @@ func TestAppendFieldData(t *testing.T) {
fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[1:2], 1))
fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int8VectorFieldName, Int8VectorFieldID, schemapb.DataType_Int8Vector, Int8Vector[Dim:2*Dim], Dim))
AppendFieldData(result, fieldDataArray1, 0)
AppendFieldData(result, fieldDataArray2, 0)
@ -1104,6 +1148,7 @@ func TestAppendFieldData(t *testing.T) {
assert.Equal(t, BFloat16Vector, result[8].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector)
assert.Equal(t, ArrayArray, result[9].GetScalars().GetArrayData().Data)
assert.Equal(t, SparseFloatVector, result[10].GetVectors().GetSparseFloatVector())
assert.Equal(t, Int8Vector, result[11].GetVectors().Data.(*schemapb.VectorField_Int8Vector).Int8Vector)
}
func TestDeleteFieldData(t *testing.T) {
@ -1120,6 +1165,7 @@ func TestDeleteFieldData(t *testing.T) {
Float16VectorFieldName = "Float16VectorField"
BFloat16VectorFieldName = "BFloat16VectorField"
SparseFloatVectorFieldName = "SparseFloatVectorField"
Int8VectorFieldName = "Int8VectorField"
)
const (
@ -1134,6 +1180,7 @@ func TestDeleteFieldData(t *testing.T) {
Float16VectorFieldID
BFloat16VectorFieldID
SparseFloatVectorFieldID
Int8VectorFieldID
)
BoolArray := []bool{true, false}
Int32Array := []int32{1, 2}
@ -1158,9 +1205,12 @@ func TestDeleteFieldData(t *testing.T) {
CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}),
},
}
Int8Vector := []byte{
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff,
}
result1 := make([]*schemapb.FieldData, 11)
result2 := make([]*schemapb.FieldData, 11)
result1 := make([]*schemapb.FieldData, 12)
result2 := make([]*schemapb.FieldData, 12)
var fieldDataArray1 []*schemapb.FieldData
fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1))
@ -1173,6 +1223,7 @@ func TestDeleteFieldData(t *testing.T) {
fieldDataArray1 = append(fieldDataArray1, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[0:2*Dim], Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:2*Dim], Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim))
fieldDataArray1 = append(fieldDataArray1, genFieldData(Int8VectorFieldName, Int8VectorFieldID, schemapb.DataType_Int8Vector, Int8Vector[0:Dim], Dim))
var fieldDataArray2 []*schemapb.FieldData
fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[1:2], 1))
@ -1186,6 +1237,7 @@ func TestDeleteFieldData(t *testing.T) {
fieldDataArray2 = append(fieldDataArray2, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[2*Dim:4*Dim], Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim))
fieldDataArray2 = append(fieldDataArray2, genFieldData(Int8VectorFieldName, Int8VectorFieldID, schemapb.DataType_Int8Vector, Int8Vector[Dim:2*Dim], Dim))
AppendFieldData(result1, fieldDataArray1, 0)
AppendFieldData(result1, fieldDataArray2, 0)
@ -1203,6 +1255,7 @@ func TestDeleteFieldData(t *testing.T) {
tmpSparseFloatVector := proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray)
tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[0]}
assert.Equal(t, tmpSparseFloatVector.Contents, result1[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector().Contents)
assert.Equal(t, Int8Vector[0:Dim], result1[Int8VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Int8Vector).Int8Vector)
AppendFieldData(result2, fieldDataArray2, 0)
AppendFieldData(result2, fieldDataArray1, 0)
@ -1220,6 +1273,7 @@ func TestDeleteFieldData(t *testing.T) {
tmpSparseFloatVector = proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray)
tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[1]}
assert.EqualExportedValues(t, tmpSparseFloatVector, result2[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector())
assert.Equal(t, Int8Vector[Dim:2*Dim], result2[Int8VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Int8Vector).Int8Vector)
}
func TestEstimateEntitySize(t *testing.T) {
@ -1638,6 +1692,10 @@ func TestGetDataAndGetDataSize(t *testing.T) {
CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}),
},
}
Int8Vector := []byte{
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77,
0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77,
}
boolData := genFieldData(fieldName, fieldID, schemapb.DataType_Bool, BoolArray, 1)
int8Data := genFieldData(fieldName, fieldID, schemapb.DataType_Int8, Int8Array, 1)
@ -1652,6 +1710,7 @@ func TestGetDataAndGetDataSize(t *testing.T) {
float16VecData := genFieldData(fieldName, fieldID, schemapb.DataType_Float16Vector, Float16Vector, Dim)
bfloat16VecData := genFieldData(fieldName, fieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector, Dim)
sparseFloatData := genFieldData(fieldName, fieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim)
int8VecData := genFieldData(fieldName, fieldID, schemapb.DataType_Int8Vector, Int8Vector, Dim)
invalidData := &schemapb.FieldData{
Type: schemapb.DataType_None,
}
@ -1678,6 +1737,7 @@ func TestGetDataAndGetDataSize(t *testing.T) {
float16VecDataRes := GetData(float16VecData, 0)
bfloat16VecDataRes := GetData(bfloat16VecData, 0)
sparseFloatDataRes := GetData(sparseFloatData, 0)
int8VecDataRes := GetData(int8VecData, 0)
invalidDataRes := GetData(invalidData, 0)
assert.Equal(t, BoolArray[0], boolDataRes)
@ -1693,6 +1753,7 @@ func TestGetDataAndGetDataSize(t *testing.T) {
assert.ElementsMatch(t, Float16Vector[:2*Dim], float16VecDataRes)
assert.ElementsMatch(t, BFloat16Vector[:2*Dim], bfloat16VecDataRes)
assert.Equal(t, SparseFloatVector.Contents[0], sparseFloatDataRes)
assert.ElementsMatch(t, Int8Vector[:Dim], int8VecDataRes)
assert.Nil(t, invalidDataRes)
})
}
@ -1765,6 +1826,7 @@ func TestMergeFieldData(t *testing.T) {
},
genFieldData("float16_vector", 111, schemapb.DataType_Float16Vector, []byte("12345678"), 4),
genFieldData("bfloat16_vector", 112, schemapb.DataType_BFloat16Vector, []byte("12345678"), 4),
genFieldData("int8_vector", 113, schemapb.DataType_Int8Vector, []byte("12345678"), 4),
}
srcFields := []*schemapb.FieldData{
@ -1826,6 +1888,7 @@ func TestMergeFieldData(t *testing.T) {
},
genFieldData("float16_vector", 111, schemapb.DataType_Float16Vector, []byte("abcdefgh"), 4),
genFieldData("bfloat16_vector", 112, schemapb.DataType_BFloat16Vector, []byte("ABCDEFGH"), 4),
genFieldData("int8_vector", 113, schemapb.DataType_Int8Vector, []byte("abcdefgh"), 4),
}
err := MergeFieldData(dstFields, srcFields)
@ -1860,6 +1923,7 @@ func TestMergeFieldData(t *testing.T) {
}, dstFields[6].GetVectors().GetSparseFloatVector())
assert.Equal(t, []byte("12345678abcdefgh"), dstFields[7].GetVectors().GetFloat16Vector())
assert.Equal(t, []byte("12345678ABCDEFGH"), dstFields[8].GetVectors().GetBfloat16Vector())
assert.Equal(t, []byte("12345678abcdefgh"), dstFields[9].GetVectors().GetInt8Vector())
})
t.Run("merge with nil", func(t *testing.T) {
@ -1894,6 +1958,7 @@ func TestMergeFieldData(t *testing.T) {
},
genFieldData("float16_vector", 111, schemapb.DataType_Float16Vector, []byte("12345678"), 4),
genFieldData("bfloat16_vector", 112, schemapb.DataType_BFloat16Vector, []byte("12345678"), 4),
genFieldData("int8_vector", 113, schemapb.DataType_Int8Vector, []byte("12345678"), 4),
}
dstFields := []*schemapb.FieldData{
@ -1904,6 +1969,7 @@ func TestMergeFieldData(t *testing.T) {
{Type: schemapb.DataType_SparseFloatVector, FieldName: "sparseFloat", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_SparseFloatVector{}}}, FieldId: 104},
{Type: schemapb.DataType_Float16Vector, FieldName: "float16_vector", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_Float16Vector{}}}, FieldId: 111},
{Type: schemapb.DataType_BFloat16Vector, FieldName: "bfloat16_vector", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_Bfloat16Vector{}}}, FieldId: 112},
{Type: schemapb.DataType_Int8Vector, FieldName: "int8_vector", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_Int8Vector{}}}, FieldId: 113},
}
err := MergeFieldData(dstFields, srcFields)
@ -1929,6 +1995,7 @@ func TestMergeFieldData(t *testing.T) {
}, dstFields[4].GetVectors().GetSparseFloatVector())
assert.Equal(t, []byte("12345678"), dstFields[5].GetVectors().GetFloat16Vector())
assert.Equal(t, []byte("12345678"), dstFields[6].GetVectors().GetBfloat16Vector())
assert.Equal(t, []byte("12345678"), dstFields[7].GetVectors().GetInt8Vector())
})
t.Run("error case", func(t *testing.T) {
@ -2294,6 +2361,32 @@ func (s *FieldDataSuite) TestPrepareFieldData() {
s.EqualValues(0, field.GetVectors().GetDim())
s.EqualValues(topK, cap(field.GetVectors().GetSparseFloatVector().GetContents()))
})
s.Run("int8_vector", func() {
samples := []*schemapb.FieldData{
{
FieldId: fieldID,
FieldName: fieldName,
Type: schemapb.DataType_Int8Vector,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: 128,
Data: &schemapb.VectorField_Int8Vector{},
},
},
},
}
fields := PrepareResultFieldData(samples, topK)
s.Require().Len(fields, 1)
field := fields[0]
s.Equal(fieldID, field.GetFieldId())
s.Equal(fieldName, field.GetFieldName())
s.Equal(schemapb.DataType_Int8Vector, field.GetType())
s.EqualValues(128, field.GetVectors().GetDim())
s.EqualValues(topK*128, cap(field.GetVectors().GetInt8Vector()))
})
}
func TestFieldData(t *testing.T) {