mirror of https://github.com/milvus-io/milvus.git
Make memory and etcd data as consistent as possible (#20683)
Signed-off-by: SimFG <bang.fu@zilliz.com> Signed-off-by: SimFG <bang.fu@zilliz.com>pull/20807/head
parent
b834cce8c3
commit
6d9d24b4ca
|
@ -256,13 +256,8 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
|
|||
}
|
||||
log := log.With(zap.Int64("planID", plan.GetPlanID()))
|
||||
|
||||
modInfos := make([]*datapb.SegmentInfo, len(modSegments))
|
||||
for i := range modSegments {
|
||||
modInfos[i] = modSegments[i].SegmentInfo
|
||||
}
|
||||
|
||||
log.Info("handleCompactionResult: altering metastore after compaction")
|
||||
if err := c.meta.alterMetaStoreAfterCompaction(modInfos, newSegment.SegmentInfo); err != nil {
|
||||
if err := c.meta.alterMetaStoreAfterCompaction(modSegments, newSegment); err != nil {
|
||||
log.Warn("handleCompactionResult: fail to alter metastore after compaction", zap.Error(err))
|
||||
return fmt.Errorf("fail to alter metastore after compaction, err=%w", err)
|
||||
}
|
||||
|
@ -280,12 +275,11 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
|
|||
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
|
||||
log.Warn("handleCompactionResult: fail to sync segments with node, reverting metastore",
|
||||
zap.Int64("nodeID", nodeID), zap.String("reason", err.Error()))
|
||||
return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment.SegmentInfo)
|
||||
return c.meta.revertAlterMetaStoreAfterCompaction(oldSegments, newSegment)
|
||||
}
|
||||
// Apply metrics after successful meta update.
|
||||
metricMutation.commit()
|
||||
|
||||
c.meta.alterInMemoryMetaAfterCompaction(newSegment, modSegments)
|
||||
log.Info("handleCompactionResult: success to handle merge compaction result")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -42,6 +42,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -898,13 +899,13 @@ func (m *meta) SetSegmentCompacting(segmentID UniqueID, compacting bool) {
|
|||
// - the segment info of compactedTo segment after compaction to add
|
||||
// The compactedTo segment could contain 0 numRows
|
||||
func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.CompactionSegmentBinlogs,
|
||||
result *datapb.CompactionResult) ([]*datapb.SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
|
||||
result *datapb.CompactionResult) ([]*SegmentInfo, []*SegmentInfo, *SegmentInfo, *segMetricMutation, error) {
|
||||
log.Info("meta update: prepare for complete compaction mutation")
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
var (
|
||||
oldSegments = make([]*datapb.SegmentInfo, 0, len(compactionLogs))
|
||||
oldSegments = make([]*SegmentInfo, 0, len(compactionLogs))
|
||||
modSegments = make([]*SegmentInfo, 0, len(compactionLogs))
|
||||
)
|
||||
|
||||
|
@ -913,7 +914,7 @@ func (m *meta) PrepareCompleteCompactionMutation(compactionLogs []*datapb.Compac
|
|||
}
|
||||
for _, cl := range compactionLogs {
|
||||
if segment := m.segments.GetSegment(cl.GetSegmentID()); segment != nil {
|
||||
oldSegments = append(oldSegments, segment.Clone().SegmentInfo)
|
||||
oldSegments = append(oldSegments, segment.Clone())
|
||||
|
||||
cloned := segment.Clone()
|
||||
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
|
||||
|
@ -1008,7 +1009,7 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
|
|||
return ret, nil
|
||||
}
|
||||
|
||||
func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo, newSegment *datapb.SegmentInfo) error {
|
||||
func (m *meta) alterMetaStoreAfterCompaction(modSegments []*SegmentInfo, newSegment *SegmentInfo) error {
|
||||
var modSegIDs []int64
|
||||
for _, seg := range modSegments {
|
||||
modSegIDs = append(modSegIDs, seg.GetID())
|
||||
|
@ -1016,41 +1017,56 @@ func (m *meta) alterMetaStoreAfterCompaction(modSegments []*datapb.SegmentInfo,
|
|||
log.Info("meta update: alter meta store for compaction updates",
|
||||
zap.Int64s("compact from segments (segments to be updated as dropped)", modSegIDs),
|
||||
zap.Int64("compact to segment", newSegment.GetID()))
|
||||
return m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modSegments, newSegment)
|
||||
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
modInfos := lo.Map[*SegmentInfo, *datapb.SegmentInfo](modSegments, func(item *SegmentInfo, _ int) *datapb.SegmentInfo {
|
||||
return item.SegmentInfo
|
||||
})
|
||||
|
||||
if err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment.SegmentInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, s := range modSegments {
|
||||
m.segments.SetSegment(s.GetID(), s)
|
||||
}
|
||||
|
||||
if newSegment.GetNumOfRows() > 0 {
|
||||
m.segments.SetSegment(newSegment.GetID(), newSegment)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*datapb.SegmentInfo, removalSegment *datapb.SegmentInfo) error {
|
||||
func (m *meta) revertAlterMetaStoreAfterCompaction(oldSegments []*SegmentInfo, removalSegment *SegmentInfo) error {
|
||||
log.Info("meta update: revert metastore after compaction failure",
|
||||
zap.Int64("collectionID", removalSegment.CollectionID),
|
||||
zap.Int64("partitionID", removalSegment.PartitionID),
|
||||
zap.Int64("compactedTo (segment to remove)", removalSegment.ID),
|
||||
zap.Int64s("compactedFrom (segments to add back)", removalSegment.GetCompactionFrom()),
|
||||
)
|
||||
return m.catalog.RevertAlterSegmentsAndAddNewSegment(m.ctx, oldSegments, removalSegment)
|
||||
}
|
||||
|
||||
func (m *meta) alterInMemoryMetaAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) {
|
||||
var compactFromIDs []int64
|
||||
for _, v := range segmentsCompactFrom {
|
||||
compactFromIDs = append(compactFromIDs, v.GetID())
|
||||
}
|
||||
log.Info("meta update: alter in memory meta after compaction",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
for _, s := range segmentsCompactFrom {
|
||||
oldSegmentInfos := lo.Map[*SegmentInfo, *datapb.SegmentInfo](oldSegments, func(item *SegmentInfo, _ int) *datapb.SegmentInfo {
|
||||
return item.SegmentInfo
|
||||
})
|
||||
|
||||
if err := m.catalog.RevertAlterSegmentsAndAddNewSegment(m.ctx, oldSegmentInfos, removalSegment.SegmentInfo); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, s := range oldSegments {
|
||||
m.segments.SetSegment(s.GetID(), s)
|
||||
}
|
||||
|
||||
// Handle empty segment generated by merge-compaction
|
||||
if segmentCompactTo.GetNumOfRows() > 0 {
|
||||
m.segments.SetSegment(segmentCompactTo.GetID(), segmentCompactTo)
|
||||
if removalSegment.GetNumOfRows() > 0 {
|
||||
m.segments.DropSegment(removalSegment.GetID())
|
||||
}
|
||||
log.Info("meta update: alter in memory meta after compaction - complete",
|
||||
zap.Int64("compact to segment ID", segmentCompactTo.GetID()),
|
||||
zap.Int64s("compact from segment IDs", compactFromIDs))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *meta) updateBinlogs(origin []*datapb.FieldBinlog, removes []*datapb.FieldBinlog, adds []*datapb.FieldBinlog) []*datapb.FieldBinlog {
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
"github.com/milvus-io/milvus/internal/util"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -690,50 +691,17 @@ func TestMeta_alterMetaStore(t *testing.T) {
|
|||
}},
|
||||
}
|
||||
|
||||
err := m.alterMetaStoreAfterCompaction(toAlter, newSeg)
|
||||
toAlterInfo := lo.Map[*datapb.SegmentInfo, *SegmentInfo](toAlter, func(item *datapb.SegmentInfo, _ int) *SegmentInfo {
|
||||
return &SegmentInfo{SegmentInfo: item}
|
||||
})
|
||||
|
||||
err := m.alterMetaStoreAfterCompaction(toAlterInfo, &SegmentInfo{SegmentInfo: newSeg})
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = m.revertAlterMetaStoreAfterCompaction(toAlter, newSeg)
|
||||
err = m.revertAlterMetaStoreAfterCompaction(toAlterInfo, &SegmentInfo{SegmentInfo: newSeg})
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMeta_alterInMemoryMetaAfterCompaction(t *testing.T) {
|
||||
m := &meta{
|
||||
catalog: &datacoord.Catalog{Txn: memkv.NewMemoryKV()},
|
||||
segments: &SegmentsInfo{make(map[UniqueID]*SegmentInfo)},
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
description string
|
||||
compactToSeg *SegmentInfo
|
||||
}{
|
||||
{
|
||||
"numRows>0", &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
NumOfRows: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"numRows=0", &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
compactFrom := []*SegmentInfo{{}, {}}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.description, func(t *testing.T) {
|
||||
m.alterInMemoryMetaAfterCompaction(test.compactToSeg, compactFrom)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestMeta_PrepareCompleteCompactionMutation(t *testing.T) {
|
||||
prepareSegments := &SegmentsInfo{
|
||||
map[UniqueID]*SegmentInfo{
|
||||
|
|
Loading…
Reference in New Issue