Add collection creation, and refactor master

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/4973/head^2
bigsheeper 2020-10-28 15:38:24 +08:00 committed by yefu.chen
parent 1cfc6c7f86
commit 2df04b6947
7 changed files with 373 additions and 167 deletions

View File

@ -0,0 +1,110 @@
package master
import (
"encoding/json"
"errors"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"log"
"strconv"
)
const collectionMetaPrefix = "collection/"
type createCollectionTask struct {
baseTask
req *internalpb.CreateCollectionRequest
}
type dropCollectionTask struct {
baseTask
req *internalpb.DropCollectionRequest
}
func (t *createCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *createCollectionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *createCollectionTask) Execute() commonpb.Status {
var schema schemapb.CollectionSchema
err0 := json.Unmarshal(t.req.Schema.Value, &schema)
if err0 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Unmarshal CollectionSchema failed",
}
}
// TODO: allocate collection id
var collectionId uint64 = 0
// TODO: allocate timestamp
var collectionCreateTime uint64 = 0
collection := etcdpb.CollectionMeta{
Id: collectionId,
Schema: &schema,
CreateTime: collectionCreateTime,
// TODO: initial segment?
SegmentIds: make([]uint64, 0),
// TODO: initial partition?
PartitionTags: make([]string, 0),
}
collectionJson, err1 := json.Marshal(&collection)
if err1 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Marshal collection failed",
}
}
err2 := (*t.kvBase).Save(collectionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson))
if err2 != nil {
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Save collection failed",
}
}
t.Notify()
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
func (t *dropCollectionTask) Type() internalpb.ReqType {
if t.req == nil {
log.Printf("null request")
return 0
}
return t.req.ReqType
}
func (t *dropCollectionTask) Ts() (Timestamp, error) {
if t.req == nil {
return 0, errors.New("null request")
}
return Timestamp(t.req.Timestamp), nil
}
func (t *dropCollectionTask) Execute() commonpb.Status {
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}

View File

@ -1,156 +0,0 @@
package grpc
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"net"
"strconv"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"google.golang.org/grpc"
)
func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) {
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
// log.Fatal("failed to listen: %v", err)
errch <- err
return
}
s := grpc.NewServer()
masterpb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch, kvbase: kvbase})
if err := s.Serve(lis); err != nil {
// log.Fatalf("failed to serve: %v", err)
errch <- err
return
}
}
type GRPCMasterServer struct {
CreateRequest chan *schemapb.CollectionSchema
kvbase kv.Base
}
func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
Value: true,
},nil
}
func (ms GRPCMasterServer) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) {
return &servicepb.CollectionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
},nil
}
func (ms GRPCMasterServer) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
},nil
}
func (ms GRPCMasterServer) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms GRPCMasterServer) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
Value: true,
},nil
}
func (ms GRPCMasterServer) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
},nil
}
func (ms GRPCMasterServer) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
},nil
}
//func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) {
// // ms.CreateRequest <- in2
// fmt.Println("Handle a new create collection request")
// err := controller.WriteCollection2Datastore(in, ms.kvbase)
// if err != nil {
// return &messagepb.Status{
// ErrorCode: 100,
// Reason: "",
// }, err
// }
// return &messagepb.Status{
// ErrorCode: 0,
// Reason: "",
// }, nil
//}
//func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*messagepb.Status, error) {
// fmt.Println("Handle a new create index request")
// err := controller.UpdateCollectionIndex(in, ms.kvbase)
// if err != nil {
// return &messagepb.Status{
// ErrorCode: 100,
// Reason: "",
// }, err
// }
// return &messagepb.Status{
// ErrorCode: 0,
// Reason: "",
// }, nil
//}

View File

