From f6873d3dc1fb0d7a8e7baadc6720772846df65f8 Mon Sep 17 00:00:00 2001 From: JunSung Park Date: Mon, 7 Feb 2022 23:45:46 +0900 Subject: [PATCH] Implement automatic item expiration on compaction (#14418) * Implement automatic item expiration on compaction Signed-off-by: JunSung Park * Change entityExpiration value comparison logic Signed-off-by: JunSung Park * Add logging for the number of expired entity Signed-off-by: JunSung Park * Set default value of CompactionEntityExpiration to off Signed-off-by: JunSung Park * Fix errors for code checker Signed-off-by: JunSung Park * Set default entityExpiration to max value Signed-off-by: JunSung Park --- configs/milvus.yaml | 3 +- internal/datanode/compactor.go | 48 +++++++-- internal/datanode/compactor_test.go | 119 ++++++++++++++++++++--- internal/datanode/mock_test.go | 60 ++++++++++++ internal/util/paramtable/global_param.go | 14 +++ 5 files changed, 221 insertions(+), 23 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 832c941536..73169a5d15 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -161,10 +161,11 @@ dataCoord: segment: maxSize: 512 # Maximum size of a segment in MB sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed - assignmentExpiration: 2000 # The time of the assignment expiration in ms + assignmentExpiration: 2000 # The time of the assignment expiration in ms compaction: enableAutoCompaction: true + entityExpiration: 9223372037 # Entity expiration in seconds. CAUTION: keep entityExpiration >= retentionDuration. 9223372036 is the largest effective value; anything larger, such as this default, disables expiration gc: interval: 3600 # gc interval in seconds diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 9d47eafa32..0b1fe238fe 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -33,6 +33,8 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -179,13 +181,14 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT return pk2ts, dbuff, nil } -func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema) ([]*InsertData, int64, error) { +func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) { var ( - dim int // dimension of vector field - num int // numOfRows in each binlog - n int // binlog number - err error + dim int // dimension of vector field + num int // numOfRows in each binlog + n int // binlog number + expired int64 // the number of expired entity + err error iDatas = make([]*InsertData, 0) fID2Type = make(map[UniqueID]schemapb.DataType) @@ -209,6 +212,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, } } + expired = 0 for mergeItr.HasNext() { // no error if HasNext() returns true vInter, _ := mergeItr.Next() @@ -223,6 +227,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, continue } + ts := Timestamp(v.Timestamp) + // Filtering expired entity + if t.isExpiredEntity(ts, currentTs) { + expired++ + continue + } + row, ok := v.Value.(map[UniqueID]interface{}) if !ok { log.Warn("transfer interface to map wrong") @@ -273,7 +284,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, } - log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows)) + log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), zap.Int64("expired entities", expired)) return iDatas, numRows, nil } @@ -415,7 +426,7 @@ func (t 
*compactionTask) compact() error { return err } - iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema()) + iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime()) if err != nil { log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err)) return err @@ -647,3 +658,26 @@ func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *et func (t *compactionTask) getCollection() UniqueID { return t.getCollectionID() }
+
+// GetCurrentTime returns the current timestamp reported by tsoutil.GetCurrentTime.
+func (t *compactionTask) GetCurrentTime() typeutil.Timestamp {
+	return tsoutil.GetCurrentTime()
+}
+
+// isExpiredEntity reports whether an entity stamped ts has outlived
+// Params.DataCoordCfg.CompactionEntityExpiration (in seconds) as of now.
+func (t *compactionTask) isExpiredEntity(ts, now Timestamp) bool {
+	// Largest second count a time.Duration can hold (9223372036).
+	const maxExpiration = int64((1<<63 - 1) / time.Second)
+
+	expiration := Params.DataCoordCfg.CompactionEntityExpiration
+	// Zero, negative, or unrepresentable settings turn expiration off, so a
+	// bad config value can never wipe every row during compaction.
+	if expiration <= 0 || expiration > maxExpiration {
+		return false
+	}
+
+	pts, _ := tsoutil.ParseTS(ts)
+	pnow, _ := tsoutil.ParseTS(now)
+	return pts.Add(time.Duration(expiration) * time.Second).Before(pnow)
+}
diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 13091b6ba9..36bef344fd 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -18,6 +18,7 @@ package datanode import ( "context" + "math" "testing" "time" @@ -235,27 +236,114 @@ func TestCompactionTaskInnerMethods(t *testing.T) { }) t.Run("Test merge", func(t *testing.T) { - iData := genInsertData() - meta := NewMetaFactory().GetCollectionMeta(1, "test") + t.Run("Merge without expiration", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64 + iData := genInsertDataWithExpiredTS() + meta := NewMetaFactory().GetCollectionMeta(1, "test") - iblobs, err := getInsertBlobs(100, iData, meta) -
require.NoError(t, err) + iblobs, err := getInsertBlobs(100, iData, meta) + require.NoError(t, err) - iitr, err := storage.NewInsertBinlogIterator(iblobs, 106) - require.NoError(t, err) + iitr, err := storage.NewInsertBinlogIterator(iblobs, 106) + require.NoError(t, err) - mitr := storage.NewMergeIterator([]iterator{iitr}) + mitr := storage.NewMergeIterator([]iterator{iitr}) - dm := map[UniqueID]Timestamp{ - 1: 10000, - } + dm := map[UniqueID]Timestamp{ + 1: 10000, + } - ct := &compactionTask{} - idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema()) - assert.NoError(t, err) - assert.Equal(t, int64(1), numOfRow) - assert.Equal(t, 1, len(idata)) + ct := &compactionTask{} + idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime()) + assert.NoError(t, err) + assert.Equal(t, int64(1), numOfRow) + assert.Equal(t, 1, len(idata)) + assert.NotEmpty(t, idata[0].Data) + }) + t.Run("Merge with expiration", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds + iData := genInsertDataWithExpiredTS() + meta := NewMetaFactory().GetCollectionMeta(1, "test") + iblobs, err := getInsertBlobs(100, iData, meta) + require.NoError(t, err) + + iitr, err := storage.NewInsertBinlogIterator(iblobs, 106) + require.NoError(t, err) + + mitr := storage.NewMergeIterator([]iterator{iitr}) + + dm := map[UniqueID]Timestamp{ + 1: 10000, + } + + ct := &compactionTask{} + idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp()) + assert.NoError(t, err) + assert.Equal(t, int64(0), numOfRow) + assert.Equal(t, 1, len(idata)) + assert.Empty(t, idata[0].Data) + }) + }) + + t.Run("Test isExpiredEntity", func(t *testing.T) { + t.Run("When CompactionEntityExpiration is set math.MaxInt64", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64 + + ct := &compactionTask{} + res := ct.isExpiredEntity(0, genTimestamp()) + assert.Equal(t, false, res) + + res = 
ct.isExpiredEntity(math.MaxInt64, genTimestamp()) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(0, math.MaxInt64) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(math.MaxInt64, 0) + assert.Equal(t, false, res) + }) + t.Run("When CompactionEntityExpiration is set MAX_ENTITY_EXPIRATION = 9223372036", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = 9223372036 // math.MaxInt64 / time.Second + + ct := &compactionTask{} + res := ct.isExpiredEntity(0, genTimestamp()) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(math.MaxInt64, genTimestamp()) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(0, math.MaxInt64) + assert.Equal(t, true, res) + + res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(math.MaxInt64, 0) + assert.Equal(t, false, res) + }) + t.Run("When CompactionEntityExpiration is set 10 days", func(t *testing.T) { + Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds + + ct := &compactionTask{} + res := ct.isExpiredEntity(0, genTimestamp()) + assert.Equal(t, true, res) + + res = ct.isExpiredEntity(math.MaxInt64, genTimestamp()) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(0, math.MaxInt64) + assert.Equal(t, true, res) + + res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64) + assert.Equal(t, false, res) + + res = ct.isExpiredEntity(math.MaxInt64, 0) + assert.Equal(t, false, res) + }) }) } @@ -285,6 +373,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { Field2StatslogPaths: nil, Deltalogs: nil, }} + Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64 // Turn off auto expiration t.Run("Test compact invalid", func(t *testing.T) { invalidAlloc := NewAllocatorFactory(-1) diff --git a/internal/datanode/mock_test.go b/internal/datanode/mock_test.go index baea8cf53b..e6e1fdf26b 100644 --- 
a/internal/datanode/mock_test.go +++ b/internal/datanode/mock_test.go @@ -33,6 +33,8 @@ import ( "github.com/milvus-io/milvus/internal/msgstream" s "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/tsoutil" + "github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/proto/commonpb" @@ -816,3 +818,61 @@ func genEmptyInsertData() *InsertData { }, }} } + +func genInsertDataWithExpiredTS() *InsertData { + return &InsertData{ + Data: map[int64]s.FieldData{ + 0: &s.Int64FieldData{ + NumRows: []int64{2}, + Data: []int64{11, 22}, + }, + 1: &s.Int64FieldData{ + NumRows: []int64{2}, + Data: []int64{329749364736000000, 329500223078400000}, // 2009-11-10 23:00:00 +0000 UTC, 2009-10-30 23:00:00 +0000 UTC (11 days earlier) + }, + 100: &s.FloatVectorFieldData{ + NumRows: []int64{2}, + Data: []float32{1.0, 6.0, 7.0, 8.0}, + Dim: 2, + }, + 101: &s.BinaryVectorFieldData{ + NumRows: []int64{2}, + Data: []byte{0, 255, 255, 255, 128, 128, 128, 0}, + Dim: 32, + }, + 102: &s.BoolFieldData{ + NumRows: []int64{2}, + Data: []bool{true, false}, + }, + 103: &s.Int8FieldData{ + NumRows: []int64{2}, + Data: []int8{5, 6}, + }, + 104: &s.Int16FieldData{ + NumRows: []int64{2}, + Data: []int16{7, 8}, + }, + 105: &s.Int32FieldData{ + NumRows: []int64{2}, + Data: []int32{9, 10}, + }, + 106: &s.Int64FieldData{ + NumRows: []int64{2}, + Data: []int64{1, 2}, + }, + 107: &s.FloatFieldData{ + NumRows: []int64{2}, + Data: []float32{2.333, 2.334}, + }, + 108: &s.DoubleFieldData{ + NumRows: []int64{2}, + Data: []float64{3.333, 3.334}, + }, + }} +} + +func genTimestamp() typeutil.Timestamp { + // Fixed reference time for the expiration tests: the birthday of Go, 2009-11-10 23:00:00 UTC + gb := time.Date(2009, time.Month(11), 10, 23, 0, 0, 0, time.UTC) + return tsoutil.ComposeTSByTime(gb, 0) +} diff --git a/internal/util/paramtable/global_param.go b/internal/util/paramtable/global_param.go index cea0b20157..ebfa4848c2 100644
--- a/internal/util/paramtable/global_param.go +++ b/internal/util/paramtable/global_param.go @@ -707,6 +707,9 @@ type dataCoordConfig struct { EnableAutoCompaction bool EnableGarbageCollection bool + RetentionDuration int64 + CompactionEntityExpiration int64 + // Garbage Collection GCInterval time.Duration GCMissingTolerance time.Duration @@ -724,6 +727,7 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) { p.initEnableCompaction() p.initEnableAutoCompaction() + p.initCompactionEntityExpiration() p.initEnableGarbageCollection() p.initGCInterval() @@ -774,6 +778,16 @@ func (p *dataCoordConfig) initEnableAutoCompaction() { p.EnableAutoCompaction = p.BaseParams.ParseBool("dataCoord.compaction.enableAutoCompaction", false) }
+// initCompactionEntityExpiration reads dataCoord.compaction.entityExpiration, never going below RetentionDuration.
+func (p *dataCoordConfig) initCompactionEntityExpiration() {
+	expiration := p.BaseParams.ParseInt64WithDefault("dataCoord.compaction.entityExpiration", math.MaxInt64)
+	// NOTE(review): no hunk in this change assigns RetentionDuration; confirm it is populated before this runs.
+	if expiration < p.RetentionDuration {
+		expiration = p.RetentionDuration
+	}
+	p.CompactionEntityExpiration = expiration
+}
+
/////////////////////////////////////////////////////////////////////////////// // --- datanode --- type dataNodeConfig struct {