enhance: Refine code for GetRecoveryInfo (#34973)

issue: #34495

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2024-07-29 00:01:46 +08:00 committed by GitHub
parent 2372452fac
commit 497afcb897
1 changed file with 103 additions and 94 deletions


@@ -119,117 +119,126 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
    levelZeroIDs = make(typeutil.UniqueSet)
)

segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())

segmentInfos := make(map[int64]*SegmentInfo)
indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
indexed := make(typeutil.UniqueSet)
for _, segment := range indexedSegments {
    indexed.Insert(segment.GetID())
}
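// `indexed` now holds the IDs of segments whose index is built; the loop below
// uses it to classify every segment on the channel into the indexed / unindexed /
// growing / dropped / L0 buckets.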
unIndexedIDs := make(typeutil.UniqueSet)
for _, s := range segments {
    if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
        continue
    }
    if s.GetIsImporting() {
        // Skip bulk insert segments.
        continue
    }
    currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName())
    if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion {
        // During L2 compaction, newly generated segments may become visible before the
        // whole L2 compaction plan finishes. Skip these fast-finished segments: all
        // segments in one L2 batch must be seen atomically, otherwise users would see
        // intermediate results.
        continue
    }
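    // Record the segment, then classify it by state, level, and index status.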
    segmentInfos[s.GetID()] = s
    switch {
    case s.GetState() == commonpb.SegmentState_Dropped:
        if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
            // If the segment's partition stats version equals currentPartitionStatsVersion,
            // it must have been indexed; the clustering compaction process guarantees this.
            // Keeping it ensures the currently valid L2 compaction output stays available
            // to search/query, avoiding insufficient data.
            indexedIDs.Insert(s.GetID())
            continue
        }
        droppedIDs.Insert(s.GetID())
    case !isFlushState(s.GetState()):
        growingIDs.Insert(s.GetID())
    case s.GetLevel() == datapb.SegmentLevel_L0:
        levelZeroIDs.Insert(s.GetID())
    case indexed.Contain(s.GetID()):
        indexedIDs.Insert(s.GetID())
    case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segments as indexed
        indexedIDs.Insert(s.GetID())
    default:
        unIndexedIDs.Insert(s.GetID())
    }
}
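// Every segment that passed the filters above now sits in exactly one bucket.
// Compaction can still leave the view incomplete: a flushed-but-unindexed child may
// stand in for parents that are already indexed, which the ancestor walk below fixes.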
// ================================================
// Segments blood relationship:
//    a   b
//     \ /
//      c   d
//       \ /
//        e
//
// GC:        a, b
// Indexed:   c, d, e
//   ||
//   ||  (Index dropped and creating new index and not finished)
//   \/
// UnIndexed: c, d, e
//
// Retrieve unIndexed expected result:
// unIndexed: c, d
// ================================================
isValid := func(ids ...UniqueID) bool {
    for _, id := range ids {
        if seg, ok := segmentInfos[id]; !ok || seg == nil {
            return false
        }
    }
    return true
}
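// retrieveUnIndexed swaps an unindexed compacted segment for its parents, but only
// when isValid confirms every parent is still tracked in segmentInfos for this channel.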
retrieveUnIndexed := func() bool {
    continueRetrieve := false
    for id := range unIndexedIDs {
        compactionFrom := segmentInfos[id].GetCompactionFrom()
        if len(compactionFrom) > 0 && isValid(compactionFrom...) {
            for _, fromID := range compactionFrom {
                if indexed.Contain(fromID) {
                    indexedIDs.Insert(fromID)
                } else {
                    unIndexedIDs.Insert(fromID)
                    continueRetrieve = true
                }
            }
            unIndexedIDs.Remove(id)
            droppedIDs.Remove(compactionFrom...)
        }
    }
    return continueRetrieve
}
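// Run to a fixed point: a pass may surface parents that are themselves unindexed
// compaction products, so repeat until a pass discovers nothing new.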
for retrieveUnIndexed() {
}
// unindexed segments are flushed segments as well
indexedIDs.Insert(unIndexedIDs.Collect()...)
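// Collect each partition's current stats version for the summary log below.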
for _, partitionID := range validPartitions {
    currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
    partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
}
log.Info("GetQueryVChanPositions",
    zap.Int64("collectionID", channel.GetCollectionID()),
    zap.String("channel", channel.GetName()),
    zap.Int("numOfSegments", len(segments)),
    zap.Int("indexed segment", len(indexedSegments)),
    zap.Int("result indexed", len(indexedIDs)),
    zap.Int("result unIndexed", len(unIndexedIDs)),
    zap.Int("result growing", len(growingIDs)),
    zap.Any("partition stats", partStatsVersionsMap),
)
return &datapb.VchannelInfo{
    CollectionID: channel.GetCollectionID(),
    ChannelName:  channel.GetName(),