@ -0,0 +1,149 @@
package master
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/proto/servicepb"
"google.golang.org/grpc"
"net"
"strconv"
)
func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) {
defaultGRPCPort := ":"
defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10)
lis, err := net.Listen("tcp", defaultGRPCPort)
if err != nil {
// log.Fatal("failed to listen: %v", err)
errch <- err
return
}
s := grpc.NewServer()
masterpb.RegisterMasterServer(s, Master{CreateRequest: ch, kvBase: kvbase})
if err := s.Serve(lis); err != nil {
// log.Fatalf("failed to serve: %v", err)
errch <- err
return
}
}
type Master struct {
CreateRequest chan *schemapb.CollectionSchema
kvBase kv.Base
scheduler *ddRequestScheduler
mt metaTable
}
func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) {
var t task = &createCollectionTask{
req: in,
baseTask: baseTask{
kvBase: &ms.kvBase,
mt: &ms.mt,
cv: make(chan int),
},
}
var status = ms.scheduler.Enqueue(&t)
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err := errors.New("Enqueue failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "Enqueue failed",
}, err
}
status = t.WaitToFinish(ctx)
if status.ErrorCode != commonpb.ErrorCode_SUCCESS {
err := errors.New("WaitToFinish failed")
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
Reason: "WaitToFinish failed",
}, err
}
return &status, nil
}
func (ms Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
Value: true,
}, nil
}
func (ms Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) {
return &servicepb.CollectionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
}, nil
}
func (ms Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
}, nil
}
func (ms Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) {
return &commonpb.Status{
ErrorCode: 0,
Reason: "",
}, nil
}
func (ms Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) {
return &servicepb.BoolResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
Value: true,
}, nil
}
func (ms Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) {
return &servicepb.PartitionDescription{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
}, nil
}
func (ms Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) {
return &servicepb.StringListResponse{
Status: &commonpb.Status{
ErrorCode: 0,
Reason: "",
},
}, nil
}

View File

@ -0,0 +1,26 @@
package master
import (
"github.com/zilliztech/milvus-distributed/internal/errors"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
)
type metaTable struct {
client kv.Base // client of a reliable kv service, i.e. etcd client
rootPath string // this metaTable's working root path on the reliable kv service
tenantMeta map[int64]etcdpb.TenantMeta // tenant id to tenant meta
proxyMeta map[int64]etcdpb.ProxyMeta // proxy id to proxy meta
collMeta map[int64]etcdpb.CollectionMeta // collection id to collection meta
segMeta map[int64]etcdpb.SegmentMeta // segment id to segment meta
}
func (mt *metaTable) getCollectionMetaByName(name string) (*etcdpb.CollectionMeta, error) {
for _, v := range mt.collMeta {
if v.Schema.Name == name {
return &v, nil
}
}
return nil, errors.New("Cannot found collection: " + name)
}

View File

@ -0,0 +1,30 @@
package master
import (
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
)
type ddRequestScheduler struct {
reqQueue chan *task
}
func NewDDRequestScheduler() *ddRequestScheduler {
const channelSize = 1024
rs := ddRequestScheduler{
reqQueue: make(chan *task, channelSize),
}
return &rs
}
func (rs *ddRequestScheduler) Enqueue(task *task) commonpb.Status {
rs.reqQueue <- task
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
func (rs *ddRequestScheduler) schedule() *task {
t := <- rs.reqQueue
return t
}

View File

@ -7,27 +7,26 @@ import (
"github.com/zilliztech/milvus-distributed/internal/conf"
"github.com/zilliztech/milvus-distributed/internal/master/controller"
milvusgrpc "github.com/zilliztech/milvus-distributed/internal/master/grpc"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
"go.etcd.io/etcd/clientv3"
)
func Run() {
kvbase := newKvBase()
kvBase := newKvBase()
collectionChan := make(chan *schemapb.CollectionSchema)
defer close(collectionChan)
errorch := make(chan error)
defer close(errorch)
errorCh := make(chan error)
defer close(errorCh)
go milvusgrpc.Server(collectionChan, errorch, kvbase)
go controller.SegmentStatsController(kvbase, errorch)
go controller.CollectionController(collectionChan, kvbase, errorch)
go Server(collectionChan, errorCh, kvBase)
go controller.SegmentStatsController(kvBase, errorCh)
go controller.CollectionController(collectionChan, kvBase, errorCh)
//go timetick.TimeTickService()
for {
for v := range errorch {
for v := range errorCh {
log.Fatal(v)
}
}
@ -42,6 +41,6 @@ func newKvBase() kv.Base {
DialTimeout: 5 * time.Second,
})
// defer cli.Close()
kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvbase
kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath)
return kvBase
}

48
internal/master/task.go Normal file
View File

@ -0,0 +1,48 @@
package master
import (
"context"
"github.com/zilliztech/milvus-distributed/internal/master/kv"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
)
// TODO: get timestamp from timestampOracle
type Timestamp uint64
type baseTask struct {
kvBase *kv.Base
mt *metaTable
cv chan int
}
type task interface {
Type() internalpb.ReqType
Ts() (Timestamp, error)
Execute() commonpb.Status
WaitToFinish(ctx context.Context) commonpb.Status
Notify() commonpb.Status
}
func (bt *baseTask) Notify() commonpb.Status {
bt.cv <- 0
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
func (bt *baseTask) WaitToFinish(ctx context.Context) commonpb.Status {
for {
select {
case <-ctx.Done():
return commonpb.Status{
// TODO: if to return unexpected error
ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
}
case <-bt.cv:
return commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
}
}
}
}