mirror of https://github.com/milvus-io/milvus.git
enhance: Use Blocked Bloom Filter instead of basic bloom filter impl (#34377)
issue: #32995 pr: #33405 To speed up the construction and querying of Bloom filters, we chose a blocked Bloom filter instead of a basic Bloom filter implementation. WARN: This PR is compatible with the old-version BF impl, but if falling back to an old Milvus version, it may cause bloom filter deserialization to fail. In single Bloom filter test cases with a capacity of 1,000,000 and a false positive rate (FPR) of 0.001, the blocked Bloom filter is 5 times faster than the basic Bloom filter in both querying and construction, at the cost of a 30% increase in memory usage. Block BF construct time {"time": "54.128131ms"} Block BF size {"size": 3021578} Block BF Test cost {"time": "55.407352ms"} Basic BF construct time {"time": "210.262183ms"} Basic BF size {"size": 2396308} Basic BF Test cost {"time": "192.596229ms"} In multi Bloom filter test cases with a capacity of 100,000, an FPR of 0.001, and 100 Bloom filters, we reuse the primary key locations for all Bloom filters to avoid repeated hash computations. As a result, the blocked Bloom filter is also 5 times faster than the basic Bloom filter in querying. Block BF TestLocation cost {"time": "529.97183ms"} Basic BF TestLocation cost {"time": "3.197430181s"} Signed-off-by: Wei Liu <wei.liu@zilliz.com> pull/34471/head
parent
173c02902e
commit
d3e94f9861
7
go.mod
7
go.mod
|
@ -65,11 +65,12 @@ require (
|
||||||
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70
|
require github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/greatroar/blobloom v0.8.0
|
||||||
github.com/jolestar/go-commons-pool/v2 v2.1.2
|
github.com/jolestar/go-commons-pool/v2 v2.1.2
|
||||||
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
|
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/valyala/fastjson v1.6.4
|
github.com/valyala/fastjson v1.6.4
|
||||||
google.golang.org/protobuf v1.33.0
|
github.com/zeebo/xxh3 v1.0.2
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -208,7 +209,6 @@ require (
|
||||||
github.com/x448/float16 v0.8.4 // indirect
|
github.com/x448/float16 v0.8.4 // indirect
|
||||||
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
|
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
|
||||||
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
github.com/yusufpapurcu/wmi v1.2.2 // indirect
|
||||||
github.com/zeebo/xxh3 v1.0.2 // indirect
|
|
||||||
go.etcd.io/bbolt v1.3.6 // indirect
|
go.etcd.io/bbolt v1.3.6 // indirect
|
||||||
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
|
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
|
||||||
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
|
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
|
||||||
|
@ -235,6 +235,7 @@ require (
|
||||||
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
|
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
|
||||||
|
google.golang.org/protobuf v1.33.0 // indirect
|
||||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||||
|
@ -256,3 +257,5 @@ replace (
|
||||||
)
|
)
|
||||||
|
|
||||||
exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b
|
exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b
|
||||||
|
|
||||||
|
replace github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -603,6 +603,8 @@ github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d h1:5PJl274Y63IEHC+7izoQ
|
||||||
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||||
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
|
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
|
||||||
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
|
||||||
|
github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEHKbbv51OwNGO82U+d6ut08ppTmZVm+VY=
|
||||||
|
github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
|
||||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
|
||||||
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
|
||||||
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.5 h1:83SZKox70jyABAPY7HdbvrwI3cmgwzyuc7dO4dYmoC0=
|
github.com/milvus-io/milvus-proto/go-api/v2 v2.4.5 h1:83SZKox70jyABAPY7HdbvrwI3cmgwzyuc7dO4dYmoC0=
|
||||||
|
|
|
@ -19,10 +19,10 @@ package metacache
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -106,8 +106,9 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
|
||||||
|
|
||||||
if bfs.current == nil {
|
if bfs.current == nil {
|
||||||
bfs.current = &storage.PkStatistics{
|
bfs.current = &storage.PkStatistics{
|
||||||
PkFilter: bloom.NewWithEstimates(bfs.batchSize,
|
PkFilter: bloomfilter.NewBloomFilterWithType(bfs.batchSize,
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -214,6 +214,7 @@ func (s *storageV1Serializer) serializeMergedPkStats(pack *SyncPack) (*storage.B
|
||||||
FieldID: s.pkField.GetFieldID(),
|
FieldID: s.pkField.GetFieldID(),
|
||||||
MaxPk: pks.MaxPK,
|
MaxPk: pks.MaxPK,
|
||||||
MinPk: pks.MinPK,
|
MinPk: pks.MinPK,
|
||||||
|
BFType: pks.PkFilter.Type(),
|
||||||
BF: pks.PkFilter,
|
BF: pks.PkFilter,
|
||||||
PkType: int64(s.pkField.GetDataType()),
|
PkType: int64(s.pkField.GetDataType()),
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,6 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
bloom "github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"github.com/stretchr/testify/mock"
|
"github.com/stretchr/testify/mock"
|
||||||
|
@ -42,6 +41,7 @@ import (
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
"github.com/milvus-io/milvus/internal/querynodev2/segments"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
|
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/internal/util/initcore"
|
"github.com/milvus-io/milvus/internal/util/initcore"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||||
|
@ -281,8 +281,9 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
|
||||||
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
||||||
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
||||||
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
||||||
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
bf := bloomfilter.NewBloomFilterWithType(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
|
||||||
pks := &storage.PkStatistics{
|
pks := &storage.PkStatistics{
|
||||||
PkFilter: bf,
|
PkFilter: bf,
|
||||||
}
|
}
|
||||||
|
@ -537,8 +538,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
|
||||||
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
||||||
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
||||||
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
||||||
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
bf := bloomfilter.NewBloomFilterWithType(
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
|
||||||
pks := &storage.PkStatistics{
|
pks := &storage.PkStatistics{
|
||||||
PkFilter: bf,
|
PkFilter: bf,
|
||||||
}
|
}
|
||||||
|
@ -695,8 +698,10 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
|
||||||
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
||||||
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
||||||
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
||||||
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
bf := bloomfilter.NewBloomFilterWithType(
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
|
||||||
pks := &storage.PkStatistics{
|
pks := &storage.PkStatistics{
|
||||||
PkFilter: bf,
|
PkFilter: bf,
|
||||||
}
|
}
|
||||||
|
@ -896,8 +901,10 @@ func (s *DelegatorDataSuite) TestReleaseSegment() {
|
||||||
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
Call.Return(func(ctx context.Context, collectionID int64, version int64, infos ...*querypb.SegmentLoadInfo) []*pkoracle.BloomFilterSet {
|
||||||
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) *pkoracle.BloomFilterSet {
|
||||||
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
bfs := pkoracle.NewBloomFilterSet(info.GetSegmentID(), info.GetPartitionID(), commonpb.SegmentState_Sealed)
|
||||||
bf := bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
bf := bloomfilter.NewBloomFilterWithType(
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat())
|
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
|
||||||
pks := &storage.PkStatistics{
|
pks := &storage.PkStatistics{
|
||||||
PkFilter: bf,
|
PkFilter: bf,
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,12 +19,12 @@ package pkoracle
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
bloom "github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
@ -40,8 +40,6 @@ type BloomFilterSet struct {
|
||||||
segType commonpb.SegmentState
|
segType commonpb.SegmentState
|
||||||
currentStat *storage.PkStatistics
|
currentStat *storage.PkStatistics
|
||||||
historyStats []*storage.PkStatistics
|
historyStats []*storage.PkStatistics
|
||||||
|
|
||||||
kHashFunc uint
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MayPkExist returns whether any bloom filters returns positive.
|
// MayPkExist returns whether any bloom filters returns positive.
|
||||||
|
@ -97,11 +95,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
|
||||||
defer s.statsMutex.Unlock()
|
defer s.statsMutex.Unlock()
|
||||||
|
|
||||||
if s.currentStat == nil {
|
if s.currentStat == nil {
|
||||||
|
bf := bloomfilter.NewBloomFilterWithType(
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
|
||||||
s.currentStat = &storage.PkStatistics{
|
s.currentStat = &storage.PkStatistics{
|
||||||
PkFilter: bloom.NewWithEstimates(
|
PkFilter: bf,
|
||||||
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,9 +127,6 @@ func (s *BloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
|
||||||
s.statsMutex.Lock()
|
s.statsMutex.Lock()
|
||||||
defer s.statsMutex.Unlock()
|
defer s.statsMutex.Unlock()
|
||||||
|
|
||||||
if stats.PkFilter.K() > s.kHashFunc {
|
|
||||||
s.kHashFunc = stats.PkFilter.K()
|
|
||||||
}
|
|
||||||
s.historyStats = append(s.historyStats, stats)
|
s.historyStats = append(s.historyStats, stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,101 +0,0 @@
|
||||||
// Licensed to the LF AI & Data foundation under one
|
|
||||||
// or more contributor license agreements. See the NOTICE file
|
|
||||||
// distributed with this work for additional information
|
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
|
||||||
// to you under the Apache License, Version 2.0 (the
|
|
||||||
// "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package segments
|
|
||||||
|
|
||||||
import (
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
bloom "github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
|
||||||
storage "github.com/milvus-io/milvus/internal/storage"
|
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
||||||
)
|
|
||||||
|
|
||||||
type bloomFilterSet struct {
|
|
||||||
statsMutex sync.RWMutex
|
|
||||||
currentStat *storage.PkStatistics
|
|
||||||
historyStats []*storage.PkStatistics
|
|
||||||
}
|
|
||||||
|
|
||||||
func newBloomFilterSet() *bloomFilterSet {
|
|
||||||
return &bloomFilterSet{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// MayPkExist returns whether any bloom filters returns positive.
|
|
||||||
func (s *bloomFilterSet) MayPkExist(pk storage.PrimaryKey) bool {
|
|
||||||
s.statsMutex.RLock()
|
|
||||||
defer s.statsMutex.RUnlock()
|
|
||||||
if s.currentStat != nil && s.currentStat.PkExist(pk) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
// for sealed, if one of the stats shows it exist, then we have to check it
|
|
||||||
for _, historyStat := range s.historyStats {
|
|
||||||
if historyStat.PkExist(pk) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateBloomFilter updates currentStats with provided pks.
|
|
||||||
func (s *bloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
|
|
||||||
s.statsMutex.Lock()
|
|
||||||
defer s.statsMutex.Unlock()
|
|
||||||
|
|
||||||
if s.currentStat == nil {
|
|
||||||
s.initCurrentStat()
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := make([]byte, 8)
|
|
||||||
for _, pk := range pks {
|
|
||||||
s.currentStat.UpdateMinMax(pk)
|
|
||||||
switch pk.Type() {
|
|
||||||
case schemapb.DataType_Int64:
|
|
||||||
int64Value := pk.(*storage.Int64PrimaryKey).Value
|
|
||||||
common.Endian.PutUint64(buf, uint64(int64Value))
|
|
||||||
s.currentStat.PkFilter.Add(buf)
|
|
||||||
case schemapb.DataType_VarChar:
|
|
||||||
stringValue := pk.(*storage.VarCharPrimaryKey).Value
|
|
||||||
s.currentStat.PkFilter.AddString(stringValue)
|
|
||||||
default:
|
|
||||||
log.Error("failed to update bloomfilter", zap.Any("PK type", pk.Type()))
|
|
||||||
panic("failed to update bloomfilter")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddHistoricalStats add loaded historical stats.
|
|
||||||
func (s *bloomFilterSet) AddHistoricalStats(stats *storage.PkStatistics) {
|
|
||||||
s.statsMutex.Lock()
|
|
||||||
defer s.statsMutex.Unlock()
|
|
||||||
|
|
||||||
s.historyStats = append(s.historyStats, stats)
|
|
||||||
}
|
|
||||||
|
|
||||||
// initCurrentStat initialize currentStats if nil.
|
|
||||||
// Note: invoker shall acquire statsMutex lock first.
|
|
||||||
func (s *bloomFilterSet) initCurrentStat() {
|
|
||||||
s.currentStat = &storage.PkStatistics{
|
|
||||||
PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
|
|
||||||
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,91 +0,0 @@
|
||||||
// Licensed to the LF AI & Data foundation under one
|
|
||||||
// or more contributor license agreements. See the NOTICE file
|
|
||||||
// distributed with this work for additional information
|
|
||||||
// regarding copyright ownership. The ASF licenses this file
|
|
||||||
// to you under the Apache License, Version 2.0 (the
|
|
||||||
// "License"); you may not use this file except in compliance
|
|
||||||
// with the License. You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package segments
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/suite"
|
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/storage"
|
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
|
||||||
)
|
|
||||||
|
|
||||||
type BloomFilterSetSuite struct {
|
|
||||||
suite.Suite
|
|
||||||
|
|
||||||
intPks []int64
|
|
||||||
stringPks []string
|
|
||||||
set *bloomFilterSet
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *BloomFilterSetSuite) SetupTest() {
|
|
||||||
suite.intPks = []int64{1, 2, 3}
|
|
||||||
suite.stringPks = []string{"1", "2", "3"}
|
|
||||||
paramtable.Init()
|
|
||||||
suite.set = newBloomFilterSet()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *BloomFilterSetSuite) TestInt64PkBloomFilter() {
|
|
||||||
pks, err := storage.GenInt64PrimaryKeys(suite.intPks...)
|
|
||||||
suite.NoError(err)
|
|
||||||
|
|
||||||
suite.set.UpdateBloomFilter(pks)
|
|
||||||
for _, pk := range pks {
|
|
||||||
exist := suite.set.MayPkExist(pk)
|
|
||||||
suite.True(exist)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *BloomFilterSetSuite) TestStringPkBloomFilter() {
|
|
||||||
pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...)
|
|
||||||
suite.NoError(err)
|
|
||||||
|
|
||||||
suite.set.UpdateBloomFilter(pks)
|
|
||||||
for _, pk := range pks {
|
|
||||||
exist := suite.set.MayPkExist(pk)
|
|
||||||
suite.True(exist)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (suite *BloomFilterSetSuite) TestHistoricalBloomFilter() {
|
|
||||||
pks, err := storage.GenVarcharPrimaryKeys(suite.stringPks...)
|
|
||||||
suite.NoError(err)
|
|
||||||
|
|
||||||
suite.set.UpdateBloomFilter(pks)
|
|
||||||
for _, pk := range pks {
|
|
||||||
exist := suite.set.MayPkExist(pk)
|
|
||||||
suite.True(exist)
|
|
||||||
}
|
|
||||||
|
|
||||||
old := suite.set.currentStat
|
|
||||||
suite.set.currentStat = nil
|
|
||||||
for _, pk := range pks {
|
|
||||||
exist := suite.set.MayPkExist(pk)
|
|
||||||
suite.False(exist)
|
|
||||||
}
|
|
||||||
|
|
||||||
suite.set.AddHistoricalStats(old)
|
|
||||||
for _, pk := range pks {
|
|
||||||
exist := suite.set.MayPkExist(pk)
|
|
||||||
suite.True(exist)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestBloomFilterSet(t *testing.T) {
|
|
||||||
suite.Run(t, &BloomFilterSetSuite{})
|
|
||||||
}
|
|
|
@ -20,10 +20,12 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
@ -31,12 +33,13 @@ import (
|
||||||
// FieldStats contains statistics data for any column
|
// FieldStats contains statistics data for any column
|
||||||
// todo: compatible to PrimaryKeyStats
|
// todo: compatible to PrimaryKeyStats
|
||||||
type FieldStats struct {
|
type FieldStats struct {
|
||||||
FieldID int64 `json:"fieldID"`
|
FieldID int64 `json:"fieldID"`
|
||||||
Type schemapb.DataType `json:"type"`
|
Type schemapb.DataType `json:"type"`
|
||||||
Max ScalarFieldValue `json:"max"` // for scalar field
|
Max ScalarFieldValue `json:"max"` // for scalar field
|
||||||
Min ScalarFieldValue `json:"min"` // for scalar field
|
Min ScalarFieldValue `json:"min"` // for scalar field
|
||||||
BF *bloom.BloomFilter `json:"bf"` // for scalar field
|
BFType bloomfilter.BFType `json:"bfType"` // for scalar field
|
||||||
Centroids []VectorFieldValue `json:"centroids"` // for vector field
|
BF bloomfilter.BloomFilterInterface `json:"bf"` // for scalar field
|
||||||
|
Centroids []VectorFieldValue `json:"centroids"` // for vector field
|
||||||
}
|
}
|
||||||
|
|
||||||
func (stats *FieldStats) Clone() FieldStats {
|
func (stats *FieldStats) Clone() FieldStats {
|
||||||
|
@ -152,12 +155,22 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil {
|
bfType := bloomfilter.BasicBF
|
||||||
stats.BF = &bloom.BloomFilter{}
|
if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil {
|
||||||
err = stats.BF.UnmarshalJSON(*bfMessage)
|
err := json.Unmarshal(*bfTypeMessage, &bfType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
stats.BFType = bfType
|
||||||
|
}
|
||||||
|
|
||||||
|
if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil {
|
||||||
|
bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err))
|
||||||
|
bf = bloomfilter.AlwaysTrueBloomFilter
|
||||||
|
}
|
||||||
|
stats.BF = bf
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stats.initCentroids(data, stats.Type)
|
stats.initCentroids(data, stats.Type)
|
||||||
|
@ -172,12 +185,12 @@ func (stats *FieldStats) UnmarshalJSON(data []byte) error {
|
||||||
|
|
||||||
func (stats *FieldStats) initCentroids(data []byte, dataType schemapb.DataType) {
|
func (stats *FieldStats) initCentroids(data []byte, dataType schemapb.DataType) {
|
||||||
type FieldStatsAux struct {
|
type FieldStatsAux struct {
|
||||||
FieldID int64 `json:"fieldID"`
|
FieldID int64 `json:"fieldID"`
|
||||||
Type schemapb.DataType `json:"type"`
|
Type schemapb.DataType `json:"type"`
|
||||||
Max json.RawMessage `json:"max"`
|
Max json.RawMessage `json:"max"`
|
||||||
Min json.RawMessage `json:"min"`
|
Min json.RawMessage `json:"min"`
|
||||||
BF *bloom.BloomFilter `json:"bf"`
|
BF bloomfilter.BloomFilterInterface `json:"bf"`
|
||||||
Centroids []json.RawMessage `json:"centroids"`
|
Centroids []json.RawMessage `json:"centroids"`
|
||||||
}
|
}
|
||||||
// Unmarshal JSON into the auxiliary struct
|
// Unmarshal JSON into the auxiliary struct
|
||||||
var aux FieldStatsAux
|
var aux FieldStatsAux
|
||||||
|
@ -372,10 +385,15 @@ func NewFieldStats(fieldID int64, pkType schemapb.DataType, rowNum int64) (*Fiel
|
||||||
Type: pkType,
|
Type: pkType,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()
|
||||||
return &FieldStats{
|
return &FieldStats{
|
||||||
FieldID: fieldID,
|
FieldID: fieldID,
|
||||||
Type: pkType,
|
Type: pkType,
|
||||||
BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
BFType: bloomfilter.BFTypeFromString(bfType),
|
||||||
|
BF: bloomfilter.NewBloomFilterWithType(
|
||||||
|
uint(rowNum),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
bfType),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,11 +420,17 @@ func (sw *FieldStatsWriter) GenerateList(stats []*FieldStats) error {
|
||||||
// GenerateByData writes data from @msgs with @fieldID to @buffer
|
// GenerateByData writes data from @msgs with @fieldID to @buffer
|
||||||
func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error {
|
func (sw *FieldStatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs ...FieldData) error {
|
||||||
statsList := make([]*FieldStats, 0)
|
statsList := make([]*FieldStats, 0)
|
||||||
|
|
||||||
|
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
stats := &FieldStats{
|
stats := &FieldStats{
|
||||||
FieldID: fieldID,
|
FieldID: fieldID,
|
||||||
Type: pkType,
|
Type: pkType,
|
||||||
BF: bloom.NewWithEstimates(uint(msg.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
BFType: bloomfilter.BFTypeFromString(bfType),
|
||||||
|
BF: bloomfilter.NewBloomFilterWithType(
|
||||||
|
uint(msg.RowNum()),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
bfType),
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.UpdateByMsgs(msg)
|
stats.UpdateByMsgs(msg)
|
||||||
|
|
|
@ -20,12 +20,13 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestFieldStatsUpdate(t *testing.T) {
|
func TestFieldStatsUpdate(t *testing.T) {
|
||||||
|
@ -373,7 +374,7 @@ func TestFieldStatsWriter_UpgradePrimaryKey(t *testing.T) {
|
||||||
FieldID: common.RowIDField,
|
FieldID: common.RowIDField,
|
||||||
Min: 1,
|
Min: 1,
|
||||||
Max: 9,
|
Max: 9,
|
||||||
BF: bloom.NewWithEstimates(100000, 0.05),
|
BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, paramtable.Get().CommonCfg.BloomFilterType.GetValue()),
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, 8)
|
b := make([]byte, 8)
|
||||||
|
@ -574,8 +575,9 @@ func TestFieldStatsUnMarshal(t *testing.T) {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": \"b\"}"))
|
err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": \"b\"}"))
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
// return AlwaysTrueBloomFilter when deserialize bloom filter failed.
|
||||||
err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": 1, \"bf\": \"2\"}"))
|
err = stats.UnmarshalJSON([]byte("{\"fieldID\":1,\"max\":10, \"maxPk\":10, \"minPk\": 1, \"bf\": \"2\"}"))
|
||||||
assert.Error(t, err)
|
assert.NoError(t, err)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("succeed", func(t *testing.T) {
|
t.Run("succeed", func(t *testing.T) {
|
||||||
|
|
|
@ -19,19 +19,19 @@ package storage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"github.com/cockroachdb/errors"
|
"github.com/cockroachdb/errors"
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
// pkStatistics contains pk field statistic information
|
// pkStatistics contains pk field statistic information
|
||||||
type PkStatistics struct {
|
type PkStatistics struct {
|
||||||
PkFilter *bloom.BloomFilter // bloom filter of pk inside a segment
|
PkFilter bloomfilter.BloomFilterInterface // bloom filter of pk inside a segment
|
||||||
MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
|
MinPK PrimaryKey // minimal pk value, shortcut for checking whether a pk is inside this segment
|
||||||
MaxPK PrimaryKey // maximal pk value, same above
|
MaxPK PrimaryKey // maximal pk value, same above
|
||||||
}
|
}
|
||||||
|
|
||||||
// update set pk min/max value if input value is beyond former range.
|
// update set pk min/max value if input value is beyond former range.
|
||||||
|
@ -110,16 +110,16 @@ func (st *PkStatistics) PkExist(pk PrimaryKey) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Locations returns a list of hash locations representing a data item.
|
// Locations returns a list of hash locations representing a data item.
|
||||||
func Locations(pk PrimaryKey, k uint) []uint64 {
|
func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 {
|
||||||
switch pk.Type() {
|
switch pk.Type() {
|
||||||
case schemapb.DataType_Int64:
|
case schemapb.DataType_Int64:
|
||||||
buf := make([]byte, 8)
|
buf := make([]byte, 8)
|
||||||
int64Pk := pk.(*Int64PrimaryKey)
|
int64Pk := pk.(*Int64PrimaryKey)
|
||||||
common.Endian.PutUint64(buf, uint64(int64Pk.Value))
|
common.Endian.PutUint64(buf, uint64(int64Pk.Value))
|
||||||
return bloom.Locations(buf, k)
|
return bloomfilter.Locations(buf, k, bfType)
|
||||||
case schemapb.DataType_VarChar:
|
case schemapb.DataType_VarChar:
|
||||||
varCharPk := pk.(*VarCharPrimaryKey)
|
varCharPk := pk.(*VarCharPrimaryKey)
|
||||||
return bloom.Locations([]byte(varCharPk.Value), k)
|
return bloomfilter.Locations([]byte(varCharPk.Value), k, bfType)
|
||||||
default:
|
default:
|
||||||
// TODO::
|
// TODO::
|
||||||
}
|
}
|
||||||
|
@ -133,7 +133,7 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check bf first, TestLocation just do some bitset compute, cost is cheaper
|
// check bf first, TestLocation just do some bitset compute, cost is cheaper
|
||||||
if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K())) {
|
if !st.PkFilter.TestLocations(lc.Locations(st.PkFilter.K(), st.PkFilter.Type())) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,13 +148,15 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo
|
||||||
}
|
}
|
||||||
|
|
||||||
// check bf first, TestLocation just do some bitset compute, cost is cheaper
|
// check bf first, TestLocation just do some bitset compute, cost is cheaper
|
||||||
locations := lc.Locations(st.PkFilter.K())
|
locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type())
|
||||||
|
ret := st.PkFilter.BatchTestLocations(locations, hits)
|
||||||
|
|
||||||
|
// todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment,
|
||||||
|
// hits array will be removed after we merge bf in segment
|
||||||
pks := lc.PKs()
|
pks := lc.PKs()
|
||||||
for i := range pks {
|
for i := range ret {
|
||||||
// todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment,
|
|
||||||
// hits array will be removed after we merge bf in segment
|
|
||||||
if !hits[i] {
|
if !hits[i] {
|
||||||
hits[i] = st.PkFilter.TestLocations(locations[i]) && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
|
hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,19 +166,31 @@ func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []boo
|
||||||
// LocationsCache is a helper struct caching pk bloom filter locations.
|
// LocationsCache is a helper struct caching pk bloom filter locations.
|
||||||
// Note that this helper is not concurrent safe and shall be used in same goroutine.
|
// Note that this helper is not concurrent safe and shall be used in same goroutine.
|
||||||
type LocationsCache struct {
|
type LocationsCache struct {
|
||||||
pk PrimaryKey
|
pk PrimaryKey
|
||||||
locations []uint64
|
basicBFLocations []uint64
|
||||||
|
blockBFLocations []uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc *LocationsCache) GetPk() PrimaryKey {
|
func (lc *LocationsCache) GetPk() PrimaryKey {
|
||||||
return lc.pk
|
return lc.pk
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc *LocationsCache) Locations(k uint) []uint64 {
|
func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64 {
|
||||||
if int(k) > len(lc.locations) {
|
switch bfType {
|
||||||
lc.locations = Locations(lc.pk, k)
|
case bloomfilter.BasicBF:
|
||||||
|
if int(k) > len(lc.basicBFLocations) {
|
||||||
|
lc.basicBFLocations = Locations(lc.pk, k, bfType)
|
||||||
|
}
|
||||||
|
return lc.basicBFLocations[:k]
|
||||||
|
case bloomfilter.BlockedBF:
|
||||||
|
// for block bf, we only need cache the hash result, which is a uint and only compute once for any k value
|
||||||
|
if len(lc.blockBFLocations) != 1 {
|
||||||
|
lc.blockBFLocations = Locations(lc.pk, 1, bfType)
|
||||||
|
}
|
||||||
|
return lc.blockBFLocations
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return lc.locations[:k]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocationsCache(pk PrimaryKey) *LocationsCache {
|
func NewLocationsCache(pk PrimaryKey) *LocationsCache {
|
||||||
|
@ -189,7 +203,11 @@ type BatchLocationsCache struct {
|
||||||
pks []PrimaryKey
|
pks []PrimaryKey
|
||||||
k uint
|
k uint
|
||||||
|
|
||||||
locations [][]uint64
|
// for block bf
|
||||||
|
blockLocations [][]uint64
|
||||||
|
|
||||||
|
// for basic bf
|
||||||
|
basicLocations [][]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc *BatchLocationsCache) PKs() []PrimaryKey {
|
func (lc *BatchLocationsCache) PKs() []PrimaryKey {
|
||||||
|
@ -200,15 +218,29 @@ func (lc *BatchLocationsCache) Size() int {
|
||||||
return len(lc.pks)
|
return len(lc.pks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (lc *BatchLocationsCache) Locations(k uint) [][]uint64 {
|
func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64 {
|
||||||
if k > lc.k {
|
switch bfType {
|
||||||
lc.k = k
|
case bloomfilter.BasicBF:
|
||||||
lc.locations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
|
if k > lc.k {
|
||||||
return Locations(pk, lc.k)
|
lc.k = k
|
||||||
})
|
lc.basicLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
|
||||||
}
|
return Locations(pk, lc.k, bfType)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
return lc.locations
|
return lc.basicLocations
|
||||||
|
case bloomfilter.BlockedBF:
|
||||||
|
// for block bf, we only need cache the hash result, which is a uint and only compute once for any k value
|
||||||
|
if len(lc.blockLocations) != len(lc.pks) {
|
||||||
|
lc.blockLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
|
||||||
|
return Locations(pk, lc.k, bfType)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return lc.blockLocations
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache {
|
func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache {
|
||||||
|
|
|
@ -20,9 +20,10 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/log"
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
@ -31,13 +32,14 @@ import (
|
||||||
|
|
||||||
// PrimaryKeyStats contains statistics data for pk column
|
// PrimaryKeyStats contains statistics data for pk column
|
||||||
type PrimaryKeyStats struct {
|
type PrimaryKeyStats struct {
|
||||||
FieldID int64 `json:"fieldID"`
|
FieldID int64 `json:"fieldID"`
|
||||||
Max int64 `json:"max"` // useless, will delete
|
Max int64 `json:"max"` // useless, will delete
|
||||||
Min int64 `json:"min"` // useless, will delete
|
Min int64 `json:"min"` // useless, will delete
|
||||||
BF *bloom.BloomFilter `json:"bf"`
|
BFType bloomfilter.BFType `json:"bfType"`
|
||||||
PkType int64 `json:"pkType"`
|
BF bloomfilter.BloomFilterInterface `json:"bf"`
|
||||||
MaxPk PrimaryKey `json:"maxPk"`
|
PkType int64 `json:"pkType"`
|
||||||
MinPk PrimaryKey `json:"minPk"`
|
MaxPk PrimaryKey `json:"maxPk"`
|
||||||
|
MinPk PrimaryKey `json:"minPk"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalJSON unmarshal bytes to PrimaryKeyStats
|
// UnmarshalJSON unmarshal bytes to PrimaryKeyStats
|
||||||
|
@ -110,12 +112,22 @@ func (stats *PrimaryKeyStats) UnmarshalJSON(data []byte) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil {
|
bfType := bloomfilter.BasicBF
|
||||||
stats.BF = &bloom.BloomFilter{}
|
if bfTypeMessage, ok := messageMap["bfType"]; ok && bfTypeMessage != nil {
|
||||||
err = stats.BF.UnmarshalJSON(*bfMessage)
|
err := json.Unmarshal(*bfTypeMessage, &bfType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
stats.BFType = bfType
|
||||||
|
}
|
||||||
|
|
||||||
|
if bfMessage, ok := messageMap["bf"]; ok && bfMessage != nil {
|
||||||
|
bf, err := bloomfilter.UnmarshalJSON(*bfMessage, bfType)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Failed to unmarshal bloom filter, use AlwaysTrueBloomFilter instead of return err", zap.Error(err))
|
||||||
|
bf = bloomfilter.AlwaysTrueBloomFilter
|
||||||
|
}
|
||||||
|
stats.BF = bf
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -189,10 +201,16 @@ func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error)
|
||||||
if rowNum <= 0 {
|
if rowNum <= 0 {
|
||||||
return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum)
|
return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()
|
||||||
return &PrimaryKeyStats{
|
return &PrimaryKeyStats{
|
||||||
FieldID: fieldID,
|
FieldID: fieldID,
|
||||||
PkType: pkType,
|
PkType: pkType,
|
||||||
BF: bloom.NewWithEstimates(uint(rowNum), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
BFType: bloomfilter.BFTypeFromString(bfType),
|
||||||
|
BF: bloomfilter.NewBloomFilterWithType(
|
||||||
|
uint(rowNum),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
bfType),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,10 +246,15 @@ func (sw *StatsWriter) Generate(stats *PrimaryKeyStats) error {
|
||||||
|
|
||||||
// GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer
|
// GenerateByData writes Int64Stats or StringStats from @msgs with @fieldID to @buffer
|
||||||
func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
|
func (sw *StatsWriter) GenerateByData(fieldID int64, pkType schemapb.DataType, msgs FieldData) error {
|
||||||
|
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()
|
||||||
stats := &PrimaryKeyStats{
|
stats := &PrimaryKeyStats{
|
||||||
FieldID: fieldID,
|
FieldID: fieldID,
|
||||||
PkType: int64(pkType),
|
PkType: int64(pkType),
|
||||||
BF: bloom.NewWithEstimates(uint(msgs.RowNum()), paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
|
BFType: bloomfilter.BFTypeFromString(bfType),
|
||||||
|
BF: bloomfilter.NewBloomFilterWithType(
|
||||||
|
uint(msgs.RowNum()),
|
||||||
|
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
|
||||||
|
bfType),
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.UpdateByMsgs(msgs)
|
stats.UpdateByMsgs(msgs)
|
||||||
|
|
|
@ -20,12 +20,13 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/bits-and-blooms/bloom/v3"
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/bloomfilter"
|
||||||
"github.com/milvus-io/milvus/pkg/common"
|
"github.com/milvus-io/milvus/pkg/common"
|
||||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
|
func TestStatsWriter_Int64PrimaryKey(t *testing.T) {
|
||||||
|
@ -124,11 +125,13 @@ func TestStatsWriter_UpgradePrimaryKey(t *testing.T) {
|
||||||
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
|
Data: []int64{1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bfType := paramtable.Get().CommonCfg.BloomFilterType.GetValue()
|
||||||
stats := &PrimaryKeyStats{
|
stats := &PrimaryKeyStats{
|
||||||
FieldID: common.RowIDField,
|
FieldID: common.RowIDField,
|
||||||
Min: 1,
|
Min: 1,
|
||||||
Max: 9,
|
Max: 9,
|
||||||
BF: bloom.NewWithEstimates(100000, 0.05),
|
BFType: bloomfilter.BFTypeFromString(bfType),
|
||||||
|
BF: bloomfilter.NewBloomFilterWithType(100000, 0.05, bfType),
|
||||||
}
|
}
|
||||||
|
|
||||||
b := make([]byte, 8)
|
b := make([]byte, 8)
|
||||||
|
@ -174,3 +177,30 @@ func TestDeserializeEmptyStats(t *testing.T) {
|
||||||
_, err := DeserializeStats([]*Blob{blob})
|
_, err := DeserializeStats([]*Blob{blob})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMarshalStats(t *testing.T) {
|
||||||
|
stat, err := NewPrimaryKeyStats(1, int64(schemapb.DataType_Int64), 100000)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
stat.Update(NewInt64PrimaryKey(int64(i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
sw := &StatsWriter{}
|
||||||
|
sw.GenerateList([]*PrimaryKeyStats{stat})
|
||||||
|
bytes := sw.GetBuffer()
|
||||||
|
|
||||||
|
sr := &StatsReader{}
|
||||||
|
sr.SetBuffer(bytes)
|
||||||
|
stat1, err := sr.GetPrimaryKeyStatsList()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, 1, len(stat1))
|
||||||
|
assert.Equal(t, stat.Min, stat1[0].Min)
|
||||||
|
assert.Equal(t, stat.Max, stat1[0].Max)
|
||||||
|
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
b := make([]byte, 8)
|
||||||
|
common.Endian.PutUint64(b, uint64(i))
|
||||||
|
assert.True(t, stat1[0].BF.Test(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,339 @@
|
||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
package bloomfilter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/bits-and-blooms/bloom/v3"
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
|
"github.com/greatroar/blobloom"
|
||||||
|
"github.com/pingcap/log"
|
||||||
|
"github.com/zeebo/xxh3"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BFType int
|
||||||
|
|
||||||
|
var AlwaysTrueBloomFilter = &alwaysTrueBloomFilter{}
|
||||||
|
|
||||||
|
const (
|
||||||
|
UnsupportedBFName = "Unsupported BloomFilter"
|
||||||
|
BlockBFName = "BlockedBloomFilter"
|
||||||
|
BasicBFName = "BasicBloomFilter"
|
||||||
|
AlwaysTrueBFName = "AlwaysTrueBloomFilter"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
UnsupportedBF BFType = iota + 1
|
||||||
|
AlwaysTrueBF // empty bloom filter
|
||||||
|
BasicBF
|
||||||
|
BlockedBF
|
||||||
|
)
|
||||||
|
|
||||||
|
var bfNames = map[BFType]string{
|
||||||
|
BasicBF: BasicBFName,
|
||||||
|
BlockedBF: BlockBFName,
|
||||||
|
AlwaysTrueBF: AlwaysTrueBFName,
|
||||||
|
UnsupportedBF: UnsupportedBFName,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t BFType) String() string {
|
||||||
|
return bfNames[t]
|
||||||
|
}
|
||||||
|
|
||||||
|
func BFTypeFromString(name string) BFType {
|
||||||
|
switch name {
|
||||||
|
case BasicBFName:
|
||||||
|
return BasicBF
|
||||||
|
case BlockBFName:
|
||||||
|
return BlockedBF
|
||||||
|
case AlwaysTrueBFName:
|
||||||
|
return AlwaysTrueBF
|
||||||
|
default:
|
||||||
|
return UnsupportedBF
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type BloomFilterInterface interface {
|
||||||
|
Type() BFType
|
||||||
|
Cap() uint
|
||||||
|
K() uint
|
||||||
|
Add(data []byte)
|
||||||
|
AddString(data string)
|
||||||
|
Test(data []byte) bool
|
||||||
|
TestString(data string) bool
|
||||||
|
TestLocations(locs []uint64) bool
|
||||||
|
BatchTestLocations(locs [][]uint64, hit []bool) []bool
|
||||||
|
MarshalJSON() ([]byte, error)
|
||||||
|
UnmarshalJSON(data []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type basicBloomFilter struct {
|
||||||
|
inner *bloom.BloomFilter
|
||||||
|
k uint
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBasicBloomFilter(capacity uint, fp float64) *basicBloomFilter {
|
||||||
|
inner := bloom.NewWithEstimates(capacity, fp)
|
||||||
|
return &basicBloomFilter{
|
||||||
|
inner: inner,
|
||||||
|
k: inner.K(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) Type() BFType {
|
||||||
|
return BasicBF
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) Cap() uint {
|
||||||
|
return b.inner.Cap()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) K() uint {
|
||||||
|
return b.k
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) Add(data []byte) {
|
||||||
|
b.inner.Add(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) AddString(data string) {
|
||||||
|
b.inner.AddString(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) Test(data []byte) bool {
|
||||||
|
return b.inner.Test(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) TestString(data string) bool {
|
||||||
|
return b.inner.TestString(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) TestLocations(locs []uint64) bool {
|
||||||
|
return b.inner.TestLocations(locs[:b.k])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
|
||||||
|
ret := make([]bool, len(locs))
|
||||||
|
for i := range hits {
|
||||||
|
if !hits[i] {
|
||||||
|
if uint(len(locs[i])) < b.k {
|
||||||
|
ret[i] = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ret[i] = b.inner.TestLocations(locs[i][:b.k])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b basicBloomFilter) MarshalJSON() ([]byte, error) {
|
||||||
|
return b.inner.MarshalJSON()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *basicBloomFilter) UnmarshalJSON(data []byte) error {
|
||||||
|
inner := &bloom.BloomFilter{}
|
||||||
|
inner.UnmarshalJSON(data)
|
||||||
|
b.inner = inner
|
||||||
|
b.k = inner.K()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// impl Blocked Bloom filter with blobloom and xxh3 hash
|
||||||
|
type blockedBloomFilter struct {
|
||||||
|
inner *blobloom.Filter
|
||||||
|
k uint
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlockedBloomFilter(capacity uint, fp float64) *blockedBloomFilter {
|
||||||
|
inner := blobloom.NewOptimized(blobloom.Config{
|
||||||
|
Capacity: uint64(capacity),
|
||||||
|
FPRate: fp,
|
||||||
|
})
|
||||||
|
return &blockedBloomFilter{
|
||||||
|
inner: inner,
|
||||||
|
k: inner.K(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) Type() BFType {
|
||||||
|
return BlockedBF
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) Cap() uint {
|
||||||
|
return uint(b.inner.NumBits())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) K() uint {
|
||||||
|
return b.k
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) Add(data []byte) {
|
||||||
|
loc := xxh3.Hash(data)
|
||||||
|
b.inner.Add(loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) AddString(data string) {
|
||||||
|
h := xxh3.HashString(data)
|
||||||
|
b.inner.Add(h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) Test(data []byte) bool {
|
||||||
|
loc := xxh3.Hash(data)
|
||||||
|
return b.inner.Has(loc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) TestString(data string) bool {
|
||||||
|
h := xxh3.HashString(data)
|
||||||
|
return b.inner.Has(h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) TestLocations(locs []uint64) bool {
|
||||||
|
// for block bf, just cache it's hash result as locations
|
||||||
|
if len(locs) != 1 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return b.inner.Has(locs[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
|
||||||
|
ret := make([]bool, len(locs))
|
||||||
|
for i := range hits {
|
||||||
|
if !hits[i] {
|
||||||
|
if len(locs[i]) != 1 {
|
||||||
|
ret[i] = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ret[i] = b.inner.Has(locs[i][0])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b blockedBloomFilter) MarshalJSON() ([]byte, error) {
|
||||||
|
return b.inner.MarshalJSON()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *blockedBloomFilter) UnmarshalJSON(data []byte) error {
|
||||||
|
inner := &blobloom.Filter{}
|
||||||
|
inner.UnmarshalJSON(data)
|
||||||
|
b.inner = inner
|
||||||
|
b.k = inner.K()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// always true bloom filter is used when deserialize stat log failed.
|
||||||
|
// Notice: add item to empty bloom filter is not permitted. and all Test Func will return false positive.
|
||||||
|
type alwaysTrueBloomFilter struct{}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) Type() BFType {
|
||||||
|
return AlwaysTrueBF
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) Cap() uint {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) K() uint {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) Add(data []byte) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) AddString(data string) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) Test(data []byte) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) TestString(data string) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
|
||||||
|
ret := make([]bool, len(locs))
|
||||||
|
for i := 0; i < len(hits); i++ {
|
||||||
|
ret[i] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) {
|
||||||
|
return []byte{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *alwaysTrueBloomFilter) UnmarshalJSON(data []byte) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewBloomFilterWithType(capacity uint, fp float64, typeName string) BloomFilterInterface {
|
||||||
|
bfType := BFTypeFromString(typeName)
|
||||||
|
switch bfType {
|
||||||
|
case BlockedBF:
|
||||||
|
return newBlockedBloomFilter(capacity, fp)
|
||||||
|
case BasicBF:
|
||||||
|
return newBasicBloomFilter(capacity, fp)
|
||||||
|
default:
|
||||||
|
log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", typeName))
|
||||||
|
return newBlockedBloomFilter(capacity, fp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnmarshalJSON(data []byte, bfType BFType) (BloomFilterInterface, error) {
|
||||||
|
switch bfType {
|
||||||
|
case BlockedBF:
|
||||||
|
bf := &blockedBloomFilter{}
|
||||||
|
err := json.Unmarshal(data, bf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter")
|
||||||
|
}
|
||||||
|
return bf, nil
|
||||||
|
case BasicBF:
|
||||||
|
bf := &basicBloomFilter{}
|
||||||
|
err := json.Unmarshal(data, bf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to unmarshal blocked bloom filter")
|
||||||
|
}
|
||||||
|
return bf, nil
|
||||||
|
case AlwaysTrueBF:
|
||||||
|
return AlwaysTrueBloomFilter, nil
|
||||||
|
default:
|
||||||
|
return nil, errors.Errorf("unsupported bloom filter type: %d", bfType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Locations(data []byte, k uint, bfType BFType) []uint64 {
|
||||||
|
switch bfType {
|
||||||
|
case BasicBF:
|
||||||
|
return bloom.Locations(data, k)
|
||||||
|
case BlockedBF:
|
||||||
|
return []uint64{xxh3.Hash(data)}
|
||||||
|
case AlwaysTrueBF:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
log.Info("unsupported bloom filter type, using block bloom filter", zap.String("type", bfType.String()))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,313 @@
|
||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
package bloomfilter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/bits-and-blooms/bloom/v3"
|
||||||
|
"github.com/samber/lo"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-storage/go/common/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPerformance(t *testing.T) {
|
||||||
|
capacity := 1000000
|
||||||
|
fpr := 0.001
|
||||||
|
|
||||||
|
keys := make([][]byte, 0)
|
||||||
|
for i := 0; i < capacity; i++ {
|
||||||
|
keys = append(keys, []byte(fmt.Sprintf("key%d", i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
bf1 := newBlockedBloomFilter(uint(capacity), fpr)
|
||||||
|
start1 := time.Now()
|
||||||
|
for _, key := range keys {
|
||||||
|
bf1.Add(key)
|
||||||
|
}
|
||||||
|
log.Info("Block BF construct time", zap.Duration("time", time.Since(start1)))
|
||||||
|
data, err := bf1.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
log.Info("Block BF size", zap.Int("size", len(data)))
|
||||||
|
|
||||||
|
start2 := time.Now()
|
||||||
|
for _, key := range keys {
|
||||||
|
bf1.Test(key)
|
||||||
|
}
|
||||||
|
log.Info("Block BF Test cost", zap.Duration("time", time.Since(start2)))
|
||||||
|
|
||||||
|
bf2 := newBasicBloomFilter(uint(capacity), fpr)
|
||||||
|
start3 := time.Now()
|
||||||
|
for _, key := range keys {
|
||||||
|
bf2.Add(key)
|
||||||
|
}
|
||||||
|
log.Info("Basic BF construct time", zap.Duration("time", time.Since(start3)))
|
||||||
|
data, err = bf2.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
log.Info("Basic BF size", zap.Int("size", len(data)))
|
||||||
|
|
||||||
|
start4 := time.Now()
|
||||||
|
for _, key := range keys {
|
||||||
|
bf2.Test(key)
|
||||||
|
}
|
||||||
|
log.Info("Basic BF Test cost", zap.Duration("time", time.Since(start4)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPerformance_MultiBF(t *testing.T) {
|
||||||
|
capacity := 100000
|
||||||
|
fpr := 0.001
|
||||||
|
|
||||||
|
testKeySize := 100000
|
||||||
|
testKeys := make([][]byte, 0)
|
||||||
|
for i := 0; i < testKeySize; i++ {
|
||||||
|
testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
|
||||||
|
}
|
||||||
|
|
||||||
|
bfNum := 100
|
||||||
|
bfs1 := make([]*blockedBloomFilter, 0)
|
||||||
|
start1 := time.Now()
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bf1 := newBlockedBloomFilter(uint(capacity), fpr)
|
||||||
|
for j := 0; j < capacity; j++ {
|
||||||
|
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
|
||||||
|
bf1.Add([]byte(key))
|
||||||
|
}
|
||||||
|
bfs1 = append(bfs1, bf1)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1)))
|
||||||
|
|
||||||
|
start3 := time.Now()
|
||||||
|
for _, key := range testKeys {
|
||||||
|
locations := Locations(key, bfs1[0].K(), BlockedBF)
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bfs1[i].TestLocations(locations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
|
||||||
|
bfs2 := make([]*basicBloomFilter, 0)
|
||||||
|
start1 = time.Now()
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bf2 := newBasicBloomFilter(uint(capacity), fpr)
|
||||||
|
for _, key := range testKeys {
|
||||||
|
bf2.Add(key)
|
||||||
|
}
|
||||||
|
bfs2 = append(bfs2, bf2)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1)))
|
||||||
|
|
||||||
|
start3 = time.Now()
|
||||||
|
for _, key := range testKeys {
|
||||||
|
locations := Locations(key, bfs1[0].K(), BasicBF)
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bfs2[i].TestLocations(locations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPerformance_BatchTestLocations(t *testing.T) {
|
||||||
|
capacity := 100000
|
||||||
|
fpr := 0.001
|
||||||
|
|
||||||
|
testKeySize := 100000
|
||||||
|
testKeys := make([][]byte, 0)
|
||||||
|
for i := 0; i < testKeySize; i++ {
|
||||||
|
testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
|
||||||
|
}
|
||||||
|
|
||||||
|
batchSize := 1000
|
||||||
|
|
||||||
|
bfNum := 100
|
||||||
|
bfs1 := make([]BloomFilterInterface, 0)
|
||||||
|
start1 := time.Now()
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bf1 := NewBloomFilterWithType(uint(capacity), fpr, BlockBFName)
|
||||||
|
for j := 0; j < capacity; j++ {
|
||||||
|
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
|
||||||
|
bf1.Add([]byte(key))
|
||||||
|
}
|
||||||
|
bfs1 = append(bfs1, bf1)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1)))
|
||||||
|
|
||||||
|
start3 := time.Now()
|
||||||
|
for _, key := range testKeys {
|
||||||
|
locations := Locations(key, bfs1[0].K(), BlockedBF)
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bfs1[i].TestLocations(locations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
|
||||||
|
start3 = time.Now()
|
||||||
|
for i := 0; i < testKeySize; i += batchSize {
|
||||||
|
endIdx := i + batchSize
|
||||||
|
if endIdx > testKeySize {
|
||||||
|
endIdx = testKeySize
|
||||||
|
}
|
||||||
|
locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 {
|
||||||
|
return Locations(key, bfs1[0].K(), BlockedBF)
|
||||||
|
})
|
||||||
|
hits := make([]bool, batchSize)
|
||||||
|
for j := 0; j < bfNum; j++ {
|
||||||
|
bfs1[j].BatchTestLocations(locations, hits)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
|
||||||
|
bfs2 := make([]BloomFilterInterface, 0)
|
||||||
|
start1 = time.Now()
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bf2 := NewBloomFilterWithType(uint(capacity), fpr, BasicBFName)
|
||||||
|
for j := 0; j < capacity; j++ {
|
||||||
|
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
|
||||||
|
bf2.Add([]byte(key))
|
||||||
|
}
|
||||||
|
bfs2 = append(bfs2, bf2)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1)))
|
||||||
|
|
||||||
|
start3 = time.Now()
|
||||||
|
for _, key := range testKeys {
|
||||||
|
locations := Locations(key, bfs2[0].K(), BasicBF)
|
||||||
|
for i := 0; i < bfNum; i++ {
|
||||||
|
bfs2[i].TestLocations(locations)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
|
||||||
|
start3 = time.Now()
|
||||||
|
for i := 0; i < testKeySize; i += batchSize {
|
||||||
|
endIdx := i + batchSize
|
||||||
|
if endIdx > testKeySize {
|
||||||
|
endIdx = testKeySize
|
||||||
|
}
|
||||||
|
locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 {
|
||||||
|
return Locations(key, bfs2[0].K(), BasicBF)
|
||||||
|
})
|
||||||
|
hits := make([]bool, batchSize)
|
||||||
|
for j := 0; j < bfNum; j++ {
|
||||||
|
bfs2[j].BatchTestLocations(locations, hits)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPerformance_Capacity(t *testing.T) {
|
||||||
|
fpr := 0.001
|
||||||
|
|
||||||
|
for _, capacity := range []int64{100, 1000, 10000, 100000, 1000000} {
|
||||||
|
keys := make([][]byte, 0)
|
||||||
|
for i := 0; i < int(capacity); i++ {
|
||||||
|
keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
|
||||||
|
}
|
||||||
|
|
||||||
|
start1 := time.Now()
|
||||||
|
bf1 := newBlockedBloomFilter(uint(capacity), fpr)
|
||||||
|
for _, key := range keys {
|
||||||
|
bf1.Add(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1)))
|
||||||
|
|
||||||
|
testKeys := make([][]byte, 0)
|
||||||
|
for i := 0; i < 10000; i++ {
|
||||||
|
testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
|
||||||
|
}
|
||||||
|
|
||||||
|
start3 := time.Now()
|
||||||
|
for _, key := range testKeys {
|
||||||
|
locations := Locations(key, bf1.K(), bf1.Type())
|
||||||
|
bf1.TestLocations(locations)
|
||||||
|
}
|
||||||
|
_, k := bloom.EstimateParameters(uint(capacity), fpr)
|
||||||
|
log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)), zap.Int("k", int(k)), zap.Int64("capacity", capacity))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMarshal(t *testing.T) {
|
||||||
|
capacity := 200000
|
||||||
|
fpr := 0.001
|
||||||
|
|
||||||
|
keys := make([][]byte, 0)
|
||||||
|
for i := 0; i < capacity; i++ {
|
||||||
|
keys = append(keys, []byte(fmt.Sprintf("key%d", i)))
|
||||||
|
}
|
||||||
|
|
||||||
|
// test basic bf
|
||||||
|
basicBF := newBasicBloomFilter(uint(capacity), fpr)
|
||||||
|
for _, key := range keys {
|
||||||
|
basicBF.Add(key)
|
||||||
|
}
|
||||||
|
data, err := basicBF.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
basicBF2, err := UnmarshalJSON(data, BasicBF)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, basicBF.Type(), basicBF2.Type())
|
||||||
|
|
||||||
|
for _, key := range keys {
|
||||||
|
assert.True(t, basicBF2.Test(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// test block bf
|
||||||
|
blockBF := newBlockedBloomFilter(uint(capacity), fpr)
|
||||||
|
for _, key := range keys {
|
||||||
|
blockBF.Add(key)
|
||||||
|
}
|
||||||
|
data, err = blockBF.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
blockBF2, err := UnmarshalJSON(data, BlockedBF)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, blockBF.Type(), blockBF.Type())
|
||||||
|
for _, key := range keys {
|
||||||
|
assert.True(t, blockBF2.Test(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// test compatible with bits-and-blooms/bloom
|
||||||
|
bf := bloom.NewWithEstimates(uint(capacity), fpr)
|
||||||
|
for _, key := range keys {
|
||||||
|
bf.Add(key)
|
||||||
|
}
|
||||||
|
data, err = bf.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
bf2, err := UnmarshalJSON(data, BasicBF)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
for _, key := range keys {
|
||||||
|
assert.True(t, bf2.Test(key))
|
||||||
|
}
|
||||||
|
|
||||||
|
// test empty bloom filter
|
||||||
|
emptyBF := AlwaysTrueBloomFilter
|
||||||
|
for _, key := range keys {
|
||||||
|
bf.Add(key)
|
||||||
|
}
|
||||||
|
data, err = emptyBF.MarshalJSON()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
emptyBF2, err := UnmarshalJSON(data, AlwaysTrueBF)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
for _, key := range keys {
|
||||||
|
assert.True(t, emptyBF2.Test(key))
|
||||||
|
}
|
||||||
|
}
|
|
@ -244,6 +244,7 @@ type commonConfig struct {
|
||||||
TraceLogMode ParamItem `refreshable:"true"`
|
TraceLogMode ParamItem `refreshable:"true"`
|
||||||
BloomFilterSize ParamItem `refreshable:"true"`
|
BloomFilterSize ParamItem `refreshable:"true"`
|
||||||
MaxBloomFalsePositive ParamItem `refreshable:"true"`
|
MaxBloomFalsePositive ParamItem `refreshable:"true"`
|
||||||
|
BloomFilterType ParamItem `refreshable:"true"`
|
||||||
BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
|
BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
|
||||||
PanicWhenPluginFail ParamItem `refreshable:"false"`
|
PanicWhenPluginFail ParamItem `refreshable:"false"`
|
||||||
|
|
||||||
|
@ -730,6 +731,15 @@ like the old password verification when updating the credential`,
|
||||||
}
|
}
|
||||||
p.BloomFilterSize.Init(base.mgr)
|
p.BloomFilterSize.Init(base.mgr)
|
||||||
|
|
||||||
|
p.BloomFilterType = ParamItem{
|
||||||
|
Key: "common.bloomFilterType",
|
||||||
|
Version: "2.4.3",
|
||||||
|
DefaultValue: "BasicBloomFilter",
|
||||||
|
Doc: "bloom filter type, support BasicBloomFilter and BlockedBloomFilter",
|
||||||
|
Export: true,
|
||||||
|
}
|
||||||
|
p.BloomFilterType.Init(base.mgr)
|
||||||
|
|
||||||
p.MaxBloomFalsePositive = ParamItem{
|
p.MaxBloomFalsePositive = ParamItem{
|
||||||
Key: "common.maxBloomFalsePositive",
|
Key: "common.maxBloomFalsePositive",
|
||||||
Version: "2.3.2",
|
Version: "2.3.2",
|
||||||
|
|
|
@ -573,6 +573,7 @@ func TestCachedParam(t *testing.T) {
|
||||||
|
|
||||||
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
|
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
|
||||||
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
|
assert.Equal(t, uint(100000), params.CommonCfg.BloomFilterSize.GetAsUint())
|
||||||
|
assert.Equal(t, "BasicBloomFilter", params.CommonCfg.BloomFilterType.GetValue())
|
||||||
|
|
||||||
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
|
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
|
||||||
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
|
assert.Equal(t, uint64(8388608), params.ServiceParam.MQCfg.PursuitBufferSize.GetAsUint64())
|
||||||
|
|
|
@ -0,0 +1,196 @@
|
||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package bloomfilter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
"github.com/samber/lo"
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
|
"github.com/milvus-io/milvus/pkg/log"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/metric"
|
||||||
|
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||||
|
"github.com/milvus-io/milvus/tests/integration"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BloomFilterTestSuit exercises bloom-filter behavior end to end on an
// embedded mini cluster, including switching the configured filter type
// between segment flushes (see initCollection).
type BloomFilterTestSuit struct {
	integration.MiniClusterSuite
}
|
||||||
|
|
||||||
|
// SetupSuite tunes cluster parameters before the suite runs: a balance-check
// interval of 1000, a 1s graceful-stop timeout, and compaction disabled so
// flushed segments (and their bloom filters) remain exactly as created.
func (s *BloomFilterTestSuit) SetupSuite() {
	paramtable.Init()
	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000")
	paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1")

	// disable compaction
	paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")

	s.Require().NoError(s.SetupEmbedEtcd())
}
|
||||||
|
|
||||||
|
// TearDownSuite restores the compaction switch changed in SetupSuite and then
// tears down the embedded mini cluster.
func (s *BloomFilterTestSuit) TearDownSuite() {
	// Reset runs after the cluster teardown completes.
	defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)
	s.MiniClusterSuite.TearDownSuite()
}
|
||||||
|
|
||||||
|
// initCollection creates and loads a collection for the suite's tests:
// it creates the collection with channelNum shards, then for each of
// segmentNum segments inserts segmentRowNum rows, deletes the first
// segmentDeleteNum of them (capped at segmentRowNum), and flushes — while
// alternating the configured bloom filter type per segment — and finally
// builds an IVF_FLAT index and loads the collection with replica replicas.
func (s *BloomFilterTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	const (
		dim    = 128
		dbName = ""
	)

	schema := integration.ConstructSchema(collectionName, dim, true)
	marshaledSchema, err := proto.Marshal(schema)
	s.NoError(err)

	createCollectionStatus, err := s.Cluster.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{
		DbName:         dbName,
		CollectionName: collectionName,
		Schema:         marshaledSchema,
		ShardsNum:      int32(channelNum),
	})
	s.NoError(err)
	s.True(merr.Ok(createCollectionStatus))

	log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
	showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
	s.NoError(err)
	s.True(merr.Ok(showCollectionsResp.Status))
	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))

	for i := 0; i < segmentNum; i++ {
		// change bf type in real time: even segments use the basic filter,
		// odd segments the blocked one, so both coexist in one collection.
		if i%2 == 0 {
			paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BasicBloomFilter")
		} else {
			paramtable.Get().Save(paramtable.Get().CommonCfg.BloomFilterType.Key, "BlockedBloomFilter")
		}

		fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, segmentRowNum, dim)
		hashKeys := integration.GenerateHashKeys(segmentRowNum)
		insertResult, err := s.Cluster.Proxy.Insert(ctx, &milvuspb.InsertRequest{
			DbName:         dbName,
			CollectionName: collectionName,
			FieldsData:     []*schemapb.FieldData{fVecColumn},
			HashKeys:       hashKeys,
			NumRows:        uint32(segmentRowNum),
		})
		s.NoError(err)
		s.True(merr.Ok(insertResult.Status))

		if segmentDeleteNum > 0 {
			// Never delete more rows than were just inserted.
			if segmentDeleteNum > segmentRowNum {
				segmentDeleteNum = segmentRowNum
			}

			// Delete the first segmentDeleteNum auto-generated primary keys
			// via an `in [...]` expression.
			pks := insertResult.GetIDs().GetIntId().GetData()[:segmentDeleteNum]
			log.Info("========================delete expr==================",
				zap.Int("length of pk", len(pks)),
			)

			expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ","))

			deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{
				CollectionName: collectionName,
				Expr:           expr,
			})
			s.Require().NoError(err)
			s.Require().True(merr.Ok(deleteResp.GetStatus()))
			s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt())
		}

		// flush — sealing the segment so the next loop iteration produces a
		// new segment (possibly with a different bloom filter type).
		flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{
			DbName:          dbName,
			CollectionNames: []string{collectionName},
		})
		s.NoError(err)
		segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
		ids := segmentIDs.GetData()
		s.Require().NotEmpty(segmentIDs)
		s.Require().True(has)
		flushTs, has := flushResp.GetCollFlushTs()[collectionName]
		s.True(has)
		s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
	}

	// create index
	createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
		CollectionName: collectionName,
		FieldName:      integration.FloatVecField,
		IndexName:      "_default",
		ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.L2),
	})
	s.NoError(err)
	s.True(merr.Ok(createIndexStatus))
	s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField)

	// The cluster starts with one query node; add the rest for replica > 1.
	for i := 1; i < replica; i++ {
		s.Cluster.AddQueryNode()
	}

	// load
	loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
		DbName:         dbName,
		CollectionName: collectionName,
		ReplicaNumber:  int32(replica),
	})
	s.NoError(err)
	s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
	s.True(merr.Ok(loadStatus))
	s.WaitForLoad(ctx, collectionName)
	log.Info("initCollection Done")
}
|
||||||
|
|
||||||
|
// TestLoadAndQuery builds a collection of 10 segments of 2000 rows each, with
// 500 rows deleted per segment, then runs a count(*) query and expects the
// deletes to have been applied: 10 * (2000 - 500) = 15000 remaining rows.
func (s *BloomFilterTestSuit) TestLoadAndQuery() {
	name := "test_balance_" + funcutil.GenRandomStr()
	s.initCollection(name, 1, 2, 10, 2000, 500)

	ctx := context.Background()
	queryResult, err := s.Cluster.Proxy.Query(ctx, &milvuspb.QueryRequest{
		DbName:         "",
		CollectionName: name,
		Expr:           "",
		OutputFields:   []string{"count(*)"},
	})
	// Surface the failure reason in the log before the assertions fire.
	if !merr.Ok(queryResult.GetStatus()) {
		log.Warn("queryResult fail reason", zap.String("reason", queryResult.GetStatus().GetReason()))
	}
	s.NoError(err)
	s.True(merr.Ok(queryResult.GetStatus()))
	numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0]
	s.Equal(numEntities, int64(15000))
}
|
||||||
|
|
||||||
|
// TestBloomFilter is the go-test entry point that runs the integration suite.
func TestBloomFilter(t *testing.T) {
	suite.Run(t, new(BloomFilterTestSuit))
}
|
Loading…
Reference in New Issue