From 3b081620595adc6e49150f4490145a5e3d2e2f8c Mon Sep 17 00:00:00 2001
From: sunby
Date: Tue, 26 Jan 2021 15:14:49 +0800
Subject: [PATCH] Add main function

Signed-off-by: sunby
---
 cmd/dataservice/main.go                     |  82 +++++++++++
 internal/dataservice/allocator.go           |   9 +-
 internal/dataservice/cluster.go             |   4 +-
 internal/dataservice/param.go               |  10 ++
 internal/dataservice/server.go              | 139 ++++++++++++++++--
 .../distributed/dataservice/grpc_service.go |  40 ++---
 6 files changed, 234 insertions(+), 50 deletions(-)
 create mode 100644 cmd/dataservice/main.go

diff --git a/cmd/dataservice/main.go b/cmd/dataservice/main.go
new file mode 100644
index 0000000000..ac07eb9ecf
--- /dev/null
+++ b/cmd/dataservice/main.go
@@ -0,0 +1,82 @@
+package main
+
+import (
+    "context"
+    "fmt"
+    "log"
+    "os"
+    "os/signal"
+    "syscall"
+    "time"
+
+    "github.com/zilliztech/milvus-distributed/internal/distributed/dataservice"
+
+    "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
+
+    "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
+    "github.com/zilliztech/milvus-distributed/internal/master"
+)
+
+func main() {
+    ctx, cancel := context.WithCancel(context.Background())
+
+    service := dataservice.NewGrpcService(ctx)
+
+    master.Params.Init()
+    client, err := masterservice.NewGrpcClient(fmt.Sprintf("%s:%d", master.Params.Address, master.Params.Port), 30*time.Second)
+    if err != nil {
+        panic(err)
+    }
+    log.Println("master client create complete")
+    if err = client.Init(); err != nil {
+        panic(err)
+    }
+    if err = client.Start(); err != nil {
+        panic(err)
+    }
+    service.SetMasterClient(client)
+    ticker := time.NewTicker(500 * time.Millisecond)
+    tctx, tcancel := context.WithTimeout(ctx, 30*time.Second)
+    defer func() {
+        if err = client.Stop(); err != nil {
+            panic(err)
+        }
+        ticker.Stop()
+        tcancel()
+    }()
+
+    // poll the master until it reports at least an initializing state,
+    // or give up after the 30s timeout
+    for {
+        var states *internalpb2.ComponentStates
+        select {
+        case <-ticker.C:
+            states, err = client.GetComponentStates()
+            if err != nil {
+                continue
+            }
+        case <-tctx.Done():
+            panic("master timeout")
+        }
+        if states.State.StateCode == internalpb2.StateCode_INITIALIZING || states.State.StateCode == internalpb2.StateCode_HEALTHY {
+            break
+        }
+    }
+
+    if err = service.Init(); err != nil {
+        panic(err)
+    }
+    if err = service.Start(); err != nil {
+        panic(err)
+    }
+    // buffered channel: signal.Notify must not block when a signal arrives
+    sc := make(chan os.Signal, 1)
+    signal.Notify(sc,
+        syscall.SIGHUP,
+        syscall.SIGINT,
+        syscall.SIGTERM,
+        syscall.SIGQUIT)
+    <-sc
+    cancel()
+    if err = service.Stop(); err != nil {
+        panic(err)
+    }
+    log.Println("shut down data service")
+}
diff --git a/internal/dataservice/allocator.go b/internal/dataservice/allocator.go
index b3b4b37308..66f23f048e 100644
--- a/internal/dataservice/allocator.go
+++ b/internal/dataservice/allocator.go
@@ -1,7 +1,6 @@
 package dataservice
 
 import (
-    "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
     "github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
     "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 )
@@ -12,10 +11,10 @@ type allocator interface {
 }
 
 type allocatorImpl struct {
-    masterClient *masterservice.GrpcClient
+    masterClient MasterClient
 }
 
-func newAllocatorImpl(masterClient *masterservice.GrpcClient) *allocatorImpl {
+func newAllocatorImpl(masterClient MasterClient) *allocatorImpl {
     return &allocatorImpl{
        masterClient: masterClient,
     }
@@ -27,7 +26,7 @@ func (allocator *allocatorImpl) allocTimestamp() (Timestamp, error) {
             MsgType:   commonpb.MsgType_kShowCollections,
             MsgID:     -1, // todo add msg id
             Timestamp: 0,  // todo
-            SourceID:  -1, // todo
+            SourceID:  Params.NodeID,
         },
         Count: 1,
     })
@@ -43,7 +42,7 @@ func (allocator *allocatorImpl) allocID() (UniqueID, error) {
             MsgType:   commonpb.MsgType_kShowCollections,
             MsgID:     -1, // todo add msg id
             Timestamp: 0,  // todo
-            SourceID:  -1, // todo
+            SourceID:  Params.NodeID,
         },
         Count: 1,
     })
diff --git a/internal/dataservice/cluster.go b/internal/dataservice/cluster.go
index aea5cf0765..ef74df537c 100644
--- a/internal/dataservice/cluster.go
+++ b/internal/dataservice/cluster.go
@@ -89,7 +89,7 @@ func (c *dataNodeCluster) WatchInsertChannels(groups []channelGroup) {
                 MsgType:   commonpb.MsgType_kDescribeCollection,
                 MsgID:     -1, // todo
                 Timestamp: 0,  // todo
-                SourceID:  -1, // todo
+                SourceID:  Params.NodeID,
             },
             ChannelNames: group,
         })
@@ -105,7 +105,7 @@ func (c *dataNodeCluster) GetDataNodeStates() ([]*internalpb2.ComponentInfo, err
     defer c.mu.RUnlock()
     ret := make([]*internalpb2.ComponentInfo, 0)
     for _, node := range c.nodes {
-        states, err := node.client.GetComponentStates(nil)
+        states, err := node.client.GetComponentStates(&commonpb.Empty{})
         if err != nil {
             log.Println(err.Error())
             continue
diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go
index 0e381ed15f..9a574924a8 100644
--- a/internal/dataservice/param.go
+++ b/internal/dataservice/param.go
@@ -32,6 +32,8 @@ type ParamTable struct {
     SegmentInfoChannelName      string
     DataServiceSubscriptionName string
     K2SChannelNames             []string
+
+    SegmentFlushMetaPath string
 }
 
 var Params ParamTable
@@ -203,3 +205,11 @@ func (p *ParamTable) initK2SChannelNames() {
     }
     p.K2SChannelNames = ret
 }
+
+func (p *ParamTable) initSegmentFlushMetaPath() {
+    subPath, err := p.Load("etcd.segFlushMetaSubPath")
+    if err != nil {
+        panic(err)
+    }
+    p.SegmentFlushMetaPath = subPath
+}
diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go
index f3c07d99f7..a478bbc367 100644
--- a/internal/dataservice/server.go
+++ b/internal/dataservice/server.go
@@ -2,17 +2,23 @@ package dataservice
 
 import (
     "context"
+    "errors"
     "fmt"
     "log"
+    "path"
+    "strconv"
     "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+
+    "github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
 
     "github.com/zilliztech/milvus-distributed/internal/msgstream/util"
 
     "github.com/zilliztech/milvus-distributed/internal/msgstream"
     "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms"
 
-    "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
-
     "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
 
     "github.com/zilliztech/milvus-distributed/internal/timesync"
@@ -45,6 +51,16 @@ type DataService interface {
     GetComponentStates() (*internalpb2.ComponentStates, error)
 }
 
+type MasterClient interface {
+    ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
+    DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
+    ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
+    GetDdChannel() (string, error)
+    AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error)
+    AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
+    GetComponentStates() (*internalpb2.ComponentStates, error)
+}
+
 type (
     UniqueID  = typeutil.UniqueID
     Timestamp = typeutil.Timestamp
@@ -53,7 +69,7 @@ type (
         serverLoopCtx    context.Context
         serverLoopCancel context.CancelFunc
         serverLoopWg     sync.WaitGroup
-        state            internalpb2.StateCode
+        state            atomic.Value
         client           *etcdkv.EtcdKV
         meta             *meta
         segAllocator     segmentAllocator
@@ -63,7 +79,7 @@ type (
         cluster          *dataNodeCluster
         msgProducer      *timesync.MsgProducer
         registerFinishCh chan struct{}
-        masterClient     *masterservice.GrpcClient
+        masterClient     MasterClient
         ttMsgStream      msgstream.MsgStream
         k2sMsgStream     msgstream.MsgStream
         ddChannelName    string
@@ -72,17 +88,21 @@ type (
     }
 )
 
-func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) {
+func CreateServer(ctx context.Context) (*Server, error) {
     Params.Init()
     ch := make(chan struct{})
-    return &Server{
+    s := &Server{
         ctx:              ctx,
-        state:            internalpb2.StateCode_INITIALIZING,
         insertChannelMgr: newInsertChannelManager(),
         registerFinishCh: ch,
         cluster:          newDataNodeCluster(ch),
-        masterClient:     client,
-    }, nil
+    }
+    s.state.Store(internalpb2.StateCode_INITIALIZING)
+    return s, nil
+}
+
+func (s *Server) SetMasterClient(masterClient MasterClient) {
+    s.masterClient = masterClient
 }
 
 func (s *Server) Init() error {
@@ -109,11 +129,15 @@ func (s *Server) Start() error {
     }
     s.startServerLoop()
     s.waitDataNodeRegister()
-    s.state = internalpb2.StateCode_HEALTHY
+    s.state.Store(internalpb2.StateCode_HEALTHY)
     log.Println("start success")
     return nil
 }
 
+func (s *Server) checkStateIsHealthy() bool {
+    return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY
+}
+
 func (s *Server) initMeta() error {
     etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
     if err != nil {
@@ -160,12 +184,15 @@ func (s *Server) initMsgProducer() error {
 
 func (s *Server) loadMetaFromMaster() error {
     log.Println("loading collection meta from master")
+    if err := s.checkMasterIsHealthy(); err != nil {
+        return err
+    }
     collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
         Base: &commonpb.MsgBase{
             MsgType:   commonpb.MsgType_kShowCollections,
             MsgID:     -1, // todo add msg id
             Timestamp: 0,  // todo
-            SourceID:  -1, // todo
+            SourceID:  Params.NodeID,
         },
         DbName: "",
     })
@@ -178,7 +205,7 @@ func (s *Server) loadMetaFromMaster() error {
                 MsgType:   commonpb.MsgType_kDescribeCollection,
                 MsgID:     -1, // todo
                 Timestamp: 0,  // todo
-                SourceID:  -1, // todo
+                SourceID:  Params.NodeID,
             },
             DbName:         "",
             CollectionName: collectionName,
@@ -192,7 +219,7 @@ func (s *Server) loadMetaFromMaster() error {
                 MsgType:   commonpb.MsgType_kShowPartitions,
                 MsgID:     -1, // todo
                 Timestamp: 0,  // todo
-                SourceID:  -1, // todo
+                SourceID:  Params.NodeID,
             },
             DbName:         "",
             CollectionName: collectionName,
@@ -215,6 +242,36 @@ func (s *Server) loadMetaFromMaster() error {
     log.Println("load collection meta from master complete")
     return nil
 }
+
+func (s *Server) checkMasterIsHealthy() error {
+    ticker := time.NewTicker(300 * time.Millisecond)
+    ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
+    defer func() {
+        ticker.Stop()
+        cancel()
+    }()
+    for {
+        var resp *internalpb2.ComponentStates
+        var err error
+        select {
+        case <-ctx.Done():
+            return fmt.Errorf("master is not healthy")
+        case <-ticker.C:
+            resp, err = s.masterClient.GetComponentStates()
+            if err != nil {
+                return err
+            }
+            if resp.Status.ErrorCode != commonpb.ErrorCode_SUCCESS {
+                return errors.New(resp.Status.Reason)
+            }
+        }
+        if resp.State.StateCode == internalpb2.StateCode_HEALTHY {
+            break
+        }
+    }
+    return nil
+}
+
 func (s *Server) startServerLoop() {
     s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
     s.serverLoopWg.Add(2)
@@ -308,7 +365,7 @@ func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
         State: &internalpb2.ComponentInfo{
             NodeID:    Params.NodeID,
             Role:      role,
-            StateCode: s.state,
+            StateCode: s.state.Load().(internalpb2.StateCode),
         },
         Status: &commonpb.Status{
             ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
         },
@@ -361,6 +418,12 @@ func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.Register
 }
 
 func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
+    if !s.checkStateIsHealthy() {
+        return &commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+            Reason:    "server is initializing",
+        }, nil
+    }
     s.segAllocator.SealAllSegments(req.CollectionID)
     return &commonpb.Status{
         ErrorCode: commonpb.ErrorCode_SUCCESS,
@@ -374,6 +437,11 @@ func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.Assign
         },
         SegIDAssignments: make([]*datapb.SegIDAssignment, 0),
     }
+    if !s.checkStateIsHealthy() {
+        resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
+        resp.Status.Reason = "server is initializing"
+        return resp, nil
+    }
     for _, r := range req.SegIDRequests {
         result := &datapb.SegIDAssignment{
             Status: &commonpb.Status{
@@ -460,6 +528,14 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
 }
 
 func (s *Server) ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
+    if !s.checkStateIsHealthy() {
+        return &datapb.ShowSegmentResponse{
+            Status: &commonpb.Status{
+                ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+                Reason:    "server is initializing",
+            },
+        }, nil
+    }
     ids := s.meta.GetSegmentsByCollectionAndPartitionID(req.CollectionID, req.PartitionID)
     return &datapb.ShowSegmentResponse{SegmentIDs: ids}, nil
 }
@@ -470,6 +546,10 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
             ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
         },
     }
+    if !s.checkStateIsHealthy() {
+        resp.Status.Reason = "server is initializing"
+        return resp, nil
+    }
 
     segmentInfo, err := s.meta.GetSegment(req.SegmentID)
     if err != nil {
@@ -486,10 +566,39 @@ func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.Seg
 }
 
 func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
-    panic("implement me")
+    // todo
+    resp := &datapb.InsertBinlogPathsResponse{
+        Status: &commonpb.Status{
+            ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
+        },
+    }
+    p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10))
+    value, err := s.client.Load(p)
+    if err != nil {
+        resp.Status.Reason = err.Error()
+        return resp, nil
+    }
+    flushMeta := &datapb.SegmentFlushMeta{}
+    err = proto.UnmarshalText(value, flushMeta)
+    if err != nil {
+        resp.Status.Reason = err.Error()
+        return resp, nil
+    }
+    // zero length (not zero values): appending into make([]T, n) would leave
+    // n empty entries at the front of each slice
+    fields := make([]UniqueID, 0, len(flushMeta.Fields))
+    paths := make([]*internalpb2.StringList, 0, len(flushMeta.Fields))
+    for _, field := range flushMeta.Fields {
+        fields = append(fields, field.FieldID)
+        paths = append(paths, &internalpb2.StringList{Values: field.BinlogPaths})
+    }
+    resp.FieldIDs = fields
+    resp.Paths = paths
+    resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
+    return resp, nil
 }
 
 func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) ([]string, error) {
+    if !s.checkStateIsHealthy() {
+        return nil, errors.New("server is initializing")
+    }
     contains, ret := s.insertChannelMgr.ContainsCollection(req.CollectionID)
     if contains {
         return ret, nil
diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go
index 37a1edf2d8..67c20bb458 100644
--- a/internal/distributed/dataservice/grpc_service.go
+++ b/internal/distributed/dataservice/grpc_service.go
@@ -2,9 +2,9 @@ package dataservice
 
 import (
     "context"
+    "fmt"
     "log"
     "net"
-    "time"
 
     "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice"
 
@@ -22,52 +22,37 @@ import (
 type Service struct {
     server       *dataservice.Server
     ctx          context.Context
-    cancel       context.CancelFunc
     grpcServer   *grpc.Server
     masterClient *masterservice.GrpcClient
 }
 
-func NewGrpcService() {
+func NewGrpcService(ctx context.Context) *Service {
     s := &Service{}
     var err error
-    s.ctx, s.cancel = context.WithCancel(context.Background())
-    if err = s.connectMaster(); err != nil {
-        log.Fatal("connect to master" + err.Error())
-    }
-    s.server, err = dataservice.CreateServer(s.ctx, s.masterClient)
+    s.ctx = ctx
+    s.server, err = dataservice.CreateServer(s.ctx)
     if err != nil {
         log.Fatalf("create server error: %s", err.Error())
-        return
+        return nil
     }
     s.grpcServer = grpc.NewServer()
     datapb.RegisterDataServiceServer(s.grpcServer, s)
-    lis, err := net.Listen("tcp", "localhost:11111") // todo address
+    lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
     if err != nil {
         log.Fatal(err.Error())
-        return
+        return nil
     }
-    if err = s.grpcServer.Serve(lis); err != nil {
-        log.Fatal(err.Error())
-        return
-    }
+    // Serve blocks until the server stops; run it in a goroutine so
+    // NewGrpcService can return and the caller can continue startup.
+    go func() {
+        if err := s.grpcServer.Serve(lis); err != nil {
+            log.Fatal(err.Error())
+        }
+    }()
+    return s
 }
 
-func (s *Service) connectMaster() error {
-    log.Println("connecting to master")
-    master, err := masterservice.NewGrpcClient("localhost:10101", 30*time.Second) // todo address
-    if err != nil {
-        return err
-    }
-    if err = master.Init(); err != nil {
-        return err
-    }
-    if err = master.Start(); err != nil {
-        return err
-    }
-    s.masterClient = master
-    log.Println("connect to master success")
-    return nil
+func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
+    s.server.SetMasterClient(masterClient)
 }
 
 func (s *Service) Init() error {
     return s.server.Init()
 }
@@ -79,7 +64,6 @@ func (s *Service) Start() error {
 
 func (s *Service) Stop() error {
     err := s.server.Stop()
     s.grpcServer.GracefulStop()
-    s.cancel()
     return err
 }
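
Note: both main.go and checkMasterIsHealthy above rely on the same readiness-polling pattern: tick at a fixed interval, query the dependency's component states, and give up after a timeout. Below is a minimal, self-contained sketch of that pattern; the StateClient interface and mockClient type are illustrative stand-ins invented for this note and do not appear in the patch.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// StateClient is a hypothetical stand-in for the GetComponentStates
// dependency that the patch polls during startup.
type StateClient interface {
    Healthy() (bool, error)
}

// waitUntilHealthy polls the client at a fixed interval until it reports
// healthy or the timeout elapses, mirroring the ticker-plus-timeout loops
// in main.go and checkMasterIsHealthy.
func waitUntilHealthy(ctx context.Context, c StateClient, interval, timeout time.Duration) error {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    tctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    for {
        select {
        case <-tctx.Done():
            return errors.New("dependency did not become healthy in time")
        case <-ticker.C:
            ok, err := c.Healthy()
            if err != nil {
                continue // treat errors as transient; keep polling until timeout
            }
            if ok {
                return nil
            }
        }
    }
}

// mockClient becomes healthy after a fixed number of polls.
type mockClient struct{ polls int }

func (m *mockClient) Healthy() (bool, error) {
    m.polls++
    return m.polls >= 3, nil
}

func main() {
    err := waitUntilHealthy(context.Background(), &mockClient{}, 100*time.Millisecond, 5*time.Second)
    fmt.Println("healthy:", err == nil)
}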