enhance: Optimize DescribeIndex to reduce lock contention (#30939)

issue: #29313 
issue: #30443

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/30989/head
cai.zhang 2024-03-03 19:00:59 +08:00 committed by GitHub
parent e39f46aa38
commit f6ff2588cd
3 changed files with 97 additions and 43 deletions
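The change targets the index read paths (GetIndexState, GetIndexBuildProgress, DescribeIndex, GetIndexStatistics): previously they selected full SegmentInfo objects, and countIndexedRows then called s.meta.GetSegment for every segment and every compaction source, re-acquiring the meta lock each time. The new SelectSegmentIndexes takes the read lock once, copies only the fields these paths need into lightweight indexStats values, and completeIndexInfo/countIndexedRows aggregate over that snapshot without touching meta again. Below is a minimal, self-contained sketch of that snapshot-then-aggregate pattern; the types and names used here (meta, segment, snapshot, selectSegmentIndexes, and the simplified countIndexedRows signature) are illustrative stand-ins for this description, not the actual datacoord code.

package main

import (
	"fmt"
	"sync"
)

// segment is a simplified stand-in for datacoord's SegmentInfo.
type segment struct {
	numRows int64
	indexed map[int64]bool // indexID -> index build finished
}

// snapshot is a simplified stand-in for the indexStats copy added by this change.
type snapshot struct {
	numRows int64
	indexed map[int64]bool
}

// meta guards the segment map with a single RWMutex, like datacoord's meta.
type meta struct {
	sync.RWMutex
	segments map[int64]*segment
}

// selectSegmentIndexes copies just the fields the read path needs while
// holding the read lock exactly once, so the aggregation below never has
// to go back to meta (and its lock) per segment.
func (m *meta) selectSegmentIndexes(selector func(*segment) bool) map[int64]*snapshot {
	m.RLock()
	defer m.RUnlock()

	out := make(map[int64]*snapshot, len(m.segments))
	for id, seg := range m.segments {
		if !selector(seg) {
			continue
		}
		cp := &snapshot{numRows: seg.numRows, indexed: make(map[int64]bool, len(seg.indexed))}
		for indexID, done := range seg.indexed {
			cp.indexed[indexID] = done
		}
		out[id] = cp
	}
	return out
}

// countIndexedRows aggregates purely over the snapshot, lock-free.
func countIndexedRows(indexID int64, segs map[int64]*snapshot) int64 {
	var rows int64
	for _, s := range segs {
		if s.indexed[indexID] {
			rows += s.numRows
		}
	}
	return rows
}

func main() {
	m := &meta{segments: map[int64]*segment{
		1: {numRows: 1000, indexed: map[int64]bool{100: true}},
		2: {numRows: 500, indexed: map[int64]bool{100: false}},
	}}
	segs := m.selectSegmentIndexes(func(*segment) bool { return true })
	fmt.Println(countIndexedRows(100, segs)) // 1000
}

Copying only scalar fields plus the per-index states keeps the snapshot cheap to build and bounds how long the read lock is held, which is where the contention reduction in this commit comes from.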


@@ -365,9 +365,12 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
}
indexInfo := &indexpb.IndexInfo{}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
ret.State = indexInfo.State
ret.FailReason = indexInfo.IndexStateFailReason
@@ -415,35 +418,38 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
return ret, nil
}
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*SegmentInfo) int64 {
func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments map[int64]*indexStats) int64 {
unIndexed, indexed := typeutil.NewSet[int64](), typeutil.NewSet[int64]()
for _, seg := range segments {
segIdx, ok := seg.segmentIndexes[indexInfo.IndexID]
if !ok {
unIndexed.Insert(seg.GetID())
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
switch segIdx.IndexState {
segIdx, ok := seg.indexStates[indexInfo.IndexID]
if !ok {
unIndexed.Insert(segID)
continue
}
switch segIdx.GetState() {
case commonpb.IndexState_Finished:
indexed.Insert(seg.GetID())
indexed.Insert(segID)
default:
unIndexed.Insert(seg.GetID())
unIndexed.Insert(segID)
}
}
retrieveContinue := len(unIndexed) != 0
for retrieveContinue {
for segID := range unIndexed {
unIndexed.Remove(segID)
segment := s.meta.GetSegment(segID)
if segment == nil || len(segment.CompactionFrom) == 0 {
segment := segments[segID]
if segment == nil || len(segment.compactionFrom) == 0 {
continue
}
for _, fromID := range segment.CompactionFrom {
fromSeg := s.meta.GetSegment(fromID)
for _, fromID := range segment.compactionFrom {
fromSeg := segments[fromID]
if fromSeg == nil {
continue
}
if segIndex, ok := fromSeg.segmentIndexes[indexInfo.IndexID]; ok && segIndex.IndexState == commonpb.IndexState_Finished {
if segIndex, ok := fromSeg.indexStates[indexInfo.IndexID]; ok && segIndex.GetState() == commonpb.IndexState_Finished {
indexed.Insert(fromID)
continue
}
@@ -454,9 +460,9 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
}
indexedRows := int64(0)
for segID := range indexed {
segment := s.meta.GetSegment(segID)
segment := segments[segID]
if segment != nil {
indexedRows += segment.GetNumOfRows()
indexedRows += segment.numRows
}
}
return indexedRows
@@ -465,7 +471,7 @@ func (s *Server) countIndexedRows(indexInfo *indexpb.IndexInfo, segments []*Segm
// completeIndexInfo get the index row count and index task state
// if realTime, calculate current statistics
// if not realTime, which means get info of the prior `CreateIndex` action, skip segments created after index's create time
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments []*SegmentInfo, realTime bool, ts Timestamp) {
func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.Index, segments map[int64]*indexStats, realTime bool, ts Timestamp) {
var (
cntNone = 0
cntUnissued = 0
@@ -478,31 +484,34 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
pendingIndexRows = int64(0)
)
for _, seg := range segments {
totalRows += seg.NumOfRows
segIdx, ok := seg.segmentIndexes[index.IndexID]
if !ok {
if seg.GetLastExpireTime() <= ts {
cntUnissued++
}
pendingIndexRows += seg.GetNumOfRows()
for segID, seg := range segments {
if seg.state != commonpb.SegmentState_Flushed && seg.state != commonpb.SegmentState_Flushing {
continue
}
if segIdx.IndexState != commonpb.IndexState_Finished {
pendingIndexRows += seg.GetNumOfRows()
totalRows += seg.numRows
segIdx, ok := seg.indexStates[index.IndexID]
if !ok {
if seg.lastExpireTime <= ts {
cntUnissued++
}
pendingIndexRows += seg.numRows
continue
}
if segIdx.GetState() != commonpb.IndexState_Finished {
pendingIndexRows += seg.numRows
}
// if realTime, calculate current statistics
// if not realTime, skip segments created after index create
if !realTime && seg.GetLastExpireTime() > ts {
if !realTime && seg.lastExpireTime > ts {
continue
}
switch segIdx.IndexState {
switch segIdx.GetState() {
case commonpb.IndexState_IndexStateNone:
// can't get here
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segIdx.SegmentID))
log.Warn("receive unexpected index state: IndexStateNone", zap.Int64("segmentID", segID))
cntNone++
case commonpb.IndexState_Unissued:
cntUnissued++
@@ -510,10 +519,10 @@ func (s *Server) completeIndexInfo(indexInfo *indexpb.IndexInfo, index *model.In
cntInProgress++
case commonpb.IndexState_Finished:
cntFinished++
indexedRows += seg.NumOfRows
indexedRows += seg.numRows
case commonpb.IndexState_Failed:
cntFailed++
failReason += fmt.Sprintf("%d: %s;", segIdx.SegmentID, segIdx.FailReason)
failReason += fmt.Sprintf("%d: %s;", segID, segIdx.FailReason)
}
}
@@ -581,9 +590,13 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
PendingIndexRows: 0,
State: 0,
}
s.completeIndexInfo(indexInfo, indexes[0], s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
}), false, indexes[0].CreateTime)
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
s.completeIndexInfo(indexInfo, indexes[0], segments, false, indexes[0].CreateTime)
log.Info("GetIndexBuildProgress success", zap.Int64("collectionID", req.GetCollectionID()),
zap.String("indexName", req.GetIndexName()))
return &indexpb.GetIndexBuildProgressResponse{
@@ -594,6 +607,17 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
}, nil
}
// indexStats just for indexing statistics.
// Please use it judiciously.
type indexStats struct {
ID int64
numRows int64
compactionFrom []int64
indexStates map[int64]*indexpb.SegmentIndexState
state commonpb.SegmentState
lastExpireTime uint64
}
// DescribeIndex describe the index info of the collection.
func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log := log.Ctx(ctx).With(
@@ -621,9 +645,10 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{
@@ -679,9 +704,10 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
}
// The total rows of all indexes should be based on the current perspective
segments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
return isFlush(info) && info.CollectionID == req.GetCollectionID()
segments := s.meta.SelectSegmentIndexes(func(info *SegmentInfo) bool {
return info.GetCollectionID() == req.GetCollectionID() && (isFlush(info) || info.GetState() == commonpb.SegmentState_Dropped)
})
indexInfos := make([]*indexpb.IndexInfo, 0)
for _, index := range indexes {
indexInfo := &indexpb.IndexInfo{


@@ -1371,7 +1371,7 @@ func TestServer_DescribeIndex(t *testing.T) {
},
segID - 1: {
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
ID: segID - 1,
CollectionID: collID,
PartitionID: partID,
NumOfRows: 10000,


@@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
@@ -1050,6 +1051,33 @@ func (m *meta) SelectSegments(selector SegmentInfoSelector) []*SegmentInfo {
return ret
}
func (m *meta) SelectSegmentIndexes(selector SegmentInfoSelector) map[int64]*indexStats {
m.RLock()
defer m.RUnlock()
ret := make(map[int64]*indexStats)
for _, info := range m.segments.segments {
if selector(info) {
s := &indexStats{
ID: info.GetID(),
numRows: info.GetNumOfRows(),
compactionFrom: info.GetCompactionFrom(),
indexStates: make(map[int64]*indexpb.SegmentIndexState),
state: info.GetState(),
lastExpireTime: info.GetLastExpireTime(),
}
for indexID, segIndex := range info.segmentIndexes {
s.indexStates[indexID] = &indexpb.SegmentIndexState{
SegmentID: segIndex.SegmentID,
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
}
}
ret[info.GetID()] = s
}
}
return ret
}
// AddAllocation add allocation in segment
func (m *meta) AddAllocation(segmentID UniqueID, allocation *Allocation) error {
log.Debug("meta update: add allocation",