mirror of https://github.com/milvus-io/milvus.git
fix:handle err when ManualCompaction (#28804)
handle err when ManualCompaction #28644 Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/28866/head
parent
5d0a9f9344
commit
adce6aab4f
|
@ -119,8 +119,14 @@ func (t *compactionTrigger) start() {
|
|||
case signal := <-t.signals:
|
||||
switch {
|
||||
case signal.isGlobal:
|
||||
t.handleGlobalSignal(signal)
|
||||
// ManualCompaction also use use handleGlobalSignal
|
||||
// so throw err here
|
||||
err := t.handleGlobalSignal(signal)
|
||||
if err != nil {
|
||||
log.Warn("unable to handleGlobalSignal", zap.Error(err))
|
||||
}
|
||||
default:
|
||||
// no need to handle err in handleSignal
|
||||
t.handleSignal(signal)
|
||||
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
|
||||
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
|
||||
|
@ -262,7 +268,13 @@ func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID
|
|||
isGlobal: true,
|
||||
collectionID: collectionID,
|
||||
}
|
||||
t.handleGlobalSignal(signal)
|
||||
|
||||
err = t.handleGlobalSignal(signal)
|
||||
if err != nil {
|
||||
log.Warn("unable to handleGlobalSignal", zap.Error(err))
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
|
@ -332,11 +344,14 @@ func (t *compactionTrigger) updateSegmentMaxSize(segments []*SegmentInfo) (bool,
|
|||
return isDiskANN, nil
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
||||
func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
|
||||
t.forceMu.Lock()
|
||||
defer t.forceMu.Unlock()
|
||||
|
||||
log := log.With(zap.Int64("compactionID", signal.id))
|
||||
log := log.With(zap.Int64("compactionID", signal.id),
|
||||
zap.Int64("signal.collectionID", signal.collectionID),
|
||||
zap.Int64("signal.partitionID", signal.partitionID),
|
||||
zap.Int64("signal.segmentID", signal.segmentID))
|
||||
m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
|
||||
return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) &&
|
||||
isSegmentHealthy(segment) &&
|
||||
|
@ -346,20 +361,22 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
}) // m is list of chanPartSegments, which is channel-partition organized segments
|
||||
|
||||
if len(m) == 0 {
|
||||
return
|
||||
log.Info("the length of SegmentsChanPart is 0, skip to handle compaction")
|
||||
return nil
|
||||
}
|
||||
|
||||
ts, err := t.allocTs()
|
||||
if err != nil {
|
||||
log.Warn("allocate ts failed, skip to handle compaction",
|
||||
zap.Int64("collectionID", signal.collectionID),
|
||||
zap.Int64("partitionID", signal.partitionID),
|
||||
zap.Int64("segmentID", signal.segmentID))
|
||||
return
|
||||
log.Warn("allocate ts failed, skip to handle compaction")
|
||||
return err
|
||||
}
|
||||
|
||||
for _, group := range m {
|
||||
log := log.With(zap.Int64("collectionID", group.collectionID),
|
||||
zap.Int64("partitionID", group.partitionID),
|
||||
zap.String("channel", group.channelName))
|
||||
if !signal.isForce && t.compactionHandler.isFull() {
|
||||
log.Warn("compaction plan skipped due to handler full")
|
||||
break
|
||||
}
|
||||
if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() {
|
||||
|
@ -374,20 +391,15 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
|
||||
coll, err := t.getCollection(group.collectionID)
|
||||
if err != nil {
|
||||
log.Warn("get collection info failed, skip handling compaction",
|
||||
zap.Int64("collectionID", group.collectionID),
|
||||
zap.Int64("partitionID", group.partitionID),
|
||||
zap.String("channel", group.channelName),
|
||||
zap.Error(err),
|
||||
)
|
||||
return
|
||||
log.Warn("get collection info failed, skip handling compaction", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) {
|
||||
log.RatedInfo(20, "collection auto compaction disabled",
|
||||
zap.Int64("collectionID", group.collectionID),
|
||||
)
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
ct, err := t.getCompactTime(ts, coll)
|
||||
|
@ -396,7 +408,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
zap.Int64("collectionID", group.collectionID),
|
||||
zap.Int64("partitionID", group.partitionID),
|
||||
zap.String("channel", group.channelName))
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
plans := t.generatePlans(group.segments, signal.isForce, isDiskIndex, ct)
|
||||
|
@ -442,6 +454,7 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
zap.Int64s("segmentIDs", segIDs))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleSignal processes segment flush caused partition-chan level compaction signal
|
||||
|
@ -451,6 +464,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
|
|||
|
||||
// 1. check whether segment's binlogs should be compacted or not
|
||||
if t.compactionHandler.isFull() {
|
||||
log.Warn("compaction plan skipped due to handler full")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -466,6 +480,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
|
|||
segments := t.getCandidateSegments(channel, partitionID)
|
||||
|
||||
if len(segments) == 0 {
|
||||
log.Info("the length of segments is 0, skip to handle compaction")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue