feat: support partition key isolation (#34336)

issue: #34332

---------

Signed-off-by: Patrick Weizhi Xu <weizhi.xu@zilliz.com>
pull/34612/head
Patrick Weizhi Xu 2024-07-11 19:01:35 +08:00 committed by GitHub
parent d7a3697fb5
commit 104d0966b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 1063 additions and 112 deletions

View File

@ -217,6 +217,8 @@ VectorDiskAnnIndex<T>::BuildV2(const Config& config) {
if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) {
build_config[VEC_OPT_FIELDS_PATH] = build_config[VEC_OPT_FIELDS_PATH] =
file_manager_->CacheOptFieldToDisk(opt_fields.value()); file_manager_->CacheOptFieldToDisk(opt_fields.value());
// `partition_key_isolation` is already in the config, so it falls through
// into the index Build call directly
} }
build_config.erase("insert_files"); build_config.erase("insert_files");
@ -264,6 +266,8 @@ VectorDiskAnnIndex<T>::Build(const Config& config) {
if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) { if (opt_fields.has_value() && index_.IsAdditionalScalarSupported()) {
build_config[VEC_OPT_FIELDS_PATH] = build_config[VEC_OPT_FIELDS_PATH] =
file_manager_->CacheOptFieldToDisk(opt_fields.value()); file_manager_->CacheOptFieldToDisk(opt_fields.value());
// `partition_key_isolation` is already in the config, so it falls through
// into the index Build call directly
} }
build_config.erase("insert_files"); build_config.erase("insert_files");

View File

@ -142,6 +142,9 @@ get_config(std::unique_ptr<milvus::proto::indexcgo::BuildIndexInfo>& info) {
if (info->opt_fields().size()) { if (info->opt_fields().size()) {
config["opt_fields"] = get_opt_field(info->opt_fields()); config["opt_fields"] = get_opt_field(info->opt_fields());
} }
if (info->partition_key_isolation()) {
config["partition_key_isolation"] = info->partition_key_isolation();
}
return config; return config;
} }

View File

@ -109,6 +109,7 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
} }
// vector index build needs information of optional scalar fields data // vector index build needs information of optional scalar fields data
optionalFields := make([]*indexpb.OptionalFieldInfo, 0) optionalFields := make([]*indexpb.OptionalFieldInfo, 0)
partitionKeyIsolation := false
if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) { if Params.CommonCfg.EnableMaterializedView.GetAsBool() && isOptionalScalarFieldSupported(indexType) {
collInfo, err := dependency.handler.GetCollection(ctx, segIndex.CollectionID) collInfo, err := dependency.handler.GetCollection(ctx, segIndex.CollectionID)
if err != nil || collInfo == nil { if err != nil || collInfo == nil {
@ -128,6 +129,13 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
FieldType: int32(partitionKeyField.DataType), FieldType: int32(partitionKeyField.DataType),
DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), DataIds: getBinLogIDs(segment, partitionKeyField.FieldID),
}) })
iso, isoErr := common.IsPartitionKeyIsolationPropEnabled(collInfo.Properties)
if isoErr != nil {
log.Ctx(ctx).Warn("failed to parse partition key isolation", zap.Error(isoErr))
}
if iso {
partitionKeyIsolation = true
}
} }
} }
} }
@ -209,50 +217,52 @@ func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskSchedule
} }
it.req = &indexpb.CreateJobRequest{ it.req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: it.taskID, BuildID: it.taskID,
IndexVersion: segIndex.IndexVersion + 1, IndexVersion: segIndex.IndexVersion + 1,
StorageConfig: storageConfig, StorageConfig: storageConfig,
IndexParams: indexParams, IndexParams: indexParams,
TypeParams: typeParams, TypeParams: typeParams,
NumRows: segIndex.NumRows, NumRows: segIndex.NumRows,
CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
CollectionID: segment.GetCollectionID(), CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(), PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(), SegmentID: segment.GetID(),
FieldID: fieldID, FieldID: fieldID,
FieldName: field.GetName(), FieldName: field.GetName(),
FieldType: field.GetDataType(), FieldType: field.GetDataType(),
StorePath: storePath, StorePath: storePath,
StoreVersion: segment.GetStorageVersion(), StoreVersion: segment.GetStorageVersion(),
IndexStorePath: indexStorePath, IndexStorePath: indexStorePath,
Dim: int64(dim), Dim: int64(dim),
DataIds: binlogIDs, DataIds: binlogIDs,
OptionalScalarFields: optionalFields, OptionalScalarFields: optionalFields,
Field: field, Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
} }
} else { } else {
it.req = &indexpb.CreateJobRequest{ it.req = &indexpb.CreateJobRequest{
ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(), ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath), IndexFilePrefix: path.Join(dependency.chunkManager.RootPath(), common.SegmentIndexPath),
BuildID: it.taskID, BuildID: it.taskID,
IndexVersion: segIndex.IndexVersion + 1, IndexVersion: segIndex.IndexVersion + 1,
StorageConfig: storageConfig, StorageConfig: storageConfig,
IndexParams: indexParams, IndexParams: indexParams,
TypeParams: typeParams, TypeParams: typeParams,
NumRows: segIndex.NumRows, NumRows: segIndex.NumRows,
CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(), CurrentIndexVersion: dependency.indexEngineVersionManager.GetCurrentIndexEngineVersion(),
CollectionID: segment.GetCollectionID(), CollectionID: segment.GetCollectionID(),
PartitionID: segment.GetPartitionID(), PartitionID: segment.GetPartitionID(),
SegmentID: segment.GetID(), SegmentID: segment.GetID(),
FieldID: fieldID, FieldID: fieldID,
FieldName: field.GetName(), FieldName: field.GetName(),
FieldType: field.GetDataType(), FieldType: field.GetDataType(),
Dim: int64(dim), Dim: int64(dim),
DataIds: binlogIDs, DataIds: binlogIDs,
OptionalScalarFields: optionalFields, OptionalScalarFields: optionalFields,
Field: field, Field: field,
PartitionKeyIsolation: partitionKeyIsolation,
} }
} }

View File

