Protect DataCoord from calculating segment lines by stale log entries num (#26523)

Signed-off-by: yah01 <yang.cen@zilliz.com>
pull/26375/head
yah01 2023-08-22 14:16:22 +08:00 committed by GitHub
parent fe16cdbca8
commit bed034a44b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 13 additions and 7 deletions

View File

@ -554,14 +554,15 @@ func (m *meta) UpdateFlushSegmentsInfo(
if importing {
s := clonedSegment
s.NumOfRows = s.currRows
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
if count != segment.currRows {
if count != segment.currRows && count > 0 {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segmentID", segment.GetID()),
zap.Int64("current rows (wrong)", segment.currRows),
zap.Int64("segment bin log row count (correct)", count))
s.NumOfRows = count
}
s.NumOfRows = count
modSegments[segmentID] = s
} else {
for _, cp := range checkpoints {
@ -578,15 +579,16 @@ func (m *meta) UpdateFlushSegmentsInfo(
continue
}
s.NumOfRows = cp.NumOfRows
count := segmentutil.CalcRowCountFromBinLog(s.SegmentInfo)
// count should smaller than or equal to cp reported
if count != cp.NumOfRows {
if count != cp.NumOfRows && count > 0 {
log.Info("check point reported inconsistent with bin log row count",
zap.Int64("segmentID", segment.GetID()),
zap.Int64("check point (wrong)", cp.NumOfRows),
zap.Int64("segment bin log row count (correct)", count))
s.NumOfRows = count
}
s.NumOfRows = count
s.DmlPosition = cp.GetPosition()
modSegments[cp.GetSegmentID()] = s

View File

@ -490,7 +490,7 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
err = meta.AddSegment(segment1)
assert.NoError(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
err = meta.UpdateFlushSegmentsInfo(1, true, false, false, []*datapb.FieldBinlog{getFieldBinlogPathsWithEntry(1, 10, getInsertLogPath("binlog1", 1))},
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, getStatsLogPath("statslog1", 1))},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1)}}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})

View File

@ -713,7 +713,7 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
segment2Binlogs[id] = append(segment2Binlogs[id], fieldBinlogs)
}
if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows {
if newCount := segmentutil.CalcRowCountFromBinLog(segment.SegmentInfo); newCount != segment.NumOfRows && newCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segmentID", segment.GetID()),
zap.Int64("segment meta row count (wrong)", segment.GetNumOfRows()),

View File

@ -11,7 +11,7 @@ import (
// Note that `segCloned` should be a copied version of `seg`.
func ReCalcRowCount(seg, segCloned *datapb.SegmentInfo) {
// `segment` is not mutated but only cloned above and is safe to be referred here.
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() {
if newCount := CalcRowCountFromBinLog(seg); newCount != seg.GetNumOfRows() && newCount > 0 {
log.Warn("segment row number meta inconsistent with bin log row count and will be corrected",
zap.Int64("segmentID", seg.GetID()),
zap.Int64("segment meta row count (wrong)", seg.GetNumOfRows()),
@ -27,6 +27,10 @@ func CalcRowCountFromBinLog(seg *datapb.SegmentInfo) int64 {
if len(seg.GetBinlogs()) > 0 {
for _, ct := range seg.GetBinlogs()[0].GetBinlogs() {
rowCt += ct.GetEntriesNum()
// This segment contains stale log with incorrect entries num,
if ct.GetEntriesNum() <= 0 {
return -1
}
}
}
return rowCt