diff --git a/configs/milvus.yaml b/configs/milvus.yaml index aef9531684..513f8f0706 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -532,6 +532,7 @@ common: SearchCacheBudgetGBRatio: 0.1 LoadNumThreadRatio: 8 BeamWidthRatio: 4 + consistencyLevelUsedInDelete: "Bounded" gracefulTime: 5000 # milliseconds. it represents the interval (in ms) by which the request arrival time needs to be subtracted in the case of Bounded Consistency. gracefulStopTimeout: 1800 # seconds. it will force quit the server if the graceful stop process is not completed during this time. storageType: remote # please adjust in embedded Milvus: local, available values are [local, remote, opendal], value minio is deprecated, use remote instead diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index e8cc06f0d9..6d0a37452b 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -315,7 +315,7 @@ func (dt *deleteTask) getStreamingQueryAndDelteFunc(stream msgstream.MsgStream, PartitionIDs: partitionIDs, SerializedExprPlan: serializedPlan, OutputFieldsId: outputFieldIDs, - GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, commonpb.ConsistencyLevel_Bounded), + GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dt.ts, dt.ts, getConsistencyLevelFromConfig()), }, DmlChannels: channelIDs, Scope: querypb.DataScope_All, diff --git a/internal/proxy/util.go b/internal/proxy/util.go index e54f30b0d7..adb6bee1b2 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -768,6 +768,19 @@ func ReplaceID2Name(oldStr string, id int64, name string) string { return strings.ReplaceAll(oldStr, strconv.FormatInt(id, 10), name) } +func getConsistencyLevelFromConfig() commonpb.ConsistencyLevel { + value := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue() + trimed := strings.TrimSpace(value) + lowered := strings.ToLower(trimed) + for consistencyLevel := range commonpb.ConsistencyLevel_value { + if lowered == strings.ToLower(consistencyLevel) { + return commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel]) + } + } + // not found, use default. + return paramtable.DefaultConsistencyLevelUsedInDelete +} + func parseGuaranteeTsFromConsistency(ts, tMax typeutil.Timestamp, consistency commonpb.ConsistencyLevel) typeutil.Timestamp { switch consistency { case commonpb.ConsistencyLevel_Strong: diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 1bbc147f4d..73e7a13fda 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -2087,3 +2087,17 @@ func TestSendReplicateMessagePack(t *testing.T) { SendReplicateMessagePack(ctx, mockStream, &milvuspb.DropIndexRequest{}) }) } + +func Test_getConsistencyLevelFromConfig(t *testing.T) { + paramtable.Init() + original := Params.CommonCfg.ConsistencyLevelUsedInDelete.GetValue() + defer func() { + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(original) + }() + for consistencyLevel := range commonpb.ConsistencyLevel_value { + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue(consistencyLevel) + assert.Equal(t, commonpb.ConsistencyLevel(commonpb.ConsistencyLevel_value[consistencyLevel]), getConsistencyLevelFromConfig()) + } + Params.CommonCfg.ConsistencyLevelUsedInDelete.SwapTempValue("invalid") + assert.Equal(t, paramtable.DefaultConsistencyLevelUsedInDelete, getConsistencyLevelFromConfig()) +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 824e58d11f..34d988df16 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -22,6 +22,7 @@ import ( "github.com/shirou/gopsutil/v3/disk" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/pkg/config" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -31,6 +32,7 @@ import ( const ( // DefaultIndexSliceSize defines the default slice size of index file when serializing. DefaultIndexSliceSize = 16 + DefaultConsistencyLevelUsedInDelete = commonpb.ConsistencyLevel_Bounded DefaultGracefulTime = 5000 // ms DefaultGracefulStopTimeout = 1800 // s DefaultHighPriorityThreadCoreCoefficient = 10 @@ -195,6 +197,7 @@ type commonConfig struct { SearchCacheBudgetGBRatio ParamItem `refreshable:"true"` LoadNumThreadRatio ParamItem `refreshable:"true"` BeamWidthRatio ParamItem `refreshable:"true"` + ConsistencyLevelUsedInDelete ParamItem `refreshable:"true"` GracefulTime ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` @@ -465,6 +468,15 @@ This configuration is only used by querynode and indexnode, it selects CPU instr } p.BeamWidthRatio.Init(base.mgr) + p.ConsistencyLevelUsedInDelete = ParamItem{ + Key: "common.consistencyLevelUsedInDelete", + Version: "2.0.0", + DefaultValue: DefaultConsistencyLevelUsedInDelete.String(), + Doc: "Consistency level used in delete by expression", + Export: true, + } + p.ConsistencyLevelUsedInDelete.Init(base.mgr) + p.GracefulTime = ParamItem{ Key: "common.gracefulTime", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 7e4ebae114..bf9e41399a 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -53,6 +53,9 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, Params.IndexSliceSize.GetAsInt64(), int64(DefaultIndexSliceSize)) t.Logf("knowhere index slice size = %d", Params.IndexSliceSize.GetAsInt64()) + assert.Equal(t, Params.ConsistencyLevelUsedInDelete.GetValue(), DefaultConsistencyLevelUsedInDelete.String()) + t.Logf("default ConsistencyLevelUsedInDelete = %s", Params.ConsistencyLevelUsedInDelete.GetValue()) + assert.Equal(t, Params.GracefulTime.GetAsInt64(), int64(DefaultGracefulTime)) t.Logf("default grafeful time = %d", Params.GracefulTime.GetAsInt64())