// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package importv2 import ( "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/testutil" "github.com/milvus-io/milvus/pkg/v2/common" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" ) func Test_AppendSystemFieldsData(t *testing.T) { const count = 100 pkField := &schemapb.FieldSchema{ FieldID: 100, Name: "pk", IsPrimaryKey: true, AutoID: true, } vecField := &schemapb.FieldSchema{ FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: "4", }, }, } int64Field := &schemapb.FieldSchema{ FieldID: 102, Name: "int64", DataType: schemapb.DataType_Int64, } schema := &schemapb.CollectionSchema{} task := &ImportTask{ req: &datapb.ImportRequest{ Ts: 1000, Schema: schema, }, allocator: allocator.NewLocalAllocator(0, count*2), } pkField.DataType = schemapb.DataType_Int64 schema.Fields = []*schemapb.FieldSchema{pkField, vecField, int64Field} insertData, err := testutil.CreateInsertData(schema, count) assert.NoError(t, err) assert.Equal(t, 0, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Nil(t, insertData.Data[common.RowIDField]) assert.Nil(t, insertData.Data[common.TimeStampField]) rowNum, _ := GetInsertDataRowCount(insertData, task.GetSchema()) err = AppendSystemFieldsData(task, insertData, rowNum) assert.NoError(t, err) assert.Equal(t, count, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum()) assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum()) pkField.DataType = schemapb.DataType_VarChar schema.Fields = []*schemapb.FieldSchema{pkField, vecField, int64Field} insertData, err = testutil.CreateInsertData(schema, count) assert.NoError(t, err) assert.Equal(t, 0, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Nil(t, insertData.Data[common.RowIDField]) assert.Nil(t, insertData.Data[common.TimeStampField]) rowNum, _ = GetInsertDataRowCount(insertData, task.GetSchema()) err = AppendSystemFieldsData(task, insertData, rowNum) assert.NoError(t, err) assert.Equal(t, count, insertData.Data[pkField.GetFieldID()].RowNum()) assert.Equal(t, count, insertData.Data[common.RowIDField].RowNum()) assert.Equal(t, count, insertData.Data[common.TimeStampField].RowNum()) } func Test_UnsetAutoID(t *testing.T) { pkField := &schemapb.FieldSchema{ FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: true, } vecField := &schemapb.FieldSchema{ FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, } schema := &schemapb.CollectionSchema{} schema.Fields = []*schemapb.FieldSchema{pkField, vecField} UnsetAutoID(schema) for _, field := range schema.GetFields() { if field.GetIsPrimaryKey() { assert.False(t, schema.GetFields()[0].GetAutoID()) } } } func Test_PickSegment(t *testing.T) { const ( vchannel = "ch-0" partitionID = 10 ) task := &ImportTask{ req: &datapb.ImportRequest{ RequestSegments: []*datapb.ImportRequestSegment{ { SegmentID: 100, PartitionID: partitionID, Vchannel: vchannel, }, { SegmentID: 101, PartitionID: partitionID, Vchannel: vchannel, }, { SegmentID: 102, PartitionID: partitionID, Vchannel: vchannel, }, { SegmentID: 103, PartitionID: partitionID, Vchannel: vchannel, }, }, }, } importedSize := map[int64]int{} totalSize := 8 * 1024 * 1024 * 1024 batchSize := 1 * 1024 * 1024 for totalSize > 0 { picked, err := PickSegment(task.req.GetRequestSegments(), vchannel, partitionID) assert.NoError(t, err) importedSize[picked] += batchSize totalSize -= batchSize } expectSize := 2 * 1024 * 1024 * 1024 fn := func(actual int) { t.Logf("actual=%d, expect*0.8=%f, expect*1.2=%f", actual, float64(expectSize)*0.9, float64(expectSize)*1.1) assert.True(t, float64(actual) > float64(expectSize)*0.8) assert.True(t, float64(actual) < float64(expectSize)*1.2) } fn(importedSize[int64(100)]) fn(importedSize[int64(101)]) fn(importedSize[int64(102)]) fn(importedSize[int64(103)]) // test no candidate segments found _, err := PickSegment(task.req.GetRequestSegments(), "ch-2", 20) assert.Error(t, err) } func Test_AppendNullableDefaultFieldsData(t *testing.T) { buildSchemaFn := func() *schemapb.CollectionSchema { fields := make([]*schemapb.FieldSchema, 0) fields = append(fields, &schemapb.FieldSchema{ FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true, AutoID: false, }) fields = append(fields, &schemapb.FieldSchema{ FieldID: 101, Name: "vec", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ { Key: common.DimKey, Value: "4", }, }, }) fields = append(fields, &schemapb.FieldSchema{ FieldID: 102, Name: "dummy", DataType: schemapb.DataType_Int32, Nullable: true, }) return &schemapb.CollectionSchema{ Fields: fields, } } const count = 10 tests := []struct { name string fieldID int64 dataType schemapb.DataType nullable bool defaultVal *schemapb.ValueField }{ // nullable tests { name: "bool is nullable", fieldID: 200, dataType: schemapb.DataType_Bool, nullable: true, }, { name: "int8 is nullable", fieldID: 200, dataType: schemapb.DataType_Int8, nullable: true, }, { name: "int16 is nullable", fieldID: 200, dataType: schemapb.DataType_Int16, nullable: true, }, { name: "int32 is nullable", fieldID: 200, dataType: schemapb.DataType_Int32, nullable: true, }, { name: "int64 is nullable", fieldID: 200, dataType: schemapb.DataType_Int64, nullable: true, defaultVal: nil, }, { name: "float is nullable", fieldID: 200, dataType: schemapb.DataType_Float, nullable: true, }, { name: "double is nullable", fieldID: 200, dataType: schemapb.DataType_Double, nullable: true, }, { name: "varchar is nullable", fieldID: 200, dataType: schemapb.DataType_VarChar, nullable: true, }, { name: "json is nullable", fieldID: 200, dataType: schemapb.DataType_JSON, nullable: true, }, { name: "array is nullable", fieldID: 200, dataType: schemapb.DataType_Array, nullable: true, }, // default value tests { name: "bool is default", fieldID: 200, dataType: schemapb.DataType_Bool, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_BoolData{ BoolData: true, }, }, }, { name: "int8 is default", fieldID: 200, dataType: schemapb.DataType_Int8, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_IntData{ IntData: 99, }, }, }, { name: "int16 is default", fieldID: 200, dataType: schemapb.DataType_Int16, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_IntData{ IntData: 99, }, }, }, { name: "int32 is default", fieldID: 200, dataType: schemapb.DataType_Int32, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_IntData{ IntData: 99, }, }, }, { name: "int64 is default", fieldID: 200, dataType: schemapb.DataType_Int64, nullable: true, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_LongData{ LongData: 99, }, }, }, { name: "float is default", fieldID: 200, dataType: schemapb.DataType_Float, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_FloatData{ FloatData: 99.99, }, }, }, { name: "double is default", fieldID: 200, dataType: schemapb.DataType_Double, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_DoubleData{ DoubleData: 99.99, }, }, }, { name: "varchar is default", fieldID: 200, dataType: schemapb.DataType_VarChar, defaultVal: &schemapb.ValueField{ Data: &schemapb.ValueField_StringData{ StringData: "hello world", }, }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { schema := buildSchemaFn() fieldSchema := &schemapb.FieldSchema{ FieldID: tt.fieldID, Name: fmt.Sprintf("field_%d", tt.fieldID), DataType: tt.dataType, Nullable: tt.nullable, DefaultValue: tt.defaultVal, } if tt.dataType == schemapb.DataType_Array { fieldSchema.ElementType = schemapb.DataType_Int64 fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxCapacityKey, Value: "100"}) } else if tt.dataType == schemapb.DataType_VarChar { fieldSchema.TypeParams = append(fieldSchema.TypeParams, &commonpb.KeyValuePair{Key: common.MaxLengthKey, Value: "100"}) } insertData, err := testutil.CreateInsertData(schema, count) assert.NoError(t, err) schema.Fields = append(schema.Fields, fieldSchema) fieldData, err := storage.NewFieldData(fieldSchema.GetDataType(), fieldSchema, 0) assert.NoError(t, err) insertData.Data[fieldSchema.GetFieldID()] = fieldData err = AppendNullableDefaultFieldsData(schema, insertData, count) assert.NoError(t, err) for fieldID, fieldData := range insertData.Data { if fieldID < int64(200) { continue } assert.Equal(t, count, fieldData.RowNum()) if tt.nullable { assert.True(t, fieldData.GetNullable()) } if tt.defaultVal != nil { switch tt.dataType { case schemapb.DataType_Bool: tempFieldData := fieldData.(*storage.BoolFieldData) for _, v := range tempFieldData.Data { assert.True(t, v) } case schemapb.DataType_Int8: tempFieldData := fieldData.(*storage.Int8FieldData) for _, v := range tempFieldData.Data { assert.Equal(t, int8(99), v) } case schemapb.DataType_Int16: tempFieldData := fieldData.(*storage.Int16FieldData) for _, v := range tempFieldData.Data { assert.Equal(t, int16(99), v) } case schemapb.DataType_Int32: tempFieldData := fieldData.(*storage.Int32FieldData) for _, v := range tempFieldData.Data { assert.Equal(t, int32(99), v) } case schemapb.DataType_Int64: tempFieldData := fieldData.(*storage.Int64FieldData) for _, v := range tempFieldData.Data { assert.Equal(t, int64(99), v) } case schemapb.DataType_Float: tempFieldData := fieldData.(*storage.FloatFieldData) for _, v := range tempFieldData.Data { assert.Equal(t, float32(99.99), v) } case schemapb.DataType_Double: tempFieldData := fieldData.(*storage.DoubleFieldData) for _, v := range tempFieldData.Data { assert.Equal(t, float64(99.99), v) } case schemapb.DataType_VarChar: tempFieldData := fieldData.(*storage.StringFieldData) for _, v := range tempFieldData.Data { assert.Equal(t, "hello world", v) } default: } } else if tt.nullable { for i := 0; i < count; i++ { assert.Nil(t, fieldData.GetRow(i)) } } } }) } }