@ -1749,5 +1749,111 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() {
waitTaskDoneFunc(scheduler) waitTaskDoneFunc(scheduler)
resetMetaFunc() resetMetaFunc()
}) })
s.Run("enqueue partitionKeyIsolation is false when schema is not set", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler.enqueue(t)
waitTaskDoneFunc(scheduler)
resetMetaFunc()
})
scheduler.Stop() scheduler.Stop()
isoCollInfo := &collectionInfo{
ID: collID,
Schema: &schemapb.CollectionSchema{
Name: "coll",
Fields: fieldsSchema,
EnableDynamicField: false,
},
Properties: map[string]string{
common.PartitionKeyIsolationKey: "false",
},
}
handler_isolation := NewNMockHandler(s.T())
handler_isolation.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(isoCollInfo, nil)
scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation)
scheduler_isolation.Start()
s.Run("enqueue partitionKeyIsolation is false when MV not enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
s.Run("enqueue partitionKeyIsolation is true when MV enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true"
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, true)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
s.Run("enqueue partitionKeyIsolation is invalid when MV is enabled", func() {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "invalid"
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) {
s.Equal(in.GetIndexRequest().PartitionKeyIsolation, false)
return merr.Success(), nil
}).Once()
t := &indexBuildTask{
taskID: buildID,
nodeID: nodeID,
taskInfo: &indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Unissued,
FailReason: "",
},
}
scheduler_isolation.enqueue(t)
waitTaskDoneFunc(scheduler_isolation)
resetMetaFunc()
})
scheduler_isolation.Stop()
} }

View File

@ -149,25 +149,26 @@ func (it *indexBuildTaskV2) Execute(ctx context.Context) error {
} }
buildIndexParams := &indexcgopb.BuildIndexInfo{ buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.req.GetClusterID(), ClusterID: it.req.GetClusterID(),
BuildID: it.req.GetBuildID(), BuildID: it.req.GetBuildID(),
CollectionID: it.req.GetCollectionID(), CollectionID: it.req.GetCollectionID(),
PartitionID: it.req.GetPartitionID(), PartitionID: it.req.GetPartitionID(),
SegmentID: it.req.GetSegmentID(), SegmentID: it.req.GetSegmentID(),
IndexVersion: it.req.GetIndexVersion(), IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.req.GetCurrentIndexVersion(), CurrentIndexVersion: it.req.GetCurrentIndexVersion(),
NumRows: it.req.GetNumRows(), NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(), Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(), IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(), InsertFiles: it.req.GetDataPaths(),
FieldSchema: it.req.GetField(), FieldSchema: it.req.GetField(),
StorageConfig: storageConfig, StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams), IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams), TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(), StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(), StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(), IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields, OptFields: optFields,
PartitionKeyIsolation: it.req.GetPartitionKeyIsolation(),
} }
var err error var err error
@ -451,25 +452,26 @@ func (it *indexBuildTask) Execute(ctx context.Context) error {
} }
buildIndexParams := &indexcgopb.BuildIndexInfo{ buildIndexParams := &indexcgopb.BuildIndexInfo{
ClusterID: it.req.GetClusterID(), ClusterID: it.req.GetClusterID(),
BuildID: it.req.GetBuildID(), BuildID: it.req.GetBuildID(),
CollectionID: it.req.GetCollectionID(), CollectionID: it.req.GetCollectionID(),
PartitionID: it.req.GetPartitionID(), PartitionID: it.req.GetPartitionID(),
SegmentID: it.req.GetSegmentID(), SegmentID: it.req.GetSegmentID(),
IndexVersion: it.req.GetIndexVersion(), IndexVersion: it.req.GetIndexVersion(),
CurrentIndexVersion: it.req.GetCurrentIndexVersion(), CurrentIndexVersion: it.req.GetCurrentIndexVersion(),
NumRows: it.req.GetNumRows(), NumRows: it.req.GetNumRows(),
Dim: it.req.GetDim(), Dim: it.req.GetDim(),
IndexFilePrefix: it.req.GetIndexFilePrefix(), IndexFilePrefix: it.req.GetIndexFilePrefix(),
InsertFiles: it.req.GetDataPaths(), InsertFiles: it.req.GetDataPaths(),
FieldSchema: it.req.GetField(), FieldSchema: it.req.GetField(),
StorageConfig: storageConfig, StorageConfig: storageConfig,
IndexParams: mapToKVPairs(it.newIndexParams), IndexParams: mapToKVPairs(it.newIndexParams),
TypeParams: mapToKVPairs(it.newTypeParams), TypeParams: mapToKVPairs(it.newTypeParams),
StorePath: it.req.GetStorePath(), StorePath: it.req.GetStorePath(),
StoreVersion: it.req.GetStoreVersion(), StoreVersion: it.req.GetStoreVersion(),
IndexStorePath: it.req.GetIndexStorePath(), IndexStorePath: it.req.GetIndexStorePath(),
OptFields: optFields, OptFields: optFields,
PartitionKeyIsolation: it.req.GetPartitionKeyIsolation(),
} }
log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams)) log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams))

View File

@ -79,4 +79,5 @@ message BuildIndexInfo {
int64 store_version = 17; int64 store_version = 17;
string index_store_path = 18; string index_store_path = 18;
repeated OptionalFieldInfo opt_fields = 19; repeated OptionalFieldInfo opt_fields = 19;
bool partition_key_isolation = 20;
} }

View File

