From 8c54af292e8af5196db51a293d94963a62e1bfaf Mon Sep 17 00:00:00 2001 From: neza2017 Date: Tue, 26 Jan 2021 17:55:15 +0800 Subject: [PATCH] Add dd handler Signed-off-by: neza2017 --- internal/dataservice/dd_handler.go | 89 +++++++++++++++++++ internal/dataservice/server.go | 81 ++++++++++++----- .../distributed/dataservice/grpc_service.go | 16 ++-- 3 files changed, 154 insertions(+), 32 deletions(-) create mode 100644 internal/dataservice/dd_handler.go diff --git a/internal/dataservice/dd_handler.go b/internal/dataservice/dd_handler.go new file mode 100644 index 0000000000..c3f001067d --- /dev/null +++ b/internal/dataservice/dd_handler.go @@ -0,0 +1,89 @@ +package dataservice + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + + "github.com/zilliztech/milvus-distributed/internal/msgstream" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/schemapb" +) + +type ddHandler struct { + meta *meta + segmentAllocator segmentAllocator +} + +func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler { + return &ddHandler{ + meta: meta, + segmentAllocator: allocator, + } +} + +func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error { + switch msg.Type() { + case commonpb.MsgType_kCreateCollection: + realMsg := msg.(*msgstream.CreateCollectionMsg) + return handler.handleCreateCollection(realMsg) + case commonpb.MsgType_kDropCollection: + realMsg := msg.(*msgstream.DropCollectionMsg) + return handler.handleDropCollection(realMsg) + case commonpb.MsgType_kCreatePartition: + realMsg := msg.(*msgstream.CreatePartitionMsg) + return handler.handleCreatePartition(realMsg) + case commonpb.MsgType_kDropPartition: + realMsg := msg.(*msgstream.DropPartitionMsg) + return handler.handleDropPartition(realMsg) + default: + return fmt.Errorf("unknown msg type: %v", msg.Type()) + } +} + +func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollectionMsg) error { + schema := &schemapb.CollectionSchema{} + if err := proto.Unmarshal(msg.Schema, schema); err != nil { + return err + } + err := handler.meta.AddCollection(&collectionInfo{ + ID: msg.CollectionID, + Schema: schema, + }) + if err != nil { + return err + } + return nil +} + +func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error { + ids := handler.meta.GetSegmentsByCollectionID(msg.CollectionID) + for _, id := range ids { + if err := handler.meta.DropSegment(id); err != nil { + continue + } + handler.segmentAllocator.DropSegment(id) + } + if err := handler.meta.DropCollection(msg.CollectionID); err != nil { + return err + } + return nil +} + +func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error { + ids := handler.meta.GetSegmentsByCollectionAndPartitionID(msg.CollectionID, msg.PartitionID) + for _, id := range ids { + if err := handler.meta.DropSegment(id); err != nil { + return err + } + handler.segmentAllocator.DropSegment(id) + } + if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil { + return err + } + return nil +} + +func (handler *ddHandler) handleCreatePartition(msg *msgstream.CreatePartitionMsg) error { + return handler.meta.AddPartition(msg.CollectionID, msg.PartitionID) +} diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index a478bbc367..4844b84110 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -65,26 +65,26 @@ type ( UniqueID = typeutil.UniqueID Timestamp = typeutil.Timestamp Server struct { - ctx context.Context - serverLoopCtx context.Context - serverLoopCancel context.CancelFunc - serverLoopWg sync.WaitGroup - state atomic.Value - client *etcdkv.EtcdKV - meta *meta - segAllocator segmentAllocator - statsHandler *statsHandler - insertChannelMgr *insertChannelManager - allocator allocator - cluster *dataNodeCluster - msgProducer *timesync.MsgProducer - registerFinishCh chan struct{} - masterClient MasterClient - ttMsgStream msgstream.MsgStream - k2sMsgStream msgstream.MsgStream - ddChannelName string - segmentInfoStream msgstream.MsgStream - segmentFlushStream msgstream.MsgStream + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup + state atomic.Value + client *etcdkv.EtcdKV + meta *meta + segAllocator segmentAllocator + statsHandler *statsHandler + ddHandler *ddHandler + insertChannelMgr *insertChannelManager + allocator allocator + cluster *dataNodeCluster + msgProducer *timesync.MsgProducer + registerFinishCh chan struct{} + masterClient MasterClient + ttMsgStream msgstream.MsgStream + k2sMsgStream msgstream.MsgStream + ddChannelName string + segmentInfoStream msgstream.MsgStream } ) @@ -97,7 +97,6 @@ func CreateServer(ctx context.Context) (*Server, error) { registerFinishCh: ch, cluster: newDataNodeCluster(ch), } - s.state.Store(internalpb2.StateCode_INITIALIZING) return s, nil } @@ -106,6 +105,7 @@ func (s *Server) SetMasterClient(masterClient MasterClient) { } func (s *Server) Init() error { + s.state.Store(internalpb2.StateCode_INITIALIZING) return nil } @@ -120,6 +120,7 @@ func (s *Server) Start() error { if err != nil { return err } + s.ddHandler = newDDHandler(s.meta, s.segAllocator) s.initSegmentInfoChannel() if err = s.initMsgProducer(); err != nil { return err @@ -187,6 +188,13 @@ func (s *Server) loadMetaFromMaster() error { if err := s.checkMasterIsHealthy(); err != nil { return err } + if s.ddChannelName == "" { + channel, err := s.masterClient.GetDdChannel() + if err != nil { + return err + } + s.ddChannelName = channel + } collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kShowCollections, @@ -274,9 +282,10 @@ func (s *Server) checkMasterIsHealthy() error { func (s *Server) startServerLoop() { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) - s.serverLoopWg.Add(2) + s.serverLoopWg.Add(3) go s.startStatsChannel(s.serverLoopCtx) go s.startSegmentFlushChannel(s.serverLoopCtx) + go s.startDDChannel(s.serverLoopCtx) } func (s *Server) startStatsChannel(ctx context.Context) { @@ -340,6 +349,30 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) { } } +func (s *Server) startDDChannel(ctx context.Context) { + defer s.serverLoopWg.Done() + ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + ddStream.SetPulsarClient(Params.PulsarAddress) + ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024) + ddStream.Start() + defer ddStream.Close() + for { + select { + case <-ctx.Done(): + log.Println("dd channel shut down") + return + default: + } + msgPack := ddStream.Consume() + for _, msg := range msgPack.Msgs { + if err := s.ddHandler.HandleDDMsg(msg); err != nil { + log.Println(err.Error()) + continue + } + } + } +} + func (s *Server) waitDataNodeRegister() { log.Println("waiting data node to register") <-s.registerFinishCh @@ -512,8 +545,8 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kSegmentInfo, MsgID: 0, - Timestamp: 0, // todo - SourceID: 0, + Timestamp: 0, + SourceID: Params.NodeID, }, Segment: segmentInfo, }, diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index 67c20bb458..cf3019dffb 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -35,6 +35,14 @@ func NewGrpcService(ctx context.Context) *Service { log.Fatalf("create server error: %s", err.Error()) return nil } + return s +} + +func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) { + s.server.SetMasterClient(masterClient) +} + +func (s *Service) Init() error { s.grpcServer = grpc.NewServer() datapb.RegisterDataServiceServer(s.grpcServer, s) lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port)) @@ -46,14 +54,6 @@ func NewGrpcService(ctx context.Context) *Service { log.Fatal(err.Error()) return nil } - return s -} - -func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) { - s.server.SetMasterClient(masterClient) -} - -func (s *Service) Init() error { return s.server.Init() }