diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index 7703fb4874..126a80e593 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -237,39 +237,44 @@ func (t *LevelZeroCompactionTask) splitDelta( _, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta") defer span.End() - split := func(pk storage.PrimaryKey) []int64 { - lc := storage.NewLocationsCache(pk) - predicts := make([]int64, 0, len(segmentBfs)) - for segmentID, bf := range segmentBfs { - if bf.PkExists(lc) { - predicts = append(predicts, segmentID) - } - } - return predicts - } - allSeg := lo.Associate(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) (int64, *datapb.CompactionSegmentBinlogs) { return segment.GetSegmentID(), segment }) // spilt all delete data to segments targetSegBuffer := make(map[int64]*SegmentDeltaWriter) - for _, delta := range allDelta { - for i, pk := range delta.Pks { - predicted := split(pk) - - for _, gotSeg := range predicted { - writer, ok := targetSegBuffer[gotSeg] - if !ok { - segment := allSeg[gotSeg] - writer = NewSegmentDeltaWriter(gotSeg, segment.GetPartitionID(), segment.GetCollectionID()) - targetSegBuffer[gotSeg] = writer + split := func(pks []storage.PrimaryKey, pkTss []uint64) { + lc := storage.NewBatchLocationsCache(pks) + for segmentID, bf := range segmentBfs { + hits := bf.BatchPkExist(lc) + for i, hit := range hits { + if hit { + writer, ok := targetSegBuffer[segmentID] + if !ok { + segment := allSeg[segmentID] + writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID()) + targetSegBuffer[segmentID] = writer + } + writer.Write(pks[i], pkTss[i]) } - writer.Write(pk, delta.Tss[i]) } } } + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + // spilt all delete data to segments + for _, deleteBuffer := range allDelta { + pks := deleteBuffer.Pks + pkTss := deleteBuffer.Tss + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) + } + split(pks[idx:endIdx], pkTss[idx:endIdx]) + } + } + return targetSegBuffer } diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go index 80b7bc0578..da7173e7e7 100644 --- a/internal/datanode/metacache/bloom_filter_set.go +++ b/internal/datanode/metacache/bloom_filter_set.go @@ -70,6 +70,21 @@ func (bfs *BloomFilterSet) PkExists(lc *storage.LocationsCache) bool { return false } +func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + bfs.mut.RLock() + defer bfs.mut.RUnlock() + + hits := make([]bool, lc.Size()) + if bfs.current != nil { + bfs.current.BatchPkExist(lc, hits) + } + + for _, bf := range bfs.history { + bf.BatchPkExist(lc, hits) + } + return hits +} + func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error { bfs.mut.Lock() defer bfs.mut.Unlock() diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go index 885eb1d37a..ec470137eb 100644 --- a/internal/datanode/metacache/bloom_filter_set_test.go +++ b/internal/datanode/metacache/bloom_filter_set_test.go @@ -19,6 +19,7 @@ package metacache import ( "testing" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -68,6 +69,37 @@ func (s *BloomFilterSetSuite) TestWriteRead() { for _, id := range 
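The same fixed-size windowing loop (advance idx by batchSize, clamp endIdx to the slice length) reappears in splitDelta above, in both write buffers, and in the delegator further down. A minimal generic sketch of that pattern, using a hypothetical forEachBatch helper that is not part of this change:

package sketch

// forEachBatch walks items in fixed-size windows and hands each window, together
// with its starting offset, to fn. The offset lets callers slice parallel arrays
// (for example timestamps) so that they stay aligned with the primary keys.
func forEachBatch[T any](items []T, batchSize int, fn func(offset int, batch []T)) {
	if batchSize <= 0 {
		batchSize = len(items) // assumption: treat a non-positive size as a single batch
	}
	for idx := 0; idx < len(items); idx += batchSize {
		endIdx := idx + batchSize
		if endIdx > len(items) {
			endIdx = len(items)
		}
		fn(idx, items[idx:endIdx])
	}
}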
ids { s.True(s.bfs.PkExists(storage.NewLocationsCache(storage.NewInt64PrimaryKey(id))), "pk shall return exist after update") } + + lc := storage.NewBatchLocationsCache(lo.Map(ids, func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + hits := s.bfs.BatchPkExist(lc) + for _, hit := range hits { + s.True(hit, "pk shall return exist after batch update") + } +} + +func (s *BloomFilterSetSuite) TestBatchPkExist() { + capacity := 100000 + ids := make([]int64, 0) + for id := 0; id < capacity; id++ { + ids = append(ids, int64(id)) + } + + bfs := NewBloomFilterSetWithBatchSize(uint(capacity)) + err := bfs.UpdatePKRange(s.GetFieldData(ids)) + s.NoError(err) + + batchSize := 1000 + for i := 0; i < capacity; i += batchSize { + endIdx := i + batchSize + if endIdx > capacity { + endIdx = capacity + } + lc := storage.NewBatchLocationsCache(lo.Map(ids[i:endIdx], func(id int64, _ int) storage.PrimaryKey { return storage.NewInt64PrimaryKey(id) })) + hits := bfs.BatchPkExist(lc) + for _, hit := range hits { + s.True(hit, "pk shall return exist after batch update") + } + } } func (s *BloomFilterSetSuite) TestRoll() { diff --git a/internal/datanode/writebuffer/bf_write_buffer.go b/internal/datanode/writebuffer/bf_write_buffer.go index 322c4d5692..4a5ae9ae53 100644 --- a/internal/datanode/writebuffer/bf_write_buffer.go +++ b/internal/datanode/writebuffer/bf_write_buffer.go @@ -1,8 +1,6 @@ package writebuffer import ( - "github.com/samber/lo" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/datanode/metacache" @@ -10,6 +8,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -32,28 +31,45 @@ func NewBFWriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca } func (wb *bfWriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) { - // distribute delete msg for previous data - for _, delMsg := range deleteMsgs { - pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys()) - lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) *storage.LocationsCache { return storage.NewLocationsCache(pk) }) - segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), - metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + + split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo) { + lc := storage.NewBatchLocationsCache(pks) for _, segment := range segments { if segment.CompactTo() != 0 { continue } + + hits := segment.GetBloomFilterSet().BatchPkExist(lc) var deletePks []storage.PrimaryKey var deleteTss []typeutil.Timestamp - for idx, lc := range lcs { - if segment.GetBloomFilterSet().PkExists(lc) { - deletePks = append(deletePks, pks[idx]) - deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx]) + for i, hit := range hits { + if hit { + deletePks = append(deletePks, pks[i]) + deleteTss = append(deleteTss, pkTss[i]) } } + if len(deletePks) > 0 { wb.bufferDelete(segment.SegmentID(), deletePks, deleteTss, startPos, endPos) } } + } + + // distribute delete msg for previous data + for _, delMsg := range deleteMsgs { + 
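BatchPkExist returns one bool per primary key, aligned by index with the slice handed to NewBatchLocationsCache, so the pk and timestamp slices have to be cut with the same bounds. The split closures in the write buffers here inline that filtering; a small stand-alone sketch of the same step, with a hypothetical filterByHits helper:

package sketch

import (
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

// filterByHits keeps only the (pk, timestamp) pairs whose entry in hits is true;
// hits is expected to be the result of BatchPkExist for exactly these pks.
func filterByHits(pks []storage.PrimaryKey, tss []typeutil.Timestamp, hits []bool) ([]storage.PrimaryKey, []typeutil.Timestamp) {
	var keptPks []storage.PrimaryKey
	var keptTss []typeutil.Timestamp
	for i, hit := range hits {
		if hit {
			keptPks = append(keptPks, pks[i])
			keptTss = append(keptTss, tss[i])
		}
	}
	return keptPks, keptTss
}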
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys()) + pkTss := delMsg.GetTimestamps() + segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), + metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) + + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) + } + split(pks[idx:endIdx], pkTss[idx:endIdx], segments) + } for _, inData := range groups { if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID { diff --git a/internal/datanode/writebuffer/l0_write_buffer.go b/internal/datanode/writebuffer/l0_write_buffer.go index 019994406f..1883a5e085 100644 --- a/internal/datanode/writebuffer/l0_write_buffer.go +++ b/internal/datanode/writebuffer/l0_write_buffer.go @@ -3,7 +3,6 @@ package writebuffer import ( "context" - "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -16,6 +15,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/retry" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -48,28 +48,43 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca } func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) { - for _, delMsg := range deleteMsgs { - l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos) - pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys()) - lcs := lo.Map(pks, func(pk storage.PrimaryKey, _ int) *storage.LocationsCache { return storage.NewLocationsCache(pk) }) - segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), - metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo, l0SegmentID int64) { + lc := storage.NewBatchLocationsCache(pks) for _, segment := range segments { if segment.CompactTo() != 0 { continue } + + hits := segment.GetBloomFilterSet().BatchPkExist(lc) var deletePks []storage.PrimaryKey var deleteTss []typeutil.Timestamp - for idx, lc := range lcs { - if segment.GetBloomFilterSet().PkExists(lc) { - deletePks = append(deletePks, pks[idx]) - deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx]) + for i, hit := range hits { + if hit { + deletePks = append(deletePks, pks[i]) + deleteTss = append(deleteTss, pkTss[i]) } } if len(deletePks) > 0 { wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos) } } + } + + for _, delMsg := range deleteMsgs { + l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos) + pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys()) + pkTss := delMsg.GetTimestamps() + segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID), + metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed)) + + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) + } + split(pks[idx:endIdx], pkTss[idx:endIdx], segments, 
l0SegmentID) + } for _, inData := range groups { if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index 7aafe00f39..63521761cb 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -202,18 +202,23 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) { // segment => delete data delRecords := make(map[int64]DeleteData) for _, data := range deleteData { - for i, pk := range data.PrimaryKeys { - segmentIDs, err := sd.pkOracle.Get(pk, pkoracle.WithPartitionID(data.PartitionID)) - if err != nil { - log.Warn("failed to get delete candidates for pk", zap.Any("pk", pk.GetValue())) - continue + pks := data.PrimaryKeys + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) } - for _, segmentID := range segmentIDs { - delRecord := delRecords[segmentID] - delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pk) - delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[i]) - delRecord.RowCount++ - delRecords[segmentID] = delRecord + + pk2SegmentIDs := sd.pkOracle.BatchGet(pks[idx:endIdx], pkoracle.WithPartitionID(data.PartitionID)) + for i, segmentIDs := range pk2SegmentIDs { + for _, segmentID := range segmentIDs { + delRecord := delRecords[segmentID] + delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[idx+i]) + delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[idx+i]) + delRecord.RowCount++ + delRecords[segmentID] = delRecord + } } } } @@ -522,11 +527,20 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac segment := segment.(*segments.L0Segment) if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID { segmentPks, segmentTss := segment.DeleteRecords() - for i, pk := range segmentPks { - lc := storage.NewLocationsCache(pk) - if candidate.MayPkExist(lc) { - pks = append(pks, pk) - tss = append(tss, segmentTss[i]) + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + for idx := 0; idx < len(segmentPks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(segmentPks) { + endIdx = len(segmentPks) + } + + lc := storage.NewBatchLocationsCache(segmentPks[idx:endIdx]) + hits := candidate.BatchPkExist(lc) + for i, hit := range hits { + if hit { + pks = append(pks, segmentPks[idx+i]) + tss = append(tss, segmentTss[idx+i]) + } } } } @@ -634,10 +648,20 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context, if record.PartitionID != common.AllPartitionsID && candidate.Partition() != record.PartitionID { continue } - for i, pk := range record.DeleteData.Pks { - lc := storage.NewLocationsCache(pk) - if candidate.MayPkExist(lc) { - deleteData.Append(pk, record.DeleteData.Tss[i]) + pks := record.DeleteData.Pks + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) + } + + lc := storage.NewBatchLocationsCache(pks[idx:endIdx]) + hits := candidate.BatchPkExist(lc) + for i, hit := range hits { + if hit { + deleteData.Append(pks[idx+i], record.DeleteData.Tss[idx+i]) + } } } } @@ -731,11 +755,21 @@ func (sd *shardDelegator) 
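pkOracle.BatchGet (shown later in this patch) returns, for every input pk, the candidate segment IDs whose bloom filters may contain it; ProcessDelete then fans each (pk, ts) pair out to those segments. A sketch of that grouping step with a local record type, since the real code appends into the delegator's DeleteData:

package sketch

import "github.com/milvus-io/milvus/internal/storage"

// deleteRecord stands in for the per-segment bookkeeping kept by the delegator;
// it is defined locally only to keep the sketch self-contained.
type deleteRecord struct {
	pks []storage.PrimaryKey
	tss []uint64
}

// groupBySegment fans every (pk, ts) pair out to the candidate segments reported
// for it; segsPerPk is expected to be the result of pkOracle.BatchGet for pks.
func groupBySegment(pks []storage.PrimaryKey, tss []uint64, segsPerPk [][]int64) map[int64]*deleteRecord {
	out := make(map[int64]*deleteRecord)
	for i, segIDs := range segsPerPk {
		for _, segID := range segIDs {
			rec, ok := out[segID]
			if !ok {
				rec = &deleteRecord{}
				out[segID] = rec
			}
			rec.pks = append(rec.pks, pks[i])
			rec.tss = append(rec.tss, tss[i])
		}
	}
	return out
}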
readDeleteFromMsgstream(ctx context.Context, position continue } - for idx, pk := range storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) { - lc := storage.NewLocationsCache(pk) - if candidate.MayPkExist(lc) { - result.Pks = append(result.Pks, pk) - result.Tss = append(result.Tss, dmsg.Timestamps[idx]) + pks := storage.ParseIDs2PrimaryKeys(dmsg.GetPrimaryKeys()) + batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt() + for idx := 0; idx < len(pks); idx += batchSize { + endIdx := idx + batchSize + if endIdx > len(pks) { + endIdx = len(pks) + } + + lc := storage.NewBatchLocationsCache(pks[idx:endIdx]) + hits := candidate.BatchPkExist(lc) + for i, hit := range hits { + if hit { + result.Pks = append(result.Pks, pks[idx+i]) + result.Tss = append(result.Tss, dmsg.Timestamps[idx+i]) + } } } } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 3ee5295736..3e7933af71 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -261,6 +261,13 @@ func (s *DelegatorDataSuite) TestProcessDelete() { ms.EXPECT().MayPkExist(mock.Anything).RunAndReturn(func(lc *storage.LocationsCache) bool { return lc.GetPk().EQ(storage.NewInt64PrimaryKey(10)) }) + ms.EXPECT().BatchPkExist(mock.Anything).RunAndReturn(func(lc *storage.BatchLocationsCache) []bool { + hits := make([]bool, lc.Size()) + for i, pk := range lc.PKs() { + hits[i] = pk.EQ(storage.NewInt64PrimaryKey(10)) + } + return hits + }) return ms }) }, nil) diff --git a/internal/querynodev2/pkoracle/bloom_filter_set.go b/internal/querynodev2/pkoracle/bloom_filter_set.go index 88f5602ebf..ef64da02ed 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set.go @@ -59,6 +59,21 @@ func (s *BloomFilterSet) MayPkExist(lc *storage.LocationsCache) bool { return false } +func (s *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + s.statsMutex.RLock() + defer s.statsMutex.RUnlock() + + hits := make([]bool, lc.Size()) + if s.currentStat != nil { + s.currentStat.BatchPkExist(lc, hits) + } + + for _, bf := range s.historyStats { + bf.BatchPkExist(lc, hits) + } + return hits +} + // ID implement candidate. 
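The gain from batching comes from the locations cache: the hash locations for a window of pks are computed once inside BatchLocationsCache and then reused against every candidate's bloom filters, instead of being recomputed per (pk, segment) pair. A sketch of that calling pattern over a pre-filtered candidate slice:

package sketch

import (
	"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
	"github.com/milvus-io/milvus/internal/storage"
)

// candidateHits checks one batch of pks against several candidates; the locations
// cached inside lc are computed lazily on first use and shared across the calls.
func candidateHits(pks []storage.PrimaryKey, candidates []pkoracle.Candidate) map[int64][]bool {
	lc := storage.NewBatchLocationsCache(pks)
	out := make(map[int64][]bool, len(candidates))
	for _, candidate := range candidates {
		// hits[i] is true when the candidate may contain pks[i]; bloom filters
		// can yield false positives but never false negatives.
		out[candidate.ID()] = candidate.BatchPkExist(lc)
	}
	return out
}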
func (s *BloomFilterSet) ID() int64 { return s.segmentID @@ -80,12 +95,12 @@ func (s *BloomFilterSet) UpdateBloomFilter(pks []storage.PrimaryKey) { defer s.statsMutex.Unlock() if s.currentStat == nil { - bf := bloomfilter.NewBloomFilterWithType( - paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), - paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), - paramtable.Get().CommonCfg.BloomFilterType.GetValue()) s.currentStat = &storage.PkStatistics{ - PkFilter: bf, + PkFilter: bloomfilter.NewBloomFilterWithType( + paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(), + paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(), + paramtable.Get().CommonCfg.BloomFilterType.GetValue(), + ), } } diff --git a/internal/querynodev2/pkoracle/bloom_filter_set_test.go b/internal/querynodev2/pkoracle/bloom_filter_set_test.go index 9aaa8f0a08..2bde478cbd 100644 --- a/internal/querynodev2/pkoracle/bloom_filter_set_test.go +++ b/internal/querynodev2/pkoracle/bloom_filter_set_test.go @@ -93,4 +93,10 @@ func TestHistoricalStat(t *testing.T) { ret := bfs.MayPkExist(lc) assert.True(t, ret) } + + lc := storage.NewBatchLocationsCache(pks) + ret := bfs.BatchPkExist(lc) + for i := range ret { + assert.True(t, ret[i]) + } } diff --git a/internal/querynodev2/pkoracle/candidate.go b/internal/querynodev2/pkoracle/candidate.go index c115a5a0c1..bb2479702b 100644 --- a/internal/querynodev2/pkoracle/candidate.go +++ b/internal/querynodev2/pkoracle/candidate.go @@ -27,6 +27,7 @@ import ( type Candidate interface { // MayPkExist checks whether primary key could exists in this candidate. MayPkExist(lc *storage.LocationsCache) bool + BatchPkExist(lc *storage.BatchLocationsCache) []bool ID() int64 Partition() int64 diff --git a/internal/querynodev2/pkoracle/key.go b/internal/querynodev2/pkoracle/key.go index 6600398798..fe68025617 100644 --- a/internal/querynodev2/pkoracle/key.go +++ b/internal/querynodev2/pkoracle/key.go @@ -33,6 +33,14 @@ func (k candidateKey) MayPkExist(lc *storage.LocationsCache) bool { return true } +func (k candidateKey) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + ret := make([]bool, 0) + for i := 0; i < lc.Size(); i++ { + ret = append(ret, true) + } + return ret +} + // ID implements Candidate. func (k candidateKey) ID() int64 { return k.segmentID diff --git a/internal/querynodev2/pkoracle/pk_oracle.go b/internal/querynodev2/pkoracle/pk_oracle.go index a700fe3066..c3d9fc1094 100644 --- a/internal/querynodev2/pkoracle/pk_oracle.go +++ b/internal/querynodev2/pkoracle/pk_oracle.go @@ -28,6 +28,7 @@ import ( type PkOracle interface { // GetCandidates returns segment candidates of which pk might belongs to. Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) + BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 // RegisterCandidate adds candidate into pkOracle. Register(candidate Candidate, workerID int64) error // RemoveCandidate removes candidate @@ -46,6 +47,7 @@ type pkOracle struct { // Get implements PkOracle. 
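candidateKey keeps no bloom filter data, so its BatchPkExist has to stay conservative and flag every pk as a possible hit, matching its MayPkExist. An equivalent sketch that preallocates the result instead of appending element by element:

package sketch

import "github.com/milvus-io/milvus/internal/storage"

// allTrueHits reports every pk in the batch as a possible hit; this is the only
// safe answer for a candidate without bloom filter data, because callers treat a
// false entry as "definitely not in this segment" and skip the delete.
func allTrueHits(lc *storage.BatchLocationsCache) []bool {
	hits := make([]bool, lc.Size())
	for i := range hits {
		hits[i] = true
	}
	return hits
}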
func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error) { var result []int64 + lc := storage.NewLocationsCache(pk) pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { for _, filter := range filters { if !filter(candidate) { @@ -53,7 +55,6 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i } } - lc := storage.NewLocationsCache(pk) if candidate.MayPkExist(lc) { result = append(result, candidate.ID()) } @@ -63,6 +64,29 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i return result, nil } +func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 { + result := make([][]int64, len(pks)) + + lc := storage.NewBatchLocationsCache(pks) + pko.candidates.Range(func(key string, candidate candidateWithWorker) bool { + for _, filter := range filters { + if !filter(candidate) { + return true + } + } + + hits := candidate.BatchPkExist(lc) + for i, hit := range hits { + if hit { + result[i] = append(result[i], candidate.ID()) + } + } + return true + }) + + return result +} + func (pko *pkOracle) candidateKey(candidate Candidate, workerID int64) string { return fmt.Sprintf("%s-%d-%d", candidate.Type().String(), workerID, candidate.ID()) } diff --git a/internal/querynodev2/segments/mock_segment.go b/internal/querynodev2/segments/mock_segment.go index e31d1b5181..6240709084 100644 --- a/internal/querynodev2/segments/mock_segment.go +++ b/internal/querynodev2/segments/mock_segment.go @@ -35,6 +35,50 @@ func (_m *MockSegment) EXPECT() *MockSegment_Expecter { return &MockSegment_Expecter{mock: &_m.Mock} } +// BatchPkExist provides a mock function with given fields: lc +func (_m *MockSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + ret := _m.Called(lc) + + var r0 []bool + if rf, ok := ret.Get(0).(func(*storage.BatchLocationsCache) []bool); ok { + r0 = rf(lc) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]bool) + } + } + + return r0 +} + +// MockSegment_BatchTestLocationCache_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'BatchPkExist' +type MockSegment_BatchTestLocationCache_Call struct { + *mock.Call +} + +// BatchPkExist is a helper method to define mock.On call +// - lc *storage.BatchLocationsCache +func (_e *MockSegment_Expecter) BatchPkExist(lc interface{}) *MockSegment_BatchTestLocationCache_Call { + return &MockSegment_BatchTestLocationCache_Call{Call: _e.mock.On("BatchPkExist", lc)} +} + +func (_c *MockSegment_BatchTestLocationCache_Call) Run(run func(lc *storage.BatchLocationsCache)) *MockSegment_BatchTestLocationCache_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*storage.BatchLocationsCache)) + }) + return _c +} + +func (_c *MockSegment_BatchTestLocationCache_Call) Return(_a0 []bool) *MockSegment_BatchTestLocationCache_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegment_BatchTestLocationCache_Call) RunAndReturn(run func(*storage.BatchLocationsCache) []bool) *MockSegment_BatchTestLocationCache_Call { + _c.Call.Return(run) + return _c +} + // CASVersion provides a mock function with given fields: _a0, _a1 func (_m *MockSegment) CASVersion(_a0 int64, _a1 int64) bool { ret := _m.Called(_a0, _a1) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 47ba711fdd..83404096cd 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -186,8 +186,12 @@ 
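Tests that need the new Segment method can program the generated mock the same way the delegator test above programs its candidate mock. A sketch, assuming the usual mockery-generated NewMockSegment constructor is available in this package:

package sketch

import (
	"testing"

	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus/internal/querynodev2/segments"
	"github.com/milvus-io/milvus/internal/storage"
)

func TestBatchPkExistWithMockSegment(t *testing.T) {
	seg := segments.NewMockSegment(t) // assumption: mockery-generated constructor
	seg.EXPECT().BatchPkExist(mock.Anything).RunAndReturn(func(lc *storage.BatchLocationsCache) []bool {
		hits := make([]bool, lc.Size())
		for i, pk := range lc.PKs() {
			// report only pk 10 as a possible hit
			hits[i] = pk.EQ(storage.NewInt64PrimaryKey(10))
		}
		return hits
	})

	lc := storage.NewBatchLocationsCache([]storage.PrimaryKey{
		storage.NewInt64PrimaryKey(10),
		storage.NewInt64PrimaryKey(11),
	})
	hits := seg.BatchPkExist(lc)
	if !hits[0] || hits[1] {
		t.Fatalf("unexpected hits: %v", hits)
	}
}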
func (s *baseSegment) UpdateBloomFilter(pks []storage.PrimaryKey) { // MayPkExist returns true if the given PK exists in the PK range and being positive through the bloom filter, // false otherwise, // may returns true even the PK doesn't exist actually -func (s *baseSegment) MayPkExist(lc *storage.LocationsCache) bool { - return s.bloomFilterSet.MayPkExist(lc) +func (s *baseSegment) MayPkExist(pk *storage.LocationsCache) bool { + return s.bloomFilterSet.MayPkExist(pk) +} + +func (s *baseSegment) BatchPkExist(lc *storage.BatchLocationsCache) []bool { + return s.bloomFilterSet.BatchPkExist(lc) } // ResourceUsageEstimate returns the estimated resource usage of the segment. diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index f439d0f818..164395b206 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -84,6 +84,7 @@ type Segment interface { // Bloom filter related UpdateBloomFilter(pks []storage.PrimaryKey) MayPkExist(lc *storage.LocationsCache) bool + BatchPkExist(lc *storage.BatchLocationsCache) []bool // Read operations Search(ctx context.Context, searchReq *SearchRequest) (*SearchResult, error) diff --git a/internal/storage/pk_statistics.go b/internal/storage/pk_statistics.go index 7d4b21e2ef..35649ae46f 100644 --- a/internal/storage/pk_statistics.go +++ b/internal/storage/pk_statistics.go @@ -20,6 +20,7 @@ import ( "fmt" "github.com/cockroachdb/errors" + "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/util/bloomfilter" @@ -125,21 +126,6 @@ func Locations(pk PrimaryKey, k uint, bfType bloomfilter.BFType) []uint64 { return nil } -func (st *PkStatistics) TestLocations(pk PrimaryKey, locs []uint64) bool { - // empty pkStatics - if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil { - return false - } - - // check bf first, TestLocation just do some bitset compute, cost is cheaper - if !st.PkFilter.TestLocations(locs) { - return false - } - - // check pk range first, ugly but key it for now - return st.MinPK.LE(pk) && st.MaxPK.GE(pk) -} - func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { // empty pkStatics if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil { @@ -155,6 +141,28 @@ func (st *PkStatistics) TestLocationCache(lc *LocationsCache) bool { return st.MinPK.LE(lc.pk) && st.MaxPK.GE(lc.pk) } +func (st *PkStatistics) BatchPkExist(lc *BatchLocationsCache, hits []bool) []bool { + // empty pkStatics + if st.MinPK == nil || st.MaxPK == nil || st.PkFilter == nil { + return hits + } + + // check bf first, TestLocation just do some bitset compute, cost is cheaper + locations := lc.Locations(st.PkFilter.K(), st.PkFilter.Type()) + ret := st.PkFilter.BatchTestLocations(locations, hits) + + // todo: a bit ugly, hits[i]'s value will depends on multi bf in single segment, + // hits array will be removed after we merge bf in segment + pks := lc.PKs() + for i := range ret { + if !hits[i] { + hits[i] = ret[i] && st.MinPK.LE(pks[i]) && st.MaxPK.GE(pks[i]) + } + } + + return hits +} + // LocationsCache is a helper struct caching pk bloom filter locations. // Note that this helper is not concurrent safe and shall be used in same goroutine. 
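PkStatistics.BatchPkExist mutates and returns the hits slice it receives: entries that are already true are left untouched, and an entry only turns true when the bloom filter test passes and the pk also falls inside the segment's MinPK/MaxPK range. That contract is what lets a set of statistics OR-merge into a single slice, as sketched here for a plain slice of *storage.PkStatistics:

package sketch

import "github.com/milvus-io/milvus/internal/storage"

// batchPkExist merges one batch of pks against several PkStatistics; once any of
// them reports a possible hit for an entry, that entry stays true.
func batchPkExist(stats []*storage.PkStatistics, lc *storage.BatchLocationsCache) []bool {
	hits := make([]bool, lc.Size())
	for _, st := range stats {
		// entries already marked as hits are skipped inside BatchPkExist, so
		// repeated calls can only add hits, never clear them
		st.BatchPkExist(lc, hits)
	}
	return hits
}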
type LocationsCache struct { @@ -175,10 +183,11 @@ func (lc *LocationsCache) Locations(k uint, bfType bloomfilter.BFType) []uint64 } return lc.basicBFLocations[:k] case bloomfilter.BlockedBF: - if int(k) > len(lc.blockBFLocations) { - lc.blockBFLocations = Locations(lc.pk, k, bfType) + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockBFLocations) != 1 { + lc.blockBFLocations = Locations(lc.pk, 1, bfType) } - return lc.blockBFLocations[:k] + return lc.blockBFLocations default: return nil } @@ -189,3 +198,53 @@ func NewLocationsCache(pk PrimaryKey) *LocationsCache { pk: pk, } } + +type BatchLocationsCache struct { + pks []PrimaryKey + k uint + + // for block bf + blockLocations [][]uint64 + + // for basic bf + basicLocations [][]uint64 +} + +func (lc *BatchLocationsCache) PKs() []PrimaryKey { + return lc.pks +} + +func (lc *BatchLocationsCache) Size() int { + return len(lc.pks) +} + +func (lc *BatchLocationsCache) Locations(k uint, bfType bloomfilter.BFType) [][]uint64 { + switch bfType { + case bloomfilter.BasicBF: + if k > lc.k { + lc.k = k + lc.basicLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } + + return lc.basicLocations + case bloomfilter.BlockedBF: + // for block bf, we only need cache the hash result, which is a uint and only compute once for any k value + if len(lc.blockLocations) != len(lc.pks) { + lc.blockLocations = lo.Map(lc.pks, func(pk PrimaryKey, _ int) []uint64 { + return Locations(pk, lc.k, bfType) + }) + } + + return lc.blockLocations + default: + return nil + } +} + +func NewBatchLocationsCache(pks []PrimaryKey) *BatchLocationsCache { + return &BatchLocationsCache{ + pks: pks, + } +} diff --git a/internal/util/bloomfilter/bloom_filter.go b/internal/util/bloomfilter/bloom_filter.go index 778597844e..2183f04ef4 100644 --- a/internal/util/bloomfilter/bloom_filter.go +++ b/internal/util/bloomfilter/bloom_filter.go @@ -77,6 +77,7 @@ type BloomFilterInterface interface { Test(data []byte) bool TestString(data string) bool TestLocations(locs []uint64) bool + BatchTestLocations(locs [][]uint64, hit []bool) []bool MarshalJSON() ([]byte, error) UnmarshalJSON(data []byte) error } @@ -126,6 +127,20 @@ func (b *basicBloomFilter) TestLocations(locs []uint64) bool { return b.inner.TestLocations(locs[:b.k]) } +func (b *basicBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := range hits { + if !hits[i] { + if uint(len(locs[i])) < b.k { + ret[i] = true + continue + } + ret[i] = b.inner.TestLocations(locs[i][:b.k]) + } + } + return ret +} + func (b basicBloomFilter) MarshalJSON() ([]byte, error) { return b.inner.MarshalJSON() } @@ -188,7 +203,25 @@ func (b *blockedBloomFilter) TestString(data string) bool { } func (b *blockedBloomFilter) TestLocations(locs []uint64) bool { - return b.inner.TestLocations(locs) + // for block bf, just cache it's hash result as locations + if len(locs) != 1 { + return true + } + return b.inner.Has(locs[0]) +} + +func (b *blockedBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := range hits { + if !hits[i] { + if len(locs[i]) != 1 { + ret[i] = true + continue + } + ret[i] = b.inner.Has(locs[i][0]) + } + } + return ret } func (b blockedBloomFilter) MarshalJSON() ([]byte, error) { @@ -238,6 +271,15 @@ func (b *alwaysTrueBloomFilter) TestLocations(locs []uint64) bool { return true } +func (b 
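After this change a blocked bloom filter location is just the xxh3 hash of the key, independent of k, so BatchLocationsCache stores a single uint64 per pk for BlockedBF, while for BasicBF it stores k locations and recomputes them only when a later caller asks for a larger k. A sketch that drives the cache against a filter directly, assuming the paramtable defaults for size, false-positive rate and type:

package sketch

import (
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/internal/util/bloomfilter"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// newDefaultFilter builds a bloom filter from the common paramtable settings.
func newDefaultFilter() bloomfilter.BloomFilterInterface {
	return bloomfilter.NewBloomFilterWithType(
		paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
		paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat(),
		paramtable.Get().CommonCfg.BloomFilterType.GetValue(),
	)
}

// batchTest checks a batch of pks against a single filter; the per-pk locations
// are computed once through the cache (one hash for BlockedBF, k for BasicBF).
func batchTest(pks []storage.PrimaryKey, bf bloomfilter.BloomFilterInterface) []bool {
	lc := storage.NewBatchLocationsCache(pks)
	locations := lc.Locations(bf.K(), bf.Type())
	return bf.BatchTestLocations(locations, make([]bool, lc.Size()))
}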
*alwaysTrueBloomFilter) BatchTestLocations(locs [][]uint64, hits []bool) []bool { + ret := make([]bool, len(locs)) + for i := 0; i < len(hits); i++ { + ret[i] = true + } + + return ret +} + func (b *alwaysTrueBloomFilter) MarshalJSON() ([]byte, error) { return []byte{}, nil } @@ -287,7 +329,7 @@ func Locations(data []byte, k uint, bfType BFType) []uint64 { case BasicBF: return bloom.Locations(data, k) case BlockedBF: - return blobloom.Locations(xxh3.Hash(data), k) + return []uint64{xxh3.Hash(data)} case AlwaysTrueBF: return nil default: diff --git a/internal/util/bloomfilter/bloom_filter_test.go b/internal/util/bloomfilter/bloom_filter_test.go index 5774d205b9..df65ecffbd 100644 --- a/internal/util/bloomfilter/bloom_filter_test.go +++ b/internal/util/bloomfilter/bloom_filter_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/bits-and-blooms/bloom/v3" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "go.uber.org/zap" @@ -73,9 +74,10 @@ func TestPerformance_MultiBF(t *testing.T) { capacity := 100000 fpr := 0.001 - keys := make([][]byte, 0) - for i := 0; i < capacity; i++ { - keys = append(keys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) } bfNum := 100 @@ -83,8 +85,9 @@ func TestPerformance_MultiBF(t *testing.T) { start1 := time.Now() for i := 0; i < bfNum; i++ { bf1 := newBlockedBloomFilter(uint(capacity), fpr) - for _, key := range keys { - bf1.Add(key) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) } bfs1 = append(bfs1, bf1) } @@ -92,7 +95,7 @@ func TestPerformance_MultiBF(t *testing.T) { log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) start3 := time.Now() - for _, key := range keys { + for _, key := range testKeys { locations := Locations(key, bfs1[0].K(), BlockedBF) for i := 0; i < bfNum; i++ { bfs1[i].TestLocations(locations) @@ -104,7 +107,7 @@ func TestPerformance_MultiBF(t *testing.T) { start1 = time.Now() for i := 0; i < bfNum; i++ { bf2 := newBasicBloomFilter(uint(capacity), fpr) - for _, key := range keys { + for _, key := range testKeys { bf2.Add(key) } bfs2 = append(bfs2, bf2) @@ -113,7 +116,7 @@ func TestPerformance_MultiBF(t *testing.T) { log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) start3 = time.Now() - for _, key := range keys { + for _, key := range testKeys { locations := Locations(key, bfs1[0].K(), BasicBF) for i := 0; i < bfNum; i++ { bfs2[i].TestLocations(locations) @@ -122,6 +125,96 @@ func TestPerformance_MultiBF(t *testing.T) { log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) } +func TestPerformance_BatchTestLocations(t *testing.T) { + capacity := 100000 + fpr := 0.001 + + testKeySize := 100000 + testKeys := make([][]byte, 0) + for i := 0; i < testKeySize; i++ { + testKeys = append(testKeys, []byte(fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)))) + } + + batchSize := 1000 + + bfNum := 100 + bfs1 := make([]*blockedBloomFilter, 0) + start1 := time.Now() + for i := 0; i < bfNum; i++ { + bf1 := newBlockedBloomFilter(uint(capacity), fpr) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf1.Add([]byte(key)) + } + bfs1 = append(bfs1, bf1) + } + + log.Info("Block BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 := 
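At the filter level, BatchTestLocations only evaluates entries whose hits value is still false and hands back a separate result slice; merging that result into hits is the caller's responsibility (PkStatistics above additionally ANDs in the MinPK/MaxPK range check before merging). A minimal sketch of the calling convention:

package sketch

import "github.com/milvus-io/milvus/internal/util/bloomfilter"

// mergeBatchTest folds one filter's answers into an accumulated hits slice;
// entries that were already true are skipped by the filter and stay true here.
func mergeBatchTest(bf bloomfilter.BloomFilterInterface, locations [][]uint64, hits []bool) {
	ret := bf.BatchTestLocations(locations, hits)
	for i := range hits {
		if !hits[i] {
			hits[i] = ret[i]
		}
	}
}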
time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs1[0].K(), BlockedBF) + for i := 0; i < bfNum; i++ { + bfs1[i].TestLocations(locations) + } + } + log.Info("Block BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs1[0].K(), BlockedBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs1[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) + + bfs2 := make([]*basicBloomFilter, 0) + start1 = time.Now() + for i := 0; i < bfNum; i++ { + bf2 := newBasicBloomFilter(uint(capacity), fpr) + for j := 0; j < capacity; j++ { + key := fmt.Sprintf("key%d", time.Now().UnixNano()+int64(i)) + bf2.Add([]byte(key)) + } + bfs2 = append(bfs2, bf2) + } + + log.Info("Basic BF construct cost", zap.Duration("time", time.Since(start1))) + + start3 = time.Now() + for _, key := range testKeys { + locations := Locations(key, bfs2[0].K(), BasicBF) + for i := 0; i < bfNum; i++ { + bfs2[i].TestLocations(locations) + } + } + log.Info("Basic BF TestLocation cost", zap.Duration("time", time.Since(start3))) + + start3 = time.Now() + for i := 0; i < testKeySize; i += batchSize { + endIdx := i + batchSize + if endIdx > testKeySize { + endIdx = testKeySize + } + locations := lo.Map(testKeys[i:endIdx], func(key []byte, _ int) []uint64 { + return Locations(key, bfs2[0].K(), BasicBF) + }) + hits := make([]bool, batchSize) + for j := 0; j < bfNum; j++ { + bfs2[j].BatchTestLocations(locations, hits) + } + } + log.Info("Block BF BatchTestLocation cost", zap.Duration("time", time.Since(start3))) +} + func TestPerformance_Capacity(t *testing.T) { fpr := 0.001 diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 5c566d8541..1862720a96 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -239,15 +239,16 @@ type commonConfig struct { LockSlowLogInfoThreshold ParamItem `refreshable:"true"` LockSlowLogWarnThreshold ParamItem `refreshable:"true"` - StorageScheme ParamItem `refreshable:"false"` - EnableStorageV2 ParamItem `refreshable:"false"` - StoragePathPrefix ParamItem `refreshable:"false"` - TTMsgEnabled ParamItem `refreshable:"true"` - TraceLogMode ParamItem `refreshable:"true"` - BloomFilterSize ParamItem `refreshable:"true"` - BloomFilterType ParamItem `refreshable:"true"` - MaxBloomFalsePositive ParamItem `refreshable:"true"` - PanicWhenPluginFail ParamItem `refreshable:"false"` + StorageScheme ParamItem `refreshable:"false"` + EnableStorageV2 ParamItem `refreshable:"false"` + StoragePathPrefix ParamItem `refreshable:"false"` + TTMsgEnabled ParamItem `refreshable:"true"` + TraceLogMode ParamItem `refreshable:"true"` + BloomFilterSize ParamItem `refreshable:"true"` + BloomFilterType ParamItem `refreshable:"true"` + MaxBloomFalsePositive ParamItem `refreshable:"true"` + BloomFilterApplyBatchSize ParamItem `refreshable:"true"` + PanicWhenPluginFail ParamItem `refreshable:"false"` UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` @@ -757,6 +758,15 @@ like the old password verification when updating the credential`, } p.MaxBloomFalsePositive.Init(base.mgr) + 
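The timing loops in this test could also be expressed as standard Go benchmarks, which makes the single-key versus batch comparison reproducible with go test -bench. A sketch living in the same package as the test, reusing the unexported constructor it already relies on:

package bloomfilter

import (
	"fmt"
	"testing"

	"github.com/samber/lo"
)

// BenchmarkBlockedBFBatchTestLocations measures one 1000-key batch probe against
// a populated blocked bloom filter per benchmark iteration.
func BenchmarkBlockedBFBatchTestLocations(b *testing.B) {
	capacity := 100000
	fpr := 0.001
	bf := newBlockedBloomFilter(uint(capacity), fpr)

	keys := make([][]byte, 0, capacity)
	for i := 0; i < capacity; i++ {
		key := []byte(fmt.Sprintf("key%d", i))
		keys = append(keys, key)
		bf.Add(key)
	}

	batchSize := 1000
	locations := lo.Map(keys[:batchSize], func(key []byte, _ int) []uint64 {
		return Locations(key, bf.K(), BlockedBF)
	})

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		hits := make([]bool, batchSize)
		bf.BatchTestLocations(locations, hits)
	}
}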
p.BloomFilterApplyBatchSize = ParamItem{ + Key: "common.bloomFilterApplyBatchSize", + Version: "2.4.4", + DefaultValue: "1000", + Doc: "batch size used when applying primary keys to the bloom filter", + Export: true, + } + p.BloomFilterApplyBatchSize.Init(base.mgr) + p.PanicWhenPluginFail = ParamItem{ Key: "common.panicWhenPluginFail", Version: "2.4.2", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 897b26f94e..01abc07272 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -108,6 +108,8 @@ func TestComponentParam(t *testing.T) { params.Save("common.preCreatedTopic.timeticker", "timeticker") assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings()) + + assert.Equal(t, uint(1000), params.CommonCfg.BloomFilterApplyBatchSize.GetAsUint()) }) t.Run("test rootCoordConfig", func(t *testing.T) {
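Since the new item is declared refreshable, the batch size can be tuned at runtime without a restart. A sketch of reading and overriding it through paramtable, assuming the same Save-based in-memory override that the surrounding test uses:

package sketch

import "github.com/milvus-io/milvus/pkg/util/paramtable"

// bloomFilterBatchSize returns the configured window size, falling back to the
// 1000-key default when common.bloomFilterApplyBatchSize has not been set.
func bloomFilterBatchSize() int {
	return paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
}

// overrideBatchSizeForTest bumps the value in memory, mirroring the params.Save
// pattern used in component_param_test.go.
func overrideBatchSizeForTest() {
	params := paramtable.Get()
	params.Save(params.CommonCfg.BloomFilterApplyBatchSize.Key, "4096")
}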