mirror of https://github.com/milvus-io/milvus.git
Save binlog timestampFrom, timestampTo meta when compact (#26203)
Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/26226/head
parent b9850ce5c0
commit 21eeb37ce9
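In essence, this change threads the insert-timestamp range of each merged batch through compaction and persists it as TimestampFrom/TimestampTo on the binlog meta written by the datanode, so the datacoord trigger can later judge expiry per binlog. A minimal, runnable Go sketch of the idea; BinlogMeta and rangeOf are illustrative stand-ins, not the actual Milvus types:

package main

import "fmt"

// BinlogMeta stands in for the two datapb.Binlog fields this commit
// starts populating during compaction.
type BinlogMeta struct {
	TimestampFrom uint64
	TimestampTo   uint64
}

// rangeOf computes the min/max insert timestamp over the rows of one merged batch.
func rangeOf(rowTimestamps []uint64) (from, to uint64) {
	for i, ts := range rowTimestamps {
		if i == 0 || ts < from {
			from = ts
		}
		if ts > to {
			to = ts
		}
	}
	return from, to
}

func main() {
	meta := &BinlogMeta{}
	meta.TimestampFrom, meta.TimestampTo = rangeOf([]uint64{42, 7, 99})
	fmt.Println(meta.TimestampFrom, meta.TimestampTo) // 7 99
}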
@@ -875,15 +875,22 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
 		for _, l := range binlogs.GetBinlogs() {
 			// TODO, we should probably estimate expired log entries by total rows in binlog and the relationship of timeTo, timeFrom and expire time
 			if l.TimestampTo < compactTime.expireTime {
+				log.RatedDebug(10, "mark binlog as expired",
+					zap.Int64("segmentID", segment.ID),
+					zap.Int64("binlogID", l.GetLogID()),
+					zap.Uint64("binlogTimestampTo", l.TimestampTo),
+					zap.Uint64("compactExpireTime", compactTime.expireTime))
 				totalExpiredRows += int(l.GetEntriesNum())
 				totalExpiredSize += l.GetLogSize()
 			}
 		}
 	}

-	if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
+	if float64(totalExpiredRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() ||
+		totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize.GetAsInt64() {
 		log.Info("total expired entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
-			zap.Int("expired rows", totalExpiredRows), zap.Int64("expired log size", totalExpiredSize))
+			zap.Int("expiredRows", totalExpiredRows), zap.Int64("expiredLogSize", totalExpiredSize),
+			zap.Bool("createdByCompaction", segment.CreatedByCompaction), zap.Int64s("compactionFrom", segment.CompactionFrom))
 		return true
 	}
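The guard above boils down to a two-clause predicate: compact when expiredRows/numRows reaches SingleCompactionRatioThreshold, or when the accumulated expired log size exceeds SingleCompactionExpiredLogMaxSize. A hedged sketch with the paramtable lookups flattened into plain arguments (the function name and signature are invented for illustration):

package sketch

// shouldCompactForExpiry mirrors the decision above, with config values
// passed in directly instead of read from Params.
func shouldCompactForExpiry(expiredRows, numRows, expiredSize, maxExpiredSize int64, ratioThreshold float64) bool {
	if numRows == 0 {
		return false // empty segment: nothing to expire
	}
	return float64(expiredRows)/float64(numRows) >= ratioThreshold ||
		expiredSize > maxExpiredSize
}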
@@ -319,8 +319,12 @@ func (t *compactionTask) merge(
 		return false
 	}

-	addInsertFieldPath := func(inPaths map[UniqueID]*datapb.FieldBinlog) {
+	addInsertFieldPath := func(inPaths map[UniqueID]*datapb.FieldBinlog, timestampFrom, timestampTo int64) {
 		for fID, path := range inPaths {
+			for _, binlog := range path.GetBinlogs() {
+				binlog.TimestampTo = uint64(timestampTo)
+				binlog.TimestampFrom = uint64(timestampFrom)
+			}
 			tmpBinlog, ok := insertField2Path[fID]
 			if !ok {
 				tmpBinlog = path
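All field binlogs flushed from the same batch cover the same rows, so the closure stamps every binlog of every field with the one batch-wide range before merging paths. A compressed sketch of that stamping step (Binlog and FieldBinlog are simplified stand-ins for the datapb types):

package sketch

// Binlog and FieldBinlog are simplified stand-ins for the datapb types.
type Binlog struct{ TimestampFrom, TimestampTo uint64 }
type FieldBinlog struct{ Binlogs []*Binlog }

// stampAll applies one batch-wide time range to every binlog of every
// field, mirroring the loop added inside addInsertFieldPath.
func stampAll(inPaths map[int64]*FieldBinlog, timestampFrom, timestampTo int64) {
	for _, fb := range inPaths {
		for _, b := range fb.Binlogs {
			b.TimestampFrom = uint64(timestampFrom)
			b.TimestampTo = uint64(timestampTo)
		}
	}
}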
@@ -387,6 +391,11 @@ func (t *compactionTask) merge(
 	}

 	stats := storage.NewPrimaryKeyStats(pkID, int64(pkType), oldRowNums)
+	// initial timestampFrom, timestampTo = -1, -1 is an illegal value, only to mark initial state
+	var (
+		timestampTo   int64 = -1
+		timestampFrom int64 = -1
+	)

 	for _, path := range unMergedInsertlogs {
 		downloadStart := time.Now()
@@ -402,6 +411,7 @@ func (t *compactionTask) merge(
 			log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
 			return nil, nil, 0, err
 		}
+
 		for iter.HasNext() {
 			vInter, _ := iter.Next()
 			v, ok := vInter.(*storage.Value)
@@ -421,6 +431,14 @@ func (t *compactionTask) merge(
 				continue
 			}

+			// Update timestampFrom, timestampTo
+			if v.Timestamp < timestampFrom || timestampFrom == -1 {
+				timestampFrom = v.Timestamp
+			}
+			if v.Timestamp > timestampTo || timestampTo == -1 {
+				timestampTo = v.Timestamp
+			}
+
 			row, ok := v.Value.(map[UniqueID]interface{})
 			if !ok {
 				log.Warn("transfer interface to map wrong", zap.Strings("path", path))
@@ -445,7 +463,9 @@ func (t *compactionTask) merge(
 				return nil, nil, 0, err
 			}
 			uploadInsertTimeCost += time.Since(uploadInsertStart)
-			addInsertFieldPath(inPaths)
+			addInsertFieldPath(inPaths, timestampFrom, timestampTo)
+			timestampFrom = -1
+			timestampTo = -1

 			fID2Content = make(map[int64][]interface{})
 			currentRows = 0
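Taken together, the merge loop keeps a running min/max over row timestamps with -1 as the "unset" sentinel, stamps the uploaded binlogs via addInsertFieldPath, and resets the pair so the next batch starts fresh. A self-contained sketch of that track-stamp-reset cycle (the batch slices stand in for rows read from the iterator):

package main

import "fmt"

func main() {
	batches := [][]int64{{30, 10, 20}, {55, 41}}
	// -1 marks "no row seen yet", matching the sentinel above.
	timestampFrom, timestampTo := int64(-1), int64(-1)
	for _, rows := range batches {
		for _, ts := range rows {
			if ts < timestampFrom || timestampFrom == -1 {
				timestampFrom = ts
			}
			if ts > timestampTo || timestampTo == -1 {
				timestampTo = ts
			}
		}
		fmt.Printf("flush binlog with range [%d, %d]\n", timestampFrom, timestampTo)
		timestampFrom, timestampTo = -1, -1 // reset for the next batch, as the diff does after each upload
	}
}

This prints "flush binlog with range [10, 30]" and then "[41, 55]": each flushed binlog carries only the range of the rows it actually contains.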
@@ -465,7 +485,7 @@ func (t *compactionTask) merge(
 	}

 	uploadInsertTimeCost += time.Since(uploadStart)
-	addInsertFieldPath(inPaths)
+	addInsertFieldPath(inPaths, timestampFrom, timestampTo)
 	addStatFieldPath(statsPaths)
 	numRows += int64(currentRows)
 	numBinlogs += len(inPaths)
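The tail flush after the loop reuses the last tracked range, so the final partial batch is stamped the same way as the full ones. The remaining hunks extend the datanode compaction tests: they assert that every binlog produced by merge carries a real range, i.e. TimestampFrom and TimestampTo are never left at the -1 sentinel.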
@@ -323,6 +323,8 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		assert.Equal(t, int64(2), numOfRow)
 		assert.Equal(t, 1, len(inPaths[0].GetBinlogs()))
 		assert.Equal(t, 1, len(statsPaths))
+		assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
+		assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
 	})
 	t.Run("Merge without expiration2", func(t *testing.T) {
 		mockbIO := &binlogIO{cm, alloc}
@@ -364,6 +366,57 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
 		assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
 		assert.Equal(t, 1, len(statsPaths))
 		assert.Equal(t, 1, len(statsPaths[0].GetBinlogs()))
+		assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampFrom())
+		assert.NotEqual(t, -1, inPaths[0].GetBinlogs()[0].GetTimestampTo())
 	})
+	// set Params.DataNodeCfg.BinLogMaxSize.Key = 1 to generate multiple binlogs, each with only one row
+	t.Run("Merge without expiration3", func(t *testing.T) {
+
+		mockbIO := &binlogIO{cm, alloc}
+		paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0")
+		BinLogMaxSize := Params.DataNodeCfg.BinLogMaxSize
+		defer func() {
+			Params.DataNodeCfg.BinLogMaxSize = BinLogMaxSize
+		}()
+		paramtable.Get().Save(Params.DataNodeCfg.BinLogMaxSize.Key, "1")
+		iData := genInsertDataWithExpiredTS()
+
+		var allPaths [][]string
+		inpath, err := mockbIO.uploadInsertLog(context.Background(), 1, 0, iData, meta)
+		assert.NoError(t, err)
+		assert.Equal(t, 12, len(inpath))
+		binlogNum := len(inpath[0].GetBinlogs())
+		assert.Equal(t, 1, binlogNum)
+
+		for idx := 0; idx < binlogNum; idx++ {
+			var ps []string
+			for _, path := range inpath {
+				ps = append(ps, path.GetBinlogs()[idx].GetLogPath())
+			}
+			allPaths = append(allPaths, ps)
+		}
+
+		dm := map[interface{}]Timestamp{
+			1: 10000,
+		}
+
+		ct := &compactionTask{
+			Channel: channel, downloader: mockbIO, uploader: mockbIO, done: make(chan struct{}, 1),
+			plan: &datapb.CompactionPlan{
+				SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+					{SegmentID: 1}},
+			}}
+		inPaths, statsPaths, numOfRow, err := ct.merge(context.Background(), allPaths, 2, 0, meta, dm)
+		assert.NoError(t, err)
+		assert.Equal(t, int64(2), numOfRow)
+		assert.Equal(t, 2, len(inPaths[0].GetBinlogs()))
+		assert.Equal(t, 1, len(statsPaths))
+		for _, inpath := range inPaths {
+			assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampFrom())
+			assert.NotEqual(t, -1, inpath.GetBinlogs()[0].GetTimestampTo())
+			// as each binlog has only one row, timestampTo == timestampFrom
+			assert.Equal(t, inpath.GetBinlogs()[0].GetTimestampTo(), inpath.GetBinlogs()[0].GetTimestampFrom())
+		}
+	})

 	t.Run("Merge with expiration", func(t *testing.T) {
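Two details of the new test are worth noting: the original BinLogMaxSize is captured and restored in a defer so the 1-byte override cannot leak into later tests, and forcing each row into its own binlog makes the stamped range degenerate to a single timestamp, which is exactly why the final loop asserts GetTimestampTo() equals GetTimestampFrom().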