mirror of https://github.com/milvus-io/milvus.git
parent
7fee4743f5
commit
9deb245d77
|
@ -48,6 +48,8 @@ var (
|
|||
globalSegInfosMutex sync.RWMutex
|
||||
)
|
||||
|
||||
type rpcHandler func() (*commonpb.Status, error)
|
||||
|
||||
type queryNodeServerMock struct {
|
||||
querypb.QueryNodeServer
|
||||
ctx context.Context
|
||||
|
@ -60,14 +62,15 @@ type queryNodeServerMock struct {
|
|||
queryNodePort int64
|
||||
queryNodeID int64
|
||||
|
||||
addQueryChannels func() (*commonpb.Status, error)
|
||||
removeQueryChannels func() (*commonpb.Status, error)
|
||||
watchDmChannels func() (*commonpb.Status, error)
|
||||
watchDeltaChannels func() (*commonpb.Status, error)
|
||||
loadSegment func() (*commonpb.Status, error)
|
||||
releaseCollection func() (*commonpb.Status, error)
|
||||
releasePartition func() (*commonpb.Status, error)
|
||||
releaseSegments func() (*commonpb.Status, error)
|
||||
rwmutex sync.RWMutex // guard for all modification
|
||||
addQueryChannels rpcHandler
|
||||
removeQueryChannels rpcHandler
|
||||
watchDmChannels rpcHandler
|
||||
watchDeltaChannels rpcHandler
|
||||
loadSegment rpcHandler
|
||||
releaseCollection rpcHandler
|
||||
releasePartition rpcHandler
|
||||
releaseSegments rpcHandler
|
||||
getSegmentInfos func() (*querypb.GetSegmentInfoResponse, error)
|
||||
getMetrics func() (*milvuspb.GetMetricsResponse, error)
|
||||
|
||||
|
@ -83,6 +86,7 @@ func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock {
|
|||
cancel: cancel,
|
||||
grpcErrChan: make(chan error),
|
||||
|
||||
rwmutex: sync.RWMutex{},
|
||||
addQueryChannels: returnSuccessResult,
|
||||
removeQueryChannels: returnSuccessResult,
|
||||
watchDmChannels: returnSuccessResult,
|
||||
|
@ -100,6 +104,12 @@ func newQueryNodeServerMock(ctx context.Context) *queryNodeServerMock {
|
|||
}
|
||||
}
|
||||
|
||||
func (qs *queryNodeServerMock) setRPCInterface(interfacePointer *rpcHandler, newhandler rpcHandler) {
|
||||
qs.rwmutex.Lock()
|
||||
defer qs.rwmutex.Unlock()
|
||||
*interfacePointer = newhandler
|
||||
}
|
||||
|
||||
func (qs *queryNodeServerMock) Register() error {
|
||||
log.Debug("query node session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath))
|
||||
etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg)
|
||||
|
@ -231,6 +241,8 @@ func (qs *queryNodeServerMock) LoadSegments(ctx context.Context, req *querypb.Lo
|
|||
}
|
||||
|
||||
func (qs *queryNodeServerMock) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||
qs.rwmutex.RLock()
|
||||
defer qs.rwmutex.RUnlock()
|
||||
return qs.releaseCollection()
|
||||
}
|
||||
|
||||
|
|
|
@ -515,6 +515,7 @@ func Test_saveInternalTaskToEtcd(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
queryCoord, err := startQueryCoord(ctx)
|
||||
assert.Nil(t, err)
|
||||
defer queryCoord.Stop()
|
||||
|
||||
testTask := &testTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -577,6 +578,7 @@ func Test_generateDerivedInternalTasks(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
queryCoord.Stop()
|
||||
err = removeAllSession()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
|
@ -520,20 +520,16 @@ func Test_ReleaseCollectionExecuteFail(t *testing.T) {
|
|||
|
||||
node, err := startQueryNodeServer(ctx)
|
||||
assert.Nil(t, err)
|
||||
node.releaseCollection = returnFailedResult
|
||||
node.setRPCInterface(&node.releaseCollection, returnFailedResult)
|
||||
|
||||
waitQueryNodeOnline(queryCoord.cluster, node.queryNodeID)
|
||||
releaseCollectionTask := genReleaseCollectionTask(ctx, queryCoord)
|
||||
err = queryCoord.scheduler.Enqueue(releaseCollectionTask)
|
||||
assert.Nil(t, err)
|
||||
|
||||
for {
|
||||
if releaseCollectionTask.getState() == taskDone {
|
||||
break
|
||||
}
|
||||
}
|
||||
node.releaseCollection = returnSuccessResult
|
||||
waitTaskFinalState(releaseCollectionTask, taskDone)
|
||||
|
||||
node.setRPCInterface(&node.releaseCollection, returnSuccessResult)
|
||||
waitTaskFinalState(releaseCollectionTask, taskExpired)
|
||||
|
||||
node.stop()
|
||||
|
@ -1107,10 +1103,12 @@ func TestLoadBalanceAndRescheduleDmChannelTaskAfterNodeDown(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
queryCoord, err := startQueryCoord(ctx)
|
||||
assert.Nil(t, err)
|
||||
defer queryCoord.Stop()
|
||||
|
||||
node1, err := startQueryNodeServer(ctx)
|
||||
assert.Nil(t, err)
|
||||
waitQueryNodeOnline(queryCoord.cluster, node1.queryNodeID)
|
||||
defer node1.stop()
|
||||
|
||||
loadCollectionTask := genLoadCollectionTask(ctx, queryCoord)
|
||||
|
||||
|
@ -1120,6 +1118,7 @@ func TestLoadBalanceAndRescheduleDmChannelTaskAfterNodeDown(t *testing.T) {
|
|||
|
||||
node2, err := startQueryNodeServer(ctx)
|
||||
assert.Nil(t, err)
|
||||
defer node2.stop()
|
||||
node2.watchDmChannels = returnFailedResult
|
||||
waitQueryNodeOnline(queryCoord.cluster, node2.queryNodeID)
|
||||
|
||||
|
@ -1134,6 +1133,7 @@ func TestLoadBalanceAndRescheduleDmChannelTaskAfterNodeDown(t *testing.T) {
|
|||
|
||||
node3, err := startQueryNodeServer(ctx)
|
||||
assert.Nil(t, err)
|
||||
defer node3.stop()
|
||||
waitQueryNodeOnline(queryCoord.cluster, node3.queryNodeID)
|
||||
|
||||
dmChannelInfos := queryCoord.meta.getDmChannelInfosByNodeID(node3.queryNodeID)
|
||||
|
|
Loading…
Reference in New Issue