From 8c86e4e6ac2db017af50024f1b343478dd71c923 Mon Sep 17 00:00:00 2001 From: Xianhui Lin <35839735+JsDove@users.noreply.github.com> Date: Mon, 17 Feb 2025 10:00:15 +0800 Subject: [PATCH] fix: [2.5]AlterCollection unable to modify ConsistencyLevel (#39902) fix: AlterCollection unable to modify ConsistencyLevel issue: https://github.com/milvus-io/milvus/issues/39707 pr: https://github.com/milvus-io/milvus/pull/39708 Signed-off-by: Xianhui.Lin --- internal/rootcoord/alter_collection_task.go | 22 +++++++++ .../rootcoord/alter_collection_task_test.go | 45 +++++++++++++++++++ internal/rootcoord/create_collection_task.go | 6 ++- .../rootcoord/create_collection_task_test.go | 6 +++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 3b3b2f7cdf..66b9452319 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -19,6 +19,7 @@ package rootcoord import ( "context" "fmt" + "strconv" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -90,6 +91,24 @@ func (a *alterCollectionTask) GetLockerKey() LockerKey { ) } +func getConsistencyLevel(props ...*commonpb.KeyValuePair) (bool, commonpb.ConsistencyLevel) { + for _, p := range props { + if p.GetKey() == common.ConsistencyLevel { + value := p.GetValue() + if level, err := strconv.ParseInt(value, 10, 32); err == nil { + if _, ok := commonpb.ConsistencyLevel_name[int32(level)]; ok { + return true, commonpb.ConsistencyLevel(level) + } + } else { + if level, ok := commonpb.ConsistencyLevel_value[value]; ok { + return true, commonpb.ConsistencyLevel(level) + } + } + } + } + return false, commonpb.ConsistencyLevel(0) +} + func executeAlterCollectionTaskSteps(ctx context.Context, core *Core, col *model.Collection, @@ -101,6 +120,9 @@ func executeAlterCollectionTaskSteps(ctx context.Context, oldColl := col.Clone() oldColl.Properties = oldProperties newColl := col.Clone() + if ok, level := getConsistencyLevel(newProperties...); ok { + newColl.ConsistencyLevel = level + } newColl.Properties = newProperties tso, err := core.tsoAllocator.GenerateTSO(1) if err == nil { diff --git a/internal/rootcoord/alter_collection_task_test.go b/internal/rootcoord/alter_collection_task_test.go index a4eade44e7..53b26e6c13 100644 --- a/internal/rootcoord/alter_collection_task_test.go +++ b/internal/rootcoord/alter_collection_task_test.go @@ -379,4 +379,49 @@ func Test_alterCollectionTask_Execute(t *testing.T) { coll.Properties = DeleteProperties(coll.Properties, deleteKeys) assert.Empty(t, coll.Properties) }) + + t.Run("alter successfully3", func(t *testing.T) { + meta := mockrootcoord.NewIMetaTable(t) + meta.On("GetCollectionByName", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(&model.Collection{ + CollectionID: int64(1), + Name: "cn", + DBName: "foo", + }, nil) + meta.On("AlterCollection", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{}) + broker := newMockBroker() + broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error { + return nil + } + packChan := make(chan *msgstream.MsgPack, 10) + ticker := newChanTimeTickSync(packChan) + ticker.addDmlChannels("by-dev-rootcoord-dml_1") + core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator()) + task := &alterCollectionTask{ + baseTask: newBaseTask(context.Background(), core), + Req: &milvuspb.AlterCollectionRequest{ + Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection}, + CollectionName: "cn", + Properties: []*commonpb.KeyValuePair{ + { + Key: common.ConsistencyLevel, + Value: "1", + }, + }, + }, + } + + err := task.Execute(context.Background()) + assert.NoError(t, err) + }) } diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 1fd5729939..ffb076ea5f 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -573,6 +573,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { State: pb.PartitionState_PartitionCreated, } } + ConsistencyLevel := t.Req.ConsistencyLevel + if ok, level := getConsistencyLevel(t.Req.Properties...); ok { + ConsistencyLevel = level + } collInfo := model.Collection{ CollectionID: collID, DBID: t.dbID, @@ -585,7 +589,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { VirtualChannelNames: vchanNames, PhysicalChannelNames: chanNames, ShardsNum: t.Req.ShardsNum, - ConsistencyLevel: t.Req.ConsistencyLevel, + ConsistencyLevel: ConsistencyLevel, CreateTime: ts, State: pb.CollectionState_CollectionCreating, Partitions: partitions, diff --git a/internal/rootcoord/create_collection_task_test.go b/internal/rootcoord/create_collection_task_test.go index a27af1cf7a..3cb89b1020 100644 --- a/internal/rootcoord/create_collection_task_test.go +++ b/internal/rootcoord/create_collection_task_test.go @@ -1065,6 +1065,12 @@ func Test_createCollectionTask_Execute(t *testing.T) { CollectionName: collectionName, Schema: marshaledSchema, ShardsNum: int32(shardNum), + Properties: []*commonpb.KeyValuePair{ + { + Key: common.ConsistencyLevel, + Value: "1", + }, + }, }, channels: collectionChannels{physicalChannels: pchans}, schema: schema,