fix: Do not set LogPath when executing compaction (#30537)

Compaction copies deltalogs from the compactFrom segment (segA) to the
compactTo segment (segB). The previous code copied the logPath directly,
so segB's meta ended up holding full logPaths that belong to segA. On the
next compaction of segB, if segA had already been GCed, Download would
fail with the error "The specified key not found".

This PR makes sure the compactTo segment's meta contains logIDs only. It
also refines CompleteCompactionMutation, improving readability and
merging two methods into one.

See also: #30496
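
As a rough sketch of the invariant this change enforces: binlogs recorded in
the compactTo segment's meta carry a LogID and leave LogPath empty, because the
full object key is derived from IDs at read time. The helper below is
illustrative only (it assumes the datapb.Binlog fields LogID, LogPath and
EntriesNum) and is not code from this PR.

package datacoord // illustrative placement, not part of the actual PR

import "github.com/milvus-io/milvus/internal/proto/datapb"

// stripLogPaths returns copies of the given binlogs that keep only the IDs;
// LogPath is left empty so the meta never pins a key under the old segment.
func stripLogPaths(binlogs []*datapb.Binlog) []*datapb.Binlog {
	out := make([]*datapb.Binlog, 0, len(binlogs))
	for _, b := range binlogs {
		out = append(out, &datapb.Binlog{
			LogID:      b.GetLogID(),
			EntriesNum: b.GetEntriesNum(),
			// LogPath deliberately left empty.
		})
	}
	return out
}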

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn 2024-02-28 19:03:01 +08:00 committed by GitHub
parent 4082315bd0
commit cdc5ce5d6f
4 changed files with 220 additions and 300 deletions


@@ -558,9 +558,9 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
s.Equal(failed, task.state)
}
func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: id,
FieldID: fieldID,
Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
}
for _, id := range logIDs {
@@ -569,9 +569,9 @@ func getFieldBinlogIDs(id int64, logIDs ...int64) *datapb.FieldBinlog {
return l
}
func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
func getFieldBinlogPaths(fieldID int64, paths ...string) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: id,
FieldID: fieldID,
Binlogs: make([]*datapb.Binlog, 0, len(paths)),
}
for _, path := range paths {
@@ -580,9 +580,9 @@ func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
return l
}
func getFieldBinlogIDsWithEntry(id int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
func getFieldBinlogIDsWithEntry(fieldID int64, entry int64, logIDs ...int64) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: id,
FieldID: fieldID,
Binlogs: make([]*datapb.Binlog, 0, len(logIDs)),
}
for _, id := range logIDs {


@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"math"
"path"
"sync"
"time"
@@ -35,7 +34,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
@@ -44,6 +42,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
@@ -978,225 +977,163 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
m.segments.SetIsCompacting(segmentID, compacting)
}
// CompleteCompactionMutation completes compaction mutation.
func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan,
result *datapb.CompactionPlanResult,
) ([]*SegmentInfo, *segMetricMutation, error) {
func (m *meta) CompleteCompactionMutation(plan *datapb.CompactionPlan, result *datapb.CompactionPlanResult) ([]*SegmentInfo, *segMetricMutation, error) {
m.Lock()
defer m.Unlock()
modSegments, segments, metricMutation, err := m.prepareCompactionMutation(plan, result)
if err != nil {
log.Warn("fail to prepare for complete compaction mutation", zap.Error(err), zap.Int64("planID", plan.GetPlanID()))
return nil, nil, err
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.String("type", plan.GetType().String()))
metricMutation := &segMetricMutation{stateChange: make(map[string]map[string]int)}
var compactFromSegIDs []int64
var latestCompactFromSegments []*SegmentInfo
for _, segmentBinlogs := range plan.GetSegmentBinlogs() {
segment := m.segments.GetSegment(segmentBinlogs.GetSegmentID())
if segment == nil {
return nil, nil, merr.WrapErrSegmentNotFound(segmentBinlogs.GetSegmentID())
}
cloned := segment.Clone()
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
latestCompactFromSegments = append(latestCompactFromSegments, cloned)
compactFromSegIDs = append(compactFromSegIDs, cloned.GetID())
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
}
if err := m.alterMetaStoreAfterCompaction(segments, modSegments); err != nil {
newSegIDs := lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
log.Warn("fail to alert meta store", zap.Error(err), zap.Int64s("segmentIDs", newSegIDs), zap.Int64("planID", plan.GetPlanID()))
return nil, nil, err
}
return segments, metricMutation, err
}
// prepareCompactionMutation returns
// - the segment info of compactedFrom segments after compaction to alter
// - the segment info of compactedTo segment after compaction to add
// The compactedTo segment could contain 0 numRows
// TODO: too complicated
// TODO: support Major compaction
func (m *meta) prepareCompactionMutation(plan *datapb.CompactionPlan,
result *datapb.CompactionPlanResult,
) ([]*SegmentInfo, []*SegmentInfo, *segMetricMutation, error) {
log.Info("meta update: prepare for complete compaction mutation")
compactionLogs := plan.GetSegmentBinlogs()
modSegments := make([]*SegmentInfo, 0, len(compactionLogs))
metricMutation := &segMetricMutation{
stateChange: make(map[string]map[string]int),
}
for _, cl := range compactionLogs {
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
cloned := segment.Clone()
err := binlog.DecompressBinLog(storage.DeleteBinlog, cloned.GetCollectionID(), cloned.GetPartitionID(), cloned.GetID(), cloned.GetDeltalogs())
if err != nil {
return nil, nil, nil, err
logIDsFromPlan := make(map[int64]struct{})
for _, segBinlogs := range plan.GetSegmentBinlogs() {
for _, fieldBinlog := range segBinlogs.GetDeltalogs() {
for _, binlog := range fieldBinlog.GetBinlogs() {
logIDsFromPlan[binlog.GetLogID()] = struct{}{}
}
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
cloned.DroppedAt = uint64(time.Now().UnixNano())
cloned.Compacted = true
modSegments = append(modSegments, cloned)
}
}
var startPosition, dmlPosition *msgpb.MsgPosition
for _, s := range modSegments {
if dmlPosition == nil ||
s.GetDmlPosition() != nil && s.GetDmlPosition().GetTimestamp() < dmlPosition.GetTimestamp() {
dmlPosition = s.GetDmlPosition()
}
if startPosition == nil ||
s.GetStartPosition() != nil && s.GetStartPosition().GetTimestamp() < startPosition.GetTimestamp() {
startPosition = s.GetStartPosition()
}
}
// find new added delta logs when executing compaction
// TODO: won't be needed when enable L0 Segment
var originDeltalogs []*datapb.FieldBinlog
for _, s := range modSegments {
originDeltalogs = append(originDeltalogs, s.GetDeltalogs()...)
}
var deletedDeltalogs []*datapb.FieldBinlog
for _, l := range compactionLogs {
deletedDeltalogs = append(deletedDeltalogs, l.GetDeltalogs()...)
}
// MixCompaction / MergeCompaction will generates one and only one segment
compactToSegment := result.GetSegments()[0]
newAddedDeltalogs := updateDeltalogs(originDeltalogs, deletedDeltalogs)
copiedDeltalogs, err := m.copyDeltaFiles(newAddedDeltalogs, modSegments[0].CollectionID, modSegments[0].PartitionID, compactToSegment.GetSegmentID())
// copy new deltalogs in compactFrom segments to compactTo segments.
// TODO: Not needed when enable L0 segments.
newDeltalogs, err := m.copyNewDeltalogs(latestCompactFromSegments, logIDsFromPlan, compactToSegment.GetSegmentID())
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
deltalogs := append(compactToSegment.GetDeltalogs(), copiedDeltalogs...)
compactionFrom := make([]UniqueID, 0, len(modSegments))
for _, s := range modSegments {
compactionFrom = append(compactionFrom, s.GetID())
if len(newDeltalogs) > 0 {
compactToSegment.Deltalogs = append(compactToSegment.GetDeltalogs(), &datapb.FieldBinlog{Binlogs: newDeltalogs})
}
segmentInfo := &datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(),
CollectionID: modSegments[0].CollectionID,
PartitionID: modSegments[0].PartitionID,
InsertChannel: modSegments[0].InsertChannel,
NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: modSegments[0].MaxRowNum,
Binlogs: compactToSegment.GetInsertLogs(),
Statslogs: compactToSegment.GetField2StatslogPaths(),
Deltalogs: deltalogs,
StartPosition: startPosition,
DmlPosition: dmlPosition,
CreatedByCompaction: true,
CompactionFrom: compactionFrom,
LastExpireTime: plan.GetStartTime(),
Level: datapb.SegmentLevel_L1,
getMinPosition := func(positions []*msgpb.MsgPosition) *msgpb.MsgPosition {
var minPos *msgpb.MsgPosition
for _, pos := range positions {
if minPos == nil ||
pos != nil && pos.GetTimestamp() < minPos.GetTimestamp() {
minPos = pos
}
}
return minPos
}
segment := NewSegmentInfo(segmentInfo)
compactToSegmentInfo := NewSegmentInfo(
&datapb.SegmentInfo{
ID: compactToSegment.GetSegmentID(),
CollectionID: latestCompactFromSegments[0].CollectionID,
PartitionID: latestCompactFromSegments[0].PartitionID,
InsertChannel: plan.GetChannel(),
NumOfRows: compactToSegment.NumOfRows,
State: commonpb.SegmentState_Flushed,
MaxRowNum: latestCompactFromSegments[0].MaxRowNum,
Binlogs: compactToSegment.GetInsertLogs(),
Statslogs: compactToSegment.GetField2StatslogPaths(),
Deltalogs: compactToSegment.GetDeltalogs(),
CreatedByCompaction: true,
CompactionFrom: compactFromSegIDs,
LastExpireTime: plan.GetStartTime(),
Level: datapb.SegmentLevel_L1,
StartPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetStartPosition()
})),
DmlPosition: getMinPosition(lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *msgpb.MsgPosition {
return info.GetDmlPosition()
})),
})
// L1 segment with NumRows=0 will be discarded, so no need to change the metric
if segmentInfo.GetNumOfRows() > 0 {
metricMutation.addNewSeg(segment.GetState(), segment.GetLevel(), segment.GetNumOfRows())
if compactToSegmentInfo.GetNumOfRows() > 0 {
// metrics mutation for compactTo segments
metricMutation.addNewSeg(compactToSegmentInfo.GetState(), compactToSegmentInfo.GetLevel(), compactToSegmentInfo.GetNumOfRows())
} else {
compactToSegmentInfo.State = commonpb.SegmentState_Dropped
}
log.Info("meta update: prepare for complete compaction mutation - complete",
zap.Int64("collectionID", segment.GetCollectionID()),
zap.Int64("partitionID", segment.GetPartitionID()),
zap.Int64("new segment ID", segment.GetID()),
zap.String("new segment level", segment.GetLevel().String()),
zap.Int64("new segment num of rows", segment.GetNumOfRows()),
zap.Any("compacted from", segment.GetCompactionFrom()))
log = log.With(
zap.String("channel", plan.GetChannel()),
zap.Int64("partitionID", compactToSegmentInfo.GetPartitionID()),
zap.Int64("compactTo segmentID", compactToSegmentInfo.GetID()),
zap.Int64("compactTo segment numRows", compactToSegmentInfo.GetNumOfRows()),
zap.Any("compactFrom segments(to be updated as dropped)", compactFromSegIDs),
)
return modSegments, []*SegmentInfo{segment}, metricMutation, nil
}
func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, partitionID, targetSegmentID int64) ([]*datapb.FieldBinlog, error) {
ret := make([]*datapb.FieldBinlog, 0, len(binlogs))
for _, fieldBinlog := range binlogs {
fieldBinlog = proto.Clone(fieldBinlog).(*datapb.FieldBinlog)
for _, binlog := range fieldBinlog.Binlogs {
blobKey := metautil.JoinIDPath(collectionID, partitionID, targetSegmentID, binlog.LogID)
blobPath := path.Join(m.chunkManager.RootPath(), common.SegmentDeltaLogPath, blobKey)
blob, err := m.chunkManager.Read(m.ctx, binlog.LogPath)
if err != nil {
return nil, err
}
err = m.chunkManager.Write(m.ctx, blobPath, blob)
if err != nil {
return nil, err
}
}
ret = append(ret, fieldBinlog)
}
return ret, nil
}
func (m *meta) alterMetaStoreAfterCompaction(segmentsCompactTo []*SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
for _, segment := range segmentsCompactFrom {
modInfos = append(modInfos, segment.SegmentInfo)
}
newSegments := make([]*datapb.SegmentInfo, len(segmentsCompactTo))
binlogsIncrements := make([]metastore.BinlogsIncrement, len(segmentsCompactTo))
for i, seg := range segmentsCompactTo {
newSegment := seg.SegmentInfo
if newSegment.GetNumOfRows() == 0 {
newSegment.State = commonpb.SegmentState_Dropped
}
newSegments[i] = newSegment
binlogsIncrements[i] = metastore.BinlogsIncrement{
Segment: newSegment,
}
}
modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
newSegIDs := lo.Map(newSegments, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
log.Debug("meta update: prepare for meta mutation - complete")
compactFromInfos := lo.Map(latestCompactFromSegments, func(info *SegmentInfo, _ int) *datapb.SegmentInfo {
return info.SegmentInfo
})
log.Debug("meta update: alter meta store for compaction updates",
zap.Int64s("compact from segment IDs", modSegIDs),
zap.Int64s("compact to segment IDs", newSegIDs))
err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegments...), binlogsIncrements...)
if err != nil {
zap.Int("binlog count", len(compactToSegmentInfo.GetBinlogs())),
zap.Int("statslog count", len(compactToSegmentInfo.GetStatslogs())),
zap.Int("deltalog count", len(compactToSegmentInfo.GetDeltalogs())),
)
if err := m.catalog.AlterSegments(m.ctx, append(compactFromInfos, compactToSegmentInfo.SegmentInfo),
metastore.BinlogsIncrement{Segment: compactToSegmentInfo.SegmentInfo},
); err != nil {
log.Warn("fail to alter segments and new segment", zap.Error(err))
return err
return nil, nil, err
}
for _, s := range segmentsCompactFrom {
m.segments.SetSegment(s.GetID(), s)
}
for _, s := range segmentsCompactTo {
m.segments.SetSegment(s.GetID(), s)
}
lo.ForEach(latestCompactFromSegments, func(info *SegmentInfo, _ int) {
m.segments.SetSegment(info.GetID(), info)
})
m.segments.SetSegment(compactToSegmentInfo.GetID(), compactToSegmentInfo)
log.Info("meta update: alter in memory meta after compaction - complete",
zap.Int64s("compact from segment IDs", modSegIDs),
zap.Int64s("compact to segment IDs", newSegIDs))
return nil
log.Info("meta update: alter in memory meta after compaction - complete")
return []*SegmentInfo{compactToSegmentInfo}, metricMutation, nil
}
func updateDeltalogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog) []*datapb.FieldBinlog {
res := make([]*datapb.FieldBinlog, 0, len(origin))
for _, fbl := range origin {
logs := make(map[int64]*datapb.Binlog)
for _, d := range fbl.GetBinlogs() {
logs[d.GetLogID()] = d
}
for _, remove := range removes {
if remove.GetFieldID() == fbl.GetFieldID() {
for _, r := range remove.GetBinlogs() {
delete(logs, r.GetLogID())
func (m *meta) copyNewDeltalogs(latestCompactFromInfos []*SegmentInfo, logIDsInPlan map[int64]struct{}, toSegment int64) ([]*datapb.Binlog, error) {
newBinlogs := []*datapb.Binlog{}
for _, seg := range latestCompactFromInfos {
for _, fieldLog := range seg.GetDeltalogs() {
for _, l := range fieldLog.GetBinlogs() {
if _, ok := logIDsInPlan[l.GetLogID()]; !ok {
fromKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, seg.ID, l.GetLogID())
toKey := metautil.BuildDeltaLogPath(m.chunkManager.RootPath(), seg.CollectionID, seg.PartitionID, toSegment, l.GetLogID())
log.Warn("found new deltalog in compactFrom segment, copying it...",
zap.Any("deltalog", l),
zap.Int64("copyFrom segmentID", seg.GetID()),
zap.Int64("copyTo segmentID", toSegment),
zap.String("copyFrom key", fromKey),
zap.String("copyTo key", toKey),
)
blob, err := m.chunkManager.Read(m.ctx, fromKey)
if err != nil {
return nil, err
}
if err := m.chunkManager.Write(m.ctx, toKey, blob); err != nil {
return nil, err
}
newBinlogs = append(newBinlogs, l)
}
}
}
binlogs := make([]*datapb.Binlog, 0, len(logs))
for _, l := range logs {
binlogs = append(binlogs, l)
}
if len(binlogs) > 0 {
res = append(res, &datapb.FieldBinlog{
FieldID: fbl.GetFieldID(),
Binlogs: binlogs,
})
}
}
return res
return newBinlogs, nil
}
// buildSegment utility function for compose datapb.SegmentInfo struct with provided info


@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
@@ -197,97 +196,129 @@ func (suite *MetaBasicSuite) TestCollection() {
suite.MetricsEqual(metrics.DataCoordNumCollections.WithLabelValues(), 1)
}
func (suite *MetaBasicSuite) TestPrepareCompleteCompactionMutation() {
prepareSegments := &SegmentsInfo{
func (suite *MetaBasicSuite) TestCompleteCompactionMutation() {
latestSegments := &SegmentsInfo{
map[UniqueID]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
NumOfRows: 1,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 10000, 10001)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 20000, 20001)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 30000), getFieldBinlogIDs(0, 30001)},
NumOfRows: 2,
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 100,
PartitionID: 10,
State: commonpb.SegmentState_Flushed,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
NumOfRows: 1,
Level: datapb.SegmentLevel_L1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 11000)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 21000)},
// latest segment has 2 deltalogs, one submit for compaction, one is appended before compaction done
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 31000), getFieldBinlogIDs(0, 31001)},
NumOfRows: 2,
}},
},
}
// m := suite.meta
mockChMgr := mocks.NewChunkManager(suite.T())
mockChMgr.EXPECT().RootPath().Return("mockroot").Times(4)
mockChMgr.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, nil).Twice()
mockChMgr.EXPECT().Write(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: prepareSegments,
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: latestSegments,
chunkManager: mockChMgr,
}
plan := &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 1,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 1, 2)},
FieldBinlogs: m.GetSegment(1).GetBinlogs(),
Field2StatslogPaths: m.GetSegment(1).GetStatslogs(),
Deltalogs: m.GetSegment(1).GetDeltalogs()[:1], // compaction plan use only 1 deltalog
},
{
SegmentID: 2,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 3, 4)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 3, 4)},
FieldBinlogs: m.GetSegment(2).GetBinlogs(),
Field2StatslogPaths: m.GetSegment(2).GetStatslogs(),
Deltalogs: m.GetSegment(2).GetDeltalogs()[:1], // compaction plan use only 1 deltalog
},
},
StartTime: 15,
}
inSegment := &datapb.CompactionSegment{
compactToSeg := &datapb.CompactionSegment{
SegmentID: 3,
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 5)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 5)},
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50000)},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogIDs(0, 50001)},
NumOfRows: 2,
}
inCompactionResult := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{inSegment},
result := &datapb.CompactionPlanResult{
Segments: []*datapb.CompactionSegment{compactToSeg},
}
afterCompact, newSegment, metricMutation, err := m.prepareCompactionMutation(plan, inCompactionResult)
infos, mutation, err := m.CompleteCompactionMutation(plan, result)
suite.Equal(1, len(infos))
info := infos[0]
suite.NoError(err)
suite.NotNil(afterCompact)
suite.NotNil(newSegment)
suite.Equal(2, len(metricMutation.stateChange[datapb.SegmentLevel_Legacy.String()]))
suite.Equal(1, len(metricMutation.stateChange[datapb.SegmentLevel_L1.String()]))
suite.Equal(int64(0), metricMutation.rowCountChange)
suite.Equal(int64(2), metricMutation.rowCountAccChange)
suite.NotNil(info)
suite.NotNil(mutation)
suite.Require().Equal(2, len(afterCompact))
suite.Equal(commonpb.SegmentState_Dropped, afterCompact[0].GetState())
suite.Equal(commonpb.SegmentState_Dropped, afterCompact[1].GetState())
suite.NotZero(afterCompact[0].GetDroppedAt())
suite.NotZero(afterCompact[1].GetDroppedAt())
// check newSegment
suite.EqualValues(3, info.GetID())
suite.Equal(datapb.SegmentLevel_L1, info.GetLevel())
suite.Equal(commonpb.SegmentState_Flushed, info.GetState())
suite.Equal(inSegment.SegmentID, newSegment[0].GetID())
suite.Equal(UniqueID(100), newSegment[0].GetCollectionID())
suite.Equal(UniqueID(10), newSegment[0].GetPartitionID())
suite.Equal(inSegment.NumOfRows, newSegment[0].GetNumOfRows())
suite.Equal(commonpb.SegmentState_Flushed, newSegment[0].GetState())
binlogs := info.GetBinlogs()
for _, fbinlog := range binlogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50000, blog.GetLogID())
}
}
suite.EqualValues(inSegment.GetInsertLogs(), newSegment[0].GetBinlogs())
suite.EqualValues(inSegment.GetField2StatslogPaths(), newSegment[0].GetStatslogs())
suite.EqualValues(inSegment.GetDeltalogs(), newSegment[0].GetDeltalogs())
suite.NotZero(newSegment[0].lastFlushTime)
suite.Equal(uint64(15), newSegment[0].GetLastExpireTime())
statslogs := info.GetStatslogs()
for _, fbinlog := range statslogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
suite.EqualValues(50001, blog.GetLogID())
}
}
segmentsDone, metricMutationDone, err := m.CompleteCompactionMutation(plan, inCompactionResult)
suite.NoError(err)
suite.NotNil(segmentsDone)
suite.NotNil(metricMutationDone)
deltalogs := info.GetDeltalogs()
deltalogIDs := []int64{}
for _, fbinlog := range deltalogs {
for _, blog := range fbinlog.GetBinlogs() {
suite.Empty(blog.GetLogPath())
deltalogIDs = append(deltalogIDs, blog.GetLogID())
}
}
suite.ElementsMatch([]int64{30001, 31001}, deltalogIDs)
// check compactFrom segments
for _, segID := range []int64{1, 2} {
seg := m.GetSegment(segID)
suite.Equal(commonpb.SegmentState_Dropped, seg.GetState())
suite.NotEmpty(seg.GetDroppedAt())
suite.EqualValues(segID, seg.GetID())
suite.ElementsMatch(latestSegments.segments[segID].GetBinlogs(), seg.GetBinlogs())
suite.ElementsMatch(latestSegments.segments[segID].GetStatslogs(), seg.GetStatslogs())
suite.ElementsMatch(latestSegments.segments[segID].GetDeltalogs(), seg.GetDeltalogs())
}
// check mutation metrics
suite.Equal(2, len(mutation.stateChange[datapb.SegmentLevel_L1.String()]))
suite.EqualValues(-2, mutation.rowCountChange)
suite.EqualValues(2, mutation.rowCountAccChange)
}
func TestMeta(t *testing.T) {
@@ -731,59 +762,6 @@ func TestUpdateSegmentsInfo(t *testing.T) {
})
}
func TestMeta_alterMetaStore(t *testing.T) {
toAlter := []*datapb.SegmentInfo{
{
CollectionID: 100,
PartitionID: 10,
ID: 1,
NumOfRows: 10,
},
}
newSeg := &datapb.SegmentInfo{
Binlogs: []*datapb.FieldBinlog{
{
FieldID: 101,
Binlogs: []*datapb.Binlog{},
},
},
Statslogs: []*datapb.FieldBinlog{
{
FieldID: 101,
Binlogs: []*datapb.Binlog{},
},
},
Deltalogs: []*datapb.FieldBinlog{
{
FieldID: 101,
Binlogs: []*datapb.Binlog{},
},
},
CollectionID: 100,
PartitionID: 10,
ID: 2,
NumOfRows: 15,
}
m := &meta{
catalog: &datacoord.Catalog{MetaKv: NewMetaMemoryKV()},
segments: &SegmentsInfo{map[int64]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogIDs(1, 1, 2)},
}},
}},
}
err := m.alterMetaStoreAfterCompaction([]*SegmentInfo{{SegmentInfo: newSeg}}, lo.Map(toAlter, func(t *datapb.SegmentInfo, _ int) *SegmentInfo {
return &SegmentInfo{SegmentInfo: t}
}))
assert.NoError(t, err)
}
func Test_meta_SetSegmentCompacting(t *testing.T) {
type fields struct {
client kv.MetaKv


@@ -540,6 +540,11 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
log.Error("save binlog and checkpoints failed", zap.Error(err))
return merr.Status(err), nil
}
log.Info("SaveBinlogPaths sync segment with meta",
zap.Any("binlogs", req.GetField2BinlogPaths()),
zap.Any("deltalogs", req.GetDeltalogs()),
zap.Any("statslogs", req.GetField2StatslogPaths()),
)
if req.GetSegLevel() == datapb.SegmentLevel_L0 {
metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths()))