enhance: Use BatchPkExist to reduce bloom filter func call cost (#33611)

issue: #33610

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/33792/head
wei liu 2024-06-13 17:57:56 +08:00 committed by GitHub
parent 2dfa752527
commit ab93d9c23d
20 changed files with 548 additions and 115 deletions
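
The change applies one pattern everywhere a stream of primary keys is checked against per-segment bloom filters (datanode compaction and write buffers, querynode delegator and pk oracle): slice the PKs into chunks of common.bloomFilterApplyBatchSize, hash each chunk once into a BatchLocationsCache, and probe each segment's filter with a single BatchPkExist call instead of one call per PK. A rough, self-contained sketch of that pattern using the APIs added below; the helper name, the batchChecker interface and the segments map are illustrative, not part of the PR:

package example

import (
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/pkg/util/paramtable"
)

// batchChecker matches the BatchPkExist method this PR adds to the datanode
// and querynode bloom filter sets.
type batchChecker interface {
    BatchPkExist(lc *storage.BatchLocationsCache) []bool
}

// hitsPerSegment is a hypothetical helper: for each segment it collects the
// PKs that may exist there according to its bloom filters.
func hitsPerSegment(pks []storage.PrimaryKey, segments map[int64]batchChecker) map[int64][]storage.PrimaryKey {
    batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
    out := make(map[int64][]storage.PrimaryKey)
    for start := 0; start < len(pks); start += batchSize {
        end := start + batchSize
        if end > len(pks) {
            end = len(pks)
        }
        // One cache per chunk: PK hashes are computed once and reused by every segment.
        lc := storage.NewBatchLocationsCache(pks[start:end])
        for segmentID, seg := range segments {
            for i, hit := range seg.BatchPkExist(lc) {
                if hit {
                    out[segmentID] = append(out[segmentID], pks[start+i])
                }
            }
        }
    }
    return out
}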


@@ -237,39 +237,44 @@ func (t *LevelZeroCompactionTask) splitDelta(
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()
split := func(pk storage.PrimaryKey) []int64 {
lc := storage.NewLocationsCache(pk)
predicts := make([]int64, 0, len(segmentBfs))
for segmentID, bf := range segmentBfs {
if bf.PkExists(lc) {
predicts = append(predicts, segmentID)
}
}
return predicts
}
allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) {
return segment.GetSegmentID(), segment
})
// spilt all delete data to segments
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
for _, delta := range allDelta {
for i, pk := range delta.Pks {
predicted := split(pk)
for _, gotSeg := range predicted {
writer, ok := targetSegBuffer[gotSeg]
if !ok {
segment := allSeg[gotSeg]
writer = NewSegmentDeltaWriter(gotSeg, segment.GetPartitionID(), segment.GetCollectionID())
targetSegBuffer[gotSeg] = writer
}
writer.Write(pk, delta.Tss[i])
}
}
}
split := func(pks []storage.PrimaryKey, pkTss []uint64) {
lc := storage.NewBatchLocationsCache(pks)
for segmentID, bf := range segmentBfs {
hits := bf.BatchPkExist(lc)
for i, hit := range hits {
if hit {
writer, ok := targetSegBuffer[segmentID]
if !ok {
segment := allSeg[segmentID]
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[i], pkTss[i])
}
}
}
}
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
// spilt all delete data to segments
for _, deleteBuffer := range allDelta {
pks := deleteBuffer.Pks
pkTss := deleteBuffer.Tss
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx])
}
}
return targetSegBuffer
}


@@ -70,6 +70,21 @@ func (bfs *BloomFilterSet) PkExists(lc *storage.LocationsCache) bool {
return false
}
func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
bfs.mut.RLock()
defer bfs.mut.RUnlock()
hits := make([]bool, lc.Size())
if bfs.current != nil {
bfs.current.BatchPkExist(lc, hits)
}
for _, bf := range bfs.history {
bf.BatchPkExist(lc, hits)
}
return hits
}
func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
bfs.mut.Lock()
defer bfs.mut.Unlock()


@@ -19,6 +19,7 @@ package metacache
import (
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@ -68,6 +69,37 @@ func (s *BloomFilterSetSuite) TestWriteRead() {
for _, id := range ids { for _, id := range ids {
s.True(s.bfs.PkExists(storage.NewLocationsCache(storage.NewInt64PrimaryKey(id))), "pk shall return exist after update") s.True(s.bfs.PkExists(storage.NewLocationsCache(storage.NewInt64PrimaryKey(id))), "pk shall return exist after update")
} }
lc := storage.NewBatchLocationsCache(lo.Map(ids, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
hits := s.bfs.BatchPkExist(lc)
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
}
func (s *BloomFilterSetSuite) TestBatchPkExist() {
capacity := 100000
ids := make([]int64, 0)
for id := 0; id < capacity; id++ {
ids = append(ids, int64(id))
}
bfs := NewBloomFilterSetWithBatchSize(uint(capacity))
err := bfs.UpdatePKRange(s.GetFieldData(ids))
s.NoError(err)
batchSize := 1000
for i := 0; i < capacity; i += batchSize {
endIdx := i + batchSize
if endIdx > capacity {
endIdx = capacity
}
lc := storage.NewBatchLocationsCache(lo.Map(ids[i:endIdx], func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) }))
hits := bfs.BatchPkExist(lc)
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
}
}
func (s *BloomFilterSetSuite) TestRoll() {


@@ -1,8 +1,6 @@
package writebuffer
import (
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
@@ -10,6 +8,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -32,28 +31,45 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}
func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) *storage.LocationsCache { return storage.NewLocationsCache(pk) })
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
if segment.CompactTo() != 0 {
continue
}
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, lc := range lcs {
if segment.GetBloomFilterSet().PkExists(lc) {
deletePks = append(deletePks, pks[idx])
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos)
}
}
}
// distribute delete msg for previous data
for _, delMsg := range deleteMsgs {
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {


@@ -3,7 +3,6 @@ package writebuffer
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -16,6 +15,7 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -48,28 +48,43 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
}
func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) *storage.LocationsCache { return storage.NewLocationsCache(pk) })
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo, l0SegmentID int64) {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
if segment.CompactTo() != 0 {
continue
}
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, lc := range lcs {
if segment.GetBloomFilterSet().PkExists(lc) {
deletePks = append(deletePks, pks[idx])
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
}
}
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments, l0SegmentID)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {


@@ -202,18 +202,23 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
// segment => delete data
delRecords := make(map[int64]DeleteData)
for _, data := range deleteData {
for i, pk := range data.PrimaryKeys {
segmentIDs, err := sd.pkOracle.Get(pk, pkoracle.WithPartitionID(data.PartitionID))
if err != nil {
log.Warn("failed to get delete candidates for pk", zap.Any("pk", pk.GetValue()))
continue
pks := data.PrimaryKeys
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
for _, segmentID := range segmentIDs {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pk)
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
pk2SegmentIDs := sd.pkOracle.BatchGet(pks[idx:endIdx], pkoracle.WithPartitionID(data.PartitionID))
for i, segmentIDs := range pk2SegmentIDs {
for _, segmentID := range segmentIDs {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[idx+i])
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[idx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
}
@@ -522,11 +527,20 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
segment := segment.(*segments.L0Segment)
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
for i, pk := range segmentPks {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
pks = append(pks, pk)
tss = append(tss, segmentTss[i])
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(segmentPks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(segmentPks) {
endIdx = len(segmentPks)
}
lc := storage.NewBatchLocationsCache(segmentPks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
pks = append(pks, segmentPks[idx+i])
tss = append(tss, segmentTss[idx+i])
}
}
}
}
@@ -634,10 +648,20 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID {
continue
}
for i, pk := range record.DeleteData.Pks {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
deleteData.Append(pk, record.DeleteData.Tss[i])
pks := record.DeleteData.Pks
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
deleteData.Append(pks[idx+i], record.DeleteData.Tss[idx+i])
}
}
}
}
@@ -731,11 +755,21 @@ func (sd *shardDelegator) readDeleteFromMsgstream(ctx context.Context, position
continue
}
for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) {
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
result.Pks = append(result.Pks, pk)
result.Tss = append(result.Tss, dmsg.Timestamps[idx])
pks := storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys())
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
lc := storage.NewBatchLocationsCache(pks[idx:endIdx])
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
result.Pks = append(result.Pks, pks[idx+i])
result.Tss = append(result.Tss, dmsg.Timestamps[idx+i])
}
}
}
}


@@ -261,6 +261,13 @@ func (s *DelegatorDataSuite) TestProcessDelete() {
ms.EXPECT().MayPkExist(mock.Anything).RunAndReturn(func(lc *storage.LocationsCache) bool {
return lc.GetPk().EQ(storage.NewInt64PrimaryKey(10))
})
ms.EXPECT().BatchPkExist(mock.Anything).RunAndReturn(func(lc *storage.BatchLocationsCache) []bool {
hits := make([]bool, lc.Size())
for i, pk := range lc.PKs() {
hits[i] = pk.EQ(storage.NewInt64PrimaryKey(10))
}
return hits
})
return ms
})
}, nil)


@@ -59,6 +59,21 @@ func (s *BloomFilterSet) MayPkExist(lc *storage.LocationsCache) bool {
return false
}
func (s *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
hits := make([]bool, lc.Size())
if s.currentStat != nil {
s.currentStat.BatchPkExist(lc, hits)
}
for _, bf := range s.historyStats {
bf.BatchPkExist(lc, hits)
}
return hits
}
// ID implement candidate.
func (s *BloomFilterSet) ID() int64 {
return s.segmentID
@@ -80,12 +95,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) {
defer s.statsMutex.Unlock()
if s.currentStat == nil {
bf := bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue())
s.currentStat = &storage.PkStatistics{
PkFilter: bf,
PkFilter: bloomfilter.NewBloomFilterWithType(
paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
paramtable.Get().CommonCfg.BloomFilterType.GetValue(),
),
}
}


@@ -93,4 +93,10 @@ func TestHistoricalStat(t *testing.T) {
ret := bfs.MayPkExist(lc)
assert.True(t, ret)
}
lc := storage.NewBatchLocationsCache(pks)
ret := bfs.BatchPkExist(lc)
for i := range ret {
assert.True(t, ret[i])
}
}


@@ -27,6 +27,7 @@ import (
type Candidate interface {
// MayPkExist checks whether primary key could exists in this candidate.
MayPkExist(lc *storage.LocationsCache) bool
BatchPkExist(lc *storage.BatchLocationsCache) []bool
ID() int64
Partition() int64


@@ -33,6 +33,14 @@ func (k candidateKey) MayPkExist(lc *storage.LocationsCache) bool {
return true
}
func (k candidateKey) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
ret := make([]bool, 0)
for i := 0; i < lc.Size(); i++ {
ret = append(ret, true)
}
return ret
}
// ID implements Candidate.
func (k candidateKey) ID() int64 {
return k.segmentID


@@ -28,6 +28,7 @@ import (
type PkOracle interface {
// GetCandidates returns segment candidates of which pk might belongs to.
Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error)
BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64
// RegisterCandidate adds candidate into pkOracle.
Register(candidate Candidate, workerID int64) error
// RemoveCandidate removes candidate
@@ -46,6 +47,7 @@ type pkOracle struct {
// Get implements PkOracle.
func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) {
var result []int64
lc := storage.NewLocationsCache(pk)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
@@ -53,7 +55,6 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
}
}
lc := storage.NewLocationsCache(pk)
if candidate.MayPkExist(lc) {
result = append(result, candidate.ID())
}
@@ -63,6 +64,29 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
return result, nil
}
func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 {
result := make([][]int64, len(pks))
lc := storage.NewBatchLocationsCache(pks)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
for _, filter := range filters {
if !filter(candidate) {
return true
}
}
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
result[i] = append(result[i], candidate.ID())
}
}
return true
})
return result
}
func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string {
return fmt.Sprintf("%s-%d-%d", candidate.Type().String(), workerID, candidate.ID())
}
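
Unlike Get, the new BatchGet drops the error return and keeps the result positionally aligned with its input: result[i] lists the candidate segment IDs for pks[i], and a PK with no candidates is left as a nil slice. A minimal caller sketch; the helper name is hypothetical and the pkoracle import path is assumed from the querynode layout:

package example

import (
    "github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
    "github.com/milvus-io/milvus/internal/storage"
)

// groupByPK buckets PKs by candidate segment, the same shape the delegator
// builds its per-segment delete records from in ProcessDelete above.
func groupByPK(oracle pkoracle.PkOracle, pks []storage.PrimaryKey, partitionID int64) map[int64][]storage.PrimaryKey {
    bySegment := make(map[int64][]storage.PrimaryKey)
    for i, segmentIDs := range oracle.BatchGet(pks, pkoracle.WithPartitionID(partitionID)) {
        for _, segmentID := range segmentIDs {
            bySegment[segmentID] = append(bySegment[segmentID], pks[i])
        }
    }
    return bySegment
}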


@@ -35,6 +35,50 @@ func (_m *MockSegment) EXPECT() *MockSegment_Expecter {
return &MockSegment_Expecter{mock: &_m.Mock}
}
// BatchPkExist provides a mock function with given fields: lc
func (_m *MockSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
ret := _m.Called(lc)
var r0 []bool
if rf, ok := ret.Get(0).(func(*storage.BatchLocationsCache) []bool); ok {
r0 = rf(lc)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]bool)
}
}
return r0
}
// MockSegment_BatchTestLocationCache_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist'
type MockSegment_BatchTestLocationCache_Call struct {
*mock.Call
}
// BatchPkExist is a helper method to define mock.On call
// - lc *storage.BatchLocationsCache
func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchTestLocationCache_Call {
return &MockSegment_BatchTestLocationCache_Call{Call: _e.mock.On("BatchPkExist", lc)}
}
func (_c *MockSegment_BatchTestLocationCache_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchTestLocationCache_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(*storage.BatchLocationsCache))
})
return _c
}
func (_c *MockSegment_BatchTestLocationCache_Call) Return(_a0 []bool) *MockSegment_BatchTestLocationCache_Call {
_c.Call.Return(_a0)
return _c
}
func (_c *MockSegment_BatchTestLocationCache_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchTestLocationCache_Call {
_c.Call.Return(run)
return _c
}
// CASVersion provides a mock function with given fields: _a0, _a1
func (_m *MockSegment) CASVersion(_a0 int64, _a1 int64) bool {
ret := _m.Called(_a0, _a1)


@@ -186,8 +186,12 @@ func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) {
// MayPkExist returns true if the given PK exists in the PK range and being positive through the bloom filter,
// false otherwise,
// may returns true even the PK doesn't exist actually
func (s *baseSegment) MayPkExist(lc *storage.LocationsCache) bool {
return s.bloomFilterSet.MayPkExist(lc)
func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool {
return s.bloomFilterSet.MayPkExist(pk)
}
func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool {
return s.bloomFilterSet.BatchPkExist(lc)
}
// ResourceUsageEstimate returns the estimated resource usage of the segment.


@@ -84,6 +84,7 @@ type Segment interface {
// Bloom filter related
UpdateBloomFilter(pks []storage.PrimaryKey)
MayPkExist(lc *storage.LocationsCache) bool
BatchPkExist(lc *storage.BatchLocationsCache) []bool
// Read operations
Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error)


@@ -20,6 +20,7 @@ import (
"fmt"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/bloomfilter"
@@ -125,21 +126,6 @@ func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 {
return nil
}
func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return false
}
// check bf first, TestLocation just do some bitset compute, cost is cheaper
if !st.PkFilter.TestLocations(locs) {
return false
}
// check pk range first, ugly but key it for now
return st.MinPK.LE(pk) && st.MaxPK.GE(pk)
}
func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
@@ -155,6 +141,28 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool {
return st.MinPK.LE(lc.pk) && st.MaxPK.GE(lc.pk)
}
func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []bool {
// empty pkStatics
if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil {
return hits
}
// check bf first, TestLocation just do some bitset compute, cost is cheaper
locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type())
ret := st.PkFilter.BatchTestLocations(locations, hits)
// todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment,
// hits array will be removed after we merge bf in segment
pks := lc.PKs()
for i := range ret {
if !hits[i] {
hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i])
}
}
return hits
}
// LocationsCache is a helper struct caching pk bloom filter locations.
// Note that this helper is not concurrent safe and shall be used in same goroutine.
type LocationsCache struct {
@@ -175,10 +183,11 @@ func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64
}
return lc.basicBFLocations[:k]
case bloomfilter.BlockedBF:
if int(k) > len(lc.blockBFLocations) {
lc.blockBFLocations = Locations(lc.pk, k, bfType)
// for block bf, we only need cache the hash result, which is a uint and only compute once for any k value
if len(lc.blockBFLocations) != 1 {
lc.blockBFLocations = Locations(lc.pk, 1, bfType)
}
return lc.blockBFLocations[:k]
return lc.blockBFLocations
default:
return nil
}
@@ -189,3 +198,53 @@ func NewLocationsCache(pk PrimaryKey) *LocationsCache {
pk: pk,
}
}
type BatchLocationsCache struct {
pks []PrimaryKey
k uint
// for block bf
blockLocations [][]uint64
// for basic bf
basicLocations [][]uint64
}
func (lc *BatchLocationsCache) PKs() []PrimaryKey {
return lc.pks
}
func (lc *BatchLocationsCache) Size() int {
return len(lc.pks)
}
func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64 {
switch bfType {
case bloomfilter.BasicBF:
if k > lc.k {
lc.k = k
lc.basicLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
return Locations(pk, lc.k, bfType)
})
}
return lc.basicLocations
case bloomfilter.BlockedBF:
// for block bf, we only need cache the hash result, which is a uint and only compute once for any k value
if len(lc.blockLocations) != len(lc.pks) {
lc.blockLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 {
return Locations(pk, lc.k, bfType)
})
}
return lc.blockLocations
default:
return nil
}
}
func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache {
return &BatchLocationsCache{
pks: pks,
}
}
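
BatchLocationsCache exists to amortize hashing across many filters: for blocked filters it caches a single xxh3 hash per PK regardless of k, and for basic filters it only recomputes when a filter asks for a larger k than any seen before. A sketch of sharing one cache across several filters; probeAll is illustrative, the calls are the ones PkStatistics.BatchPkExist uses above (minus its PK range check):

package example

import (
    "github.com/milvus-io/milvus/internal/storage"
    "github.com/milvus-io/milvus/internal/util/bloomfilter"
)

// probeAll ORs each filter's batch result into a shared hit slice.
func probeAll(pks []storage.PrimaryKey, filters []bloomfilter.BloomFilterInterface) []bool {
    lc := storage.NewBatchLocationsCache(pks)
    hits := make([]bool, lc.Size())
    for _, bf := range filters {
        // Locations are computed lazily and cached: once per PK for BlockedBF,
        // and for BasicBF only recomputed when a larger k is requested.
        locs := lc.Locations(bf.K(), bf.Type())
        for i, hit := range bf.BatchTestLocations(locs, hits) {
            hits[i] = hits[i] || hit
        }
    }
    return hits
}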


@@ -77,6 +77,7 @@ type BloomFilterInterface interface {
Test(data []byte) bool
TestString(data string) bool
TestLocations(locs []uint64) bool
BatchTestLocations(locs [][]uint64, hit []bool) []bool
MarshalJSON() ([]byte, error)
UnmarshalJSON(data []byte) error
}
@@ -126,6 +127,20 @@ func (b *basicBloomFilter) TestLocations(locs []uint64) bool {
return b.inner.TestLocations(locs[:b.k])
}
func (b *basicBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
ret := make([]bool, len(locs))
for i := range hits {
if !hits[i] {
if uint(len(locs[i])) < b.k {
ret[i] = true
continue
}
ret[i] = b.inner.TestLocations(locs[i][:b.k])
}
}
return ret
}
func (b basicBloomFilter) MarshalJSON() ([]byte, error) {
return b.inner.MarshalJSON()
}
@@ -188,7 +203,25 @@ func (b *blockedBloomFilter) TestString(data string) bool {
}
func (b *blockedBloomFilter) TestLocations(locs []uint64) bool {
return b.inner.TestLocations(locs)
// for block bf, just cache it's hash result as locations
if len(locs) != 1 {
return true
}
return b.inner.Has(locs[0])
}
func (b *blockedBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
ret := make([]bool, len(locs))
for i := range hits {
if !hits[i] {
if len(locs[i]) != 1 {
ret[i] = true
continue
}
ret[i] = b.inner.Has(locs[i][0])
}
}
return ret
}
func (b blockedBloomFilter) MarshalJSON() ([]byte, error) {
@@ -238,6 +271,15 @@ func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool {
return true
}
func (b *alwaysTrueBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool {
ret := make([]bool, len(locs))
for i := 0; i < len(hits); i++ {
ret[i] = true
}
return ret
}
func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) {
return []byte{}, nil
}
@@ -287,7 +329,7 @@ func Locations(data []byte, k uint, bfType BFType) []uint64 {
case BasicBF:
return bloom.Locations(data, k)
case BlockedBF:
return blobloom.Locations(xxh3.Hash(data), k)
return []uint64{xxh3.Hash(data)}
case AlwaysTrueBF:
return nil
default:
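
One subtlety of the BatchTestLocations contract: entries whose hits[i] is already true are skipped, and the per-filter result comes back as a separate slice so the caller can combine it with additional checks (the PK range test in PkStatistics.BatchPkExist) before folding it into hits. A test-style sketch in this package, using the unexported constructor the benchmarks below rely on:

// batchProbeExample shows the skip-already-hit / merge-back pattern.
func batchProbeExample(keys [][]byte) []bool {
    bf := newBasicBloomFilter(1000, 0.001)
    for _, key := range keys {
        bf.Add(key)
    }

    locs := make([][]uint64, 0, len(keys))
    for _, key := range keys {
        locs = append(locs, Locations(key, bf.K(), BasicBF))
    }

    hits := make([]bool, len(keys))          // false => still needs testing
    ret := bf.BatchTestLocations(locs, hits) // per-filter result, aligned with keys
    for i := range hits {
        hits[i] = hits[i] || ret[i] // caller decides how to merge; range checks would go here
    }
    return hits
}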


@@ -21,6 +21,7 @@ import (
"time"
"github.com/bits-and-blooms/bloom/v3"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"
@@ -73,9 +74,10 @@ func TestPerformance_MultiBF(t *testing.T) {
capacity := 100000
fpr := 0.001
keys := make([][]byte, 0)
for i := 0; i < capacity; i++ {
keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
testKeySize := 100000
testKeys := make([][]byte, 0)
for i := 0; i < testKeySize; i++ {
testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
}
bfNum := 100
@@ -83,8 +85,9 @@
start1 := time.Now()
for i := 0; i < bfNum; i++ {
bf1 := newBlockedBloomFilter(uint(capacity), fpr)
for _, key := range keys {
bf1.Add(key)
for j := 0; j < capacity; j++ {
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
bf1.Add([]byte(key))
}
bfs1 = append(bfs1, bf1)
}
@@ -92,7 +95,7 @@
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1)))
start3 := time.Now()
for _, key := range keys {
for _, key := range testKeys {
locations := Locations(key, bfs1[0].K(), BlockedBF)
for i := 0; i < bfNum; i++ {
bfs1[i].TestLocations(locations)
@@ -104,7 +107,7 @@
start1 = time.Now()
for i := 0; i < bfNum; i++ {
bf2 := newBasicBloomFilter(uint(capacity), fpr)
for _, key := range keys {
for _, key := range testKeys {
bf2.Add(key)
}
bfs2 = append(bfs2, bf2)
@@ -113,7 +116,7 @@
log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1)))
start3 = time.Now()
for _, key := range keys {
for _, key := range testKeys {
locations := Locations(key, bfs1[0].K(), BasicBF)
for i := 0; i < bfNum; i++ {
bfs2[i].TestLocations(locations)
@@ -122,6 +125,96 @@
log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3)))
}
func TestPerformance_BatchTestLocations(t *testing.T) {
capacity := 100000
fpr := 0.001
testKeySize := 100000
testKeys := make([][]byte, 0)
for i := 0; i < testKeySize; i++ {
testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))))
}
batchSize := 1000
bfNum := 100
bfs1 := make([]*blockedBloomFilter, 0)
start1 := time.Now()
for i := 0; i < bfNum; i++ {
bf1 := newBlockedBloomFilter(uint(capacity), fpr)
for j := 0; j < capacity; j++ {
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
bf1.Add([]byte(key))
}
bfs1 = append(bfs1, bf1)
}
log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1)))
start3 := time.Now()
for _, key := range testKeys {
locations := Locations(key, bfs1[0].K(), BlockedBF)
for i := 0; i < bfNum; i++ {
bfs1[i].TestLocations(locations)
}
}
log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3)))
start3 = time.Now()
for i := 0; i < testKeySize; i += batchSize {
endIdx := i + batchSize
if endIdx > testKeySize {
endIdx = testKeySize
}
locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 {
return Locations(key, bfs1[0].K(), BlockedBF)
})
hits := make([]bool, batchSize)
for j := 0; j < bfNum; j++ {
bfs1[j].BatchTestLocations(locations, hits)
}
}
log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3)))
bfs2 := make([]*basicBloomFilter, 0)
start1 = time.Now()
for i := 0; i < bfNum; i++ {
bf2 := newBasicBloomFilter(uint(capacity), fpr)
for j := 0; j < capacity; j++ {
key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i))
bf2.Add([]byte(key))
}
bfs2 = append(bfs2, bf2)
}
log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1)))
start3 = time.Now()
for _, key := range testKeys {
locations := Locations(key, bfs2[0].K(), BasicBF)
for i := 0; i < bfNum; i++ {
bfs2[i].TestLocations(locations)
}
}
log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3)))
start3 = time.Now()
for i := 0; i < testKeySize; i += batchSize {
endIdx := i + batchSize
if endIdx > testKeySize {
endIdx = testKeySize
}
locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 {
return Locations(key, bfs2[0].K(), BasicBF)
})
hits := make([]bool, batchSize)
for j := 0; j < bfNum; j++ {
bfs2[j].BatchTestLocations(locations, hits)
}
}
log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3)))
}
func TestPerformance_Capacity(t *testing.T) {
fpr := 0.001


@@ -239,15 +239,16 @@ type commonConfig struct {
LockSlowLogInfoThreshold ParamItem `refreshable:"true"`
LockSlowLogWarnThreshold ParamItem `refreshable:"true"`
StorageScheme ParamItem `refreshable:"false"`
EnableStorageV2 ParamItem `refreshable:"false"`
StoragePathPrefix ParamItem `refreshable:"false"`
TTMsgEnabled ParamItem `refreshable:"true"`
TraceLogMode ParamItem `refreshable:"true"`
BloomFilterSize ParamItem `refreshable:"true"`
BloomFilterType ParamItem `refreshable:"true"`
MaxBloomFalsePositive ParamItem `refreshable:"true"`
PanicWhenPluginFail ParamItem `refreshable:"false"`
BloomFilterApplyBatchSize ParamItem `refreshable:"true"`
PanicWhenPluginFail ParamItem `refreshable:"false"`
UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
@@ -757,6 +758,15 @@ like the old password verification when updating the credential`,
}
p.MaxBloomFalsePositive.Init(base.mgr)
p.BloomFilterApplyBatchSize = ParamItem{
Key: "common.bloomFilterApplyBatchSize",
Version: "2.4.4",
DefaultValue: "1000",
Doc: "batch size when to apply pk to bloom filter",
Export: true,
}
p.BloomFilterApplyBatchSize.Init(base.mgr)
p.PanicWhenPluginFail = ParamItem{
Key: "common.panicWhenPluginFail",
Version: "2.4.2",
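
The new item is read at every batched call site shown above and is marked refreshable, so the batch size can be tuned at runtime. A minimal read helper (the wrapper function itself is illustrative):

package example

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// bloomFilterApplyBatchSize returns the configured PK batch size (default 1000).
func bloomFilterApplyBatchSize() int {
    return paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
}

Tests can override it the same way the surrounding test file saves other common params, e.g. params.Save("common.bloomFilterApplyBatchSize", "2000").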


@@ -108,6 +108,8 @@ func TestComponentParam(t *testing.T) {
params.Save("common.preCreatedTopic.timeticker", "timeticker")
assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
assert.Equal(t, uint(1000), params.CommonCfg.BloomFilterApplyBatchSize.GetAsUint())
})
t.Run("test rootCoordConfig", func(t *testing.T) {