From ff1e967e894a0e08e7d9520ff899f01f6f74b355 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 11 Mar 2024 10:55:02 +0800
Subject: [PATCH] enhance: Add segment id short cut for WithSegmentID filter
 (#31144)

See also #31143

This PR adds a shortcut for the datanode metacache `WithSegmentIDs` filter,
which can fetch segments directly from the map by the provided segment IDs
instead of scanning every entry. It also adds a benchmark comparing the new
implementation with the old one. A small standalone sketch of the fast-path
pattern is appended after the diff.

Signed-off-by: Congqi Xia
---
 internal/datanode/compactor_test.go         |  1 -
 internal/datanode/metacache/actions.go      | 63 +++++++++++-----
 internal/datanode/metacache/actions_test.go | 16 ++--
 internal/datanode/metacache/meta_cache.go   | 75 ++++++++++++-------
 .../datanode/metacache/meta_cache_test.go   | 75 ++++++++++++++++++-
 5 files changed, 170 insertions(+), 60 deletions(-)

diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go
index eee7bf97e4..cd67f4af96 100644
--- a/internal/datanode/compactor_test.go
+++ b/internal/datanode/compactor_test.go
@@ -533,7 +533,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 			Schema: meta.GetSchema(),
 		}, dm)
 		assert.Error(t, err)
-		t.Log(err)
 	})
 
 	t.Run("Merge with meta error", func(t *testing.T) {
diff --git a/internal/datanode/metacache/actions.go b/internal/datanode/metacache/actions.go
index 57c5a21b16..b83a88636e 100644
--- a/internal/datanode/metacache/actions.go
+++ b/internal/datanode/metacache/actions.go
@@ -25,50 +25,77 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-type SegmentFilter func(info *SegmentInfo) bool
+type SegmentFilter interface {
+	Filter(info *SegmentInfo) bool
+	SegmentIDs() ([]int64, bool)
+}
 
-func WithPartitionID(partitionID int64) SegmentFilter {
-	return func(info *SegmentInfo) bool {
-		return partitionID == common.InvalidPartitionID || info.partitionID == partitionID
-	}
+type SegmentIDFilter struct {
+	segmentIDs []int64
+	ids        typeutil.Set[int64]
 }
 
 func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
-	set := typeutil.NewSet[int64](segmentIDs...)
-	return func(info *SegmentInfo) bool {
-		return set.Contain(info.segmentID)
+	set := typeutil.NewSet(segmentIDs...)
+	return &SegmentIDFilter{
+		segmentIDs: segmentIDs,
+		ids:        set,
 	}
 }
 
+func (f *SegmentIDFilter) Filter(info *SegmentInfo) bool {
+	return f.ids.Contain(info.segmentID)
+}
+
+func (f *SegmentIDFilter) SegmentIDs() ([]int64, bool) {
+	return f.segmentIDs, true
+}
+
+type SegmentFilterFunc func(info *SegmentInfo) bool
+
+func (f SegmentFilterFunc) Filter(info *SegmentInfo) bool {
+	return f(info)
+}
+
+func (f SegmentFilterFunc) SegmentIDs() ([]int64, bool) {
+	return nil, false
+}
+
+func WithPartitionID(partitionID int64) SegmentFilter {
+	return SegmentFilterFunc(func(info *SegmentInfo) bool {
+		return partitionID == common.InvalidPartitionID || info.partitionID == partitionID
+	})
+}
+
 func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter {
 	set := typeutil.NewSet(states...)
- return func(info *SegmentInfo) bool { + return SegmentFilterFunc(func(info *SegmentInfo) bool { return set.Len() > 0 && set.Contain(info.state) - } + }) } func WithStartPosNotRecorded() SegmentFilter { - return func(info *SegmentInfo) bool { + return SegmentFilterFunc(func(info *SegmentInfo) bool { return !info.startPosRecorded - } + }) } func WithLevel(level datapb.SegmentLevel) SegmentFilter { - return func(info *SegmentInfo) bool { + return SegmentFilterFunc(func(info *SegmentInfo) bool { return info.level == level - } + }) } func WithCompacted() SegmentFilter { - return func(info *SegmentInfo) bool { + return SegmentFilterFunc(func(info *SegmentInfo) bool { return info.compactTo != 0 - } + }) } func WithNoSyncingTask() SegmentFilter { - return func(info *SegmentInfo) bool { + return SegmentFilterFunc(func(info *SegmentInfo) bool { return info.syncingTasks == 0 - } + }) } type SegmentAction func(info *SegmentInfo) diff --git a/internal/datanode/metacache/actions_test.go b/internal/datanode/metacache/actions_test.go index 29cafd29c3..b6cce563a1 100644 --- a/internal/datanode/metacache/actions_test.go +++ b/internal/datanode/metacache/actions_test.go @@ -35,29 +35,29 @@ func (s *SegmentFilterSuite) TestFilters() { partitionID := int64(1001) filter := WithPartitionID(partitionID) info.partitionID = partitionID + 1 - s.False(filter(info)) + s.False(filter.Filter(info)) info.partitionID = partitionID - s.True(filter(info)) + s.True(filter.Filter(info)) segmentID := int64(10001) filter = WithSegmentIDs(segmentID) info.segmentID = segmentID + 1 - s.False(filter(info)) + s.False(filter.Filter(info)) info.segmentID = segmentID - s.True(filter(info)) + s.True(filter.Filter(info)) state := commonpb.SegmentState_Growing filter = WithSegmentState(state) info.state = commonpb.SegmentState_Flushed - s.False(filter(info)) + s.False(filter.Filter(info)) info.state = state - s.True(filter(info)) + s.True(filter.Filter(info)) filter = WithStartPosNotRecorded() info.startPosRecorded = true - s.False(filter(info)) + s.False(filter.Filter(info)) info.startPosRecorded = false - s.True(filter(info)) + s.True(filter.Filter(info)) } func TestFilters(t *testing.T) { diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index ea91ceacac..c0b0be0206 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -157,30 +157,23 @@ func (c *metaCacheImpl) RemoveSegments(filters ...SegmentFilter) []int64 { c.mu.Lock() defer c.mu.Unlock() - filter := c.mergeFilters(filters...) - - var ids []int64 - for segID, info := range c.segmentInfos { - if filter(info) { - ids = append(ids, segID) - delete(c.segmentInfos, segID) - } + var result []int64 + process := func(id int64, info *SegmentInfo) { + delete(c.segmentInfos, id) + result = append(result, id) } - return ids + c.rangeWithFilter(process, filters...) + return result } func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo { c.mu.RLock() defer c.mu.RUnlock() - filter := c.mergeFilters(filters...) - var segments []*SegmentInfo - for _, info := range c.segmentInfos { - if filter(info) { - segments = append(segments, info) - } - } + c.rangeWithFilter(func(_ int64, info *SegmentInfo) { + segments = append(segments, info) + }, filters...) 
return segments } @@ -193,8 +186,10 @@ func (c *metaCacheImpl) GetSegmentByID(id int64, filters ...SegmentFilter) (*Seg if !ok { return nil, false } - if !c.mergeFilters(filters...)(segment) { - return nil, false + for _, filter := range filters { + if !filter.Filter(segment) { + return nil, false + } } return segment, ok } @@ -208,16 +203,11 @@ func (c *metaCacheImpl) UpdateSegments(action SegmentAction, filters ...SegmentF c.mu.Lock() defer c.mu.Unlock() - filter := c.mergeFilters(filters...) - - for id, info := range c.segmentInfos { - if !filter(info) { - continue - } + c.rangeWithFilter(func(id int64, info *SegmentInfo) { nInfo := info.Clone() action(nInfo) c.segmentInfos[id] = nInfo - } + }, filters...) } func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) { @@ -231,13 +221,40 @@ func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...Segmen return predicts, len(predicts) > 0 } -func (c *metaCacheImpl) mergeFilters(filters ...SegmentFilter) SegmentFilter { - return func(info *SegmentInfo) bool { - for _, filter := range filters { - if !filter(info) { +func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), filters ...SegmentFilter) { + var hasIDs bool + set := typeutil.NewSet[int64]() + filtered := make([]SegmentFilter, 0, len(filters)) + for _, filter := range filters { + ids, ok := filter.SegmentIDs() + if ok { + set.Insert(ids...) + hasIDs = true + } else { + filtered = append(filtered, filter) + } + } + mergedFilter := func(info *SegmentInfo) bool { + for _, filter := range filtered { + if !filter.Filter(info) { return false } } return true } + + if hasIDs { + for id := range set { + info, has := c.segmentInfos[id] + if has && mergedFilter(info) { + fn(id, info) + } + } + } else { + for id, info := range c.segmentInfos { + if mergedFilter(info) { + fn(id, info) + } + } + } } diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go index 48b06b1aeb..c3b1663761 100644 --- a/internal/datanode/metacache/meta_cache_test.go +++ b/internal/datanode/metacache/meta_cache_test.go @@ -193,17 +193,17 @@ func (s *MetaCacheSuite) TestPredictSegments() { err := info.GetBloomFilterSet().UpdatePKRange(pkFieldData) s.Require().NoError(err) - predict, ok = s.cache.PredictSegments(pk, func(s *SegmentInfo) bool { + predict, ok = s.cache.PredictSegments(pk, SegmentFilterFunc(func(s *SegmentInfo) bool { return s.segmentID == 1 - }) + })) s.False(ok) s.Empty(predict) predict, ok = s.cache.PredictSegments( storage.NewInt64PrimaryKey(5), - func(s *SegmentInfo) bool { + SegmentFilterFunc(func(s *SegmentInfo) bool { return s.segmentID == 1 - }) + })) s.True(ok) s.NotEmpty(predict) s.Equal(1, len(predict)) @@ -213,3 +213,70 @@ func (s *MetaCacheSuite) TestPredictSegments() { func TestMetaCacheSuite(t *testing.T) { suite.Run(t, new(MetaCacheSuite)) } + +func BenchmarkGetSegmentsBy(b *testing.B) { + paramtable.Init() + schema := &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, + {FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }}, + }, + } + flushSegmentInfos := lo.RepeatBy(10000, func(i int) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: int64(i), + State: commonpb.SegmentState_Flushed, + } + }) + cache := NewMetaCache(&datapb.ChannelWatchInfo{ + 
Schema: schema, + Vchan: &datapb.VchannelInfo{ + FlushedSegments: flushSegmentInfos, + }, + }, func(*datapb.SegmentInfo) *BloomFilterSet { + return NewBloomFilterSet() + }) + b.ResetTimer() + for i := 0; i < b.N; i++ { + filter := WithSegmentIDs(0) + cache.GetSegmentsBy(filter) + } +} + +func BenchmarkGetSegmentsByWithoutIDs(b *testing.B) { + paramtable.Init() + schema := &schemapb.CollectionSchema{ + Name: "test_collection", + Fields: []*schemapb.FieldSchema{ + {FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, + {FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ + {Key: common.DimKey, Value: "128"}, + }}, + }, + } + flushSegmentInfos := lo.RepeatBy(10000, func(i int) *datapb.SegmentInfo { + return &datapb.SegmentInfo{ + ID: int64(i), + State: commonpb.SegmentState_Flushed, + } + }) + cache := NewMetaCache(&datapb.ChannelWatchInfo{ + Schema: schema, + Vchan: &datapb.VchannelInfo{ + FlushedSegments: flushSegmentInfos, + }, + }, func(*datapb.SegmentInfo) *BloomFilterSet { + return NewBloomFilterSet() + }) + b.ResetTimer() + for i := 0; i < b.N; i++ { + // use old func filter + filter := SegmentFilterFunc(func(info *SegmentInfo) bool { + return info.segmentID == 0 + }) + cache.GetSegmentsBy(filter) + } +}
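
The following is a minimal, self-contained sketch of the fast-path pattern this
patch introduces, written against plain Go types rather than the actual Milvus
API. The names segment, idFilter, filterFunc, withIDs, and getBy are
illustrative stand-ins for SegmentInfo, SegmentIDFilter, SegmentFilterFunc,
WithSegmentIDs, and rangeWithFilter; the point is only to show how an optional
ID hint turns a full-cache scan into direct map lookups.

// sketch.go - standalone illustration of the ID fast path; none of these
// names belong to the Milvus codebase. Run with `go run sketch.go`.
package main

import "fmt"

type segment struct {
	id int64
}

// filter mirrors the shape of the new SegmentFilter interface: a predicate
// plus an optional hint listing the only IDs that can possibly match.
type filter interface {
	Filter(*segment) bool
	SegmentIDs() ([]int64, bool)
}

// idFilter is the fast-path filter: it knows its candidate IDs up front.
type idFilter struct {
	ids map[int64]struct{}
}

func withIDs(ids ...int64) idFilter {
	set := make(map[int64]struct{}, len(ids))
	for _, id := range ids {
		set[id] = struct{}{}
	}
	return idFilter{ids: set}
}

func (f idFilter) Filter(s *segment) bool {
	_, ok := f.ids[s.id]
	return ok
}

func (f idFilter) SegmentIDs() ([]int64, bool) {
	out := make([]int64, 0, len(f.ids))
	for id := range f.ids {
		out = append(out, id)
	}
	return out, true
}

// filterFunc adapts a plain predicate; it cannot provide an ID hint.
type filterFunc func(*segment) bool

func (f filterFunc) Filter(s *segment) bool      { return f(s) }
func (f filterFunc) SegmentIDs() ([]int64, bool) { return nil, false }

// getBy is the analogue of rangeWithFilter: map lookups when an ID hint is
// available, full scan otherwise.
func getBy(cache map[int64]*segment, f filter) []*segment {
	var result []*segment
	if ids, ok := f.SegmentIDs(); ok {
		for _, id := range ids {
			if s, has := cache[id]; has && f.Filter(s) {
				result = append(result, s)
			}
		}
		return result
	}
	for _, s := range cache {
		if f.Filter(s) {
			result = append(result, s)
		}
	}
	return result
}

func main() {
	cache := map[int64]*segment{1: {id: 1}, 2: {id: 2}, 3: {id: 3}}

	// Fast path: only segment 2 is looked up in the map.
	fmt.Println(len(getBy(cache, withIDs(2)))) // 1

	// Fallback: the predicate is evaluated against every cached segment.
	fmt.Println(len(getBy(cache, filterFunc(func(s *segment) bool { return s.id > 1 })))) // 2
}

The two benchmarks above measure exactly this difference on a 10000-segment
cache: the ID-aware filter touches one map entry per requested ID, while the
plain predicate still visits every segment.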