From d3e94f9861dba3fb775f49b4341662faa86cd256 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 5 Jul 2024 17:04:10 +0800 Subject: [PATCH] enhance: Use Blocked Bloom Filter instead of basic bloom fitler impl (#34377) issue: #32995 pr: #33405 To speed up the construction and querying of Bloom filters, we chose a blocked Bloom filter instead of a basic Bloom filter implementation. WARN: This PR is compatible with old version bf impl, but if fall back to old milvus version, it may causes bloom filter deserialize failed. In single Bloom filter test cases with a capacity of 1,000,000 and a false positive rate (FPR) of 0.001, the blocked Bloom filter is 5 times faster than the basic Bloom filter in both querying and construction, at the cost of a 30% increase in memory usage. Block BF construct time {"time": "54.128131ms"} Block BF size {"size": 3021578} Block BF Test cost {"time": "55.407352ms"} Basic BF construct time {"time": "210.262183ms"} Basic BF size {"size": 2396308} Basic BF Test cost {"time": "192.596229ms"} In multi Bloom filter test cases with a capacity of 100,000, an FPR of 0.001, and 100 Bloom filters, we reuse the primary key locations for all Bloom filters to avoid repeated hash computations. As a result, the blocked Bloom filter is also 5 times faster than the basic Bloom filter in querying. Block BF TestLocation cost {"time": "529.97183ms"} Basic BF TestLocation cost {"time": "3.197430181s"} Signed-off-by: Wei Liu --- go.mod | 7 +- go.sum | 2 + .../datanode/metacache/bloom_filter_set.go | 7 +- .../datanode/syncmgr/storage_serializer.go | 1 + .../delegator/delegator_data_test.go | 25 +- .../querynodev2/pkoracle/bloom_filter_set.go | 16 +- .../querynodev2/segments/bloom_filter_set.go | 101 ------ .../segments/bloom_filter_set_test.go | 91 ----- internal/storage/field_stats.go | 60 +++- internal/storage/field_stats_test.go | 8 +- internal/storage/pk_statistics.go | 88 +++-- internal/storage/stats.go | 49 ++- internal/storage/stats_test.go | 34 +- internal/util/bloomfilter/bloom_filter.go | 339 ++++++++++++++++++ .../util/bloomfilter/bloom_filter_test.go | 313 ++++++++++++++++ pkg/util/paramtable/component_param.go | 10 + pkg/util/paramtable/component_param_test.go | 1 + .../bloomfilter/bloom_filter_test.go | 196 ++++++++++ 18 files changed, 1068 insertions(+), 280 deletions(-) delete mode 100644 internal/querynodev2/segments/bloom_filter_set.go delete mode 100644 internal/querynodev2/segments/bloom_filter_set_test.go create mode 100644 internal/util/bloomfilter/bloom_filter.go create mode 100644 internal/util/bloomfilter/bloom_filter_test.go create mode 100644 tests/integration/bloomfilter/bloom_filter_test.go diff --git a/go.mod b/go.mod index 281be7e1e9..8868f5d491 100644 --- a/go.mod +++ b/go.mod @@ -65,11 +65,12 @@ require ( require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 require ( + github.com/greatroar/blobloom v0.8.0 github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 github.com/valyala/fastjson v1.6.4 - google.golang.org/protobuf v1.33.0 + github.com/zeebo/xxh3 v1.0.2 gopkg.in/yaml.v3 v3.0.1 ) @@ -208,7 +209,6 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect - github.com/zeebo/xxh3 v1.0.2 // indirect go.etcd.io/bbolt v1.3.6 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect go.etcd.io/etcd/client/v2 v2.305.5 // indirect @@ -235,6 +235,7 @@ require ( google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect @@ -256,3 +257,5 @@ replace ( ) exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b + +replace github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 diff --git a/go.sum b/go.sum index 238d640d1a..aa61435339 100644 --- a/go.sum +++ b/go.sum @@ -603,6 +603,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= +github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEHKbbv51OwNGO82U+d6ut08ppTmZVm+VY= +github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= github.com/milvus-io/milvus-proto/go-api/v2 v2.4.5 h1:83SZKox70jyABAPY7HdbvrwI3cmgwzyuc7dO4dYmoC0= diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index 194345232f..8e2170fe0b 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -19,10 +19,10 @@ package metacache import ( "sync" - "github.com/bits-and-blooms/bloom/v3" "github.com/samber/lo" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -106,8 +106,9 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { if bfs.current == nil { bfs.current = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(bfs.batchSize, - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + PkFilter: bloomfilter.NewBloomFilterWithType(bfs.batchSize, + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } } diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go index a670982ea6..3522462784 100644 --- a/internal/datanode/syncmgr/storage_serializer.go +++ b/internal/datanode/syncmgr/storage_serializer.go @@ -214,6 +214,7 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B FieldID: s.pkField.GetFieldID(), MaxPk: pks.MaxPK, MinPk: pks.MinPK, + BFType: pks.PkFilter.Type(), BF: pks.PkFilter, PkType: int64(s.pkField.GetDataType()), } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index eec6481b10..cc5591e269 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -25,7 +25,6 @@ import ( "testing" "time" - bloom "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" @@ -42,6 +41,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" @@ -281,8 +281,9 @@ func (s *DelegatorDataSuite) TestProcessDelete() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -537,8 +538,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -695,8 +698,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } @@ -896,8 +901,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() { Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet { return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet { bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed) - bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()) + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) pks := &storage.PkStatistics{ PkFilter: bf, } diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go index d94a4008cc..3cf6a3f513 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -19,12 +19,12 @@ package pkoracle import ( "sync" - bloom "github.com/bits-and-blooms/bloom/v3" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -40,8 +40,6 @@ type BloomFilterSet struct { segType commonpb.SegmentState currentStat *storage.PkStatistics historyStats []*storage.PkStatistics - - kHashFunc uint } // MayPkExist returns whether any bloom filters returns positive. @@ -97,11 +95,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { + bf := bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue()) s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates( - paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), - ), + PkFilter: bf, } } @@ -128,9 +127,6 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { s.statsMutex.Lock() defer s.statsMutex.Unlock() - if stats.PkFilter.K() > s.kHashFunc { - s.kHashFunc = stats.PkFilter.K() - } s.historyStats = append(s.historyStats, stats) } diff --git a/internal/querynodev2/segments/bloom_filter_set.go b/internal/querynodev2/segments/bloom_filter_set.go deleted file mode 100644 index b07713961c..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set.go +++ /dev/null @@ -1,101 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "sync" - - bloom "github.com/bits-and-blooms/bloom/v3" - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" - storage "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type bloomFilterSet struct { - statsMutex sync.RWMutex - currentStat *storage.PkStatistics - historyStats []*storage.PkStatistics -} - -func newBloomFilterSet() *bloomFilterSet { - return &bloomFilterSet{} -} - -// MayPkExist returns whether any bloom filters returns positive. -func (s *bloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool { - s.statsMutex.RLock() - defer s.statsMutex.RUnlock() - if s.currentStat != nil && s.currentStat.PkExist(pk) { - return true - } - - // for sealed, if one of the stats shows it exist, then we have to check it - for _, historyStat := range s.historyStats { - if historyStat.PkExist(pk) { - return true - } - } - return false -} - -// UpdateBloomFilter updates currentStats with provided pks. -func (s *bloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - if s.currentStat == nil { - s.initCurrentStat() - } - - buf := make([]byte, 8) - for _, pk := range pks { - s.currentStat.UpdateMinMax(pk) - switch pk.Type() { - case schemapb.DataType_Int64: - int64Value := pk.(*storage.Int64PrimaryKey).Value - common.Endian.PutUint64(buf, uint64(int64Value)) - s.currentStat.PkFilter.Add(buf) - case schemapb.DataType_VarChar: - stringValue := pk.(*storage.VarCharPrimaryKey).Value - s.currentStat.PkFilter.AddString(stringValue) - default: - log.Error("failed to update bloomfilter", zap.Any("PK type", pk.Type())) - panic("failed to update bloomfilter") - } - } -} - -// AddHistoricalStats add loaded historical stats. -func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) { - s.statsMutex.Lock() - defer s.statsMutex.Unlock() - - s.historyStats = append(s.historyStats, stats) -} - -// initCurrentStat initialize currentStats if nil. -// Note: invoker shall acquire statsMutex lock first. -func (s *bloomFilterSet) initCurrentStat() { - s.currentStat = &storage.PkStatistics{ - PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), - } -} diff --git a/internal/querynodev2/segments/bloom_filter_set_test.go b/internal/querynodev2/segments/bloom_filter_set_test.go deleted file mode 100644 index 9bf95a1ff9..0000000000 --- a/internal/querynodev2/segments/bloom_filter_set_test.go +++ /dev/null @@ -1,91 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package segments - -import ( - "testing" - - "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/pkg/util/paramtable" -) - -type BloomFilterSetSuite struct { - suite.Suite - - intPks []int64 - stringPks []string - set *bloomFilterSet -} - -func (suite *BloomFilterSetSuite) SetupTest() { - suite.intPks = []int64{1, 2, 3} - suite.stringPks = []string{"1", "2", "3"} - paramtable.Init() - suite.set = newBloomFilterSet() -} - -func (suite *BloomFilterSetSuite) TestInt64PkBloomFilter() { - pks, err := storage.GenInt64PrimaryKeys(suite.intPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestStringPkBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func (suite *BloomFilterSetSuite) TestHistoricalBloomFilter() { - pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...) - suite.NoError(err) - - suite.set.UpdateBloomFilter(pks) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } - - old := suite.set.currentStat - suite.set.currentStat = nil - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.False(exist) - } - - suite.set.AddHistoricalStats(old) - for _, pk := range pks { - exist := suite.set.MayPkExist(pk) - suite.True(exist) - } -} - -func TestBloomFilterSet(t *testing.T) { - suite.Run(t, &BloomFilterSetSuite{}) -} diff --git a/internal/storage/field_stats.go b/internal/storage/field_stats.go index d6fad764c3..fbb6757ac1 100644 --- a/internal/storage/field_stats.go +++ b/internal/storage/field_stats.go @@ -20,10 +20,12 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -31,12 +33,13 @@ import ( // FieldStats contains statistics data for any column // todo: compatible to PrimaryKeyStats type FieldStats struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max ScalarFieldValue `json:"max"` // for scalar field - Min ScalarFieldValue `json:"min"` // for scalar field - BF *bloom.BloomFilter `json:"bf"` // for scalar field - Centroids []VectorFieldValue `json:"centroids"` // for vector field + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max ScalarFieldValue `json:"max"` // for scalar field + Min ScalarFieldValue `json:"min"` // for scalar field + BFType bloomfilter.BFType `json:"bfType"` // for scalar field + BF bloomfilter.BloomFilterInterface `json:"bf"` // for scalar field + Centroids []VectorFieldValue `json:"centroids"` // for vector field } func (stats *FieldStats) Clone() FieldStats { @@ -152,12 +155,22 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } } else { stats.initCentroids(data, stats.Type) @@ -172,12 +185,12 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error { func (stats *FieldStats) initCentroids(data []byte, dataType schemapb.DataType) { type FieldStatsAux struct { - FieldID int64 `json:"fieldID"` - Type schemapb.DataType `json:"type"` - Max json.RawMessage `json:"max"` - Min json.RawMessage `json:"min"` - BF *bloom.BloomFilter `json:"bf"` - Centroids []json.RawMessage `json:"centroids"` + FieldID int64 `json:"fieldID"` + Type schemapb.DataType `json:"type"` + Max json.RawMessage `json:"max"` + Min json.RawMessage `json:"min"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + Centroids []json.RawMessage `json:"centroids"` } // Unmarshal JSON into the auxiliary struct var aux FieldStatsAux @@ -372,10 +385,15 @@ func NewFieldStats(fieldID int64, pkType schemapb.DataType, rowNum int64) (*Fiel Type: pkType, }, nil } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -402,11 +420,17 @@ func (sw *FieldStatsWriter) GenerateList(stats []*FieldStats) error { // GenerateByData writes data from @msgs with @fieldID to @buffer func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error { statsList := make([]*FieldStats, 0) + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() for _, msg := range msgs { stats := &FieldStats{ FieldID: fieldID, Type: pkType, - BF: bloom.NewWithEstimates(uint(msg.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msg.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msg) diff --git a/internal/storage/field_stats_test.go b/internal/storage/field_stats_test.go index d58ffc1bf0..ba1b71c3ef 100644 --- a/internal/storage/field_stats_test.go +++ b/internal/storage/field_stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestFieldStatsUpdate(t *testing.T) { @@ -373,7 +374,7 @@ func TestFieldStatsWriter_UpgradePrimaryKey(t *testing.T) { FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, paramtable.Get().CommonCfg.BloomFilterType.GetValue()), } b := make([]byte, 8) @@ -574,8 +575,9 @@ func TestFieldStatsUnMarshal(t *testing.T) { assert.Error(t, err) err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": \"b\"}")) assert.Error(t, err) + // return AlwaysTrueBloomFilter when deserialize bloom filter failed. err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": 1, \"bf\": \"2\"}")) - assert.Error(t, err) + assert.NoError(t, err) }) t.Run("succeed", func(t *testing.T) { diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index f1e15f0d3d..35649ae46f 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -19,19 +19,19 @@ package storage import ( "fmt" - "github.com/bits-and-blooms/bloom/v3" "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" ) // pkStatistics contains pk field statistic information type PkStatistics struct { - PkFilter *bloom.BloomFilter // bloom filter of pk inside a segment - MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment - MaxPK PrimaryKey // maximal pk value, same above + PkFilter bloomfilter.BloomFilterInterface // bloom filter of pk inside a segment + MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment + MaxPK PrimaryKey // maximal pk value, same above } // update set pk min/max value if input value is beyond former range. @@ -110,16 +110,16 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool { } // Locations returns a list of hash locations representing a data item. -func Locations(pk PrimaryKey, k uint) []uint64 { +func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 { switch pk.Type() { case schemapb.DataType_Int64: buf := make([]byte, 8) int64Pk := pk.(*Int64PrimaryKey) common.Endian.PutUint64(buf, uint64(int64Pk.Value)) - return bloom.Locations(buf, k) + return bloomfilter.Locations(buf, k, bfType) case schemapb.DataType_VarChar: varCharPk := pk.(*VarCharPrimaryKey) - return bloom.Locations([]byte(varCharPk.Value), k) + return bloomfilter.Locations([]byte(varCharPk.Value), k, bfType) default: // TODO:: } @@ -133,7 +133,7 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { } // check bf first, TestLocation just do some bitset compute, cost is cheaper - if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K())) { + if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K(), st.PkFilter.Type())) { return false } @@ -148,13 +148,15 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo } // check bf first, TestLocation just do some bitset compute, cost is cheaper - locations := lc.Locations(st.PkFilter.K()) + locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type()) + ret := st.PkFilter.BatchTestLocations(locations, hits) + + // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, + // hits array will be removed after we merge bf in segment pks := lc.PKs() - for i := range pks { - // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, - // hits array will be removed after we merge bf in segment + for i := range ret { if !hits[i] { - hits[i] = st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) + hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) } } @@ -164,19 +166,31 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo // LocationsCache is a helper struct caching pk bloom filter locations. // Note that this helper is not concurrent safe and shall be used in same goroutine. type LocationsCache struct { - pk PrimaryKey - locations []uint64 + pk PrimaryKey + basicBFLocations []uint64 + blockBFLocations []uint64 } func (lc *LocationsCache) GetPk() PrimaryKey { return lc.pk } -func (lc *LocationsCache) Locations(k uint) []uint64 { - if int(k) > len(lc.locations) { - lc.locations = Locations(lc.pk, k) +func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64 { + switch bfType { + case bloomfilter.BasicBF: + if int(k) > len(lc.basicBFLocations) { + lc.basicBFLocations = Locations(lc.pk, k, bfType) + } + return lc.basicBFLocations[:k] + case bloomfilter.BlockedBF: + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockBFLocations) != 1 { + lc.blockBFLocations = Locations(lc.pk, 1, bfType) + } + return lc.blockBFLocations + default: + return nil } - return lc.locations[:k] } func NewLocationsCache(pk PrimaryKey) *LocationsCache { @@ -189,7 +203,11 @@ type BatchLocationsCache struct { pks []PrimaryKey k uint - locations [][]uint64 + // for block bf + blockLocations [][]uint64 + + // for basic bf + basicLocations [][]uint64 } func (lc *BatchLocationsCache) PKs() []PrimaryKey { @@ -200,15 +218,29 @@ func (lc *BatchLocationsCache) Size() int { return len(lc.pks) } -func (lc *BatchLocationsCache) Locations(k uint) [][]uint64 { - if k > lc.k { - lc.k = k - lc.locations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { - return Locations(pk, lc.k) - }) - } +func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64 { + switch bfType { + case bloomfilter.BasicBF: + if k > lc.k { + lc.k = k + lc.basicLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } - return lc.locations + return lc.basicLocations + case bloomfilter.BlockedBF: + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockLocations) != len(lc.pks) { + lc.blockLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } + + return lc.blockLocations + default: + return nil + } } func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache { diff --git a/internal/storage/stats.go b/internal/storage/stats.go index 7914e04b80..75da19ab5e 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -20,9 +20,10 @@ import ( "encoding/json" "fmt" - "github.com/bits-and-blooms/bloom/v3" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" @@ -31,13 +32,14 @@ import ( // PrimaryKeyStats contains statistics data for pk column type PrimaryKeyStats struct { - FieldID int64 `json:"fieldID"` - Max int64 `json:"max"` // useless, will delete - Min int64 `json:"min"` // useless, will delete - BF *bloom.BloomFilter `json:"bf"` - PkType int64 `json:"pkType"` - MaxPk PrimaryKey `json:"maxPk"` - MinPk PrimaryKey `json:"minPk"` + FieldID int64 `json:"fieldID"` + Max int64 `json:"max"` // useless, will delete + Min int64 `json:"min"` // useless, will delete + BFType bloomfilter.BFType `json:"bfType"` + BF bloomfilter.BloomFilterInterface `json:"bf"` + PkType int64 `json:"pkType"` + MaxPk PrimaryKey `json:"maxPk"` + MinPk PrimaryKey `json:"minPk"` } // UnmarshalJSON unmarshal bytes to PrimaryKeyStats @@ -110,12 +112,22 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error { } } - if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { - stats.BF = &bloom.BloomFilter{} - err = stats.BF.UnmarshalJSON(*bfMessage) + bfType := bloomfilter.BasicBF + if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil { + err := json.Unmarshal(*bfTypeMessage, &bfType) if err != nil { return err } + stats.BFType = bfType + } + + if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil { + bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType) + if err != nil { + log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err)) + bf = bloomfilter.AlwaysTrueBloomFilter + } + stats.BF = bf } return nil @@ -189,10 +201,16 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) if rowNum <= 0 { return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) } + + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() return &PrimaryKeyStats{ FieldID: fieldID, PkType: pkType, - BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(rowNum), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), }, nil } @@ -228,10 +246,15 @@ func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error { // GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error { + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: fieldID, PkType: int64(pkType), - BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType( + uint(msgs.RowNum()), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + bfType), } stats.UpdateByMsgs(msgs) diff --git a/internal/storage/stats_test.go b/internal/storage/stats_test.go index 709f49697f..cccd3d9f9e 100644 --- a/internal/storage/stats_test.go +++ b/internal/storage/stats_test.go @@ -20,12 +20,13 @@ import ( "encoding/json" "testing" - "github.com/bits-and-blooms/bloom/v3" "github.com/stretchr/testify/assert" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/bloomfilter" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestStatsWriter_Int64PrimaryKey(t *testing.T) { @@ -124,11 +125,13 @@ func TestStatsWriter_UpgradePrimaryKey(t *testing.T) { Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}, } + bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue() stats := &PrimaryKeyStats{ FieldID: common.RowIDField, Min: 1, Max: 9, - BF: bloom.NewWithEstimates(100000, 0.05), + BFType: bloomfilter.BFTypeFromString(bfType), + BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, bfType), } b := make([]byte, 8) @@ -174,3 +177,30 @@ func TestDeserializeEmptyStats(t *testing.T) { _, err := DeserializeStats([]*Blob{blob}) assert.NoError(t, err) } + +func TestMarshalStats(t *testing.T) { + stat, err := NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 100000) + assert.NoError(t, err) + + for i := 0; i < 10000; i++ { + stat.Update(NewInt64PrimaryKey(int64(i))) + } + + sw := &StatsWriter{} + sw.GenerateList([]*PrimaryKeyStats{stat}) + bytes := sw.GetBuffer() + + sr := &StatsReader{} + sr.SetBuffer(bytes) + stat1, err := sr.GetPrimaryKeyStatsList() + assert.NoError(t, err) + assert.Equal(t, 1, len(stat1)) + assert.Equal(t, stat.Min, stat1[0].Min) + assert.Equal(t, stat.Max, stat1[0].Max) + + for i := 0; i < 10000; i++ { + b := make([]byte, 8) + common.Endian.PutUint64(b, uint64(i)) + assert.True(t, stat1[0].BF.Test(b)) + } +} diff --git a/internal/util/bloomfilter/bloom_filter.go b/internal/util/bloomfilter/bloom_filter.go new file mode 100644 index 0000000000..c5ca745c8b --- /dev/null +++ b/internal/util/bloomfilter/bloom_filter.go @@ -0,0 +1,339 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bloomfilter + +import ( + "encoding/json" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/cockroachdb/errors" + "github.com/greatroar/blobloom" + "github.com/pingcap/log" + "github.com/zeebo/xxh3" + "go.uber.org/zap" +) + +type BFType int + +var AlwaysTrueBloomFilter = &alwaysTrueBloomFilter{} + +const ( + UnsupportedBFName = "Unsupported BloomFilter" + BlockBFName = "BlockedBloomFilter" + BasicBFName = "BasicBloomFilter" + AlwaysTrueBFName = "AlwaysTrueBloomFilter" +) + +const ( + UnsupportedBF BFType = iota + 1 + AlwaysTrueBF // empty bloom filter + BasicBF + BlockedBF +) + +var bfNames = map[BFType]string{ + BasicBF: BasicBFName, + BlockedBF: BlockBFName, + AlwaysTrueBF: AlwaysTrueBFName, + UnsupportedBF: UnsupportedBFName, +} + +func (t BFType) String() string { + return bfNames[t] +} + +func BFTypeFromString(name string) BFType { + switch name { + case BasicBFName: + return BasicBF + case BlockBFName: + return BlockedBF + case AlwaysTrueBFName: + return AlwaysTrueBF + default: + return UnsupportedBF + } +} + +type BloomFilterInterface interface { + Type() BFType + Cap() uint + K() uint + Add(data []byte) + AddString(data string) + Test(data []byte) bool + TestString(data string) bool + TestLocations(locs []uint64) bool + BatchTestLocations(locs [][]uint64, hit []bool) []bool + MarshalJSON() ([]byte, error) + UnmarshalJSON(data []byte) error +} + +type basicBloomFilter struct { + inner *bloom.BloomFilter + k uint +} + +func newBasicBloomFilter(capacity uint, fp float64) *basicBloomFilter { + inner := bloom.NewWithEstimates(capacity, fp) + return &basicBloomFilter{ + inner: inner, + k: inner.K(), + } +} + +func (b *basicBloomFilter) Type() BFType { + return BasicBF +} + +func (b *basicBloomFilter) Cap() uint { + return b.inner.Cap() +} + +func (b *basicBloomFilter) K() uint { + return b.k +} + +func (b *basicBloomFilter) Add(data []byte) { + b.inner.Add(data) +} + +func (b *basicBloomFilter) AddString(data string) { + b.inner.AddString(data) +} + +func (b *basicBloomFilter) Test(data []byte) bool { + return b.inner.Test(data) +} + +func (b *basicBloomFilter) TestString(data string) bool { + return b.inner.TestString(data) +} + +func (b *basicBloomFilter) TestLocations(locs []uint64) bool { + return b.inner.TestLocations(locs[:b.k]) +} + +func (b *basicBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := range hits { + if !hits[i] { + if uint(len(locs[i])) < b.k { + ret[i] = true + continue + } + ret[i] = b.inner.TestLocations(locs[i][:b.k]) + } + } + return ret +} + +func (b basicBloomFilter) MarshalJSON() ([]byte, error) { + return b.inner.MarshalJSON() +} + +func (b *basicBloomFilter) UnmarshalJSON(data []byte) error { + inner := &bloom.BloomFilter{} + inner.UnmarshalJSON(data) + b.inner = inner + b.k = inner.K() + return nil +} + +// impl Blocked Bloom filter with blobloom and xxh3 hash +type blockedBloomFilter struct { + inner *blobloom.Filter + k uint +} + +func newBlockedBloomFilter(capacity uint, fp float64) *blockedBloomFilter { + inner := blobloom.NewOptimized(blobloom.Config{ + Capacity: uint64(capacity), + FPRate: fp, + }) + return &blockedBloomFilter{ + inner: inner, + k: inner.K(), + } +} + +func (b *blockedBloomFilter) Type() BFType { + return BlockedBF +} + +func (b *blockedBloomFilter) Cap() uint { + return uint(b.inner.NumBits()) +} + +func (b *blockedBloomFilter) K() uint { + return b.k +} + +func (b *blockedBloomFilter) Add(data []byte) { + loc := xxh3.Hash(data) + b.inner.Add(loc) +} + +func (b *blockedBloomFilter) AddString(data string) { + h := xxh3.HashString(data) + b.inner.Add(h) +} + +func (b *blockedBloomFilter) Test(data []byte) bool { + loc := xxh3.Hash(data) + return b.inner.Has(loc) +} + +func (b *blockedBloomFilter) TestString(data string) bool { + h := xxh3.HashString(data) + return b.inner.Has(h) +} + +func (b *blockedBloomFilter) TestLocations(locs []uint64) bool { + // for block bf, just cache it's hash result as locations + if len(locs) != 1 { + return true + } + return b.inner.Has(locs[0]) +} + +func (b *blockedBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := range hits { + if !hits[i] { + if len(locs[i]) != 1 { + ret[i] = true + continue + } + ret[i] = b.inner.Has(locs[i][0]) + } + } + return ret +} + +func (b blockedBloomFilter) MarshalJSON() ([]byte, error) { + return b.inner.MarshalJSON() +} + +func (b *blockedBloomFilter) UnmarshalJSON(data []byte) error { + inner := &blobloom.Filter{} + inner.UnmarshalJSON(data) + b.inner = inner + b.k = inner.K() + + return nil +} + +// always true bloom filter is used when deserialize stat log failed. +// Notice: add item to empty bloom filter is not permitted. and all Test Func will return false positive. +type alwaysTrueBloomFilter struct{} + +func (b *alwaysTrueBloomFilter) Type() BFType { + return AlwaysTrueBF +} + +func (b *alwaysTrueBloomFilter) Cap() uint { + return 0 +} + +func (b *alwaysTrueBloomFilter) K() uint { + return 0 +} + +func (b *alwaysTrueBloomFilter) Add(data []byte) { +} + +func (b *alwaysTrueBloomFilter) AddString(data string) { +} + +func (b *alwaysTrueBloomFilter) Test(data []byte) bool { + return true +} + +func (b *alwaysTrueBloomFilter) TestString(data string) bool { + return true +} + +func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool { + return true +} + +func (b *alwaysTrueBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := 0; i < len(hits); i++ { + ret[i] = true + } + + return ret +} + +func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) { + return []byte{}, nil +} + +func (b *alwaysTrueBloomFilter) UnmarshalJSON(data []byte) error { + return nil +} + +func NewBloomFilterWithType(capacity uint, fp float64, typeName string) BloomFilterInterface { + bfType := BFTypeFromString(typeName) + switch bfType { + case BlockedBF: + return newBlockedBloomFilter(capacity, fp) + case BasicBF: + return newBasicBloomFilter(capacity, fp) + default: + log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", typeName)) + return newBlockedBloomFilter(capacity, fp) + } +} + +func UnmarshalJSON(data []byte, bfType BFType) (BloomFilterInterface, error) { + switch bfType { + case BlockedBF: + bf := &blockedBloomFilter{} + err := json.Unmarshal(data, bf) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter") + } + return bf, nil + case BasicBF: + bf := &basicBloomFilter{} + err := json.Unmarshal(data, bf) + if err != nil { + return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter") + } + return bf, nil + case AlwaysTrueBF: + return AlwaysTrueBloomFilter, nil + default: + return nil, errors.Errorf("unsupported bloom filter type: %d", bfType) + } +} + +func Locations(data []byte, k uint, bfType BFType) []uint64 { + switch bfType { + case BasicBF: + return bloom.Locations(data, k) + case BlockedBF: + return []uint64{xxh3.Hash(data)} + case AlwaysTrueBF: + return nil + default: + log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", bfType.String())) + return nil + } +} diff --git a/internal/util/bloomfilter/bloom_filter_test.go b/internal/util/bloomfilter/bloom_filter_test.go new file mode 100644 index 0000000000..1c0252638a --- /dev/null +++ b/internal/util/bloomfilter/bloom_filter_test.go @@ -0,0 +1,313 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package bloomfilter + +import ( + "fmt" + "testing" + "time" + + "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-storage/go/common/log" +) + +func TestPerformance(t *testing.T) { + capacity := 1000000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + start1 := time.Now() + for _, key := range keys { + bf1.Add(key) + } + log.Info("Block BF construct time", zap.Duration("time", time.Since(start1))) + data, err := bf1.MarshalJSON() + assert.NoError(t, err) + log.Info("Block BF size", zap.Int("size", len(data))) + + start2 := time.Now() + for _, key := range keys { + bf1.Test(key) + } + log.Info("Block BF Test cost", zap.Duration("time", time.Since(start2))) + + bf2 := newBasicBloomFilter(uint(capacity), fpr) + start3 := time.Now() + for _, key := range keys { + bf2.Add(key) + } + log.Info("Basic BF construct time", zap.Duration("time", time.Since(start3))) + data, err = bf2.MarshalJSON() + assert.NoError(t, err) + log.Info("Basic BF size", zap.Int("size", len(data))) + + start4 := time.Now() + for _, key := range keys { + bf2.Test(key) + } + log.Info("Basic BF Test cost", zap.Duration("time", time.Since(start4))) +} + +func TestPerformance_MultiBF(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + bfNum := 100 + bfs1 := make([]*blockedBloomFilter, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) + } + bfs1 = append(bfs1, bf1) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]*basicBloomFilter, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range testKeys { + bf2.Add(key) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) +} + +func TestPerformance_BatchTestLocations(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + batchSize := 1000 + + bfNum := 100 + bfs1 := make([]BloomFilterInterface, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := NewBloomFilterWithType(uint(capacity), fpr, BlockBFName) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) + } + bfs1 = append(bfs1, bf1) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs1[0].K(), BlockedBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs1[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]BloomFilterInterface, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := NewBloomFilterWithType(uint(capacity), fpr, BasicBFName) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf2.Add([]byte(key)) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs2[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs2[0].K(), BasicBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs2[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) +} + +func TestPerformance_Capacity(t *testing.T) { + fpr := 0.001 + + for _, capacity := range []int64{100, 1000, 10000, 100000, 1000000} { + keys := make([][]byte, 0) + for i := 0; i < int(capacity); i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start1 := time.Now() + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + bf1.Add(key) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + testKeys := make([][]byte, 0) + for i := 0; i < 10000; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + start3 := time.Now() + for _, key := range testKeys { + locations := Locations(key, bf1.K(), bf1.Type()) + bf1.TestLocations(locations) + } + _, k := bloom.EstimateParameters(uint(capacity), fpr) + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)), zap.Int("k", int(k)), zap.Int64("capacity", capacity)) + } +} + +func TestMarshal(t *testing.T) { + capacity := 200000 + fpr := 0.001 + + keys := make([][]byte, 0) + for i := 0; i < capacity; i++ { + keys = append(keys, []byte(fmt.Sprintf("key%d", i))) + } + + // test basic bf + basicBF := newBasicBloomFilter(uint(capacity), fpr) + for _, key := range keys { + basicBF.Add(key) + } + data, err := basicBF.MarshalJSON() + assert.NoError(t, err) + basicBF2, err := UnmarshalJSON(data, BasicBF) + assert.NoError(t, err) + assert.Equal(t, basicBF.Type(), basicBF2.Type()) + + for _, key := range keys { + assert.True(t, basicBF2.Test(key)) + } + + // test block bf + blockBF := newBlockedBloomFilter(uint(capacity), fpr) + for _, key := range keys { + blockBF.Add(key) + } + data, err = blockBF.MarshalJSON() + assert.NoError(t, err) + blockBF2, err := UnmarshalJSON(data, BlockedBF) + assert.NoError(t, err) + assert.Equal(t, blockBF.Type(), blockBF.Type()) + for _, key := range keys { + assert.True(t, blockBF2.Test(key)) + } + + // test compatible with bits-and-blooms/bloom + bf := bloom.NewWithEstimates(uint(capacity), fpr) + for _, key := range keys { + bf.Add(key) + } + data, err = bf.MarshalJSON() + assert.NoError(t, err) + bf2, err := UnmarshalJSON(data, BasicBF) + assert.NoError(t, err) + for _, key := range keys { + assert.True(t, bf2.Test(key)) + } + + // test empty bloom filter + emptyBF := AlwaysTrueBloomFilter + for _, key := range keys { + bf.Add(key) + } + data, err = emptyBF.MarshalJSON() + assert.NoError(t, err) + emptyBF2, err := UnmarshalJSON(data, AlwaysTrueBF) + assert.NoError(t, err) + for _, key := range keys { + assert.True(t, emptyBF2.Test(key)) + } +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c1db39dbd6..fdce4ef9cf 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -244,6 +244,7 @@ type commonConfig struct { TraceLogMode ParamItem `refreshable:"true"` BloomFilterSize ParamItem `refreshable:"true"` MaxBloomFalsePositive ParamItem `refreshable:"true"` + BloomFilterType ParamItem `refreshable:"true"` BloomFilterApplyBatchSize ParamItem `refreshable:"true"` PanicWhenPluginFail ParamItem `refreshable:"false"` @@ -730,6 +731,15 @@ like the old password verification when updating the credential`, } p.BloomFilterSize.Init(base.mgr) + p.BloomFilterType = ParamItem{ + Key: "common.bloomFilterType", + Version: "2.4.3", + DefaultValue: "BasicBloomFilter", + Doc: "bloom filter type, support BasicBloomFilter and BlockedBloomFilter", + Export: true, + } + p.BloomFilterType.Init(base.mgr) + p.MaxBloomFalsePositive = ParamItem{ Key: "common.maxBloomFalsePositive", Version: "2.3.2", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 8968d3ab3b..672aea18d0 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -573,6 +573,7 @@ func TestCachedParam(t *testing.T) { assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint()) + assert.Equal(t, "BasicBloomFilter", params.CommonCfg.BloomFilterType.GetValue()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64()) diff --git a/tests/integration/bloomfilter/bloom_filter_test.go b/tests/integration/bloomfilter/bloom_filter_test.go new file mode 100644 index 0000000000..4ee4437afb --- /dev/null +++ b/tests/integration/bloomfilter/bloom_filter_test.go @@ -0,0 +1,196 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bloomfilter + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type BloomFilterTestSuit struct { + integration.MiniClusterSuite +} + +func (s *BloomFilterTestSuit) SetupSuite() { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") + + // disable compaction + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + + s.Require().NoError(s.SetupEmbedEtcd()) +} + +func (s *BloomFilterTestSuit) TearDownSuite() { + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + s.MiniClusterSuite.TearDownSuite() +} + +func (s *BloomFilterTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const ( + dim = 128 + dbName = "" + ) + + schema := integration.ConstructSchema(collectionName, dim, true) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: int32(channelNum), + }) + s.NoError(err) + s.True(merr.Ok(createCollectionStatus)) + + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + s.NoError(err) + s.True(merr.Ok(showCollectionsResp.Status)) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + for i := 0; i < segmentNum; i++ { + // change bf type in real time + if i%2 == 0 { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BasicBloomFilter") + } else { + paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BlockedBloomFilter") + } + + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, segmentRowNum, dim) + hashKeys := integration.GenerateHashKeys(segmentRowNum) + insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(segmentRowNum), + }) + s.NoError(err) + s.True(merr.Ok(insertResult.Status)) + + if segmentDeleteNum > 0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData()[:segmentDeleteNum] + log.Info("========================delete expr==================", + zap.Int("length of pk", len(pks)), + ) + + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + + // flush + flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + } + + // create index + createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2), + }) + s.NoError(err) + s.True(merr.Ok(createIndexStatus)) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + for i := 1; i < replica; i++ { + s.Cluster.AddQueryNode() + } + + // load + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + ReplicaNumber: int32(replica), + }) + s.NoError(err) + s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode()) + s.True(merr.Ok(loadStatus)) + s.WaitForLoad(ctx, collectionName) + log.Info("initCollection Done") +} + +func (s *BloomFilterTestSuit) TestLoadAndQuery() { + name := "test_balance_" + funcutil.GenRandomStr() + s.initCollection(name, 1, 2, 10, 2000, 500) + + ctx := context.Background() + queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: "", + CollectionName: name, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + if !merr.Ok(queryResult.GetStatus()) { + log.Warn("queryResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason())) + } + s.NoError(err) + s.True(merr.Ok(queryResult.GetStatus())) + numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0] + s.Equal(numEntities, int64(15000)) +} + +func TestBloomFilter(t *testing.T) { + suite.Run(t, new(BloomFilterTestSuit)) +}