fix: [2.4] L0 compactor may cause DN OOM (#33564)

See also: #33547
pr: #33554

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-06-05 10:51:50 +08:00 committed by GitHub
parent aaf6c85095
commit 95582b0208
5 changed files with 283 additions and 381 deletions

View File

@@ -20,7 +20,6 @@ import (
"context"
"fmt"
"math"
"time"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
@@ -29,19 +28,15 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/datanode/syncmgr"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -122,9 +117,6 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return nil, errContext
}
ctxTimeout, cancelAll := context.WithTimeout(ctx, time.Duration(t.plan.GetTimeoutInSeconds())*time.Second)
defer cancelAll()
l0Segments := lo.Filter(t.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
return s.Level == datapb.SegmentLevel_L0
})
@@ -162,13 +154,8 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
}
}
var resultSegments []*datapb.CompactionSegment
if float64(hardware.GetFreeMemoryCount())*paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat() < float64(totalSize) {
resultSegments, err = t.linearProcess(ctxTimeout, targetSegIDs, totalDeltalogs)
} else {
resultSegments, err = t.batchProcess(ctxTimeout, targetSegIDs, lo.Values(totalDeltalogs)...)
}
batchSize := getMaxBatchSize(totalSize)
resultSegments, err := t.process(ctx, batchSize, targetSegIDs, lo.Values(totalDeltalogs)...)
if err != nil {
return nil, err
}
@@ -188,91 +175,70 @@ func (t *levelZeroCompactionTask) compact() (*datapb.CompactionPlanResult, error
return result, nil
}
func (t *levelZeroCompactionTask) linearProcess(ctx context.Context, targetSegments []int64, totalDeltalogs map[int64][]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)
var (
resultSegments = make(map[int64]*datapb.CompactionSegment)
alteredSegments = make(map[int64]*storage.DeleteData)
)
for segID, deltaLogs := range totalDeltalogs {
log := log.With(zap.Int64("levelzero segment", segID))
log.Info("Linear L0 compaction start processing segment")
allIters, err := t.loadDelta(ctx, deltaLogs)
if err != nil {
log.Warn("Linear L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
return nil, err
}
t.splitDelta(ctx, allIters, alteredSegments, targetSegments)
err = t.uploadByCheck(ctx, true, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload buffer fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
return nil, err
}
// batch size means the number of target segments processed in one batch
func getMaxBatchSize(totalSize int64) int {
max := 1
memLimit := float64(hardware.GetFreeMemoryCount()) * paramtable.Get().DataNodeCfg.L0BatchMemoryRatio.GetAsFloat()
if memLimit > float64(totalSize) {
max = int(memLimit / float64(totalSize))
}
err := t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Linear L0 compaction upload all buffer fail", zap.Int64s("target segment", targetSegments), zap.Error(err))
return nil, err
}
log.Info("Linear L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
return lo.Values(resultSegments), nil
return max
}
func (t *levelZeroCompactionTask) batchProcess(ctx context.Context, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.String("type", t.plan.GetType().String()),
zap.Int("target segment counts", len(targetSegments)),
)
log.Info("Batch L0 compaction start processing")
resultSegments := make(map[int64]*datapb.CompactionSegment)
iters, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
if err != nil {
log.Warn("Batch L0 compaction loadDelta fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
return nil, err
}
alteredSegments := make(map[int64]*storage.DeleteData)
t.splitDelta(ctx, iters, alteredSegments, targetSegments)
err = t.uploadByCheck(ctx, false, alteredSegments, resultSegments)
if err != nil {
log.Warn("Batch L0 compaction upload fail", zap.Int64s("target segments", targetSegments), zap.Error(err))
return nil, err
}
log.Info("Batch L0 compaction finished", zap.Duration("elapse", t.tr.RecordSpan()))
return lo.Values(resultSegments), nil
}
func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*iter.DeltalogIterator, error) {
allIters := make([]*iter.DeltalogIterator, 0)
for _, paths := range deltaLogs {
blobs, err := t.Download(ctx, paths)
func (t *levelZeroCompactionTask) serializeUpload(ctx context.Context, segmentWriters map[int64]*SegmentDeltaWriter) ([]*datapb.CompactionSegment, error) {
allBlobs := make(map[string][]byte)
results := make([]*datapb.CompactionSegment, 0)
for segID, writer := range segmentWriters {
blob, tr, err := writer.Finish()
if err != nil {
log.Warn("L0 compaction serializeUpload serialize failed", zap.Error(err))
return nil, err
}
allIters = append(allIters, iter.NewDeltalogIterator(blobs, nil))
logID, err := t.allocator.AllocOne()
if err != nil {
log.Warn("L0 compaction serializeUpload alloc failed", zap.Error(err))
return nil, err
}
blobKey, _ := binlog.BuildLogPath(storage.DeleteBinlog, writer.collectionID, writer.partitionID, writer.segmentID, -1, logID)
allBlobs[blobKey] = blob.GetValue()
deltalog := &datapb.Binlog{
EntriesNum: writer.GetRowNum(),
LogSize: int64(len(blob.GetValue())),
MemorySize: blob.GetMemorySize(),
LogPath: blobKey,
LogID: logID,
TimestampFrom: tr.GetMinTimestamp(),
TimestampTo: tr.GetMaxTimestamp(),
}
results = append(results, &datapb.CompactionSegment{
SegmentID: segID,
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{deltalog}}},
Channel: t.plan.GetChannel(),
})
}
return allIters, nil
if len(allBlobs) == 0 {
return nil, nil
}
if err := t.Upload(ctx, allBlobs); err != nil {
log.Warn("L0 compaction serializeUpload upload failed", zap.Error(err))
return nil, err
}
return results, nil
}
func (t *levelZeroCompactionTask) splitDelta(
ctx context.Context,
allIters []*iter.DeltalogIterator,
targetSegBuffer map[int64]*storage.DeleteData,
allDelta []*storage.DeleteData,
targetSegIDs []int64,
) {
) map[int64]*SegmentDeltaWriter {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact splitDelta")
defer span.End()
@@ -285,113 +251,92 @@ func (t *levelZeroCompactionTask) splitDelta(
})
}
// split all delete data to segments
for _, deltaIter := range allIters {
for deltaIter.HasNext() {
// checked by HasNext, no error here
labeled, _ := deltaIter.Next()
targetSeg := lo.Associate(segments, func(info *metacache.SegmentInfo) (int64, *metacache.SegmentInfo) {
return info.SegmentID(), info
})
predicted := split(labeled.GetPk())
// split all delete data to segments
targetSegBuffer := make(map[int64]*SegmentDeltaWriter)
for _, delta := range allDelta {
for i, pk := range delta.Pks {
predicted := split(pk)
for _, gotSeg := range predicted {
delBuffer, ok := targetSegBuffer[gotSeg]
writer, ok := targetSegBuffer[gotSeg]
if !ok {
delBuffer = &storage.DeleteData{}
targetSegBuffer[gotSeg] = delBuffer
segment := targetSeg[gotSeg]
writer = NewSegmentDeltaWriter(gotSeg, segment.PartitionID(), t.getCollection())
targetSegBuffer[gotSeg] = writer
}
delBuffer.Append(labeled.GetPk(), labeled.GetTimestamp())
writer.Write(pk, delta.Tss[i])
}
}
}
return targetSegBuffer
}
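To make the fan-out above easier to follow, here is a simplified, hypothetical sketch of the routing loop; mayContain stands in for the bloom-filter based split(pk) in the real code, and the partition/collection IDs passed to NewSegmentDeltaWriter are placeholders.

package datanode

import "github.com/milvus-io/milvus/internal/storage"

// routeDeletes is an illustrative sketch only, not the actual splitDelta:
// every delete record is appended to the per-segment writer of each target
// segment that may contain its primary key.
func routeDeletes(
	allDelta []*storage.DeleteData,
	targetSegIDs []int64,
	mayContain func(segID int64, pk storage.PrimaryKey) bool, // hypothetical predicate
) map[int64]*SegmentDeltaWriter {
	writers := make(map[int64]*SegmentDeltaWriter)
	for _, delta := range allDelta {
		for i, pk := range delta.Pks {
			for _, segID := range targetSegIDs {
				if !mayContain(segID, pk) {
					continue
				}
				w, ok := writers[segID]
				if !ok {
					w = NewSegmentDeltaWriter(segID, 0 /*partitionID*/, 0 /*collectionID*/)
					writers[segID] = w
				}
				w.Write(pk, delta.Tss[i])
			}
		}
	}
	return writers
}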
func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) {
var (
collID = t.metacache.Collection()
uploadKv = make(map[string][]byte)
func (t *levelZeroCompactionTask) process(ctx context.Context, batchSize int, targetSegments []int64, deltaLogs ...[]string) ([]*datapb.CompactionSegment, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact process")
defer span.End()
results := make([]*datapb.CompactionSegment, 0)
batch := int(math.Ceil(float64(len(targetSegments)) / float64(batchSize)))
log := log.Ctx(t.ctx).With(
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("max conc segment counts", batchSize),
zap.Int("total segment counts", len(targetSegments)),
zap.Int("total batch", batch),
)
seg, ok := t.metacache.GetSegmentByID(segmentID)
if !ok {
return nil, nil, merr.WrapErrSegmentLack(segmentID)
}
blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData)
log.Info("L0 compaction process start")
allDelta, err := t.loadDelta(ctx, lo.Flatten(deltaLogs))
if err != nil {
return nil, nil, err
log.Warn("L0 compaction loadDelta fail", zap.Error(err))
return nil, err
}
logID, err := t.allocator.AllocOne()
if err != nil {
return nil, nil, err
}
blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID)
blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey)
uploadKv[blobPath] = blob.GetValue()
minTs := uint64(math.MaxUint64)
maxTs := uint64(0)
for _, ts := range dData.Tss {
if ts > maxTs {
maxTs = ts
for i := 0; i < batch; i++ {
left, right := i*batchSize, (i+1)*batchSize
if right > len(targetSegments) {
right = len(targetSegments)
}
if ts < minTs {
minTs = ts
batchSegments := targetSegments[left:right]
batchSegWriter := t.splitDelta(ctx, allDelta, batchSegments)
batchResults, err := t.serializeUpload(ctx, batchSegWriter)
if err != nil {
log.Warn("L0 compaction serialize upload fail", zap.Error(err))
return nil, err
}
log.Info("L0 compaction finished one batch", zap.Int("batch no.", i), zap.Int("batch segment count", len(batchResults)))
results = append(results, batchResults...)
}
deltalog := &datapb.Binlog{
EntriesNum: dData.RowCount,
LogSize: int64(len(blob.GetValue())),
LogPath: blobPath,
LogID: logID,
TimestampFrom: minTs,
TimestampTo: maxTs,
MemorySize: dData.Size(),
}
return uploadKv, deltalog, nil
log.Info("L0 compaction process done")
return results, nil
}
func (t *levelZeroCompactionTask) uploadByCheck(ctx context.Context, requireCheck bool, alteredSegments map[int64]*storage.DeleteData, resultSegments map[int64]*datapb.CompactionSegment) error {
allBlobs := make(map[string][]byte)
tmpResults := make(map[int64]*datapb.CompactionSegment)
for segID, dData := range alteredSegments {
if !requireCheck || (dData.Size() >= paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.GetAsInt64()) {
blobs, binlog, err := t.composeDeltalog(segID, dData)
if err != nil {
log.Warn("L0 compaction composeDelta fail", zap.Int64("segmentID", segID), zap.Error(err))
return err
}
allBlobs = lo.Assign(blobs, allBlobs)
tmpResults[segID] = &datapb.CompactionSegment{
SegmentID: segID,
Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{binlog}}},
Channel: t.plan.GetChannel(),
}
delete(alteredSegments, segID)
func (t *levelZeroCompactionTask) loadDelta(ctx context.Context, deltaLogs ...[]string) ([]*storage.DeleteData, error) {
_, span := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "L0Compact loadDelta")
defer span.End()
allData := make([]*storage.DeleteData, 0, len(deltaLogs))
for _, paths := range deltaLogs {
blobBytes, err := t.Download(ctx, paths)
if err != nil {
return nil, err
}
}
if len(allBlobs) == 0 {
return nil
}
if err := t.Upload(ctx, allBlobs); err != nil {
log.Warn("L0 compaction upload blobs fail", zap.Error(err))
return err
}
for segID, compSeg := range tmpResults {
if _, ok := resultSegments[segID]; !ok {
resultSegments[segID] = compSeg
} else {
binlog := compSeg.Deltalogs[0].Binlogs[0]
resultSegments[segID].Deltalogs[0].Binlogs = append(resultSegments[segID].Deltalogs[0].Binlogs, binlog)
blobs := make([]*storage.Blob, 0, len(blobBytes))
for _, blob := range blobBytes {
blobs = append(blobs, &storage.Blob{Value: blob})
}
_, _, dData, err := storage.NewDeleteCodec().Deserialize(blobs)
if err != nil {
return nil, err
}
}
return nil
allData = append(allData, dData)
}
return allData, nil
}

View File

@@ -18,7 +18,6 @@ package datanode
import (
"context"
"path"
"testing"
"github.com/cockroachdb/errors"
@@ -29,14 +28,10 @@ import (
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/datanode/io"
iter "github.com/milvus-io/milvus/internal/datanode/iterators"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
)
@@ -80,7 +75,7 @@ func (s *LevelZeroCompactionTaskSuite) SetupTest() {
s.dBlob = blob.GetValue()
}
func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() {
func (s *LevelZeroCompactionTaskSuite) TestProcessLoadDeltaFail() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
@@ -103,21 +98,17 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() {
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Twice()
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Once()
targetSegments := []int64{200}
deltaLogs := map[int64][]string{100: {"a/b/c1"}}
segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs)
s.Error(err)
s.Empty(segments)
segments, err = s.task.batchProcess(context.Background(), targetSegments, lo.Values(deltaLogs)...)
segments, err := s.task.process(context.Background(), 1, targetSegments, lo.Values(deltaLogs)...)
s.Error(err)
s.Empty(segments)
}
func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() {
func (s *LevelZeroCompactionTaskSuite) TestProcessUploadFail() {
plan := &datapb.CompactionPlan{
PlanID: 19530,
Type: datapb.CompactionType_Level0DeleteCompaction,
@@ -140,25 +131,22 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() {
s.task.plan = plan
s.task.tr = timerecord.NewTimeRecorder("test")
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload fail")).Once()
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Once()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Twice()
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).RunAndReturn(
func(filters ...metacache.SegmentFilter) []*metacache.SegmentInfo {
bfs1 := metacache.NewBloomFilterSetWithBatchSize(100)
bfs1.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
segment1 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 200}, bfs1)
return []*metacache.SegmentInfo{segment1}
}).Twice()
}).Once()
targetSegments := []int64{200}
deltaLogs := map[int64][]string{100: {"a/b/c1"}}
segments, err := s.task.linearProcess(context.Background(), targetSegments, deltaLogs)
s.Error(err)
s.Empty(segments)
segments, err = s.task.batchProcess(context.Background(), targetSegments, lo.Values(deltaLogs)...)
segments, err := s.task.process(context.Background(), 2, targetSegments, lo.Values(deltaLogs)...)
s.Error(err)
s.Empty(segments)
}
@@ -207,20 +195,27 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
bfs2.UpdatePKRange(&storage.Int64FieldData{Data: []int64{1, 2}})
segment2 := metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 201}, bfs2)
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2)
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2})
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(1)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Twice()
times := 1
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).RunAndReturn(
func(filters ...metacache.SegmentFilter) []*metacache.SegmentInfo {
if times == 1 {
times += 1
return []*metacache.SegmentInfo{segment1}
}
if times == 2 {
times += 1
return []*metacache.SegmentInfo{segment2}
}
return []*metacache.SegmentInfo{segment1, segment2}
}).Twice()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
s.Require().Equal(plan.GetPlanID(), s.task.getPlanID())
s.Require().Equal(plan.GetChannel(), s.task.getChannelName())
@@ -249,7 +244,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
segments, err := s.task.linearProcess(context.Background(), targetSegIDs, totalDeltalogs)
segments, err := s.task.process(context.Background(), 1, targetSegIDs, lo.Values(totalDeltalogs)...)
s.NoError(err)
s.NotEmpty(segments)
s.Equal(2, len(segments))
@@ -312,16 +307,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything).
RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true
})
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).
RunAndReturn(func(paths ...string) string {
return path.Join(paths...)
}).Times(2)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil).Once()
l0Segments := lo.Filter(s.task.plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs, _ int) bool {
@@ -334,8 +320,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
}
return 0, false
})
totalDeltalogs := make(map[UniqueID][]string)
for _, s := range l0Segments {
paths := []string{}
for _, d := range s.GetDeltalogs() {
@@ -347,7 +333,7 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
totalDeltalogs[s.GetSegmentID()] = paths
}
}
segments, err := s.task.batchProcess(context.TODO(), targetSegIDs, lo.Values(totalDeltalogs)...)
segments, err := s.task.process(context.TODO(), 2, targetSegIDs, lo.Values(totalDeltalogs)...)
s.NoError(err)
s.NotEmpty(segments)
s.Equal(2, len(segments))
@@ -359,148 +345,49 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() {
log.Info("test segment results", zap.Any("result", segments))
}
func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() {
func (s *LevelZeroCompactionTaskSuite) TestSerializeUpload() {
ctx := context.Background()
s.Run("uploadByCheck directly composeDeltalog failed", func() {
s.Run("serializeUpload allocator Alloc failed", func() {
s.SetupTest()
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once()
s.mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc wrong"))
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
writers := map[int64]*SegmentDeltaWriter{100: writer}
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
err := s.task.uploadByCheck(ctx, false, segments, results)
result, err := s.task.serializeUpload(ctx, writers)
s.Error(err)
s.Equal(0, len(results))
s.Equal(0, len(result))
})
s.Run("uploadByCheck directly Upload failed", func() {
s.Run("serializeUpload Upload failed", func() {
s.SetupTest()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed"))
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
err := s.task.uploadByCheck(ctx, false, segments, results)
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
writers := map[int64]*SegmentDeltaWriter{100: writer}
results, err := s.task.serializeUpload(ctx, writers)
s.Error(err)
s.Equal(0, len(results))
})
s.Run("upload directly", func() {
s.Run("upload success", func() {
s.SetupTest()
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
err := s.task.uploadByCheck(ctx, false, segments, results)
writer := NewSegmentDeltaWriter(100, 10, 1)
writer.WriteBatch(s.dData.Pks, s.dData.Tss)
writers := map[int64]*SegmentDeltaWriter{100: writer}
results, err := s.task.serializeUpload(ctx, writers)
s.NoError(err)
s.Equal(1, len(results))
seg1, ok := results[100]
s.True(ok)
seg1 := results[0]
s.EqualValues(100, seg1.GetSegmentID())
s.Equal(1, len(seg1.GetDeltalogs()))
s.Equal(1, len(seg1.GetDeltalogs()[0].GetBinlogs()))
})
s.Run("check without upload", func() {
s.SetupTest()
segments := map[int64]*storage.DeleteData{100: s.dData}
results := make(map[int64]*datapb.CompactionSegment)
s.Require().Empty(results)
err := s.task.uploadByCheck(ctx, true, segments, results)
s.NoError(err)
s.Empty(results)
})
s.Run("check with upload", func() {
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
segments := map[int64]*storage.DeleteData{100: s.dData}
results := map[int64]*datapb.CompactionSegment{
100: {SegmentID: 100, Deltalogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{LogID: 1}}}}},
}
s.Require().Equal(1, len(results))
paramtable.Get().Save(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataNodeCfg.FlushDeleteBufferBytes.Key)
err := s.task.uploadByCheck(ctx, true, segments, results)
s.NoError(err)
s.NotEmpty(results)
s.Equal(1, len(results))
seg1, ok := results[100]
s.True(ok)
s.EqualValues(100, seg1.GetSegmentID())
s.Equal(1, len(seg1.GetDeltalogs()))
s.Equal(2, len(seg1.GetDeltalogs()[0].GetBinlogs()))
})
}
func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() {
s.mockMeta.EXPECT().Collection().Return(1)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 100
}), mock.Anything).
Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true)
s.mockMeta.EXPECT().
GetSegmentByID(
mock.MatchedBy(func(ID int64) bool {
return ID == 101
}), mock.Anything).
Return(nil, false)
s.mockAlloc.EXPECT().AllocOne().Return(19530, nil)
blobKey := metautil.JoinIDPath(1, 10, 100, 19530)
blobPath := path.Join(common.SegmentDeltaLogPath, blobKey)
s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath)
kvs, binlog, err := s.task.composeDeltalog(100, s.dData)
s.NoError(err)
s.Equal(1, len(kvs))
v, ok := kvs[blobPath]
s.True(ok)
s.NotNil(v)
s.Equal(blobPath, binlog.LogPath)
_, _, err = s.task.composeDeltalog(101, s.dData)
s.Error(err)
}
func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
@@ -516,23 +403,20 @@ func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() {
predicted := []int64{100, 101, 102}
s.mockMeta.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{segment1, segment2, segment3})
s.mockMeta.EXPECT().Collection().Return(1)
diter := iter.NewDeltalogIterator([][]byte{s.dBlob}, nil)
s.Require().NotNil(diter)
targetSegBuffer := make(map[int64]*storage.DeleteData)
targetSegIDs := predicted
s.task.splitDelta(context.TODO(), []*iter.DeltalogIterator{diter}, targetSegBuffer, targetSegIDs)
deltaWriters := s.task.splitDelta(context.TODO(), []*storage.DeleteData{s.dData}, targetSegIDs)
s.NotEmpty(targetSegBuffer)
s.ElementsMatch(predicted, lo.Keys(targetSegBuffer))
s.EqualValues(2, targetSegBuffer[100].RowCount)
s.EqualValues(1, targetSegBuffer[101].RowCount)
s.EqualValues(1, targetSegBuffer[102].RowCount)
s.NotEmpty(deltaWriters)
s.ElementsMatch(predicted, lo.Keys(deltaWriters))
s.EqualValues(2, deltaWriters[100].GetRowNum())
s.EqualValues(1, deltaWriters[101].GetRowNum())
s.EqualValues(1, deltaWriters[102].GetRowNum())
s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, targetSegBuffer[100].Pks)
s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[101].Pks[0])
s.Equal(storage.NewInt64PrimaryKey(3), targetSegBuffer[102].Pks[0])
s.ElementsMatch([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(3)}, deltaWriters[100].deleteData.Pks)
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[101].deleteData.Pks[0])
s.Equal(storage.NewInt64PrimaryKey(3), deltaWriters[102].deleteData.Pks[0])
}
func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
@@ -557,47 +441,24 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadDelta() {
description string
paths []string
expectNilIter bool
expectError bool
}{
{"no error", []string{"correct"}, false, false},
{"download error", []string{"error"}, true, true},
{"new iter error", []string{"invalid-blobs"}, true, false},
{"no error", []string{"correct"}, false},
{"download error", []string{"error"}, true},
{"deserialize error", []string{"invalid-blobs"}, true},
}
for _, test := range tests {
iters, err := s.task.loadDelta(ctx, test.paths)
if test.expectNilIter {
if len(iters) > 0 {
for _, iter := range iters {
s.False(iter.HasNext())
}
} else {
s.Nil(iters)
}
} else {
s.NotNil(iters)
s.Equal(1, len(iters))
s.True(iters[0].HasNext())
iter := iters[0]
var pks []storage.PrimaryKey
var tss []storage.Timestamp
for iter.HasNext() {
labeled, err := iter.Next()
s.NoError(err)
pks = append(pks, labeled.GetPk())
tss = append(tss, labeled.GetTimestamp())
}
s.ElementsMatch(pks, s.dData.Pks)
s.ElementsMatch(tss, s.dData.Tss)
}
dDatas, err := s.task.loadDelta(ctx, test.paths)
if test.expectError {
s.Error(err)
} else {
s.NoError(err)
s.NotEmpty(dDatas)
s.EqualValues(1, len(dDatas))
s.ElementsMatch(s.dData.Pks, dDatas[0].Pks)
s.Equal(s.dData.RowCount, dDatas[0].RowCount)
}
}
}

View File

@@ -0,0 +1,81 @@
package datanode
import (
"math"
"github.com/milvus-io/milvus/internal/datanode/writebuffer"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
func NewSegmentDeltaWriter(segmentID, partitionID, collectionID int64) *SegmentDeltaWriter {
return &SegmentDeltaWriter{
deleteData: &storage.DeleteData{},
segmentID: segmentID,
partitionID: partitionID,
collectionID: collectionID,
tsFrom: math.MaxUint64,
tsTo: 0,
}
}
type SegmentDeltaWriter struct {
deleteData *storage.DeleteData
segmentID int64
partitionID int64
collectionID int64
tsFrom typeutil.Timestamp
tsTo typeutil.Timestamp
}
func (w *SegmentDeltaWriter) GetCollectionID() int64 {
return w.collectionID
}
func (w *SegmentDeltaWriter) GetPartitionID() int64 {
return w.partitionID
}
func (w *SegmentDeltaWriter) GetSegmentID() int64 {
return w.segmentID
}
func (w *SegmentDeltaWriter) GetRowNum() int64 {
return w.deleteData.RowCount
}
func (w *SegmentDeltaWriter) GetTimeRange() *writebuffer.TimeRange {
return writebuffer.NewTimeRange(w.tsFrom, w.tsTo)
}
func (w *SegmentDeltaWriter) updateRange(ts typeutil.Timestamp) {
if ts < w.tsFrom {
w.tsFrom = ts
}
if ts > w.tsTo {
w.tsTo = ts
}
}
func (w *SegmentDeltaWriter) Write(pk storage.PrimaryKey, ts typeutil.Timestamp) {
w.deleteData.Append(pk, ts)
w.updateRange(ts)
}
func (w *SegmentDeltaWriter) WriteBatch(pks []storage.PrimaryKey, tss []typeutil.Timestamp) {
w.deleteData.AppendBatch(pks, tss)
for _, ts := range tss {
w.updateRange(ts)
}
}
func (w *SegmentDeltaWriter) Finish() (*storage.Blob, *writebuffer.TimeRange, error) {
blob, err := storage.NewDeleteCodec().Serialize(w.collectionID, w.partitionID, w.segmentID, w.deleteData)
if err != nil {
return nil, nil, err
}
return blob, w.GetTimeRange(), nil
}
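A brief usage sketch of the writer above (the IDs and timestamps are made up): deletes for a single target segment are buffered through Write/WriteBatch, and Finish serializes them into one delete binlog blob whose time range becomes TimestampFrom/TimestampTo in the resulting datapb.Binlog.

package datanode

import (
	"github.com/milvus-io/milvus/internal/storage"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func exampleSegmentDeltaWriter() error {
	// Hypothetical IDs: segment 100, partition 10, collection 1.
	writer := NewSegmentDeltaWriter(100, 10, 1)
	writer.Write(storage.NewInt64PrimaryKey(1), 1000)
	writer.WriteBatch(
		[]storage.PrimaryKey{storage.NewInt64PrimaryKey(2), storage.NewInt64PrimaryKey(3)},
		[]typeutil.Timestamp{1001, 1002},
	)

	blob, tr, err := writer.Finish()
	if err != nil {
		return err
	}
	// blob.GetValue() is what serializeUpload stores under the key built by
	// binlog.BuildLogPath; tr.GetMinTimestamp()/GetMaxTimestamp() fill the
	// binlog's TimestampFrom/TimestampTo.
	_, _ = blob, tr
	return nil
}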

View File

@@ -85,6 +85,21 @@ func (tr *TimeRange) Merge(other *TimeRange) {
}
}
func (tr *TimeRange) GetMinTimestamp() typeutil.Timestamp {
return tr.timestampMin
}
func (tr *TimeRange) GetMaxTimestamp() typeutil.Timestamp {
return tr.timestampMax
}
func NewTimeRange(tsFrom, tsTo typeutil.Timestamp) *TimeRange {
return &TimeRange{
timestampMin: tsFrom,
timestampMax: tsTo,
}
}
func getEarliestCheckpoint(cps ...*msgpb.MsgPosition) *msgpb.MsgPosition {
var result *msgpb.MsgPosition
for _, cp := range cps {

View File

@@ -142,7 +142,7 @@ func DecompressBinLog(binlogType storage.BinlogType, collectionID, partitionID,
for _, fieldBinlog := range fieldBinlogs {
for _, binlog := range fieldBinlog.Binlogs {
if binlog.GetLogPath() == "" {
path, err := buildLogPath(binlogType, collectionID, partitionID,
path, err := BuildLogPath(binlogType, collectionID, partitionID,
segmentID, fieldBinlog.GetFieldID(), binlog.GetLogID())
if err != nil {
return err
@@ -155,7 +155,7 @@ }
}
// build a binlog path on the storage by metadata
func buildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) {
func BuildLogPath(binlogType storage.BinlogType, collectionID, partitionID, segmentID, fieldID, logID typeutil.UniqueID) (string, error) {
chunkManagerRootPath := paramtable.Get().MinioCfg.RootPath.GetValue()
if paramtable.Get().CommonCfg.StorageType.GetValue() == "local" {
chunkManagerRootPath = paramtable.Get().LocalStorageCfg.Path.GetValue()