@ -287,6 +287,7 @@ message CreateJobRequest {
repeated int64 data_ids = 23; repeated int64 data_ids = 23;
repeated OptionalFieldInfo optional_scalar_fields = 24; repeated OptionalFieldInfo optional_scalar_fields = 24;
schema.FieldSchema field = 25; schema.FieldSchema field = 25;
bool partition_key_isolation = 26;
} }
message QueryJobsRequest { message QueryJobsRequest {

View File

@ -1235,6 +1235,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC
AlterCollectionRequest: request, AlterCollectionRequest: request,
rootCoord: node.rootCoord, rootCoord: node.rootCoord,
queryCoord: node.queryCoord, queryCoord: node.queryCoord,
dataCoord: node.dataCoord,
} }
log := log.Ctx(ctx).With( log := log.Ctx(ctx).With(

View File

@ -95,19 +95,21 @@ type Cache interface {
AllocID(ctx context.Context) (int64, error) AllocID(ctx context.Context) (int64, error)
} }
type collectionBasicInfo struct { type collectionBasicInfo struct {
collID typeutil.UniqueID collID typeutil.UniqueID
createdTimestamp uint64 createdTimestamp uint64
createdUtcTimestamp uint64 createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
} }
type collectionInfo struct { type collectionInfo struct {
collID typeutil.UniqueID collID typeutil.UniqueID
schema *schemaInfo schema *schemaInfo
partInfo *partitionInfos partInfo *partitionInfos
createdTimestamp uint64 createdTimestamp uint64
createdUtcTimestamp uint64 createdUtcTimestamp uint64
consistencyLevel commonpb.ConsistencyLevel consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
} }
type databaseInfo struct { type databaseInfo struct {
@ -185,10 +187,11 @@ type partitionInfo struct {
func (info *collectionInfo) getBasicInfo() *collectionBasicInfo { func (info *collectionInfo) getBasicInfo() *collectionBasicInfo {
// Do a deep copy for all fields. // Do a deep copy for all fields.
basicInfo := &collectionBasicInfo{ basicInfo := &collectionBasicInfo{
collID: info.collID, collID: info.collID,
createdTimestamp: info.createdTimestamp, createdTimestamp: info.createdTimestamp,
createdUtcTimestamp: info.createdUtcTimestamp, createdUtcTimestamp: info.createdUtcTimestamp,
consistencyLevel: info.consistencyLevel, consistencyLevel: info.consistencyLevel,
partitionKeyIsolation: info.partitionKeyIsolation,
} }
return basicInfo return basicInfo
@ -385,14 +388,20 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
m.collInfo[database] = make(map[string]*collectionInfo) m.collInfo[database] = make(map[string]*collectionInfo)
} }
isolation, err := common.IsPartitionKeyIsolationKvEnabled(collection.Properties...)
if err != nil {
return nil, err
}
schemaInfo := newSchemaInfo(collection.Schema) schemaInfo := newSchemaInfo(collection.Schema)
m.collInfo[database][collectionName] = &collectionInfo{ m.collInfo[database][collectionName] = &collectionInfo{
collID: collection.CollectionID, collID: collection.CollectionID,
schema: schemaInfo, schema: schemaInfo,
partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField), partInfo: parsePartitionsInfo(infos, schemaInfo.hasPartitionKeyField),
createdTimestamp: collection.CreatedTimestamp, createdTimestamp: collection.CreatedTimestamp,
createdUtcTimestamp: collection.CreatedUtcTimestamp, createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel, consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
} }
log.Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID)) log.Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID))
@ -714,6 +723,7 @@ func (m *MetaCache) describeCollection(ctx context.Context, database, collection
CreatedUtcTimestamp: coll.CreatedUtcTimestamp, CreatedUtcTimestamp: coll.CreatedUtcTimestamp,
ConsistencyLevel: coll.ConsistencyLevel, ConsistencyLevel: coll.ConsistencyLevel,
DbName: coll.GetDbName(), DbName: coll.GetDbName(),
Properties: coll.Properties,
} }
for _, field := range coll.Schema.Fields { for _, field := range coll.Schema.Fields {
if field.FieldID >= common.StartOfUserFieldID { if field.FieldID >= common.StartOfUserFieldID {
@ -795,7 +805,7 @@ func parsePartitionsInfo(infos []*partitionInfo, hasPartitionKey bool) *partitio
} }
index, err := strconv.ParseInt(splits[len(splits)-1], 10, 64) index, err := strconv.ParseInt(splits[len(splits)-1], 10, 64)
if err != nil { if err != nil {
log.Info("partition group not in partitionKey pattern", zap.String("parititonName", partitionName), zap.Error(err)) log.Info("partition group not in partitionKey pattern", zap.String("partitionName", partitionName), zap.Error(err))
return result return result
} }
partitionNames[index] = partitionName partitionNames[index] = partitionName

View File

@ -51,6 +51,7 @@ type collectionMeta struct {
physicalChannelNames []string physicalChannelNames []string
createdTimestamp uint64 createdTimestamp uint64
createdUtcTimestamp uint64 createdUtcTimestamp uint64
properties []*commonpb.KeyValuePair
} }
type partitionMeta struct { type partitionMeta struct {
@ -385,6 +386,7 @@ func (coord *RootCoordMock) CreateCollection(ctx context.Context, req *milvuspb.
physicalChannelNames: physicalChannelNames, physicalChannelNames: physicalChannelNames,
createdTimestamp: ts, createdTimestamp: ts,
createdUtcTimestamp: ts, createdUtcTimestamp: ts,
properties: req.GetProperties(),
} }
coord.partitionMtx.Lock() coord.partitionMtx.Lock()
@ -528,6 +530,7 @@ func (coord *RootCoordMock) DescribeCollection(ctx context.Context, req *milvusp
PhysicalChannelNames: meta.physicalChannelNames, PhysicalChannelNames: meta.physicalChannelNames,
CreatedTimestamp: meta.createdUtcTimestamp, CreatedTimestamp: meta.createdUtcTimestamp,
CreatedUtcTimestamp: meta.createdUtcTimestamp, CreatedUtcTimestamp: meta.createdUtcTimestamp,
Properties: meta.properties,
}, nil }, nil
} }

View File

