enhance: Add segment id short cut for WithSegmentID filter (#31144)

See also #31143

This PR adds a shortcut for the datanode metacache `WithSegmentIDs`
filter, which can fetch segments directly from the map using the provided
segment IDs. It also adds a benchmark comparing the new implementation
against the old one.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/31167/head
congqixia 2024-03-11 10:55:02 +08:00 committed by GitHub
parent 8cb06acfed
commit ff1e967e89
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 170 additions and 60 deletions

View File

@ -533,7 +533,6 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
Schema: meta.GetSchema(), Schema: meta.GetSchema(),
}, dm) }, dm)
assert.Error(t, err) assert.Error(t, err)
t.Log(err)
}) })
t.Run("Merge with meta error", func(t *testing.T) { t.Run("Merge with meta error", func(t *testing.T) {

View File

@ -25,50 +25,77 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
) )
type SegmentFilter func(info *SegmentInfo) bool type SegmentFilter interface {
Filter(info *SegmentInfo) bool
SegmentIDs() ([]int64, bool)
}
func WithPartitionID(partitionID int64) SegmentFilter { type SegmentIDFilter struct {
return func(info *SegmentInfo) bool { segmentIDs []int64
return partitionID == common.InvalidPartitionID || info.partitionID == partitionID ids typeutil.Set[int64]
}
} }
func WithSegmentIDs(segmentIDs ...int64) SegmentFilter { func WithSegmentIDs(segmentIDs ...int64) SegmentFilter {
set := typeutil.NewSet[int64](segmentIDs...) set := typeutil.NewSet(segmentIDs...)
return func(info *SegmentInfo) bool { return &SegmentIDFilter{
return set.Contain(info.segmentID) segmentIDs: segmentIDs,
ids: set,
} }
} }
// Filter reports whether the segment's ID is in the filter's ID set.
func (f *SegmentIDFilter) Filter(info *SegmentInfo) bool {
	return f.ids.Contain(info.segmentID)
}

// SegmentIDs returns the explicit ID list; ok is always true for this
// filter type, enabling the direct map-lookup shortcut.
func (f *SegmentIDFilter) SegmentIDs() ([]int64, bool) {
	return f.segmentIDs, true
}
// SegmentFilterFunc adapts a plain predicate function to the SegmentFilter
// interface.
type SegmentFilterFunc func(info *SegmentInfo) bool

// Filter invokes the wrapped predicate.
func (f SegmentFilterFunc) Filter(info *SegmentInfo) bool {
	return f(info)
}

// SegmentIDs reports that no explicit ID list is available, so callers
// must fall back to scanning all segments.
func (f SegmentFilterFunc) SegmentIDs() ([]int64, bool) {
	return nil, false
}
// WithPartitionID returns a filter matching segments in the given partition.
// Passing common.InvalidPartitionID matches every segment.
func WithPartitionID(partitionID int64) SegmentFilter {
	return SegmentFilterFunc(func(info *SegmentInfo) bool {
		return partitionID == common.InvalidPartitionID || info.partitionID == partitionID
	})
}
func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter { func WithSegmentState(states ...commonpb.SegmentState) SegmentFilter {
set := typeutil.NewSet(states...) set := typeutil.NewSet(states...)
return func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return set.Len() > 0 && set.Contain(info.state) return set.Len() > 0 && set.Contain(info.state)
} })
} }
func WithStartPosNotRecorded() SegmentFilter { func WithStartPosNotRecorded() SegmentFilter {
return func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return !info.startPosRecorded return !info.startPosRecorded
} })
} }
func WithLevel(level datapb.SegmentLevel) SegmentFilter { func WithLevel(level datapb.SegmentLevel) SegmentFilter {
return func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.level == level return info.level == level
} })
} }
func WithCompacted() SegmentFilter { func WithCompacted() SegmentFilter {
return func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.compactTo != 0 return info.compactTo != 0
} })
} }
func WithNoSyncingTask() SegmentFilter { func WithNoSyncingTask() SegmentFilter {
return func(info *SegmentInfo) bool { return SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.syncingTasks == 0 return info.syncingTasks == 0
} })
} }
type SegmentAction func(info *SegmentInfo) type SegmentAction func(info *SegmentInfo)

View File

