mirror of https://github.com/milvus-io/milvus.git
Fix rootcoord restoration missing gcConfirmStep (#25280)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/25454/head
parent
17b22b9b08
commit
f55b0545e3
|
@ -65,6 +65,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
|
|||
baseStep: baseStep{core: c.s},
|
||||
pChannels: collMeta.PhysicalChannelNames,
|
||||
})
|
||||
redo.AddAsyncStep(newConfirmGCStep(c.s, collMeta.CollectionID, allPartition))
|
||||
redo.AddAsyncStep(&deleteCollectionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
collectionID: collMeta.CollectionID,
|
||||
|
@ -125,6 +126,12 @@ func (c *bgGarbageCollector) ReDropPartition(dbID int64, pChannels []string, par
|
|||
collID: partition.CollectionID,
|
||||
partIDs: []UniqueID{partition.PartitionID},
|
||||
})
|
||||
redo.AddAsyncStep(&deletePartitionDataStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
pchans: pChannels,
|
||||
partition: partition,
|
||||
})
|
||||
redo.AddAsyncStep(newConfirmGCStep(c.s, partition.CollectionID, partition.PartitionID))
|
||||
redo.AddAsyncStep(&removePartitionMetaStep{
|
||||
baseStep: baseStep{core: c.s},
|
||||
dbID: dbID,
|
||||
|
|
|
@ -32,6 +32,11 @@ import (
|
|||
)
|
||||
|
||||
func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
||||
oldValue := confirmGCInterval
|
||||
defer func() {
|
||||
confirmGCInterval = oldValue
|
||||
}()
|
||||
confirmGCInterval = 0
|
||||
t.Run("failed to release collection", func(t *testing.T) {
|
||||
broker := newMockBroker()
|
||||
broker.ReleaseCollectionFunc = func(ctx context.Context, collectionID UniqueID) error {
|
||||
|
@ -107,6 +112,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
|||
releaseCollectionChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
dropCollectionIndexCalled := false
|
||||
dropCollectionIndexChan := make(chan struct{}, 1)
|
||||
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
|
||||
|
@ -143,6 +155,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
|||
assert.True(t, releaseCollectionCalled)
|
||||
<-dropCollectionIndexChan
|
||||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-dropMetaChan
|
||||
})
|
||||
|
||||
|
@ -162,6 +176,13 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
|||
dropCollectionIndexChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
meta := mockrootcoord.NewIMetaTable(t)
|
||||
removeCollectionCalled := false
|
||||
removeCollectionChan := make(chan struct{}, 1)
|
||||
|
@ -194,6 +215,8 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
|
|||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-removeCollectionChan
|
||||
assert.True(t, removeCollectionCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -309,6 +332,11 @@ func TestGarbageCollectorCtx_RemoveCreatingCollection(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
||||
oldValue := confirmGCInterval
|
||||
defer func() {
|
||||
confirmGCInterval = oldValue
|
||||
}()
|
||||
confirmGCInterval = 0
|
||||
t.Run("failed to GcPartitionData", func(t *testing.T) {
|
||||
ticker := newTickerWithMockFailStream() // failed to broadcast drop msg.
|
||||
shardsNum := int(common.DefaultShardsNum)
|
||||
|
@ -344,15 +372,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
|||
return errors.New("error mock RemovePartition")
|
||||
})
|
||||
|
||||
broker := newMockBroker()
|
||||
dropCollectionIndexCalled := false
|
||||
dropCollectionIndexChan := make(chan struct{}, 1)
|
||||
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
|
||||
dropCollectionIndexCalled = true
|
||||
dropCollectionIndexChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
|
||||
tsoAllocator := newMockTsoAllocator()
|
||||
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
|
||||
return 100, nil
|
||||
}
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
|
||||
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.garbageCollector = gc
|
||||
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
|
||||
<-dropCollectionIndexChan
|
||||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-removePartitionChan
|
||||
assert.True(t, removePartitionCalled)
|
||||
})
|
||||
|
@ -377,15 +425,35 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
|
|||
return nil
|
||||
})
|
||||
|
||||
broker := newMockBroker()
|
||||
dropCollectionIndexCalled := false
|
||||
dropCollectionIndexChan := make(chan struct{}, 1)
|
||||
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
|
||||
dropCollectionIndexCalled = true
|
||||
dropCollectionIndexChan <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
gcConfirmCalled := false
|
||||
gcConfirmChan := make(chan struct{})
|
||||
broker.GCConfirmFunc = func(ctx context.Context, collectionID, partitionID UniqueID) bool {
|
||||
gcConfirmCalled = true
|
||||
close(gcConfirmChan)
|
||||
return true
|
||||
}
|
||||
|
||||
tsoAllocator := newMockTsoAllocator()
|
||||
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
|
||||
return 100, nil
|
||||
}
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
|
||||
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex(), withBroker(broker))
|
||||
core.ddlTsLockManager = newDdlTsLockManager(core.tsoAllocator)
|
||||
gc := newBgGarbageCollector(core)
|
||||
core.garbageCollector = gc
|
||||
gc.ReDropPartition(0, pchans, &model.Partition{}, 100000)
|
||||
<-dropCollectionIndexChan
|
||||
assert.True(t, dropCollectionIndexCalled)
|
||||
<-gcConfirmChan
|
||||
assert.True(t, gcConfirmCalled)
|
||||
<-removePartitionChan
|
||||
assert.True(t, removePartitionCalled)
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue