mirror of https://github.com/milvus-io/milvus.git
parent 7c210310dc
commit 3b08162059
@@ -0,0 +1,82 @@
+package dataservice
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
+
+	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+
+	"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
+	"github.com/zilliztech/milvus-distributed/internal/master"
+)
+
+func main() {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	service := dataservice.NewGrpcService(ctx)
+
+	master.Params.Init()
+	client, err := masterservice.NewGrpcClient(fmt.Sprintf("%s:%d", master.Params.Address, master.Params.Port), 30*time.Second)
+	if err != nil {
+		panic(err)
+	}
+	log.Println("master client create complete")
+	if err = client.Init(); err != nil {
+		panic(err)
+	}
+	if err = client.Start(); err != nil {
+		panic(err)
+	}
+	service.SetMasterClient(client)
+	ticker := time.NewTicker(500 * time.Millisecond)
+	tctx, tcancel := context.WithTimeout(ctx, 30*time.Second)
+	defer func() {
+		if err = client.Stop(); err != nil {
+			panic(err)
+		}
+		ticker.Stop()
+		tcancel()
+	}()
+
+	for {
+		var states *internalpb2.ComponentStates
+		select {
+		case <-ticker.C:
+			states, err = client.GetComponentStates()
+			if err != nil {
+				continue
+			}
+		case <-tctx.Done():
+			panic("master timeout")
+		}
+		if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY {
+			break
+		}
+	}
+
+	if err = service.Init(); err != nil {
+		panic(err)
+	}
+	if err = service.Start(); err != nil {
+		panic(err)
+	}
+	sc := make(chan os.Signal)
+	signal.Notify(sc,
+		syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT)
+	<-sc
+	cancel()
+	if err = service.Stop(); err != nil {
+		panic(err)
+	}
+	log.Println("shut down data service")
+}
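
Note: the new entry point above wires shutdown by hand (a signal channel plus an explicit cancel). A minimal sketch of equivalent wiring with signal.NotifyContext from the standard library (Go 1.16+); this is not part of the commit, and the run/start/stop names are illustrative only:

package main

import (
	"context"
	"os/signal"
	"syscall"
)

// run starts the service, blocks until SIGINT or SIGTERM, then stops it.
func run(start func(context.Context) error, stop func() error) error {
	// ctx is cancelled automatically when a shutdown signal arrives,
	// replacing the hand-rolled signal channel and cancel() above.
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()
	if err := start(ctx); err != nil {
		return err
	}
	<-ctx.Done()
	return stop()
}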

@@ -1,7 +1,6 @@
package dataservice

import (
-	"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
)

@@ -12,10 +11,10 @@ type allocator interface {
}

type allocatorImpl struct {
-	masterClient *masterservice.GrpcClient
+	masterClient MasterClient
}

-func newAllocatorImpl(masterClient *masterservice.GrpcClient) *allocatorImpl {
+func newAllocatorImpl(masterClient MasterClient) *allocatorImpl {
	return &allocatorImpl{
		masterClient: masterClient,
	}

@@ -27,7 +26,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
			MsgType:   commonpb.MsgType_kShowCollections,
			MsgID:     -1, // todo add msg id
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		Count: 1,
	})

@@ -43,7 +42,7 @@ func (allocator *allocatorImpl) allocID() (UniqueID, error) {
			MsgType:   commonpb.MsgType_kShowCollections,
			MsgID:     -1, // todo add msg id
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		Count: 1,
	})

@@ -89,7 +89,7 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
			MsgType:   commonpb.MsgType_kDescribeCollection,
			MsgID:     -1, // todo
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		ChannelNames: group,
	})

@@ -105,7 +105,7 @@ func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, err
	defer c.mu.RUnlock()
	ret := make([]*internalpb2.ComponentInfo, 0)
	for _, node := range c.nodes {
-		states, err := node.client.GetComponentStates(nil)
+		states, err := node.client.GetComponentStates(&commonpb.Empty{})
		if err != nil {
			log.Println(err.Error())
			continue

@@ -32,6 +32,8 @@ type ParamTable struct {
	SegmentInfoChannelName      string
	DataServiceSubscriptionName string
	K2SChannelNames             []string
+
+	SegmentFlushMetaPath string
}

var Params ParamTable

@@ -203,3 +205,11 @@ func (p *ParamTable) initK2SChannelNames() {
	}
	p.K2SChannelNames = ret
}
+
+func (p *ParamTable) initSegmentFlushMetaPath() {
+	subPath, err := p.Load("etcd.segFlushMetaSubPath")
+	if err != nil {
+		panic(err)
+	}
+	p.SegmentFlushMetaPath = subPath
+}
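
Note: the hunk above only adds initSegmentFlushMetaPath; its call site is outside this diff. Presumably (p *ParamTable) Init invokes it alongside the existing initializers, roughly as in this hypothetical sketch (the Init body shown here is not part of the commit):

func (p *ParamTable) Init() {
	// ... existing initializers, e.g. ...
	p.initK2SChannelNames()
	p.initSegmentFlushMetaPath() // new: reads etcd.segFlushMetaSubPath
}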

@@ -2,17 +2,23 @@ package dataservice

import (
	"context"
	"errors"
	"fmt"
	"log"
	"path"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"

	"github.com/zilliztech/milvus-distributed/internal/msgstream/util"

	"github.com/zilliztech/milvus-distributed/internal/msgstream"
	"github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"

	"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"

	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"

	"github.com/zilliztech/milvus-distributed/internal/timesync"

@@ -45,6 +51,16 @@ type DataService interface {
	GetComponentStates() (*internalpb2.ComponentStates, error)
}

+type MasterClient interface {
+	ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
+	DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
+	ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
+	GetDdChannel() (string, error)
+	AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error)
+	AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
+	GetComponentStates() (*internalpb2.ComponentStates, error)
+}
+
type (
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
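
Note: because the server now depends on this MasterClient interface instead of *masterservice.GrpcClient, the master can be stubbed out in tests. An illustrative in-package fake, not part of this commit; it only populates fields that are evidenced elsewhere in the diff:

type mockMaster struct{}

func (m *mockMaster) ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error) {
	return &milvuspb.ShowCollectionResponse{}, nil
}
func (m *mockMaster) DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
	return &milvuspb.DescribeCollectionResponse{}, nil
}
func (m *mockMaster) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) {
	return &milvuspb.ShowPartitionResponse{}, nil
}
func (m *mockMaster) GetDdChannel() (string, error) { return "dd-channel", nil }
func (m *mockMaster) AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error) {
	return &masterpb.TsoResponse{}, nil
}
func (m *mockMaster) AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error) {
	return &masterpb.IDResponse{}, nil
}
func (m *mockMaster) GetComponentStates() (*internalpb2.ComponentStates, error) {
	// report a healthy master so checkMasterIsHealthy and loadMetaFromMaster proceed
	return &internalpb2.ComponentStates{
		State:  &internalpb2.ComponentInfo{StateCode: internalpb2.StateCode_HEALTHY},
		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_SUCCESS},
	}, nil
}

A test could then build the server without a running master: s, _ := CreateServer(ctx); s.SetMasterClient(&mockMaster{}).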

@@ -53,7 +69,7 @@ type (
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
-	state            internalpb2.StateCode
+	state            atomic.Value
	client           *etcdkv.EtcdKV
	meta             *meta
	segAllocator     segmentAllocator

@@ -63,7 +79,7 @@ type (
	cluster          *dataNodeCluster
	msgProducer      *timesync.MsgProducer
	registerFinishCh chan struct{}
-	masterClient     *masterservice.GrpcClient
+	masterClient     MasterClient
	ttMsgStream      msgstream.MsgStream
	k2sMsgStream     msgstream.MsgStream
	ddChannelName    string

@@ -72,17 +88,21 @@ type (
	}
)

-func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) {
+func CreateServer(ctx context.Context) (*Server, error) {
	Params.Init()
	ch := make(chan struct{})
-	return &Server{
+	s := &Server{
		ctx:              ctx,
-		state:            internalpb2.StateCode_INITIALIZING,
		insertChannelMgr: newInsertChannelManager(),
		registerFinishCh: ch,
		cluster:          newDataNodeCluster(ch),
-		masterClient:     client,
-	}, nil
+	}
+	s.state.Store(internalpb2.StateCode_INITIALIZING)
+	return s, nil
}

+func (s *Server) SetMasterClient(masterClient MasterClient) {
+	s.masterClient = masterClient
+}
+
func (s *Server) Init() error {

@@ -109,11 +129,15 @@ func (s *Server) Start() error {
	}
	s.startServerLoop()
	s.waitDataNodeRegister()
-	s.state = internalpb2.StateCode_HEALTHY
+	s.state.Store(internalpb2.StateCode_HEALTHY)
	log.Println("start success")
	return nil
}

+func (s *Server) checkStateIsHealthy() bool {
+	return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY
+}
+
func (s *Server) initMeta() error {
	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
	if err != nil {
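
Note: state moves from a plain internalpb2.StateCode to atomic.Value because CreateServer and Start write it while GetComponentStates and the new checkStateIsHealthy read it, potentially from concurrent gRPC handler goroutines. A minimal standalone sketch of the Store/Load-with-type-assertion pattern used above, with illustrative types that are not from this commit:

package main

import (
	"fmt"
	"sync/atomic"
)

type StateCode int32 // stand-in for internalpb2.StateCode

const (
	StateInitializing StateCode = iota
	StateHealthy
)

func main() {
	var state atomic.Value
	state.Store(StateInitializing) // must Store a value before any Load

	healthy := func() bool {
		// every Load asserts back to the one concrete type ever stored
		return state.Load().(StateCode) == StateHealthy
	}

	fmt.Println(healthy()) // false
	state.Store(StateHealthy)
	fmt.Println(healthy()) // true
}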

@@ -160,12 +184,15 @@ func (s *Server) initMsgProducer() error {

func (s *Server) loadMetaFromMaster() error {
	log.Println("loading collection meta from master")
+	if err := s.checkMasterIsHealthy(); err != nil {
+		return err
+	}
	collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kShowCollections,
			MsgID:     -1, // todo add msg id
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		DbName: "",
	})

@@ -178,7 +205,7 @@ func (s *Server) loadMetaFromMaster() error {
			MsgType:   commonpb.MsgType_kDescribeCollection,
			MsgID:     -1, // todo
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: collectionName,

@@ -192,7 +219,7 @@ func (s *Server) loadMetaFromMaster() error {
			MsgType:   commonpb.MsgType_kShowPartitions,
			MsgID:     -1, // todo
			Timestamp: 0,  // todo
-			SourceID:  -1, // todo
+			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: collectionName,

@@ -215,6 +242,36 @@ func (s *Server) loadMetaFromMaster() error {
	log.Println("load collection meta from master complete")
	return nil
}
+
+func (s *Server) checkMasterIsHealthy() error {
+	ticker := time.NewTicker(300 * time.Millisecond)
+	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
+	defer func() {
+		ticker.Stop()
+		cancel()
+	}()
+	for {
+		var resp *internalpb2.ComponentStates
+		var err error
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("master is not healthy")
+		case <-ticker.C:
+			resp, err = s.masterClient.GetComponentStates()
+			if err != nil {
+				return err
+			}
+			if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+				return errors.New(resp.Status.Reason)
+			}
+		}
+		if resp.State.StateCode == internalpb2.StateCode_HEALTHY {
+			break
+		}
+	}
+	return nil
+}
+
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(2)

@@ -308,7 +365,7 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
		State: &internalpb2.ComponentInfo{
			NodeID:    Params.NodeID,
			Role:      role,
-			StateCode: s.state,
+			StateCode: s.state.Load().(internalpb2.StateCode),
		},
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,

@@ -361,6 +418,12 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
}

func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
+	if !s.checkStateIsHealthy() {
+		return &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+			Reason:    "server is initializing",
+		}, nil
+	}
	s.segAllocator.SealAllSegments(req.CollectionID)
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,

@@ -374,6 +437,11 @@ func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.Assign
		},
		SegIDAssignments: make([]*datapb.SegIDAssignment, 0),
	}
+	if !s.checkStateIsHealthy() {
+		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
+		resp.Status.Reason = "server is initializing"
+		return resp, nil
+	}
	for _, r := range req.SegIDRequests {
		result := &datapb.SegIDAssignment{
			Status: &commonpb.Status{

@@ -460,6 +528,14 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
}

func (s *Server) ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
+	if !s.checkStateIsHealthy() {
+		return &datapb.ShowSegmentResponse{
+			Status: &commonpb.Status{
+				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+				Reason:    "server is initializing",
+			},
+		}, nil
+	}
	ids := s.meta.GetSegmentsByCollectionAndPartitionID(req.CollectionID, req.PartitionID)
	return &datapb.ShowSegmentResponse{SegmentIDs: ids}, nil
}

@@ -470,6 +546,10 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
+	if !s.checkStateIsHealthy() {
+		resp.Status.Reason = "server is initializing"
+		return resp, nil
+	}

	segmentInfo, err := s.meta.GetSegment(req.SegmentID)
	if err != nil {

@@ -486,10 +566,39 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
}

func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
-	panic("implement me")
-	// todo
+	resp := &datapb.InsertBinlogPathsResponse{
+		Status: &commonpb.Status{
+			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+		},
+	}
+	p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10))
+	value, err := s.client.Load(p)
+	if err != nil {
+		resp.Status.Reason = err.Error()
+		return resp, nil
+	}
+	flushMeta := &datapb.SegmentFlushMeta{}
+	err = proto.UnmarshalText(value, flushMeta)
+	if err != nil {
+		resp.Status.Reason = err.Error()
+		return resp, nil
+	}
+	fields := make([]UniqueID, 0, len(flushMeta.Fields))
+	paths := make([]*internalpb2.StringList, 0, len(flushMeta.Fields))
+	for _, field := range flushMeta.Fields {
+		fields = append(fields, field.FieldID)
+		paths = append(paths, &internalpb2.StringList{Values: field.BinlogPaths})
+	}
+	resp.FieldIDs = fields
+	resp.Paths = paths
+	return resp, nil
}

func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
+	if !s.checkStateIsHealthy() {
+		return nil, errors.New("server is initializing")
+	}
	contains, ret := s.insertChannelMgr.ContainsCollection(req.CollectionID)
	if contains {
		return ret, nil
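
Note: GetInsertBinlogPaths above assumes some other component has already stored a text-marshaled datapb.SegmentFlushMeta under SegmentFlushMetaPath/<segmentID>. The writer side is not in this diff; a hypothetical sketch of the layout it expects, assuming the etcd KV wrapper exposes Save(key, value string) error:

// Hypothetical writer-side sketch, not part of this commit: it only mirrors
// the key layout and text encoding that GetInsertBinlogPaths reads back.
func saveFlushMeta(kv *etcdkv.EtcdKV, segmentID UniqueID, meta *datapb.SegmentFlushMeta) error {
	key := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(segmentID, 10))
	// GetInsertBinlogPaths unmarshals with proto.UnmarshalText, so the value
	// must be the proto text form of SegmentFlushMeta.
	return kv.Save(key, proto.MarshalTextString(meta))
}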

@@ -2,9 +2,9 @@ package dataservice

import (
	"context"
	"fmt"
	"log"
	"net"
	"time"

	"github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"

@@ -22,52 +22,37 @@ import (
type Service struct {
	server       *dataservice.Server
	ctx          context.Context
	cancel       context.CancelFunc
	grpcServer   *grpc.Server
-	masterClient *masterservice.GrpcClient
}

-func NewGrpcService() {
+func NewGrpcService(ctx context.Context) *Service {
	s := &Service{}
	var err error
-	s.ctx, s.cancel = context.WithCancel(context.Background())
-	if err = s.connectMaster(); err != nil {
-		log.Fatal("connect to master" + err.Error())
-	}
-	s.server, err = dataservice.CreateServer(s.ctx, s.masterClient)
+	s.ctx = ctx
+	s.server, err = dataservice.CreateServer(s.ctx)
	if err != nil {
		log.Fatalf("create server error: %s", err.Error())
-		return
+		return nil
	}
	s.grpcServer = grpc.NewServer()
	datapb.RegisterDataServiceServer(s.grpcServer, s)
-	lis, err := net.Listen("tcp", "localhost:11111") // todo address
+	lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
	if err != nil {
		log.Fatal(err.Error())
-		return
+		return nil
	}
	if err = s.grpcServer.Serve(lis); err != nil {
		log.Fatal(err.Error())
-		return
+		return nil
	}
+	return s
}

-func (s *Service) connectMaster() error {
-	log.Println("connecting to master")
-	master, err := masterservice.NewGrpcClient("localhost:10101", 30*time.Second) // todo address
-	if err != nil {
-		return err
-	}
-	if err = master.Init(); err != nil {
-		return err
-	}
-	if err = master.Start(); err != nil {
-		return err
-	}
-	s.masterClient = master
-	log.Println("connect to master success")
-	return nil
+func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
+	s.server.SetMasterClient(masterClient)
}

func (s *Service) Init() error {
	return s.server.Init()
}

@@ -79,7 +64,6 @@ func (s *Service) Start() error {
func (s *Service) Stop() error {
	err := s.server.Stop()
	s.grpcServer.GracefulStop()
-	s.cancel()
	return err
}