enhance: Improve datacoord segment filtering with collection (#32831)

See also #32165

This PR modifies the `SelectSegments` interface to utilize collection ID
information when selecting segments for a provided collection
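
For reference, a minimal before/after sketch of the call-site pattern (identifiers are taken from the diff below; the enclosing meta `m` and the `collectionID` value are illustrative):

	// Before: a plain predicate forced a scan over every segment in meta.
	segments := m.SelectSegments(func(segment *SegmentInfo) bool {
		return isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID
	})

	// After: WithCollection routes the lookup through the per-collection index,
	// and any remaining predicate is wrapped as a SegmentFilterFunc.
	segments = m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithCollection(collectionID))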

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/32872/head
congqixia 2024-05-08 21:37:29 +08:00 committed by GitHub
parent 035a508722
commit cedb33ceec
14 changed files with 659 additions and 614 deletions


@ -77,59 +77,6 @@ func (_c *MockBroker_DescribeCollectionInternal_Call) RunAndReturn(run func(cont
return _c
}
// GetDatabaseID provides a mock function with given fields: ctx, dbName
func (_m *MockBroker) GetDatabaseID(ctx context.Context, dbName string) (int64, error) {
ret := _m.Called(ctx, dbName)
var r0 int64
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, string) (int64, error)); ok {
return rf(ctx, dbName)
}
if rf, ok := ret.Get(0).(func(context.Context, string) int64); ok {
r0 = rf(ctx, dbName)
} else {
r0 = ret.Get(0).(int64)
}
if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
r1 = rf(ctx, dbName)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockBroker_GetDatabaseID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDatabaseID'
type MockBroker_GetDatabaseID_Call struct {
*mock.Call
}
// GetDatabaseID is a helper method to define mock.On call
// - ctx context.Context
// - dbName string
func (_e *MockBroker_Expecter) GetDatabaseID(ctx interface{}, dbName interface{}) *MockBroker_GetDatabaseID_Call {
return &MockBroker_GetDatabaseID_Call{Call: _e.mock.On("GetDatabaseID", ctx, dbName)}
}
func (_c *MockBroker_GetDatabaseID_Call) Run(run func(ctx context.Context, dbName string)) *MockBroker_GetDatabaseID_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(string))
})
return _c
}
func (_c *MockBroker_GetDatabaseID_Call) Return(_a0 int64, _a1 error) *MockBroker_GetDatabaseID_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockBroker_GetDatabaseID_Call) RunAndReturn(run func(context.Context, string) (int64, error)) *MockBroker_GetDatabaseID_Call {
_c.Call.Return(run)
return _c
}
// HasCollection provides a mock function with given fields: ctx, collectionID
func (_m *MockBroker) HasCollection(ctx context.Context, collectionID int64) (bool, error) {
ret := _m.Called(ctx, collectionID)


@ -76,7 +76,7 @@ var (
)
type CompactionMeta interface {
SelectSegments(selector SegmentInfoSelector) []*SegmentInfo
SelectSegments(filters ...SegmentFilter) []*SegmentInfo
GetHealthySegment(segID UniqueID) *SegmentInfo
UpdateSegmentsInfo(operators ...UpdateOperator) error
SetSegmentCompacting(segmentID int64, compacting bool)
@ -322,16 +322,15 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
// Select sealed L1 segments for LevelZero compaction that meet the condition:
// dmlPos < triggerInfo.pos
// TODO: select L2 segments too
sealedSegments := c.meta.SelectSegments(func(info *SegmentInfo) bool {
return info.GetCollectionID() == task.triggerInfo.collectionID &&
(task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) &&
sealedSegments := c.meta.SelectSegments(WithCollection(task.triggerInfo.collectionID), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (task.triggerInfo.partitionID == -1 || info.GetPartitionID() == task.triggerInfo.partitionID) &&
info.GetInsertChannel() == plan.GetChannel() &&
isFlushState(info.GetState()) &&
!info.isCompacting &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetDmlPosition().GetTimestamp() < task.triggerInfo.pos.GetTimestamp()
})
}))
if len(sealedSegments) == 0 {
return errors.Errorf("Selected zero L1/L2 segments for the position=%v", task.triggerInfo.pos)
}


@ -203,7 +203,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() {
func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
channel := "Ch-1"
deltalogs := []*datapb.FieldBinlog{getFieldBinlogIDs(101, 3)}
s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{
{SegmentInfo: &datapb.SegmentInfo{
ID: 200,
@ -310,7 +310,7 @@ func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
Deltalogs: deltalogs,
}}
}).Times(2)
s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SelectSegments(mock.Anything, mock.Anything).Return(nil).Once()
// 2 l0 segments
plan := &datapb.CompactionPlan{


@ -36,9 +36,11 @@ func (s *CompactionTriggerManagerSuite) SetupTest() {
PartitionID: 10,
Channel: "ch-1",
}
s.meta = &meta{segments: &SegmentsInfo{
segments: genSegmentsForMeta(s.testLabel),
}}
segments := genSegmentsForMeta(s.testLabel)
s.meta = &meta{segments: NewSegmentsInfo()}
for id, segment := range segments {
s.meta.segments.SetSegment(id, segment)
}
s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext)
}


@ -80,9 +80,11 @@ func (s *CompactionViewManagerSuite) SetupTest() {
Channel: "ch-1",
}
meta := &meta{segments: &SegmentsInfo{
segments: genSegmentsForMeta(s.testLabel),
}}
segments := genSegmentsForMeta(s.testLabel)
meta := &meta{segments: NewSegmentsInfo()}
for id, segment := range segments {
meta.segments.SetSegment(id, segment)
}
s.m = NewCompactionViewManager(meta, s.mockTriggerManager, s.mockAlloc)
}


@ -410,7 +410,7 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
log.Info("start clear dropped segments...")
defer func() { log.Info("clear dropped segments done", zap.Duration("timeCost", time.Since(start))) }()
all := gc.meta.SelectSegments(func(si *SegmentInfo) bool { return true })
all := gc.meta.SelectSegments()
drops := make(map[int64]*SegmentInfo, 0)
compactTo := make(map[int64]*SegmentInfo)
channels := typeutil.NewSet[string]()


@ -453,28 +453,27 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
indexID = UniqueID(400)
segID = UniqueID(500)
)
return &meta{
segments := map[int64]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: nil,
},
}
meta := &meta{
RWMutex: sync.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: nil,
},
},
},
segments: NewSegmentsInfo(),
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
@ -558,6 +557,10 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
channelCPs: nil,
chunkManager: nil,
}
for id, segment := range segments {
meta.segments.SetSegment(id, segment)
}
return meta
}
func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) {
@ -608,35 +611,34 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
segID = UniqueID(500)
buildID = UniqueID(600)
)
return &meta{
segments := map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
}
meta := &meta{
RWMutex: sync.RWMutex{},
ctx: ctx,
catalog: catalog,
collections: nil,
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Flushed,
},
},
},
},
segments: NewSegmentsInfo(),
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
@ -734,6 +736,12 @@ func createMetaTableForRecycleUnusedIndexFiles(catalog *datacoord.Catalog) *meta
},
},
}
for id, segment := range segments {
meta.segments.SetSegment(id, segment)
}
return meta
}
func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) {
@ -847,195 +855,192 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
Timestamp: 1000,
}
m := &meta{
catalog: catalog,
channelCPs: channelCPs,
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Binlogs: []*datapb.FieldBinlog{
segments := map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "log2",
LogSize: 1024,
},
},
LogPath: "log1",
LogSize: 1024,
},
},
Deltalogs: []*datapb.FieldBinlog{
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log2",
LogSize: 1024,
},
},
},
},
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "stats_log1",
LogSize: 1024,
},
},
LogPath: "log2",
LogSize: 1024,
},
},
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
Deltalogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log1",
LogSize: 1024,
},
},
},
{
FieldID: 2,
Binlogs: []*datapb.Binlog{
{
LogPath: "del_log2",
LogSize: 1024,
},
},
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: "stats_log1",
LogSize: 1024,
},
},
CompactionFrom: []int64{segID, segID + 1},
},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 3,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: nil,
},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 4,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 12000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: []int64{segID + 2, segID + 3},
},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
},
},
segID + 6: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 6,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()),
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Compacted: true,
},
},
// compacted and child is GCed, dml pos is bigger than channel cp
segID + 7: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 7,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
Compacted: true,
},
},
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 5000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 0,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: []int64{segID, segID + 1},
},
},
segID + 3: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 3,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: nil,
},
},
segID + 4: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 4,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 12000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
DroppedAt: 10,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
CompactionFrom: []int64{segID + 2, segID + 3},
},
},
segID + 5: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 5,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
},
},
segID + 6: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 6,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: uint64(time.Now().Add(time.Hour).UnixNano()),
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 900,
},
Compacted: true,
},
},
// compacted and child is GCed, dml pos is bigger than channel cp
segID + 7: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 7,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "dmlChannel",
NumOfRows: 2000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65535,
DroppedAt: 0,
CompactionFrom: nil,
DmlPosition: &msgpb.MsgPosition{
Timestamp: 1200,
},
Compacted: true,
},
},
}
m := &meta{
catalog: catalog,
channelCPs: channelCPs,
segments: NewSegmentsInfo(),
indexMeta: &indexMeta{
catalog: catalog,
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
@ -1162,6 +1167,10 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
},
}
for id, segment := range segments {
m.segments.SetSegment(id, segment)
}
for segID, segment := range map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{


@ -57,9 +57,9 @@ func newServerHandler(s *Server) *ServerHandler {
// GetDataVChanPositions gets vchannel latest positions with provided dml channel names for DataNode.
func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID UniqueID) *datapb.VchannelInfo {
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
segments := h.s.meta.SelectSegments(SegmentFilterFunc(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.GetName() && !s.GetIsFake()
})
}))
log.Info("GetDataVChanPositions",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
@ -105,9 +105,9 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni
// the unflushed segments are actually the segments without index, even they are flushed.
func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo {
// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
segments := h.s.meta.SelectSegments(SegmentFilterFunc(func(s *SegmentInfo) bool {
return s.InsertChannel == channel.GetName() && !s.GetIsFake()
})
}))
segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
indexed := make(typeutil.UniqueSet)
@ -223,9 +223,7 @@ func (h *ServerHandler) getEarliestSegmentDMLPos(channel string, partitionIDs ..
var minPos *msgpb.MsgPosition
var minPosSegID int64
var minPosTs uint64
segments := h.s.meta.SelectSegments(func(s *SegmentInfo) bool {
return s.InsertChannel == channel
})
segments := h.s.meta.SelectSegments(WithChannel(channel))
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
partitionSet := typeutil.NewUniqueSet(validPartitions...)


@ -93,9 +93,9 @@ func (s *Server) createIndexesForSegment(segment *SegmentInfo) error {
}
func (s *Server) getUnIndexTaskSegments() []*SegmentInfo {
flushedSegments := s.meta.SelectSegments(func(seg *SegmentInfo) bool {
flushedSegments := s.meta.SelectSegments(SegmentFilterFunc(func(seg *SegmentInfo) bool {
return isFlush(seg)
})
}))
unindexedSegments := make([]*SegmentInfo, 0)
for _, segment := range flushedSegments {
@ -127,9 +127,9 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) {
}
case collectionID := <-s.notifyIndexChan:
log.Info("receive create index notify", zap.Int64("collectionID", collectionID))
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && collectionID == info.CollectionID
})
segments := s.meta.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(info *SegmentInfo) bool {
return isFlush(info)
}))
for _, segment := range segments {
if err := s.createIndexesForSegment(segment); err != nil {
log.Warn("create index for segment fail, wait for retry", zap.Int64("segmentID", segment.ID))
@ -396,9 +396,9 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
indexInfo := &indexpb.IndexInfo{}
// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
}))
s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
ret.State = indexInfo.State
@ -448,10 +448,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}
func (s *Server) selectSegmentIndexesStats(selector SegmentInfoSelector) map[int64]*indexStats {
func (s *Server) selectSegmentIndexesStats(filters ...SegmentFilter) map[int64]*indexStats {
ret := make(map[int64]*indexStats)
segments := s.meta.SelectSegments(selector)
segments := s.meta.SelectSegments(filters...)
segmentIDs := lo.Map(segments, func(info *SegmentInfo, i int) int64 {
return info.GetID()
})
@ -647,9 +647,9 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}
// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
}))
s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()),
@ -700,9 +700,9 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}
// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
}))
indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
@ -759,9 +759,9 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}
// The total rows of all indexes should be based on the current perspective
segments := s.selectSegmentIndexesStats(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
segments := s.selectSegmentIndexesStats(WithCollection(req.GetCollectionID()), SegmentFilterFunc(func(info *SegmentInfo) bool {
return (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
}))
indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {


@ -693,6 +693,28 @@ func TestServer_GetIndexState(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_IndexNotExist, resp.GetStatus().GetErrorCode())
})
segments := map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
}
s.meta = &meta{
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexMeta: &indexMeta{
@ -717,31 +739,10 @@ func TestServer_GetIndexState(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
},
},
segments: NewSegmentsInfo(),
}
for id, segment := range segments {
s.meta.segments.SetSegment(id, segment)
}
t.Run("index state is unissued", func(t *testing.T) {
@ -751,6 +752,28 @@ func TestServer_GetIndexState(t *testing.T) {
assert.Equal(t, commonpb.IndexState_InProgress, resp.GetState())
})
segments = map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
}
s.meta = &meta{
catalog: &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)},
indexMeta: &indexMeta{
@ -794,31 +817,10 @@ func TestServer_GetIndexState(t *testing.T) {
},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS - 1,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS - 1,
},
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
},
},
},
segments: NewSegmentsInfo(),
}
for id, segment := range segments {
s.meta.segments.SetSegment(id, segment)
}
t.Run("index state is none", func(t *testing.T) {
@ -935,14 +937,14 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
IndexSize: 1025,
WriteHandoff: false,
})
s.meta.segments.segments[segID] = &SegmentInfo{
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: nil,
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
}
})
resp, err := s.GetSegmentIndexState(ctx, req)
assert.NoError(t, err)
@ -1040,30 +1042,27 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
UserIndexParams: nil,
},
}
s.meta.segments = &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
s.meta.segments = NewSegmentsInfo()
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
}
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
})
resp, err := s.GetIndexBuildProgress(ctx, req)
assert.NoError(t, err)
@ -1090,30 +1089,27 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
IndexSize: 0,
WriteHandoff: false,
})
s.meta.segments = &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
s.meta.segments = NewSegmentsInfo()
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 10250,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
}
currRows: 10250,
allocations: nil,
lastFlushTime: time.Time{},
isCompacting: false,
lastWrittenTime: time.Time{},
})
resp, err := s.GetIndexBuildProgress(ctx, req)
assert.NoError(t, err)
@ -1194,6 +1190,53 @@ func TestServer_DescribeIndex(t *testing.T) {
mock.Anything,
).Return(nil)
segments := map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
}
s := &Server{
meta: &meta{
catalog: catalog,
@ -1441,60 +1484,14 @@ func TestServer_DescribeIndex(t *testing.T) {
},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: invalidSegID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
CreatedByCompaction: true,
CompactionFrom: []int64{segID - 1},
},
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Dropped,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
},
},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
for id, segment := range segments {
s.meta.segments.SetSegment(id, segment)
}
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
@ -1650,10 +1647,7 @@ func TestServer_ListIndexes(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{},
},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
@ -1713,6 +1707,37 @@ func TestServer_GetIndexStatistics(t *testing.T) {
mock.Anything,
).Return(nil)
segments := map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
}
s := &Server{
meta: &meta{
catalog: catalog,
@ -1897,44 +1922,14 @@ func TestServer_GetIndexStatistics(t *testing.T) {
},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
invalidSegID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
// timestamp > index start time, will be filtered out
Timestamp: createTS + 1,
},
},
},
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
StartPosition: &msgpb.MsgPosition{
Timestamp: createTS,
},
},
},
},
},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
for id, segment := range segments {
s.meta.segments.SetSegment(id, segment)
}
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
@ -2084,27 +2079,24 @@ func TestServer_DropIndex(t *testing.T) {
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
},
},
},
},
segments: NewSegmentsInfo(),
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
},
})
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
resp, err := s.DropIndex(ctx, req)
@ -2250,27 +2242,23 @@ func TestServer_GetIndexInfos(t *testing.T) {
},
},
segments: &SegmentsInfo{
compactionTo: make(map[int64]int64),
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
},
},
},
},
segments: NewSegmentsInfo(),
chunkManager: cli,
},
allocator: newMockAllocator(),
notifyIndexChan: make(chan UniqueID, 1),
}
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,
State: commonpb.SegmentState_Flushed,
MaxRowNum: 65536,
LastExpireTime: createTS,
},
})
t.Run("server not available", func(t *testing.T) {
s.stateCode.Store(commonpb.StateCode_Initializing)
@ -2289,41 +2277,40 @@ func TestServer_GetIndexInfos(t *testing.T) {
}
func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
m := &meta{
segments: &SegmentsInfo{
segments: map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Growing,
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Dropped,
},
},
segments := map[UniqueID]*SegmentInfo{
segID: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Flushed,
},
},
segID + 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Growing,
},
},
segID + 2: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 2,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1025,
State: commonpb.SegmentState_Dropped,
},
},
}
m := &meta{
segments: NewSegmentsInfo(),
indexMeta: &indexMeta{
buildID2SegmentIndex: make(map[UniqueID]*model.SegmentIndex),
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{},
@ -2359,6 +2346,9 @@ func TestMeta_GetHasUnindexTaskSegments(t *testing.T) {
},
},
}
for id, segment := range segments {
m.segments.SetSegment(id, segment)
}
s := &Server{meta: m}
t.Run("normal", func(t *testing.T) {


@ -962,23 +962,17 @@ func (m *meta) batchSaveDropSegments(channel string, modSegments map[int64]*Segm
// GetSegmentsByChannel returns all segment info which insert channel equals provided `dmlCh`
func (m *meta) GetSegmentsByChannel(channel string) []*SegmentInfo {
return m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.InsertChannel == channel
})
return m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithChannel(channel))
}
// GetSegmentsOfCollection get all segments of collection
func (m *meta) GetSegmentsOfCollection(collectionID UniqueID) []*SegmentInfo {
return m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.GetCollectionID() == collectionID
})
return m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithCollection(collectionID))
}
// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID`
func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) && segment.CollectionID == collectionID
})
segments := m.SelectSegments(SegmentFilterFunc(isSegmentHealthy), WithCollection(collectionID))
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
@ -987,12 +981,11 @@ func (m *meta) GetSegmentsIDOfCollection(collectionID UniqueID) []UniqueID {
// GetSegmentsIDOfCollection returns all segment ids which collection equals to provided `collectionID`
func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []UniqueID {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment != nil &&
segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segment.GetState() != commonpb.SegmentState_NotExist &&
segment.CollectionID == collectionID
})
segment.GetState() != commonpb.SegmentState_NotExist
}))
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
@ -1001,11 +994,10 @@ func (m *meta) GetSegmentsIDOfCollectionWithDropped(collectionID UniqueID) []Uni
// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []UniqueID {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
segment.CollectionID == collectionID &&
segment.PartitionID == partitionID
})
}))
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
@ -1014,12 +1006,11 @@ func (m *meta) GetSegmentsIDOfPartition(collectionID, partitionID UniqueID) []Un
// GetSegmentsIDOfPartition returns all segments ids which collection & partition equals to provided `collectionID`, `partitionID`
func (m *meta) GetSegmentsIDOfPartitionWithDropped(collectionID, partitionID UniqueID) []UniqueID {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetState() != commonpb.SegmentState_SegmentStateNone &&
segment.GetState() != commonpb.SegmentState_NotExist &&
segment.CollectionID == collectionID &&
segment.PartitionID == partitionID
})
}))
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 {
return segment.ID
@ -1031,34 +1022,34 @@ func (m *meta) GetNumRowsOfPartition(collectionID UniqueID, partitionID UniqueID
m.RLock()
defer m.RUnlock()
var ret int64
segments := m.segments.GetSegments()
segments := m.SelectSegments(WithCollection(collectionID), SegmentFilterFunc(func(si *SegmentInfo) bool {
return isSegmentHealthy(si) && si.GetPartitionID() == partitionID
}))
for _, segment := range segments {
if isSegmentHealthy(segment) && segment.CollectionID == collectionID && segment.PartitionID == partitionID {
ret += segment.NumOfRows
}
ret += segment.NumOfRows
}
return ret
}
// GetUnFlushedSegments get all segments which state is not `Flushing` nor `Flushed`
func (m *meta) GetUnFlushedSegments() []*SegmentInfo {
return m.SelectSegments(func(segment *SegmentInfo) bool {
return m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing || segment.GetState() == commonpb.SegmentState_Sealed
})
}))
}
// GetFlushingSegments get all segments which state is `Flushing`
func (m *meta) GetFlushingSegments() []*SegmentInfo {
return m.SelectSegments(func(segment *SegmentInfo) bool {
return m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushing
})
}))
}
// SelectSegments select segments with selector
func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
func (m *meta) SelectSegments(filters ...SegmentFilter) []*SegmentInfo {
m.RLock()
defer m.RUnlock()
return m.segments.GetSegmentsBySelector(selector)
return m.segments.GetSegmentsBySelector(filters...)
}
// AddAllocation add allocation in segment
@ -1406,12 +1397,12 @@ func (m *meta) GcConfirm(ctx context.Context, collectionID, partitionID UniqueID
}
func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo {
allSegs := m.SelectSegments(func(segment *SegmentInfo) bool {
allSegs := m.SelectSegments(SegmentFilterFunc(func(segment *SegmentInfo) bool {
return isSegmentHealthy(segment) &&
isFlush(segment) && // sealed segment
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() // not importing now
})
}))
ret := make(map[int64][]*SegmentInfo)
for _, seg := range allSegs {
@ -1426,12 +1417,11 @@ func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo
}
func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupLabel) *msgpb.MsgPosition {
segments := m.SelectSegments(func(segment *SegmentInfo) bool {
segments := m.SelectSegments(WithCollection(label.CollectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Growing &&
segment.GetCollectionID() == label.CollectionID &&
segment.GetPartitionID() == label.PartitionID &&
segment.GetInsertChannel() == label.Channel
})
}))
earliest := &msgpb.MsgPosition{Timestamp: math.MaxUint64}
for _, seg := range segments {


@ -128,13 +128,19 @@ func (_c *MockCompactionMeta_GetHealthySegment_Call) RunAndReturn(run func(int64
return _c
}
// SelectSegments provides a mock function with given fields: selector
func (_m *MockCompactionMeta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
ret := _m.Called(selector)
// SelectSegments provides a mock function with given fields: filters
func (_m *MockCompactionMeta) SelectSegments(filters ...SegmentFilter) []*SegmentInfo {
_va := make([]interface{}, len(filters))
for _i := range filters {
_va[_i] = filters[_i]
}
var _ca []interface{}
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 []*SegmentInfo
if rf, ok := ret.Get(0).(func(SegmentInfoSelector) []*SegmentInfo); ok {
r0 = rf(selector)
if rf, ok := ret.Get(0).(func(...SegmentFilter) []*SegmentInfo); ok {
r0 = rf(filters...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]*SegmentInfo)
@ -150,14 +156,21 @@ type MockCompactionMeta_SelectSegments_Call struct {
}
// SelectSegments is a helper method to define mock.On call
// - selector SegmentInfoSelector
func (_e *MockCompactionMeta_Expecter) SelectSegments(selector interface{}) *MockCompactionMeta_SelectSegments_Call {
return &MockCompactionMeta_SelectSegments_Call{Call: _e.mock.On("SelectSegments", selector)}
// - filters ...SegmentFilter
func (_e *MockCompactionMeta_Expecter) SelectSegments(filters ...interface{}) *MockCompactionMeta_SelectSegments_Call {
return &MockCompactionMeta_SelectSegments_Call{Call: _e.mock.On("SelectSegments",
append([]interface{}{}, filters...)...)}
}
func (_c *MockCompactionMeta_SelectSegments_Call) Run(run func(selector SegmentInfoSelector)) *MockCompactionMeta_SelectSegments_Call {
func (_c *MockCompactionMeta_SelectSegments_Call) Run(run func(filters ...SegmentFilter)) *MockCompactionMeta_SelectSegments_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(SegmentInfoSelector))
variadicArgs := make([]SegmentFilter, len(args)-0)
for i, a := range args[0:] {
if a != nil {
variadicArgs[i] = a.(SegmentFilter)
}
}
run(variadicArgs...)
})
return _c
}
@ -167,7 +180,7 @@ func (_c *MockCompactionMeta_SelectSegments_Call) Return(_a0 []*SegmentInfo) *Mo
return _c
}
func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(SegmentInfoSelector) []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call {
func (_c *MockCompactionMeta_SelectSegments_Call) RunAndReturn(run func(...SegmentFilter) []*SegmentInfo) *MockCompactionMeta_SelectSegments_Call {
_c.Call.Return(run)
return _c
}


@ -20,6 +20,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
@ -32,6 +33,7 @@ import (
// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
type SegmentsInfo struct {
segments map[UniqueID]*SegmentInfo
collSegments map[UniqueID]*CollectionSegments
compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
// A segment can be compacted to only one segment finally in meta.
}
@ -68,10 +70,15 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
func NewSegmentsInfo() *SegmentsInfo {
return &SegmentsInfo{
segments: make(map[UniqueID]*SegmentInfo),
collSegments: make(map[UniqueID]*CollectionSegments),
compactionTo: make(map[UniqueID]UniqueID),
}
}
type CollectionSegments struct {
segments map[int64]*SegmentInfo
}
// GetSegment returns SegmentInfo
// the logPath in meta is empty
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
@ -86,21 +93,33 @@ func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
// no deep copy applied
// the logPath in meta is empty
func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
segments := make([]*SegmentInfo, 0, len(s.segments))
for _, segment := range s.segments {
segments = append(segments, segment)
}
return segments
return lo.Values(s.segments)
}
func (s *SegmentsInfo) GetSegmentsBySelector(selector SegmentInfoSelector) []*SegmentInfo {
var segments []*SegmentInfo
for _, segment := range s.segments {
if selector(segment) {
segments = append(segments, segment)
func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo {
criterion := &segmentCriterion{}
for _, filter := range filters {
filter.AddFilter(criterion)
}
var result []*SegmentInfo
var candidates []*SegmentInfo
// apply criterion
switch {
case criterion.collectionID > 0:
collSegments, ok := s.collSegments[criterion.collectionID]
if !ok {
return nil
}
candidates = lo.Values(collSegments.segments)
default:
candidates = lo.Values(s.segments)
}
for _, segment := range candidates {
if criterion.Match(segment) {
result = append(result, segment)
}
}
return segments
return result
}
// GetCompactionTo returns the segment that the provided segment is compacted to.
@ -125,6 +144,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool)
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
if segment, ok := s.segments[segmentID]; ok {
s.deleteCompactTo(segment)
s.delCollection(segment)
delete(s.segments, segmentID)
}
}
@ -136,8 +156,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
if segment, ok := s.segments[segmentID]; ok {
// Remove old segment compact to relation first.
s.deleteCompactTo(segment)
s.delCollection(segment)
}
s.segments[segmentID] = segment
s.addCollection(segment)
s.addCompactTo(segment)
}
@ -274,6 +296,30 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
return cloned
}
func (s *SegmentsInfo) addCollection(segment *SegmentInfo) {
collID := segment.GetCollectionID()
collSegment, ok := s.collSegments[collID]
if !ok {
collSegment = &CollectionSegments{
segments: make(map[UniqueID]*SegmentInfo),
}
s.collSegments[collID] = collSegment
}
collSegment.segments[segment.GetID()] = segment
}
func (s *SegmentsInfo) delCollection(segment *SegmentInfo) {
collID := segment.GetCollectionID()
collSegment, ok := s.collSegments[collID]
if !ok {
return
}
delete(collSegment.segments, segment.GetID())
if len(collSegment.segments) == 0 {
delete(s.collSegments, segment.GetCollectionID())
}
}
// addCompactTo adds the compact relation to the segment
func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) {
for _, from := range segment.GetCompactionFrom() {


@ -28,3 +28,52 @@ func SetMaxRowCount(maxRow int64) SegmentOperator {
return true
}
}
type segmentCriterion struct {
collectionID int64
others []SegmentFilter
}
func (sc *segmentCriterion) Match(segment *SegmentInfo) bool {
for _, filter := range sc.others {
if !filter.Match(segment) {
return false
}
}
return true
}
type SegmentFilter interface {
Match(segment *SegmentInfo) bool
AddFilter(*segmentCriterion)
}
type CollectionFilter int64
func (f CollectionFilter) Match(segment *SegmentInfo) bool {
return segment.GetCollectionID() == int64(f)
}
func (f CollectionFilter) AddFilter(criterion *segmentCriterion) {
criterion.collectionID = int64(f)
}
func WithCollection(collectionID int64) SegmentFilter {
return CollectionFilter(collectionID)
}
type SegmentFilterFunc func(*SegmentInfo) bool
func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool {
return f(segment)
}
func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) {
criterion.others = append(criterion.others, f)
}
func WithChannel(channel string) SegmentFilter {
return SegmentFilterFunc(func(si *SegmentInfo) bool {
return si.GetInsertChannel() == channel
})
}
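
Putting the pieces together: `WithCollection` stores the collection ID on the shared `segmentCriterion`, which `GetSegmentsBySelector` uses to pick the per-collection candidate map instead of scanning all segments, while each `SegmentFilterFunc` lands in `criterion.others` and is applied to every candidate via `Match`. A minimal usage sketch against the `SegmentsInfo` from this diff (the `collID` value and the flushed-state predicate are illustrative):

	segments := NewSegmentsInfo()
	// ... SetSegment calls populate both the global and per-collection maps ...

	// Candidates come from collSegments[collID]; the predicate then prunes them.
	flushed := segments.GetSegmentsBySelector(
		WithCollection(collID),
		SegmentFilterFunc(func(si *SegmentInfo) bool {
			return si.GetState() == commonpb.SegmentState_Flushed
		}),
	)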