diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 32b2c3b5e7..f6c558745b 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -102,214 +102,6 @@ func (b Blob) GetValue() []byte { return b.Value } -// FieldData defines field data interface -type FieldData interface { - GetMemorySize() int - RowNum() int - GetRow(i int) interface{} -} - -type BoolFieldData struct { - Data []bool -} -type Int8FieldData struct { - Data []int8 -} -type Int16FieldData struct { - Data []int16 -} -type Int32FieldData struct { - Data []int32 -} -type Int64FieldData struct { - Data []int64 -} -type FloatFieldData struct { - Data []float32 -} -type DoubleFieldData struct { - Data []float64 -} -type StringFieldData struct { - Data []string -} -type ArrayFieldData struct { - ElementType schemapb.DataType - Data []*schemapb.ScalarField -} -type JSONFieldData struct { - Data [][]byte -} -type BinaryVectorFieldData struct { - Data []byte - Dim int -} -type FloatVectorFieldData struct { - Data []float32 - Dim int -} -type Float16VectorFieldData struct { - Data []byte - Dim int -} - -// RowNum implements FieldData.RowNum -func (data *BoolFieldData) RowNum() int { return len(data.Data) } -func (data *Int8FieldData) RowNum() int { return len(data.Data) } -func (data *Int16FieldData) RowNum() int { return len(data.Data) } -func (data *Int32FieldData) RowNum() int { return len(data.Data) } -func (data *Int64FieldData) RowNum() int { return len(data.Data) } -func (data *FloatFieldData) RowNum() int { return len(data.Data) } -func (data *DoubleFieldData) RowNum() int { return len(data.Data) } -func (data *StringFieldData) RowNum() int { return len(data.Data) } -func (data *BinaryVectorFieldData) RowNum() int { return len(data.Data) * 8 / data.Dim } -func (data *FloatVectorFieldData) RowNum() int { return len(data.Data) / data.Dim } -func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim } -func (data *ArrayFieldData) RowNum() int { return len(data.Data) } -func (data *JSONFieldData) RowNum() int { return len(data.Data) } - -// GetRow implements FieldData.GetRow -func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] } -func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] } -func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] } -func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] } -func (data *StringFieldData) GetRow(i int) any { return data.Data[i] } -func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] } -func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] } -func (data *BinaryVectorFieldData) GetRow(i int) any { - return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8] -} - -func (data *FloatVectorFieldData) GetRow(i int) any { - return data.Data[i*data.Dim : (i+1)*data.Dim] -} - -func (data *Float16VectorFieldData) GetRow(i int) any { - return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2] -} - -// why not binary.Size(data) directly? binary.Size(data) return -1 -// binary.Size returns how many bytes Write would generate to encode the value v, which -// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data. -// If v is neither of these, binary.Size returns -1. - -// GetMemorySize implements FieldData.GetMemorySize -func (data *BoolFieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -// GetMemorySize implements FieldData.GetMemorySize -func (data *Int8FieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -// GetMemorySize implements FieldData.GetMemorySize -func (data *Int16FieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -// GetMemorySize implements FieldData.GetMemorySize -func (data *Int32FieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -// GetMemorySize implements FieldData.GetMemorySize -func (data *Int64FieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -func (data *FloatFieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -func (data *DoubleFieldData) GetMemorySize() int { - return binary.Size(data.Data) -} - -func (data *StringFieldData) GetMemorySize() int { - var size int - for _, val := range data.Data { - size += len(val) + 16 - } - return size -} - -func (data *ArrayFieldData) GetMemorySize() int { - var size int - for _, val := range data.Data { - switch data.ElementType { - case schemapb.DataType_Bool: - size += binary.Size(val.GetBoolData().GetData()) - case schemapb.DataType_Int8: - size += binary.Size(val.GetIntData().GetData()) / 4 - case schemapb.DataType_Int16: - size += binary.Size(val.GetIntData().GetData()) / 2 - case schemapb.DataType_Int32: - size += binary.Size(val.GetIntData().GetData()) - case schemapb.DataType_Float: - size += binary.Size(val.GetFloatData().GetData()) - case schemapb.DataType_Double: - size += binary.Size(val.GetDoubleData().GetData()) - case schemapb.DataType_String, schemapb.DataType_VarChar: - size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize() - } - } - return size -} - -func (data *JSONFieldData) GetMemorySize() int { - var size int - for _, val := range data.Data { - size += len(val) + 16 - } - return size -} - -func (data *BinaryVectorFieldData) GetMemorySize() int { - return binary.Size(data.Data) + 4 -} - -func (data *FloatVectorFieldData) GetMemorySize() int { - return binary.Size(data.Data) + 4 -} - -func (data *Float16VectorFieldData) GetMemorySize() int { - return binary.Size(data.Data) + 4 -} - -// system field id: -// 0: unique row id -// 1: timestamp -// 100: first user field id -// 101: second user field id -// 102: ... - -// TODO: fill it -// info for each blob -type BlobInfo struct { - Length int -} - -// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field} -// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>} -type InsertData struct { - // Todo, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray - Data map[FieldID]FieldData // field id to field data - Infos []BlobInfo -} - -func (iData *InsertData) IsEmpty() bool { - if iData == nil { - return true - } - - timeFieldData, ok := iData.Data[common.TimeStampField] - return (!ok) || (timeFieldData.RowNum() <= 0) -} - // InsertCodec serializes and deserializes the insert data // Blob key example: // ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx} diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 00aa943535..e3000f2919 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -24,8 +24,10 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -50,8 +52,8 @@ const ( Float16VectorField = 112 ) -func TestInsertCodec(t *testing.T) { - schema := &etcdpb.CollectionMeta{ +func genTestCollectionMeta() *etcdpb.CollectionMeta { + return &etcdpb.CollectionMeta{ ID: CollectionID, CreateTime: 1, SegmentIDs: []int64{SegmentID}, @@ -62,46 +64,40 @@ func TestInsertCodec(t *testing.T) { AutoID: true, Fields: []*schemapb.FieldSchema{ { - FieldID: RowIDField, - Name: "row_id", - IsPrimaryKey: false, - Description: "row_id", - DataType: schemapb.DataType_Int64, + FieldID: RowIDField, + Name: "row_id", + Description: "row_id", + DataType: schemapb.DataType_Int64, }, { - FieldID: TimestampField, - Name: "Timestamp", - IsPrimaryKey: false, - Description: "Timestamp", - DataType: schemapb.DataType_Int64, + FieldID: TimestampField, + Name: "Timestamp", + Description: "Timestamp", + DataType: schemapb.DataType_Int64, }, { - FieldID: BoolField, - Name: "field_bool", - IsPrimaryKey: false, - Description: "bool", - DataType: schemapb.DataType_Bool, + FieldID: BoolField, + Name: "field_bool", + Description: "bool", + DataType: schemapb.DataType_Bool, }, { - FieldID: Int8Field, - Name: "field_int8", - IsPrimaryKey: false, - Description: "int8", - DataType: schemapb.DataType_Int8, + FieldID: Int8Field, + Name: "field_int8", + Description: "int8", + DataType: schemapb.DataType_Int8, }, { - FieldID: Int16Field, - Name: "field_int16", - IsPrimaryKey: false, - Description: "int16", - DataType: schemapb.DataType_Int16, + FieldID: Int16Field, + Name: "field_int16", + Description: "int16", + DataType: schemapb.DataType_Int16, }, { - FieldID: Int32Field, - Name: "field_int32", - IsPrimaryKey: false, - Description: "int32", - DataType: schemapb.DataType_Int32, + FieldID: Int32Field, + Name: "field_int32", + Description: "int32", + DataType: schemapb.DataType_Int32, }, { FieldID: Int64Field, @@ -111,25 +107,22 @@ func TestInsertCodec(t *testing.T) { DataType: schemapb.DataType_Int64, }, { - FieldID: FloatField, - Name: "field_float", - IsPrimaryKey: false, - Description: "float", - DataType: schemapb.DataType_Float, + FieldID: FloatField, + Name: "field_float", + Description: "float", + DataType: schemapb.DataType_Float, }, { - FieldID: DoubleField, - Name: "field_double", - IsPrimaryKey: false, - Description: "double", - DataType: schemapb.DataType_Double, + FieldID: DoubleField, + Name: "field_double", + Description: "double", + DataType: schemapb.DataType_Double, }, { - FieldID: StringField, - Name: "field_string", - IsPrimaryKey: false, - Description: "string", - DataType: schemapb.DataType_String, + FieldID: StringField, + Name: "field_string", + Description: "string", + DataType: schemapb.DataType_String, }, { FieldID: ArrayField, @@ -145,29 +138,48 @@ func TestInsertCodec(t *testing.T) { DataType: schemapb.DataType_JSON, }, { - FieldID: BinaryVectorField, - Name: "field_binary_vector", - IsPrimaryKey: false, - Description: "binary_vector", - DataType: schemapb.DataType_BinaryVector, + FieldID: BinaryVectorField, + Name: "field_binary_vector", + Description: "binary_vector", + DataType: schemapb.DataType_BinaryVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "8", + }, + }, }, { - FieldID: FloatVectorField, - Name: "field_float_vector", - IsPrimaryKey: false, - Description: "float_vector", - DataType: schemapb.DataType_FloatVector, + FieldID: FloatVectorField, + Name: "field_float_vector", + Description: "float_vector", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "4", + }, + }, }, { - FieldID: Float16VectorField, - Name: "field_float16_vector", - IsPrimaryKey: false, - Description: "float16_vector", - DataType: schemapb.DataType_Float16Vector, + FieldID: Float16VectorField, + Name: "field_float16_vector", + Description: "float16_vector", + DataType: schemapb.DataType_Float16Vector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "4", + }, + }, }, }, }, } +} + +func TestInsertCodec(t *testing.T) { + schema := genTestCollectionMeta() insertCodec := NewInsertCodecWithSchema(schema) insertData1 := &InsertData{ Data: map[int64]FieldData{ diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go new file mode 100644 index 0000000000..7b568b9254 --- /dev/null +++ b/internal/storage/insert_data.go @@ -0,0 +1,463 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "encoding/binary" + "fmt" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// TODO: fill it +// info for each blob +type BlobInfo struct { + Length int +} + +// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field} +// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>} +// +// system filed id: +// 0: unique row id +// 1: timestamp +// 100: first user field id +// 101: second user field id +// 102: ... +type InsertData struct { + // TODO, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray + Data map[FieldID]FieldData // field id to field data + Infos []BlobInfo +} + +func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) { + if schema == nil { + return nil, fmt.Errorf("Nil input schema") + } + + idata := &InsertData{ + Data: make(map[FieldID]FieldData), + } + + for _, fSchema := range schema.Fields { + fieldData, err := NewFieldData(fSchema.DataType, fSchema) + if err != nil { + return nil, err + } + idata.Data[fSchema.FieldID] = fieldData + } + return idata, nil +} + +func (iData *InsertData) IsEmpty() bool { + if iData == nil { + return true + } + + timeFieldData, ok := iData.Data[common.TimeStampField] + return (!ok) || (timeFieldData.RowNum() <= 0) +} + +func (i *InsertData) GetRowNum() int { + if i.Data == nil || len(i.Data) == 0 { + return 0 + } + + data, ok := i.Data[common.RowIDField] + if !ok { + return 0 + } + + return data.RowNum() +} + +func (i *InsertData) GetMemorySize() int { + var size int + if i.Data == nil || len(i.Data) == 0 { + return size + } + + for _, data := range i.Data { + size += data.GetMemorySize() + } + + return size +} + +func (i *InsertData) Append(row map[FieldID]interface{}) error { + for fID, v := range row { + field, ok := i.Data[fID] + if !ok { + return fmt.Errorf("Missing field when appending row, got %d", fID) + } + + if err := field.AppendRow(v); err != nil { + return err + } + } + + return nil +} + +// FieldData defines field data interface +type FieldData interface { + GetMemorySize() int + RowNum() int + GetRow(i int) any + AppendRow(row interface{}) error +} + +func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) { + typeParams := fieldSchema.GetTypeParams() + switch dataType { + case schemapb.DataType_Float16Vector: + dim, err := GetDimFromParams(typeParams) + if err != nil { + return nil, err + } + return &Float16VectorFieldData{ + Data: make([]byte, 0), + Dim: dim, + }, nil + case schemapb.DataType_FloatVector: + dim, err := GetDimFromParams(typeParams) + if err != nil { + return nil, err + } + return &FloatVectorFieldData{ + Data: make([]float32, 0), + Dim: dim, + }, nil + case schemapb.DataType_BinaryVector: + dim, err := GetDimFromParams(typeParams) + if err != nil { + return nil, err + } + return &BinaryVectorFieldData{ + Data: make([]byte, 0), + Dim: dim, + }, nil + + case schemapb.DataType_Bool: + return &BoolFieldData{ + Data: make([]bool, 0), + }, nil + + case schemapb.DataType_Int8: + return &Int8FieldData{ + Data: make([]int8, 0), + }, nil + + case schemapb.DataType_Int16: + return &Int16FieldData{ + Data: make([]int16, 0), + }, nil + + case schemapb.DataType_Int32: + return &Int32FieldData{ + Data: make([]int32, 0), + }, nil + + case schemapb.DataType_Int64: + return &Int64FieldData{ + Data: make([]int64, 0), + }, nil + case schemapb.DataType_Float: + return &FloatFieldData{ + Data: make([]float32, 0), + }, nil + + case schemapb.DataType_Double: + return &DoubleFieldData{ + Data: make([]float64, 0), + }, nil + case schemapb.DataType_JSON: + return &JSONFieldData{ + Data: make([][]byte, 0), + }, nil + case schemapb.DataType_Array: + return &ArrayFieldData{ + Data: make([]*schemapb.ScalarField, 0), + ElementType: fieldSchema.GetElementType(), + }, nil + case schemapb.DataType_String, schemapb.DataType_VarChar: + return &StringFieldData{ + Data: make([]string, 0), + }, nil + default: + return nil, fmt.Errorf("Unexpected schema data type: %d", dataType) + } +} + +type BoolFieldData struct { + Data []bool +} +type Int8FieldData struct { + Data []int8 +} +type Int16FieldData struct { + Data []int16 +} +type Int32FieldData struct { + Data []int32 +} +type Int64FieldData struct { + Data []int64 +} +type FloatFieldData struct { + Data []float32 +} +type DoubleFieldData struct { + Data []float64 +} +type StringFieldData struct { + Data []string +} +type ArrayFieldData struct { + ElementType schemapb.DataType + Data []*schemapb.ScalarField +} +type JSONFieldData struct { + Data [][]byte +} +type BinaryVectorFieldData struct { + Data []byte + Dim int +} +type FloatVectorFieldData struct { + Data []float32 + Dim int +} +type Float16VectorFieldData struct { + Data []byte + Dim int +} + +// RowNum implements FieldData.RowNum +func (data *BoolFieldData) RowNum() int { return len(data.Data) } +func (data *Int8FieldData) RowNum() int { return len(data.Data) } +func (data *Int16FieldData) RowNum() int { return len(data.Data) } +func (data *Int32FieldData) RowNum() int { return len(data.Data) } +func (data *Int64FieldData) RowNum() int { return len(data.Data) } +func (data *FloatFieldData) RowNum() int { return len(data.Data) } +func (data *DoubleFieldData) RowNum() int { return len(data.Data) } +func (data *StringFieldData) RowNum() int { return len(data.Data) } +func (data *ArrayFieldData) RowNum() int { return len(data.Data) } +func (data *JSONFieldData) RowNum() int { return len(data.Data) } +func (data *BinaryVectorFieldData) RowNum() int { return len(data.Data) * 8 / data.Dim } +func (data *FloatVectorFieldData) RowNum() int { return len(data.Data) / data.Dim } +func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim } + +// GetRow implements FieldData.GetRow +func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] } +func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] } +func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] } +func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] } +func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] } +func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] } +func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] } +func (data *StringFieldData) GetRow(i int) any { return data.Data[i] } +func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] } +func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] } +func (data *BinaryVectorFieldData) GetRow(i int) interface{} { + return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8] +} + +func (data *FloatVectorFieldData) GetRow(i int) interface{} { + return data.Data[i*data.Dim : (i+1)*data.Dim] +} + +func (data *Float16VectorFieldData) GetRow(i int) interface{} { + return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2] +} + +// AppendRow implements FieldData.AppendRow +func (data *BoolFieldData) AppendRow(row interface{}) error { + v, ok := row.(bool) + if !ok { + return merr.WrapErrParameterInvalid("bool", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *Int8FieldData) AppendRow(row interface{}) error { + v, ok := row.(int8) + if !ok { + return merr.WrapErrParameterInvalid("int8", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *Int16FieldData) AppendRow(row interface{}) error { + v, ok := row.(int16) + if !ok { + return merr.WrapErrParameterInvalid("int16", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *Int32FieldData) AppendRow(row interface{}) error { + v, ok := row.(int32) + if !ok { + return merr.WrapErrParameterInvalid("int32", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *Int64FieldData) AppendRow(row interface{}) error { + v, ok := row.(int64) + if !ok { + return merr.WrapErrParameterInvalid("int64", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *FloatFieldData) AppendRow(row interface{}) error { + v, ok := row.(float32) + if !ok { + return merr.WrapErrParameterInvalid("float32", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *DoubleFieldData) AppendRow(row interface{}) error { + v, ok := row.(float64) + if !ok { + return merr.WrapErrParameterInvalid("float64", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *StringFieldData) AppendRow(row interface{}) error { + v, ok := row.(string) + if !ok { + return merr.WrapErrParameterInvalid("string", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *ArrayFieldData) AppendRow(row interface{}) error { + v, ok := row.(*schemapb.ScalarField) + if !ok { + return merr.WrapErrParameterInvalid("*schemapb.ScalarField", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *JSONFieldData) AppendRow(row interface{}) error { + v, ok := row.([]byte) + if !ok { + return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") + } + data.Data = append(data.Data, v) + return nil +} + +func (data *BinaryVectorFieldData) AppendRow(row interface{}) error { + v, ok := row.([]byte) + if !ok || len(v) != data.Dim/8 { + return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *FloatVectorFieldData) AppendRow(row interface{}) error { + v, ok := row.([]float32) + if !ok || len(v) != data.Dim { + return merr.WrapErrParameterInvalid("[]float32", row, "Wrong row type") + } + data.Data = append(data.Data, v...) + return nil +} + +func (data *Float16VectorFieldData) AppendRow(row interface{}) error { + v, ok := row.([]byte) + if !ok || len(v) != data.Dim*2 { + return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type") + } + data.Data = append(data.Data, v...) + return nil +} + +// GetMemorySize implements FieldData.GetMemorySize +func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *Int16FieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *Int32FieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *Int64FieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *FloatFieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *DoubleFieldData) GetMemorySize() int { return binary.Size(data.Data) } +func (data *BinaryVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } +func (data *FloatVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } +func (data *Float16VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } + +// why not binary.Size(data) directly? binary.Size(data) return -1 +// binary.Size returns how many bytes Write would generate to encode the value v, which +// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data. +// If v is neither of these, binary.Size returns -1. +func (data *StringFieldData) GetMemorySize() int { + var size int + for _, val := range data.Data { + size += len(val) + 16 + } + return size +} + +func (data *ArrayFieldData) GetMemorySize() int { + var size int + for _, val := range data.Data { + switch data.ElementType { + case schemapb.DataType_Bool: + size += binary.Size(val.GetBoolData().GetData()) + case schemapb.DataType_Int8: + size += binary.Size(val.GetIntData().GetData()) / 4 + case schemapb.DataType_Int16: + size += binary.Size(val.GetIntData().GetData()) / 2 + case schemapb.DataType_Int32: + size += binary.Size(val.GetIntData().GetData()) + case schemapb.DataType_Int64: + size += binary.Size(val.GetLongData().GetData()) + case schemapb.DataType_Float: + size += binary.Size(val.GetFloatData().GetData()) + case schemapb.DataType_Double: + size += binary.Size(val.GetDoubleData().GetData()) + case schemapb.DataType_String, schemapb.DataType_VarChar: + size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize() + } + } + return size +} + +func (data *JSONFieldData) GetMemorySize() int { + var size int + for _, val := range data.Data { + size += len(val) + 16 + } + return size +} diff --git a/internal/storage/insert_data_test.go b/internal/storage/insert_data_test.go new file mode 100644 index 0000000000..49c8781a45 --- /dev/null +++ b/internal/storage/insert_data_test.go @@ -0,0 +1,319 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func TestInsertDataSuite(t *testing.T) { + suite.Run(t, new(InsertDataSuite)) +} + +func TestArrayFieldDataSuite(t *testing.T) { + suite.Run(t, new(ArrayFieldDataSuite)) +} + +type InsertDataSuite struct { + suite.Suite + + schema *schemapb.CollectionSchema + + iDataOneRow *InsertData + iDataTwoRows *InsertData + iDataEmpty *InsertData +} + +func (s *InsertDataSuite) SetupSuite() { + s.schema = genTestCollectionMeta().Schema +} + +func (s *InsertDataSuite) TestInsertData() { + s.Run("nil schema", func() { + idata, err := NewInsertData(nil) + s.Error(err) + s.Nil(idata) + }) + + s.Run("invalid schema", func() { + tests := []struct { + description string + invalidType schemapb.DataType + }{ + {"binary vector without dim", schemapb.DataType_BinaryVector}, + {"float vector without dim", schemapb.DataType_FloatVector}, + {"float16 vector without dim", schemapb.DataType_Float16Vector}, + } + + for _, test := range tests { + s.Run(test.description, func() { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + DataType: test.invalidType, + }, + }, + } + idata, err := NewInsertData(schema) + s.Error(err) + s.Nil(idata) + }) + } + }) + + s.Run("empty iData", func() { + idata := &InsertData{} + s.True(idata.IsEmpty()) + s.Equal(0, idata.GetRowNum()) + s.Equal(0, idata.GetMemorySize()) + + err := idata.Append(map[FieldID]interface{}{1: struct{}{}}) + s.Error(err) + }) + + s.Run("init by New", func() { + s.True(s.iDataEmpty.IsEmpty()) + s.Equal(0, s.iDataEmpty.GetRowNum()) + s.Equal(12, s.iDataEmpty.GetMemorySize()) + + s.False(s.iDataOneRow.IsEmpty()) + s.Equal(1, s.iDataOneRow.GetRowNum()) + s.Equal(139, s.iDataOneRow.GetMemorySize()) + + s.False(s.iDataTwoRows.IsEmpty()) + s.Equal(2, s.iDataTwoRows.GetRowNum()) + s.Equal(266, s.iDataTwoRows.GetMemorySize()) + + for _, field := range s.iDataTwoRows.Data { + s.Equal(2, field.RowNum()) + + err := field.AppendRow(struct{}{}) + log.Warn("error", zap.Error(err)) + s.ErrorIs(err, merr.ErrParameterInvalid) + } + }) +} + +func (s *InsertDataSuite) TestMemorySize() { + s.Equal(s.iDataEmpty.Data[RowIDField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[TimestampField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[BoolField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[Int8Field].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[Int16Field].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[Int32Field].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[Int64Field].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[FloatField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[DoubleField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[StringField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[ArrayField].GetMemorySize(), 0) + s.Equal(s.iDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4) + s.Equal(s.iDataEmpty.Data[FloatVectorField].GetMemorySize(), 4) + s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4) + + s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 8) + s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 8) + s.Equal(s.iDataOneRow.Data[BoolField].GetMemorySize(), 1) + s.Equal(s.iDataOneRow.Data[Int8Field].GetMemorySize(), 1) + s.Equal(s.iDataOneRow.Data[Int16Field].GetMemorySize(), 2) + s.Equal(s.iDataOneRow.Data[Int32Field].GetMemorySize(), 4) + s.Equal(s.iDataOneRow.Data[Int64Field].GetMemorySize(), 8) + s.Equal(s.iDataOneRow.Data[FloatField].GetMemorySize(), 4) + s.Equal(s.iDataOneRow.Data[DoubleField].GetMemorySize(), 8) + s.Equal(s.iDataOneRow.Data[StringField].GetMemorySize(), 19) + s.Equal(s.iDataOneRow.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16) + s.Equal(s.iDataOneRow.Data[ArrayField].GetMemorySize(), 3*4) + s.Equal(s.iDataOneRow.Data[BinaryVectorField].GetMemorySize(), 5) + s.Equal(s.iDataOneRow.Data[FloatVectorField].GetMemorySize(), 20) + s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12) + + s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 16) + s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 16) + s.Equal(s.iDataTwoRows.Data[BoolField].GetMemorySize(), 2) + s.Equal(s.iDataTwoRows.Data[Int8Field].GetMemorySize(), 2) + s.Equal(s.iDataTwoRows.Data[Int16Field].GetMemorySize(), 4) + s.Equal(s.iDataTwoRows.Data[Int32Field].GetMemorySize(), 8) + s.Equal(s.iDataTwoRows.Data[Int64Field].GetMemorySize(), 16) + s.Equal(s.iDataTwoRows.Data[FloatField].GetMemorySize(), 8) + s.Equal(s.iDataTwoRows.Data[DoubleField].GetMemorySize(), 16) + s.Equal(s.iDataTwoRows.Data[StringField].GetMemorySize(), 38) + s.Equal(s.iDataTwoRows.Data[ArrayField].GetMemorySize(), 24) + s.Equal(s.iDataTwoRows.Data[BinaryVectorField].GetMemorySize(), 6) + s.Equal(s.iDataTwoRows.Data[FloatVectorField].GetMemorySize(), 36) + s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20) +} + +func (s *InsertDataSuite) SetupTest() { + var err error + s.iDataEmpty, err = NewInsertData(s.schema) + s.Require().NoError(err) + s.True(s.iDataEmpty.IsEmpty()) + s.Equal(0, s.iDataEmpty.GetRowNum()) + s.Equal(12, s.iDataEmpty.GetMemorySize()) + + row1 := map[FieldID]interface{}{ + RowIDField: int64(3), + TimestampField: int64(3), + BoolField: true, + Int8Field: int8(3), + Int16Field: int16(3), + Int32Field: int32(3), + Int64Field: int64(3), + FloatField: float32(3), + DoubleField: float64(3), + StringField: "str", + BinaryVectorField: []byte{0}, + FloatVectorField: []float32{4, 5, 6, 7}, + Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + ArrayField: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, + }, + }, + JSONField: []byte(`{"batch":3}`), + } + + s.iDataOneRow, err = NewInsertData(s.schema) + s.Require().NoError(err) + err = s.iDataOneRow.Append(row1) + s.Require().NoError(err) + + for fID, field := range s.iDataOneRow.Data { + s.Equal(row1[fID], field.GetRow(0)) + } + + row2 := map[FieldID]interface{}{ + RowIDField: int64(1), + TimestampField: int64(1), + BoolField: false, + Int8Field: int8(1), + Int16Field: int16(1), + Int32Field: int32(1), + Int64Field: int64(1), + FloatField: float32(1), + DoubleField: float64(1), + StringField: string("str"), + BinaryVectorField: []byte{0}, + FloatVectorField: []float32{4, 5, 6, 7}, + Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + ArrayField: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, + }, + }, + JSONField: []byte(`{"batch":1}`), + } + + s.iDataTwoRows, err = NewInsertData(s.schema) + s.Require().NoError(err) + err = s.iDataTwoRows.Append(row1) + s.Require().NoError(err) + err = s.iDataTwoRows.Append(row2) + s.Require().NoError(err) +} + +type ArrayFieldDataSuite struct { + suite.Suite +} + +func (s *ArrayFieldDataSuite) TestArrayFieldData() { + fieldID2Type := map[int64]schemapb.DataType{ + ArrayField + 1: schemapb.DataType_Bool, + ArrayField + 2: schemapb.DataType_Int8, + ArrayField + 3: schemapb.DataType_Int16, + ArrayField + 4: schemapb.DataType_Int32, + ArrayField + 5: schemapb.DataType_Int64, + ArrayField + 6: schemapb.DataType_Float, + ArrayField + 7: schemapb.DataType_Double, + ArrayField + 8: schemapb.DataType_VarChar, + } + + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: RowIDField, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: TimestampField, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: Int64Field, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + }, + } + + for fieldID, elementType := range fieldID2Type { + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: fieldID, + DataType: schemapb.DataType_Array, + ElementType: elementType, + }) + } + + insertData, err := NewInsertData(schema) + s.NoError(err) + + s.Equal(0, insertData.GetRowNum()) + s.Equal(0, insertData.GetMemorySize()) + s.True(insertData.IsEmpty()) + + fieldIDToData := map[int64]interface{}{ + RowIDField: int64(1), + TimestampField: int64(2), + Int64Field: int64(3), + ArrayField + 1: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_BoolData{ + BoolData: &schemapb.BoolArray{Data: []bool{true, false}}, + }, + }, + ArrayField + 2: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{0, 0}}, + }, + }, + ArrayField + 3: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{1, 1}}, + }, + }, + ArrayField + 4: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{2, 2}}, + }, + }, + ArrayField + 5: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_LongData{ + LongData: &schemapb.LongArray{Data: []int64{3, 3}}, + }, + }, + ArrayField + 6: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_FloatData{ + FloatData: &schemapb.FloatArray{Data: []float32{4, 4}}, + }, + }, + ArrayField + 7: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_DoubleData{ + DoubleData: &schemapb.DoubleArray{Data: []float64{5, 5}}, + }, + }, + ArrayField + 8: &schemapb.ScalarField{ + Data: &schemapb.ScalarField_StringData{ + StringData: &schemapb.StringArray{Data: []string{"6", "6"}}, + }, + }, + } + + err = insertData.Append(fieldIDToData) + s.NoError(err) + s.Equal(1, insertData.GetRowNum()) + s.Equal(114, insertData.GetMemorySize()) + s.False(insertData.IsEmpty()) +}