Fix querynodev2 stop logic (#23487)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/23568/head
congqixia 2023-04-18 17:56:30 +08:00 committed by GitHub
parent d83654c33f
commit eb690ef033
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 64 additions and 13 deletions

View File

@ -89,6 +89,7 @@ type QueryNode struct {
// call once
initOnce sync.Once
stopOnce sync.Once
// internal components
manager *segments.Manager
@ -344,12 +345,21 @@ func (node *QueryNode) Start() error {
// Stop mainly stop QueryNode's query service, historical loop and streaming loop.
func (node *QueryNode) Stop() error {
log.Warn("Query node stop..")
node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.lifetime.Wait()
node.session.Revoke(time.Second)
node.pipelineManager.Close()
node.stopOnce.Do(func() {
log.Info("Query node stop...")
node.UpdateStateCode(commonpb.StateCode_Abnormal)
node.lifetime.Wait()
node.cancel()
if node.pipelineManager != nil {
node.pipelineManager.Close()
}
if node.session != nil {
node.session.Stop()
}
if node.dispClient != nil {
node.dispClient.Close()
}
})
return nil
}

View File

@ -37,6 +37,7 @@ type (
type Client interface {
Register(vchannel string, pos *Pos, subPos SubPos) (<-chan *MsgPack, error)
Deregister(vchannel string)
Close()
}
var _ Client = (*client)(nil)
@ -97,3 +98,16 @@ func (c *client) Deregister(vchannel string) {
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
}
}
func (c *client) Close() {
log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID))
c.managerMu.Lock()
defer c.managerMu.Unlock()
for pchannel, manager := range c.managers {
log.Info("close manager", zap.String("channel", pchannel))
delete(c.managers, pchannel)
manager.Close()
}
log.Info("dispatcher client closed")
}

View File

@ -1,12 +1,12 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
// Code generated by mockery v2.16.0. DO NOT EDIT.
package msgdispatcher
import (
"github.com/milvus-io/milvus-proto/go-api/msgpb"
mqwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
mock "github.com/stretchr/testify/mock"
mqwrapper "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
msgpb "github.com/milvus-io/milvus-proto/go-api/msgpb"
msgstream "github.com/milvus-io/milvus/pkg/mq/msgstream"
)
@ -24,6 +24,33 @@ func (_m *MockClient) EXPECT() *MockClient_Expecter {
return &MockClient_Expecter{mock: &_m.Mock}
}
// Close provides a mock function with given fields:
func (_m *MockClient) Close() {
_m.Called()
}
// MockClient_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
type MockClient_Close_Call struct {
*mock.Call
}
// Close is a helper method to define mock.On call
func (_e *MockClient_Expecter) Close() *MockClient_Close_Call {
return &MockClient_Close_Call{Call: _e.mock.On("Close")}
}
func (_c *MockClient_Close_Call) Run(run func()) *MockClient_Close_Call {
_c.Call.Run(func(args mock.Arguments) {
run()
})
return _c
}
func (_c *MockClient_Close_Call) Return() *MockClient_Close_Call {
_c.Call.Return()
return _c
}
// Deregister provides a mock function with given fields: vchannel
func (_m *MockClient) Deregister(vchannel string) {
_m.Called(vchannel)
@ -35,7 +62,7 @@ type MockClient_Deregister_Call struct {
}
// Deregister is a helper method to define mock.On call
// - vchannel string
// - vchannel string
func (_e *MockClient_Expecter) Deregister(vchannel interface{}) *MockClient_Deregister_Call {
return &MockClient_Deregister_Call{Call: _e.mock.On("Deregister", vchannel)}
}
@ -81,9 +108,9 @@ type MockClient_Register_Call struct {
}
// Register is a helper method to define mock.On call
// - vchannel string
// - pos *msgpb.MsgPosition
// - subPos mqwrapper.SubscriptionInitialPosition
// - vchannel string
// - pos *msgpb.MsgPosition
// - subPos mqwrapper.SubscriptionInitialPosition
func (_e *MockClient_Expecter) Register(vchannel interface{}, pos interface{}, subPos interface{}) *MockClient_Register_Call {
return &MockClient_Register_Call{Call: _e.mock.On("Register", vchannel, pos, subPos)}
}