enhance: add param for bloomfilter(#29388) (#29490)

related: #29388

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
pull/29574/head
MrPresent-Han 2023-12-28 18:10:46 +08:00 committed by GitHub
parent 514e279f3a
commit ed644983e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 48 additions and 22 deletions

View File

@ -581,6 +581,8 @@ common:
warn: 1000 # minimum milliseconds for printing durations in warn level
ttMsgEnabled: true # Whether the instance disable sending ts messages
traceLogMode: 0 # trace request info, 0: none, 1: simple request info, like collection/partition/database name, 2: request detail
bloomFilterSize: 100000
maxBloomFalsePositive: 0.05
# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:

View File

@ -23,6 +23,7 @@ import (
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type BloomFilterSet struct {
@ -58,7 +59,8 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
if bfs.current == nil {
bfs.current = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
}

View File

@ -23,6 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type BloomFilterSetSuite struct {
@ -31,6 +32,7 @@ type BloomFilterSetSuite struct {
}
func (s *BloomFilterSetSuite) SetupTest() {
paramtable.Init()
s.bfs = NewBloomFilterSet()
}
@ -56,7 +58,6 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
func (s *BloomFilterSetSuite) TestWriteRead() {
ids := []int64{1, 2, 3, 4, 5}
for _, id := range ids {
s.False(s.bfs.PkExists(storage.NewInt64PrimaryKey(id)), "pk shall not exist before update")
}

View File

@ -246,7 +246,8 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@ -428,7 +429,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@ -583,7 +585,8 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
pks := &storage.PkStatistics{
PkFilter: bf,
}
@ -779,7 +782,8 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
bf := bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive)
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
pks := &storage.PkStatistics{
PkFilter: bf,
}

View File

@ -27,6 +27,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ Candidate = (*BloomFilterSet)(nil)
@ -80,7 +81,8 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
if s.currentStat == nil {
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
}
@ -115,7 +117,8 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
func (s *BloomFilterSet) initCurrentStat() {
if s.currentStat == nil {
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
}
}

View File

@ -26,6 +26,7 @@ import (
storage "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type bloomFilterSet struct {
@ -94,6 +95,7 @@ func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
// Note: invoker shall acquire statsMutex lock first.
func (s *bloomFilterSet) initCurrentStat() {
s.currentStat = &storage.PkStatistics{
PkFilter: bloom.NewWithEstimates(storage.BloomFilterSize, storage.MaxBloomFalsePositive),
PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
}

View File

@ -22,6 +22,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type BloomFilterSetSuite struct {
@ -35,6 +36,7 @@ type BloomFilterSetSuite struct {
func (suite *BloomFilterSetSuite) SetupTest() {
suite.intPks = []int64{1, 2, 3}
suite.stringPks = []string{"1", "2", "3"}
paramtable.Init()
suite.set = newBloomFilterSet()
}

View File

@ -26,12 +26,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
)
const (
// TODO silverxia maybe need set from config
BloomFilterSize uint = 100000
MaxBloomFalsePositive float64 = 0.005
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// PrimaryKeyStats contains statistics data for pk column
@ -197,7 +192,7 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error)
return &PrimaryKeyStats{
FieldID: fieldID,
PkType: pkType,
BF: bloom.NewWithEstimates(uint(rowNum), MaxBloomFalsePositive),
BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}, nil
}
@ -236,7 +231,7 @@ func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, m
stats := &PrimaryKeyStats{
FieldID: fieldID,
PkType: int64(pkType),
BF: bloom.NewWithEstimates(uint(msgs.RowNum()), MaxBloomFalsePositive),
BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
}
stats.UpdateByMsgs(msgs)

View File

@ -52,9 +52,5 @@ func Init(opts ...Option) (*Manager, error) {
}
func formatKey(key string) string {
ret := strings.ToLower(key)
ret = strings.ReplaceAll(ret, "/", "")
ret = strings.ReplaceAll(ret, "_", "")
ret = strings.ReplaceAll(ret, ".", "")
return ret
return strings.NewReplacer("/", "", "_", "", ".", "").Replace(strings.ToLower(key))
}

View File

@ -230,6 +230,9 @@ type commonConfig struct {
EnableStorageV2 ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
}
func (p *commonConfig) init(base *BaseTable) {
@ -672,6 +675,22 @@ like the old password verification when updating the credential`,
Doc: "trace request info",
}
p.TraceLogMode.Init(base.mgr)
p.BloomFilterSize = ParamItem{
Key: "common.bloomFilterSize",
Version: "2.3.2",
DefaultValue: "100000",
Doc: "bloom filter initial size",
}
p.BloomFilterSize.Init(base.mgr)
p.MaxBloomFalsePositive = ParamItem{
Key: "common.maxBloomFalsePositive",
Version: "2.3.2",
DefaultValue: "0.05",
Doc: "max false positive rate for bloom filter",
}
p.MaxBloomFalsePositive.Init(base.mgr)
}
type gpuConfig struct {