@ -35,29 +35,29 @@ func (s *SegmentFilterSuite) TestFilters() {
partitionID := int64(1001) partitionID := int64(1001)
filter := WithPartitionID(partitionID) filter := WithPartitionID(partitionID)
info.partitionID = partitionID + 1 info.partitionID = partitionID + 1
s.False(filter(info)) s.False(filter.Filter(info))
info.partitionID = partitionID info.partitionID = partitionID
s.True(filter(info)) s.True(filter.Filter(info))
segmentID := int64(10001) segmentID := int64(10001)
filter = WithSegmentIDs(segmentID) filter = WithSegmentIDs(segmentID)
info.segmentID = segmentID + 1 info.segmentID = segmentID + 1
s.False(filter(info)) s.False(filter.Filter(info))
info.segmentID = segmentID info.segmentID = segmentID
s.True(filter(info)) s.True(filter.Filter(info))
state := commonpb.SegmentState_Growing state := commonpb.SegmentState_Growing
filter = WithSegmentState(state) filter = WithSegmentState(state)
info.state = commonpb.SegmentState_Flushed info.state = commonpb.SegmentState_Flushed
s.False(filter(info)) s.False(filter.Filter(info))
info.state = state info.state = state
s.True(filter(info)) s.True(filter.Filter(info))
filter = WithStartPosNotRecorded() filter = WithStartPosNotRecorded()
info.startPosRecorded = true info.startPosRecorded = true
s.False(filter(info)) s.False(filter.Filter(info))
info.startPosRecorded = false info.startPosRecorded = false
s.True(filter(info)) s.True(filter.Filter(info))
} }
func TestFilters(t *testing.T) { func TestFilters(t *testing.T) {

View File

@ -157,30 +157,23 @@ func (c *metaCacheImpl) RemoveSegments(filters ...SegmentFilter) []int64 {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
filter := c.mergeFilters(filters...) var result []int64
process := func(id int64, info *SegmentInfo) {
var ids []int64 delete(c.segmentInfos, id)
for segID, info := range c.segmentInfos { result = append(result, id)
if filter(info) {
ids = append(ids, segID)
delete(c.segmentInfos, segID)
}
} }
return ids c.rangeWithFilter(process, filters...)
return result
} }
func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo { func (c *metaCacheImpl) GetSegmentsBy(filters ...SegmentFilter) []*SegmentInfo {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
filter := c.mergeFilters(filters...)
var segments []*SegmentInfo var segments []*SegmentInfo
for _, info := range c.segmentInfos { c.rangeWithFilter(func(_ int64, info *SegmentInfo) {
if filter(info) { segments = append(segments, info)
segments = append(segments, info) }, filters...)
}
}
return segments return segments
} }
@ -193,8 +186,10 @@ func (c *metaCacheImpl) GetSegmentByID(id int64, filters ...SegmentFilter) (*Seg
if !ok { if !ok {
return nil, false return nil, false
} }
if !c.mergeFilters(filters...)(segment) { for _, filter := range filters {
return nil, false if !filter.Filter(segment) {
return nil, false
}
} }
return segment, ok return segment, ok
} }
@ -208,16 +203,11 @@ func (c *metaCacheImpl) UpdateSegments(action SegmentAction, filters ...SegmentF
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
filter := c.mergeFilters(filters...) c.rangeWithFilter(func(id int64, info *SegmentInfo) {
for id, info := range c.segmentInfos {
if !filter(info) {
continue
}
nInfo := info.Clone() nInfo := info.Clone()
action(nInfo) action(nInfo)
c.segmentInfos[id] = nInfo c.segmentInfos[id] = nInfo
} }, filters...)
} }
func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) { func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) {
@ -231,13 +221,40 @@ func (c *metaCacheImpl) PredictSegments(pk storage.PrimaryKey, filters ...Segmen
return predicts, len(predicts) > 0 return predicts, len(predicts) > 0
} }
func (c *metaCacheImpl) mergeFilters(filters ...SegmentFilter) SegmentFilter { func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), filters ...SegmentFilter) {
return func(info *SegmentInfo) bool { var hasIDs bool
for _, filter := range filters { set := typeutil.NewSet[int64]()
if !filter(info) { filtered := make([]SegmentFilter, 0, len(filters))
for _, filter := range filters {
ids, ok := filter.SegmentIDs()
if ok {
set.Insert(ids...)
hasIDs = true
} else {
filtered = append(filtered, filter)
}
}
mergedFilter := func(info *SegmentInfo) bool {
for _, filter := range filtered {
if !filter.Filter(info) {
return false return false
} }
} }
return true return true
} }
if hasIDs {
for id := range set {
info, has := c.segmentInfos[id]
if has && mergedFilter(info) {
fn(id, info)
}
}
} else {
for id, info := range c.segmentInfos {
if mergedFilter(info) {
fn(id, info)
}
}
}
} }

