Implement automatic item expiration on compaction (#14418)

* Implement automatic item expiration on compaction

Signed-off-by: JunSung Park <writtic@gmail.com>

* Change entityExpiration value comparison logic

Signed-off-by: JunSung Park <writtic@gmail.com>

* Add logging for the number of expired entities

Signed-off-by: JunSung Park <writtic@gmail.com>

* Set default value of CompactionEntityExpiration to off

Signed-off-by: JunSung Park <writtic@gmail.com>

* Fix errors for code checker

Signed-off-by: JunSung Park <writtic@gmail.com>

* Set default entityExpiration to max value

Signed-off-by: JunSung Park <writtic@gmail.com>
pull/14418/merge
JunSung Park 2022-02-07 23:45:46 +09:00 committed by GitHub
parent 18b2753c09
commit f6873d3dc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 221 additions and 23 deletions

View File

@ -161,10 +161,11 @@ dataCoord:
segment:
maxSize: 512 # Maximum size of a segment in MB
sealProportion: 0.75 # It's the minimum proportion for a segment which can be sealed
assignmentExpiration: 2000 # The time of the assignment expiration in ms
assignmentExpiration: 2000 # The time of the assignment expiration in ms
compaction:
enableAutoCompaction: true
entityExpiration: 9223372037 # Entity expiration in seconds. CAUTION: make sure entityExpiration >= retentionDuration. Values above 9223372036 (math.MaxInt64 / time.Second) cannot be represented as a duration, so this default disables automatic expiration
gc:
interval: 3600 # gc interval in seconds

View File

@ -33,6 +33,8 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -179,13 +181,14 @@ func (t *compactionTask) mergeDeltalogs(dBlobs map[UniqueID][]*Blob, timetravelT
return pk2ts, dbuff, nil
}
func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema) ([]*InsertData, int64, error) {
func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp, schema *schemapb.CollectionSchema, currentTs Timestamp) ([]*InsertData, int64, error) {
var (
dim int // dimension of vector field
num int // numOfRows in each binlog
n int // binlog number
err error
dim int // dimension of vector field
num int // numOfRows in each binlog
n int // binlog number
expired int64 // the number of expired entity
err error
iDatas = make([]*InsertData, 0)
fID2Type = make(map[UniqueID]schemapb.DataType)
@ -209,6 +212,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
}
expired = 0
for mergeItr.HasNext() {
// no error if HasNext() returns true
vInter, _ := mergeItr.Next()
@ -223,6 +227,13 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
continue
}
ts := Timestamp(v.Timestamp)
// Filtering expired entity
if t.isExpiredEntity(ts, currentTs) {
expired++
continue
}
row, ok := v.Value.(map[UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong")
@ -273,7 +284,7 @@ func (t *compactionTask) merge(mergeItr iterator, delta map[UniqueID]Timestamp,
}
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows))
log.Debug("merge end", zap.Int64("planID", t.getPlanID()), zap.Int64("remaining insert numRows", numRows), zap.Int64("expired entities", expired))
return iDatas, numRows, nil
}
@ -415,7 +426,7 @@ func (t *compactionTask) compact() error {
return err
}
iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema())
iDatas, numRows, err := t.merge(mergeItr, deltaPk2Ts, meta.GetSchema(), t.GetCurrentTime())
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
@ -647,3 +658,26 @@ func (t *compactionTask) getSegmentMeta(segID UniqueID) (UniqueID, UniqueID, *et
func (t *compactionTask) getCollection() UniqueID {
return t.getCollectionID()
}
// GetCurrentTime returns the current hybrid timestamp from tsoutil,
// used by compact() as the reference "now" passed into merge for
// entity-expiration filtering.
func (t *compactionTask) GetCurrentTime() typeutil.Timestamp {
	return tsoutil.GetCurrentTime()
}
// isExpiredEntity reports whether an entity written at ts has outlived the
// configured compaction entity expiration, measured against now. Both
// arguments are TSO hybrid timestamps.
func (t *compactionTask) isExpiredEntity(ts, now Timestamp) bool {
	// math.MaxInt64 / time.Second: second counts above this cannot be
	// represented as a time.Duration, so they mean "expiration disabled".
	const maxExpirationSeconds = 9223372036
	ttlSeconds := Params.DataCoordCfg.CompactionEntityExpiration
	if ttlSeconds > maxExpirationSeconds {
		return false
	}
	ttl := time.Duration(ttlSeconds) * time.Second
	// Defensive guard: a duration that overflowed to negative also disables expiration.
	if ttl < 0 {
		return false
	}
	entityTime, _ := tsoutil.ParseTS(ts)
	nowTime, _ := tsoutil.ParseTS(now)
	return entityTime.Add(ttl).Before(nowTime)
}

View File

@ -18,6 +18,7 @@ package datanode
import (
"context"
"math"
"testing"
"time"
@ -235,27 +236,114 @@ func TestCompactionTaskInnerMethods(t *testing.T) {
})
t.Run("Test merge", func(t *testing.T) {
iData := genInsertData()
meta := NewMetaFactory().GetCollectionMeta(1, "test")
t.Run("Merge without expiration", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test")
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[UniqueID]Timestamp{
1: 10000,
}
dm := map[UniqueID]Timestamp{
1: 10000,
}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema())
assert.NoError(t, err)
assert.Equal(t, int64(1), numOfRow)
assert.Equal(t, 1, len(idata))
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), ct.GetCurrentTime())
assert.NoError(t, err)
assert.Equal(t, int64(1), numOfRow)
assert.Equal(t, 1, len(idata))
assert.NotEmpty(t, idata[0].Data)
})
t.Run("Merge with expiration", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds
iData := genInsertDataWithExpiredTS()
meta := NewMetaFactory().GetCollectionMeta(1, "test")
iblobs, err := getInsertBlobs(100, iData, meta)
require.NoError(t, err)
iitr, err := storage.NewInsertBinlogIterator(iblobs, 106)
require.NoError(t, err)
mitr := storage.NewMergeIterator([]iterator{iitr})
dm := map[UniqueID]Timestamp{
1: 10000,
}
ct := &compactionTask{}
idata, numOfRow, err := ct.merge(mitr, dm, meta.GetSchema(), genTimestamp())
assert.NoError(t, err)
assert.Equal(t, int64(0), numOfRow)
assert.Equal(t, 1, len(idata))
assert.Empty(t, idata[0].Data)
})
})
t.Run("Test isExpiredEntity", func(t *testing.T) {
t.Run("When CompactionEntityExpiration is set math.MaxInt64", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64
ct := &compactionTask{}
res := ct.isExpiredEntity(0, genTimestamp())
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, genTimestamp())
assert.Equal(t, false, res)
res = ct.isExpiredEntity(0, math.MaxInt64)
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64)
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, 0)
assert.Equal(t, false, res)
})
t.Run("When CompactionEntityExpiration is set MAX_ENTITY_EXPIRATION = 9223372036", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = 9223372036 // math.MaxInt64 / time.Second
ct := &compactionTask{}
res := ct.isExpiredEntity(0, genTimestamp())
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, genTimestamp())
assert.Equal(t, false, res)
res = ct.isExpiredEntity(0, math.MaxInt64)
assert.Equal(t, true, res)
res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64)
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, 0)
assert.Equal(t, false, res)
})
t.Run("When CompactionEntityExpiration is set 10 days", func(t *testing.T) {
Params.DataCoordCfg.CompactionEntityExpiration = 864000 // 10 days in seconds
ct := &compactionTask{}
res := ct.isExpiredEntity(0, genTimestamp())
assert.Equal(t, true, res)
res = ct.isExpiredEntity(math.MaxInt64, genTimestamp())
assert.Equal(t, false, res)
res = ct.isExpiredEntity(0, math.MaxInt64)
assert.Equal(t, true, res)
res = ct.isExpiredEntity(math.MaxInt64, math.MaxInt64)
assert.Equal(t, false, res)
res = ct.isExpiredEntity(math.MaxInt64, 0)
assert.Equal(t, false, res)
})
})
}
@ -285,6 +373,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Field2StatslogPaths: nil,
Deltalogs: nil,
}}
Params.DataCoordCfg.CompactionEntityExpiration = math.MaxInt64 // Turn off auto expiration
t.Run("Test compact invalid", func(t *testing.T) {
invalidAlloc := NewAllocatorFactory(-1)

View File

@ -33,6 +33,8 @@ import (
"github.com/milvus-io/milvus/internal/msgstream"
s "github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/commonpb"
@ -816,3 +818,61 @@ func genEmptyInsertData() *InsertData {
},
}}
}
// genInsertDataWithExpiredTS builds a two-row InsertData fixture whose
// timestamp field (field 1) holds hybrid timestamps corresponding to
// 2009-11-10 23:00:00 UTC and 2009-10-31 23:00:00 UTC — old enough to be
// treated as expired by the compaction entity-expiration tests.
func genInsertDataWithExpiredTS() *InsertData {
	fields := make(map[int64]s.FieldData)
	// Row IDs.
	fields[0] = &s.Int64FieldData{NumRows: []int64{2}, Data: []int64{11, 22}}
	// Timestamps: 2009-11-10 23:00:00 +0000 UTC, 2009-10-31 23:00:00 +0000 UTC.
	fields[1] = &s.Int64FieldData{NumRows: []int64{2}, Data: []int64{329749364736000000, 329500223078400000}}
	// One field of every supported scalar/vector type.
	fields[100] = &s.FloatVectorFieldData{NumRows: []int64{2}, Data: []float32{1.0, 6.0, 7.0, 8.0}, Dim: 2}
	fields[101] = &s.BinaryVectorFieldData{NumRows: []int64{2}, Data: []byte{0, 255, 255, 255, 128, 128, 128, 0}, Dim: 32}
	fields[102] = &s.BoolFieldData{NumRows: []int64{2}, Data: []bool{true, false}}
	fields[103] = &s.Int8FieldData{NumRows: []int64{2}, Data: []int8{5, 6}}
	fields[104] = &s.Int16FieldData{NumRows: []int64{2}, Data: []int16{7, 8}}
	fields[105] = &s.Int32FieldData{NumRows: []int64{2}, Data: []int32{9, 10}}
	fields[106] = &s.Int64FieldData{NumRows: []int64{2}, Data: []int64{1, 2}}
	fields[107] = &s.FloatFieldData{NumRows: []int64{2}, Data: []float32{2.333, 2.334}}
	fields[108] = &s.DoubleFieldData{NumRows: []int64{2}, Data: []float64{3.333, 3.334}}
	return &InsertData{Data: fields}
}
// genTimestamp returns a fixed hybrid timestamp for Go's birthday
// (2009-11-10 23:00:00 UTC), matching the fixture rows produced by
// genInsertDataWithExpiredTS.
func genTimestamp() typeutil.Timestamp {
	golangBirthday := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
	return tsoutil.ComposeTSByTime(golangBirthday, 0)
}

View File

@ -707,6 +707,9 @@ type dataCoordConfig struct {
EnableAutoCompaction bool
EnableGarbageCollection bool
RetentionDuration int64
CompactionEntityExpiration int64
// Garbage Collection
GCInterval time.Duration
GCMissingTolerance time.Duration
@ -724,6 +727,7 @@ func (p *dataCoordConfig) init(bp *BaseParamTable) {
p.initEnableCompaction()
p.initEnableAutoCompaction()
p.initCompactionEntityExpiration()
p.initEnableGarbageCollection()
p.initGCInterval()
@ -774,6 +778,16 @@ func (p *dataCoordConfig) initEnableAutoCompaction() {
p.EnableAutoCompaction = p.BaseParams.ParseBool("dataCoord.compaction.enableAutoCompaction", false)
}
// initCompactionEntityExpiration loads dataCoord.compaction.entityExpiration
// (in seconds). The default of math.MaxInt64 effectively disables expiration.
// The value is clamped to be at least RetentionDuration so that compaction
// never drops entities the system is still required to retain.
func (p *dataCoordConfig) initCompactionEntityExpiration() {
	p.CompactionEntityExpiration = p.BaseParams.ParseInt64WithDefault("dataCoord.compaction.entityExpiration", math.MaxInt64)
	if p.CompactionEntityExpiration < p.RetentionDuration {
		p.CompactionEntityExpiration = p.RetentionDuration
	}
}
///////////////////////////////////////////////////////////////////////////////
// --- datanode ---
type dataNodeConfig struct {