Add triggerQueue check for handoff verifyReq (#19064)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/19063/head
congqixia 2022-09-07 16:14:35 +08:00 committed by GitHub
parent c357a7aa69
commit 39b847b67a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 3652 additions and 5 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -73,6 +73,23 @@ func (broker *globalMetaBroker) invalidateCollectionMetaCache(ctx context.Contex
return nil
}
func (broker *globalMetaBroker) describeCollection(ctx context.Context, collectionID UniqueID) (*schemapb.CollectionSchema, error) {
req := &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
},
CollectionID: collectionID,
}
resp, err := broker.rootCoord.DescribeCollection(ctx, req)
if err != nil {
log.Warn("failed to describe collection schema", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
return resp.GetSchema(), nil
}
func (broker *globalMetaBroker) showPartitionIDs(ctx context.Context, collectionID UniqueID) ([]UniqueID, error) {
ctx2, cancel2 := context.WithTimeout(ctx, timeoutForRPC)
defer cancel2()

View File

@ -18,14 +18,61 @@ package querycoord
import (
"context"
"errors"
"testing"
"github.com/milvus-io/milvus/internal/mocks"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
var globalMetaTestDir = "/tmp/milvus_test/global_meta"
func getMockGlobalMetaBroker(ctx context.Context) (*globalMetaBroker, *mocks.DataCoord, *mocks.RootCoord, error) {
dc := &mocks.DataCoord{}
rc := &mocks.RootCoord{}
handler, err := newGlobalMetaBroker(ctx, rc, dc, nil, nil)
return handler, dc, rc, err
}
func TestGlobalMetaBroker_describeCollection(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Run("success case", func(t *testing.T) {
rootCoord := &mocks.RootCoord{}
defer cancel()
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, nil, nil)
require.NoError(t, err)
schema := genDefaultCollectionSchema(false)
rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Schema: schema,
}, nil)
result, err := handler.describeCollection(ctx, defaultCollectionID)
assert.NoError(t, err)
assert.Equal(t, schema, result)
})
t.Run("failure case", func(t *testing.T) {
rootCoord := &mocks.RootCoord{}
defer cancel()
handler, err := newGlobalMetaBroker(ctx, rootCoord, nil, nil, nil)
require.NoError(t, err)
rootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, errors.New("mocked error"))
_, err = handler.describeCollection(ctx, defaultCollectionID)
assert.Error(t, err)
})
}
func TestGlobalMetaBroker_DataCoord(t *testing.T) {
refreshParams()
ctx, cancel := context.WithCancel(context.Background())

View File

@ -168,7 +168,23 @@ func (handler *HandoffHandler) verifyRequest(req *querypb.SegmentInfo) (bool, *q
// if collection has not been loaded, then skip the segment
collectionInfo, err := handler.meta.getCollectionInfoByID(req.CollectionID)
if err != nil {
return false, nil
msgType := handler.scheduler.triggerTaskQueue.willLoadOrRelease(req.CollectionID)
switch msgType {
case commonpb.MsgType_LoadCollection, commonpb.MsgType_LoadPartitions:
// collection/partition may be loaded, return valid and let handoff task do the check
schema, err := handler.broker.describeCollection(handler.ctx, req.CollectionID)
if err != nil {
return false, nil
}
collectionInfo = &querypb.CollectionInfo{
CollectionID: req.CollectionID,
Schema: schema,
// use load collection to by-pass partition id check
LoadType: querypb.LoadType_LoadCollection,
}
default:
return false, nil
}
}
// if partition has not been loaded or released, then skip handoff the segment

View File

@ -17,6 +17,7 @@
package querycoord
import (
"container/list"
"context"
"fmt"
"math/rand"
@ -27,11 +28,14 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
@ -64,8 +68,13 @@ func TestHandoffHandlerReloadFromKV(t *testing.T) {
err = kv.Save(key, string(value))
assert.Nil(t, err)
scheduler, err := newTaskScheduler(baseCtx, meta, nil, kv, nil, func() (UniqueID, error) {
return 1, nil
})
require.NoError(t, err)
t.Run("Test_CollectionNotExist", func(t *testing.T) {
handoffHandler, err := newHandoffHandler(baseCtx, kv, meta, nil, nil, nil)
handoffHandler, err := newHandoffHandler(baseCtx, kv, meta, nil, scheduler, nil)
assert.Nil(t, err)
assert.True(t, len(handoffHandler.tasks) > 0)
for _, task := range handoffHandler.tasks {
@ -100,6 +109,32 @@ func TestHandoffHandlerReloadFromKV(t *testing.T) {
}
})
t.Run("Test_LoadInQueue", func(t *testing.T) {
broker, _, rc, err := getMockGlobalMetaBroker(baseCtx)
assert.NoError(t, err)
list := list.New()
list.PushBack(&querypb.LoadCollectionRequest{
CollectionID: defaultCollectionID,
})
handler, err := newHandoffHandler(baseCtx, kv, meta, nil, &TaskScheduler{
triggerTaskQueue: &taskQueue{
tasks: list,
},
}, broker)
schema := genDefaultCollectionSchema(false)
rc.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Schema: schema,
}, nil)
assert.Nil(t, err)
assert.True(t, len(handler.tasks) > 0)
for _, task := range handler.tasks {
assert.Equal(t, handoffTaskInit, task.state)
}
})
cancel()
}

View File

@ -32,6 +32,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
@ -635,15 +636,24 @@ func TestQueryCoord_watchHandoffSegmentLoop(t *testing.T) {
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath)
broker, _, _, err := getMockGlobalMetaBroker(ctx)
require.NoError(t, err)
scheduler, err := newTaskScheduler(ctx, nil, nil, etcdKV, broker, func() (UniqueID, error) {
return 1, nil
})
require.NoError(t, err)
qc := &QueryCoord{
loopCtx: ctx,
loopWg: sync.WaitGroup{},
kvClient: etcdKV,
handoffHandler: &HandoffHandler{
ctx: ctx,
cancel: cancel,
client: etcdKV,
ctx: ctx,
cancel: cancel,
client: etcdKV,
scheduler: scheduler,
},
scheduler: scheduler,
}
t.Run("chan closed", func(t *testing.T) {