View File

@ -193,17 +193,17 @@ func (s *MetaCacheSuite) TestPredictSegments() {
err := info.GetBloomFilterSet().UpdatePKRange(pkFieldData) err := info.GetBloomFilterSet().UpdatePKRange(pkFieldData)
s.Require().NoError(err) s.Require().NoError(err)
predict, ok = s.cache.PredictSegments(pk, func(s *SegmentInfo) bool { predict, ok = s.cache.PredictSegments(pk, SegmentFilterFunc(func(s *SegmentInfo) bool {
return s.segmentID == 1 return s.segmentID == 1
}) }))
s.False(ok) s.False(ok)
s.Empty(predict) s.Empty(predict)
predict, ok = s.cache.PredictSegments( predict, ok = s.cache.PredictSegments(
storage.NewInt64PrimaryKey(5), storage.NewInt64PrimaryKey(5),
func(s *SegmentInfo) bool { SegmentFilterFunc(func(s *SegmentInfo) bool {
return s.segmentID == 1 return s.segmentID == 1
}) }))
s.True(ok) s.True(ok)
s.NotEmpty(predict) s.NotEmpty(predict)
s.Equal(1, len(predict)) s.Equal(1, len(predict))
@ -213,3 +213,70 @@ func (s *MetaCacheSuite) TestPredictSegments() {
func TestMetaCacheSuite(t *testing.T) { func TestMetaCacheSuite(t *testing.T) {
suite.Run(t, new(MetaCacheSuite)) suite.Run(t, new(MetaCacheSuite))
} }
// BenchmarkGetSegmentsBy measures GetSegmentsBy over a 10k-segment cache
// using the ID-based WithSegmentIDs filter, which exercises the new direct
// map-lookup shortcut.
func BenchmarkGetSegmentsBy(b *testing.B) {
	paramtable.Init()

	schema := &schemapb.CollectionSchema{
		Name: "test_collection",
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
			{FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
				{Key: common.DimKey, Value: "128"},
			}},
		},
	}
	flushSegmentInfos := lo.RepeatBy(10000, func(i int) *datapb.SegmentInfo {
		return &datapb.SegmentInfo{
			ID:    int64(i),
			State: commonpb.SegmentState_Flushed,
		}
	})
	cache := NewMetaCache(&datapb.ChannelWatchInfo{
		Schema: schema,
		Vchan: &datapb.VchannelInfo{
			FlushedSegments: flushSegmentInfos,
		},
	}, func(*datapb.SegmentInfo) *BloomFilterSet {
		return NewBloomFilterSet()
	})

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		filter := WithSegmentIDs(0)
		cache.GetSegmentsBy(filter)
	}
}
// BenchmarkGetSegmentsByWithoutIDs measures GetSegmentsBy over the same
// 10k-segment cache using a plain predicate filter (SegmentFilterFunc),
// which cannot use the ID shortcut and forces a full scan — the baseline
// to compare against BenchmarkGetSegmentsBy.
func BenchmarkGetSegmentsByWithoutIDs(b *testing.B) {
	paramtable.Init()

	schema := &schemapb.CollectionSchema{
		Name: "test_collection",
		Fields: []*schemapb.FieldSchema{
			{FieldID: 100, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"},
			{FieldID: 101, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
				{Key: common.DimKey, Value: "128"},
			}},
		},
	}
	flushSegmentInfos := lo.RepeatBy(10000, func(i int) *datapb.SegmentInfo {
		return &datapb.SegmentInfo{
			ID:    int64(i),
			State: commonpb.SegmentState_Flushed,
		}
	})
	cache := NewMetaCache(&datapb.ChannelWatchInfo{
		Schema: schema,
		Vchan: &datapb.VchannelInfo{
			FlushedSegments: flushSegmentInfos,
		},
	}, func(*datapb.SegmentInfo) *BloomFilterSet {
		return NewBloomFilterSet()
	})

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Old-style func filter: no SegmentIDs shortcut available.
		filter := SegmentFilterFunc(func(info *SegmentInfo) bool {
			return info.segmentID == 0
		})
		cache.GetSegmentsBy(filter)
	}
}