enhance: Make datanode exit and case `TestProxy` faster (#32218)

/kind improvement
issue: #32219

Signed-off-by: SimFG <bang.fu@zilliz.com>
pull/32297/head
SimFG 2024-04-16 10:49:20 +08:00 committed by GitHub
parent 7f0c56ad0a
commit 1af084ea6b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 60 additions and 54 deletions

View File

@ -434,6 +434,7 @@ func (node *DataNode) Stop() error {
// https://github.com/milvus-io/milvus/issues/12282
node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.flowgraphManager.Close()
node.eventManager.CloseAll()
if node.writeBufferManager != nil {

View File

@ -41,16 +41,23 @@ type FlowgraphManager interface {
HasFlowgraphWithOpID(channel string, opID UniqueID) bool
GetFlowgraphCount() int
GetCollectionIDs() []int64
Close()
}
var _ FlowgraphManager = (*fgManagerImpl)(nil)
type fgManagerImpl struct {
ctx context.Context
cancelFunc context.CancelFunc
flowgraphs *typeutil.ConcurrentMap[string, *dataSyncService]
}
func newFlowgraphManager() *fgManagerImpl {
ctx, cancelFunc := context.WithCancel(context.TODO())
return &fgManagerImpl{
ctx: ctx,
cancelFunc: cancelFunc,
flowgraphs: typeutil.NewConcurrentMap[string, *dataSyncService](),
}
}
@ -67,7 +74,7 @@ func (fm *fgManagerImpl) AddandStartWithEtcdTickler(dn *DataNode, vchan *datapb.
return nil
}
dataSyncService, err := newServiceWithEtcdTickler(context.TODO(), dn, &datapb.ChannelWatchInfo{
dataSyncService, err := newServiceWithEtcdTickler(fm.ctx, dn, &datapb.ChannelWatchInfo{
Schema: schema,
Vchan: vchan,
}, tickler)
@ -131,3 +138,7 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
return collectionSet.Collect()
}
func (fm *fgManagerImpl) Close() {
fm.cancelFunc()
}

View File

@ -358,69 +358,21 @@ func TestProxy(t *testing.T) {
rc := runRootCoord(ctx, localMsg)
log.Info("running RootCoord ...")
if rc != nil {
defer func() {
err := rc.Stop()
assert.NoError(t, err)
log.Info("stop RootCoord")
}()
}
dc := runDataCoord(ctx, localMsg)
log.Info("running DataCoord ...")
if dc != nil {
defer func() {
err := dc.Stop()
assert.NoError(t, err)
log.Info("stop DataCoord")
}()
}
dn := runDataNode(ctx, localMsg, alias)
log.Info("running DataNode ...")
if dn != nil {
defer func() {
err := dn.Stop()
assert.NoError(t, err)
log.Info("stop DataNode")
}()
}
qc := runQueryCoord(ctx, localMsg)
log.Info("running QueryCoord ...")
if qc != nil {
defer func() {
err := qc.Stop()
assert.NoError(t, err)
log.Info("stop QueryCoord")
}()
}
qn := runQueryNode(ctx, localMsg, alias)
log.Info("running QueryNode ...")
if qn != nil {
defer func() {
err := qn.Stop()
assert.NoError(t, err)
log.Info("stop query node")
}()
}
in := runIndexNode(ctx, localMsg, alias)
log.Info("running IndexNode ...")
if in != nil {
defer func() {
err := in.Stop()
assert.NoError(t, err)
log.Info("stop IndexNode")
}()
}
time.Sleep(10 * time.Millisecond)
proxy, err := NewProxy(ctx, factory)
@ -488,8 +440,52 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)
log.Info("Register proxy done")
defer func() {
err := proxy.Stop()
assert.NoError(t, err)
a := []any{rc, dc, qc, qn, in, dn, proxy}
fmt.Println(len(a))
// HINT: the order of stopping service refers to the `roles.go` file
log.Info("start to stop the services")
{
err := rc.Stop()
assert.NoError(t, err)
log.Info("stop RootCoord")
}
{
err := dc.Stop()
assert.NoError(t, err)
log.Info("stop DataCoord")
}
{
err := qc.Stop()
assert.NoError(t, err)
log.Info("stop QueryCoord")
}
{
err := qn.Stop()
assert.NoError(t, err)
log.Info("stop query node")
}
{
err := in.Stop()
assert.NoError(t, err)
log.Info("stop IndexNode")
}
{
err := dn.Stop()
assert.NoError(t, err)
log.Info("stop DataNode")
}
{
err := proxy.Stop()
assert.NoError(t, err)
log.Info("stop Proxy")
}
cancel()
}()
t.Run("get component states", func(t *testing.T) {
@ -3848,11 +3844,9 @@ func TestProxy(t *testing.T) {
assert.Equal(t, 0, len(resp.ErrIndex))
assert.Equal(t, int64(rowNum), resp.UpsertCnt)
})
testServer.gracefulStop()
wg.Wait()
cancel()
log.Info("case done")
}
func testProxyRole(ctx context.Context, t *testing.T, proxy *Proxy) {