From 0df2c75b775a08ec783f4d1652e2f51ce79e5da8 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Wed, 15 Jan 2025 20:11:08 +0800
Subject: [PATCH] fix: Fix rootcoord meta mutex contention (#38799)

RootCoord meta uses copy-on-write, allowing the removal of unnecessary copies.

issue: https://github.com/milvus-io/milvus/issues/37630

---------

Signed-off-by: bigsheeper
---
 internal/metastore/model/collection.go      |  41 ++++++++
 internal/metastore/model/collection_test.go | 108 ++++++++++++++++++++
 internal/rootcoord/meta_table.go            |  20 ++--
 3 files changed, 163 insertions(+), 6 deletions(-)

diff --git a/internal/metastore/model/collection.go b/internal/metastore/model/collection.go
index a620c70601..6a26adfd38 100644
--- a/internal/metastore/model/collection.go
+++ b/internal/metastore/model/collection.go
@@ -1,3 +1,19 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ package model import ( @@ -36,6 +52,31 @@ func (c *Collection) Available() bool { return c.State == pb.CollectionState_CollectionCreated } +func (c *Collection) ShallowClone() *Collection { + return &Collection{ + TenantID: c.TenantID, + DBID: c.DBID, + CollectionID: c.CollectionID, + Name: c.Name, + DBName: c.DBName, + Description: c.Description, + AutoID: c.AutoID, + Fields: c.Fields, + Partitions: c.Partitions, + VirtualChannelNames: c.VirtualChannelNames, + PhysicalChannelNames: c.PhysicalChannelNames, + ShardsNum: c.ShardsNum, + ConsistencyLevel: c.ConsistencyLevel, + CreateTime: c.CreateTime, + StartPositions: c.StartPositions, + Aliases: c.Aliases, + Properties: c.Properties, + State: c.State, + EnableDynamicField: c.EnableDynamicField, + Functions: c.Functions, + } +} + func (c *Collection) Clone() *Collection { return &Collection{ TenantID: c.TenantID, diff --git a/internal/metastore/model/collection_test.go b/internal/metastore/model/collection_test.go index ee11b29abf..78ed5600a8 100644 --- a/internal/metastore/model/collection_test.go +++ b/internal/metastore/model/collection_test.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package model import ( @@ -405,3 +421,95 @@ func TestCollection_Equal(t *testing.T) { }) } } + +func TestClone(t *testing.T) { + collection := &Collection{ + TenantID: "1", + DBID: 2, + CollectionID: 3, + Partitions: []*Partition{ + { + PartitionID: 4, + PartitionName: "5", + PartitionCreatedTimestamp: 6, + CollectionID: 7, + State: pb.PartitionState_PartitionCreated, + }, + { + PartitionID: 8, + PartitionName: "9", + PartitionCreatedTimestamp: 10, + CollectionID: 11, + State: pb.PartitionState_PartitionCreating, + }, + }, + Name: "12", + DBName: "13", + Description: "14", + AutoID: true, + Fields: []*Field{ + { + FieldID: 15, + Name: "16", + IsPrimaryKey: false, + Description: "17", + DataType: schemapb.DataType_Double, + TypeParams: []*commonpb.KeyValuePair{{Key: "18", Value: "19"}}, + IndexParams: []*commonpb.KeyValuePair{{Key: "20", Value: "21"}}, + AutoID: true, + State: schemapb.FieldState_FieldDropping, + IsDynamic: true, + IsPartitionKey: false, + IsClusteringKey: true, + IsFunctionOutput: false, + DefaultValue: nil, + ElementType: schemapb.DataType_String, + Nullable: true, + }, + { + FieldID: 22, + Name: "23", + IsPrimaryKey: true, + Description: "24", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{{Key: "25", Value: "26"}}, + IndexParams: []*commonpb.KeyValuePair{{Key: "27", Value: "28"}}, + AutoID: true, + State: schemapb.FieldState_FieldCreating, + IsDynamic: true, + IsPartitionKey: false, + IsClusteringKey: true, + IsFunctionOutput: false, + DefaultValue: nil, + ElementType: schemapb.DataType_VarChar, + Nullable: true, + }, + }, + Functions: []*Function{ + { + ID: functionID, + Name: functionName, + Type: schemapb.FunctionType_BM25, + InputFieldIDs: []int64{101}, + InputFieldNames: []string{"text"}, + OutputFieldIDs: []int64{103}, + OutputFieldNames: []string{"sparse"}, + }, + }, + VirtualChannelNames: []string{"c1", "c2"}, + PhysicalChannelNames: []string{"c3", "c4"}, + ShardsNum: 2, + StartPositions: 
startPositions, + CreateTime: 1234, + ConsistencyLevel: commonpb.ConsistencyLevel_Eventually, + Aliases: []string{"a1", "a2"}, + Properties: []*commonpb.KeyValuePair{{Key: "32", Value: "33"}}, + State: pb.CollectionState_CollectionCreated, + EnableDynamicField: true, + } + + clone1 := collection.Clone() + assert.Equal(t, clone1, collection) + clone2 := collection.ShallowClone() + assert.Equal(t, clone2, collection) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 02e91a4ac6..cf76a93547 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -549,19 +549,23 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID return nil } +// Note: The returned model.Collection is read-only. Do NOT modify it directly, +// as it may cause unexpected behavior or inconsistencies. func filterUnavailable(coll *model.Collection) *model.Collection { - clone := coll.Clone() + clone := coll.ShallowClone() // pick available partitions. - clone.Partitions = nil + clone.Partitions = make([]*model.Partition, 0, len(coll.Partitions)) for _, partition := range coll.Partitions { if partition.Available() { - clone.Partitions = append(clone.Partitions, partition.Clone()) + clone.Partitions = append(clone.Partitions, partition) } } return clone } // getLatestCollectionByIDInternal should be called with ts = typeutil.MaxTimestamp +// Note: The returned model.Collection is read-only. Do NOT modify it directly, +// as it may cause unexpected behavior or inconsistencies. func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collectionID UniqueID, allowUnavailable bool) (*model.Collection, error) { coll, ok := mt.collID2Meta[collectionID] if !ok || coll == nil { @@ -579,6 +583,8 @@ func (mt *MetaTable) getLatestCollectionByIDInternal(ctx context.Context, collec } // getCollectionByIDInternal get collection by collection id without lock. 
+// Note: The returned model.Collection is read-only. Do NOT modify it directly, +// as it may cause unexpected behavior or inconsistencies. func (mt *MetaTable) getCollectionByIDInternal(ctx context.Context, dbName string, collectionID UniqueID, ts Timestamp, allowUnavailable bool) (*model.Collection, error) { if isMaxTs(ts) { return mt.getLatestCollectionByIDInternal(ctx, collectionID, allowUnavailable) @@ -652,6 +658,8 @@ func (mt *MetaTable) GetCollectionID(ctx context.Context, dbName string, collect return InvalidCollectionID } +// Note: The returned model.Collection is read-only. Do NOT modify it directly, +// as it may cause unexpected behavior or inconsistencies. func (mt *MetaTable) getCollectionByNameInternal(ctx context.Context, dbName string, collectionName string, ts Timestamp) (*model.Collection, error) { // backward compatibility for rolling upgrade if dbName == "" { @@ -852,8 +860,8 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam } // check new collection already exists - newColl, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts) - if newColl != nil { + coll, err := mt.getCollectionByNameInternal(ctx, newDBName, newName, ts) + if coll != nil { log.Warn("check new collection fail") return fmt.Errorf("duplicated new collection name %s:%s with other collection name or alias", newDBName, newName) } @@ -875,7 +883,7 @@ func (mt *MetaTable) RenameCollection(ctx context.Context, dbName string, oldNam return fmt.Errorf("fail to rename db name, must drop all aliases of this collection before rename") } - newColl = oldColl.Clone() + newColl := oldColl.Clone() newColl.Name = newName newColl.DBID = targetDB.ID if err := mt.catalog.AlterCollection(ctx, oldColl, newColl, metastore.MODIFY, ts); err != nil {