mirror of https://github.com/milvus-io/milvus.git
Fix Flush hang after SyncSegments timeout (#24953)
Signed-off-by: jaime <yun.zhang@zilliz.com>pull/25009/head
parent
af1d84e5e1
commit
b75dcf90c0
|
@ -58,11 +58,12 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
|
|||
task.complete()
|
||||
c.executing.Delete(task.getPlanID())
|
||||
}
|
||||
func (c *compactionExecutor) injectDone(planID UniqueID) {
|
||||
|
||||
func (c *compactionExecutor) injectDone(planID UniqueID, success bool) {
|
||||
c.completed.Delete(planID)
|
||||
task, loaded := c.completedCompactor.LoadAndDelete(planID)
|
||||
if loaded {
|
||||
task.(compactor).injectDone()
|
||||
task.(compactor).injectDone(success)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -128,7 +129,7 @@ func (c *compactionExecutor) stopExecutingtaskByVChannelName(vChannelName string
|
|||
// remove all completed plans for vChannelName
|
||||
c.completed.Range(func(key interface{}, value interface{}) bool {
|
||||
if value.(*datapb.CompactionResult).GetChannel() == vChannelName {
|
||||
c.injectDone(key.(UniqueID))
|
||||
c.injectDone(key.(UniqueID), true)
|
||||
log.Info("remove compaction results for dropped channel",
|
||||
zap.String("channel", vChannelName),
|
||||
zap.Int64("planID", key.(UniqueID)))
|
||||
|
|
|
@ -142,7 +142,7 @@ func (mc *mockCompactor) complete() {
|
|||
mc.done <- struct{}{}
|
||||
}
|
||||
|
||||
func (mc *mockCompactor) injectDone() {
|
||||
func (mc *mockCompactor) injectDone(success bool) {
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ type iterator = storage.Iterator
|
|||
type compactor interface {
|
||||
complete()
|
||||
compact() (*datapb.CompactionResult, error)
|
||||
injectDone()
|
||||
injectDone(success bool)
|
||||
stop()
|
||||
getPlanID() UniqueID
|
||||
getCollection() UniqueID
|
||||
|
@ -126,7 +126,7 @@ func (t *compactionTask) complete() {
|
|||
func (t *compactionTask) stop() {
|
||||
t.cancel()
|
||||
<-t.done
|
||||
t.injectDone()
|
||||
t.injectDone(true)
|
||||
}
|
||||
|
||||
func (t *compactionTask) getPlanID() UniqueID {
|
||||
|
@ -759,10 +759,10 @@ func (t *compactionTask) compact() (*datapb.CompactionResult, error) {
|
|||
return pack, nil
|
||||
}
|
||||
|
||||
func (t *compactionTask) injectDone() {
|
||||
func (t *compactionTask) injectDone(success bool) {
|
||||
if t.inject != nil {
|
||||
uninjectStart := time.Now()
|
||||
t.inject.injectDone(true)
|
||||
t.inject.injectDone(success)
|
||||
uninjectEnd := time.Now()
|
||||
log.Info("uninject elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Float64("elapse", nano2Milli(uninjectEnd.Sub(uninjectStart))))
|
||||
}
|
||||
|
|
|
@ -863,7 +863,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
|||
assert.NotEmpty(t, result.Field2StatslogPaths)
|
||||
|
||||
assert.Equal(t, 0, mockfm.injectCount())
|
||||
task.injectDone()
|
||||
task.injectDone(true)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 1, mockfm.injectCount())
|
||||
}
|
||||
|
@ -958,7 +958,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
|||
assert.NotEmpty(t, result.Field2StatslogPaths)
|
||||
|
||||
assert.Equal(t, 0, mockfm.injectCount())
|
||||
task.injectDone()
|
||||
task.injectDone(true)
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
assert.Equal(t, 1, mockfm.injectCount())
|
||||
})
|
||||
|
|
|
@ -399,9 +399,10 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
|
|||
ds.fg.Blockall()
|
||||
defer ds.fg.Unblock()
|
||||
if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
|
||||
node.compactionExecutor.injectDone(req.GetPlanID(), false)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
node.compactionExecutor.injectDone(req.GetPlanID())
|
||||
node.compactionExecutor.injectDone(req.GetPlanID(), true)
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue