mirror of https://github.com/milvus-io/milvus.git
Enhance InsertData and FieldData (#27436)
1. Add NewInsertData 2. Add GetRowNum(), GetMemorySize(), and, Append() for InsertData 3. Add AppendRow() for FieldData for compaction Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/27750/head
parent
efe8984eca
commit
2f16339aac
|
@ -102,214 +102,6 @@ func (b Blob) GetValue() []byte {
|
|||
return b.Value
|
||||
}
|
||||
|
||||
// FieldData defines field data interface
|
||||
type FieldData interface {
|
||||
GetMemorySize() int
|
||||
RowNum() int
|
||||
GetRow(i int) interface{}
|
||||
}
|
||||
|
||||
type BoolFieldData struct {
|
||||
Data []bool
|
||||
}
|
||||
type Int8FieldData struct {
|
||||
Data []int8
|
||||
}
|
||||
type Int16FieldData struct {
|
||||
Data []int16
|
||||
}
|
||||
type Int32FieldData struct {
|
||||
Data []int32
|
||||
}
|
||||
type Int64FieldData struct {
|
||||
Data []int64
|
||||
}
|
||||
type FloatFieldData struct {
|
||||
Data []float32
|
||||
}
|
||||
type DoubleFieldData struct {
|
||||
Data []float64
|
||||
}
|
||||
type StringFieldData struct {
|
||||
Data []string
|
||||
}
|
||||
type ArrayFieldData struct {
|
||||
ElementType schemapb.DataType
|
||||
Data []*schemapb.ScalarField
|
||||
}
|
||||
type JSONFieldData struct {
|
||||
Data [][]byte
|
||||
}
|
||||
type BinaryVectorFieldData struct {
|
||||
Data []byte
|
||||
Dim int
|
||||
}
|
||||
type FloatVectorFieldData struct {
|
||||
Data []float32
|
||||
Dim int
|
||||
}
|
||||
type Float16VectorFieldData struct {
|
||||
Data []byte
|
||||
Dim int
|
||||
}
|
||||
|
||||
// RowNum implements FieldData.RowNum
|
||||
func (data *BoolFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int8FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int16FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int32FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int64FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *FloatFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *DoubleFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *StringFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *BinaryVectorFieldData) RowNum() int { return len(data.Data) * 8 / data.Dim }
|
||||
func (data *FloatVectorFieldData) RowNum() int { return len(data.Data) / data.Dim }
|
||||
func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim }
|
||||
func (data *ArrayFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *JSONFieldData) RowNum() int { return len(data.Data) }
|
||||
|
||||
// GetRow implements FieldData.GetRow
|
||||
func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *StringFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *BinaryVectorFieldData) GetRow(i int) any {
|
||||
return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8]
|
||||
}
|
||||
|
||||
func (data *FloatVectorFieldData) GetRow(i int) any {
|
||||
return data.Data[i*data.Dim : (i+1)*data.Dim]
|
||||
}
|
||||
|
||||
func (data *Float16VectorFieldData) GetRow(i int) any {
|
||||
return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2]
|
||||
}
|
||||
|
||||
// why not binary.Size(data) directly? binary.Size(data) return -1
|
||||
// binary.Size returns how many bytes Write would generate to encode the value v, which
|
||||
// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data.
|
||||
// If v is neither of these, binary.Size returns -1.
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *BoolFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int8FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int16FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int32FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *Int64FieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *FloatFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *DoubleFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data)
|
||||
}
|
||||
|
||||
func (data *StringFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
size += len(val) + 16
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *ArrayFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
switch data.ElementType {
|
||||
case schemapb.DataType_Bool:
|
||||
size += binary.Size(val.GetBoolData().GetData())
|
||||
case schemapb.DataType_Int8:
|
||||
size += binary.Size(val.GetIntData().GetData()) / 4
|
||||
case schemapb.DataType_Int16:
|
||||
size += binary.Size(val.GetIntData().GetData()) / 2
|
||||
case schemapb.DataType_Int32:
|
||||
size += binary.Size(val.GetIntData().GetData())
|
||||
case schemapb.DataType_Float:
|
||||
size += binary.Size(val.GetFloatData().GetData())
|
||||
case schemapb.DataType_Double:
|
||||
size += binary.Size(val.GetDoubleData().GetData())
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize()
|
||||
}
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *JSONFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
size += len(val) + 16
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *BinaryVectorFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data) + 4
|
||||
}
|
||||
|
||||
func (data *FloatVectorFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data) + 4
|
||||
}
|
||||
|
||||
func (data *Float16VectorFieldData) GetMemorySize() int {
|
||||
return binary.Size(data.Data) + 4
|
||||
}
|
||||
|
||||
// system field id:
|
||||
// 0: unique row id
|
||||
// 1: timestamp
|
||||
// 100: first user field id
|
||||
// 101: second user field id
|
||||
// 102: ...
|
||||
|
||||
// TODO: fill it
|
||||
// info for each blob
|
||||
type BlobInfo struct {
|
||||
Length int
|
||||
}
|
||||
|
||||
// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field}
|
||||
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
|
||||
type InsertData struct {
|
||||
// Todo, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray
|
||||
Data map[FieldID]FieldData // field id to field data
|
||||
Infos []BlobInfo
|
||||
}
|
||||
|
||||
func (iData *InsertData) IsEmpty() bool {
|
||||
if iData == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
timeFieldData, ok := iData.Data[common.TimeStampField]
|
||||
return (!ok) || (timeFieldData.RowNum() <= 0)
|
||||
}
|
||||
|
||||
// InsertCodec serializes and deserializes the insert data
|
||||
// Blob key example:
|
||||
// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
|
||||
|
|
|
@ -24,8 +24,10 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/etcdpb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
)
|
||||
|
||||
|
@ -50,8 +52,8 @@ const (
|
|||
Float16VectorField = 112
|
||||
)
|
||||
|
||||
func TestInsertCodec(t *testing.T) {
|
||||
schema := &etcdpb.CollectionMeta{
|
||||
func genTestCollectionMeta() *etcdpb.CollectionMeta {
|
||||
return &etcdpb.CollectionMeta{
|
||||
ID: CollectionID,
|
||||
CreateTime: 1,
|
||||
SegmentIDs: []int64{SegmentID},
|
||||
|
@ -62,46 +64,40 @@ func TestInsertCodec(t *testing.T) {
|
|||
AutoID: true,
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: RowIDField,
|
||||
Name: "row_id",
|
||||
IsPrimaryKey: false,
|
||||
Description: "row_id",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
FieldID: RowIDField,
|
||||
Name: "row_id",
|
||||
Description: "row_id",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: TimestampField,
|
||||
Name: "Timestamp",
|
||||
IsPrimaryKey: false,
|
||||
Description: "Timestamp",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
FieldID: TimestampField,
|
||||
Name: "Timestamp",
|
||||
Description: "Timestamp",
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: BoolField,
|
||||
Name: "field_bool",
|
||||
IsPrimaryKey: false,
|
||||
Description: "bool",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
FieldID: BoolField,
|
||||
Name: "field_bool",
|
||||
Description: "bool",
|
||||
DataType: schemapb.DataType_Bool,
|
||||
},
|
||||
{
|
||||
FieldID: Int8Field,
|
||||
Name: "field_int8",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int8",
|
||||
DataType: schemapb.DataType_Int8,
|
||||
FieldID: Int8Field,
|
||||
Name: "field_int8",
|
||||
Description: "int8",
|
||||
DataType: schemapb.DataType_Int8,
|
||||
},
|
||||
{
|
||||
FieldID: Int16Field,
|
||||
Name: "field_int16",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int16",
|
||||
DataType: schemapb.DataType_Int16,
|
||||
FieldID: Int16Field,
|
||||
Name: "field_int16",
|
||||
Description: "int16",
|
||||
DataType: schemapb.DataType_Int16,
|
||||
},
|
||||
{
|
||||
FieldID: Int32Field,
|
||||
Name: "field_int32",
|
||||
IsPrimaryKey: false,
|
||||
Description: "int32",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
FieldID: Int32Field,
|
||||
Name: "field_int32",
|
||||
Description: "int32",
|
||||
DataType: schemapb.DataType_Int32,
|
||||
},
|
||||
{
|
||||
FieldID: Int64Field,
|
||||
|
@ -111,25 +107,22 @@ func TestInsertCodec(t *testing.T) {
|
|||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: FloatField,
|
||||
Name: "field_float",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float",
|
||||
DataType: schemapb.DataType_Float,
|
||||
FieldID: FloatField,
|
||||
Name: "field_float",
|
||||
Description: "float",
|
||||
DataType: schemapb.DataType_Float,
|
||||
},
|
||||
{
|
||||
FieldID: DoubleField,
|
||||
Name: "field_double",
|
||||
IsPrimaryKey: false,
|
||||
Description: "double",
|
||||
DataType: schemapb.DataType_Double,
|
||||
FieldID: DoubleField,
|
||||
Name: "field_double",
|
||||
Description: "double",
|
||||
DataType: schemapb.DataType_Double,
|
||||
},
|
||||
{
|
||||
FieldID: StringField,
|
||||
Name: "field_string",
|
||||
IsPrimaryKey: false,
|
||||
Description: "string",
|
||||
DataType: schemapb.DataType_String,
|
||||
FieldID: StringField,
|
||||
Name: "field_string",
|
||||
Description: "string",
|
||||
DataType: schemapb.DataType_String,
|
||||
},
|
||||
{
|
||||
FieldID: ArrayField,
|
||||
|
@ -145,29 +138,48 @@ func TestInsertCodec(t *testing.T) {
|
|||
DataType: schemapb.DataType_JSON,
|
||||
},
|
||||
{
|
||||
FieldID: BinaryVectorField,
|
||||
Name: "field_binary_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "binary_vector",
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
FieldID: BinaryVectorField,
|
||||
Name: "field_binary_vector",
|
||||
Description: "binary_vector",
|
||||
DataType: schemapb.DataType_BinaryVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: "8",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: FloatVectorField,
|
||||
Name: "field_float_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float_vector",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
FieldID: FloatVectorField,
|
||||
Name: "field_float_vector",
|
||||
Description: "float_vector",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: "4",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
FieldID: Float16VectorField,
|
||||
Name: "field_float16_vector",
|
||||
IsPrimaryKey: false,
|
||||
Description: "float16_vector",
|
||||
DataType: schemapb.DataType_Float16Vector,
|
||||
FieldID: Float16VectorField,
|
||||
Name: "field_float16_vector",
|
||||
Description: "float16_vector",
|
||||
DataType: schemapb.DataType_Float16Vector,
|
||||
TypeParams: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.DimKey,
|
||||
Value: "4",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestInsertCodec(t *testing.T) {
|
||||
schema := genTestCollectionMeta()
|
||||
insertCodec := NewInsertCodecWithSchema(schema)
|
||||
insertData1 := &InsertData{
|
||||
Data: map[int64]FieldData{
|
||||
|
|
|
@ -0,0 +1,463 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/common"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
// TODO: fill it
|
||||
// info for each blob
|
||||
type BlobInfo struct {
|
||||
Length int
|
||||
}
|
||||
|
||||
// InsertData example row_schema: {float_field, int_field, float_vector_field, string_field}
|
||||
// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
|
||||
//
|
||||
// system filed id:
|
||||
// 0: unique row id
|
||||
// 1: timestamp
|
||||
// 100: first user field id
|
||||
// 101: second user field id
|
||||
// 102: ...
|
||||
type InsertData struct {
|
||||
// TODO, data should be zero copy by passing data directly to event reader or change Data to map[FieldID]FieldDataArray
|
||||
Data map[FieldID]FieldData // field id to field data
|
||||
Infos []BlobInfo
|
||||
}
|
||||
|
||||
func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) {
|
||||
if schema == nil {
|
||||
return nil, fmt.Errorf("Nil input schema")
|
||||
}
|
||||
|
||||
idata := &InsertData{
|
||||
Data: make(map[FieldID]FieldData),
|
||||
}
|
||||
|
||||
for _, fSchema := range schema.Fields {
|
||||
fieldData, err := NewFieldData(fSchema.DataType, fSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
idata.Data[fSchema.FieldID] = fieldData
|
||||
}
|
||||
return idata, nil
|
||||
}
|
||||
|
||||
func (iData *InsertData) IsEmpty() bool {
|
||||
if iData == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
timeFieldData, ok := iData.Data[common.TimeStampField]
|
||||
return (!ok) || (timeFieldData.RowNum() <= 0)
|
||||
}
|
||||
|
||||
func (i *InsertData) GetRowNum() int {
|
||||
if i.Data == nil || len(i.Data) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
data, ok := i.Data[common.RowIDField]
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
|
||||
return data.RowNum()
|
||||
}
|
||||
|
||||
func (i *InsertData) GetMemorySize() int {
|
||||
var size int
|
||||
if i.Data == nil || len(i.Data) == 0 {
|
||||
return size
|
||||
}
|
||||
|
||||
for _, data := range i.Data {
|
||||
size += data.GetMemorySize()
|
||||
}
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
func (i *InsertData) Append(row map[FieldID]interface{}) error {
|
||||
for fID, v := range row {
|
||||
field, ok := i.Data[fID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Missing field when appending row, got %d", fID)
|
||||
}
|
||||
|
||||
if err := field.AppendRow(v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FieldData defines field data interface
|
||||
type FieldData interface {
|
||||
GetMemorySize() int
|
||||
RowNum() int
|
||||
GetRow(i int) any
|
||||
AppendRow(row interface{}) error
|
||||
}
|
||||
|
||||
func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) {
|
||||
typeParams := fieldSchema.GetTypeParams()
|
||||
switch dataType {
|
||||
case schemapb.DataType_Float16Vector:
|
||||
dim, err := GetDimFromParams(typeParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Float16VectorFieldData{
|
||||
Data: make([]byte, 0),
|
||||
Dim: dim,
|
||||
}, nil
|
||||
case schemapb.DataType_FloatVector:
|
||||
dim, err := GetDimFromParams(typeParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &FloatVectorFieldData{
|
||||
Data: make([]float32, 0),
|
||||
Dim: dim,
|
||||
}, nil
|
||||
case schemapb.DataType_BinaryVector:
|
||||
dim, err := GetDimFromParams(typeParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &BinaryVectorFieldData{
|
||||
Data: make([]byte, 0),
|
||||
Dim: dim,
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Bool:
|
||||
return &BoolFieldData{
|
||||
Data: make([]bool, 0),
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Int8:
|
||||
return &Int8FieldData{
|
||||
Data: make([]int8, 0),
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Int16:
|
||||
return &Int16FieldData{
|
||||
Data: make([]int16, 0),
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Int32:
|
||||
return &Int32FieldData{
|
||||
Data: make([]int32, 0),
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Int64:
|
||||
return &Int64FieldData{
|
||||
Data: make([]int64, 0),
|
||||
}, nil
|
||||
case schemapb.DataType_Float:
|
||||
return &FloatFieldData{
|
||||
Data: make([]float32, 0),
|
||||
}, nil
|
||||
|
||||
case schemapb.DataType_Double:
|
||||
return &DoubleFieldData{
|
||||
Data: make([]float64, 0),
|
||||
}, nil
|
||||
case schemapb.DataType_JSON:
|
||||
return &JSONFieldData{
|
||||
Data: make([][]byte, 0),
|
||||
}, nil
|
||||
case schemapb.DataType_Array:
|
||||
return &ArrayFieldData{
|
||||
Data: make([]*schemapb.ScalarField, 0),
|
||||
ElementType: fieldSchema.GetElementType(),
|
||||
}, nil
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
return &StringFieldData{
|
||||
Data: make([]string, 0),
|
||||
}, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("Unexpected schema data type: %d", dataType)
|
||||
}
|
||||
}
|
||||
|
||||
type BoolFieldData struct {
|
||||
Data []bool
|
||||
}
|
||||
type Int8FieldData struct {
|
||||
Data []int8
|
||||
}
|
||||
type Int16FieldData struct {
|
||||
Data []int16
|
||||
}
|
||||
type Int32FieldData struct {
|
||||
Data []int32
|
||||
}
|
||||
type Int64FieldData struct {
|
||||
Data []int64
|
||||
}
|
||||
type FloatFieldData struct {
|
||||
Data []float32
|
||||
}
|
||||
type DoubleFieldData struct {
|
||||
Data []float64
|
||||
}
|
||||
type StringFieldData struct {
|
||||
Data []string
|
||||
}
|
||||
type ArrayFieldData struct {
|
||||
ElementType schemapb.DataType
|
||||
Data []*schemapb.ScalarField
|
||||
}
|
||||
type JSONFieldData struct {
|
||||
Data [][]byte
|
||||
}
|
||||
type BinaryVectorFieldData struct {
|
||||
Data []byte
|
||||
Dim int
|
||||
}
|
||||
type FloatVectorFieldData struct {
|
||||
Data []float32
|
||||
Dim int
|
||||
}
|
||||
type Float16VectorFieldData struct {
|
||||
Data []byte
|
||||
Dim int
|
||||
}
|
||||
|
||||
// RowNum implements FieldData.RowNum
|
||||
func (data *BoolFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int8FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int16FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int32FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *Int64FieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *FloatFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *DoubleFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *StringFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *ArrayFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *JSONFieldData) RowNum() int { return len(data.Data) }
|
||||
func (data *BinaryVectorFieldData) RowNum() int { return len(data.Data) * 8 / data.Dim }
|
||||
func (data *FloatVectorFieldData) RowNum() int { return len(data.Data) / data.Dim }
|
||||
func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim }
|
||||
|
||||
// GetRow implements FieldData.GetRow
|
||||
func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int8FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int16FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int32FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *Int64FieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *FloatFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *StringFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] }
|
||||
func (data *BinaryVectorFieldData) GetRow(i int) interface{} {
|
||||
return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8]
|
||||
}
|
||||
|
||||
func (data *FloatVectorFieldData) GetRow(i int) interface{} {
|
||||
return data.Data[i*data.Dim : (i+1)*data.Dim]
|
||||
}
|
||||
|
||||
func (data *Float16VectorFieldData) GetRow(i int) interface{} {
|
||||
return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2]
|
||||
}
|
||||
|
||||
// AppendRow implements FieldData.AppendRow
|
||||
func (data *BoolFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(bool)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("bool", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *Int8FieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(int8)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("int8", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *Int16FieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(int16)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("int16", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *Int32FieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(int32)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("int32", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *Int64FieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(int64)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("int64", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *FloatFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(float32)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("float32", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *DoubleFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(float64)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("float64", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *StringFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(string)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("string", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *ArrayFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.(*schemapb.ScalarField)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("*schemapb.ScalarField", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *JSONFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.([]byte)
|
||||
if !ok {
|
||||
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *BinaryVectorFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.([]byte)
|
||||
if !ok || len(v) != data.Dim/8 {
|
||||
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *FloatVectorFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.([]float32)
|
||||
if !ok || len(v) != data.Dim {
|
||||
return merr.WrapErrParameterInvalid("[]float32", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *Float16VectorFieldData) AppendRow(row interface{}) error {
|
||||
v, ok := row.([]byte)
|
||||
if !ok || len(v) != data.Dim*2 {
|
||||
return merr.WrapErrParameterInvalid("[]byte", row, "Wrong row type")
|
||||
}
|
||||
data.Data = append(data.Data, v...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMemorySize implements FieldData.GetMemorySize
|
||||
func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *Int16FieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *Int32FieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *Int64FieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *FloatFieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *DoubleFieldData) GetMemorySize() int { return binary.Size(data.Data) }
|
||||
func (data *BinaryVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
|
||||
func (data *FloatVectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
|
||||
func (data *Float16VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 }
|
||||
|
||||
// why not binary.Size(data) directly? binary.Size(data) return -1
|
||||
// binary.Size returns how many bytes Write would generate to encode the value v, which
|
||||
// must be a fixed-size value or a slice of fixed-size values, or a pointer to such data.
|
||||
// If v is neither of these, binary.Size returns -1.
|
||||
func (data *StringFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
size += len(val) + 16
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *ArrayFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
switch data.ElementType {
|
||||
case schemapb.DataType_Bool:
|
||||
size += binary.Size(val.GetBoolData().GetData())
|
||||
case schemapb.DataType_Int8:
|
||||
size += binary.Size(val.GetIntData().GetData()) / 4
|
||||
case schemapb.DataType_Int16:
|
||||
size += binary.Size(val.GetIntData().GetData()) / 2
|
||||
case schemapb.DataType_Int32:
|
||||
size += binary.Size(val.GetIntData().GetData())
|
||||
case schemapb.DataType_Int64:
|
||||
size += binary.Size(val.GetLongData().GetData())
|
||||
case schemapb.DataType_Float:
|
||||
size += binary.Size(val.GetFloatData().GetData())
|
||||
case schemapb.DataType_Double:
|
||||
size += binary.Size(val.GetDoubleData().GetData())
|
||||
case schemapb.DataType_String, schemapb.DataType_VarChar:
|
||||
size += (&StringFieldData{Data: val.GetStringData().GetData()}).GetMemorySize()
|
||||
}
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
func (data *JSONFieldData) GetMemorySize() int {
|
||||
var size int
|
||||
for _, val := range data.Data {
|
||||
size += len(val) + 16
|
||||
}
|
||||
return size
|
||||
}
|
|
@ -0,0 +1,319 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
)
|
||||
|
||||
func TestInsertDataSuite(t *testing.T) {
|
||||
suite.Run(t, new(InsertDataSuite))
|
||||
}
|
||||
|
||||
func TestArrayFieldDataSuite(t *testing.T) {
|
||||
suite.Run(t, new(ArrayFieldDataSuite))
|
||||
}
|
||||
|
||||
type InsertDataSuite struct {
|
||||
suite.Suite
|
||||
|
||||
schema *schemapb.CollectionSchema
|
||||
|
||||
iDataOneRow *InsertData
|
||||
iDataTwoRows *InsertData
|
||||
iDataEmpty *InsertData
|
||||
}
|
||||
|
||||
func (s *InsertDataSuite) SetupSuite() {
|
||||
s.schema = genTestCollectionMeta().Schema
|
||||
}
|
||||
|
||||
func (s *InsertDataSuite) TestInsertData() {
|
||||
s.Run("nil schema", func() {
|
||||
idata, err := NewInsertData(nil)
|
||||
s.Error(err)
|
||||
s.Nil(idata)
|
||||
})
|
||||
|
||||
s.Run("invalid schema", func() {
|
||||
tests := []struct {
|
||||
description string
|
||||
invalidType schemapb.DataType
|
||||
}{
|
||||
{"binary vector without dim", schemapb.DataType_BinaryVector},
|
||||
{"float vector without dim", schemapb.DataType_FloatVector},
|
||||
{"float16 vector without dim", schemapb.DataType_Float16Vector},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
s.Run(test.description, func() {
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
DataType: test.invalidType,
|
||||
},
|
||||
},
|
||||
}
|
||||
idata, err := NewInsertData(schema)
|
||||
s.Error(err)
|
||||
s.Nil(idata)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
s.Run("empty iData", func() {
|
||||
idata := &InsertData{}
|
||||
s.True(idata.IsEmpty())
|
||||
s.Equal(0, idata.GetRowNum())
|
||||
s.Equal(0, idata.GetMemorySize())
|
||||
|
||||
err := idata.Append(map[FieldID]interface{}{1: struct{}{}})
|
||||
s.Error(err)
|
||||
})
|
||||
|
||||
s.Run("init by New", func() {
|
||||
s.True(s.iDataEmpty.IsEmpty())
|
||||
s.Equal(0, s.iDataEmpty.GetRowNum())
|
||||
s.Equal(12, s.iDataEmpty.GetMemorySize())
|
||||
|
||||
s.False(s.iDataOneRow.IsEmpty())
|
||||
s.Equal(1, s.iDataOneRow.GetRowNum())
|
||||
s.Equal(139, s.iDataOneRow.GetMemorySize())
|
||||
|
||||
s.False(s.iDataTwoRows.IsEmpty())
|
||||
s.Equal(2, s.iDataTwoRows.GetRowNum())
|
||||
s.Equal(266, s.iDataTwoRows.GetMemorySize())
|
||||
|
||||
for _, field := range s.iDataTwoRows.Data {
|
||||
s.Equal(2, field.RowNum())
|
||||
|
||||
err := field.AppendRow(struct{}{})
|
||||
log.Warn("error", zap.Error(err))
|
||||
s.ErrorIs(err, merr.ErrParameterInvalid)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *InsertDataSuite) TestMemorySize() {
|
||||
s.Equal(s.iDataEmpty.Data[RowIDField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[TimestampField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[BoolField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[Int8Field].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[Int16Field].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[Int32Field].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[Int64Field].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[FloatField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[DoubleField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[StringField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[ArrayField].GetMemorySize(), 0)
|
||||
s.Equal(s.iDataEmpty.Data[BinaryVectorField].GetMemorySize(), 4)
|
||||
s.Equal(s.iDataEmpty.Data[FloatVectorField].GetMemorySize(), 4)
|
||||
s.Equal(s.iDataEmpty.Data[Float16VectorField].GetMemorySize(), 4)
|
||||
|
||||
s.Equal(s.iDataOneRow.Data[RowIDField].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataOneRow.Data[TimestampField].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataOneRow.Data[BoolField].GetMemorySize(), 1)
|
||||
s.Equal(s.iDataOneRow.Data[Int8Field].GetMemorySize(), 1)
|
||||
s.Equal(s.iDataOneRow.Data[Int16Field].GetMemorySize(), 2)
|
||||
s.Equal(s.iDataOneRow.Data[Int32Field].GetMemorySize(), 4)
|
||||
s.Equal(s.iDataOneRow.Data[Int64Field].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataOneRow.Data[FloatField].GetMemorySize(), 4)
|
||||
s.Equal(s.iDataOneRow.Data[DoubleField].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataOneRow.Data[StringField].GetMemorySize(), 19)
|
||||
s.Equal(s.iDataOneRow.Data[JSONField].GetMemorySize(), len([]byte(`{"batch":1}`))+16)
|
||||
s.Equal(s.iDataOneRow.Data[ArrayField].GetMemorySize(), 3*4)
|
||||
s.Equal(s.iDataOneRow.Data[BinaryVectorField].GetMemorySize(), 5)
|
||||
s.Equal(s.iDataOneRow.Data[FloatVectorField].GetMemorySize(), 20)
|
||||
s.Equal(s.iDataOneRow.Data[Float16VectorField].GetMemorySize(), 12)
|
||||
|
||||
s.Equal(s.iDataTwoRows.Data[RowIDField].GetMemorySize(), 16)
|
||||
s.Equal(s.iDataTwoRows.Data[TimestampField].GetMemorySize(), 16)
|
||||
s.Equal(s.iDataTwoRows.Data[BoolField].GetMemorySize(), 2)
|
||||
s.Equal(s.iDataTwoRows.Data[Int8Field].GetMemorySize(), 2)
|
||||
s.Equal(s.iDataTwoRows.Data[Int16Field].GetMemorySize(), 4)
|
||||
s.Equal(s.iDataTwoRows.Data[Int32Field].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataTwoRows.Data[Int64Field].GetMemorySize(), 16)
|
||||
s.Equal(s.iDataTwoRows.Data[FloatField].GetMemorySize(), 8)
|
||||
s.Equal(s.iDataTwoRows.Data[DoubleField].GetMemorySize(), 16)
|
||||
s.Equal(s.iDataTwoRows.Data[StringField].GetMemorySize(), 38)
|
||||
s.Equal(s.iDataTwoRows.Data[ArrayField].GetMemorySize(), 24)
|
||||
s.Equal(s.iDataTwoRows.Data[BinaryVectorField].GetMemorySize(), 6)
|
||||
s.Equal(s.iDataTwoRows.Data[FloatVectorField].GetMemorySize(), 36)
|
||||
s.Equal(s.iDataTwoRows.Data[Float16VectorField].GetMemorySize(), 20)
|
||||
}
|
||||
|
||||
func (s *InsertDataSuite) SetupTest() {
|
||||
var err error
|
||||
s.iDataEmpty, err = NewInsertData(s.schema)
|
||||
s.Require().NoError(err)
|
||||
s.True(s.iDataEmpty.IsEmpty())
|
||||
s.Equal(0, s.iDataEmpty.GetRowNum())
|
||||
s.Equal(12, s.iDataEmpty.GetMemorySize())
|
||||
|
||||
row1 := map[FieldID]interface{}{
|
||||
RowIDField: int64(3),
|
||||
TimestampField: int64(3),
|
||||
BoolField: true,
|
||||
Int8Field: int8(3),
|
||||
Int16Field: int16(3),
|
||||
Int32Field: int32(3),
|
||||
Int64Field: int64(3),
|
||||
FloatField: float32(3),
|
||||
DoubleField: float64(3),
|
||||
StringField: "str",
|
||||
BinaryVectorField: []byte{0},
|
||||
FloatVectorField: []float32{4, 5, 6, 7},
|
||||
Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255},
|
||||
ArrayField: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
|
||||
},
|
||||
},
|
||||
JSONField: []byte(`{"batch":3}`),
|
||||
}
|
||||
|
||||
s.iDataOneRow, err = NewInsertData(s.schema)
|
||||
s.Require().NoError(err)
|
||||
err = s.iDataOneRow.Append(row1)
|
||||
s.Require().NoError(err)
|
||||
|
||||
for fID, field := range s.iDataOneRow.Data {
|
||||
s.Equal(row1[fID], field.GetRow(0))
|
||||
}
|
||||
|
||||
row2 := map[FieldID]interface{}{
|
||||
RowIDField: int64(1),
|
||||
TimestampField: int64(1),
|
||||
BoolField: false,
|
||||
Int8Field: int8(1),
|
||||
Int16Field: int16(1),
|
||||
Int32Field: int32(1),
|
||||
Int64Field: int64(1),
|
||||
FloatField: float32(1),
|
||||
DoubleField: float64(1),
|
||||
StringField: string("str"),
|
||||
BinaryVectorField: []byte{0},
|
||||
FloatVectorField: []float32{4, 5, 6, 7},
|
||||
Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8},
|
||||
ArrayField: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}},
|
||||
},
|
||||
},
|
||||
JSONField: []byte(`{"batch":1}`),
|
||||
}
|
||||
|
||||
s.iDataTwoRows, err = NewInsertData(s.schema)
|
||||
s.Require().NoError(err)
|
||||
err = s.iDataTwoRows.Append(row1)
|
||||
s.Require().NoError(err)
|
||||
err = s.iDataTwoRows.Append(row2)
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
||||
type ArrayFieldDataSuite struct {
|
||||
suite.Suite
|
||||
}
|
||||
|
||||
func (s *ArrayFieldDataSuite) TestArrayFieldData() {
|
||||
fieldID2Type := map[int64]schemapb.DataType{
|
||||
ArrayField + 1: schemapb.DataType_Bool,
|
||||
ArrayField + 2: schemapb.DataType_Int8,
|
||||
ArrayField + 3: schemapb.DataType_Int16,
|
||||
ArrayField + 4: schemapb.DataType_Int32,
|
||||
ArrayField + 5: schemapb.DataType_Int64,
|
||||
ArrayField + 6: schemapb.DataType_Float,
|
||||
ArrayField + 7: schemapb.DataType_Double,
|
||||
ArrayField + 8: schemapb.DataType_VarChar,
|
||||
}
|
||||
|
||||
schema := &schemapb.CollectionSchema{
|
||||
Fields: []*schemapb.FieldSchema{
|
||||
{
|
||||
FieldID: RowIDField,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: TimestampField,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
{
|
||||
FieldID: Int64Field,
|
||||
IsPrimaryKey: true,
|
||||
DataType: schemapb.DataType_Int64,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for fieldID, elementType := range fieldID2Type {
|
||||
schema.Fields = append(schema.Fields, &schemapb.FieldSchema{
|
||||
FieldID: fieldID,
|
||||
DataType: schemapb.DataType_Array,
|
||||
ElementType: elementType,
|
||||
})
|
||||
}
|
||||
|
||||
insertData, err := NewInsertData(schema)
|
||||
s.NoError(err)
|
||||
|
||||
s.Equal(0, insertData.GetRowNum())
|
||||
s.Equal(0, insertData.GetMemorySize())
|
||||
s.True(insertData.IsEmpty())
|
||||
|
||||
fieldIDToData := map[int64]interface{}{
|
||||
RowIDField: int64(1),
|
||||
TimestampField: int64(2),
|
||||
Int64Field: int64(3),
|
||||
ArrayField + 1: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_BoolData{
|
||||
BoolData: &schemapb.BoolArray{Data: []bool{true, false}},
|
||||
},
|
||||
},
|
||||
ArrayField + 2: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{0, 0}},
|
||||
},
|
||||
},
|
||||
ArrayField + 3: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{1, 1}},
|
||||
},
|
||||
},
|
||||
ArrayField + 4: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_IntData{
|
||||
IntData: &schemapb.IntArray{Data: []int32{2, 2}},
|
||||
},
|
||||
},
|
||||
ArrayField + 5: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_LongData{
|
||||
LongData: &schemapb.LongArray{Data: []int64{3, 3}},
|
||||
},
|
||||
},
|
||||
ArrayField + 6: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_FloatData{
|
||||
FloatData: &schemapb.FloatArray{Data: []float32{4, 4}},
|
||||
},
|
||||
},
|
||||
ArrayField + 7: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_DoubleData{
|
||||
DoubleData: &schemapb.DoubleArray{Data: []float64{5, 5}},
|
||||
},
|
||||
},
|
||||
ArrayField + 8: &schemapb.ScalarField{
|
||||
Data: &schemapb.ScalarField_StringData{
|
||||
StringData: &schemapb.StringArray{Data: []string{"6", "6"}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = insertData.Append(fieldIDToData)
|
||||
s.NoError(err)
|
||||
s.Equal(1, insertData.GetRowNum())
|
||||
s.Equal(114, insertData.GetMemorySize())
|
||||
s.False(insertData.IsEmpty())
|
||||
}
|
Loading…
Reference in New Issue