mirror of https://github.com/milvus-io/milvus.git
Add an option to enable/disable vector field clustering key (#34097)
#30633 Signed-off-by: wayblink <anyang.wang@zilliz.com>pull/34169/head
parent
c8c423e972
commit
f9a0f7bb25
|
@ -650,8 +650,10 @@ common:
|
|||
traceLogMode: 0 # trace request info
|
||||
bloomFilterSize: 100000 # bloom filter initial size
|
||||
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
|
||||
# clustering key/compaction related
|
||||
usePartitionKeyAsClusteringKey: false
|
||||
useVectorAsClusteringKey: false
|
||||
enableVectorClusteringKey: false
|
||||
|
||||
# QuotaConfig, configurations of Milvus quota and limits.
|
||||
# By default, we enable:
|
||||
|
|
|
@ -248,6 +248,10 @@ func (t *createCollectionTask) validateClusteringKey() error {
|
|||
idx := -1
|
||||
for i, field := range t.schema.Fields {
|
||||
if field.GetIsClusteringKey() {
|
||||
if typeutil.IsVectorType(field.GetDataType()) &&
|
||||
!paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() {
|
||||
return merr.WrapErrCollectionVectorClusteringKeyNotAllowed(t.CollectionName)
|
||||
}
|
||||
if idx != -1 {
|
||||
return merr.WrapErrCollectionIllegalSchema(t.CollectionName,
|
||||
fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name))
|
||||
|
|
|
@ -3568,6 +3568,41 @@ func TestClusteringKey(t *testing.T) {
|
|||
err = createCollectionTask.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("create collection with vector clustering key", func(t *testing.T) {
|
||||
fieldName2Type := make(map[string]schemapb.DataType)
|
||||
fieldName2Type["int64_field"] = schemapb.DataType_Int64
|
||||
fieldName2Type["varChar_field"] = schemapb.DataType_VarChar
|
||||
schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
|
||||
clusterKeyField := &schemapb.FieldSchema{
|
||||
Name: "vec_field",
|
||||
DataType: schemapb.DataType_FloatVector,
|
||||
IsClusteringKey: true,
|
||||
}
|
||||
schema.Fields = append(schema.Fields, clusterKeyField)
|
||||
marshaledSchema, err := proto.Marshal(schema)
|
||||
assert.NoError(t, err)
|
||||
|
||||
createCollectionTask := &createCollectionTask{
|
||||
Condition: NewTaskCondition(ctx),
|
||||
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
|
||||
Timestamp: Timestamp(time.Now().UnixNano()),
|
||||
},
|
||||
DbName: "",
|
||||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
ShardsNum: shardsNum,
|
||||
},
|
||||
ctx: ctx,
|
||||
rootCoord: rc,
|
||||
result: nil,
|
||||
schema: nil,
|
||||
}
|
||||
err = createCollectionTask.PreExecute(ctx)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAlterCollectionCheckLoaded(t *testing.T) {
|
||||
|
|
|
@ -380,6 +380,7 @@ func vector2Placeholder(vectors [][]float32) *commonpb.PlaceholderValue {
|
|||
|
||||
func (sps *SegmentPrunerSuite) TestPruneSegmentsByVectorField() {
|
||||
paramtable.Init()
|
||||
paramtable.Get().Save(paramtable.Get().CommonCfg.EnableVectorClusteringKey.Key, "true")
|
||||
sps.SetupForClustering("vec", schemapb.DataType_FloatVector)
|
||||
vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499}
|
||||
vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759}
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/util/distance"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
"github.com/milvus-io/milvus/pkg/util/typeutil"
|
||||
)
|
||||
|
||||
func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, right []float32, metric string) ([]float32, error) {
|
||||
|
@ -70,10 +71,16 @@ func GetClusteringKeyField(collectionSchema *schemapb.CollectionSchema) *schemap
|
|||
// in some server mode, we regard partition key field or vector field as clustering key by default.
|
||||
// here is the priority: clusteringKey > partitionKey > vector field(only single vector)
|
||||
if clusteringKeyField != nil {
|
||||
if typeutil.IsVectorType(clusteringKeyField.GetDataType()) &&
|
||||
!paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() {
|
||||
return nil
|
||||
}
|
||||
return clusteringKeyField
|
||||
} else if paramtable.Get().CommonCfg.UsePartitionKeyAsClusteringKey.GetAsBool() && partitionKeyField != nil {
|
||||
return partitionKeyField
|
||||
} else if paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && len(vectorFields) == 1 {
|
||||
} else if paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() &&
|
||||
paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() &&
|
||||
len(vectorFields) == 1 {
|
||||
return vectorFields[0]
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -46,13 +46,14 @@ var (
|
|||
ErrServiceResourceInsufficient = newMilvusError("service resource insufficient", 12, true)
|
||||
|
||||
// Collection related
|
||||
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)
|
||||
ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false)
|
||||
ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
|
||||
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
|
||||
ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false)
|
||||
ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false)
|
||||
ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true)
|
||||
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)
|
||||
ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false)
|
||||
ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
|
||||
ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true)
|
||||
ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false)
|
||||
ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false)
|
||||
ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true)
|
||||
ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false)
|
||||
|
||||
// Partition related
|
||||
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)
|
||||
|
|
|
@ -87,6 +87,7 @@ func (s *ErrSuite) TestWrap() {
|
|||
s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded)
|
||||
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded)
|
||||
s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering)
|
||||
s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed)
|
||||
|
||||
// Partition related
|
||||
s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound)
|
||||
|
|
|
@ -489,6 +489,16 @@ func WrapErrCollectionOnRecovering(collection any, msgAndArgs ...any) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// WrapErrCollectionVectorClusteringKeyNotAllowed wraps ErrCollectionVectorClusteringKeyNotAllowed with collection
|
||||
func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs ...any) error {
|
||||
err := wrapFields(ErrCollectionVectorClusteringKeyNotAllowed, value("collection", collection))
|
||||
if len(msgAndArgs) > 0 {
|
||||
msg := msgAndArgs[0].(string)
|
||||
err = errors.Wrapf(err, msg, msgAndArgs[1:]...)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
|
||||
err := wrapFields(ErrAliasNotFound,
|
||||
value("database", db),
|
||||
|
|
|
@ -252,6 +252,7 @@ type commonConfig struct {
|
|||
|
||||
UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
|
||||
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
|
||||
EnableVectorClusteringKey ParamItem `refreshable:"true"`
|
||||
}
|
||||
|
||||
func (p *commonConfig) init(base *BaseTable) {
|
||||
|
@ -777,7 +778,7 @@ like the old password verification when updating the credential`,
|
|||
|
||||
p.UsePartitionKeyAsClusteringKey = ParamItem{
|
||||
Key: "common.usePartitionKeyAsClusteringKey",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "if true, do clustering compaction and segment prune on partition key field",
|
||||
DefaultValue: "false",
|
||||
}
|
||||
|
@ -785,11 +786,19 @@ like the old password verification when updating the credential`,
|
|||
|
||||
p.UseVectorAsClusteringKey = ParamItem{
|
||||
Key: "common.useVectorAsClusteringKey",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "if true, do clustering compaction and segment prune on vector field",
|
||||
DefaultValue: "false",
|
||||
}
|
||||
p.UseVectorAsClusteringKey.Init(base.mgr)
|
||||
|
||||
p.EnableVectorClusteringKey = ParamItem{
|
||||
Key: "common.enableVectorClusteringKey",
|
||||
Version: "2.4.6",
|
||||
Doc: "if true, enable vector clustering key and vector clustering compaction",
|
||||
DefaultValue: "false",
|
||||
}
|
||||
p.EnableVectorClusteringKey.Init(base.mgr)
|
||||
}
|
||||
|
||||
type gpuConfig struct {
|
||||
|
@ -3289,7 +3298,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionEnable = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.enable",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "false",
|
||||
Doc: "Enable clustering compaction",
|
||||
Export: true,
|
||||
|
@ -3298,7 +3307,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionAutoEnable = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.autoEnable",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "false",
|
||||
Doc: "Enable auto clustering compaction",
|
||||
Export: true,
|
||||
|
@ -3307,28 +3316,28 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionTriggerInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.triggerInterval",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "600",
|
||||
}
|
||||
p.ClusteringCompactionTriggerInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionStateCheckInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.stateCheckInterval",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "10",
|
||||
}
|
||||
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionGCInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.gcInterval",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "600",
|
||||
}
|
||||
p.ClusteringCompactionGCInterval.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionMinInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.minInterval",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction",
|
||||
DefaultValue: "3600",
|
||||
}
|
||||
|
@ -3336,7 +3345,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxInterval = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxInterval",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact",
|
||||
DefaultValue: "86400",
|
||||
}
|
||||
|
@ -3344,7 +3353,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionNewDataSizeThreshold = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.newDataSizeThreshold",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "If new data size is large than newDataSizeThreshold, execute clustering compaction",
|
||||
DefaultValue: "512m",
|
||||
}
|
||||
|
@ -3352,14 +3361,14 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionTimeoutInSeconds = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.timeout",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "3600",
|
||||
}
|
||||
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)
|
||||
|
||||
p.ClusteringCompactionDropTolerance = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.dropTolerance",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
Doc: "If clustering compaction job is finished for a long time, gc it",
|
||||
DefaultValue: "259200",
|
||||
}
|
||||
|
@ -3367,7 +3376,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionPreferSegmentSize = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.preferSegmentSize",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "512m",
|
||||
PanicIfEmpty: false,
|
||||
Export: true,
|
||||
|
@ -3376,7 +3385,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxSegmentSize = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxSegmentSize",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "1024m",
|
||||
PanicIfEmpty: false,
|
||||
Export: true,
|
||||
|
@ -3385,7 +3394,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxTrainSizeRatio",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "0.8",
|
||||
Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit",
|
||||
Export: true,
|
||||
|
@ -3394,7 +3403,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxCentroidsNum = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxCentroidsNum",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "10240",
|
||||
Doc: "maximum centroids number in Kmeans train",
|
||||
Export: true,
|
||||
|
@ -3403,7 +3412,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMinCentroidsNum = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.minCentroidsNum",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "16",
|
||||
Doc: "minimum centroids number in Kmeans train",
|
||||
Export: true,
|
||||
|
@ -3412,7 +3421,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMinClusterSizeRatio = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.minClusterSizeRatio",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "0.01",
|
||||
Doc: "minimum cluster size / avg size in Kmeans train",
|
||||
Export: true,
|
||||
|
@ -3421,7 +3430,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxClusterSizeRatio",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "10",
|
||||
Doc: "maximum cluster size / avg size in Kmeans train",
|
||||
Export: true,
|
||||
|
@ -3430,7 +3439,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #
|
|||
|
||||
p.ClusteringCompactionMaxClusterSize = ParamItem{
|
||||
Key: "dataCoord.compaction.clustering.maxClusterSize",
|
||||
Version: "2.4.2",
|
||||
Version: "2.4.6",
|
||||
DefaultValue: "5g",
|
||||
Doc: "maximum cluster size in Kmeans train",
|
||||
Export: true,
|
||||
|
|
|
@ -543,6 +543,8 @@ func TestComponentParam(t *testing.T) {
|
|||
assert.Equal(t, true, Params.UsePartitionKeyAsClusteringKey.GetAsBool())
|
||||
params.Save("common.useVectorAsClusteringKey", "true")
|
||||
assert.Equal(t, true, Params.UseVectorAsClusteringKey.GetAsBool())
|
||||
params.Save("common.enableVectorClusteringKey", "true")
|
||||
assert.Equal(t, true, Params.EnableVectorClusteringKey.GetAsBool())
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue