enhance: [2.5] Add missing delete metrics (#38634) (#38747)

Add two counter metrics:
- Total delete entries read from deltalogs: milvus_datanode_compaction_delete_count
- Total missing deletes: milvus_datanode_compaction_missing_delete_count (see the sketch below for how this is derived)
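
A minimal sketch (not part of the patch) of how the two counters relate: the delete count is the number of delete entries merged from the deltalogs, and the missing count is that number minus the rows those entries actually dropped, floored at zero — in effect, deletes whose primary key never matched a live row during compaction.

package main

import "fmt"

// missingDeletes mirrors EntityFilter.GetMissingDeleteCount from this patch:
// deltalogEntries is the number of delete entries merged from the deltalogs,
// appliedDeletes is the number of rows those entries actually removed.
func missingDeletes(deltalogEntries, appliedDeletes int) int {
	if diff := deltalogEntries - appliedDeletes; diff > 0 {
		return diff
	}
	return 0
}

func main() {
	// e.g. 10 delete entries read from deltalogs, but only 7 matched live rows
	fmt.Println(missingDeletes(10, 7)) // 3
}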

See also: #34665
pr: #38634

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2025-01-07 11:20:56 +08:00 committed by GitHub
parent e6dd3e5a57
commit b457c2f415
8 changed files with 278 additions and 162 deletions

View File

@ -56,7 +56,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -93,7 +92,7 @@ type clusteringCompactionTask struct {
// inner field
collectionID int64
partitionID int64
currentTs typeutil.Timestamp // for TTL
currentTime time.Time // for TTL
isVectorClusteringKey bool
clusteringKeyField *schemapb.FieldSchema
primaryKeyField *schemapb.FieldSchema
@ -223,7 +222,7 @@ func (t *clusteringCompactionTask) init() error {
t.primaryKeyField = pkField
t.isVectorClusteringKey = typeutil.IsVectorType(t.clusteringKeyField.DataType)
t.currentTs = tsoutil.GetCurrentTime()
t.currentTime = time.Now()
t.memoryBufferSize = t.getMemoryBufferSize()
workerPoolSize := t.getWorkerPoolSize()
t.mappingPool = conc.NewPool[any](workerPoolSize)
@ -563,11 +562,7 @@ func (t *clusteringCompactionTask) mappingSegment(
log.Info("mapping segment start")
processStart := time.Now()
fieldBinlogPaths := make([][]string, 0)
var (
expired int64 = 0
deleted int64 = 0
remained int64 = 0
)
var remained int64 = 0
deltaPaths := make([]string, 0)
for _, d := range segment.GetDeltalogs() {
@ -579,17 +574,7 @@ func (t *clusteringCompactionTask) mappingSegment(
if err != nil {
return err
}
isDeletedValue := func(v *storage.Value) bool {
ts, ok := delta[v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
return true
}
return false
}
entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
mappingStats := &clusteringpb.ClusteringCentroidIdMappingStats{}
if t.isVectorClusteringKey {
@ -656,15 +641,7 @@ func (t *clusteringCompactionTask) mappingSegment(
v := pkIter.Value()
offset++
// Filtering deleted entity
if isDeletedValue(v) {
deleted++
continue
}
// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if entityFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}
@ -753,13 +730,19 @@ func (t *clusteringCompactionTask) mappingSegment(
}
}
}
missing := entityFilter.GetMissingDeleteCount()
log.Info("mapping segment end",
zap.Int64("remained_entities", remained),
zap.Int64("deleted_entities", deleted),
zap.Int64("expired_entities", expired),
zap.Int("deleted_entities", entityFilter.GetDeletedCount()),
zap.Int("expired_entities", entityFilter.GetExpiredCount()),
zap.Int("deltalog deletes", entityFilter.GetDeltalogDeleteCount()),
zap.Int("missing deletes", missing),
zap.Int64("written_row_num", t.writtenRowNum.Load()),
zap.Duration("elapse", time.Since(processStart)))
metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetDeltalogDeleteCount()))
metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(missing))
return nil
}
@ -1175,8 +1158,6 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
var (
timestampTo int64 = -1
timestampFrom int64 = -1
expired int64 = 0
deleted int64 = 0
remained int64 = 0
analyzeResult map[interface{}]int64 = make(map[interface{}]int64, 0)
)
@ -1203,6 +1184,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}
expiredFilter := newEntityFilter(nil, t.plan.GetCollectionTtl(), t.currentTime)
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
@ -1233,9 +1215,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
v := pkIter.Value()
// Filtering expired entity
ts := typeutil.Timestamp(v.Timestamp)
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, ts) {
expired++
if expiredFilter.Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}
@ -1264,8 +1244,7 @@ func (t *clusteringCompactionTask) scalarAnalyzeSegment(
log.Info("analyze segment end",
zap.Int64("remained entities", remained),
zap.Int64("deleted entities", deleted),
zap.Int64("expired entities", expired),
zap.Int("expired entities", expiredFilter.GetExpiredCount()),
zap.Duration("map elapse", time.Since(processStart)))
return analyzeResult, nil
}
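
The isDeletedValue closure removed above carries a subtle comparison that the new EntityFilter keeps: deletes apply only to rows strictly older than the delete entry. A self-contained sketch (mine, not from the patch) of why "<" rather than "<=" is required for upserts, which write the re-insert and the delete with the same timestamp:

package main

import "fmt"

// isDeleted follows the comparison used by the compaction delete filter:
// a row is dropped only when its timestamp is strictly older than the delete's.
func isDeleted(rowTs, deleteTs uint64) bool {
	return rowTs < deleteTs
}

func main() {
	fmt.Println(isDeleted(100, 100)) // false: an upserted row with the same ts survives
	fmt.Println(isDeleted(99, 100))  // true: an older row is removed by the delete
}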

View File

@ -37,25 +37,94 @@ import (
const compactionBatchSize = 100
func isExpiredEntity(ttl int64, now, ts typeutil.Timestamp) bool {
// entity expire is not enabled if duration <= 0
if ttl <= 0 {
return false
type EntityFilter struct {
deletedPkTs map[interface{}]typeutil.Timestamp // pk2ts
ttl int64 // nanoseconds
currentTime time.Time
expiredCount int
deletedCount int
}
func newEntityFilter(deletedPkTs map[interface{}]typeutil.Timestamp, ttl int64, currTime time.Time) *EntityFilter {
if deletedPkTs == nil {
deletedPkTs = make(map[interface{}]typeutil.Timestamp)
}
return &EntityFilter{
deletedPkTs: deletedPkTs,
ttl: ttl,
currentTime: currTime,
}
}
func (filter *EntityFilter) Filtered(pk any, ts typeutil.Timestamp) bool {
if filter.isEntityDeleted(pk, ts) {
filter.deletedCount++
return true
}
pts, _ := tsoutil.ParseTS(ts)
pnow, _ := tsoutil.ParseTS(now)
expireTime := pts.Add(time.Duration(ttl))
return expireTime.Before(pnow)
// Filtering expired entity
if filter.isEntityExpired(ts) {
filter.expiredCount++
return true
}
return false
}
func (filter *EntityFilter) GetExpiredCount() int {
return filter.expiredCount
}
func (filter *EntityFilter) GetDeletedCount() int {
return filter.deletedCount
}
func (filter *EntityFilter) GetDeltalogDeleteCount() int {
return len(filter.deletedPkTs)
}
func (filter *EntityFilter) GetMissingDeleteCount() int {
diff := filter.GetDeltalogDeleteCount() - filter.GetDeletedCount()
if diff <= 0 {
diff = 0
}
return diff
}
func (filter *EntityFilter) isEntityDeleted(pk interface{}, pkTs typeutil.Timestamp) bool {
if deleteTs, ok := filter.deletedPkTs[pk]; ok {
// insert task and delete task have the same ts when upsert
// here should be < instead of <=
// to avoid the upserted data being deleted after compaction
if pkTs < deleteTs {
return true
}
}
return false
}
func (filter *EntityFilter) isEntityExpired(entityTs typeutil.Timestamp) bool {
// entity expire is not enabled if duration <= 0
if filter.ttl <= 0 {
return false
}
entityTime, _ := tsoutil.ParseTS(entityTs)
// this dur can represent 292 million years before or after 1970, enough for milvus
// ttl calculation
dur := filter.currentTime.UnixMilli() - entityTime.UnixMilli()
// filter.ttl is nanoseconds
return filter.ttl/int64(time.Millisecond) <= dur
}
func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[interface{}]typeutil.Timestamp, error) {
pk2ts := make(map[interface{}]typeutil.Timestamp)
pk2Ts := make(map[interface{}]typeutil.Timestamp)
log := log.Ctx(ctx)
if len(paths) == 0 {
log.Debug("compact with no deltalogs, skip merge deltalogs")
return pk2ts, nil
return pk2Ts, nil
}
blobs := make([]*storage.Blob, 0)
@ -88,17 +157,15 @@ func mergeDeltalogs(ctx context.Context, io io.BinlogIO, paths []string) (map[in
}
dl := reader.Value()
// If pk already exists in pk2ts, record the later one.
if ts, ok := pk2ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
if ts, ok := pk2Ts[dl.Pk.GetValue()]; ok && ts > dl.Ts {
continue
}
pk2ts[dl.Pk.GetValue()] = dl.Ts
pk2Ts[dl.Pk.GetValue()] = dl.Ts
}
log.Info("compact mergeDeltalogs end",
zap.Int("deleted pk counts", len(pk2ts)))
log.Info("compact mergeDeltalogs end", zap.Int("delete entries counts", len(pk2Ts)))
return pk2ts, nil
return pk2Ts, nil
}
func composePaths(segments []*datapb.CompactionSegmentBinlogs) (
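
EntityFilter.isEntityExpired above replaces the old tsoutil-based isExpiredEntity with plain wall-clock arithmetic, and this commit also documents collection_ttl as nanoseconds in the proto. A self-contained sketch of that arithmetic (names and the reference time are mine, not from the patch):

package main

import (
	"fmt"
	"time"
)

// isExpired reproduces the TTL check from EntityFilter.isEntityExpired:
// ttlNs is the collection TTL in nanoseconds; an entity written at entityTime
// has expired once now - entityTime reaches the TTL. ttlNs <= 0 disables TTL.
func isExpired(ttlNs int64, now, entityTime time.Time) bool {
	if ttlNs <= 0 {
		return false
	}
	elapsedMs := now.UnixMilli() - entityTime.UnixMilli()
	return ttlNs/int64(time.Millisecond) <= elapsedMs
}

func main() {
	written := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC) // arbitrary reference time
	tenDays := int64(10 * 24 * time.Hour)                             // TTL expressed in nanoseconds
	fmt.Println(isExpired(tenDays, written.AddDate(0, 0, 11), written)) // true
	fmt.Println(isExpired(tenDays, written.AddDate(0, 0, 9), written))  // false
}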

View File

@ -0,0 +1,84 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package compaction
import (
"math"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
func TestCompactorCommonTaskSuite(t *testing.T) {
suite.Run(t, new(CompactorCommonSuite))
}
type CompactorCommonSuite struct {
suite.Suite
}
func (s *CompactorCommonSuite) TestEntityFilterByTTL() {
milvusBirthday := getMilvusBirthday()
tests := []struct {
description string
collTTL int64
nowTime time.Time
entityTime time.Time
expect bool
}{
// ttl == maxInt64, dur is 1hour, no entities should expire
{"ttl=maxInt64, now<entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(time.Hour), false},
{"ttl=maxInt64, now>entity", math.MaxInt64, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=maxInt64, now==entity", math.MaxInt64, milvusBirthday, milvusBirthday, false},
// ttl == 0, no entities should expire
{"ttl=0, now==entity", 0, milvusBirthday, milvusBirthday, false},
{"ttl=0, now>entity", 0, milvusBirthday, milvusBirthday.Add(-time.Hour), false},
{"ttl=0, now<entity", 0, milvusBirthday, milvusBirthday.Add(time.Hour), false},
// ttl == 10days
{"ttl=10days, nowTs-entityTs>10days", 864000000000000, milvusBirthday.AddDate(0, 0, 11), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs==10days", 864000000000000, milvusBirthday.AddDate(0, 0, 10), milvusBirthday, true},
{"ttl=10days, nowTs-entityTs<10days", 864000000000000, milvusBirthday.AddDate(0, 0, 9), milvusBirthday, false},
// ttl is maxInt64
{"ttl=maxInt64, nowTs-entityTs>1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 11), milvusBirthday, true},
{"ttl=maxInt64, nowTs-entityTs==1000years", math.MaxInt64, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
{"ttl=maxInt64, nowTs-entityTs==240year", math.MaxInt64, milvusBirthday.AddDate(240, 0, 0), milvusBirthday, false},
{"ttl=maxInt64, nowTs-entityTs==maxDur", math.MaxInt64, milvusBirthday.Add(time.Duration(math.MaxInt64)), milvusBirthday, true},
{"ttl<maxInt64, nowTs-entityTs==1000years", math.MaxInt64 - 1, milvusBirthday.AddDate(1000, 0, 0), milvusBirthday, true},
}
for _, test := range tests {
s.Run(test.description, func() {
filter := newEntityFilter(nil, test.collTTL, test.nowTime)
entityTs := tsoutil.ComposeTSByTime(test.entityTime, 0)
got := filter.Filtered("mockpk", entityTs)
s.Equal(test.expect, got)
if got {
s.Equal(1, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
} else {
s.Equal(0, filter.GetExpiredCount())
s.Equal(0, filter.GetDeletedCount())
}
})
}
}
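
The table above only exercises the TTL path. A hypothetical companion case (not part of this commit; it would additionally need the pkg/util/typeutil import) covering the delete path and the new missing-delete accounting could look like this:

func (s *CompactorCommonSuite) TestEntityFilterByDelete() {
	pk := "mockpk"
	deleteTs := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
	deltalog := map[interface{}]typeutil.Timestamp{pk: deleteTs}

	// A row strictly older than its delete entry is dropped and counted as deleted.
	filter := newEntityFilter(deltalog, 0, getMilvusBirthday())
	s.True(filter.Filtered(pk, deleteTs-1))
	s.Equal(1, filter.GetDeletedCount())
	s.Equal(0, filter.GetMissingDeleteCount())

	// An upserted row shares the delete's timestamp, so it survives and the
	// delete entry is reported as missing.
	filter = newEntityFilter(deltalog, 0, getMilvusBirthday())
	s.False(filter.Filtered(pk, deleteTs))
	s.Equal(0, filter.GetDeletedCount())
	s.Equal(1, filter.GetMissingDeleteCount())
}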

View File

@ -3,8 +3,10 @@ package compaction
import (
"container/heap"
"context"
"fmt"
sio "io"
"math"
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
@ -15,6 +17,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -25,7 +28,7 @@ func mergeSortMultipleSegments(ctx context.Context,
binlogIO io.BinlogIO,
binlogs []*datapb.CompactionSegmentBinlogs,
tr *timerecord.TimeRecorder,
currentTs typeutil.Timestamp,
currentTime time.Time,
collectionTtl int64,
bm25FieldIds []int64,
) ([]*datapb.CompactionSegment, error) {
@ -41,11 +44,6 @@ func mergeSortMultipleSegments(ctx context.Context,
compAlloc := NewCompactionAllocator(segIDAlloc, logIDAlloc)
mWriter := NewMultiSegmentWriter(binlogIO, compAlloc, plan, maxRows, partitionID, collectionID, bm25FieldIds)
var (
expiredRowCount int64 // the number of expired entities
deletedRowCount int64
)
pkField, err := typeutil.GetPrimaryFieldSchema(plan.GetSchema())
if err != nil {
log.Warn("failed to get pk field from schema")
@ -54,7 +52,7 @@ func mergeSortMultipleSegments(ctx context.Context,
// SegmentDeserializeReaderTest(binlogPaths, t.binlogIO, writer.GetPkID())
segmentReaders := make([]*SegmentDeserializeReader, len(binlogs))
segmentDelta := make([]map[interface{}]storage.Timestamp, len(binlogs))
segmentFilters := make([]*EntityFilter, len(binlogs))
for i, s := range binlogs {
var binlogBatchCount int
for _, b := range s.GetFieldBinlogs() {
@ -84,10 +82,11 @@ func mergeSortMultipleSegments(ctx context.Context,
deltalogPaths = append(deltalogPaths, l.GetLogPath())
}
}
segmentDelta[i], err = mergeDeltalogs(ctx, binlogIO, deltalogPaths)
delta, err := mergeDeltalogs(ctx, binlogIO, deltalogPaths)
if err != nil {
return nil, err
}
segmentFilters[i] = newEntityFilter(delta, collectionTtl, currentTime)
}
advanceRow := func(i int) (*storage.Value, error) {
@ -97,20 +96,10 @@ func mergeSortMultipleSegments(ctx context.Context,
return nil, err
}
ts, ok := segmentDelta[i][v.PK.GetValue()]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && uint64(v.Timestamp) < ts {
deletedRowCount++
if segmentFilters[i].Filtered(v.PK.GetValue(), uint64(v.Timestamp)) {
continue
}
// Filtering expired entity
if isExpiredEntity(collectionTtl, currentTs, typeutil.Timestamp(v.Timestamp)) {
expiredRowCount++
continue
}
return v, nil
}
}
@ -119,12 +108,15 @@ func mergeSortMultipleSegments(ctx context.Context,
heap.Init(&pq)
for i := range segmentReaders {
if v, err := advanceRow(i); err == nil {
heap.Push(&pq, &PQItem{
Value: v,
Index: i,
})
v, err := advanceRow(i)
if err != nil {
log.Warn("compact wrong, failed to advance row", zap.Error(err))
return nil, err
}
heap.Push(&pq, &PQItem{
Value: v,
Index: i,
})
}
for pq.Len() > 0 {
@ -160,12 +152,30 @@ func mergeSortMultipleSegments(ctx context.Context,
seg.IsSorted = true
}
var (
deletedRowCount int
expiredRowCount int
missingDeleteCount int
deltalogDeleteEntriesCount int
)
for _, filter := range segmentFilters {
deletedRowCount += filter.GetDeletedCount()
expiredRowCount += filter.GetExpiredCount()
missingDeleteCount += filter.GetMissingDeleteCount()
deltalogDeleteEntriesCount += filter.GetDeltalogDeleteCount()
}
totalElapse := tr.RecordSpan()
log.Info("compact mergeSortMultipleSegments end",
zap.Int64s("mergeSplit to segments", lo.Keys(mWriter.cachedMeta)),
zap.Int64("deleted row count", deletedRowCount),
zap.Int64("expired entities", expiredRowCount),
zap.Int("deleted row count", deletedRowCount),
zap.Int("expired entities", expiredRowCount),
zap.Int("missing deletes", missingDeleteCount),
zap.Duration("total elapse", totalElapse))
metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(collectionID)).Add(float64(deltalogDeleteEntriesCount))
metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(collectionID)).Add(float64(missingDeleteCount))
return res, nil
}

View File

@ -40,13 +40,12 @@ import (
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type mixCompactionTask struct {
binlogIO io.BinlogIO
currentTs typeutil.Timestamp
binlogIO io.BinlogIO
currentTime time.Time
plan *datapb.CompactionPlan
@ -74,13 +73,13 @@ func NewMixCompactionTask(
) *mixCompactionTask {
ctx1, cancel := context.WithCancel(ctx)
return &mixCompactionTask{
ctx: ctx1,
cancel: cancel,
binlogIO: binlogIO,
plan: plan,
tr: timerecord.NewTimeRecorder("mergeSplit compaction"),
currentTs: tsoutil.GetCurrentTime(),
done: make(chan struct{}, 1),
ctx: ctx1,
cancel: cancel,
binlogIO: binlogIO,
plan: plan,
tr: timerecord.NewTimeRecorder("mergeSplit compaction"),
currentTime: time.Now(),
done: make(chan struct{}, 1),
}
}
@ -201,23 +200,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
log.Warn("compact wrong, fail to merge deltalogs", zap.Error(err))
return
}
isValueDeleted := func(pk any, ts typeutil.Timestamp) bool {
oldts, ok := delta[pk]
// insert task and delete task has the same ts when upsert
// here should be < instead of <=
// to avoid the upsert data to be deleted after compact
if ok && ts < oldts {
deletedRowCount++
return true
}
// Filtering expired entity
if isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, typeutil.Timestamp(ts)) {
expiredRowCount++
return true
}
return false
}
entityFilter := newEntityFilter(delta, t.plan.GetCollectionTtl(), t.currentTime)
reader, err := storage.NewCompositeBinlogRecordReader(blobs)
if err != nil {
@ -265,7 +248,7 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
panic("invalid data type")
}
ts := typeutil.Timestamp(tsArray.Value(i))
if isValueDeleted(pk, ts) {
if entityFilter.Filtered(pk, ts) {
if sliceStart != -1 {
err = writeSlice(r, sliceStart, i)
if err != nil {
@ -288,6 +271,14 @@ func (t *mixCompactionTask) writeSegment(ctx context.Context,
}
}
}
deltalogDeleteEntriesCount := len(delta)
deletedRowCount = int64(entityFilter.GetDeletedCount())
expiredRowCount = int64(entityFilter.GetExpiredCount())
metrics.DataNodeCompactionDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(deltalogDeleteEntriesCount))
metrics.DataNodeCompactionMissingDeleteCount.WithLabelValues(fmt.Sprint(t.collectionID)).Add(float64(entityFilter.GetMissingDeleteCount()))
return
}
@ -347,7 +338,7 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
if sortMergeAppicable {
log.Info("compact by merge sort")
res, err = mergeSortMultipleSegments(ctxTimeout, t.plan, t.collectionID, t.partitionID, t.maxRows, t.binlogIO,
t.plan.GetSegmentBinlogs(), t.tr, t.currentTs, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
t.plan.GetSegmentBinlogs(), t.tr, t.currentTime, t.plan.GetCollectionTtl(), t.bm25FieldIDs)
if err != nil {
log.Warn("compact wrong, fail to merge sort segments", zap.Error(err))
return nil, err

View File

@ -340,8 +340,7 @@ func (s *MixCompactionTaskSuite) TestCompactSortedSegment() {
func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
s.initSegBuffer(1, 3)
collTTL := 864000 // 10 days
currTs := tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second*(time.Duration(collTTL)+1)), 0)
s.task.currentTs = currTs
s.task.currentTime = getMilvusBirthday().Add(time.Second * (time.Duration(collTTL) + 1))
s.task.plan.CollectionTtl = int64(collTTL)
alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
@ -512,15 +511,14 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsMultiSegment() {
got, err := mergeDeltalogs(s.task.ctx, s.task.binlogIO, []string{"random"})
s.NoError(err)
s.Equal(len(got), len(test.expectedpk2ts))
s.Equal(len(test.expectedpk2ts), len(got))
gotKeys := lo.Map(lo.Keys(got), func(k interface{}, _ int) int64 {
res, ok := k.(int64)
for gotKT, gotV := range got {
gotK, ok := gotKT.(int64)
s.Require().True(ok)
return res
})
s.ElementsMatch(gotKeys, lo.Keys(test.expectedpk2ts))
s.ElementsMatch(lo.Values(got), lo.Values(test.expectedpk2ts))
s.EqualValues(test.expectedpk2ts[gotK], gotV)
}
})
}
}
@ -551,13 +549,12 @@ func (s *MixCompactionTaskSuite) TestMergeDeltalogsOneSegment() {
s.NotNil(got)
s.Equal(len(expectedMap), len(got))
gotKeys := lo.Map(lo.Keys(got), func(k interface{}, _ int) int64 {
res, ok := k.(int64)
for gotKT, gotV := range got {
gotK, ok := gotKT.(int64)
s.Require().True(ok)
return res
})
s.ElementsMatch(gotKeys, lo.Keys(expectedMap))
s.ElementsMatch(lo.Values(got), lo.Values(expectedMap))
s.EqualValues(expectedMap[gotK], gotV)
}
}
func (s *MixCompactionTaskSuite) TestCompactFail() {
@ -586,44 +583,6 @@ func (s *MixCompactionTaskSuite) TestCompactFail() {
})
}
func (s *MixCompactionTaskSuite) TestIsExpiredEntity() {
milvusBirthdayTs := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
tests := []struct {
description string
collTTL int64
nowTs uint64
entityTs uint64
expect bool
}{
{"ttl=maxInt64, nowTs-entityTs=ttl", math.MaxInt64, math.MaxInt64, 0, true},
{"ttl=maxInt64, nowTs-entityTs < 0", math.MaxInt64, milvusBirthdayTs, 0, false},
{"ttl=maxInt64, 0<nowTs-entityTs<ttl", math.MaxInt64, 0, milvusBirthdayTs, false},
{"ttl=maxInt64, nowTs-entityTs>ttl v2", math.MaxInt64, math.MaxInt64, milvusBirthdayTs, true},
// entityTs==currTs will never happen
// {"ttl=maxInt64, curTs-entityTs=0", math.MaxInt64, milvusBirthdayTs, milvusBirthdayTs, true},
{"ttl=0, nowTs>entityTs", 0, milvusBirthdayTs + 1, milvusBirthdayTs, false},
{"ttl=0, nowTs==entityTs", 0, milvusBirthdayTs, milvusBirthdayTs, false},
{"ttl=0, nowTs<entityTs", 0, milvusBirthdayTs, milvusBirthdayTs + 1, false},
{"ttl=10days, nowTs-entityTs>10days", 864000, milvusBirthdayTs + 864001, milvusBirthdayTs, true},
{"ttl=10days, nowTs-entityTs==10days", 864000, milvusBirthdayTs + 864000, milvusBirthdayTs, true},
{"ttl=10days, nowTs-entityTs<10days", 864000, milvusBirthdayTs + 10, milvusBirthdayTs, false},
}
for _, test := range tests {
s.Run(test.description, func() {
t := &mixCompactionTask{
plan: &datapb.CompactionPlan{
CollectionTtl: test.collTTL,
},
currentTs: test.nowTs,
}
got := isExpiredEntity(t.plan.GetCollectionTtl(), t.currentTs, test.entityTs)
s.Equal(test.expect, got)
})
}
}
func getRow(magic int64) map[int64]interface{} {
ts := tsoutil.ComposeTSByTime(getMilvusBirthday(), 0)
return map[int64]interface{}{

View File

@ -604,7 +604,7 @@ message CompactionPlan {
CompactionType type = 5;
uint64 timetravel = 6;
string channel = 7;
int64 collection_ttl = 8;
int64 collection_ttl = 8; // nanoseconds
int64 total_rows = 9;
schema.CollectionSchema schema = 10;
int64 clustering_key_field = 11;

View File

@ -237,6 +237,22 @@ var (
nodeIDLabelName,
channelNameLabelName,
})
DataNodeCompactionDeleteCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "compaction_delete_count",
Help: "Number of delete entries in compaction",
}, []string{collectionIDLabelName})
DataNodeCompactionMissingDeleteCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "compaction_missing_delete_count",
Help: "Number of missing deletes in compaction",
}, []string{collectionIDLabelName})
)
// RegisterDataNode registers DataNode metrics
@ -261,6 +277,8 @@ func RegisterDataNode(registry *prometheus.Registry) {
// compaction related
registry.MustRegister(DataNodeCompactionLatency)
registry.MustRegister(DataNodeCompactionLatencyInQueue)
registry.MustRegister(DataNodeCompactionDeleteCount)
registry.MustRegister(DataNodeCompactionMissingDeleteCount)
// deprecated metrics
registry.MustRegister(DataNodeForwardDeleteMsgTimeTaken)
registry.MustRegister(DataNodeNumProducers)
@ -298,4 +316,12 @@ func CleanupDataNodeCollectionMetrics(nodeID int64, collectionID int64, channel
nodeIDLabelName: fmt.Sprint(nodeID),
collectionIDLabelName: fmt.Sprint(collectionID),
})
DataNodeCompactionDeleteCount.Delete(prometheus.Labels{
collectionIDLabelName: fmt.Sprint(collectionID),
})
DataNodeCompactionMissingDeleteCount.Delete(prometheus.Labels{
collectionIDLabelName: fmt.Sprint(collectionID),
})
}
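
The two CounterVecs added above follow the standard client_golang pattern. A minimal self-contained sketch (the registry, label value, and use of testutil are illustrative, not part of the patch) of defining, incrementing, and reading back a counter shaped like DataNodeCompactionDeleteCount:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the new metric: one counter per collection ID.
	deleteCount := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "milvus",
			Subsystem: "datanode",
			Name:      "compaction_delete_count",
			Help:      "Number of delete entries in compaction",
		}, []string{"collection_id"})

	reg := prometheus.NewRegistry()
	reg.MustRegister(deleteCount)

	// A compaction task would add its per-collection deltalog delete count here.
	deleteCount.WithLabelValues("4567").Add(42)

	fmt.Println(testutil.ToFloat64(deleteCount.WithLabelValues("4567"))) // 42
}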