mirror of https://github.com/milvus-io/milvus.git
parent
c4f4ae8627
commit
8c54af292e
|
@ -0,0 +1,89 @@
|
||||||
|
package dataservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/golang/protobuf/proto"
|
||||||
|
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/msgstream"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
|
||||||
|
"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ddHandler struct {
|
||||||
|
meta *meta
|
||||||
|
segmentAllocator segmentAllocator
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDDHandler(meta *meta, allocator segmentAllocator) *ddHandler {
|
||||||
|
return &ddHandler{
|
||||||
|
meta: meta,
|
||||||
|
segmentAllocator: allocator,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *ddHandler) HandleDDMsg(msg msgstream.TsMsg) error {
|
||||||
|
switch msg.Type() {
|
||||||
|
case commonpb.MsgType_kCreateCollection:
|
||||||
|
realMsg := msg.(*msgstream.CreateCollectionMsg)
|
||||||
|
return handler.handleCreateCollection(realMsg)
|
||||||
|
case commonpb.MsgType_kDropCollection:
|
||||||
|
realMsg := msg.(*msgstream.DropCollectionMsg)
|
||||||
|
return handler.handleDropCollection(realMsg)
|
||||||
|
case commonpb.MsgType_kCreatePartition:
|
||||||
|
realMsg := msg.(*msgstream.CreatePartitionMsg)
|
||||||
|
return handler.handleCreatePartition(realMsg)
|
||||||
|
case commonpb.MsgType_kDropPartition:
|
||||||
|
realMsg := msg.(*msgstream.DropPartitionMsg)
|
||||||
|
return handler.handleDropPartition(realMsg)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown msg type: %v", msg.Type())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *ddHandler) handleCreateCollection(msg *msgstream.CreateCollectionMsg) error {
|
||||||
|
schema := &schemapb.CollectionSchema{}
|
||||||
|
if err := proto.Unmarshal(msg.Schema, schema); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err := handler.meta.AddCollection(&collectionInfo{
|
||||||
|
ID: msg.CollectionID,
|
||||||
|
Schema: schema,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *ddHandler) handleDropCollection(msg *msgstream.DropCollectionMsg) error {
|
||||||
|
ids := handler.meta.GetSegmentsByCollectionID(msg.CollectionID)
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := handler.meta.DropSegment(id); err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
handler.segmentAllocator.DropSegment(id)
|
||||||
|
}
|
||||||
|
if err := handler.meta.DropCollection(msg.CollectionID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *ddHandler) handleDropPartition(msg *msgstream.DropPartitionMsg) error {
|
||||||
|
ids := handler.meta.GetSegmentsByCollectionAndPartitionID(msg.CollectionID, msg.PartitionID)
|
||||||
|
for _, id := range ids {
|
||||||
|
if err := handler.meta.DropSegment(id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
handler.segmentAllocator.DropSegment(id)
|
||||||
|
}
|
||||||
|
if err := handler.meta.DropPartition(msg.CollectionID, msg.PartitionID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (handler *ddHandler) handleCreatePartition(msg *msgstream.CreatePartitionMsg) error {
|
||||||
|
return handler.meta.AddPartition(msg.CollectionID, msg.PartitionID)
|
||||||
|
}
|
|
@ -65,26 +65,26 @@ type (
|
||||||
UniqueID = typeutil.UniqueID
|
UniqueID = typeutil.UniqueID
|
||||||
Timestamp = typeutil.Timestamp
|
Timestamp = typeutil.Timestamp
|
||||||
Server struct {
|
Server struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
serverLoopCtx context.Context
|
serverLoopCtx context.Context
|
||||||
serverLoopCancel context.CancelFunc
|
serverLoopCancel context.CancelFunc
|
||||||
serverLoopWg sync.WaitGroup
|
serverLoopWg sync.WaitGroup
|
||||||
state atomic.Value
|
state atomic.Value
|
||||||
client *etcdkv.EtcdKV
|
client *etcdkv.EtcdKV
|
||||||
meta *meta
|
meta *meta
|
||||||
segAllocator segmentAllocator
|
segAllocator segmentAllocator
|
||||||
statsHandler *statsHandler
|
statsHandler *statsHandler
|
||||||
insertChannelMgr *insertChannelManager
|
ddHandler *ddHandler
|
||||||
allocator allocator
|
insertChannelMgr *insertChannelManager
|
||||||
cluster *dataNodeCluster
|
allocator allocator
|
||||||
msgProducer *timesync.MsgProducer
|
cluster *dataNodeCluster
|
||||||
registerFinishCh chan struct{}
|
msgProducer *timesync.MsgProducer
|
||||||
masterClient MasterClient
|
registerFinishCh chan struct{}
|
||||||
ttMsgStream msgstream.MsgStream
|
masterClient MasterClient
|
||||||
k2sMsgStream msgstream.MsgStream
|
ttMsgStream msgstream.MsgStream
|
||||||
ddChannelName string
|
k2sMsgStream msgstream.MsgStream
|
||||||
segmentInfoStream msgstream.MsgStream
|
ddChannelName string
|
||||||
segmentFlushStream msgstream.MsgStream
|
segmentInfoStream msgstream.MsgStream
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -97,7 +97,6 @@ func CreateServer(ctx context.Context) (*Server, error) {
|
||||||
registerFinishCh: ch,
|
registerFinishCh: ch,
|
||||||
cluster: newDataNodeCluster(ch),
|
cluster: newDataNodeCluster(ch),
|
||||||
}
|
}
|
||||||
s.state.Store(internalpb2.StateCode_INITIALIZING)
|
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,6 +105,7 @@ func (s *Server) SetMasterClient(masterClient MasterClient) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Init() error {
|
func (s *Server) Init() error {
|
||||||
|
s.state.Store(internalpb2.StateCode_INITIALIZING)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +120,7 @@ func (s *Server) Start() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.ddHandler = newDDHandler(s.meta, s.segAllocator)
|
||||||
s.initSegmentInfoChannel()
|
s.initSegmentInfoChannel()
|
||||||
if err = s.initMsgProducer(); err != nil {
|
if err = s.initMsgProducer(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -187,6 +188,13 @@ func (s *Server) loadMetaFromMaster() error {
|
||||||
if err := s.checkMasterIsHealthy(); err != nil {
|
if err := s.checkMasterIsHealthy(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if s.ddChannelName == "" {
|
||||||
|
channel, err := s.masterClient.GetDdChannel()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.ddChannelName = channel
|
||||||
|
}
|
||||||
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
|
collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
|
||||||
Base: &commonpb.MsgBase{
|
Base: &commonpb.MsgBase{
|
||||||
MsgType: commonpb.MsgType_kShowCollections,
|
MsgType: commonpb.MsgType_kShowCollections,
|
||||||
|
@ -274,9 +282,10 @@ func (s *Server) checkMasterIsHealthy() error {
|
||||||
|
|
||||||
func (s *Server) startServerLoop() {
|
func (s *Server) startServerLoop() {
|
||||||
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
|
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
|
||||||
s.serverLoopWg.Add(2)
|
s.serverLoopWg.Add(3)
|
||||||
go s.startStatsChannel(s.serverLoopCtx)
|
go s.startStatsChannel(s.serverLoopCtx)
|
||||||
go s.startSegmentFlushChannel(s.serverLoopCtx)
|
go s.startSegmentFlushChannel(s.serverLoopCtx)
|
||||||
|
go s.startDDChannel(s.serverLoopCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) startStatsChannel(ctx context.Context) {
|
func (s *Server) startStatsChannel(ctx context.Context) {
|
||||||
|
@ -340,6 +349,30 @@ func (s *Server) startSegmentFlushChannel(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) startDDChannel(ctx context.Context) {
|
||||||
|
defer s.serverLoopWg.Done()
|
||||||
|
ddStream := pulsarms.NewPulsarMsgStream(ctx, 1024)
|
||||||
|
ddStream.SetPulsarClient(Params.PulsarAddress)
|
||||||
|
ddStream.CreatePulsarConsumers([]string{s.ddChannelName}, Params.DataServiceSubscriptionName, util.NewUnmarshalDispatcher(), 1024)
|
||||||
|
ddStream.Start()
|
||||||
|
defer ddStream.Close()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Println("dd channel shut down")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
msgPack := ddStream.Consume()
|
||||||
|
for _, msg := range msgPack.Msgs {
|
||||||
|
if err := s.ddHandler.HandleDDMsg(msg); err != nil {
|
||||||
|
log.Println(err.Error())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) waitDataNodeRegister() {
|
func (s *Server) waitDataNodeRegister() {
|
||||||
log.Println("waiting data node to register")
|
log.Println("waiting data node to register")
|
||||||
<-s.registerFinishCh
|
<-s.registerFinishCh
|
||||||
|
@ -512,8 +545,8 @@ func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, cha
|
||||||
Base: &commonpb.MsgBase{
|
Base: &commonpb.MsgBase{
|
||||||
MsgType: commonpb.MsgType_kSegmentInfo,
|
MsgType: commonpb.MsgType_kSegmentInfo,
|
||||||
MsgID: 0,
|
MsgID: 0,
|
||||||
Timestamp: 0, // todo
|
Timestamp: 0,
|
||||||
SourceID: 0,
|
SourceID: Params.NodeID,
|
||||||
},
|
},
|
||||||
Segment: segmentInfo,
|
Segment: segmentInfo,
|
||||||
},
|
},
|
||||||
|
|
|
@ -35,6 +35,14 @@ func NewGrpcService(ctx context.Context) *Service {
|
||||||
log.Fatalf("create server error: %s", err.Error())
|
log.Fatalf("create server error: %s", err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
|
||||||
|
s.server.SetMasterClient(masterClient)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) Init() error {
|
||||||
s.grpcServer = grpc.NewServer()
|
s.grpcServer = grpc.NewServer()
|
||||||
datapb.RegisterDataServiceServer(s.grpcServer, s)
|
datapb.RegisterDataServiceServer(s.grpcServer, s)
|
||||||
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
|
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", dataservice.Params.Address, dataservice.Params.Port))
|
||||||
|
@ -46,14 +54,6 @@ func NewGrpcService(ctx context.Context) *Service {
|
||||||
log.Fatal(err.Error())
|
log.Fatal(err.Error())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) SetMasterClient(masterClient dataservice.MasterClient) {
|
|
||||||
s.server.SetMasterClient(masterClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) Init() error {
|
|
||||||
return s.server.Init()
|
return s.server.Init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue