mirror of https://github.com/milvus-io/milvus.git
parent
3b1719c8de
commit
769cbb292a
|
@ -51,25 +51,27 @@ const (
|
|||
MetricRequestsSuccess = "success"
|
||||
)
|
||||
|
||||
// DataNode struct communicates with outside services and unioun all
|
||||
// services of data node.
|
||||
// DataNode communicates with outside services and unioun all
|
||||
// services in datanode package.
|
||||
//
|
||||
// DataNode struct implements `types.Component`, `types.DataNode` interfaces.
|
||||
// `rootCoord` holds a grpc client of root coordinator.
|
||||
// `dataCoord` holds a grpc client of data service.
|
||||
// `NodeID` is unique to each data node.
|
||||
// DataNode implements `types.Component`, `types.DataNode` interfaces.
|
||||
// `rootCoord` is grpc client of root coordinator.
|
||||
// `dataCoord` is grpc client of data service.
|
||||
// `NodeID` is unique to each datanode.
|
||||
// `State` is current statement of this data node, indicating whether it's healthy.
|
||||
//
|
||||
// `vchan2SyncService` holds map of vchannlName and dataSyncService, so that datanode
|
||||
// has ability to scale flowgraph
|
||||
// `vchan2FlushCh` holds flush-signal channels for every flowgraph
|
||||
// `vchan2SyncService` is a map of vchannlName to dataSyncService, so that datanode
|
||||
// has ability to scale flowgraph.
|
||||
// `vchan2FlushCh` holds flush-signal channels for every flowgraph.
|
||||
|
||||
// `clearSignal` is a signal channel for releasing the flowgraph resources.
|
||||
// `segmentCache` stores all flushing and flushed segments.
|
||||
type DataNode struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
State atomic.Value // internalpb.StateCode_Initializing
|
||||
watchDm chan struct{}
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
NodeID UniqueID
|
||||
Role string
|
||||
State atomic.Value // internalpb.StateCode_Initializing
|
||||
|
||||
chanMut sync.RWMutex
|
||||
vchan2SyncService map[string]*dataSyncService // vchannel name
|
||||
|
@ -92,10 +94,9 @@ func NewDataNode(ctx context.Context, factory msgstream.Factory) *DataNode {
|
|||
rand.Seed(time.Now().UnixNano())
|
||||
ctx2, cancel2 := context.WithCancel(ctx)
|
||||
node := &DataNode{
|
||||
ctx: ctx2,
|
||||
cancel: cancel2,
|
||||
Role: typeutil.DataNodeRole,
|
||||
watchDm: make(chan struct{}, 1),
|
||||
ctx: ctx2,
|
||||
cancel: cancel2,
|
||||
Role: typeutil.DataNodeRole,
|
||||
|
||||
rootCoord: nil,
|
||||
dataCoord: nil,
|
||||
|
@ -132,7 +133,7 @@ func (node *DataNode) SetDataCoordInterface(ds types.DataCoord) error {
|
|||
}
|
||||
}
|
||||
|
||||
// Register register data node at etcd
|
||||
// Register register datanode to etcd
|
||||
func (node *DataNode) Register() error {
|
||||
node.session = sessionutil.NewSession(node.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
node.session.Init(typeutil.DataNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
|
@ -145,11 +146,11 @@ func (node *DataNode) Register() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Init function supposes data service is in INITIALIZING state.
|
||||
// Init function do nothing now.
|
||||
func (node *DataNode) Init() error {
|
||||
log.Debug("DataNode Init",
|
||||
zap.Any("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName),
|
||||
zap.Any("TimeTickChannelName", Params.TimeTickChannelName),
|
||||
zap.String("SegmentStatisticsChannelName", Params.SegmentStatisticsChannelName),
|
||||
zap.String("TimeTickChannelName", Params.TimeTickChannelName),
|
||||
)
|
||||
|
||||
return nil
|
||||
|
@ -177,13 +178,16 @@ func (node *DataNode) NewDataSyncService(vchan *datapb.VchannelInfo) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
node.vchan2SyncService[vchan.GetChannelName()] = dataSyncService
|
||||
node.vchan2FlushCh[vchan.GetChannelName()] = flushChan
|
||||
|
||||
log.Info("Start New dataSyncService",
|
||||
zap.Int64("Collection ID", vchan.GetCollectionID()),
|
||||
zap.String("Vchannel name", vchan.GetChannelName()),
|
||||
)
|
||||
go dataSyncService.start()
|
||||
|
||||
log.Info("New dataSyncService started!")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -365,7 +369,11 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
|
|||
}
|
||||
|
||||
numOfFlushingSeg := len(req.SegmentIDs)
|
||||
log.Debug("FlushSegments ...", zap.Int("num", len(req.SegmentIDs)), zap.Int64s("segments", req.SegmentIDs))
|
||||
log.Debug("FlushSegments ...",
|
||||
zap.Int("num", len(req.SegmentIDs)),
|
||||
zap.Int64s("segments", req.SegmentIDs),
|
||||
)
|
||||
|
||||
dmlFlushedCh := make(chan []*datapb.ID2PathList, len(req.SegmentIDs))
|
||||
for _, id := range req.SegmentIDs {
|
||||
chanName := node.getChannelNamebySegmentID(id)
|
||||
|
|
Loading…
Reference in New Issue