@ -331,6 +331,11 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error {
return err return err
} }
hasPartitionKey := hasParitionKeyModeField(t.schema)
if _, err := validatePartitionKeyIsolation(t.CollectionName, hasPartitionKey, t.GetProperties()...); err != nil {
return err
}
// validate clustering key // validate clustering key
if err := t.validateClusteringKey(); err != nil { if err := t.validateClusteringKey(); err != nil {
return err return err
@ -841,6 +846,7 @@ type alterCollectionTask struct {
rootCoord types.RootCoordClient rootCoord types.RootCoordClient
result *commonpb.Status result *commonpb.Status
queryCoord types.QueryCoordClient queryCoord types.QueryCoordClient
dataCoord types.DataCoordClient
} }
func (t *alterCollectionTask) TraceCtx() context.Context { func (t *alterCollectionTask) TraceCtx() context.Context {
@ -900,6 +906,32 @@ func hasLazyLoadProp(props ...*commonpb.KeyValuePair) bool {
return false return false
} }
func validatePartitionKeyIsolation(colName string, isPartitionKeyEnabled bool, props ...*commonpb.KeyValuePair) (bool, error) {
iso, err := common.IsPartitionKeyIsolationKvEnabled(props...)
if err != nil {
return false, err
}
// partition key isolation is not set, skip
if !iso {
return false, nil
}
if !isPartitionKeyEnabled {
return false, merr.WrapErrCollectionIllegalSchema(colName,
"partition key isolation mode is enabled but no partition key field is set. Please set the partition key first")
}
if !paramtable.Get().CommonCfg.EnableMaterializedView.GetAsBool() {
return false, merr.WrapErrCollectionIllegalSchema(colName,
"partition key isolation mode is enabled but current Milvus does not support it. Please contact us")
}
log.Info("validated with partition key isolation", zap.String("collectionName", colName))
return true, nil
}
func (t *alterCollectionTask) PreExecute(ctx context.Context) error { func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
t.Base.MsgType = commonpb.MsgType_AlterCollection t.Base.MsgType = commonpb.MsgType_AlterCollection
t.Base.SourceID = paramtable.GetNodeID() t.Base.SourceID = paramtable.GetNodeID()
@ -920,6 +952,62 @@ func (t *alterCollectionTask) PreExecute(ctx context.Context) error {
} }
} }
isPartitionKeyMode, err := isPartitionKeyMode(ctx, t.GetDbName(), t.CollectionName)
if err != nil {
return err
}
// check if the new partition key isolation is valid to use
newIsoValue, err := validatePartitionKeyIsolation(t.CollectionName, isPartitionKeyMode, t.Properties...)
if err != nil {
return err
}
collBasicInfo, err := globalMetaCache.GetCollectionInfo(t.ctx, t.GetDbName(), t.CollectionName, t.CollectionID)
if err != nil {
return err
}
oldIsoValue := collBasicInfo.partitionKeyIsolation
log.Info("alter collection pre check with partition key isolation",
zap.String("collectionName", t.CollectionName),
zap.Bool("isPartitionKeyMode", isPartitionKeyMode),
zap.Bool("newIsoValue", newIsoValue),
zap.Bool("oldIsoValue", oldIsoValue))
// if the isolation flag in properties is not set, meta cache will assign partitionKeyIsolation in collection info to false
// - None|false -> false, skip
// - None|false -> true, check if the collection has vector index
// - true -> false, check if the collection has vector index
// - false -> true, check if the collection has vector index
// - true -> true, skip
if oldIsoValue != newIsoValue {
collSchema, err := globalMetaCache.GetCollectionSchema(ctx, t.GetDbName(), t.CollectionName)
if err != nil {
return err
}
hasVecIndex := false
indexName := ""
indexResponse, err := t.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{
CollectionID: t.CollectionID,
IndexName: "",
})
if err != nil {
return merr.WrapErrServiceInternal("describe index failed", err.Error())
}
for _, index := range indexResponse.IndexInfos {
for _, field := range collSchema.Fields {
if index.FieldID == field.FieldID && typeutil.IsVectorType(field.DataType) {
hasVecIndex = true
indexName = field.GetName()
}
}
}
if hasVecIndex {
return merr.WrapErrIndexDuplicate(indexName,
"can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first")
}
}
return nil return nil
} }

View File

@ -297,7 +297,7 @@ func (t *searchTask) checkNq(ctx context.Context) (int64, error) {
return nq, nil return nq, nil
} }
func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask) error { func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask, plan *planpb.PlanNode) error {
if t.enableMaterializedView { if t.enableMaterializedView {
partitionKeyFieldSchema, err := typeutil.GetPartitionKeyFieldSchema(t.schema.CollectionSchema) partitionKeyFieldSchema, err := typeutil.GetPartitionKeyFieldSchema(t.schema.CollectionSchema)
if err != nil { if err != nil {
@ -305,7 +305,26 @@ func setQueryInfoIfMvEnable(queryInfo *planpb.QueryInfo, t *searchTask) error {
return err return err
} }
if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyFieldSchema) { if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyFieldSchema) {
collInfo, colErr := globalMetaCache.GetCollectionInfo(t.ctx, t.request.GetDbName(), t.collectionName, t.CollectionID)
if colErr != nil {
log.Warn("failed to get collection info", zap.Error(colErr))
return err
}
if collInfo.partitionKeyIsolation {
expr, err := exprutil.ParseExprFromPlan(plan)
if err != nil {
log.Warn("failed to parse expr from plan during MV", zap.Error(err))
return err
}
err = exprutil.ValidatePartitionKeyIsolation(expr)
if err != nil {
return err
}
}
queryInfo.MaterializedViewInvolved = true queryInfo.MaterializedViewInvolved = true
} else {
return errors.New("partition key field data type is not supported in materialized view")
} }
} }
return nil return nil
@ -350,7 +369,10 @@ func (t *searchTask) initAdvancedSearchRequest(ctx context.Context) error {
if len(partitionIDs) > 0 { if len(partitionIDs) > 0 {
internalSubReq.PartitionIDs = partitionIDs internalSubReq.PartitionIDs = partitionIDs
t.partitionIDsSet.Upsert(partitionIDs...) t.partitionIDsSet.Upsert(partitionIDs...)
setQueryInfoIfMvEnable(queryInfo, t) mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan)
if mvErr != nil {
return mvErr
}
} }
} else { } else {
internalSubReq.PartitionIDs = t.SearchRequest.GetPartitionIDs() internalSubReq.PartitionIDs = t.SearchRequest.GetPartitionIDs()
@ -406,7 +428,10 @@ func (t *searchTask) initSearchRequest(ctx context.Context) error {
} }
if len(partitionIDs) > 0 { if len(partitionIDs) > 0 {
t.SearchRequest.PartitionIDs = partitionIDs t.SearchRequest.PartitionIDs = partitionIDs
setQueryInfoIfMvEnable(queryInfo, t) mvErr := setQueryInfoIfMvEnable(queryInfo, t, plan)
if mvErr != nil {
return mvErr
}
} }
} }

View File

