diff --git a/internal/datanode/metacache/bloom_filter_set.go b/internal/datanode/metacache/bloom_filter_set.go
index 3251c53f81..6765f1b168 100644
--- a/internal/datanode/metacache/bloom_filter_set.go
+++ b/internal/datanode/metacache/bloom_filter_set.go
@@ -26,15 +26,32 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )
 
+// BloomFilterSet is a struct holding multiple `storage.PkStatistics`.
+// It maintains bloom filters generated from segment primary keys.
+// It may be updated with new insert FieldData when serving growing segments.
 type BloomFilterSet struct {
-	mut     sync.Mutex
-	current *storage.PkStatistics
-	history []*storage.PkStatistics
+	mut       sync.Mutex
+	batchSize uint
+	current   *storage.PkStatistics
+	history   []*storage.PkStatistics
 }
 
+// NewBloomFilterSet returns a BloomFilterSet with the provided historyEntries.
+// It shall serve Flushed segments only; for growing segments, use `NewBloomFilterSetWithBatchSize` instead.
 func NewBloomFilterSet(historyEntries ...*storage.PkStatistics) *BloomFilterSet {
 	return &BloomFilterSet{
-		history: historyEntries,
+		batchSize: paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
+		history:   historyEntries,
+	}
+}
+
+// NewBloomFilterSetWithBatchSize returns a BloomFilterSet.
+// The batchSize parameter is used to initialize each new bloom filter;
+// it shall be the estimated row count per batch the segment syncs with.
+func NewBloomFilterSetWithBatchSize(batchSize uint, historyEntries ...*storage.PkStatistics) *BloomFilterSet {
+	return &BloomFilterSet{
+		batchSize: batchSize,
+		history:   historyEntries,
 	}
 }
 
@@ -59,7 +76,7 @@ func (bfs *BloomFilterSet) UpdatePKRange(ids storage.FieldData) error {
 
 	if bfs.current == nil {
 		bfs.current = &storage.PkStatistics{
-			PkFilter: bloom.NewWithEstimates(paramtable.Get().CommonCfg.BloomFilterSize.GetAsUint(),
+			PkFilter: bloom.NewWithEstimates(bfs.batchSize,
 				paramtable.Get().CommonCfg.MaxBloomFalsePositive.GetAsFloat()),
 		}
 	}
diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index dae15a1e38..2a328e20aa 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -76,12 +76,13 @@ type writeBufferBase struct {
 	collectionID int64
 	channelName  string
 
-	metaWriter syncmgr.MetaWriter
-	collSchema *schemapb.CollectionSchema
-	metaCache  metacache.MetaCache
-	syncMgr    syncmgr.SyncManager
-	broker     broker.Broker
-	serializer syncmgr.Serializer
+	metaWriter       syncmgr.MetaWriter
+	collSchema       *schemapb.CollectionSchema
+	estSizePerRecord int
+	metaCache        metacache.MetaCache
+	syncMgr          syncmgr.SyncManager
+	broker           broker.Broker
+	serializer       syncmgr.Serializer
 
 	buffers map[int64]*segmentBuffer // segmentID => segmentBuffer
 
@@ -115,18 +116,25 @@ func newWriteBufferBase(channel string, metacache metacache.MetaCache, storageV2
 		return nil, err
 	}
 
+	schema := metacache.Schema()
+	estSize, err := typeutil.EstimateSizePerRecord(schema)
+	if err != nil {
+		return nil, err
+	}
+
 	return &writeBufferBase{
-		channelName:    channel,
-		collectionID:   metacache.Collection(),
-		collSchema:     metacache.Schema(),
-		syncMgr:        syncMgr,
-		metaWriter:     option.metaWriter,
-		buffers:        make(map[int64]*segmentBuffer),
-		metaCache:      metacache,
-		serializer:     serializer,
-		syncPolicies:   option.syncPolicies,
-		flushTimestamp: flushTs,
-		storagev2Cache: storageV2Cache,
+		channelName:      channel,
+		collectionID:     metacache.Collection(),
+		collSchema:       schema,
+		estSizePerRecord: estSize,
+		syncMgr:          syncMgr,
+		metaWriter:       option.metaWriter,
+		buffers:          make(map[int64]*segmentBuffer),
+		metaCache:        metacache,
+		serializer:       serializer,
+		syncPolicies:     option.syncPolicies,
+		flushTimestamp:   flushTs,
+		storagev2Cache:   storageV2Cache,
 	}, nil
 }
 
@@ -334,7 +342,9 @@ func (wb *writeBufferBase) bufferInsert(insertMsgs []*msgstream.InsertMsg, start
 			InsertChannel: wb.channelName,
 			StartPosition: startPos,
 			State:         commonpb.SegmentState_Growing,
-		}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }, metacache.SetStartPosRecorded(false))
+		}, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSetWithBatchSize(wb.getEstBatchSize())
+		}, metacache.SetStartPosRecorded(false))
 	}
 
 	segBuf := wb.getOrCreateBuffer(segmentID)
@@ -415,6 +425,12 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
 	return wb.serializer.EncodeBuffer(ctx, pack)
 }
 
+// getEstBatchSize returns the batch size based on the estimated size per record and the FlushInsertBufferSize configuration value.
+func (wb *writeBufferBase) getEstBatchSize() uint {
+	sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
+	return uint(sizeLimit / int64(wb.estSizePerRecord))
+}
+
 func (wb *writeBufferBase) Close(drop bool) {
 	// sink all data and call Drop for meta writer
 	wb.mut.Lock()
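Note on the sizing arithmetic: `getEstBatchSize` divides the flush buffer size limit by the schema's estimated per-record size, and that row count now seeds `bloom.NewWithEstimates` for growing segments instead of the global `BloomFilterSize` config. Below is a minimal standalone sketch of that calculation, assuming the `github.com/bits-and-blooms/bloom/v3` package (the `bloom` import used here) and made-up constants standing in for the `paramtable` lookups:

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bloom/v3"
)

// Illustrative stand-ins for the paramtable lookups in the diff;
// these are not Milvus defaults.
const (
	flushInsertBufferSize = 16 * 1024 * 1024 // DataNodeCfg.FlushInsertBufferSize, bytes
	estSizePerRecord      = 128              // typeutil.EstimateSizePerRecord(schema)
	maxFalsePositive      = 0.001            // CommonCfg.MaxBloomFalsePositive
)

func main() {
	// Mirrors getEstBatchSize: how many rows fit in one flush buffer.
	batchSize := uint(int64(flushInsertBufferSize) / int64(estSizePerRecord))

	// Mirrors UpdatePKRange after this change: each new per-batch filter
	// is sized for ~batchSize rows rather than the BloomFilterSize default.
	bf := bloom.NewWithEstimates(batchSize, maxFalsePositive)
	fmt.Printf("batchSize=%d bits=%d hashes=%d\n", batchSize, bf.Cap(), bf.K())
}
```

With these example numbers the growing-segment filter is sized for 131072 rows per batch, so its bit array tracks the actual sync granularity of the write buffer rather than a one-size-fits-all estimate.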
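The two constructors encode a flushed-vs-growing split: `NewBloomFilterSet` keeps the configured `BloomFilterSize` for flushed segments, while `bufferInsert` passes the write buffer's estimated batch size for newly created growing segments. A hedged sketch of that selection, where the helper name, the `isGrowing` flag, and `estBatchSize` are illustrative and not part of this diff:

```go
package metacacheexample // illustrative placement, not a real package in the PR

import "github.com/milvus-io/milvus/internal/datanode/metacache"

// newSegmentBFS sketches the constructor choice implied by the diff.
// isGrowing and estBatchSize are hypothetical parameters for illustration.
func newSegmentBFS(isGrowing bool, estBatchSize uint) *metacache.BloomFilterSet {
	if isGrowing {
		// Growing path, as in bufferInsert: size filters per sync batch.
		return metacache.NewBloomFilterSetWithBatchSize(estBatchSize)
	}
	// Flushed path: fall back to CommonCfg.BloomFilterSize.
	return metacache.NewBloomFilterSet()
}
```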