enhance: Execute bloom filter apply in parallel to speed up segment predict (#33792)

issue: #33610

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/33861/head
wei liu 2024-06-14 11:37:56 +08:00 committed by GitHub
parent b69e9093c8
commit 4987067375
15 changed files with 464 additions and 101 deletions
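The change applies one fan-out pattern in three places (the L0 compactor, the L0 write buffer, and the querynode delegator): delete primary keys are cut into fixed-size batches, each batch is checked against the bloom filters on a shared worker pool, and the per-batch results are stitched back together via a (delete-data index, start offset) pair. The sketch below reproduces that pattern with only the standard library; it is a simplified illustration rather than the code from this commit, which uses conc.Pool, conc.AwaitAll, and typeutil.ConcurrentMap instead of goroutines plus a mutex. The bfHits helper is a hypothetical stand-in for BloomFilterSet.BatchPkExist.

package main

import (
	"fmt"
	"sync"
)

// batchApplyRet mirrors the result shape used in this commit: which delete
// buffer the batch came from, where it starts, and the per-segment hit vector.
type batchApplyRet struct {
	deleteDataIdx int
	startIdx      int
	segment2Hits  map[int64][]bool
}

// bfHits is a hypothetical stand-in for BloomFilterSet.BatchPkExist: it returns,
// per segment, one bool per pk in the batch.
func bfHits(segmentIDs []int64, pks []int64) map[int64][]bool {
	out := make(map[int64][]bool, len(segmentIDs))
	for _, segID := range segmentIDs {
		hits := make([]bool, len(pks))
		for i, pk := range pks {
			hits[i] = pk%2 == segID%2 // placeholder membership test
		}
		out[segID] = hits
	}
	return out
}

// applyBFInParallel fans the pk batches out to goroutines and collects the
// results; hits[i] of a batch maps back to pks[startIdx+i] of the full slice.
func applyBFInParallel(pks []int64, segmentIDs []int64, batchSize int) []*batchApplyRet {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		rets []*batchApplyRet
	)
	for idx := 0; idx < len(pks); idx += batchSize {
		startIdx, endIdx := idx, idx+batchSize
		if endIdx > len(pks) {
			endIdx = len(pks)
		}
		wg.Add(1)
		go func(startIdx, endIdx int) {
			defer wg.Done()
			ret := &batchApplyRet{
				deleteDataIdx: 0, // single delete buffer in this sketch
				startIdx:      startIdx,
				segment2Hits:  bfHits(segmentIDs, pks[startIdx:endIdx]),
			}
			mu.Lock()
			rets = append(rets, ret)
			mu.Unlock()
		}(startIdx, endIdx)
	}
	wg.Wait()
	return rets
}

func main() {
	pks := []int64{1, 2, 3, 4, 5, 6, 7}
	for _, ret := range applyBFInParallel(pks, []int64{100, 101}, 3) {
		fmt.Printf("batch starting at %d -> %v\n", ret.startIdx, ret.segment2Hits)
	}
}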

View File

@ -54,12 +54,12 @@ type MockManager_AllocImportSegment_Call struct {
}
// AllocImportSegment is a helper method to define mock.On call
// - ctx context.Context
// - taskID int64
// - collectionID int64
// - partitionID int64
// - channelName string
// - level datapb.SegmentLevel
// - ctx context.Context
// - taskID int64
// - collectionID int64
// - partitionID int64
// - channelName string
// - level datapb.SegmentLevel
func (_e *MockManager_Expecter) AllocImportSegment(ctx interface{}, taskID interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, level interface{}) *MockManager_AllocImportSegment_Call {
return &MockManager_AllocImportSegment_Call{Call: _e.mock.On("AllocImportSegment", ctx, taskID, collectionID, partitionID, channelName, level)}
}
@ -113,11 +113,11 @@ type MockManager_AllocSegment_Call struct {
}
// AllocSegment is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
// - ctx context.Context
// - collectionID int64
// - partitionID int64
// - channelName string
// - requestRows int64
func (_e *MockManager_Expecter) AllocSegment(ctx interface{}, collectionID interface{}, partitionID interface{}, channelName interface{}, requestRows interface{}) *MockManager_AllocSegment_Call {
return &MockManager_AllocSegment_Call{Call: _e.mock.On("AllocSegment", ctx, collectionID, partitionID, channelName, requestRows)}
}
@ -150,8 +150,8 @@ type MockManager_DropSegment_Call struct {
}
// DropSegment is a helper method to define mock.On call
// - ctx context.Context
// - segmentID int64
// - ctx context.Context
// - segmentID int64
func (_e *MockManager_Expecter) DropSegment(ctx interface{}, segmentID interface{}) *MockManager_DropSegment_Call {
return &MockManager_DropSegment_Call{Call: _e.mock.On("DropSegment", ctx, segmentID)}
}
@ -184,8 +184,8 @@ type MockManager_DropSegmentsOfChannel_Call struct {
}
// DropSegmentsOfChannel is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ctx context.Context
// - channel string
func (_e *MockManager_Expecter) DropSegmentsOfChannel(ctx interface{}, channel interface{}) *MockManager_DropSegmentsOfChannel_Call {
return &MockManager_DropSegmentsOfChannel_Call{Call: _e.mock.On("DropSegmentsOfChannel", ctx, channel)}
}
@ -227,8 +227,8 @@ type MockManager_ExpireAllocations_Call struct {
}
// ExpireAllocations is a helper method to define mock.On call
// - channel string
// - ts uint64
// - channel string
// - ts uint64
func (_e *MockManager_Expecter) ExpireAllocations(channel interface{}, ts interface{}) *MockManager_ExpireAllocations_Call {
return &MockManager_ExpireAllocations_Call{Call: _e.mock.On("ExpireAllocations", channel, ts)}
}
@ -270,9 +270,9 @@ type MockManager_FlushImportSegments_Call struct {
}
// FlushImportSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segmentIDs []int64
// - ctx context.Context
// - collectionID int64
// - segmentIDs []int64
func (_e *MockManager_Expecter) FlushImportSegments(ctx interface{}, collectionID interface{}, segmentIDs interface{}) *MockManager_FlushImportSegments_Call {
return &MockManager_FlushImportSegments_Call{Call: _e.mock.On("FlushImportSegments", ctx, collectionID, segmentIDs)}
}
@ -326,9 +326,9 @@ type MockManager_GetFlushableSegments_Call struct {
}
// GetFlushableSegments is a helper method to define mock.On call
// - ctx context.Context
// - channel string
// - ts uint64
// - ctx context.Context
// - channel string
// - ts uint64
func (_e *MockManager_Expecter) GetFlushableSegments(ctx interface{}, channel interface{}, ts interface{}) *MockManager_GetFlushableSegments_Call {
return &MockManager_GetFlushableSegments_Call{Call: _e.mock.On("GetFlushableSegments", ctx, channel, ts)}
}
@ -382,9 +382,9 @@ type MockManager_SealAllSegments_Call struct {
}
// SealAllSegments is a helper method to define mock.On call
// - ctx context.Context
// - collectionID int64
// - segIDs []int64
// - ctx context.Context
// - collectionID int64
// - segIDs []int64
func (_e *MockManager_Expecter) SealAllSegments(ctx interface{}, collectionID interface{}, segIDs interface{}) *MockManager_SealAllSegments_Call {
return &MockManager_SealAllSegments_Call{Call: _e.mock.On("SealAllSegments", ctx, collectionID, segIDs)}
}

View File

@ -242,11 +242,18 @@ func (t *LevelZeroCompactionTask) splitDelta(
})
// split all delete data to segments
retMap := t.applyBFInParallel(allDelta, io.GetBFApplyPool(), segmentBfs)
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
split := func(pks []storage.PrimaryKey, pkTss []uint64) {
lc := storage.NewBatchLocationsCache(pks)
for segmentID, bf := range segmentBfs {
hits := bf.BatchPkExist(lc)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits
pks := allDelta[value.DeleteDataIdx].Pks
tss := allDelta[value.DeleteDataIdx].Tss
for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
writer, ok := targetSegBuffer[segmentID]
@ -255,27 +262,65 @@ func (t *LevelZeroCompactionTask) splitDelta(
writer = NewSegmentDeltaWriter(segmentID, segment.GetPartitionID(), segment.GetCollectionID())
targetSegBuffer[segmentID] = writer
}
writer.Write(pks[i], pkTss[i])
writer.Write(pks[startIdx+i], tss[startIdx+i])
}
}
}
return true
})
return targetSegBuffer
}
type BatchApplyRet = struct {
DeleteDataIdx int
StartIdx int
Segment2Hits map[int64][]bool
}
func (t *LevelZeroCompactionTask) applyBFInParallel(deleteDatas []*storage.DeleteData, pool *conc.Pool[any], segmentBfs map[int64]*metacache.BloomFilterSet) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
batchPredict := func(pks []storage.PrimaryKey) map[int64][]bool {
segment2Hits := make(map[int64][]bool, 0)
lc := storage.NewBatchLocationsCache(pks)
for segmentID, bf := range segmentBfs {
hits := bf.BatchPkExist(lc)
segment2Hits[segmentID] = hits
}
return segment2Hits
}
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
// split all delete data to segments
for _, deleteBuffer := range allDelta {
pks := deleteBuffer.Pks
pkTss := deleteBuffer.Tss
retIdx := 0
retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
var futures []*conc.Future[any]
for didx, data := range deleteDatas {
pks := data.Pks
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx])
retIdx += 1
tmpRetIndex := retIdx
deleteDataId := didx
future := pool.Submit(func() (any, error) {
ret := batchPredict(pks[startIdx:endIdx])
retMap.Insert(tmpRetIndex, &BatchApplyRet{
DeleteDataIdx: deleteDataId,
StartIdx: startIdx,
Segment2Hits: ret,
})
return nil, nil
})
futures = append(futures, future)
}
}
conc.AwaitAll(futures...)
return targetSegBuffer
return retMap
}
func (t *LevelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []*datapb.CompactionSegmentBinlogs, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
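In splitDelta above, a hit at position i of a batch is written back as pks[startIdx+i] and tss[startIdx+i]. The small self-contained check below (arbitrary values, not from the commit) shows why that offset arithmetic recovers the original positions even when len(pks) is not a multiple of the batch size.

package main

import "fmt"

func main() {
	pks := []int64{11, 12, 13, 14, 15, 16, 17}
	const batchSize = 3
	for startIdx := 0; startIdx < len(pks); startIdx += batchSize {
		endIdx := startIdx + batchSize
		if endIdx > len(pks) {
			endIdx = len(pks)
		}
		batch := pks[startIdx:endIdx]
		for i := range batch {
			// position i inside the batch corresponds to startIdx+i in the full slice
			fmt.Printf("batch[%d]=%d maps to pks[%d]=%d\n", i, batch[i], startIdx+i, pks[startIdx+i])
		}
	}
}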

View File

@ -1,8 +1,14 @@
package io
import (
"context"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@ -18,6 +24,11 @@ var (
statsPoolInitOnce sync.Once
)
var (
bfApplyPool atomic.Pointer[conc.Pool[any]]
bfApplyPoolInitOnce sync.Once
)
func initIOPool() {
capacity := paramtable.Get().DataNodeCfg.IOConcurrency.GetAsInt()
if capacity > 32 {
@ -58,3 +69,50 @@ func getMultiReadPool() *conc.Pool[any] {
ioPoolInitOnce.Do(initMultiReadPool)
return ioPool
}
func resizePool(pool *conc.Pool[any], newSize int, tag string) {
log := log.Ctx(context.Background()).
With(
zap.String("poolTag", tag),
zap.Int("newSize", newSize),
)
if newSize <= 0 {
log.Warn("cannot set pool size to non-positive value")
return
}
err := pool.Resize(newSize)
if err != nil {
log.Warn("failed to resize pool", zap.Error(err))
return
}
log.Info("pool resize successfully")
}
func ResizeBFApplyPool(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
newSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
resizePool(GetBFApplyPool(), newSize, "BFApplyPool")
}
}
func initBFApplyPool() {
bfApplyPoolInitOnce.Do(func() {
pt := paramtable.Get()
poolSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
log.Info("init BFApplyPool", zap.Int("poolSize", poolSize))
pool := conc.NewPool[any](
poolSize,
)
bfApplyPool.Store(pool)
pt.Watch(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key, config.NewHandler("dn.bfapply.parallel", ResizeBFApplyPool))
})
}
func GetBFApplyPool() *conc.Pool[any] {
initBFApplyPool()
return bfApplyPool.Load()
}
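The new BFApplyPool is built lazily, sized to CPU cores times BloomFilterApplyParallelFactor, and resized whenever the watched config key changes; resizePool refuses non-positive sizes, which is what the "0" case in the tests below relies on. A standard-library-only sketch of that sizing and guard logic follows (resizablePool is a hypothetical stand-in for conc.Pool, not the real type).

package main

import (
	"fmt"
	"runtime"
)

// resizablePool is a hypothetical stand-in for conc.Pool; only capacity is modeled.
type resizablePool struct{ capacity int }

func (p *resizablePool) Resize(n int) error {
	if n <= 0 {
		return fmt.Errorf("invalid pool size %d", n)
	}
	p.capacity = n
	return nil
}

func (p *resizablePool) Cap() int { return p.capacity }

// resizeBFApplyPool mirrors the guard in resizePool: recompute the target size
// from the parallel factor and skip the resize when it is non-positive.
func resizeBFApplyPool(pool *resizablePool, parallelFactor int) {
	newSize := runtime.NumCPU() * parallelFactor
	if newSize <= 0 {
		fmt.Println("cannot set pool size to non-positive value, keeping", pool.Cap())
		return
	}
	if err := pool.Resize(newSize); err != nil {
		fmt.Println("failed to resize pool:", err)
		return
	}
	fmt.Println("pool resized to", newSize)
}

func main() {
	pool := &resizablePool{capacity: runtime.NumCPU() * 4} // default factor is 4
	resizeBFApplyPool(pool, 8) // grows the pool
	resizeBFApplyPool(pool, 0) // rejected, capacity stays unchanged
}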

View File

@ -1,12 +1,15 @@
package io
import (
"strconv"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/pkg/config"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -36,3 +39,33 @@ func TestGetOrCreateIOPool(t *testing.T) {
}
wg.Wait()
}
func TestResizePools(t *testing.T) {
paramtable.Init()
pt := paramtable.Get()
defer func() {
pt.Reset(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key)
}()
t.Run("BfApplyPool", func(t *testing.T) {
expectedCap := hardware.GetCPUNum() * pt.DataNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
pt.Save(pt.DataNodeCfg.BloomFilterApplyParallelFactor.Key, strconv.FormatFloat(pt.DataNodeCfg.BloomFilterApplyParallelFactor.GetAsFloat()*2, 'f', 10, 64))
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
pt.Save(pt.DataNodeCfg.BloomFilterApplyParallelFactor.Key, "0")
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
})
}

View File

@ -85,6 +85,21 @@ func (bfs *BloomFilterSet) BatchPkExist(lc *storage.BatchLocationsCache) []bool
return hits
}
func (bfs *BloomFilterSet) BatchPkExistWithHits(lc *storage.BatchLocationsCache, hits []bool) []bool {
bfs.mut.RLock()
defer bfs.mut.RUnlock()
if bfs.current != nil {
bfs.current.BatchPkExist(lc, hits)
}
for _, bf := range bfs.history {
bf.BatchPkExist(lc, hits)
}
return hits
}
func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
bfs.mut.Lock()
defer bfs.mut.Unlock()
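BatchPkExistWithHits differs from BatchPkExist only in taking the hits slice from the caller, so the L0 write buffer can OR results from every segment's filters (current plus history) into one reusable buffer instead of allocating per segment. A toy illustration of that accumulate-in-place contract, with an exact map standing in for the bloom filter (real filters can report false positives):

package main

import "fmt"

// toyFilter is an exact stand-in for one bloom filter in this illustration.
type toyFilter map[int64]bool

// batchPkExistWithHits ORs this filter's answers into the shared hits slice,
// mirroring the accumulate-in-place contract of BatchPkExistWithHits.
func (f toyFilter) batchPkExistWithHits(pks []int64, hits []bool) []bool {
	for i, pk := range pks {
		hits[i] = hits[i] || f[pk]
	}
	return hits
}

func main() {
	pks := []int64{1, 2, 3, 4}
	hits := make([]bool, len(pks)) // one allocation, reused across all filters
	for _, f := range []toyFilter{{1: true}, {3: true}} {
		f.batchPkExistWithHits(pks, hits)
	}
	fmt.Println(hits) // [true false true false]
}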

View File

@ -99,6 +99,12 @@ func (s *BloomFilterSetSuite) TestBatchPkExist() {
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
hits = make([]bool, lc.Size())
bfs.BatchPkExistWithHits(lc, hits)
for _, hit := range hits {
s.True(hit, "pk shall return exist after batch update")
}
}
}

View File

@ -3,17 +3,20 @@ package writebuffer
import (
"context"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
@ -49,60 +52,93 @@ func NewL0WriteBuffer(channel string, metacache metacache.MetaCache, storageV2Ca
func (wb *l0WriteBuffer) dispatchDeleteMsgs(groups []*inData, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) {
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
split := func(pks []storage.PrimaryKey, pkTss []uint64, segments []*metacache.SegmentInfo, l0SegmentID int64) {
split := func(pks []storage.PrimaryKey, pkTss []uint64, partitionSegments []*metacache.SegmentInfo, partitionGroups []*inData) []bool {
lc := storage.NewBatchLocationsCache(pks)
for _, segment := range segments {
// use hits to cache result
hits := make([]bool, len(pks))
for _, segment := range partitionSegments {
if segment.CompactTo() != 0 {
continue
}
hits := segment.GetBloomFilterSet().BatchPkExist(lc)
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for i, hit := range hits {
if hit {
deletePks = append(deletePks, pks[i])
deleteTss = append(deleteTss, pkTss[i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
hits = segment.GetBloomFilterSet().BatchPkExistWithHits(lc, hits)
}
for _, inData := range partitionGroups {
hits = inData.batchPkExists(pks, pkTss, hits)
}
return hits
}
for _, delMsg := range deleteMsgs {
l0SegmentID := wb.getL0SegmentID(delMsg.GetPartitionID(), startPos)
pks := storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
type BatchApplyRet = struct {
// index of the delete msg in deleteMsgs
DeleteDataIdx int
// start index of this batch within the delete msg's pks
StartIdx int
Hits []bool
}
// parse the ids in each delete msg into primary keys
pksInDeleteMsgs := lo.Map(deleteMsgs, func(delMsg *msgstream.DeleteMsg, _ int) []storage.PrimaryKey {
return storage.ParseIDs2PrimaryKeys(delMsg.GetPrimaryKeys())
})
retIdx := 0
retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
pool := io.GetBFApplyPool()
var futures []*conc.Future[any]
for didx, delMsg := range deleteMsgs {
pks := pksInDeleteMsgs[didx]
pkTss := delMsg.GetTimestamps()
segments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
partitionSegments := wb.metaCache.GetSegmentsBy(metacache.WithPartitionID(delMsg.PartitionID),
metacache.WithSegmentState(commonpb.SegmentState_Growing, commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed))
partitionGroups := lo.Filter(groups, func(inData *inData, _ int) bool {
return delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID
})
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
split(pks[idx:endIdx], pkTss[idx:endIdx], segments, l0SegmentID)
}
for _, inData := range groups {
if delMsg.GetPartitionID() == common.AllPartitionsID || delMsg.GetPartitionID() == inData.partitionID {
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for idx, pk := range pks {
ts := delMsg.GetTimestamps()[idx]
if inData.pkExists(pk, ts) {
deletePks = append(deletePks, pk)
deleteTss = append(deleteTss, delMsg.GetTimestamps()[idx])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
}
retIdx += 1
tmpRetIdx := retIdx
deleteDataId := didx
future := pool.Submit(func() (any, error) {
hits := split(pks[startIdx:endIdx], pkTss[startIdx:endIdx], partitionSegments, partitionGroups)
retMap.Insert(tmpRetIdx, &BatchApplyRet{
DeleteDataIdx: deleteDataId,
StartIdx: startIdx,
Hits: hits,
})
return nil, nil
})
futures = append(futures, future)
}
}
conc.AwaitAll(futures...)
retMap.Range(func(key int, value *BatchApplyRet) bool {
l0SegmentID := wb.getL0SegmentID(deleteMsgs[value.DeleteDataIdx].GetPartitionID(), startPos)
pks := pksInDeleteMsgs[value.DeleteDataIdx]
pkTss := deleteMsgs[value.DeleteDataIdx].GetTimestamps()
var deletePks []storage.PrimaryKey
var deleteTss []typeutil.Timestamp
for i, hit := range value.Hits {
if hit {
deletePks = append(deletePks, pks[value.StartIdx+i])
deleteTss = append(deleteTss, pkTss[value.StartIdx+i])
}
}
if len(deletePks) > 0 {
wb.bufferDelete(l0SegmentID, deletePks, deleteTss, startPos, endPos)
}
return true
})
}
func (wb *l0WriteBuffer) BufferData(insertMsgs []*msgstream.InsertMsg, deleteMsgs []*msgstream.DeleteMsg, startPos, endPos *msgpb.MsgPosition) error {

View File

@ -444,6 +444,32 @@ func (id *inData) pkExists(pk storage.PrimaryKey, ts uint64) bool {
return ok && ts > uint64(minTs)
}
func (id *inData) batchPkExists(pks []storage.PrimaryKey, tss []uint64, hits []bool) []bool {
if len(pks) == 0 {
return nil
}
pkType := pks[0].Type()
switch pkType {
case schemapb.DataType_Int64:
for i := range pks {
if !hits[i] {
minTs, ok := id.intPKTs[pks[i].GetValue().(int64)]
hits[i] = ok && tss[i] > uint64(minTs)
}
}
case schemapb.DataType_VarChar:
for i := range pks {
if !hits[i] {
minTs, ok := id.strPKTs[pks[i].GetValue().(string)]
hits[i] = ok && tss[i] > uint64(minTs)
}
}
}
return hits
}
// prepareInsert transfers InsertMsg into organized InsertData grouped by segmentID
// also returns primary key field data
func (wb *writeBufferBase) prepareInsert(insertMsgs []*msgstream.InsertMsg) ([]*inData, error) {

View File

@ -199,34 +199,37 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
Data: cacheItems,
})
start := time.Now()
retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool())
// segment => delete data
delRecords := make(map[int64]DeleteData)
for _, data := range deleteData {
pks := data.PrimaryKeys
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
for idx := 0; idx < len(pks); idx += batchSize {
endIdx := idx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits
pk2SegmentIDs := sd.pkOracle.BatchGet(pks[idx:endIdx], pkoracle.WithPartitionID(data.PartitionID))
for i, segmentIDs := range pk2SegmentIDs {
for _, segmentID := range segmentIDs {
pks := deleteData[value.DeleteDataIdx].PrimaryKeys
tss := deleteData[value.DeleteDataIdx].Timestamps
for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[idx+i])
delRecord.Timestamps = append(delRecord.Timestamps, data.Timestamps[idx+i])
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i])
delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
}
return true
})
bfCost := time.Since(start)
offlineSegments := typeutil.NewConcurrentSet[int64]()
sealed, growing, version := sd.distribution.PinOnlineSegments()
start = time.Now()
eg, ctx := errgroup.WithContext(context.Background())
for _, entry := range sealed {
entry := entry
@ -260,9 +263,9 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
return nil
})
}
// no error is returned from applying deletes
_ = eg.Wait()
forwardDeleteCost := time.Since(start)
sd.distribution.Unpin(version)
offlineSegIDs := offlineSegments.Collect()
@ -273,6 +276,49 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds()))
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
}
type BatchApplyRet = struct {
DeleteDataIdx int
StartIdx int
Segment2Hits map[int64][]bool
}
func (sd *shardDelegator) applyBFInParallel(deleteDatas []*DeleteData, pool *conc.Pool[any]) *typeutil.ConcurrentMap[int, *BatchApplyRet] {
retIdx := 0
retMap := typeutil.NewConcurrentMap[int, *BatchApplyRet]()
batchSize := paramtable.Get().CommonCfg.BloomFilterApplyBatchSize.GetAsInt()
var futures []*conc.Future[any]
for didx, data := range deleteDatas {
pks := data.PrimaryKeys
for idx := 0; idx < len(pks); idx += batchSize {
startIdx := idx
endIdx := startIdx + batchSize
if endIdx > len(pks) {
endIdx = len(pks)
}
retIdx += 1
tmpRetIndex := retIdx
deleteDataId := didx
future := pool.Submit(func() (any, error) {
ret := sd.pkOracle.BatchGet(pks[startIdx:endIdx], pkoracle.WithPartitionID(data.PartitionID))
retMap.Insert(tmpRetIndex, &BatchApplyRet{
DeleteDataIdx: deleteDataId,
StartIdx: startIdx,
Segment2Hits: ret,
})
return nil, nil
})
futures = append(futures, future)
}
}
conc.AwaitAll(futures...)
return retMap
}
// applyDelete handles delete record and apply them to corresponding workers.

View File

@ -28,7 +28,7 @@ import (
type PkOracle interface {
// GetCandidates returns segment candidates to which the pk might belong.
Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]int64, error)
BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64
BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) map[int64][]bool
// RegisterCandidate adds candidate into pkOracle.
Register(candidate Candidate, workerID int64) error
// RemoveCandidate removes candidate
@ -64,8 +64,8 @@ func (pko *pkOracle) Get(pk storage.PrimaryKey, filters ...CandidateFilter) ([]i
return result, nil
}
func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) [][]int64 {
result := make([][]int64, len(pks))
func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilter) map[int64][]bool {
result := make(map[int64][]bool)
lc := storage.NewBatchLocationsCache(pks)
pko.candidates.Range(func(key string, candidate candidateWithWorker) bool {
@ -76,11 +76,7 @@ func (pko *pkOracle) BatchGet(pks []storage.PrimaryKey, filters ...CandidateFilt
}
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
result[i] = append(result[i], candidate.ID())
}
}
result[candidate.ID()] = hits
return true
})
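BatchGet's return shape changes here from per-pk candidate lists ([][]int64) to per-segment hit vectors (map[segmentID][]bool), which is what the delegator's ProcessDelete above ranges over to append deletes in one pass. A minimal consumer of the new shape, with made-up IDs:

package main

import "fmt"

func main() {
	pks := []int64{10, 20, 30}
	// new BatchGet shape: one bool per pk in the batch, keyed by candidate segment
	perSegment := map[int64][]bool{
		100: {true, false, true},
		101: {false, true, false},
	}
	for segmentID, hits := range perSegment {
		for i, hit := range hits {
			if hit {
				fmt.Printf("pk %d may belong to segment %d\n", pks[i], segmentID)
			}
		}
	}
}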

View File

@ -45,6 +45,9 @@ var (
loadOnce sync.Once
warmupPool atomic.Pointer[conc.Pool[any]]
warmupOnce sync.Once
bfPool atomic.Pointer[conc.Pool[any]]
bfApplyOnce sync.Once
)
// initSQPool initialize
@ -115,6 +118,19 @@ func initWarmupPool() {
})
}
func initBFApplyPool() {
bfApplyOnce.Do(func() {
pt := paramtable.Get()
poolSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
pool := conc.NewPool[any](
poolSize,
)
bfPool.Store(pool)
pt.Watch(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key, config.NewHandler("qn.bfapply.parallel", ResizeBFApplyPool))
})
}
// GetSQPool returns the singleton pool instance for search/query operations.
func GetSQPool() *conc.Pool[any] {
initSQPool()
@ -137,6 +153,11 @@ func GetWarmupPool() *conc.Pool[any] {
return warmupPool.Load()
}
func GetBFApplyPool() *conc.Pool[any] {
initBFApplyPool()
return bfPool.Load()
}
func ResizeSQPool(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
@ -163,6 +184,14 @@ func ResizeWarmupPool(evt *config.Event) {
}
}
func ResizeBFApplyPool(evt *config.Event) {
if evt.HasUpdated {
pt := paramtable.Get()
newSize := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
resizePool(GetBFApplyPool(), newSize, "BFApplyPool")
}
}
func resizePool(pool *conc.Pool[any], newSize int, tag string) {
log := log.Ctx(context.Background()).
With(

View File

@ -37,6 +37,7 @@ func TestResizePools(t *testing.T) {
pt.Reset(pt.QueryNodeCfg.MaxReadConcurrency.Key)
pt.Reset(pt.QueryNodeCfg.CGOPoolSizeRatio.Key)
pt.Reset(pt.CommonCfg.MiddlePriorityThreadCoreCoefficient.Key)
pt.Reset(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key)
}()
t.Run("SQPool", func(t *testing.T) {
@ -103,6 +104,27 @@ func TestResizePools(t *testing.T) {
assert.Equal(t, expectedCap, GetWarmupPool().Cap())
})
t.Run("BfApplyPool", func(t *testing.T) {
expectedCap := hardware.GetCPUNum() * pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsInt()
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
pt.Save(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key, strconv.FormatFloat(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.GetAsFloat()*2, 'f', 10, 64))
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
pt.Save(pt.QueryNodeCfg.BloomFilterApplyParallelFactor.Key, "0")
ResizeBFApplyPool(&config.Event{
HasUpdated: true,
})
assert.Equal(t, expectedCap, GetBFApplyPool().Cap())
})
t.Run("error_pool", func(*testing.T) {
pool := conc.NewDefaultPool[any]()
c := pool.Cap()

View File

@ -59,6 +59,30 @@ var (
msgTypeLabelName,
})
QueryNodeApplyBFCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "apply_bf_latency",
Help: "apply bf cost in ms",
Buckets: buckets,
}, []string{
functionLabelName,
nodeIDLabelName,
})
QueryNodeForwardDeleteCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "forward_delete_latency",
Help: "forward delete cost in ms",
Buckets: buckets,
}, []string{
functionLabelName,
nodeIDLabelName,
})
QueryNodeWaitProcessingMsgCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
@ -766,6 +790,8 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeDiskCacheEvictDuration)
registry.MustRegister(QueryNodeDiskCacheEvictGlobalDuration)
registry.MustRegister(QueryNodeSegmentPruneRatio)
registry.MustRegister(QueryNodeApplyBFCost)
registry.MustRegister(QueryNodeForwardDeleteCost)
}
func CleanupQueryNodeCollectionMetrics(nodeID int64, collectionID int64) {

View File

@ -760,7 +760,7 @@ like the old password verification when updating the credential`,
p.BloomFilterApplyBatchSize = ParamItem{
Key: "common.bloomFilterApplyBatchSize",
Version: "2.4.4",
Version: "2.4.5",
DefaultValue: "1000",
Doc: "batch size when to apply pk to bloom filter",
Export: true,
@ -2181,6 +2181,7 @@ type queryNodeConfig struct {
DefaultSegmentFilterRatio ParamItem `refreshable:"false"`
UseStreamComputing ParamItem `refreshable:"false"`
QueryStreamBatchSize ParamItem `refreshable:"false"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`
}
func (p *queryNodeConfig) init(base *BaseTable) {
@ -2772,6 +2773,15 @@ user-task-polling:
Export: true,
}
p.QueryStreamBatchSize.Init(base.mgr)
p.BloomFilterApplyParallelFactor = ParamItem{
Key: "queryNode.bloomFilterApplyBatchSize",
Version: "2.4.5",
DefaultValue: "4",
Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM",
Export: true,
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////
@ -3699,6 +3709,8 @@ type dataNodeConfig struct {
// clustering compaction
ClusteringCompactionMemoryBufferRatio ParamItem `refreshable:"true"`
ClusteringCompactionWorkerPoolSize ParamItem `refreshable:"true"`
BloomFilterApplyParallelFactor ParamItem `refreshable:"true"`
}
func (p *dataNodeConfig) init(base *BaseTable) {
@ -4033,6 +4045,15 @@ if this parameter <= 0, will set it as 10`,
Export: true,
}
p.ClusteringCompactionWorkerPoolSize.Init(base.mgr)
p.BloomFilterApplyParallelFactor = ParamItem{
Key: "datanode.bloomFilterApplyBatchSize",
Version: "2.4.5",
DefaultValue: "4",
Doc: "parallel factor when to apply pk to bloom filter, default to 4*CPU_CORE_NUM",
Export: true,
}
p.BloomFilterApplyParallelFactor.Init(base.mgr)
}
// /////////////////////////////////////////////////////////////////////////////

View File

@ -109,7 +109,7 @@ func TestComponentParam(t *testing.T) {
params.Save("common.preCreatedTopic.timeticker", "timeticker")
assert.Equal(t, []string{"timeticker"}, Params.TimeTicker.GetAsStrings())
assert.Equal(t, uint(1000), params.CommonCfg.BloomFilterApplyBatchSize.GetAsUint())
assert.Equal(t, 1000, params.CommonCfg.BloomFilterApplyBatchSize.GetAsInt())
})
t.Run("test rootCoordConfig", func(t *testing.T) {
@ -418,6 +418,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 2*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond))
params.Save("queryNode.lazyload.requestResourceRetryInterval", "3000")
assert.Equal(t, 3*time.Second, Params.LazyLoadRequestResourceRetryInterval.GetAsDuration(time.Millisecond))
assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
})
t.Run("test dataCoordConfig", func(t *testing.T) {
@ -514,6 +516,8 @@ func TestComponentParam(t *testing.T) {
// clustering compaction
params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1")
assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat())
assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt())
})
t.Run("test indexNodeConfig", func(t *testing.T) {