diff --git a/go.mod b/go.mod index 281be7e1e9..8868f5d491 100644 --- a/go.mod +++ b/go.mod @@ -65,11 +65,12 @@ require ( require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 require ( + github.com/greatroar/blobloom v0.8.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 github.com/valyala/fastjson v1.6.4 - google.golang.org/protobuf v1.33.0 + github.com/zeebo/xxh3 v1.0.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -208,7 +209,6 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect - github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect @@ -235,6 +235,7 @@ require ( google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect @@ -256,3 +257,5 @@ replace ( ) exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b + +replace github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 diff --git a/go.sum b/go.sum index 238d640d1a..aa61435339 100644 --- a/go.sum +++ b/go.sum @@ -603,6 +603,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEHKbbv51OwNGO82U+d6ut08ppTmZVm+VY= +github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.4.5 h1:83SZKox70jyABAPY7HdbvrwI3cmgwzyuc7dO4dYmoC0= diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index 194345232f..8e2170fe0b 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -19,10 +19,10 @@ package metacache import ( "sync" - "github.com/bits-and-blooms/bloom/v3" "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -106,8 +106,9 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { if bfs.current == nil { bfs.current = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(bfs.batchSize, - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloomfilter.NewBloomFilterWithType(bfs.batchSize, + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + 
paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } } diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go index a670982ea6..3522462784 100644 --- a/internal/datanode/syncmgr/storage_serializer.go +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -214,6 +214,7 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B FieldID: s.pkField.GetFieldID(), MaxPk: pks.MaxPK, MinPk: pks.MinPK, + BFType: pks.PkFilter.Type(), BF: pks.PkFilter, PkType: int64(s.pkField.GetDataType()), } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index eec6481b10..cc5591e269 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - bloom "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -42,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -281,8 +281,9 @@ func (s *DelegatorDataSuite) TestProcessDelete() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -537,8 +538,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -695,8 +698,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := 
bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -896,8 +901,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go index d94a4008cc..3cf6a3f513 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -19,12 +19,12 @@ package pkoracle import ( "sync" - bloom "github.com/bits-and-blooms/bloom/v3" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -40,8 +40,6 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics - - kHashFunc uint } // MayPkExist returns whether any bloom filters returns positive. @@ -97,11 +95,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates( - paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), - ), + PkFilter: bf, } } @@ -128,9 +127,6 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.statsMutex.Lock() defer s.statsMutex.Unlock() - if stats.PkFilter.K() > s.kHashFunc { - s.kHashFunc = stats.PkFilter.K() - } s.historyStats = append(s.historyStats, stats) } diff --git a/internal/querynodev2/segments/bloom_filter_set.go b/internal/querynodev2/segments/bloom_filter_set.go deleted file mode 100644 index b07713961c..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "sync" - - bloom "github.com/bits-and-blooms/bloom/v3" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - storage "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type bloomFilterSet struct { - statsMutex sync.RWMutex - currentStat *storage.PkStatistics - historyStats []*storage.PkStatistics -} - -func newBloomFilterSet() *bloomFilterSet { - return &bloomFilterSet{} -} - -// MayPkExist returns whether any bloom filters returns positive. -func (s *bloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { - s.statsMutex.RLock() - defer s.statsMutex.RUnlock() - if s.currentStat != nil && s.currentStat.PkExist(pk) { - return true - } - - // for sealed, if one of the stats shows it exist, then we have to check it - for _, historyStat := range s.historyStats { - if historyStat.PkExist(pk) { - return true - } - } - return false -} - -// UpdateBloomFilter updates currentStats with provided pks. -func (s *bloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - if s.currentStat == nil { - s.initCurrentStat() - } - - buf := make([]byte, 8) - for _, pk := range pks { - s.currentStat.UpdateMinMax(pk) - switch pk.Type() { - case schemapb.DataType_Int64: - int64Value := pk.(*storage.Int64PrimaryKey).Value - common.Endian.PutUint64(buf, uint64(int64Value)) - s.currentStat.PkFilter.Add(buf) - case schemapb.DataType_VarChar: - stringValue := pk.(*storage.VarCharPrimaryKey).Value - s.currentStat.PkFilter.AddString(stringValue) - default: - log.Error("failed to update bloomfilter", zap.Any("PK type", pk.Type())) - panic("failed to update bloomfilter") - } - } -} - -// AddHistoricalStats add loaded historical stats. -func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - s.historyStats = append(s.historyStats, stats) -} - -// initCurrentStat initialize currentStats if nil. -// Note: invoker shall acquire statsMutex lock first. -func (s *bloomFilterSet) initCurrentStat() { - s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), - } -} diff --git a/internal/querynodev2/segments/bloom_filter_set_test.go b/internal/querynodev2/segments/bloom_filter_set_test.go deleted file mode 100644 index 9bf95a1ff9..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set_test.go +++ /dev/null @@ -1,91 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type BloomFilterSetSuite struct { - suite.Suite - - intPks []int64 - stringPks []string - set *bloomFilterSet -} - -func (suite *BloomFilterSetSuite) SetupTest() { - suite.intPks = []int64{1, 2, 3} - suite.stringPks = []string{"1", "2", "3"} - paramtable.Init() - suite.set = newBloomFilterSet() -} - -func (suite *BloomFilterSetSuite) TestInt64PkBloomFilter() { - pks, err := storage.GenInt64PrimaryKeys(suite.intPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestStringPkBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestHistoricalBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } - - old := suite.set.currentStat - suite.set.currentStat = nil - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.False(exist) - } - - suite.set.AddHistoricalStats(old) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func TestBloomFilterSet(t *testing.T) { - suite.Run(t, &BloomFilterSetSuite{}) -} diff --git a/internal/storage/field_stats.go b/internal/storage/field_stats.go index d6fad764c3..fbb6757ac1 100644 --- a/internal/storage/field_stats.go +++ b/internal/storage/field_stats.go @@ -20,10 +20,12 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -31,12 +33,13 @@ import ( // FieldStats contains statistics data for any column // todo: compatible to PrimaryKeyStats type FieldStats struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max ScalarFieldValue `json:"max"` // for scalar field - Min ScalarFieldValue `json:"min"` // for scalar field - BF *bloom.BloomFilter `json:"bf"` // for scalar field - Centroids []VectorFieldValue `json:"centroids"` // for vector field + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max ScalarFieldValue `json:"max"` // for scalar field + Min ScalarFieldValue `json:"min"` // for scalar field + BFType bloomfilter.BFType `json:"bfType"` // for 
scalar field + BF bloomfilter.BloomFilterInterface `json:"bf"` // for scalar field + Centroids []VectorFieldValue `json:"centroids"` // for vector field } func (stats *FieldStats) Clone() FieldStats { @@ -152,12 +155,22 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } } else { stats.initCentroids(data, stats.Type) @@ -172,12 +185,12 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { func (stats *FieldStats) initCentroids(data []byte, dataType schemapb.DataType) { type FieldStatsAux struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max json.RawMessage `json:"max"` - Min json.RawMessage `json:"min"` - BF *bloom.BloomFilter `json:"bf"` - Centroids []json.RawMessage `json:"centroids"` + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max json.RawMessage `json:"max"` + Min json.RawMessage `json:"min"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + Centroids []json.RawMessage `json:"centroids"` } // Unmarshal JSON into the auxiliary struct var aux FieldStatsAux @@ -372,10 +385,15 @@ func NewFieldStats(fieldID int64, pkType schemapb.DataType, rowNum int64) (*Fiel Type: pkType, }, nil } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -402,11 +420,17 @@ func (sw *FieldStatsWriter) GenerateList(stats []*FieldStats) error { // GenerateByData writes data from @msgs with @fieldID to @buffer func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error { statsList := make([]*FieldStats, 0) + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() for _, msg := range msgs { stats := &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(msg.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msg.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msg) diff --git a/internal/storage/field_stats_test.go b/internal/storage/field_stats_test.go index d58ffc1bf0..ba1b71c3ef 100644 --- a/internal/storage/field_stats_test.go +++ b/internal/storage/field_stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" 
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestFieldStatsUpdate(t *testing.T) { @@ -373,7 +374,7 @@ func TestFieldStatsWriter_UpgradePrimaryKey(t *testing.T) { FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } b := make([]byte, 8) @@ -574,8 +575,9 @@ func TestFieldStatsUnMarshal(t *testing.T) { assert.Error(t, err) err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": \"b\"}")) assert.Error(t, err) + // return AlwaysTrueBloomFilter when deserialize bloom filter failed. err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": 1, \"bf\": \"2\"}")) - assert.Error(t, err) + assert.NoError(t, err) }) t.Run("succeed", func(t *testing.T) { diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index f1e15f0d3d..35649ae46f 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -19,19 +19,19 @@ package storage import ( "fmt" - "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" ) // pkStatistics contains pk field statistic information type PkStatistics struct { - PkFilter *bloom.BloomFilter // bloom filter of pk inside a segment - MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment - MaxPK PrimaryKey // maximal pk value, same above + PkFilter bloomfilter.BloomFilterInterface // bloom filter of pk inside a segment + MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment + MaxPK PrimaryKey // maximal pk value, same above } // update set pk min/max value if input value is beyond former range. @@ -110,16 +110,16 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool { } // Locations returns a list of hash locations representing a data item. 
-func Locations(pk PrimaryKey, k uint) []uint64 { +func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 { switch pk.Type() { case schemapb.DataType_Int64: buf := make([]byte, 8) int64Pk := pk.(*Int64PrimaryKey) common.Endian.PutUint64(buf, uint64(int64Pk.Value)) - return bloom.Locations(buf, k) + return bloomfilter.Locations(buf, k, bfType) case schemapb.DataType_VarChar: varCharPk := pk.(*VarCharPrimaryKey) - return bloom.Locations([]byte(varCharPk.Value), k) + return bloomfilter.Locations([]byte(varCharPk.Value), k, bfType) default: // TODO:: } @@ -133,7 +133,7 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { } // check bf first, TestLocation just do some bitset compute, cost is cheaper - if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K())) { + if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K(), st.PkFilter.Type())) { return false } @@ -148,13 +148,15 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo } // check bf first, TestLocation just do some bitset compute, cost is cheaper - locations := lc.Locations(st.PkFilter.K()) + locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type()) + ret := st.PkFilter.BatchTestLocations(locations, hits) + + // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, + // hits array will be removed after we merge bf in segment pks := lc.PKs() - for i := range pks { - // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, - // hits array will be removed after we merge bf in segment + for i := range ret { if !hits[i] { - hits[i] = st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) + hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) } } @@ -164,19 +166,31 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo // LocationsCache is a helper struct caching pk bloom filter locations. // Note that this helper is not concurrent safe and shall be used in same goroutine. 
type LocationsCache struct { - pk PrimaryKey - locations []uint64 + pk PrimaryKey + basicBFLocations []uint64 + blockBFLocations []uint64 } func (lc *LocationsCache) GetPk() PrimaryKey { return lc.pk } -func (lc *LocationsCache) Locations(k uint) []uint64 { - if int(k) > len(lc.locations) { - lc.locations = Locations(lc.pk, k) +func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64 { + switch bfType { + case bloomfilter.BasicBF: + if int(k) > len(lc.basicBFLocations) { + lc.basicBFLocations = Locations(lc.pk, k, bfType) + } + return lc.basicBFLocations[:k] + case bloomfilter.BlockedBF: + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockBFLocations) != 1 { + lc.blockBFLocations = Locations(lc.pk, 1, bfType) + } + return lc.blockBFLocations + default: + return nil } - return lc.locations[:k] } func NewLocationsCache(pk PrimaryKey) *LocationsCache { @@ -189,7 +203,11 @@ type BatchLocationsCache struct { pks []PrimaryKey k uint - locations [][]uint64 + // for block bf + blockLocations [][]uint64 + + // for basic bf + basicLocations [][]uint64 } func (lc *BatchLocationsCache) PKs() []PrimaryKey { @@ -200,15 +218,29 @@ func (lc *BatchLocationsCache) Size() int { return len(lc.pks) } -func (lc *BatchLocationsCache) Locations(k uint) [][]uint64 { - if k > lc.k { - lc.k = k - lc.locations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { - return Locations(pk, lc.k) - }) - } +func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64 { + switch bfType { + case bloomfilter.BasicBF: + if k > lc.k { + lc.k = k + lc.basicLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } - return lc.locations + return lc.basicLocations + case bloomfilter.BlockedBF: + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockLocations) != len(lc.pks) { + lc.blockLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } + + return lc.blockLocations + default: + return nil + } } func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache { diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 7914e04b80..75da19ab5e 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -20,9 +20,10 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -31,13 +32,14 @@ import ( // PrimaryKeyStats contains statistics data for pk column type PrimaryKeyStats struct { - FieldID int64 `json:"fieldID"` - Max int64 `json:"max"` // useless, will delete - Min int64 `json:"min"` // useless, will delete - BF *bloom.BloomFilter `json:"bf"` - PkType int64 `json:"pkType"` - MaxPk PrimaryKey `json:"maxPk"` - MinPk PrimaryKey `json:"minPk"` + FieldID int64 `json:"fieldID"` + Max int64 `json:"max"` // useless, will delete + Min int64 `json:"min"` // useless, will delete + BFType bloomfilter.BFType `json:"bfType"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + PkType int64 `json:"pkType"` + MaxPk PrimaryKey `json:"maxPk"` + MinPk PrimaryKey `json:"minPk"` } // UnmarshalJSON unmarshal bytes to PrimaryKeyStats @@ -110,12 +112,22 
@@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } return nil @@ -189,10 +201,16 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) if rowNum <= 0 { return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) } + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &PrimaryKeyStats{ FieldID: fieldID, PkType: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -228,10 +246,15 @@ func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error { // GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error { + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: fieldID, PkType: int64(pkType), - BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msgs.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msgs) diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index 709f49697f..cccd3d9f9e 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestStatsWriter_Int64PrimaryKey(t *testing.T) { @@ -124,11 +125,13 @@ func TestStatsWriter_UpgradePrimaryKey(t *testing.T) { Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, bfType), } b := make([]byte, 8) @@ -174,3 +177,30 @@ func TestDeserializeEmptyStats(t *testing.T) { _, err := DeserializeStats([]*Blob{blob}) assert.NoError(t, err) } + +func TestMarshalStats(t *testing.T) { + stat, err := NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 100000) + assert.NoError(t, err) + + for i := 0; i < 10000; i++ { + stat.Update(NewInt64PrimaryKey(int64(i))) + } + + sw 
:= &StatsWriter{} + sw.GenerateList([]*PrimaryKeyStats{stat}) + bytes := sw.GetBuffer() + + sr := &StatsReader{} + sr.SetBuffer(bytes) + stat1, err := sr.GetPrimaryKeyStatsList() + assert.NoError(t, err) + assert.Equal(t, 1, len(stat1)) + assert.Equal(t, stat.Min, stat1[0].Min) + assert.Equal(t, stat.Max, stat1[0].Max) + + for i := 0; i < 10000; i++ { + b := make([]byte, 8) + common.Endian.PutUint64(b, uint64(i)) + assert.True(t, stat1[0].BF.Test(b)) + } +} diff --git a/internal/util/bloomfilter/bloom_filter.go b/internal/util/bloomfilter/bloom_filter.go new file mode 100644 index 0000000000..c5ca745c8b --- /dev/null +++ b/internal/util/bloomfilter/bloom_filter.go @@ -0,0 +1,339 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bloomfilter + +import ( + "encoding/json" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/cockroachdb/errors" + "github.com/greatroar/blobloom" + "github.com/pingcap/log" + "github.com/zeebo/xxh3" + "go.uber.org/zap" +) + +type BFType int + +var AlwaysTrueBloomFilter = &alwaysTrueBloomFilter{} + +const ( + UnsupportedBFName = "Unsupported BloomFilter" + BlockBFName = "BlockedBloomFilter" + BasicBFName = "BasicBloomFilter" + AlwaysTrueBFName = "AlwaysTrueBloomFilter" +) + +const ( + UnsupportedBF BFType = iota + 1 + AlwaysTrueBF // empty bloom filter + BasicBF + BlockedBF +) + +var bfNames = map[BFType]string{ + BasicBF: BasicBFName, + BlockedBF: BlockBFName, + AlwaysTrueBF: AlwaysTrueBFName, + UnsupportedBF: UnsupportedBFName, +} + +func (t BFType) String() string { + return bfNames[t] +} + +func BFTypeFromString(name string) BFType { + switch name { + case BasicBFName: + return BasicBF + case BlockBFName: + return BlockedBF + case AlwaysTrueBFName: + return AlwaysTrueBF + default: + return UnsupportedBF + } +} + +type BloomFilterInterface interface { + Type() BFType + Cap() uint + K() uint + Add(data []byte) + AddString(data string) + Test(data []byte) bool + TestString(data string) bool + TestLocations(locs []uint64) bool + BatchTestLocations(locs [][]uint64, hit []bool) []bool + MarshalJSON() ([]byte, error) + UnmarshalJSON(data []byte) error +} + +type basicBloomFilter struct { + inner *bloom.BloomFilter + k uint +} + +func newBasicBloomFilter(capacity uint, fp float64) *basicBloomFilter { + inner := bloom.NewWithEstimates(capacity, fp) + return &basicBloomFilter{ + inner: inner, + k: inner.K(), + } +} + +func (b *basicBloomFilter) Type() BFType { + return BasicBF +} + +func (b *basicBloomFilter) Cap() uint { + return b.inner.Cap() +} + +func (b *basicBloomFilter) K() uint { + return b.k +} + +func (b *basicBloomFilter) Add(data []byte) { + b.inner.Add(data) +} + +func (b *basicBloomFilter) AddString(data string) { + b.inner.AddString(data) +} + +func (b *basicBloomFilter) Test(data []byte) bool { + return 
b.inner.Test(data)
+}
+
+func (b *basicBloomFilter) TestString(data string) bool {
+	return b.inner.TestString(data)
+}
+
+func (b *basicBloomFilter) TestLocations(locs []uint64) bool {
+	return b.inner.TestLocations(locs[:b.k])
+}
+
+func (b *basicBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
+	ret := make([]bool, len(locs))
+	for i := range hits {
+		if !hits[i] {
+			if uint(len(locs[i])) < b.k {
+				ret[i] = true
+				continue
+			}
+			ret[i] = b.inner.TestLocations(locs[i][:b.k])
+		}
+	}
+	return ret
+}
+
+func (b basicBloomFilter) MarshalJSON() ([]byte, error) {
+	return b.inner.MarshalJSON()
+}
+
+func (b *basicBloomFilter) UnmarshalJSON(data []byte) error {
+	inner := &bloom.BloomFilter{}
+	inner.UnmarshalJSON(data)
+	b.inner = inner
+	b.k = inner.K()
+	return nil
+}
+
+// blockedBloomFilter implements a blocked bloom filter backed by blobloom and the xxh3 hash.
+type blockedBloomFilter struct {
+	inner *blobloom.Filter
+	k     uint
+}
+
+func newBlockedBloomFilter(capacity uint, fp float64) *blockedBloomFilter {
+	inner := blobloom.NewOptimized(blobloom.Config{
+		Capacity: uint64(capacity),
+		FPRate:   fp,
+	})
+	return &blockedBloomFilter{
+		inner: inner,
+		k:     inner.K(),
+	}
+}
+
+func (b *blockedBloomFilter) Type() BFType {
+	return BlockedBF
+}
+
+func (b *blockedBloomFilter) Cap() uint {
+	return uint(b.inner.NumBits())
+}
+
+func (b *blockedBloomFilter) K() uint {
+	return b.k
+}
+
+func (b *blockedBloomFilter) Add(data []byte) {
+	loc := xxh3.Hash(data)
+	b.inner.Add(loc)
+}
+
+func (b *blockedBloomFilter) AddString(data string) {
+	h := xxh3.HashString(data)
+	b.inner.Add(h)
+}
+
+func (b *blockedBloomFilter) Test(data []byte) bool {
+	loc := xxh3.Hash(data)
+	return b.inner.Has(loc)
+}
+
+func (b *blockedBloomFilter) TestString(data string) bool {
+	h := xxh3.HashString(data)
+	return b.inner.Has(h)
+}
+
+func (b *blockedBloomFilter) TestLocations(locs []uint64) bool {
+	// for the blocked bloom filter, the cached location is its single xxh3 hash value
+	if len(locs) != 1 {
+		return true
+	}
+	return b.inner.Has(locs[0])
+}
+
+func (b *blockedBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
+	ret := make([]bool, len(locs))
+	for i := range hits {
+		if !hits[i] {
+			if len(locs[i]) != 1 {
+				ret[i] = true
+				continue
+			}
+			ret[i] = b.inner.Has(locs[i][0])
+		}
+	}
+	return ret
+}
+
+func (b blockedBloomFilter) MarshalJSON() ([]byte, error) {
+	return b.inner.MarshalJSON()
+}
+
+func (b *blockedBloomFilter) UnmarshalJSON(data []byte) error {
+	inner := &blobloom.Filter{}
+	inner.UnmarshalJSON(data)
+	b.inner = inner
+	b.k = inner.K()
+
+	return nil
+}
+
+// alwaysTrueBloomFilter is used when deserializing a stats log fails.
+// Notice: adding items to it is not permitted, and every Test function returns true (a potential false positive).
+type alwaysTrueBloomFilter struct{}
+
+func (b *alwaysTrueBloomFilter) Type() BFType {
+	return AlwaysTrueBF
+}
+
+func (b *alwaysTrueBloomFilter) Cap() uint {
+	return 0
+}
+
+func (b *alwaysTrueBloomFilter) K() uint {
+	return 0
+}
+
+func (b *alwaysTrueBloomFilter) Add(data []byte) {
+}
+
+func (b *alwaysTrueBloomFilter) AddString(data string) {
+}
+
+func (b *alwaysTrueBloomFilter) Test(data []byte) bool {
+	return true
+}
+
+func (b *alwaysTrueBloomFilter) TestString(data string) bool {
+	return true
+}
+
+func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool {
+	return true
+}
+
+func (b *alwaysTrueBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
+	ret := make([]bool, len(locs))
+	for i := 0; i < len(hits); i++ {
+		ret[i] = true
+	}
+
+	return ret
+}
+
+func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) {
+	return []byte{}, nil
+}
+
+func (b *alwaysTrueBloomFilter) UnmarshalJSON(data []byte) error {
+	return nil
+}
+
+func NewBloomFilterWithType(capacity uint, fp float64, typeName string) BloomFilterInterface {
+	bfType := BFTypeFromString(typeName)
+	switch bfType {
+	case BlockedBF:
+		return newBlockedBloomFilter(capacity, fp)
+	case BasicBF:
+		return newBasicBloomFilter(capacity, fp)
+	default:
+		log.Info("unsupported bloom filter type, using blocked bloom filter", zap.String("type", typeName))
+		return newBlockedBloomFilter(capacity, fp)
+	}
+}
+
+func UnmarshalJSON(data []byte, bfType BFType) (BloomFilterInterface, error) {
+	switch bfType {
+	case BlockedBF:
+		bf := &blockedBloomFilter{}
+		err := json.Unmarshal(data, bf)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter")
+		}
+		return bf, nil
+	case BasicBF:
+		bf := &basicBloomFilter{}
+		err := json.Unmarshal(data, bf)
+		if err != nil {
+			return nil, errors.Wrap(err, "failed to unmarshal basic bloom filter")
+		}
+		return bf, nil
+	case AlwaysTrueBF:
+		return AlwaysTrueBloomFilter, nil
+	default:
+		return nil, errors.Errorf("unsupported bloom filter type: %d", bfType)
+	}
+}
+
+func Locations(data []byte, k uint, bfType BFType) []uint64 {
+	switch bfType {
+	case BasicBF:
+		return bloom.Locations(data, k)
+	case BlockedBF:
+		return []uint64{xxh3.Hash(data)}
+	case AlwaysTrueBF:
+		return nil
+	default:
+		log.Info("unsupported bloom filter type", zap.String("type", bfType.String()))
+		return nil
+	}
+}
diff --git a/internal/util/bloomfilter/bloom_filter_test.go b/internal/util/bloomfilter/bloom_filter_test.go
new file mode 100644
index 0000000000..1c0252638a
--- /dev/null
+++ b/internal/util/bloomfilter/bloom_filter_test.go
@@ -0,0 +1,313 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package bloomfilter + +import ( + "fmt" + "testing" + "time" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-storage/go/common/log" +) + +func TestPerformance(t *testing.T) { + capacity := 1000000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + start1 := time.Now() + for _, key := range keys { + bf1.Add(key) + } + log.Info("Block BF construct time", zap.Duration("time", time.Since(start1))) + data, err := bf1.MarshalJSON() + assert.NoError(t, err) + log.Info("Block BF size", zap.Int("size", len(data))) + + start2 := time.Now() + for _, key := range keys { + bf1.Test(key) + } + log.Info("Block BF Test cost", zap.Duration("time", time.Since(start2))) + + bf2 := newBasicBloomFilter(uint(capacity), fpr) + start3 := time.Now() + for _, key := range keys { + bf2.Add(key) + } + log.Info("Basic BF construct time", zap.Duration("time", time.Since(start3))) + data, err = bf2.MarshalJSON() + assert.NoError(t, err) + log.Info("Basic BF size", zap.Int("size", len(data))) + + start4 := time.Now() + for _, key := range keys { + bf2.Test(key) + } + log.Info("Basic BF Test cost", zap.Duration("time", time.Since(start4))) +} + +func TestPerformance_MultiBF(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + bfNum := 100 + bfs1 := make([]*blockedBloomFilter, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) + } + bfs1 = append(bfs1, bf1) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]*basicBloomFilter, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range testKeys { + bf2.Add(key) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) +} + +func TestPerformance_BatchTestLocations(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + batchSize := 1000 + + bfNum := 100 + bfs1 := make([]BloomFilterInterface, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := NewBloomFilterWithType(uint(capacity), fpr, BlockBFName) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) + } + bfs1 = append(bfs1, bf1) + } + + 
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs1[0].K(), BlockedBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs1[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]BloomFilterInterface, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := NewBloomFilterWithType(uint(capacity), fpr, BasicBFName) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf2.Add([]byte(key)) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs2[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs2[0].K(), BasicBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs2[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) +} + +func TestPerformance_Capacity(t *testing.T) { + fpr := 0.001 + + for _, capacity := range []int64{100, 1000, 10000, 100000, 1000000} { + keys := make([][]byte, 0) + for i := 0; i < int(capacity); i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start1 := time.Now() + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + bf1.Add(key) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + testKeys := make([][]byte, 0) + for i := 0; i < 10000; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bf1.K(), bf1.Type()) + bf1.TestLocations(locations) + } + _, k := bloom.EstimateParameters(uint(capacity), fpr) + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)), zap.Int("k", int(k)), zap.Int64("capacity", capacity)) + } +} + +func TestMarshal(t *testing.T) { + capacity := 200000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + // test basic bf + basicBF := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range keys { + basicBF.Add(key) + } + data, err := basicBF.MarshalJSON() + assert.NoError(t, err) + basicBF2, err := UnmarshalJSON(data, BasicBF) + assert.NoError(t, err) + assert.Equal(t, basicBF.Type(), basicBF2.Type()) + + for _, key := range keys { + assert.True(t, 
basicBF2.Test(key))
+	}
+
+	// test block bf
+	blockBF := newBlockedBloomFilter(uint(capacity), fpr)
+	for _, key := range keys {
+		blockBF.Add(key)
+	}
+	data, err = blockBF.MarshalJSON()
+	assert.NoError(t, err)
+	blockBF2, err := UnmarshalJSON(data, BlockedBF)
+	assert.NoError(t, err)
+	assert.Equal(t, blockBF.Type(), blockBF2.Type())
+	for _, key := range keys {
+		assert.True(t, blockBF2.Test(key))
+	}
+
+	// test compatible with bits-and-blooms/bloom
+	bf := bloom.NewWithEstimates(uint(capacity), fpr)
+	for _, key := range keys {
+		bf.Add(key)
+	}
+	data, err = bf.MarshalJSON()
+	assert.NoError(t, err)
+	bf2, err := UnmarshalJSON(data, BasicBF)
+	assert.NoError(t, err)
+	for _, key := range keys {
+		assert.True(t, bf2.Test(key))
+	}
+
+	// test empty bloom filter
+	emptyBF := AlwaysTrueBloomFilter
+	for _, key := range keys {
+		bf.Add(key)
+	}
+	data, err = emptyBF.MarshalJSON()
+	assert.NoError(t, err)
+	emptyBF2, err := UnmarshalJSON(data, AlwaysTrueBF)
+	assert.NoError(t, err)
+	for _, key := range keys {
+		assert.True(t, emptyBF2.Test(key))
+	}
+}
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index c1db39dbd6..fdce4ef9cf 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -244,6 +244,7 @@ type commonConfig struct {
 	TraceLogMode              ParamItem `refreshable:"true"`
 	BloomFilterSize           ParamItem `refreshable:"true"`
 	MaxBloomFalsePositive     ParamItem `refreshable:"true"`
+	BloomFilterType           ParamItem `refreshable:"true"`
 	BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
 
 	PanicWhenPluginFail ParamItem `refreshable:"false"`
@@ -730,6 +731,15 @@ like the old password verification when updating the credential`,
 	}
 	p.BloomFilterSize.Init(base.mgr)
 
+	p.BloomFilterType = ParamItem{
+		Key:          "common.bloomFilterType",
+		Version:      "2.4.3",
+		DefaultValue: "BasicBloomFilter",
+		Doc:          "bloom filter type, supported values: BasicBloomFilter and BlockedBloomFilter",
+		Export:       true,
+	}
+	p.BloomFilterType.Init(base.mgr)
+
 	p.MaxBloomFalsePositive = ParamItem{
 		Key:     "common.maxBloomFalsePositive",
 		Version: "2.3.2",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 8968d3ab3b..672aea18d0 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -573,6 +573,7 @@ func TestCachedParam(t *testing.T) {
 
 	assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
 	assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
+	assert.Equal(t, "BasicBloomFilter", params.CommonCfg.BloomFilterType.GetValue())
 
 	assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
 	assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
diff --git a/tests/integration/bloomfilter/bloom_filter_test.go b/tests/integration/bloomfilter/bloom_filter_test.go
new file mode 100644
index 0000000000..4ee4437afb
--- /dev/null
+++ b/tests/integration/bloomfilter/bloom_filter_test.go
@@ -0,0 +1,196 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type BloomFilterTestSuit struct { + integration.MiniClusterSuite +} + +func (s *BloomFilterTestSuit) SetupSuite() { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") + + // disable compaction + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + + s.Require().NoError(s.SetupEmbedEtcd()) +} + +func (s *BloomFilterTestSuit) TearDownSuite() { + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + s.MiniClusterSuite.TearDownSuite() +} + +func (s *BloomFilterTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const ( + dim = 128 + dbName = "" + ) + + schema := integration.ConstructSchema(collectionName, dim, true) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: int32(channelNum), + }) + s.NoError(err) + s.True(merr.Ok(createCollectionStatus)) + + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + s.NoError(err) + s.True(merr.Ok(showCollectionsResp.Status)) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + for i := 0; i < segmentNum; i++ { + // change bf type in real time + if i%2 == 0 { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BasicBloomFilter") + } else { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BlockedBloomFilter") + } + + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, segmentRowNum, dim) + hashKeys := integration.GenerateHashKeys(segmentRowNum) + insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(segmentRowNum), + }) + s.NoError(err) + s.True(merr.Ok(insertResult.Status)) + + if segmentDeleteNum > 
0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData()[:segmentDeleteNum] + log.Info("========================delete expr==================", + zap.Int("length of pk", len(pks)), + ) + + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + + // flush + flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + } + + // create index + createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2), + }) + s.NoError(err) + s.True(merr.Ok(createIndexStatus)) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + for i := 1; i < replica; i++ { + s.Cluster.AddQueryNode() + } + + // load + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(replica), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) + s.True(merr.Ok(loadStatus)) + s.WaitForLoad(ctx, collectionName) + log.Info("initCollection Done") +} + +func (s *BloomFilterTestSuit) TestLoadAndQuery() { + name := "test_balance_" + funcutil.GenRandomStr() + s.initCollection(name, 1, 2, 10, 2000, 500) + + ctx := context.Background() + queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: "", + CollectionName: name, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + if !merr.Ok(queryResult.GetStatus()) { + log.Warn("queryResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason())) + } + s.NoError(err) + s.True(merr.Ok(queryResult.GetStatus())) + numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0] + s.Equal(numEntities, int64(15000)) +} + +func TestBloomFilter(t *testing.T) { + suite.Run(t, new(BloomFilterTestSuit)) +}
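
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new internal/util/bloomfilter API added above is intended to be called from inside the repository. All identifiers come from the diff itself except the throwaway main function.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/bloomfilter"
)

func main() {
	// Build a blocked bloom filter for ~100k keys at a 0.1% false-positive rate.
	// BasicBFName and BlockBFName are the two supported type names; unknown
	// names fall back to the blocked implementation.
	bf := bloomfilter.NewBloomFilterWithType(100000, 0.001, bloomfilter.BlockBFName)
	bf.Add([]byte("pk-1"))

	fmt.Println(bf.Test([]byte("pk-1"))) // true
	fmt.Println(bf.Test([]byte("pk-2"))) // false with high probability

	// When one key is probed against many segment filters, hash it once and
	// reuse the locations; for BlockedBF this is a single xxh3 hash value.
	locs := bloomfilter.Locations([]byte("pk-1"), bf.K(), bf.Type())
	fmt.Println(bf.TestLocations(locs)) // true

	// Filters round-trip through JSON together with their BFType, which is what
	// PrimaryKeyStats and FieldStats now persist next to the filter payload.
	data, err := bf.MarshalJSON()
	if err != nil {
		panic(err)
	}
	restored, err := bloomfilter.UnmarshalJSON(data, bf.Type())
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.Test([]byte("pk-1"))) // true
}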
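
A second illustrative sketch (again, not part of the patch) of the backward-compatibility path exercised by TestMarshal above: a payload written by the old bits-and-blooms code still deserializes as a BasicBF, and a corrupt payload degrades to AlwaysTrueBloomFilter, mirroring what the stats readers in this patch do.

package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"

	"github.com/milvus-io/milvus/internal/util/bloomfilter"
)

func main() {
	// Pretend this payload came from a stats log written before this change.
	legacy := bloom.NewWithEstimates(10000, 0.001)
	legacy.Add([]byte("pk-1"))
	payload, err := legacy.MarshalJSON()
	if err != nil {
		panic(err)
	}

	// Readers default the persisted bfType to BasicBF when the field is absent,
	// so old payloads parse into the new basic filter unchanged.
	bf, err := bloomfilter.UnmarshalJSON(payload, bloomfilter.BasicBF)
	if err != nil {
		// Corrupt payloads fall back to an always-positive filter instead of
		// failing the whole stats load, as the storage readers above do.
		bf = bloomfilter.AlwaysTrueBloomFilter
	}
	fmt.Println(bf.Test([]byte("pk-1"))) // true
}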