@ -2628,11 +2628,12 @@ func (s *MaterializedViewTestSuite) TearDownSuite() {
func (s *MaterializedViewTestSuite) SetupTest() { func (s *MaterializedViewTestSuite) SetupTest() {
s.mockMetaCache = NewMockCache(s.T()) s.mockMetaCache = NewMockCache(s.T())
s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil).Maybe() s.mockMetaCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.colID, nil)
s.mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( s.mockMetaCache.EXPECT().GetCollectionInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&collectionBasicInfo{ &collectionBasicInfo{
collID: s.colID, collID: s.colID,
}, nil).Maybe() partitionKeyIsolation: true,
}, nil)
globalMetaCache = s.mockMetaCache globalMetaCache = s.mockMetaCache
} }
@ -2731,6 +2732,33 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() {
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved) s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
} }
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolation() {
task := s.getSearchTask()
task.enableMaterializedView = true
task.request.Dsl = testVarCharField + " == \"a\""
schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testVarCharField, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil)
s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil)
err := task.PreExecute(s.ctx)
s.NoError(err)
s.NotZero(len(task.queryInfos))
s.Equal(true, task.queryInfos[0].MaterializedViewInvolved)
}
func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarCharWithIsolationInvalid() {
task := s.getSearchTask()
task.enableMaterializedView = true
task.request.Dsl = testVarCharField + " in [\"a\", \"b\"]"
schema := ConstructCollectionSchemaWithPartitionKey(s.colName, s.fieldName2Types, testInt64Field, testVarCharField, false)
schemaInfo := newSchemaInfo(schema)
s.mockMetaCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(schemaInfo, nil)
s.mockMetaCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"partition_1", "partition_2"}, nil)
s.mockMetaCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"partition_1": 1, "partition_2": 2}, nil)
s.ErrorContains(task.PreExecute(s.ctx), "partition key isolation does not support IN")
}
func TestMaterializedView(t *testing.T) { func TestMaterializedView(t *testing.T) {
suite.Run(t, new(MaterializedViewTestSuite)) suite.Run(t, new(MaterializedViewTestSuite))
} }

View File

@ -1033,7 +1033,7 @@ func TestHasCollectionTask(t *testing.T) {
err = task.Execute(ctx) err = task.Execute(ctx)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, false, task.result.Value) assert.Equal(t, false, task.result.Value)
// createCollection in RootCood and fill GlobalMetaCache // createIsoCollection in RootCood and fill GlobalMetaCache
rc.CreateCollection(ctx, createColReq) rc.CreateCollection(ctx, createColReq)
globalMetaCache.GetCollectionID(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName) globalMetaCache.GetCollectionID(ctx, GetCurDBNameFromContextOrDefault(ctx), collectionName)
@ -3642,3 +3642,224 @@ func TestAlterCollectionCheckLoaded(t *testing.T) {
err = task.PreExecute(context.Background()) err = task.PreExecute(context.Background())
assert.Equal(t, merr.Code(merr.ErrCollectionLoaded), merr.Code(err)) assert.Equal(t, merr.Code(merr.ErrCollectionLoaded), merr.Code(err))
} }
func TestTaskPartitionKeyIsolation(t *testing.T) {
rc := NewRootCoordMock()
defer rc.Close()
dc := NewDataCoordMock()
defer dc.Close()
qc := getQueryCoordClient()
defer qc.Close()
ctx := context.Background()
mgr := newShardClientMgr()
err := InitMetaCache(ctx, rc, qc, mgr)
assert.NoError(t, err)
shardsNum := common.DefaultShardsNum
prefix := "TestPartitionKeyIsolation"
collectionName := prefix + funcutil.GenRandomStr()
getSchema := func(colName string, hasPartitionKey bool) *schemapb.CollectionSchema {
fieldName2Type := make(map[string]schemapb.DataType)
fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector
fieldName2Type["varChar_field"] = schemapb.DataType_VarChar
fieldName2Type["int64_field"] = schemapb.DataType_Int64
schema := constructCollectionSchemaByDataType(colName, fieldName2Type, "int64_field", false)
if hasPartitionKey {
partitionKeyField := &schemapb.FieldSchema{
Name: "partition_key_field",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
}
fieldName2Type["partition_key_field"] = schemapb.DataType_Int64
schema.Fields = append(schema.Fields, partitionKeyField)
}
return schema
}
getCollectionTask := func(colName string, isIso bool, marshaledSchema []byte) *createCollectionTask {
isoStr := "false"
if isIso {
isoStr = "true"
}
return &createCollectionTask{
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
Timestamp: Timestamp(time.Now().UnixNano()),
},
DbName: "",
CollectionName: colName,
Schema: marshaledSchema,
ShardsNum: shardsNum,
Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}},
},
ctx: ctx,
rootCoord: rc,
result: nil,
schema: nil,
}
}
createIsoCollection := func(colName string, hasPartitionKey bool, isIsolation bool, isIsoNil bool) {
isoStr := "false"
if isIsolation {
isoStr = "true"
}
schema := getSchema(colName, hasPartitionKey)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createColReq := &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DropCollection,
MsgID: 100,
Timestamp: 100,
},
DbName: dbName,
CollectionName: colName,
Schema: marshaledSchema,
ShardsNum: 1,
Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}},
}
if isIsoNil {
createColReq.Properties = nil
}
stats, err := rc.CreateCollection(ctx, createColReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, stats.ErrorCode)
}
getAlterCollectionTask := func(colName string, isIsolation bool) *alterCollectionTask {
isoStr := "false"
if isIsolation {
isoStr = "true"
}
return &alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: colName,
Properties: []*commonpb.KeyValuePair{{Key: common.PartitionKeyIsolationKey, Value: isoStr}},
},
queryCoord: qc,
dataCoord: dc,
}
}
t.Run("create collection valid", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
schema := getSchema(collectionName, true)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema)
err = createCollectionTask.PreExecute(ctx)
assert.NoError(t, err)
err = createCollectionTask.Execute(ctx)
assert.NoError(t, err)
})
t.Run("create collection without isolation", func(t *testing.T) {
schema := getSchema(collectionName, true)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createCollectionTask := getCollectionTask(collectionName, false, marshaledSchema)
err = createCollectionTask.PreExecute(ctx)
assert.NoError(t, err)
err = createCollectionTask.Execute(ctx)
assert.NoError(t, err)
})
t.Run("create collection isolation but no partition key", func(t *testing.T) {
schema := getSchema(collectionName, false)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema)
assert.ErrorContains(t, createCollectionTask.PreExecute(ctx), "partition key isolation mode is enabled but no partition key field is set")
})
t.Run("create collection with isolation and partition key but MV is not enabled", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
schema := getSchema(collectionName, true)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createCollectionTask := getCollectionTask(collectionName, true, marshaledSchema)
assert.ErrorContains(t, createCollectionTask.PreExecute(ctx), "partition key isolation mode is enabled but current Milvus does not support it")
})
t.Run("alter collection from valid", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
colName := collectionName + "AlterValid"
createIsoCollection(colName, true, false, false)
alterTask := getAlterCollectionTask(colName, true)
err := alterTask.PreExecute(ctx)
assert.NoError(t, err)
})
t.Run("alter collection without isolation", func(t *testing.T) {
colName := collectionName + "AlterNoIso"
createIsoCollection(colName, true, false, true)
alterTask := alterCollectionTask{
AlterCollectionRequest: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{},
CollectionName: colName,
Properties: nil,
},
queryCoord: qc,
}
err := alterTask.PreExecute(ctx)
assert.NoError(t, err)
})
t.Run("alter collection isolation but no partition key", func(t *testing.T) {
colName := collectionName + "AlterNoPartkey"
createIsoCollection(colName, false, false, false)
alterTask := getAlterCollectionTask(colName, true)
assert.ErrorContains(t, alterTask.PreExecute(ctx), "partition key isolation mode is enabled but no partition key field is set")
})
t.Run("alter collection with isolation and partition key but MV is not enabled", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
colName := collectionName + "AlterNoMv"
createIsoCollection(colName, true, false, false)
alterTask := getAlterCollectionTask(colName, true)
assert.ErrorContains(t, alterTask.PreExecute(ctx), "partition key isolation mode is enabled but current Milvus does not support it")
})
t.Run("alter collection with vec index and isolation", func(t *testing.T) {
paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true")
defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false")
colName := collectionName + "AlterVecIndex"
createIsoCollection(colName, true, true, false)
resp, err := rc.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{DbName: dbName, CollectionName: colName})
assert.NoError(t, err)
var vecFieldID int64 = 0
for _, field := range resp.Schema.Fields {
if field.DataType == schemapb.DataType_FloatVector {
vecFieldID = field.FieldID
break
}
}
assert.NotEqual(t, vecFieldID, int64(0))
dc.DescribeIndexFunc = func(ctx context.Context, request *indexpb.DescribeIndexRequest, opts ...grpc.CallOption) (*indexpb.DescribeIndexResponse, error) {
return &indexpb.DescribeIndexResponse{
Status: merr.Success(),
IndexInfos: []*indexpb.IndexInfo{
{
FieldID: vecFieldID,
},
},
}, nil
}
alterTask := getAlterCollectionTask(colName, false)
assert.ErrorContains(t, alterTask.PreExecute(ctx),
"can not alter partition key isolation mode if the collection already has a vector index. Please drop the index first")
})
}

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
) )
@ -74,6 +75,16 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
core: a.core, core: a.core,
}) })
// properties needs to be refreshed in the cache
aliases := a.core.meta.ListAliasesByID(oldColl.CollectionID)
redoTask.AddSyncStep(&expireCacheStep{
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: append(aliases, a.Req.GetCollectionName()),
collectionID: oldColl.CollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)},
})
return redoTask.Execute(ctx) return redoTask.Execute(ctx)
} }

