mirror of https://github.com/milvus-io/milvus.git
fix: [2.5]AlterCollection unable to modify ConsistencyLevel (#39902)
fix: AlterCollection unable to modify ConsistencyLevel issue: https://github.com/milvus-io/milvus/issues/39707 pr: https://github.com/milvus-io/milvus/pull/39708 Signed-off-by: Xianhui.Lin <xianhui.lin@zilliz.com>pull/39900/head
parent
9918e1008d
commit
8c86e4e6ac
|
@ -19,6 +19,7 @@ package rootcoord
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/samber/lo"
|
||||
|
@ -90,6 +91,24 @@ func (a *alterCollectionTask) GetLockerKey() LockerKey {
|
|||
)
|
||||
}
|
||||
|
||||
func getConsistencyLevel(props ...*commonpb.KeyValuePair) (bool, commonpb.ConsistencyLevel) {
|
||||
for _, p := range props {
|
||||
if p.GetKey() == common.ConsistencyLevel {
|
||||
value := p.GetValue()
|
||||
if level, err := strconv.ParseInt(value, 10, 32); err == nil {
|
||||
if _, ok := commonpb.ConsistencyLevel_name[int32(level)]; ok {
|
||||
return true, commonpb.ConsistencyLevel(level)
|
||||
}
|
||||
} else {
|
||||
if level, ok := commonpb.ConsistencyLevel_value[value]; ok {
|
||||
return true, commonpb.ConsistencyLevel(level)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, commonpb.ConsistencyLevel(0)
|
||||
}
|
||||
|
||||
func executeAlterCollectionTaskSteps(ctx context.Context,
|
||||
core *Core,
|
||||
col *model.Collection,
|
||||
|
@ -101,6 +120,9 @@ func executeAlterCollectionTaskSteps(ctx context.Context,
|
|||
oldColl := col.Clone()
|
||||
oldColl.Properties = oldProperties
|
||||
newColl := col.Clone()
|
||||
if ok, level := getConsistencyLevel(newProperties...); ok {
|
||||
newColl.ConsistencyLevel = level
|
||||
}
|
||||
newColl.Properties = newProperties
|
||||
tso, err := core.tsoAllocator.GenerateTSO(1)
|
||||
if err == nil {
|
||||
|
|
|
@ -379,4 +379,49 @@ func Test_alterCollectionTask_Execute(t *testing.T) {
|
|||
coll.Properties = DeleteProperties(coll.Properties, deleteKeys)
|
||||
assert.Empty(t, coll.Properties)
|
||||
})
|
||||
|
||||
t.Run("alter successfully3", func(t *testing.T) {
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
meta.On("GetCollectionByName",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(&model.Collection{
|
||||
CollectionID: int64(1),
|
||||
Name: "cn",
|
||||
DBName: "foo",
|
||||
}, nil)
|
||||
meta.On("AlterCollection",
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
mock.Anything,
|
||||
).Return(nil)
|
||||
meta.On("ListAliasesByID", mock.Anything, mock.Anything).Return([]string{})
|
||||
broker := newMockBroker()
|
||||
broker.BroadcastAlteredCollectionFunc = func(ctx context.Context, req *milvuspb.AlterCollectionRequest) error {
|
||||
return nil
|
||||
}
|
||||
packChan := make(chan *msgstream.MsgPack, 10)
|
||||
ticker := newChanTimeTickSync(packChan)
|
||||
ticker.addDmlChannels("by-dev-rootcoord-dml_1")
|
||||
core := newTestCore(withValidProxyManager(), withMeta(meta), withBroker(broker), withTtSynchronizer(ticker), withInvalidTsoAllocator())
|
||||
task := &alterCollectionTask{
|
||||
baseTask: newBaseTask(context.Background(), core),
|
||||
Req: &milvuspb.AlterCollectionRequest{
|
||||
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_AlterCollection},
|
||||
CollectionName: "cn",
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.ConsistencyLevel,
|
||||
Value: "1",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := task.Execute(context.Background())
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -573,6 +573,10 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
|||
State: pb.PartitionState_PartitionCreated,
|
||||
}
|
||||
}
|
||||
ConsistencyLevel := t.Req.ConsistencyLevel
|
||||
if ok, level := getConsistencyLevel(t.Req.Properties...); ok {
|
||||
ConsistencyLevel = level
|
||||
}
|
||||
collInfo := model.Collection{
|
||||
CollectionID: collID,
|
||||
DBID: t.dbID,
|
||||
|
@ -585,7 +589,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
|
|||
VirtualChannelNames: vchanNames,
|
||||
PhysicalChannelNames: chanNames,
|
||||
ShardsNum: t.Req.ShardsNum,
|
||||
ConsistencyLevel: t.Req.ConsistencyLevel,
|
||||
ConsistencyLevel: ConsistencyLevel,
|
||||
CreateTime: ts,
|
||||
State: pb.CollectionState_CollectionCreating,
|
||||
Partitions: partitions,
|
||||
|
|
|
@ -1065,6 +1065,12 @@ func Test_createCollectionTask_Execute(t *testing.T) {
|
|||
CollectionName: collectionName,
|
||||
Schema: marshaledSchema,
|
||||
ShardsNum: int32(shardNum),
|
||||
Properties: []*commonpb.KeyValuePair{
|
||||
{
|
||||
Key: common.ConsistencyLevel,
|
||||
Value: "1",
|
||||
},
|
||||
},
|
||||
},
|
||||
channels: collectionChannels{physicalChannels: pchans},
|
||||
schema: schema,
|
||||
|
|
Loading…
Reference in New Issue