mirror of https://github.com/milvus-io/milvus.git
test: add cases for gosdk v2 partial load (#37924)
issue: #33419 Signed-off-by: ThreadDao <yufen.zong@zilliz.com>pull/38021/head
parent
cb6542339e
commit
c5327ff150
|
@ -186,7 +186,7 @@ func (chainTask *CollectionPrepare) Load(ctx context.Context, t *testing.T, mc *
|
|||
if lp.CollectionName == "" {
|
||||
log.Fatal("[Load] Empty collection name is not expected")
|
||||
}
|
||||
loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(lp.CollectionName).WithReplica(lp.Replica))
|
||||
loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(lp.CollectionName).WithReplica(lp.Replica).WithLoadFields(lp.LoadFields...).WithSkipLoadDynamicField(lp.SkipLoadDynamicField))
|
||||
common.CheckErr(t, err, true)
|
||||
err = loadTask.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
|
|
@ -6,8 +6,10 @@ import (
|
|||
)
|
||||
|
||||
// LoadParams collects the arguments a test passes when loading a collection.
type LoadParams struct {
	// CollectionName is the name of the collection to load.
	CollectionName string
	// Replica is the value handed to the load option's WithReplica.
	Replica int
	// LoadFields is the list of field names handed to WithLoadFields.
	LoadFields []string
	// SkipLoadDynamicField is the flag handed to WithSkipLoadDynamicField.
	SkipLoadDynamicField bool
}
|
||||
|
||||
func NewLoadParams(collectionName string) *LoadParams {
|
||||
|
@ -21,6 +23,16 @@ func (opt *LoadParams) TWithReplica(replica int) *LoadParams {
|
|||
return opt
|
||||
}
|
||||
|
||||
func (opt *LoadParams) TWithLoadFields(loadFields ...string) *LoadParams {
|
||||
opt.LoadFields = loadFields
|
||||
return opt
|
||||
}
|
||||
|
||||
func (opt *LoadParams) TWithSkipLoadDynamicField(skipFlag bool) *LoadParams {
|
||||
opt.SkipLoadDynamicField = skipFlag
|
||||
return opt
|
||||
}
|
||||
|
||||
// GenSearchVectors gen search vectors
|
||||
func GenSearchVectors(nq int, dim int, dataType entity.FieldType) []entity.Vector {
|
||||
vectors := make([]entity.Vector, 0, nq)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package testcases
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -108,17 +109,8 @@ func TestLoadCollectionMultiPartitions(t *testing.T) {
|
|||
common.CheckErr(t, err, true)
|
||||
|
||||
// insert [0, nb) into default partition, [nb, nb*2) into new partition
|
||||
_defVec := hp.GenColumnData(common.DefaultNb, entity.FieldTypeFloatVector, *hp.TNewDataOption())
|
||||
_defPk := hp.GenColumnData(common.DefaultNb, entity.FieldTypeInt64, *hp.TNewDataOption())
|
||||
insertRes1, err1 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defPk, _defVec))
|
||||
common.CheckErr(t, err1, true)
|
||||
require.EqualValues(t, common.DefaultNb, insertRes1.InsertCount)
|
||||
|
||||
_parPk := hp.GenColumnData(common.DefaultNb, entity.FieldTypeInt64, *hp.TNewDataOption().TWithStart(common.DefaultNb))
|
||||
_parVec := hp.GenColumnData(common.DefaultNb, entity.FieldTypeFloatVector, *hp.TNewDataOption().TWithStart(common.DefaultNb))
|
||||
insertRes2, err2 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_parPk, _parVec).WithPartition(parName))
|
||||
common.CheckErr(t, err2, true)
|
||||
require.EqualValues(t, common.DefaultNb, insertRes2.InsertCount)
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb))
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema).TWithPartitionName(parName), hp.TNewDataOption().TWithNb(common.DefaultNb).TWithStart(common.DefaultNb))
|
||||
|
||||
// create index
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
|
@ -262,6 +254,231 @@ func TestLoadCollectionSparse(t *testing.T) {
|
|||
require.EqualValues(t, common.DefaultNb, count)
|
||||
}
|
||||
|
||||
func TestLoadPartialFields(t *testing.T) {
|
||||
t.Skip("https://github.com/milvus-io/milvus/issues/37853")
|
||||
/*
|
||||
1. verify the collection loaded successfully
|
||||
2. verify the loaded fields can be searched in expr and output_fields
|
||||
3. verify the skip fields not loaded, and cannot search/query/hybrid search with them in expr or output_fields
|
||||
*/
|
||||
// create collection -> insert -> flush -> index
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.AllFields), hp.TNewFieldsOption(), hp.TNewSchemaOption())
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption())
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
|
||||
// load partial fields
|
||||
partialLoadedFields := []string{common.DefaultInt64FieldName, common.DefaultVarcharFieldName, common.DefaultFloatVecFieldName, common.DefaultFloat16VecFieldName}
|
||||
loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partialLoadedFields...).WithSkipLoadDynamicField(true))
|
||||
common.CheckErr(t, err, true)
|
||||
err = loadTask.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
// search loaded fields & loaded fileds expr & output loaded fields
|
||||
expr := fmt.Sprintf("%s > '2' ", common.DefaultVarcharFieldName)
|
||||
vectors := hp.GenSearchVectors(common.DefaultNq, common.DefaultDim, entity.FieldTypeFloatVector)
|
||||
searchRes, err := mc.Search(ctx, clientv2.NewSearchOption(schema.CollectionName, common.DefaultLimit, vectors).WithANNSField(common.DefaultFloatVecFieldName).WithOutputFields("*").WithFilter(expr))
|
||||
common.CheckErr(t, err, true)
|
||||
common.CheckSearchResult(t, searchRes, common.DefaultNq, common.DefaultLimit)
|
||||
common.CheckOutputFields(t, partialLoadedFields, searchRes[0].Fields)
|
||||
|
||||
// TODO: hybrid search with loaded fields
|
||||
|
||||
// search with not-loaded anns field -> Error
|
||||
_, err = mc.Search(ctx, clientv2.NewSearchOption(schema.CollectionName, common.DefaultLimit, vectors).WithANNSField(common.DefaultBFloat16VecFieldName))
|
||||
common.CheckErr(t, err, false, "ann field \"bf16Vec\" not loaded")
|
||||
|
||||
// search with expr not loaded field -> Error
|
||||
invalidExpr := fmt.Sprintf("%s > 2.0 ", common.DefaultFloatFieldName)
|
||||
_, err = mc.Search(ctx, clientv2.NewSearchOption(schema.CollectionName, common.DefaultLimit, vectors).WithANNSField(common.DefaultFloatVecFieldName).WithFilter(invalidExpr))
|
||||
common.CheckErr(t, err, false, "field fieldID:105 name:\"float\" data_type:Float is not loaded")
|
||||
|
||||
// search with output_fields not loaded field -> Error
|
||||
_, err = mc.Search(ctx, clientv2.NewSearchOption(schema.CollectionName, common.DefaultLimit, vectors).WithANNSField(common.DefaultFloatVecFieldName).WithOutputFields(common.DefaultBoolFieldName))
|
||||
common.CheckErr(t, err, false, "field bool is not loaded")
|
||||
}
|
||||
|
||||
func TestLoadSkipDynamicField(t *testing.T) {
|
||||
/*
|
||||
1. load -> search output dynamic field
|
||||
2. reload and skip dynamic field
|
||||
verify:
|
||||
- search output dynamic field -> error
|
||||
- search with dynamic field in expr -> error
|
||||
*/
|
||||
// create collection -> insert -> flush -> index
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
// create collection -> insert -> flush -> index -> load
|
||||
prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64Vec), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true))
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption())
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName))
|
||||
|
||||
// query and verify output dynamic fields
|
||||
queryRes, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.DefaultDynamicFieldName).WithLimit(10))
|
||||
common.CheckErr(t, err, true)
|
||||
common.CheckOutputFields(t, []string{common.DefaultDynamicFieldName, common.DefaultInt64FieldName}, queryRes.Fields)
|
||||
|
||||
// reload and skip dynamic field
|
||||
mc.ReleaseCollection(ctx, clientv2.NewReleaseCollectionOption(schema.CollectionName))
|
||||
loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithSkipLoadDynamicField(true))
|
||||
common.CheckErr(t, err, true)
|
||||
err = loadTask.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
// search and verify output dynamic fields
|
||||
t.Log("https://github.com/milvus-io/milvus/issues/37857")
|
||||
_, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.DefaultDynamicNumberField).WithLimit(10))
|
||||
common.CheckErr(t, err, false, fmt.Sprintf("field %s cannot be returned since dynamic field not loaded", common.DefaultDynamicNumberField))
|
||||
|
||||
_, err = mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithFilter(fmt.Sprintf("%s > 0", common.DefaultDynamicNumberField)))
|
||||
common.CheckErr(t, err, false, "but dynamic field is not loaded")
|
||||
}
|
||||
|
||||
func TestLoadPartialVectorFields(t *testing.T) {
|
||||
t.Skip("waiting for HybridSearch implementation")
|
||||
/*
|
||||
1. verify load different vector fields has different hybrid search results
|
||||
*/
|
||||
// create collection -> insert -> flush -> index
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
// create collection -> insert -> flush -> index
|
||||
prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64MultiVec), hp.TNewFieldsOption(), hp.TNewSchemaOption())
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption())
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
|
||||
// load partial vector fields
|
||||
prepare.Load(ctx, t, mc, hp.NewLoadParams(schema.CollectionName).TWithLoadFields(common.DefaultFloatVecFieldName, common.DefaultFloat16VecFieldName))
|
||||
// TODO: verify hybrid search results
|
||||
// TODO: load partial vector fields
|
||||
}
|
||||
|
||||
func TestLoadPartialFieldsPartitions(t *testing.T) {
|
||||
/*
|
||||
1. insert data into default partition and parName partition
|
||||
2. load default partition with partial fields -> succ & query
|
||||
3. load parName partition with different partial fields -> error
|
||||
4. load parName partition with same partial fields -> succ
|
||||
5. query from default partition and parName partition -> count=2*nb
|
||||
*/
|
||||
// no [pk, clustering, part dynamic fields] field, not all fields
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
parName := common.GenRandomString("p", 4)
|
||||
|
||||
// create collection and partition
|
||||
prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64VecAllScalar), hp.TNewFieldsOption(), hp.TNewSchemaOption())
|
||||
err := mc.CreatePartition(ctx, clientv2.NewCreatePartitionOption(schema.CollectionName, parName))
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(common.DefaultNb))
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema).TWithPartitionName(parName), hp.TNewDataOption().TWithNb(common.DefaultNb).TWithStart(common.DefaultNb))
|
||||
|
||||
// create index
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
|
||||
// load default partition with partial fields
|
||||
partitionFields := []string{common.DefaultInt64FieldName, common.DefaultFloatVecFieldName, common.DefaultBoolFieldName}
|
||||
taskDef, errDef := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, common.DefaultPartition).WithLoadFields(partitionFields...))
|
||||
common.CheckErr(t, errDef, true)
|
||||
err = taskDef.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
// query from default partition
|
||||
queryDefault, errQuery := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(partitionFields...).WithPartitions(common.DefaultPartition).WithLimit(common.DefaultLimit))
|
||||
common.CheckErr(t, errQuery, true)
|
||||
common.CheckOutputFields(t, partitionFields, queryDefault.Fields)
|
||||
|
||||
// load parName partition with different partial fields
|
||||
diffFields := []string{common.DefaultInt64FieldName, common.DefaultFloatVecFieldName, common.DefaultVarcharFieldName}
|
||||
_, errPar := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, parName).WithLoadFields(diffFields...))
|
||||
common.CheckErr(t, errPar, false, "can't change the load field list for loaded collection")
|
||||
|
||||
// load parName partition with different partial fields
|
||||
taskPar, errPar := mc.LoadPartitions(ctx, clientv2.NewLoadPartitionsOption(schema.CollectionName, parName).WithLoadFields(partitionFields...))
|
||||
common.CheckErr(t, errPar, true)
|
||||
err = taskPar.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
// query from default partition and parName partition
|
||||
countMultiPartitions, err := mc.Query(ctx, clientv2.NewQueryOption(schema.CollectionName).WithOutputFields(common.QueryCountFieldName))
|
||||
common.CheckErr(t, err, true)
|
||||
count, _ := countMultiPartitions.GetColumn(common.QueryCountFieldName).GetAsInt64(0)
|
||||
require.Equal(t, int64(common.DefaultNb*2), count)
|
||||
}
|
||||
|
||||
func TestLoadPartialFieldsWithoutPartitionKey(t *testing.T) {
|
||||
/*
|
||||
code fields: pk, clustering key, partition key, part dynamic fields, non-vector fields -> error
|
||||
not index: error
|
||||
*/
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
// define fields & schema
|
||||
pkField := entity.NewField().WithName("pk").WithDataType(entity.FieldTypeInt64).WithIsPrimaryKey(true)
|
||||
partitionKeyField := entity.NewField().WithName("partition_key").WithDataType(entity.FieldTypeInt64).WithIsPartitionKey(true)
|
||||
clusteringKeyField := entity.NewField().WithName("clustering_key").WithDataType(entity.FieldTypeInt64).WithIsClusteringKey(true)
|
||||
vecField := entity.NewField().WithName("vec").WithDataType(entity.FieldTypeFloatVector).WithDim(common.DefaultDim)
|
||||
schema := entity.NewSchema().WithName(common.GenRandomString("partial", 4)).WithField(pkField).WithField(partitionKeyField).WithField(clusteringKeyField).WithField(vecField).WithDynamicFieldEnabled(true)
|
||||
mc.CreateCollection(ctx, clientv2.NewCreateCollectionOption(schema.CollectionName, schema))
|
||||
|
||||
// load partial fields without partition key fields
|
||||
_, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(pkField.Name, vecField.Name))
|
||||
common.CheckErr(t, err, false, "does not contain partition key field partition_key")
|
||||
// load partial fields without clustering key fields
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partitionKeyField.Name, pkField.Name, vecField.Name))
|
||||
common.CheckErr(t, err, false, "does not contain clustering key field clustering_key")
|
||||
// load partial fields without pk
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partitionKeyField.Name, clusteringKeyField.Name, vecField.Name))
|
||||
common.CheckErr(t, err, false, "does not contain primary key field pk")
|
||||
// load partial fields without vector fields
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partitionKeyField.Name, clusteringKeyField.Name, pkField.Name))
|
||||
common.CheckErr(t, err, false, "does not contain vector field")
|
||||
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partitionKeyField.Name, clusteringKeyField.Name, pkField.Name, vecField.Name))
|
||||
common.CheckErr(t, err, false, "index not found")
|
||||
|
||||
// create index
|
||||
hp.CollPrepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(partitionKeyField.Name, clusteringKeyField.Name, pkField.Name, vecField.Name))
|
||||
common.CheckErr(t, err, true)
|
||||
}
|
||||
|
||||
func TestLoadPartialFieldsRepeated(t *testing.T) {
|
||||
/*
|
||||
1. repeated Load with different LoadFields -> error
|
||||
*/
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
||||
// create collection -> insert -> flush -> index -> load
|
||||
prepare, schema := hp.CollPrepare.CreateCollection(ctx, t, mc, hp.NewCreateCollectionParams(hp.Int64VecArray), hp.TNewFieldsOption(), hp.TNewSchemaOption().TWithEnableDynamicField(true))
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption())
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
prepare.CreateIndex(ctx, t, mc, hp.TNewIndexParams(schema))
|
||||
|
||||
loadTask, err := mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(common.DefaultInt64FieldName, common.DefaultFloatVecFieldName, common.DefaultInt64ArrayField))
|
||||
common.CheckErr(t, err, true)
|
||||
err = loadTask.Await(ctx)
|
||||
common.CheckErr(t, err, true)
|
||||
|
||||
// load with different fields
|
||||
_, err = mc.LoadCollection(ctx, clientv2.NewLoadCollectionOption(schema.CollectionName).WithLoadFields(common.DefaultInt64FieldName, common.DefaultFloatVecFieldName, common.DefaultVarcharArrayField))
|
||||
common.CheckErr(t, err, false, "can't change the load field list for loaded collection")
|
||||
}
|
||||
|
||||
func TestReleaseCollection(t *testing.T) {
|
||||
ctx := hp.CreateContext(t, time.Second*common.DefaultTimeout)
|
||||
mc := createDefaultMilvusClient(ctx, t)
|
||||
|
@ -305,15 +522,8 @@ func TestReleasePartitions(t *testing.T) {
|
|||
common.CheckErr(t, err, true)
|
||||
|
||||
// insert [0, nb) into default partition and new partition
|
||||
_defVec := hp.GenColumnData(nb, entity.FieldTypeFloatVector, *hp.TNewDataOption())
|
||||
_defPk := hp.GenColumnData(nb, entity.FieldTypeInt64, *hp.TNewDataOption())
|
||||
insertRes1, err1 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defPk, _defVec))
|
||||
common.CheckErr(t, err1, true)
|
||||
require.EqualValues(t, nb, insertRes1.InsertCount)
|
||||
|
||||
insertRes2, err2 := mc.Insert(ctx, clientv2.NewColumnBasedInsertOption(schema.CollectionName).WithColumns(_defVec, _defPk).WithPartition(parName))
|
||||
common.CheckErr(t, err2, true)
|
||||
require.EqualValues(t, nb, insertRes2.InsertCount)
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema), hp.TNewDataOption().TWithNb(nb))
|
||||
prepare.InsertData(ctx, t, mc, hp.NewInsertParams(schema).TWithPartitionName(parName), hp.TNewDataOption().TWithNb(nb).TWithStart(nb))
|
||||
|
||||
// create index
|
||||
prepare.FlushData(ctx, t, mc, schema.CollectionName)
|
||||
|
|
Loading…
Reference in New Issue