fix: Fix rootcoord meta mutex contention (#38799)

RootCoord meta uses copy-on-write, allowing the removal of unnecessary
copies.

issue: https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/38775/head
yihao.dai 2025-01-15 20:11:08 +08:00 committed by GitHub
parent 82bdf9a6a8
commit 0df2c75b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 163 additions and 6 deletions

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
@ -36,6 +52,31 @@ func (c *Collection) Available() bool {
return c.State == pb.CollectionState_CollectionCreated
}
func (c *Collection) ShallowClone() *Collection {
return &Collection{
TenantID: c.TenantID,
DBID: c.DBID,
CollectionID: c.CollectionID,
Name: c.Name,
DBName: c.DBName,
Description: c.Description,
AutoID: c.AutoID,
Fields: c.Fields,
Partitions: c.Partitions,
VirtualChannelNames: c.VirtualChannelNames,
PhysicalChannelNames: c.PhysicalChannelNames,
ShardsNum: c.ShardsNum,
ConsistencyLevel: c.ConsistencyLevel,
CreateTime: c.CreateTime,
StartPositions: c.StartPositions,
Aliases: c.Aliases,
Properties: c.Properties,
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: c.Functions,
}
}
func (c *Collection) Clone() *Collection {
return &Collection{
TenantID: c.TenantID,

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
@ -405,3 +421,95 @@ func TestCollection_Equal(t *testing.T) {
})
}
}
func TestClone(t *testing.T) {
collection := &Collection{
TenantID: "1",
DBID: 2,
CollectionID: 3,
Partitions: []*Partition{
{
PartitionID: 4,
PartitionName: "5",
PartitionCreatedTimestamp: 6,
CollectionID: 7,
State: pb.PartitionState_PartitionCreated,
},
{
PartitionID: 8,
PartitionName: "9",
PartitionCreatedTimestamp: 10,
CollectionID: 11,
State: pb.PartitionState_PartitionCreating,
},
},
Name: "12",
DBName: "13",
Description: "14",
AutoID: true,
Fields: []*Field{
{
FieldID: 15,
Name: "16",
IsPrimaryKey: false,
Description: "17",
DataType: schemapb.DataType_Double,
TypeParams: []*commonpb.KeyValuePair{{Key: "18", Value: "19"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "20", Value: "21"}},
AutoID: true,
State: schemapb.FieldState_FieldDropping,
IsDynamic: true,
IsPartitionKey: false,
IsClusteringKey: true,
IsFunctionOutput: false,
DefaultValue: nil,
ElementType: schemapb.DataType_String,
Nullable: true,
},
{
FieldID: 22,
Name: "23",
IsPrimaryKey: true,
Description: "24",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{{Key: "25", Value: "26"}},
IndexParams: []*commonpb.KeyValuePair{{Key: "27", Value: "28"}},
AutoID: true,
State: schemapb.FieldState_FieldCreating,
IsDynamic: true,
IsPartitionKey: false,
IsClusteringKey: true,
IsFunctionOutput: false,
DefaultValue: nil,
ElementType: schemapb.DataType_VarChar,
Nullable: true,
},
},
Functions: []*Function{
{
ID: functionID,
Name: functionName,
Type: schemapb.FunctionType_BM25,
InputFieldIDs: []int64{101},
InputFieldNames: []string{"text"},
OutputFieldIDs: []int64{103},
OutputFieldNames: []string{"sparse"},
},
},
VirtualChannelNames: []string{"c1", "c2"},
PhysicalChannelNames: []string{"c3", "c4"},
ShardsNum: 2,
StartPositions: startPositions,
CreateTime: 1234,
ConsistencyLevel: commonpb.ConsistencyLevel_Eventually,
Aliases: []string{"a1", "a2"},
Properties: []*commonpb.KeyValuePair{{Key: "32", Value: "33"}},
State: pb.CollectionState_CollectionCreated,
EnableDynamicField: true,
}
clone1 := collection.Clone()
assert.Equal(t, clone1, collection)
clone2 := collection.ShallowClone()
assert.Equal(t, clone2, collection)
}

View File

@ -549,19 +549,23 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
return nil
}
// Note: The returned model.Collection is read-only. Do NOT modify it directly,
// as it may cause unexpected behavior or inconsistencies.
func filterUnavailable(coll *model.Collection) *model.Collection {
clone := coll.Clone()
clone := coll.ShallowClone()
// pick available partitions.
clone.Partitions = nil
clone.Partitions = make([]*model.Partition, 0, len(coll.Partitions))
for _, partition := range coll.Partitions {
if partition.Available() {
clone.Partitions = append(clone.Partitions, partition.Clone())
clone.Partitions = append(clone.Partitions, partition)
}
}
return clone
}
// getLatestCollectionByIDInternal should be called with ts = typeutil.MaxTimestamp
// Note: The returned model.Collection is read-only. Do NOT modify it directly,
// as it may cause unexpected behavior or inconsistencies.
func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collectionID UniqueID, allowUnavailable bool) (*model.Collection, error) {
coll, ok := mt.collID2Meta[collectionID]
if !ok || coll == nil {
@ -579,6 +583,8 @@ func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collec
}
// getCollectionByIDInternal get collection by collection id without lock.
// Note: The returned model.Collection is read-only. Do NOT modify it directly,
// as it may cause unexpected behavior or inconsistencies.
func (mt *MetaTable) getCollectionByIDInternal(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error) {
if isMaxTs(ts) {
return mt.getLatestCollectionByIDInternal(ctx, collectionID, allowUnavailable)
@ -652,6 +658,8 @@ func (mt *MetaTable) GetCollectionID(ctx context.Context, dbName string, collect
return InvalidCollectionID
}
// Note: The returned model.Collection is read-only. Do NOT modify it directly,
// as it may cause unexpected behavior or inconsistencies.
func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) {
// backward compatibility for rolling upgrade
if dbName == "" {
@ -852,8 +860,8 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
}
// check new collection already exists
newColl, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts)
if newColl != nil {
coll, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts)
if coll != nil {
log.Warn("check new collection fail")
return fmt.Errorf("duplicated new collection name %s:%s with other collection name or alias", newDBName, newName)
}
@ -875,7 +883,7 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam
return fmt.Errorf("fail to rename db name, must drop all aliases of this collection before rename")
}
newColl = oldColl.Clone()
newColl := oldColl.Clone()
newColl.Name = newName
newColl.DBID = targetDB.ID
if err := mt.catalog.AlterCollection(ctx, oldColl, newColl, metastore.MODIFY, ts); err != nil {