View File

@ -92,8 +92,9 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
mock.Anything, mock.Anything,
mock.Anything, mock.Anything,
).Return(errors.New("err")) ).Return(errors.New("err"))
meta.On("ListAliasesByID", mock.Anything).Return([]string{})
core := newTestCore(withMeta(meta)) core := newTestCore(withValidProxyManager(), withMeta(meta))
task := &alterCollectionTask{ task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core), baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{ Req: &milvuspb.AlterCollectionRequest{
@ -121,13 +122,49 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
mock.Anything, mock.Anything,
mock.Anything, mock.Anything,
).Return(nil) ).Return(nil)
meta.On("ListAliasesByID", mock.Anything).Return([]string{})
broker := newMockBroker() broker := newMockBroker()
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return errors.New("err") return errors.New("err")
} }
core := newTestCore(withMeta(meta), withBroker(broker)) core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection},
CollectionName: "cn",
Properties: properties,
},
}
err := task.Execute(context.Background())
assert.Error(t, err)
})
t.Run("expire cache failed", func(t *testing.T) {
meta := mockrootcoord.NewIMetaTable(t)
meta.On("GetCollectionByName",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(&model.Collection{CollectionID: int64(1)}, nil)
meta.On("AlterCollection",
mock.Anything,
mock.Anything,
mock.Anything,
mock.Anything,
).Return(nil)
meta.On("ListAliasesByID", mock.Anything).Return([]string{})
broker := newMockBroker()
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return errors.New("err")
}
core := newTestCore(withInvalidProxyManager(), withMeta(meta), withBroker(broker))
task := &alterCollectionTask{ task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core), baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{ Req: &milvuspb.AlterCollectionRequest{
@ -155,13 +192,14 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
mock.Anything, mock.Anything,
mock.Anything, mock.Anything,
).Return(nil) ).Return(nil)
meta.On("ListAliasesByID", mock.Anything).Return([]string{})
broker := newMockBroker() broker := newMockBroker()
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
return nil return nil
} }
core := newTestCore(withMeta(meta), withBroker(broker)) core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker))
task := &alterCollectionTask{ task := &alterCollectionTask{
baseTask: newBaseTask(context.Background(), core), baseTask: newBaseTask(context.Background(), core),
Req: &milvuspb.AlterCollectionRequest{ Req: &milvuspb.AlterCollectionRequest{
@ -220,5 +258,17 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
Key: common.CollectionAutoCompactionKey, Key: common.CollectionAutoCompactionKey,
Value: "true", Value: "true",
}) })
updatePropsIso := []*commonpb.KeyValuePair{
{
Key: common.PartitionKeyIsolationKey,
Value: "true",
},
}
updateCollectionProperties(coll, updatePropsIso)
assert.Contains(t, coll.Properties, &commonpb.KeyValuePair{
Key: common.PartitionKeyIsolationKey,
Value: "true",
})
}) })
} }

View File

