Remove TimeTravel in compactor (#26785)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/26826/head
XuanYang-cn 2023-09-04 17:41:48 +08:00 committed by GitHub
parent d7cd1f2a6d
commit b2e7cbdf4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 65 additions and 339 deletions

View File

@ -77,7 +77,7 @@ minio:
# Custom endpoint for fetch IAM role credentials. when useIAM is true & cloudProvider is "aws".
# Leave it empty if you want to use AWS default endpoint
iamEndpoint:
# Log level for aws sdk log.
# Log level for aws sdk log.
# Supported level: off, fatal, error, warn, info, debug, trace
logLevel: fatal
# Cloud data center region
@ -440,8 +440,7 @@ common:
dataNodeSubNamePrefix: dataNode
defaultPartitionName: _default # default partition name for a collection
defaultIndexName: _default_idx # default index name
retentionDuration: 0 # time travel reserved time, insert/delete will not be cleaned in this period. disable it by default
entityExpiration: -1 # Entity expiration in seconds, CAUTION make sure entityExpiration >= retentionDuration and -1 means never expire
entityExpiration: -1 # Entity expiration in seconds, CAUTION -1 means never expire
indexSliceSize: 16 # MB
threadCoreCoefficient:
highPriority: 10 # This parameter specify how many times the number of threads is the number of cores in high priority thread pool

View File

@ -35,7 +35,6 @@ import (
)
type compactTime struct {
travelTime Timestamp
expireTime Timestamp
collectionTTL time.Duration
}
@ -199,17 +198,15 @@ func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) (
}
pts, _ := tsoutil.ParseTS(ts)
ttRetention := pts.Add(Params.CommonCfg.RetentionDuration.GetAsDuration(time.Second) * -1)
ttRetentionLogic := tsoutil.ComposeTS(ttRetention.UnixNano()/int64(time.Millisecond), 0)
if collectionTTL > 0 {
ttexpired := pts.Add(-collectionTTL)
ttexpiredLogic := tsoutil.ComposeTS(ttexpired.UnixNano()/int64(time.Millisecond), 0)
return &compactTime{ttRetentionLogic, ttexpiredLogic, collectionTTL}, nil
return &compactTime{ttexpiredLogic, collectionTTL}, nil
}
// no expiration time
return &compactTime{ttRetentionLogic, 0, 0}, nil
return &compactTime{0, 0}, nil
}
// triggerCompaction trigger a compaction if any compaction condition satisfy.
@ -558,6 +555,7 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
nonPlannedSegments = append(nonPlannedSegments, segment)
}
}
var plans []*datapb.CompactionPlan
// sort segment from large to small
sort.Slice(prioritizedCandidates, func(i, j int) bool {
@ -587,7 +585,6 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
}
// greedy pick from large segment to small, the goal is to fill each segment to reach 512M
// we must ensure all prioritized candidates is in a plan
//TODO the compaction policy should consider segment with similar timestamp together so timetravel and data expiration could work better.
//TODO the compaction selection policy should consider if compaction workload is high
for len(prioritizedCandidates) > 0 {
var bucket []*SegmentInfo
@ -720,7 +717,6 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, i
func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.CompactionPlan {
plan := &datapb.CompactionPlan{
Timetravel: compactTime.travelTime,
Type: datapb.CompactionType_MixCompaction,
Channel: segments[0].GetInsertChannel(),
CollectionTtl: compactTime.collectionTTL.Nanoseconds(),
@ -894,29 +890,22 @@ func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, isDis
return true
}
// single compaction only merge insert and delta log beyond the timetravel
// segment's insert binlogs dont have time range info, so we wait until the segment's last expire time is less than timetravel
// to ensure that all insert logs is beyond the timetravel.
// TODO: add meta in insert binlog
if segment.LastExpireTime >= compactTime.travelTime {
return false
}
totalDeletedRows := 0
totalDeleteLogSize := int64(0)
for _, deltaLogs := range segment.GetDeltalogs() {
for _, l := range deltaLogs.GetBinlogs() {
if l.TimestampTo < compactTime.travelTime {
totalDeletedRows += int(l.GetEntriesNum())
totalDeleteLogSize += l.GetLogSize()
}
totalDeletedRows += int(l.GetEntriesNum())
totalDeleteLogSize += l.GetLogSize()
}
}
// currently delta log size and delete ratio policy is applied
if float64(totalDeletedRows)/float64(segment.GetNumOfRows()) >= Params.DataCoordCfg.SingleCompactionRatioThreshold.GetAsFloat() || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize.GetAsInt64() {
log.Info("total delete entities is too much, trigger compaction", zap.Int64("segmentID", segment.ID),
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
log.Info("total delete entities is too much, trigger compaction",
zap.Int64("segmentID", segment.ID),
zap.Int64("numRows", segment.GetNumOfRows()),
zap.Int("deleted rows", totalDeletedRows),
zap.Int64("delete log size", totalDeleteLogSize))
return true
}

View File

@ -32,7 +32,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type spyCompactionHandler struct {
@ -85,14 +84,6 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger *time.Ticker
}
paramtable.Init()
paramtable.Get().Save(Params.CommonCfg.RetentionDuration.Key, "200")
defer paramtable.Get().Reset(Params.CommonCfg.RetentionDuration.Key)
pts, _ := tsoutil.ParseTS(0)
ttRetention := pts.Add(-1 * Params.CommonCfg.RetentionDuration.GetAsDuration(time.Second))
timeTravel := tsoutil.ComposeTS(ttRetention.UnixNano()/int64(time.Millisecond), 0)
vecFieldID := int64(201)
indexID := int64(1001)
tests := []struct {
@ -446,7 +437,6 @@ func Test_compactionTrigger_force(t *testing.T) {
StartTime: 0,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Timetravel: timeTravel,
Channel: "ch1",
TotalRows: 200,
},
@ -741,7 +731,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
&compactTime{},
},
false,
[]*datapb.CompactionPlan{
@ -788,7 +778,6 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
StartTime: 3,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Timetravel: 200,
Channel: "ch1",
},
},
@ -933,7 +922,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
&compactTime{},
},
false,
nil,
@ -992,13 +981,13 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
PartitionID: 1,
LastExpireTime: 100,
NumOfRows: numRows,
MaxRowNum: 110,
MaxRowNum: 150,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{
{
Binlogs: []*datapb.Binlog{
{EntriesNum: 5, LogPath: "log1", LogSize: 100},
{EntriesNum: numRows, LogPath: "log1", LogSize: 100},
},
},
},
@ -1030,7 +1019,6 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
tests := []struct {
name string
fields fields
args args
wantErr bool
wantPlans []*datapb.CompactionPlan
}{
@ -1038,7 +1026,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
"test small segment",
fields{
&meta{
// 4 small segments
// 8 small segments
segments: &SegmentsInfo{
map[int64]*SegmentInfo{
1: {
@ -1071,16 +1059,6 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
lastFlushTime: time.Now(),
segmentIndexes: genSegIndex(6, indexID, 20),
},
7: {
SegmentInfo: genSeg(7, 20),
lastFlushTime: time.Now(),
segmentIndexes: genSegIndex(7, indexID, 20),
},
8: {
SegmentInfo: genSeg(8, 20),
lastFlushTime: time.Now(),
segmentIndexes: genSegIndex(8, indexID, 20),
},
},
},
collections: map[int64]*collectionInfo{
@ -1130,10 +1108,6 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
&spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 1)},
nil,
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
},
false,
nil,
},
@ -1157,7 +1131,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
select {
case val := <-spy.spyChan:
// 6 segments in the final pick list
assert.Equal(t, len(val.SegmentBinlogs), 6)
assert.Equal(t, 6, len(val.SegmentBinlogs))
return
case <-time.After(3 * time.Second):
assert.Fail(t, "failed to get plan")
@ -1312,7 +1286,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
&compactTime{},
},
false,
nil,
@ -1490,7 +1464,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
&compactTime{},
},
false,
nil,
@ -1655,7 +1629,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
},
args{
2,
&compactTime{travelTime: 200, expireTime: 0},
&compactTime{},
},
false,
nil,
@ -1741,7 +1715,7 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
},
}
couldDo := trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
couldDo := trigger.ShouldDoSingleCompaction(info, false, &compactTime{})
assert.True(t, couldDo)
//Test too many stats log
@ -1759,22 +1733,22 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
},
}
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{})
assert.True(t, couldDo)
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{})
assert.True(t, couldDo)
// if only 10 bin logs, then disk index won't trigger compaction
info.Statslogs = binlogs[0:20]
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{})
assert.True(t, couldDo)
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{travelTime: 200, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info, true, &compactTime{})
assert.False(t, couldDo)
//Test too many stats log but compacted
info.CompactionFrom = []int64{0, 1}
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{travelTime: 200, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info, false, &compactTime{})
assert.False(t, couldDo)
//Test expire triggered compaction
@ -1809,15 +1783,15 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
}
// expire time < Timestamp To
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 300})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 300})
assert.False(t, couldDo)
// didn't reach single compaction size 10 * 1024 * 1024
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 600})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 600})
assert.False(t, couldDo)
// expire time < Timestamp False
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{travelTime: 200, expireTime: 1200})
couldDo = trigger.ShouldDoSingleCompaction(info2, false, &compactTime{expireTime: 1200})
assert.True(t, couldDo)
// Test Delete triggered compaction
@ -1851,16 +1825,12 @@ func Test_compactionTrigger_shouldDoSingleCompaction(t *testing.T) {
},
}
// expire time < Timestamp To
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 600, expireTime: 0})
assert.False(t, couldDo)
// deltalog is large enough, should do compaction
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{travelTime: 800, expireTime: 0})
couldDo = trigger.ShouldDoSingleCompaction(info3, false, &compactTime{})
assert.True(t, couldDo)
}
func Test_newCompactionTrigger(t *testing.T) {
func Test_compactionTrigger_new(t *testing.T) {
type args struct {
meta *meta
compactionHandler compactionPlanContext
@ -1889,7 +1859,7 @@ func Test_newCompactionTrigger(t *testing.T) {
}
}
func Test_handleSignal(t *testing.T) {
func Test_compactionTrigger_handleSignal(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
signal := &compactionSignal{
segmentID: 1,
@ -1899,7 +1869,7 @@ func Test_handleSignal(t *testing.T) {
})
}
func Test_allocTs(t *testing.T) {
func Test_compactionTrigger_allocTs(t *testing.T) {
got := newCompactionTrigger(&meta{segments: NewSegmentsInfo()}, &compactionPlanHandler{}, newMockAllocator(), newMockHandler())
ts, err := got.allocTs()
assert.NoError(t, err)
@ -1911,7 +1881,7 @@ func Test_allocTs(t *testing.T) {
assert.Equal(t, uint64(0), ts)
}
func Test_getCompactTime(t *testing.T) {
func Test_compactionTrigger_getCompactTime(t *testing.T) {
collections := map[UniqueID]*collectionInfo{
1: {
ID: 1,

View File

@ -23,7 +23,6 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@ -80,26 +79,6 @@ func failResponseWithCode(status *commonpb.Status, errCode commonpb.ErrorCode, r
status.Reason = reason
}
func GetCompactTime(ctx context.Context, allocator allocator) (*compactTime, error) {
ts, err := allocator.allocTimestamp(ctx)
if err != nil {
return nil, err
}
pts, _ := tsoutil.ParseTS(ts)
ttRetention := pts.Add(-1 * Params.CommonCfg.RetentionDuration.GetAsDuration(time.Second))
ttRetentionLogic := tsoutil.ComposeTS(ttRetention.UnixNano()/int64(time.Millisecond), 0)
// TODO, change to collection level
if Params.CommonCfg.EntityExpirationTTL.GetAsInt() > 0 {
ttexpired := pts.Add(-1 * Params.CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second))
ttexpiredLogic := tsoutil.ComposeTS(ttexpired.UnixNano()/int64(time.Millisecond), 0)
return &compactTime{ttRetentionLogic, ttexpiredLogic, Params.CommonCfg.EntityExpirationTTL.GetAsDuration(time.Second)}, nil
}
// no expiration time
return &compactTime{ttRetentionLogic, 0, 0}, nil
}
func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo {
if len(segments) == 0 {
return nil

View File

@ -22,13 +22,12 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
type UtilSuite struct {
@ -118,38 +117,6 @@ func (suite *UtilSuite) TestVerifyResponse() {
}
}
func (suite *UtilSuite) TestGetCompactTime() {
paramtable.Get().Save(Params.CommonCfg.RetentionDuration.Key, "43200") // 5 days
defer paramtable.Get().Reset(Params.CommonCfg.RetentionDuration.Key) // 5 days
tFixed := time.Date(2021, 11, 15, 0, 0, 0, 0, time.Local)
tBefore := tFixed.Add(-1 * Params.CommonCfg.RetentionDuration.GetAsDuration(time.Second))
type args struct {
allocator allocator
}
tests := []struct {
name string
args args
want *compactTime
wantErr bool
}{
{
"test get timetravel",
args{&fixedTSOAllocator{fixedTime: tFixed}},
&compactTime{tsoutil.ComposeTS(tBefore.UnixNano()/int64(time.Millisecond), 0), 0, 0},
false,
},
}
for _, tt := range tests {
suite.Run(tt.name, func() {
got, err := GetCompactTime(context.TODO(), tt.args.allocator)
suite.Equal(tt.wantErr, err != nil)
suite.EqualValues(tt.want, got)
})
}
}
func TestUtil(t *testing.T) {
suite.Run(t, new(UtilSuite))
}

View File

@ -19,7 +19,6 @@ package datanode
import (
"context"
"fmt"
"math"
"path"
"strconv"
"strings"
@ -151,59 +150,33 @@ func (t *compactionTask) getNumRows() (int64, error) {
return numRows, nil
}
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelTs Timestamp) (
map[interface{}]Timestamp, *DelDataBuf, error) {
func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob) (map[interface{}]Timestamp, error) {
log := log.With(zap.Int64("planID", t.getPlanID()))
mergeStart := time.Now()
dCodec := storage.NewDeleteCodec()
var (
pk2ts = make(map[interface{}]Timestamp)
dbuff = &DelDataBuf{
delData: &DeleteData{
Pks: make([]primaryKey, 0),
Tss: make([]Timestamp, 0)},
Binlog: datapb.Binlog{
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
}
)
var pk2ts = make(map[interface{}]Timestamp)
for _, blobs := range dBlobs {
_, _, dData, err := dCodec.Deserialize(blobs)
if err != nil {
log.Warn("merge deltalogs wrong", zap.Error(err))
return nil, nil, err
return nil, err
}
for i := int64(0); i < dData.RowCount; i++ {
pk := dData.Pks[i]
ts := dData.Tss[i]
if timetravelTs != Timestamp(0) && dData.Tss[i] <= timetravelTs {
pk2ts[pk.GetValue()] = ts
continue
}
dbuff.delData.Append(pk, ts)
if ts < dbuff.TimestampFrom {
dbuff.TimestampFrom = ts
}
if ts > dbuff.TimestampTo {
dbuff.TimestampTo = ts
}
pk2ts[pk.GetValue()] = ts
}
}
dbuff.accumulateEntriesNum(dbuff.delData.RowCount)
log.Info("mergeDeltalogs end",
zap.Int("number of deleted pks to compact in insert logs", len(pk2ts)),
zap.Duration("elapse", time.Since(mergeStart)))
return pk2ts, dbuff, nil
return pk2ts, nil
}
func (t *compactionTask) uploadRemainLog(
@ -717,7 +690,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
return nil, err
}
deltaPk2Ts, deltaBuf, err := t.mergeDeltalogs(dblobs, t.plan.GetTimetravel())
deltaPk2Ts, err := t.mergeDeltalogs(dblobs)
if err != nil {
return nil, err
}
@ -728,29 +701,11 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
return nil, err
}
uploadDeltaStart := time.Now()
deltaInfo, err := t.uploadDeltaLog(ctxTimeout, targetSegID, partID, deltaBuf.delData, meta)
if err != nil {
log.Warn("compact wrong", zap.Error(err))
return nil, err
}
log.Info("compact upload deltalog elapse", zap.Duration("elapse", time.Since(uploadDeltaStart)))
for _, fbl := range deltaInfo {
for _, deltaLogInfo := range fbl.GetBinlogs() {
deltaLogInfo.LogSize = deltaBuf.GetLogSize()
deltaLogInfo.TimestampFrom = deltaBuf.GetTimestampFrom()
deltaLogInfo.TimestampTo = deltaBuf.GetTimestampTo()
deltaLogInfo.EntriesNum = deltaBuf.GetEntriesNum()
}
}
pack := &datapb.CompactionResult{
PlanID: t.plan.GetPlanID(),
SegmentID: targetSegID,
InsertLogs: inPaths,
Field2StatslogPaths: statsPaths,
Deltalogs: deltaInfo,
NumOfRows: numRows,
Channel: t.plan.GetChannel(),
}
@ -762,7 +717,7 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
zap.Int64s("compactedFrom", segIDs),
zap.Int("num of binlog paths", len(inPaths)),
zap.Int("num of stats paths", len(statsPaths)),
zap.Int("num of delta paths", len(deltaInfo)),
zap.Int("num of delta paths", len(pack.GetDeltalogs())),
)
log.Info("compact overall elapse", zap.Duration("elapse", time.Since(compactStart)))

View File

@ -139,7 +139,7 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Test mergeDeltalogs", func(t *testing.T) {
t.Run("One segment with timetravel", func(t *testing.T) {
t.Run("One segment", func(t *testing.T) {
invalidBlobs := map[UniqueID][]*Blob{
1: {},
}
@ -171,13 +171,12 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
tests := []struct {
isvalid bool
dBlobs map[UniqueID][]*Blob
timetravel Timestamp
dBlobs map[UniqueID][]*Blob
description string
}{
{false, invalidBlobs, 0, "invalid dBlobs"},
{true, validBlobs, 21000, "valid blobs"},
{false, invalidBlobs, "invalid dBlobs"},
{true, validBlobs, "valid blobs"},
}
for _, test := range tests {
@ -185,29 +184,19 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
done: make(chan struct{}, 1),
}
t.Run(test.description, func(t *testing.T) {
pk2ts, err := task.mergeDeltalogs(test.dBlobs)
if test.isvalid {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
assert.NoError(t, err)
assert.Equal(t, 3, len(pk2ts))
assert.Equal(t, int64(3), db.GetEntriesNum())
assert.Equal(t, int64(3), db.delData.RowCount)
matchedPks := []primaryKey{newInt64PrimaryKey(1), newInt64PrimaryKey(4), newInt64PrimaryKey(5)}
assert.ElementsMatch(t, matchedPks, db.delData.Pks)
assert.ElementsMatch(t, []Timestamp{30000, 50000, 50000}, db.delData.Tss)
assert.Equal(t, 5, len(pk2ts))
} else {
pk2ts, db, err := task.mergeDeltalogs(test.dBlobs, test.timetravel)
assert.Error(t, err)
assert.Nil(t, pk2ts)
assert.Nil(t, db)
}
})
}
})
t.Run("Multiple segments with timetravel", func(t *testing.T) {
t.Run("Multiple segments", func(t *testing.T) {
tests := []struct {
segIDA UniqueID
dataApk []UniqueID
@ -221,22 +210,20 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
dataCpk []UniqueID
dataCts []Timestamp
timetravel Timestamp
expectedpk2ts int
expecteddb int
description string
}{
{
0, nil, nil,
100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
40000, 3, 3, "2 segments with timetravel 40000",
6, "2 segments",
},
{
300, []UniqueID{10, 20}, []Timestamp{20001, 40001},
100, []UniqueID{1, 2, 3}, []Timestamp{20000, 30000, 20005},
200, []UniqueID{4, 5, 6}, []Timestamp{50000, 50001, 50002},
40000, 4, 4, "3 segments with timetravel 40000",
8, "3 segments",
},
}
@ -262,10 +249,9 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
task := &compactionTask{
done: make(chan struct{}, 1),
}
pk2ts, db, err := task.mergeDeltalogs(dBlobs, test.timetravel)
pk2ts, err := task.mergeDeltalogs(dBlobs)
assert.NoError(t, err)
assert.Equal(t, test.expectedpk2ts, len(pk2ts))
assert.Equal(t, test.expecteddb, int(db.GetEntriesNum()))
})
}
})
@ -854,7 +840,6 @@ func TestCompactorInterfaceMethods(t *testing.T) {
StartTime: 0,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 40000,
Channel: "channelname",
}
@ -870,12 +855,10 @@ func TestCompactorInterfaceMethods(t *testing.T) {
assert.NotEmpty(t, result.Field2StatslogPaths)
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
plan.PlanID++
plan.Timetravel = Timestamp(25000)
channel.addFlushedSegmentWithPKs(c.segID1, c.colID, c.parID, 2, c.iData1)
channel.addFlushedSegmentWithPKs(c.segID2, c.colID, c.parID, 2, c.iData2)
channel.removeSegments(19530)
@ -889,31 +872,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
assert.Equal(t, plan.GetPlanID(), result.GetPlanID())
assert.Equal(t, UniqueID(19530), result.GetSegmentID())
assert.Equal(t, int64(3), result.GetNumOfRows())
assert.NotEmpty(t, result.InsertLogs)
assert.NotEmpty(t, result.Field2StatslogPaths)
// New test, remove all the binlogs in memkv
// Deltas in timetravel range
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
plan.PlanID++
plan.Timetravel = Timestamp(10000)
channel.addFlushedSegmentWithPKs(c.segID1, c.colID, c.parID, 2, c.iData1)
channel.addFlushedSegmentWithPKs(c.segID2, c.colID, c.parID, 2, c.iData2)
channel.removeSegments(19530)
require.True(t, channel.hasSegment(c.segID1, true))
require.True(t, channel.hasSegment(c.segID2, true))
require.False(t, channel.hasSegment(19530, true))
result, err = task.compact()
assert.NoError(t, err)
assert.NotNil(t, result)
assert.Equal(t, plan.GetPlanID(), result.GetPlanID())
assert.Equal(t, UniqueID(19530), result.GetSegmentID())
assert.Equal(t, int64(4), result.GetNumOfRows())
assert.Equal(t, int64(2), result.GetNumOfRows())
assert.NotEmpty(t, result.InsertLogs)
assert.NotEmpty(t, result.Field2StatslogPaths)
@ -997,7 +956,6 @@ func TestCompactorInterfaceMethods(t *testing.T) {
StartTime: 0,
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 40000,
Channel: "channelname",
}

View File

@ -36,7 +36,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -559,16 +558,13 @@ func (ss *SuffixSnapshot) startBackgroundGC() {
ticker := time.NewTicker(60 * time.Minute)
defer ticker.Stop()
params := paramtable.Get()
retentionDuration := params.CommonCfg.RetentionDuration.GetAsDuration(time.Second)
for {
select {
case <-ss.closeGC:
log.Warn("quit suffix snapshot GC goroutine!")
return
case now := <-ticker.C:
err := ss.removeExpiredKvs(now, retentionDuration)
err := ss.removeExpiredKvs(now)
if err != nil {
log.Warn("remove expired data fail during GC", zap.Error(err))
}
@ -602,7 +598,7 @@ func (ss *SuffixSnapshot) batchRemoveExpiredKvs(keyGroup []string, originalKey s
return etcd.RemoveByBatch(keyGroup, removeFn)
}
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time.Duration) error {
func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time) error {
keyGroup := make([]string, 0)
latestOriginalKey := ""
latestValue := ""
@ -652,8 +648,7 @@ func (ss *SuffixSnapshot) removeExpiredKvs(now time.Time, retentionDuration time
latestOriginalKey = curOriginalKey
// record keys if the kv is expired
pts, _ := tsoutil.ParseTS(ts)
expireTime := pts.Add(retentionDuration)
expireTime, _ := tsoutil.ParseTS(ts)
// break loop if it reaches expire time
if expireTime.Before(now) {
keyGroup = append(keyGroup, key)

View File

@ -502,7 +502,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
cnt := countPrefix(prefix)
assert.Equal(t, keyCnt*keyVersion+keyCnt, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
err = ss.removeExpiredKvs(now)
assert.NoError(t, err)
cnt = countPrefix(prefix)
@ -542,11 +542,12 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
cnt := countPrefix(prefix)
assert.Equal(t, 12, cnt)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
// err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
err = ss.removeExpiredKvs(now)
assert.NoError(t, err)
cnt = countPrefix(prefix)
assert.Equal(t, 6, cnt)
assert.Equal(t, 4, cnt)
// clean all data
err := etcdkv.RemoveWithPrefix("")
@ -559,7 +560,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
err = etcdkv.Save(ss.composeSnapshotPrefix(key), "")
assert.NoError(t, err)
err = ss.removeExpiredKvs(now, time.Duration(50)*time.Millisecond)
err = ss.removeExpiredKvs(now)
assert.NoError(t, err)
cnt := countPrefix(prefix)
@ -582,7 +583,7 @@ func Test_SuffixSnapshotRemoveExpiredKvs(t *testing.T) {
assert.NotNil(t, ss)
assert.NoError(t, err)
err = ss.removeExpiredKvs(time.Now(), time.Duration(100))
err = ss.removeExpiredKvs(time.Now())
assert.Error(t, err)
})
}

View File

@ -1421,32 +1421,6 @@ func TestProxy(t *testing.T) {
// })
//
// wg.Add(1)
// t.Run("search_travel", func(t *testing.T) {
// defer wg.Done()
// past := time.Now().Add(time.Duration(-1*Params.CommonCfg.RetentionDuration-100) * time.Second)
// travelTs := tsoutil.ComposeTSByTime(past, 0)
// req := constructSearchRequest()
// req.TravelTimestamp = travelTs
// //resp, err := proxy.Search(ctx, req)
// res, err := proxy.Search(ctx, req)
// assert.NoError(t, err)
// assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
// })
//
// wg.Add(1)
// t.Run("search_travel_succ", func(t *testing.T) {
// defer wg.Done()
// past := time.Now().Add(time.Duration(-1*Params.CommonCfg.RetentionDuration+100) * time.Second)
// travelTs := tsoutil.ComposeTSByTime(past, 0)
// req := constructSearchRequest()
// req.TravelTimestamp = travelTs
// //resp, err := proxy.Search(ctx, req)
// res, err := proxy.Search(ctx, req)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
// })
//
// wg.Add(1)
// t.Run("query", func(t *testing.T) {
// defer wg.Done()
// //resp, err := proxy.Query(ctx, &milvuspb.QueryRequest{
@ -1465,47 +1439,6 @@ func TestProxy(t *testing.T) {
// // assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
// // TODO(dragondriver): compare query result
// })
//
// wg.Add(1)
// t.Run("query_travel", func(t *testing.T) {
// defer wg.Done()
// past := time.Now().Add(time.Duration(-1*Params.CommonCfg.RetentionDuration-100) * time.Second)
// travelTs := tsoutil.ComposeTSByTime(past, 0)
// queryReq := &milvuspb.QueryRequest{
// Base: nil,
// DbName: dbName,
// CollectionName: collectionName,
// Expr: expr,
// OutputFields: nil,
// PartitionNames: nil,
// TravelTimestamp: travelTs,
// GuaranteeTimestamp: 0,
// }
// res, err := proxy.Query(ctx, queryReq)
// assert.NoError(t, err)
// assert.NotEqual(t, commonpb.ErrorCode_Success, res.Status.ErrorCode)
// })
//
// wg.Add(1)
// t.Run("query_travel_succ", func(t *testing.T) {
// defer wg.Done()
// past := time.Now().Add(time.Duration(-1*Params.CommonCfg.RetentionDuration+100) * time.Second)
// travelTs := tsoutil.ComposeTSByTime(past, 0)
// queryReq := &milvuspb.QueryRequest{
// Base: nil,
// DbName: dbName,
// CollectionName: collectionName,
// Expr: expr,
// OutputFields: nil,
// PartitionNames: nil,
// TravelTimestamp: travelTs,
// GuaranteeTimestamp: 0,
// }
// res, err := proxy.Query(ctx, queryReq)
// assert.NoError(t, err)
// assert.Equal(t, commonpb.ErrorCode_EmptyCollection, res.Status.ErrorCode)
// })
// }
wg.Add(1)
t.Run("calculate distance", func(t *testing.T) {

View File

@ -29,9 +29,6 @@ import (
)
const (
// DefaultRetentionDuration defines the default duration for retention which is 1 days in seconds.
DefaultRetentionDuration = 0
// DefaultIndexSliceSize defines the default slice size of index file when serializing.
DefaultIndexSliceSize = 16
DefaultGracefulTime = 5000 // ms
@ -178,7 +175,6 @@ type commonConfig struct {
DefaultPartitionName ParamItem `refreshable:"false"`
DefaultIndexName ParamItem `refreshable:"true"`
RetentionDuration ParamItem `refreshable:"true"`
EntityExpirationTTL ParamItem `refreshable:"true"`
IndexSliceSize ParamItem `refreshable:"false"`
@ -355,15 +351,6 @@ func (p *commonConfig) init(base *BaseTable) {
}
p.DefaultIndexName.Init(base.mgr)
p.RetentionDuration = ParamItem{
Key: "common.retentionDuration",
Version: "2.0.0",
DefaultValue: strconv.Itoa(DefaultRetentionDuration),
Doc: "time travel reserved time, insert/delete will not be cleaned in this period. disable it by default",
Export: true,
}
p.RetentionDuration.Init(base.mgr)
p.EntityExpirationTTL = ParamItem{
Key: "common.entityExpiration",
Version: "2.1.0",
@ -374,13 +361,9 @@ func (p *commonConfig) init(base *BaseTable) {
return "-1"
}
// make sure ttl is larger than retention duration to ensure time travel works
if ttl > p.RetentionDuration.GetAsInt() {
return strconv.Itoa(ttl)
}
return p.RetentionDuration.GetValue()
return strconv.Itoa(ttl)
},
Doc: "Entity expiration in seconds, CAUTION make sure entityExpiration >= retentionDuration and -1 means never expire",
Doc: "Entity expiration in seconds, CAUTION -1 means never expire",
Export: true,
}
p.EntityExpirationTTL.Init(base.mgr)

View File

@ -40,9 +40,6 @@ func TestComponentParam(t *testing.T) {
assert.NotEqual(t, Params.DefaultIndexName.GetValue(), "")
t.Logf("default index name = %s", Params.DefaultIndexName.GetValue())
assert.Equal(t, Params.RetentionDuration.GetAsInt64(), int64(DefaultRetentionDuration))
t.Logf("default retention duration = %d", Params.RetentionDuration.GetAsInt64())
assert.Equal(t, Params.EntityExpirationTTL.GetAsInt64(), int64(-1))
t.Logf("default entity expiration = %d", Params.EntityExpirationTTL.GetAsInt64())