Skip delta logs that have already been applied (#26971)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-09-19 16:21:23 +08:00 committed by GitHub
parent 99721c8dd2
commit 0a750408d0
4 changed files with 73 additions and 2 deletions

@@ -879,7 +879,11 @@ func SaveDeltaLog(collectionID int64,
     kvs[keyPath] = blob.Value[:]
     fieldBinlog = append(fieldBinlog, &datapb.FieldBinlog{
         FieldID: pkFieldID,
-        Binlogs: []*datapb.Binlog{{LogPath: keyPath}},
+        Binlogs: []*datapb.Binlog{{
+            LogPath:       keyPath,
+            TimestampFrom: 100,
+            TimestampTo:   200,
+        }},
     })
     log.Debug("[query node unittest] save delta log file to MinIO/S3")

@@ -188,7 +188,7 @@ func NewSegment(collection *Collection,
     var segment = &LocalSegment{
         baseSegment:        newBaseSegment(segmentID, partitionID, collectionID, shard, segmentType, version, startPosition),
         ptr:                segmentPtr,
-        lastDeltaTimestamp: atomic.NewUint64(deltaPosition.GetTimestamp()),
+        lastDeltaTimestamp: atomic.NewUint64(0),
         fieldIndexes:       typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](),
     }
@@ -838,6 +838,8 @@ func (s *LocalSegment) LoadDeltaData(deltaData *storage.DeleteData) error {
         return err
     }
 
+    s.lastDeltaTimestamp.Store(tss[len(tss)-1])
+
     log.Info("load deleted record done",
         zap.Int64("rowNum", rowNum),
         zap.String("segmentType", s.Type().String()))
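
Taken together, the two hunks above give the segment a high-water mark for applied deletes: lastDeltaTimestamp starts at 0 when the segment is created and is advanced to the last delete timestamp each time LoadDeltaData succeeds. Below is a minimal standalone sketch of that bookkeeping pattern, assuming the atomic package in the diff is go.uber.org/atomic; the deltaTracker type and its methods are illustrative stand-ins, not part of the patch.

package main

import (
    "fmt"

    "go.uber.org/atomic"
)

// deltaTracker is an illustrative stand-in for the segment's lastDeltaTimestamp
// field: it remembers the newest delete timestamp that has been applied.
type deltaTracker struct {
    last *atomic.Uint64
}

// newDeltaTracker starts from 0 so that a freshly created segment applies
// every delta log on its first load.
func newDeltaTracker() *deltaTracker {
    return &deltaTracker{last: atomic.NewUint64(0)}
}

// apply records a batch of delete timestamps and advances the high-water mark
// to the last one, mirroring s.lastDeltaTimestamp.Store(tss[len(tss)-1]).
func (t *deltaTracker) apply(tss []uint64) {
    if len(tss) == 0 {
        return
    }
    t.last.Store(tss[len(tss)-1])
}

func main() {
    t := newDeltaTracker()
    t.apply([]uint64{100, 150, 200})
    fmt.Println(t.last.Load()) // prints 200
}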

@@ -769,6 +769,11 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment *LocalSegment,
     var blobs []*storage.Blob
     for _, deltaLog := range deltaLogs {
         for _, bLog := range deltaLog.GetBinlogs() {
+            // the segment has already applied this delta log, skip it
+            if bLog.GetTimestampTo() > 0 && // this field may be missing in legacy versions
+                bLog.GetTimestampTo() < segment.LastDeltaTimestamp() {
+                continue
+            }
             value, err := loader.cm.Read(ctx, bLog.GetLogPath())
             if err != nil {
                 return err
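
The new guard reduces to a single predicate over the binlog's TimestampTo and the segment's last applied delete timestamp. The sketch below restates it with plain uint64 arguments; the helper name and its placement are assumptions for illustration, not part of the patch.

package segments // hypothetical placement, for illustration only

// shouldSkipDeltaLog mirrors the condition added above: a delta binlog is
// skipped only when its TimestampTo is populated (legacy binlogs may leave it
// as 0) and is strictly older than the last delete timestamp already applied.
func shouldSkipDeltaLog(timestampTo, lastDeltaTimestamp uint64) bool {
    return timestampTo > 0 && timestampTo < lastDeltaTimestamp
}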

@@ -356,6 +356,66 @@ func (suite *SegmentLoaderSuite) TestLoadDeltaLogs() {
     }
 }
 
+func (suite *SegmentLoaderSuite) TestLoadDupDeltaLogs() {
+    ctx := context.Background()
+    loadInfos := make([]*querypb.SegmentLoadInfo, 0, suite.segmentNum)
+    msgLength := 100
+
+    // Load sealed
+    for i := 0; i < suite.segmentNum; i++ {
+        segmentID := suite.segmentID + int64(i)
+        binlogs, statsLogs, err := SaveBinLog(ctx,
+            suite.collectionID,
+            suite.partitionID,
+            segmentID,
+            msgLength,
+            suite.schema,
+            suite.chunkManager,
+        )
+        suite.NoError(err)
+
+        // Delete PKs 1, 2
+        deltaLogs, err := SaveDeltaLog(suite.collectionID,
+            suite.partitionID,
+            segmentID,
+            suite.chunkManager,
+        )
+        suite.NoError(err)
+
+        loadInfos = append(loadInfos, &querypb.SegmentLoadInfo{
+            SegmentID:    segmentID,
+            PartitionID:  suite.partitionID,
+            CollectionID: suite.collectionID,
+            BinlogPaths:  binlogs,
+            Statslogs:    statsLogs,
+            Deltalogs:    deltaLogs,
+            NumOfRows:    int64(msgLength),
+        })
+    }
+
+    segments, err := suite.loader.Load(ctx, suite.collectionID, SegmentTypeGrowing, 0, loadInfos...)
+    suite.NoError(err)
+
+    for i, segment := range segments {
+        suite.Equal(int64(100-2), segment.RowNum())
+        for pk := 0; pk < 100; pk++ {
+            if pk == 1 || pk == 2 {
+                continue
+            }
+            exist := segment.MayPkExist(storage.NewInt64PrimaryKey(int64(pk)))
+            suite.Require().True(exist)
+        }
+
+        seg := segment.(*LocalSegment)
+        // Nothing should happen here: these delta logs have already been applied,
+        // so loading them again against the released segment must not return an error.
+        seg.Release()
+        loadInfos[i].Deltalogs[0].Binlogs[0].TimestampTo--
+        err := suite.loader.LoadDeltaLogs(ctx, seg, loadInfos[i].GetDeltalogs())
+        suite.NoError(err)
+    }
+}
+
 func (suite *SegmentLoaderSuite) TestLoadIndex() {
     ctx := context.Background()
     segment := &LocalSegment{}
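
The TimestampTo decrement near the end of the new test appears to be what forces the skip path: with the strict less-than comparison, a binlog whose TimestampTo equals the last applied delete timestamp would still be loaded, so the test nudges it one tick lower before calling LoadDeltaLogs on the released segment. The runnable illustration below shows the boundary behaviour, under the assumption that lastDeltaTimestamp ends up at 200 after the initial load (matching the TimestampTo stamped by SaveDeltaLog above).

package main

import "fmt"

func main() {
    // Local copy of the guard from LoadDeltaLogs, for illustration only.
    shouldSkip := func(timestampTo, lastDeltaTimestamp uint64) bool {
        return timestampTo > 0 && timestampTo < lastDeltaTimestamp
    }

    fmt.Println(shouldSkip(200, 200)) // false: an equal timestamp is loaded again
    fmt.Println(shouldSkip(199, 200)) // true: the decremented binlog is skipped
    fmt.Println(shouldSkip(0, 200))   // false: a legacy binlog without TimestampTo is never skipped
}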