@ -509,3 +509,95 @@ func maxGenericValue(left *planpb.GenericValue, right *planpb.GenericValue) *pla
} }
return right return right
} }
func ValidatePartitionKeyIsolation(expr *planpb.Expr) error {
foundPartitionKey, err := validatePartitionKeyIsolationFromExpr(expr)
if err != nil {
return err
}
if !foundPartitionKey {
return errors.New("partition key not found in expr when validating partition key isolation")
}
return nil
}
func validatePartitionKeyIsolationFromExpr(expr *planpb.Expr) (bool, error) {
switch expr := expr.GetExpr().(type) {
case *planpb.Expr_BinaryExpr:
return validatePartitionKeyIsolationFromBinaryExpr(expr.BinaryExpr)
case *planpb.Expr_UnaryExpr:
return validatePartitionKeyIsolationFromUnaryExpr(expr.UnaryExpr)
case *planpb.Expr_TermExpr:
return validatePartitionKeyIsolationFromTermExpr(expr.TermExpr)
case *planpb.Expr_UnaryRangeExpr:
return validatePartitionKeyIsolationFromRangeExpr(expr.UnaryRangeExpr)
}
return false, nil
}
func validatePartitionKeyIsolationFromBinaryExpr(expr *planpb.BinaryExpr) (bool, error) {
// return directly if has errors on either or both sides
leftRes, leftErr := validatePartitionKeyIsolationFromExpr(expr.Left)
if leftErr != nil {
return leftRes, leftErr
}
rightRes, rightErr := validatePartitionKeyIsolationFromExpr(expr.Right)
if rightErr != nil {
return rightRes, rightErr
}
// the following deals with no error on either side
if expr.Op == planpb.BinaryExpr_LogicalAnd {
// if one of them is partition key
// e.g. partition_key_field == 1 && other_field > 10
if leftRes || rightRes {
return true, nil
}
// if none of them is partition key
return false, nil
}
if expr.Op == planpb.BinaryExpr_LogicalOr {
// if either side has partition key, but OR them
// e.g. partition_key_field == 1 || other_field > 10
if leftRes || rightRes {
return true, errors.New("partition key isolation does not support OR")
}
// if none of them has partition key
return false, nil
}
return false, nil
}
func validatePartitionKeyIsolationFromUnaryExpr(expr *planpb.UnaryExpr) (bool, error) {
res, err := validatePartitionKeyIsolationFromExpr(expr.GetChild())
if err != nil {
return res, err
}
if expr.Op == planpb.UnaryExpr_Not {
if res {
return true, errors.New("partition key isolation does not support NOT")
}
return false, nil
}
return res, err
}
func validatePartitionKeyIsolationFromTermExpr(expr *planpb.TermExpr) (bool, error) {
if expr.GetColumnInfo().GetIsPartitionKey() {
// e.g. partition_key_field in [1, 2, 3]
return true, errors.New("partition key isolation does not support IN")
}
return false, nil
}
func validatePartitionKeyIsolationFromRangeExpr(expr *planpb.UnaryRangeExpr) (bool, error) {
if expr.GetColumnInfo().GetIsPartitionKey() {
if expr.GetOp() == planpb.OpType_Equal {
// e.g. partition_key_field == 1
return true, nil
}
return true, errors.Newf("partition key isolation does not support %s", expr.GetOp().String())
}
return false, nil
}

View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/parser/planparserv2" "github.com/milvus-io/milvus/internal/parser/planparserv2"
"github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/planpb"
@ -277,3 +278,209 @@ func TestParseStrRanges(t *testing.T) {
assert.Equal(t, range0.includeUpper, false) assert.Equal(t, range0.includeUpper, false)
} }
} }
func TestValidatePartitionKeyIsolation(t *testing.T) {
prefix := "TestValidatePartitionKeyIsolation"
collectionName := prefix + funcutil.GenRandomStr()
fieldName2Type := make(map[string]schemapb.DataType)
fieldName2Type["int64_field"] = schemapb.DataType_Int64
fieldName2Type["varChar_field"] = schemapb.DataType_VarChar
fieldName2Type["fvec_field"] = schemapb.DataType_FloatVector
schema := testutil.ConstructCollectionSchemaByDataType(collectionName, fieldName2Type,
"int64_field", false, 8)
schema.Properties = append(schema.Properties, &commonpb.KeyValuePair{
Key: common.PartitionKeyIsolationKey,
Value: "true",
})
partitionKeyField := &schemapb.FieldSchema{
Name: "key_field",
DataType: schemapb.DataType_Int64,
IsPartitionKey: true,
}
schema.Fields = append(schema.Fields, partitionKeyField)
fieldID := common.StartOfUserFieldID
for _, field := range schema.Fields {
field.FieldID = int64(fieldID)
fieldID++
}
schemaHelper, err := typeutil.CreateSchemaHelper(schema)
require.NoError(t, err)
type testCase struct {
name string
expr string
expectedErrorString string
}
cases := []testCase{
{
name: "partition key isolation equal",
expr: "key_field == 10",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with same field equal",
expr: "key_field == 10 && key_field == 10",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with same field equal diff",
expr: "key_field == 10 && key_field == 20",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with same field equal 3",
expr: "key_field == 10 && key_field == 11 && key_field == 12",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field equal",
expr: "key_field == 10 && varChar_field == 'a'",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field not equal",
expr: "key_field == 10 && varChar_field != 'a'",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field in",
expr: "key_field == 10 && varChar_field in ['a', 'b']",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field in Reversed",
expr: "varChar_field in ['a', 'b'] && key_field == 10",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field OR",
expr: "key_field == 10 && (varChar_field == 'a' || varChar_field == 'b')",
expectedErrorString: "",
},
{
name: "partition key isolation equal AND with varchar field OR Reversed",
expr: "(varChar_field == 'a' || varChar_field == 'b') && key_field == 10",
expectedErrorString: "",
},
{
name: "partition key isolation equal to arithmic operations",
expr: "key_field == (1+1)",
expectedErrorString: "",
},
{
name: "partition key isolation empty",
expr: "",
expectedErrorString: "partition key not found in expr when validating partition key isolation",
},
{
name: "partition key isolation not equal",
expr: "key_field != 10",
expectedErrorString: "partition key isolation does not support NotEqual",
},
{
name: "partition key isolation term",
expr: "key_field in [10]",
expectedErrorString: "partition key isolation does not support IN",
},
{
name: "partition key isolation term multiple",
expr: "key_field in [10, 20]",
expectedErrorString: "partition key isolation does not support IN",
},
{
name: "partition key isolation NOT term",
expr: "key_field not in [10]",
expectedErrorString: "partition key isolation does not support IN",
},
{
name: "partition key isolation less",
expr: "key_field < 10",
expectedErrorString: "partition key isolation does not support LessThan",
},
{
name: "partition key isolation less or equal",
expr: "key_field <= 10",
expectedErrorString: "partition key isolation does not support LessEq",
},
{
name: "partition key isolation greater",
expr: "key_field > 10",
expectedErrorString: "partition key isolation does not support GreaterThan",
},
{
name: "partition key isolation equal greator or equal",
expr: "key_field >= 10",
expectedErrorString: "partition key isolation does not support GreaterEqual",
},
{
name: "partition key isolation NOT equal",
expr: "not(key_field == 10)",
expectedErrorString: "partition key isolation does not support NOT",
},
{
name: "partition key isolation equal AND with same field term",
expr: "key_field == 10 && key_field in [10]",
expectedErrorString: "partition key isolation does not support IN",
},
{
name: "partition key isolation equal OR with same field equal",
expr: "key_field == 10 || key_field == 11",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation equal OR with same field equal Reversed",
expr: "key_field == 11 || key_field == 10",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation equal OR with other field equal",
expr: "key_field == 10 || varChar_field == 'a'",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation equal OR with other field equal Reversed",
expr: "varChar_field == 'a' || key_field == 10",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation equal OR with other field equal",
expr: "key_field == 10 || varChar_field == 'a'",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation equal AND",
expr: "key_field == 10 && (key_field == 10 || key_field == 11)",
expectedErrorString: "partition key isolation does not support OR",
},
{
name: "partition key isolation other field equal",
expr: "varChar_field == 'a'",
expectedErrorString: "partition key not found in expr when validating partition key isolation",
},
{
name: "partition key isolation other field equal AND",
expr: "varChar_field == 'a' && int64_field == 1",
expectedErrorString: "partition key not found in expr when validating partition key isolation",
},
{
name: "partition key isolation complex OR",
expr: "(key_field == 10 and int64_field == 11) or (key_field == 10 and varChar_field == 'a')",
expectedErrorString: "partition key isolation does not support OR",
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
queryPlan, err := planparserv2.CreateRetrievePlan(schemaHelper, tc.expr)
assert.NoError(t, err)
planExpr, err := ParseExprFromPlan(queryPlan)
assert.NoError(t, err)
if tc.expectedErrorString != "" {
assert.ErrorContains(t, ValidatePartitionKeyIsolation(planExpr), tc.expectedErrorString)
} else {
assert.NoError(t, ValidatePartitionKeyIsolation(planExpr))
}
})
}
}

