diff --git a/internal/datanode/collection_replica.go b/internal/datanode/collection_replica.go index a3ac675247..84638bfc5b 100644 --- a/internal/datanode/collection_replica.go +++ b/internal/datanode/collection_replica.go @@ -45,6 +45,10 @@ type Replica interface { bufferAutoFlushBinlogPaths(segmentID UniqueID, field2Path map[UniqueID]string) error getBufferPaths(segID UniqueID) (map[UniqueID][]string, error) getChannelName(segID UniqueID) (string, error) + //new msg postions + setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error + setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error + getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) } // Segment is the data structure of segments in data node replica. @@ -71,6 +75,10 @@ type CollectionSegmentReplica struct { mu sync.RWMutex segments map[UniqueID]*Segment collections map[UniqueID]*Collection + + posMu sync.Mutex + startPositions map[UniqueID][]*internalpb.MsgPosition + endPositions map[UniqueID][]*internalpb.MsgPosition } var _ Replica = &CollectionSegmentReplica{} @@ -80,8 +88,10 @@ func newReplica() Replica { collections := make(map[UniqueID]*Collection) var replica Replica = &CollectionSegmentReplica{ - segments: segments, - collections: collections, + segments: segments, + collections: collections, + startPositions: make(map[UniqueID][]*internalpb.MsgPosition), + endPositions: make(map[UniqueID][]*internalpb.MsgPosition), } return replica } @@ -338,3 +348,30 @@ func (replica *CollectionSegmentReplica) hasCollection(collectionID UniqueID) bo _, ok := replica.collections[collectionID] return ok } + +// setStartPositions set segment `Start Position` - means the `startPositions` from the MsgPack when segment is first found +func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error { + replica.posMu.Lock() + defer replica.posMu.Unlock() + replica.startPositions[segID] = startPositions + return nil +} + +// setEndPositions set segment `End Position` - means the `endPositions` from the MsgPack when segment need to be flushed +func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error { + replica.posMu.Lock() + defer replica.posMu.Unlock() + replica.endPositions[segID] = endPositions + return nil +} + +// getSegmentPositions returns stored segment start-end Positions +// To te Noted: start/end positions are NOT start&end position from one single MsgPack, they are from different MsgPack! +// see setStartPositions, setEndPositions comment +func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) { + replica.posMu.Lock() + defer replica.posMu.Unlock() + startPos := replica.startPositions[segID] + endPos := replica.endPositions[segID] + return startPos, endPos +} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index ca9c23126b..539e0b2f9c 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -63,6 +63,7 @@ type DataNode struct { State atomic.Value // internalpb.StateCode_Initializing watchDm chan struct{} + chanMut sync.RWMutex vchan2SyncService map[string]*dataSyncService vchan2FlushCh map[string]chan<- *flushMsg @@ -187,6 +188,8 @@ func (node *DataNode) Init() error { // NewDataSyncService adds a new dataSyncService for new dmlVchannel and starts dataSyncService. func (node *DataNode) NewDataSyncService(vchanPair *datapb.VchannelPair) error { + node.chanMut.Lock() + defer node.chanMut.Unlock() if _, ok := node.vchan2SyncService[vchanPair.GetDmlVchannelName()]; ok { return nil } @@ -258,6 +261,8 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*internalpb.Compo } func (node *DataNode) getChannelName(segID UniqueID) string { + node.chanMut.RLock() + defer node.chanMut.RUnlock() for name, dataSync := range node.vchan2SyncService { if dataSync.replica.hasSegment(segID) { return name @@ -282,7 +287,9 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen status.Reason = fmt.Sprintf("DataNode not find segment %d!", id) return status, errors.New(status.GetReason()) } + node.chanMut.RLock() flushCh, ok := node.vchan2FlushCh[chanName] + node.chanMut.RUnlock() if !ok { // TODO restart DataNode or reshape vchan2FlushCh and vchan2SyncService status.Reason = "DataNode abnormal!" @@ -390,6 +397,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen func (node *DataNode) Stop() error { node.cancel() + node.chanMut.RLock() + defer node.chanMut.RUnlock() // close services for _, syncService := range node.vchan2SyncService { if syncService != nil { diff --git a/internal/datanode/flow_graph_insert_buffer_node.go b/internal/datanode/flow_graph_insert_buffer_node.go index e8e32b783c..402e250c67 100644 --- a/internal/datanode/flow_graph_insert_buffer_node.go +++ b/internal/datanode/flow_graph_insert_buffer_node.go @@ -160,6 +160,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ibNode.replica.setStartPosition(currentSegID, startPosition) } } + // set msg pack start positions, new design + ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions) } segNum := uniqueSeg[currentSegID] @@ -479,6 +481,8 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { ibNode.replica.setEndPosition(currentSegID, endPosition) } + // store current startPositions as Segment->EndPostion + ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions) } if len(iMsg.insertMessages) > 0 {