diff --git a/go.mod b/go.mod index d26e77871c..218725be30 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.16.7 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4 + github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f github.com/minio/minio-go/v7 v7.0.61 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.3.0 diff --git a/go.sum b/go.sum index 95c831d51a..2ca0615cd3 100644 --- a/go.sum +++ b/go.sum @@ -583,8 +583,8 @@ github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/le github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4 h1:nxIohfJOCMbixFAC3q4Lclmv0xg/8q6D8T7D8l258To= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231228051838-b5442d755fa4/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f h1:8lNcRqhQgUROtmtiIEdpQHGW82KMI5oASVKxkaZ/tBg= +github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20231229025438-39bce6abb18f/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092 h1:UYJ7JB+QlMOoFHNdd8mUa3/lV63t9dnBX7ILXmEEWPY= github.com/milvus-io/milvus-storage/go v0.0.0-20231109072809-1cd7b0866092/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= diff --git a/internal/metastore/model/field.go b/internal/metastore/model/field.go index 10d44604d2..a4d906a24a 100644 --- a/internal/metastore/model/field.go +++ b/internal/metastore/model/field.go @@ -7,19 +7,20 @@ import ( ) type Field struct { - FieldID int64 - Name string - IsPrimaryKey bool - Description string - DataType schemapb.DataType - TypeParams []*commonpb.KeyValuePair - IndexParams []*commonpb.KeyValuePair - AutoID bool - State schemapb.FieldState - IsDynamic bool - IsPartitionKey bool // partition key mode, multi logic partitions share a physical partition - DefaultValue *schemapb.ValueField - ElementType schemapb.DataType + FieldID int64 + Name string + IsPrimaryKey bool + Description string + DataType schemapb.DataType + TypeParams []*commonpb.KeyValuePair + IndexParams []*commonpb.KeyValuePair + AutoID bool + State schemapb.FieldState + IsDynamic bool + IsPartitionKey bool // partition key mode, multi logic partitions share a physical partition + IsClusteringKey bool + DefaultValue *schemapb.ValueField + ElementType schemapb.DataType } func (f *Field) Available() bool { @@ -28,19 +29,20 @@ func (f *Field) Available() bool { func (f *Field) Clone() *Field { return &Field{ - FieldID: f.FieldID, - Name: f.Name, - IsPrimaryKey: f.IsPrimaryKey, - Description: f.Description, - DataType: f.DataType, - TypeParams: common.CloneKeyValuePairs(f.TypeParams), - IndexParams: common.CloneKeyValuePairs(f.IndexParams), - AutoID: f.AutoID, - State: f.State, - IsDynamic: f.IsDynamic, - IsPartitionKey: f.IsPartitionKey, - DefaultValue: f.DefaultValue, - ElementType: f.ElementType, + FieldID: f.FieldID, + Name: f.Name, + IsPrimaryKey: f.IsPrimaryKey, + Description: f.Description, + DataType: f.DataType, + TypeParams: common.CloneKeyValuePairs(f.TypeParams), + IndexParams: common.CloneKeyValuePairs(f.IndexParams), + AutoID: f.AutoID, + State: f.State, + IsDynamic: f.IsDynamic, + IsPartitionKey: f.IsPartitionKey, + IsClusteringKey: f.IsClusteringKey, + DefaultValue: f.DefaultValue, + ElementType: f.ElementType, } } @@ -68,6 +70,7 @@ func (f *Field) Equal(other Field) bool { f.AutoID == other.AutoID && f.IsPartitionKey == other.IsPartitionKey && f.IsDynamic == other.IsDynamic && + f.IsClusteringKey == other.IsClusteringKey && f.DefaultValue == other.DefaultValue && f.ElementType == other.ElementType } @@ -91,18 +94,19 @@ func MarshalFieldModel(field *Field) *schemapb.FieldSchema { } return &schemapb.FieldSchema{ - FieldID: field.FieldID, - Name: field.Name, - IsPrimaryKey: field.IsPrimaryKey, - Description: field.Description, - DataType: field.DataType, - TypeParams: field.TypeParams, - IndexParams: field.IndexParams, - AutoID: field.AutoID, - IsDynamic: field.IsDynamic, - IsPartitionKey: field.IsPartitionKey, - DefaultValue: field.DefaultValue, - ElementType: field.ElementType, + FieldID: field.FieldID, + Name: field.Name, + IsPrimaryKey: field.IsPrimaryKey, + Description: field.Description, + DataType: field.DataType, + TypeParams: field.TypeParams, + IndexParams: field.IndexParams, + AutoID: field.AutoID, + IsDynamic: field.IsDynamic, + IsPartitionKey: field.IsPartitionKey, + IsClusteringKey: field.IsClusteringKey, + DefaultValue: field.DefaultValue, + ElementType: field.ElementType, } } @@ -124,18 +128,19 @@ func UnmarshalFieldModel(fieldSchema *schemapb.FieldSchema) *Field { } return &Field{ - FieldID: fieldSchema.FieldID, - Name: fieldSchema.Name, - IsPrimaryKey: fieldSchema.IsPrimaryKey, - Description: fieldSchema.Description, - DataType: fieldSchema.DataType, - TypeParams: fieldSchema.TypeParams, - IndexParams: fieldSchema.IndexParams, - AutoID: fieldSchema.AutoID, - IsDynamic: fieldSchema.IsDynamic, - IsPartitionKey: fieldSchema.IsPartitionKey, - DefaultValue: fieldSchema.DefaultValue, - ElementType: fieldSchema.ElementType, + FieldID: fieldSchema.FieldID, + Name: fieldSchema.Name, + IsPrimaryKey: fieldSchema.IsPrimaryKey, + Description: fieldSchema.Description, + DataType: fieldSchema.DataType, + TypeParams: fieldSchema.TypeParams, + IndexParams: fieldSchema.IndexParams, + AutoID: fieldSchema.AutoID, + IsDynamic: fieldSchema.IsDynamic, + IsPartitionKey: fieldSchema.IsPartitionKey, + IsClusteringKey: fieldSchema.IsClusteringKey, + DefaultValue: fieldSchema.DefaultValue, + ElementType: fieldSchema.ElementType, } } diff --git a/internal/proxy/task.go b/internal/proxy/task.go index a3f9bf16e3..bb57d377e4 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -208,6 +208,36 @@ func (t *createCollectionTask) validatePartitionKey() error { return nil } +func (t *createCollectionTask) validateClusteringKey() error { + idx := -1 + for i, field := range t.schema.Fields { + if field.GetIsClusteringKey() { + if idx != -1 { + return merr.WrapErrCollectionIllegalSchema(t.CollectionName, + fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name)) + } + + if field.GetIsPrimaryKey() { + return merr.WrapErrCollectionIllegalSchema(t.CollectionName, + fmt.Sprintf("the clustering key field must not be primary key field, field name = %s", field.Name)) + } + + if field.GetIsPartitionKey() { + return merr.WrapErrCollectionIllegalSchema(t.CollectionName, + fmt.Sprintf("the clustering key field must not be partition key field, field name = %s", field.Name)) + } + idx = i + } + } + + if idx != -1 { + log.Info("create collection with clustering key", + zap.String("collectionName", t.CollectionName), + zap.String("clusteringKeyField", t.schema.Fields[idx].Name)) + } + return nil +} + func (t *createCollectionTask) PreExecute(ctx context.Context) error { t.Base.MsgType = commonpb.MsgType_CreateCollection t.Base.SourceID = paramtable.GetNodeID() @@ -266,6 +296,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error { return err } + // validate clustering key + if err := t.validateClusteringKey(); err != nil { + return err + } + for _, field := range t.schema.Fields { // validate field name if err := validateFieldName(field.Name); err != nil { @@ -572,18 +607,19 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error { } if field.FieldID >= common.StartOfUserFieldID { t.result.Schema.Fields = append(t.result.Schema.Fields, &schemapb.FieldSchema{ - FieldID: field.FieldID, - Name: field.Name, - IsPrimaryKey: field.IsPrimaryKey, - AutoID: field.AutoID, - Description: field.Description, - DataType: field.DataType, - TypeParams: field.TypeParams, - IndexParams: field.IndexParams, - IsDynamic: field.IsDynamic, - IsPartitionKey: field.IsPartitionKey, - DefaultValue: field.DefaultValue, - ElementType: field.ElementType, + FieldID: field.FieldID, + Name: field.Name, + IsPrimaryKey: field.IsPrimaryKey, + AutoID: field.AutoID, + Description: field.Description, + DataType: field.DataType, + TypeParams: field.TypeParams, + IndexParams: field.IndexParams, + IsDynamic: field.IsDynamic, + IsPartitionKey: field.IsPartitionKey, + IsClusteringKey: field.IsClusteringKey, + DefaultValue: field.DefaultValue, + ElementType: field.ElementType, }) } } diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 4bb083ee5f..eb3acf48d7 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3493,3 +3493,186 @@ func TestPartitionKey(t *testing.T) { assert.Error(t, err) }) } + +func TestClusteringKey(t *testing.T) { + rc := NewRootCoordMock() + + defer rc.Close() + qc := getQueryCoordClient() + + ctx := context.Background() + + mgr := newShardClientMgr() + err := InitMetaCache(ctx, rc, qc, mgr) + assert.NoError(t, err) + + shardsNum := common.DefaultShardsNum + prefix := "TestClusteringKey" + collectionName := prefix + funcutil.GenRandomStr() + + t.Run("create collection normal", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + fieldName2Type["cluster_key_field"] = schemapb.DataType_Int64 + clusterKeyField := &schemapb.FieldSchema{ + Name: "cluster_key_field", + DataType: schemapb.DataType_Int64, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + vecField := &schemapb.FieldSchema{ + Name: "fvec_field", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: strconv.Itoa(testVecDim), + }, + }, + } + schema.Fields = append(schema.Fields, vecField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.NoError(t, err) + err = createCollectionTask.Execute(ctx) + assert.NoError(t, err) + }) + + t.Run("create collection clustering key can not be partition key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + fieldName2Type["cluster_key_field"] = schemapb.DataType_Int64 + clusterKeyField := &schemapb.FieldSchema{ + Name: "cluster_key_field", + DataType: schemapb.DataType_Int64, + IsClusteringKey: true, + IsPartitionKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) + + t.Run("create collection clustering key can not be primary key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + fieldName2Type["cluster_key_field"] = schemapb.DataType_Int64 + clusterKeyField := &schemapb.FieldSchema{ + Name: "cluster_key_field", + DataType: schemapb.DataType_Int64, + IsClusteringKey: true, + IsPrimaryKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) + + t.Run("create collection not support more than one clustering key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + fieldName2Type["cluster_key_field"] = schemapb.DataType_Int64 + clusterKeyField := &schemapb.FieldSchema{ + Name: "cluster_key_field", + DataType: schemapb.DataType_Int64, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + clusterKeyField2 := &schemapb.FieldSchema{ + Name: "cluster_key_field2", + DataType: schemapb.DataType_Int64, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField2) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) +} diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 3f3a3095f9..411e108749 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -50,6 +50,7 @@ var ( ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) + ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index abe3178d44..9310edb0d5 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -465,6 +465,15 @@ func WrapErrCollectionLoaded(collection string, msgAndArgs ...any) error { return err } +func WrapErrCollectionIllegalSchema(collection string, msgAndArgs ...any) error { + err := wrapFields(ErrCollectionIllegalSchema, value("collection", collection)) + if len(msgAndArgs) > 0 { + msg := msgAndArgs[0].(string) + err = errors.Wrapf(err, msg, msgAndArgs[1:]...) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db),