View File

@ -22,6 +22,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
) )
@ -161,8 +162,9 @@ const (
// common properties // common properties
const ( const (
MmapEnabledKey = "mmap.enabled" MmapEnabledKey = "mmap.enabled"
LazyLoadEnableKey = "lazyload.enabled" LazyLoadEnableKey = "lazyload.enabled"
PartitionKeyIsolationKey = "partitionkey.isolation"
) )
const ( const (
@ -224,6 +226,31 @@ func IsCollectionLazyLoadEnabled(kvs ...*commonpb.KeyValuePair) bool {
return false return false
} }
func IsPartitionKeyIsolationKvEnabled(kvs ...*commonpb.KeyValuePair) (bool, error) {
for _, kv := range kvs {
if kv.Key == PartitionKeyIsolationKey {
val, err := strconv.ParseBool(strings.ToLower(kv.Value))
if err != nil {
return false, errors.Wrap(err, "failed to parse partition key isolation")
}
return val, nil
}
}
return false, nil
}
func IsPartitionKeyIsolationPropEnabled(props map[string]string) (bool, error) {
val, ok := props[PartitionKeyIsolationKey]
if !ok {
return false, nil
}
iso, parseErr := strconv.ParseBool(val)
if parseErr != nil {
return false, errors.Wrap(parseErr, "failed to parse partition key isolation property")
}
return iso, nil
}
const ( const (
// LatestVerision is the magic number for watch latest revision // LatestVerision is the magic number for watch latest revision
LatestRevision = int64(-1) LatestRevision = int64(-1)

View File

@ -88,3 +88,64 @@ func TestDatabaseProperties(t *testing.T) {
_, err = DatabaseLevelResourceGroups(props) _, err = DatabaseLevelResourceGroups(props)
assert.Error(t, err) assert.Error(t, err)
} }
func TestCommonPartitionKeyIsolation(t *testing.T) {
getProto := func(val string) []*commonpb.KeyValuePair {
return []*commonpb.KeyValuePair{
{
Key: PartitionKeyIsolationKey,
Value: val,
},
}
}
getMp := func(val string) map[string]string {
return map[string]string{
PartitionKeyIsolationKey: val,
}
}
t.Run("pb", func(t *testing.T) {
props := getProto("true")
res, err := IsPartitionKeyIsolationKvEnabled(props...)
assert.NoError(t, err)
assert.True(t, res)
props = getProto("false")
res, err = IsPartitionKeyIsolationKvEnabled(props...)
assert.NoError(t, err)
assert.False(t, res)
props = getProto("")
res, err = IsPartitionKeyIsolationKvEnabled(props...)
assert.ErrorContains(t, err, "failed to parse partition key isolation")
assert.False(t, res)
props = getProto("invalid")
res, err = IsPartitionKeyIsolationKvEnabled(props...)
assert.ErrorContains(t, err, "failed to parse partition key isolation")
assert.False(t, res)
})
t.Run("map", func(t *testing.T) {
props := getMp("true")
res, err := IsPartitionKeyIsolationPropEnabled(props)
assert.NoError(t, err)
assert.True(t, res)
props = getMp("false")
res, err = IsPartitionKeyIsolationPropEnabled(props)
assert.NoError(t, err)
assert.False(t, res)
props = getMp("")
res, err = IsPartitionKeyIsolationPropEnabled(props)
assert.ErrorContains(t, err, "failed to parse partition key isolation property")
assert.False(t, res)
props = getMp("invalid")
res, err = IsPartitionKeyIsolationPropEnabled(props)
assert.ErrorContains(t, err, "failed to parse partition key isolation property")
assert.False(t, res)
})
}