enhance: make consistency level used in delete configurable (#29280) (#29284)

pr: #29280 
issue: #29279

Signed-off-by: longjiquan <jiquan.long@zilliz.com>
pull/29350/head
Jiquan Long 2023-12-20 14:14:34 +08:00 committed by GitHub
parent 3182b9df5b
commit 8ef0c571b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 1 deletions

View File

@ -532,6 +532,7 @@ common:
SearchCacheBudgetGBRatio: 0.1
LoadNumThreadRatio: 8
BeamWidthRatio: 4
consistencyLevelUsedInDelete: "Bounded"
gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency.
gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time.
storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead

View File

@ -315,7 +315,7 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream,
PartitionIDs: partitionIDs,
SerializedExprPlan: serializedPlan,
OutputFieldsId: outputFieldIDs,
GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, commonpb.ConsistencyLevel_Bounded),
GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, getConsistencyLevelFromConfig()),
},
DmlChannels: channelIDs,
Scope: querypb.DataScope_All,

View File

@ -768,6 +768,19 @@ func ReplaceID2Name(oldStr string, id int64, name string) string {
return strings.ReplaceAll(oldStr, strconv.FormatInt(id, 10), name)
}
func getConsistencyLevelFromConfig() commonpb.ConsistencyLevel {
value := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue()
trimed := strings.TrimSpace(value)
lowered := strings.ToLower(trimed)
for consistencyLevel := range commonpb.ConsistencyLevel_value {
if lowered == strings.ToLower(consistencyLevel) {
return commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel])
}
}
// not found, use default.
return paramtable.DefaultConsistencyLevelUsedInDelete
}
func parseGuaranteeTsFromConsistency(ts, tMax typeutil.Timestamp, consistency commonpb.ConsistencyLevel) typeutil.Timestamp {
switch consistency {
case commonpb.ConsistencyLevel_Strong:

View File

@ -2087,3 +2087,17 @@ func TestSendReplicateMessagePack(t *testing.T) {
SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{})
})
}
func Test_getConsistencyLevelFromConfig(t *testing.T) {
paramtable.Init()
original := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue()
defer func() {
Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(original)
}()
for consistencyLevel := range commonpb.ConsistencyLevel_value {
Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(consistencyLevel)
assert.Equal(t, commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel]), getConsistencyLevelFromConfig())
}
Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue("invalid")
assert.Equal(t, paramtable.DefaultConsistencyLevelUsedInDelete, getConsistencyLevelFromConfig())
}

View File

@ -22,6 +22,7 @@ import (
"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/hardware"
@ -31,6 +32,7 @@ import (
const (
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize = 16
DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded
DefaultGracefulTime = 5000 // ms
DefaultGracefulStopTimeout = 1800 // s
DefaultHighPriorityThreadCoreCoefficient = 10
@ -195,6 +197,7 @@ type commonConfig struct {
SearchCacheBudgetGBRatio ParamItem `refreshable:"true"`
LoadNumThreadRatio ParamItem `refreshable:"true"`
BeamWidthRatio ParamItem `refreshable:"true"`
ConsistencyLevelUsedInDelete ParamItem `refreshable:"true"`
GracefulTime ParamItem `refreshable:"true"`
GracefulStopTimeout ParamItem `refreshable:"true"`
@ -465,6 +468,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr
}
p.BeamWidthRatio.Init(base.mgr)
p.ConsistencyLevelUsedInDelete = ParamItem{
Key: "common.consistencyLevelUsedInDelete",
Version: "2.0.0",
DefaultValue: DefaultConsistencyLevelUsedInDelete.String(),
Doc: "Consistency level used in delete by expression",
Export: true,
}
p.ConsistencyLevelUsedInDelete.Init(base.mgr)
p.GracefulTime = ParamItem{
Key: "common.gracefulTime",
Version: "2.0.0",

View File

@ -53,6 +53,9 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, Params.IndexSliceSize.GetAsInt64(), int64(DefaultIndexSliceSize))
t.Logf("knowhere index slice size = %d", Params.IndexSliceSize.GetAsInt64())
assert.Equal(t, Params.ConsistencyLevelUsedInDelete.GetValue(), DefaultConsistencyLevelUsedInDelete.String())
t.Logf("default ConsistencyLevelUsedInDelete = %s", Params.ConsistencyLevelUsedInDelete.GetValue())
assert.Equal(t, Params.GracefulTime.GetAsInt64(), int64(DefaultGracefulTime))
t.Logf("default grafeful time = %d", Params.GracefulTime.GetAsInt64())