Refactor master scheduler: show_collections

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2020-11-14 15:37:07 +08:00 committed by yefu.chen
parent e94138816f
commit fd6d0c78e5
3 changed files with 81 additions and 32 deletions

View File

@ -7,7 +7,6 @@ import (
"github.com/golang/protobuf/proto"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
@ -16,8 +15,6 @@ import (
type Timestamp = typeutil.Timestamp
const collectionMetaPrefix = "collection/"
type createCollectionTask struct {
baseTask
req *internalpb.CreateCollectionRequest
@ -211,19 +208,12 @@ func (t *showCollectionsTask) Execute() error {
return errors.New("null request")
}
collections := make([]string, 0)
for _, collection := range t.mt.collID2Meta {
collections = append(collections, collection.Schema.Name)
colls, err := t.mt.ListCollections()
if err != nil {
return err
}
stringListResponse := servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
Values: collections,
}
t.stringListResponse = &stringListResponse
t.stringListResponse.Values = colls
return nil
}

View File

@ -17,7 +17,7 @@ import (
"google.golang.org/grpc"
)
func TestMaster_CreateCollectionTask(t *testing.T) {
func TestMaster_CollectionTask(t *testing.T) {
err := gparams.GParams.LoadYaml("config.yaml")
if err != nil {
panic(err)
@ -118,6 +118,7 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
// HasCollection
reqHasCollection := internalpb.HasCollectionRequest{
MsgType: internalpb.MsgType_kHasCollection,
ReqID: 1,
@ -128,27 +129,46 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
},
}
// HasCollection "col1" is true
// "col1" is true
log.Printf("... [Has] collection col1\n")
boolResp, err := cli.HasCollection(ctx, &reqHasCollection)
assert.Nil(t, err)
assert.Equal(t, true, boolResp.Value)
assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
// HasCollection "colNotExist" is false
// "colNotExist" is false
reqHasCollection.CollectionName.CollectionName = "colNotExist"
boolResp, err = cli.HasCollection(ctx, &reqHasCollection)
assert.Nil(t, err)
assert.Equal(t, boolResp.Value, false)
assert.Equal(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
// HasCollection error
// error
reqHasCollection.Timestamp = Timestamp(10)
reqHasCollection.CollectionName.CollectionName = "col1"
boolResp, err = cli.HasCollection(ctx, &reqHasCollection)
assert.Nil(t, err)
assert.NotEqual(t, boolResp.Status.ErrorCode, commonpb.ErrorCode_SUCCESS)
// ShowCollection
reqShowCollection := internalpb.ShowCollectionRequest{
MsgType: internalpb.MsgType_kShowCollections,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
}
listResp, err := cli.ShowCollections(ctx, &reqShowCollection)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
assert.Equal(t, 1, len(listResp.Values))
assert.Equal(t, "col1", listResp.Values[0])
reqShowCollection.Timestamp = Timestamp(10)
listResp, err = cli.ShowCollections(ctx, &reqShowCollection)
assert.Nil(t, err)
assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
// CreateCollection Test
collMeta, err := svr.mt.GetCollectionByName(sch.Name)
assert.Nil(t, err)
@ -183,7 +203,12 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[0].Value, "col1_f2_iv1")
assert.Equal(t, collMeta.Schema.Fields[1].IndexParams[1].Value, "col1_f2_iv2")
// DescribeCollection
req.Timestamp = Timestamp(10)
st, err = cli.CreateCollection(ctx, &req)
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
// DescribeCollection Test
reqDescribe := &internalpb.DescribeCollectionRequest{
MsgType: internalpb.MsgType_kDescribeCollection,
ReqID: 1,
@ -239,11 +264,6 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_SUCCESS, des.Status.ErrorCode)
log.Printf(des.Status.Reason)
req.Timestamp = Timestamp(10)
st, err = cli.CreateCollection(ctx, &req)
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
// ------------------------------DropCollectionTask---------------------------
log.Printf("... [Drop] collection col1\n")
ser := servicepb.CollectionName{CollectionName: "col1"}
@ -272,16 +292,22 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
assert.Equal(t, false, boolResp.Value)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode)
// ShowCollections
reqShowCollection.Timestamp = Timestamp(11)
listResp, err = cli.ShowCollections(ctx, &reqShowCollection)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
assert.Equal(t, 0, len(listResp.Values))
// Drop again
st, err = cli.DropCollection(ctx, &reqDrop)
assert.Nil(t, err)
assert.NotEqual(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
// Create
// Create "col1"
req.Timestamp = Timestamp(11)
st, err = cli.CreateCollection(ctx, &req)
assert.Nil(t, err)
log.Printf(st.Reason)
assert.Equal(t, st.ErrorCode, commonpb.ErrorCode_SUCCESS)
boolResp, err = cli.HasCollection(ctx, &reqHasCollection)
@ -289,5 +315,28 @@ func TestMaster_CreateCollectionTask(t *testing.T) {
assert.Equal(t, true, boolResp.Value)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, boolResp.Status.ErrorCode)
// Create "col2"
sch.Name = "col2"
schemaBytes, err = proto.Marshal(&sch)
assert.Nil(t, err)
req = internalpb.CreateCollectionRequest{
MsgType: internalpb.MsgType_kCreateCollection,
ReqID: 1,
Timestamp: 11,
ProxyID: 1,
Schema: &commonpb.Blob{Value: schemaBytes},
}
st, err = cli.CreateCollection(ctx, &req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, st.ErrorCode)
// Show Collections
listResp, err = cli.ShowCollections(ctx, &reqShowCollection)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_SUCCESS, listResp.Status.ErrorCode)
assert.Equal(t, 2, len(listResp.Values))
assert.ElementsMatch(t, []string{"col1", "col2"}, listResp.Values)
svr.Close()
}

View File

@ -4,7 +4,6 @@ import (
"context"
"time"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/master/id"
"github.com/zilliztech/milvus-distributed/internal/master/tso"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
@ -158,19 +157,30 @@ func (s *Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollect
stringListResponse: nil,
}
response := &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "",
},
Values: nil,
}
t.(*showCollectionsTask).stringListResponse = response
var err = s.scheduler.Enqueue(t)
if err != nil {
err := errors.New("Enqueue failed")
return t.(*showCollectionsTask).stringListResponse, err
response.Status.Reason = "Enqueue filed: " + err.Error()
return response, nil
}
err = t.WaitToFinish(ctx)
if err != nil {
err := errors.New("WaitToFinish failed")
return t.(*showCollectionsTask).stringListResponse, err
response.Status.Reason = "Show Collections failed: " + err.Error()
return response, nil
}
return t.(*showCollectionsTask).stringListResponse, nil
response.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
return response, nil
}
//////////////////////////////////////////////////////////////////////////