mirror of https://github.com/milvus-io/milvus.git
Add unittest for BackGroundGC in DataNode (#6522)
Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/6535/head
parent
34370e2c05
commit
dca6c6afde
|
@ -203,8 +203,48 @@ func TestDataNode(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("Test BackGroundGC", func(te *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
node := newIDLEDataNodeMock(ctx)
|
||||
|
||||
collIDCh := make(chan UniqueID)
|
||||
go node.BackGroundGC(collIDCh)
|
||||
node.clearSignal = collIDCh
|
||||
|
||||
testDataSyncs := []struct {
|
||||
collID UniqueID
|
||||
dmChannelName string
|
||||
}{
|
||||
{1, "fake-dm-backgroundgc-1"},
|
||||
{2, "fake-dm-backgroundgc-2"},
|
||||
{3, "fake-dm-backgroundgc-3"},
|
||||
{4, ""},
|
||||
{1, ""},
|
||||
}
|
||||
|
||||
for i, t := range testDataSyncs {
|
||||
if i <= 2 {
|
||||
node.NewDataSyncService(&datapb.VchannelInfo{CollectionID: t.collID, ChannelName: t.dmChannelName})
|
||||
|
||||
msFactory := msgstream.NewPmsFactory()
|
||||
insertStream, _ := msFactory.NewMsgStream(ctx)
|
||||
var insertMsgStream msgstream.MsgStream = insertStream
|
||||
insertMsgStream.Start()
|
||||
}
|
||||
|
||||
collIDCh <- t.collID
|
||||
}
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
node.chanMut.Lock()
|
||||
defer node.chanMut.Unlock()
|
||||
return len(node.vchan2FlushCh) == 0
|
||||
}, time.Second, time.Millisecond)
|
||||
|
||||
cancel()
|
||||
})
|
||||
|
||||
t.Run("Test ReleaseDataSyncService", func(t *testing.T) {
|
||||
t.Skip()
|
||||
dmChannelName := "fake-dm-channel-test-NewDataSyncService"
|
||||
|
||||
vchan := &datapb.VchannelInfo{
|
||||
|
@ -214,9 +254,9 @@ func TestDataNode(t *testing.T) {
|
|||
}
|
||||
|
||||
err := node.NewDataSyncService(vchan)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 1, len(node.vchan2FlushCh))
|
||||
assert.Equal(t, 1, len(node.vchan2SyncService))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(node.vchan2FlushCh))
|
||||
require.Equal(t, 1, len(node.vchan2SyncService))
|
||||
time.Sleep(time.Second)
|
||||
|
||||
node.ReleaseDataSyncService(dmChannelName)
|
||||
|
@ -275,40 +315,6 @@ func TestDataNode(t *testing.T) {
|
|||
cancel()
|
||||
})
|
||||
|
||||
t.Run("Test BackGroundGC", func(t *testing.T) {
|
||||
t.Skipf("Skip for data race")
|
||||
collIDCh := make(chan UniqueID)
|
||||
go node.BackGroundGC(collIDCh)
|
||||
|
||||
dmChannelName := "fake-dm-channel-test-BackGroundGC"
|
||||
|
||||
vchan := &datapb.VchannelInfo{
|
||||
CollectionID: 1,
|
||||
ChannelName: dmChannelName,
|
||||
UnflushedSegments: []*datapb.SegmentInfo{},
|
||||
}
|
||||
require.Equal(t, 0, len(node.vchan2FlushCh))
|
||||
require.Equal(t, 0, len(node.vchan2SyncService))
|
||||
|
||||
err := node.NewDataSyncService(vchan)
|
||||
require.NoError(t, err)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
require.Equal(t, 1, len(node.vchan2FlushCh))
|
||||
require.Equal(t, 1, len(node.vchan2SyncService))
|
||||
|
||||
collIDCh <- 1
|
||||
assert.Eventually(t, func() bool {
|
||||
return len(node.vchan2FlushCh) == 0
|
||||
}, time.Second*4, time.Millisecond)
|
||||
|
||||
assert.Equal(t, 0, len(node.vchan2SyncService))
|
||||
|
||||
s, ok := node.vchan2SyncService[dmChannelName]
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, s)
|
||||
})
|
||||
|
||||
cancel()
|
||||
<-node.ctx.Done()
|
||||
node.Stop()
|
||||
|
|
|
@ -88,6 +88,7 @@ func (ddn *ddNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
|
|||
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
|
||||
log.Info("Destroying current flowgraph", zap.Any("collectionID", ddn.collectionID))
|
||||
ddn.clearSignal <- ddn.collectionID
|
||||
return []Msg{}
|
||||
}
|
||||
case commonpb.MsgType_Insert:
|
||||
log.Debug("DDNode with insert messages")
|
||||
|
|
Loading…
Reference in New Issue