mirror of https://github.com/milvus-io/milvus.git
Avoid updating all binlogs every time a segment is updated (#25526)
Signed-off-by: sunby <bingyi.sun@zilliz.com> Co-authored-by: sunby <bingyi.sun@zilliz.com>pull/25552/head
parent
ed3e4b0bc5
commit
b22d6d2a5d
|
@ -1091,10 +1091,11 @@ func (m *meta) copyDeltaFiles(binlogs []*datapb.FieldBinlog, collectionID, parti
|
|||
}
|
||||
|
||||
func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segmentsCompactFrom []*SegmentInfo) error {
|
||||
modInfos := make([]*datapb.SegmentInfo, len(segmentsCompactFrom))
|
||||
for i := range segmentsCompactFrom {
|
||||
modInfos[i] = segmentsCompactFrom[i].SegmentInfo
|
||||
modInfos := make([]*datapb.SegmentInfo, 0, len(segmentsCompactFrom))
|
||||
for _, segment := range segmentsCompactFrom {
|
||||
modInfos = append(modInfos, segment.SegmentInfo)
|
||||
}
|
||||
|
||||
newSegment := segmentCompactTo.SegmentInfo
|
||||
|
||||
modSegIDs := lo.Map(modInfos, func(segment *datapb.SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
|
@ -1110,7 +1111,12 @@ func (m *meta) alterMetaStoreAfterCompaction(segmentCompactTo *SegmentInfo, segm
|
|||
zap.Int("delta logs", len(newSegment.GetDeltalogs())),
|
||||
zap.Int64("compact to segment", newSegment.GetID()))
|
||||
|
||||
err := m.catalog.AlterSegmentsAndAddNewSegment(m.ctx, modInfos, newSegment)
|
||||
err := m.catalog.AlterSegments(m.ctx, append(modInfos, newSegment), metastore.BinlogsIncrement{
|
||||
Segment: newSegment,
|
||||
Insertlogs: newSegment.GetBinlogs(),
|
||||
Deltalogs: newSegment.GetDeltalogs(),
|
||||
Statslogs: newSegment.GetStatslogs(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Warn("fail to alter segments and new segment", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -254,12 +254,12 @@ func (kc *Catalog) LoadFromSegmentPath(colID, partID, segID typeutil.UniqueID) (
|
|||
return segInfo, nil
|
||||
}
|
||||
|
||||
func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error {
|
||||
if len(newSegments) == 0 {
|
||||
func (kc *Catalog) AlterSegments(ctx context.Context, segments []*datapb.SegmentInfo, binlogs ...metastore.BinlogsIncrement) error {
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
}
|
||||
kvs := make(map[string]string)
|
||||
for _, segment := range newSegments {
|
||||
for _, segment := range segments {
|
||||
kc.collectMetrics(segment)
|
||||
|
||||
// we don't persist binlog fields, but instead store binlogs as independent kvs
|
||||
|
@ -271,6 +271,14 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
|
|||
cloned.NumOfRows = rowCount
|
||||
}
|
||||
|
||||
if segment.GetState() == commonpb.SegmentState_Dropped {
|
||||
binlogs, err := kc.handleDroppedSegment(segment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
maps.Copy(kvs, binlogs)
|
||||
}
|
||||
|
||||
k, v, err := buildSegmentKv(cloned)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -288,6 +296,26 @@ func (kc *Catalog) AlterSegments(ctx context.Context, newSegments []*datapb.Segm
|
|||
maps.Copy(kvs, binlogKvs)
|
||||
}
|
||||
|
||||
return kc.SaveByBatch(kvs)
|
||||
}
|
||||
|
||||
// handleDroppedSegment builds the etcd key-value pairs required to persist the
// binlog metadata of a segment that has entered the Dropped state.
//
// Compatibility note: earlier releases stored binlog paths as separate kvs in
// the meta store, and garbage collection depends on their presence. If no
// binlog-prefixed kvs exist for this segment yet, they are regenerated here
// from the segment's in-memory binlog/deltalog/statslog lists.
//
// Returns the kvs to write (nil when nothing needs to be written) and any
// error from the prefix check or kv construction. The named results enable
// the bare returns below.
func (kc *Catalog) handleDroppedSegment(segment *datapb.SegmentInfo) (kvs map[string]string, err error) {
	var has bool
	// Check whether binlog kvs for this segment already exist in the meta store.
	has, err = kc.hasBinlogPrefix(segment)
	if err != nil {
		return
	}
	// To be compatible with previous implementation, we have to write binlogs on etcd for correct gc.
	if !has {
		// NOTE(review): the trailing `true` flag's meaning is not visible here —
		// presumably it marks the kvs as belonging to a dropped segment; confirm
		// against buildBinlogKvsWithLogID.
		kvs, err = buildBinlogKvsWithLogID(segment.GetCollectionID(), segment.GetPartitionID(), segment.GetID(), cloneLogs(segment.GetBinlogs()), cloneLogs(segment.GetDeltalogs()), cloneLogs(segment.GetStatslogs()), true)
		if err != nil {
			return
		}
	}
	return
}
|
||||
|
||||
func (kc *Catalog) SaveByBatch(kvs map[string]string) error {
|
||||
saveFn := func(partialKvs map[string]string) error {
|
||||
return kc.MetaKv.MultiSave(partialKvs)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue