mirror of https://github.com/milvus-io/milvus.git
Refine some codes about the datanode (#17720)
Signed-off-by: SimFG <bang.fu@zilliz.com>pull/17708/head
parent
f4c6a6734e
commit
50d5d04552
|
@ -594,7 +594,7 @@ func (c *ChannelManager) updateWithTimer(updates ChannelOpSet, state datapb.Chan
|
||||||
|
|
||||||
err := c.store.Update(updates)
|
err := c.store.Update(updates)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("fail to update", zap.Array("updates", updates))
|
log.Warn("fail to update", zap.Array("updates", updates), zap.Error(err))
|
||||||
c.stateTimer.removeTimers(channelsWithTimer)
|
c.stateTimer.removeTimers(channelsWithTimer)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -85,20 +85,18 @@ func (fm *flowgraphManager) release(vchanName string) {
|
||||||
func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) {
|
func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) {
|
||||||
var (
|
var (
|
||||||
flushCh chan flushMsg
|
flushCh chan flushMsg
|
||||||
loaded = false
|
|
||||||
)
|
)
|
||||||
|
|
||||||
fm.flowgraphs.Range(func(key, value interface{}) bool {
|
fm.flowgraphs.Range(func(key, value interface{}) bool {
|
||||||
fg := value.(*dataSyncService)
|
fg := value.(*dataSyncService)
|
||||||
if fg.replica.hasSegment(segID, true) {
|
if fg.replica.hasSegment(segID, true) {
|
||||||
loaded = true
|
|
||||||
flushCh = fg.flushCh
|
flushCh = fg.flushCh
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
if loaded {
|
if flushCh != nil {
|
||||||
return flushCh, nil
|
return flushCh, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,13 +31,12 @@ type TimeTickedFlowGraph struct {
|
||||||
|
|
||||||
// AddNode add Node into flowgraph
|
// AddNode add Node into flowgraph
|
||||||
func (fg *TimeTickedFlowGraph) AddNode(node Node) {
|
func (fg *TimeTickedFlowGraph) AddNode(node Node) {
|
||||||
nodeName := node.Name()
|
|
||||||
nodeCtx := nodeCtx{
|
nodeCtx := nodeCtx{
|
||||||
node: node,
|
node: node,
|
||||||
downstreamInputChanIdx: make(map[string]int),
|
downstreamInputChanIdx: make(map[string]int),
|
||||||
closeCh: make(chan struct{}),
|
closeCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
fg.nodeCtx[nodeName] = &nodeCtx
|
fg.nodeCtx[node.Name()] = &nodeCtx
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetEdges set directed edges from in nodes to out nodes
|
// SetEdges set directed edges from in nodes to out nodes
|
||||||
|
|
Loading…
Reference in New Issue