diff --git a/internal/master/collection_task.go b/internal/master/collection_task.go new file mode 100644 index 0000000000..becd0f96d6 --- /dev/null +++ b/internal/master/collection_task.go @@ -0,0 +1,110 @@ +package master + +import ( + "encoding/json" + "errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "log" + "strconv" +) + +const collectionMetaPrefix = "collection/" + +type createCollectionTask struct { + baseTask + req *internalpb.CreateCollectionRequest +} + +type dropCollectionTask struct { + baseTask + req *internalpb.DropCollectionRequest +} + +func (t *createCollectionTask) Type() internalpb.ReqType { + if t.req == nil { + log.Printf("null request") + return 0 + } + return t.req.ReqType +} + +func (t *createCollectionTask) Ts() (Timestamp, error) { + if t.req == nil { + return 0, errors.New("null request") + } + return Timestamp(t.req.Timestamp), nil +} + +func (t *createCollectionTask) Execute() commonpb.Status { + var schema schemapb.CollectionSchema + err0 := json.Unmarshal(t.req.Schema.Value, &schema) + if err0 != nil { + t.Notify() + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Unmarshal CollectionSchema failed", + } + } + + // TODO: allocate collection id + var collectionId uint64 = 0 + // TODO: allocate timestamp + var collectionCreateTime uint64 = 0 + + collection := etcdpb.CollectionMeta{ + Id: collectionId, + Schema: &schema, + CreateTime: collectionCreateTime, + // TODO: initial segment? + SegmentIds: make([]uint64, 0), + // TODO: initial partition? + PartitionTags: make([]string, 0), + } + + collectionJson, err1 := json.Marshal(&collection) + if err1 != nil { + t.Notify() + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Marshal collection failed", + } + } + + err2 := (*t.kvBase).Save(collectionMetaPrefix+strconv.FormatUint(collectionId, 10), string(collectionJson)) + if err2 != nil { + t.Notify() + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Save collection failed", + } + } + + t.Notify() + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } +} + +func (t *dropCollectionTask) Type() internalpb.ReqType { + if t.req == nil { + log.Printf("null request") + return 0 + } + return t.req.ReqType +} + +func (t *dropCollectionTask) Ts() (Timestamp, error) { + if t.req == nil { + return 0, errors.New("null request") + } + return Timestamp(t.req.Timestamp), nil +} + +func (t *dropCollectionTask) Execute() commonpb.Status { + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } +} diff --git a/internal/master/grpc/server.go b/internal/master/grpc/server.go deleted file mode 100644 index 5bb6bd21ee..0000000000 --- a/internal/master/grpc/server.go +++ /dev/null @@ -1,156 +0,0 @@ -package grpc - -import ( - "context" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" - "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" - "net" - "strconv" - - "github.com/zilliztech/milvus-distributed/internal/conf" - "github.com/zilliztech/milvus-distributed/internal/master/kv" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" - "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" - "google.golang.org/grpc" -) - -func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) { - defaultGRPCPort := ":" - defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) - lis, err := net.Listen("tcp", defaultGRPCPort) - if err != nil { - // log.Fatal("failed to listen: %v", err) - errch <- err - return - } - s := grpc.NewServer() - masterpb.RegisterMasterServer(s, GRPCMasterServer{CreateRequest: ch, kvbase: kvbase}) - if err := s.Serve(lis); err != nil { - // log.Fatalf("failed to serve: %v", err) - errch <- err - return - } -} - -type GRPCMasterServer struct { - CreateRequest chan *schemapb.CollectionSchema - kvbase kv.Base -} - -func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -func (ms GRPCMasterServer) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -func (ms GRPCMasterServer) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) { - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - Value: true, - },nil -} - -func (ms GRPCMasterServer) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) { - return &servicepb.CollectionDescription{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - },nil -} - -func (ms GRPCMasterServer) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - },nil -} - - -func (ms GRPCMasterServer) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - - -func (ms GRPCMasterServer) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) { - return &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, nil -} - -func (ms GRPCMasterServer) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) { - return &servicepb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - Value: true, - },nil -} - -func (ms GRPCMasterServer) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) { - return &servicepb.PartitionDescription{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - },nil -} - -func (ms GRPCMasterServer) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) { - return &servicepb.StringListResponse{ - Status: &commonpb.Status{ - ErrorCode: 0, - Reason: "", - }, - },nil -} - -//func (ms GRPCMasterServer) CreateCollection(ctx context.Context, in *messagepb.Mapping) (*messagepb.Status, error) { -// // ms.CreateRequest <- in2 -// fmt.Println("Handle a new create collection request") -// err := controller.WriteCollection2Datastore(in, ms.kvbase) -// if err != nil { -// return &messagepb.Status{ -// ErrorCode: 100, -// Reason: "", -// }, err -// } -// return &messagepb.Status{ -// ErrorCode: 0, -// Reason: "", -// }, nil -//} - -//func (ms GRPCMasterServer) CreateIndex(ctx context.Context, in *messagepb.IndexParam) (*messagepb.Status, error) { -// fmt.Println("Handle a new create index request") -// err := controller.UpdateCollectionIndex(in, ms.kvbase) -// if err != nil { -// return &messagepb.Status{ -// ErrorCode: 100, -// Reason: "", -// }, err -// } -// return &messagepb.Status{ -// ErrorCode: 0, -// Reason: "", -// }, nil -//} diff --git a/internal/master/grpc_service.go b/internal/master/grpc_service.go new file mode 100644 index 0000000000..8e23b161a6 --- /dev/null +++ b/internal/master/grpc_service.go @@ -0,0 +1,149 @@ +package master + +import ( + "context" + "github.com/zilliztech/milvus-distributed/internal/conf" + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" + "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" + "github.com/zilliztech/milvus-distributed/internal/proto/servicepb" + "google.golang.org/grpc" + "net" + "strconv" +) + +func Server(ch chan *schemapb.CollectionSchema, errch chan error, kvbase kv.Base) { + defaultGRPCPort := ":" + defaultGRPCPort += strconv.FormatInt(int64(conf.Config.Master.Port), 10) + lis, err := net.Listen("tcp", defaultGRPCPort) + if err != nil { + // log.Fatal("failed to listen: %v", err) + errch <- err + return + } + s := grpc.NewServer() + masterpb.RegisterMasterServer(s, Master{CreateRequest: ch, kvBase: kvbase}) + if err := s.Serve(lis); err != nil { + // log.Fatalf("failed to serve: %v", err) + errch <- err + return + } +} + +type Master struct { + CreateRequest chan *schemapb.CollectionSchema + kvBase kv.Base + scheduler *ddRequestScheduler + mt metaTable +} + +func (ms Master) CreateCollection(ctx context.Context, in *internalpb.CreateCollectionRequest) (*commonpb.Status, error) { + var t task = &createCollectionTask{ + req: in, + baseTask: baseTask{ + kvBase: &ms.kvBase, + mt: &ms.mt, + cv: make(chan int), + }, + } + + var status = ms.scheduler.Enqueue(&t) + if status.ErrorCode != commonpb.ErrorCode_SUCCESS { + err := errors.New("Enqueue failed") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "Enqueue failed", + }, err + } + + status = t.WaitToFinish(ctx) + if status.ErrorCode != commonpb.ErrorCode_SUCCESS { + err := errors.New("WaitToFinish failed") + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + Reason: "WaitToFinish failed", + }, err + } + + return &status, nil +} + +func (ms Master) DropCollection(ctx context.Context, in *internalpb.DropCollectionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +func (ms Master) HasCollection(ctx context.Context, in *internalpb.HasCollectionRequest) (*servicepb.BoolResponse, error) { + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + Value: true, + }, nil +} + +func (ms Master) DescribeCollection(ctx context.Context, in *internalpb.DescribeCollectionRequest) (*servicepb.CollectionDescription, error) { + return &servicepb.CollectionDescription{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + }, nil +} + +func (ms Master) ShowCollections(ctx context.Context, in *internalpb.ShowCollectionRequest) (*servicepb.StringListResponse, error) { + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + }, nil +} + +func (ms Master) CreatePartition(ctx context.Context, in *internalpb.CreatePartitionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +func (ms Master) DropPartition(ctx context.Context, in *internalpb.DropPartitionRequest) (*commonpb.Status, error) { + return &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, nil +} + +func (ms Master) HasPartition(ctx context.Context, in *internalpb.HasPartitionRequest) (*servicepb.BoolResponse, error) { + return &servicepb.BoolResponse{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + Value: true, + }, nil +} + +func (ms Master) DescribePartition(ctx context.Context, in *internalpb.DescribePartitionRequest) (*servicepb.PartitionDescription, error) { + return &servicepb.PartitionDescription{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + }, nil +} + +func (ms Master) ShowPartitions(ctx context.Context, in *internalpb.ShowPartitionRequest) (*servicepb.StringListResponse, error) { + return &servicepb.StringListResponse{ + Status: &commonpb.Status{ + ErrorCode: 0, + Reason: "", + }, + }, nil +} diff --git a/internal/master/meta_table.go b/internal/master/meta_table.go new file mode 100644 index 0000000000..41cb4faefb --- /dev/null +++ b/internal/master/meta_table.go @@ -0,0 +1,26 @@ +package master + +import ( + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/etcdpb" +) + +type metaTable struct { + client kv.Base // client of a reliable kv service, i.e. etcd client + rootPath string // this metaTable's working root path on the reliable kv service + tenantMeta map[int64]etcdpb.TenantMeta // tenant id to tenant meta + proxyMeta map[int64]etcdpb.ProxyMeta // proxy id to proxy meta + collMeta map[int64]etcdpb.CollectionMeta // collection id to collection meta + segMeta map[int64]etcdpb.SegmentMeta // segment id to segment meta +} + +func (mt *metaTable) getCollectionMetaByName(name string) (*etcdpb.CollectionMeta, error) { + for _, v := range mt.collMeta { + if v.Schema.Name == name { + return &v, nil + } + } + + return nil, errors.New("Cannot found collection: " + name) +} diff --git a/internal/master/scheduler.go b/internal/master/scheduler.go new file mode 100644 index 0000000000..94333478dd --- /dev/null +++ b/internal/master/scheduler.go @@ -0,0 +1,30 @@ +package master + +import ( + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" +) + +type ddRequestScheduler struct { + reqQueue chan *task +} + +func NewDDRequestScheduler() *ddRequestScheduler { + const channelSize = 1024 + + rs := ddRequestScheduler{ + reqQueue: make(chan *task, channelSize), + } + return &rs +} + +func (rs *ddRequestScheduler) Enqueue(task *task) commonpb.Status { + rs.reqQueue <- task + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } +} + +func (rs *ddRequestScheduler) schedule() *task { + t := <- rs.reqQueue + return t +} diff --git a/internal/master/server.go b/internal/master/server.go index 2789778b47..d8f79bf4f5 100644 --- a/internal/master/server.go +++ b/internal/master/server.go @@ -7,27 +7,26 @@ import ( "github.com/zilliztech/milvus-distributed/internal/conf" "github.com/zilliztech/milvus-distributed/internal/master/controller" - milvusgrpc "github.com/zilliztech/milvus-distributed/internal/master/grpc" - "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" "go.etcd.io/etcd/clientv3" ) func Run() { - kvbase := newKvBase() + kvBase := newKvBase() collectionChan := make(chan *schemapb.CollectionSchema) defer close(collectionChan) - errorch := make(chan error) - defer close(errorch) + errorCh := make(chan error) + defer close(errorCh) - go milvusgrpc.Server(collectionChan, errorch, kvbase) - go controller.SegmentStatsController(kvbase, errorch) - go controller.CollectionController(collectionChan, kvbase, errorch) + go Server(collectionChan, errorCh, kvBase) + go controller.SegmentStatsController(kvBase, errorCh) + go controller.CollectionController(collectionChan, kvBase, errorCh) //go timetick.TimeTickService() for { - for v := range errorch { + for v := range errorCh { log.Fatal(v) } } @@ -42,6 +41,6 @@ func newKvBase() kv.Base { DialTimeout: 5 * time.Second, }) // defer cli.Close() - kvbase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) - return kvbase + kvBase := kv.NewEtcdKVBase(cli, conf.Config.Etcd.Rootpath) + return kvBase } diff --git a/internal/master/task.go b/internal/master/task.go new file mode 100644 index 0000000000..825d07122b --- /dev/null +++ b/internal/master/task.go @@ -0,0 +1,48 @@ +package master + +import ( + "context" + "github.com/zilliztech/milvus-distributed/internal/master/kv" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb" +) + +// TODO: get timestamp from timestampOracle +type Timestamp uint64 + +type baseTask struct { + kvBase *kv.Base + mt *metaTable + cv chan int +} + +type task interface { + Type() internalpb.ReqType + Ts() (Timestamp, error) + Execute() commonpb.Status + WaitToFinish(ctx context.Context) commonpb.Status + Notify() commonpb.Status +} + +func (bt *baseTask) Notify() commonpb.Status { + bt.cv <- 0 + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } +} + +func (bt *baseTask) WaitToFinish(ctx context.Context) commonpb.Status { + for { + select { + case <-ctx.Done(): + return commonpb.Status{ + // TODO: if to return unexpected error + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, + } + case <-bt.cv: + return commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + } + } + } +}