DataNode stores segment msgpack positions (#5472)

* DataNode stores segment msgpack positions

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Fix typo caused deadlock

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

* Use MsgPack EndPositions for flush pos

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/5779/head
congqixia 2021-05-28 16:47:29 +08:00 committed by zhenshan.cao
parent 43534ef63f
commit 8657251f41
3 changed files with 52 additions and 2 deletions

View File

@ -45,6 +45,10 @@ type Replica interface {
bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error
getBufferPaths(segID UniqueID) (map[UniqueID][]string, error)
getChannelName(segID UniqueID) (string, error)
//new msg postions
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
}
// Segment is the data structure of segments in data node replica.
@ -71,6 +75,10 @@ type CollectionSegmentReplica struct {
mu sync.RWMutex
segments map[UniqueID]*Segment
collections map[UniqueID]*Collection
posMu sync.Mutex
startPositions map[UniqueID][]*internalpb.MsgPosition
endPositions map[UniqueID][]*internalpb.MsgPosition
}
var _ Replica = &CollectionSegmentReplica{}
@ -80,8 +88,10 @@ func newReplica() Replica {
collections := make(map[UniqueID]*Collection)
var replica Replica = &CollectionSegmentReplica{
segments: segments,
collections: collections,
segments: segments,
collections: collections,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
}
return replica
}
@ -338,3 +348,30 @@ func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bo
_, ok := replica.collections[collectionID]
return ok
}
// setStartPositions set segment `Start Position` - means the `startPositions` from the MsgPack when segment is first found
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
replica.posMu.Lock()
defer replica.posMu.Unlock()
replica.startPositions[segID] = startPositions
return nil
}
// setEndPositions set segment `End Position` - means the `endPositions` from the MsgPack when segment need to be flushed
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
replica.posMu.Lock()
defer replica.posMu.Unlock()
replica.endPositions[segID] = endPositions
return nil
}
// getSegmentPositions returns stored segment start-end Positions
// To te Noted: start/end positions are NOT start&end position from one single MsgPack, they are from different MsgPack!
// see setStartPositions, setEndPositions comment
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
replica.posMu.Lock()
defer replica.posMu.Unlock()
startPos := replica.startPositions[segID]
endPos := replica.endPositions[segID]
return startPos, endPos
}

View File

@ -63,6 +63,7 @@ type DataNode struct {
State atomic.Value // internalpb.StateCode_Initializing
watchDm chan struct{}
chanMut sync.RWMutex
vchan2SyncService map[string]*dataSyncService
vchan2FlushCh map[string]chan<- *flushMsg
@ -187,6 +188,8 @@ func (node *DataNode) Init() error {
// NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService.
func (node *DataNode) NewDataSyncService(vchanPair *datapb.VchannelPair) error {
node.chanMut.Lock()
defer node.chanMut.Unlock()
if _, ok := node.vchan2SyncService[vchanPair.GetDmlVchannelName()]; ok {
return nil
}
@ -258,6 +261,8 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo
}
func (node *DataNode) getChannelName(segID UniqueID) string {
node.chanMut.RLock()
defer node.chanMut.RUnlock()
for name, dataSync := range node.vchan2SyncService {
if dataSync.replica.hasSegment(segID) {
return name
@ -282,7 +287,9 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
status.Reason = fmt.Sprintf("DataNode not find segment %d!", id)
return status, errors.New(status.GetReason())
}
node.chanMut.RLock()
flushCh, ok := node.vchan2FlushCh[chanName]
node.chanMut.RUnlock()
if !ok {
// TODO restart DataNode or reshape vchan2FlushCh and vchan2SyncService
status.Reason = "DataNode abnormal!"
@ -390,6 +397,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
func (node *DataNode) Stop() error {
node.cancel()
node.chanMut.RLock()
defer node.chanMut.RUnlock()
// close services
for _, syncService := range node.vchan2SyncService {
if syncService != nil {

View File

@ -160,6 +160,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
ibNode.replica.setStartPosition(currentSegID, startPosition)
}
}
// set msg pack start positions, new design
ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
}
segNum := uniqueSeg[currentSegID]
@ -479,6 +481,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
ibNode.replica.setEndPosition(currentSegID, endPosition)
}
// store current startPositions as Segment->EndPostion
ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
}
if len(iMsg.insertMessages) > 0 {