mirror of https://github.com/milvus-io/milvus.git
Fix segment meta only record last flushed binlog (#25707)
Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/25711/head
parent
224515eaa3
commit
bccdef1ad7
|
@ -590,10 +590,7 @@ func (m *meta) UpdateFlushSegmentsInfo(
|
|||
}
|
||||
if err := m.catalog.AlterSegments(m.ctx, segments,
|
||||
metastore.BinlogsIncrement{
|
||||
Segment: clonedSegment.SegmentInfo,
|
||||
Insertlogs: binlogs,
|
||||
Statslogs: statslogs,
|
||||
Deltalogs: deltalogs,
|
||||
Segment: clonedSegment.SegmentInfo,
|
||||
}); err != nil {
|
||||
log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd",
|
||||
zap.Error(err))
|
||||
|
@ -1107,10 +1104,7 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
|
|||
zap.Int64("compact to segment", newSegment.GetID()))
|
||||
|
||||
err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegment), metastore.BinlogsIncrement{
|
||||
Segment: newSegment,
|
||||
Insertlogs: newSegment.GetBinlogs(),
|
||||
Deltalogs: newSegment.GetDeltalogs(),
|
||||
Statslogs: newSegment.GetStatslogs(),
|
||||
Segment: newSegment,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("fail to alter segments and new segment", zap.Error(err))
|
||||
|
|
|
@ -103,10 +103,7 @@ func (t AlterType) String() string {
|
|||
}
|
||||
|
||||
type BinlogsIncrement struct {
|
||||
Segment *datapb.SegmentInfo
|
||||
Insertlogs []*datapb.FieldBinlog
|
||||
Statslogs []*datapb.FieldBinlog
|
||||
Deltalogs []*datapb.FieldBinlog
|
||||
Segment *datapb.SegmentInfo
|
||||
}
|
||||
|
||||
//go:generate mockery --name=DataCoordCatalog --with-expecter
|
||||
|
|
|
@ -288,8 +288,9 @@ func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.Segment
|
|||
|
||||
for _, b := range binlogs {
|
||||
segment := b.Segment
|
||||
binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(),
|
||||
segment.GetID(), cloneLogs(b.Insertlogs), cloneLogs(b.Deltalogs), cloneLogs(b.Statslogs), len(segment.GetCompactionFrom()) > 0)
|
||||
binlogKvs, err := buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(),
|
||||
cloneLogs(segment.GetBinlogs()), cloneLogs(segment.GetDeltalogs()), cloneLogs(segment.GetStatslogs()),
|
||||
len(segment.GetCompactionFrom()) > 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -329,10 +329,7 @@ func Test_AlterSegments(t *testing.T) {
|
|||
catalog := NewCatalog(metakv, rootPath, "")
|
||||
assert.Panics(t, func() {
|
||||
catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{invalidSegment}, metastore.BinlogsIncrement{
|
||||
Segment: invalidSegment,
|
||||
Insertlogs: invalidSegment.Binlogs,
|
||||
Statslogs: invalidSegment.Statslogs,
|
||||
Deltalogs: invalidSegment.Deltalogs,
|
||||
Segment: invalidSegment,
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -360,10 +357,7 @@ func Test_AlterSegments(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
|
||||
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segment1}, metastore.BinlogsIncrement{
|
||||
Segment: segment1,
|
||||
Insertlogs: segment1.Binlogs,
|
||||
Statslogs: segment1.Statslogs,
|
||||
Deltalogs: segment1.Deltalogs,
|
||||
Segment: segment1,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
@ -423,10 +417,7 @@ func Test_AlterSegments(t *testing.T) {
|
|||
|
||||
err = catalog.AlterSegments(context.TODO(), []*datapb.SegmentInfo{segmentXL},
|
||||
metastore.BinlogsIncrement{
|
||||
Segment: segmentXL,
|
||||
Insertlogs: segmentXL.Binlogs,
|
||||
Statslogs: segmentXL.Statslogs,
|
||||
Deltalogs: segmentXL.Deltalogs,
|
||||
Segment: segmentXL,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 255+3, len(savedKvs))
|
||||
|
|
Loading…
Reference in New Issue