Rearrange update segment statistics of start position procedure

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
pull/4973/head^2
XuanYang-cn 2021-01-26 09:44:39 +08:00 committed by yefu.chen
parent 5af23cf018
commit 6c303c67b2
9 changed files with 44 additions and 43 deletions

View File

@ -22,11 +22,10 @@ type collectionReplica interface {
// segment
addSegment(segmentID UniqueID, collID UniqueID, partitionID UniqueID,
createTime Timestamp, positions []*internalpb2.MsgPosition) error
positions []*internalpb2.MsgPosition) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
updateStatistics(segmentID UniqueID, numRows int64, endTime Timestamp,
positions []*internalpb2.MsgPosition) error
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb2.SegmentStatisticsUpdates, error)
getSegmentByID(segmentID UniqueID) (*Segment, error)
}
@ -39,10 +38,10 @@ type (
numRows int64
memorySize int64
isNew bool
createTime Timestamp
endTime Timestamp
createTime Timestamp // not using
endTime Timestamp // not using
startPositions []*internalpb2.MsgPosition
endPositions []*internalpb2.MsgPosition
endPositions []*internalpb2.MsgPosition // not using
}
collectionReplicaImpl struct {
@ -66,7 +65,7 @@ func (colReplica *collectionReplicaImpl) getSegmentByID(segmentID UniqueID) (*Se
}
func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID UniqueID,
partitionID UniqueID, createTime Timestamp, positions []*internalpb2.MsgPosition) error {
partitionID UniqueID, positions []*internalpb2.MsgPosition) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@ -77,7 +76,7 @@ func (colReplica *collectionReplicaImpl) addSegment(segmentID UniqueID, collID U
collectionID: collID,
partitionID: partitionID,
isNew: true,
createTime: createTime,
createTime: 0,
startPositions: positions,
endPositions: make([]*internalpb2.MsgPosition, 0),
}
@ -113,7 +112,7 @@ func (colReplica *collectionReplicaImpl) hasSegment(segmentID UniqueID) bool {
return false
}
func (colReplica *collectionReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64, endTime Timestamp, positions []*internalpb2.MsgPosition) error {
func (colReplica *collectionReplicaImpl) updateStatistics(segmentID UniqueID, numRows int64) error {
colReplica.mu.Lock()
defer colReplica.mu.Unlock()
@ -122,8 +121,6 @@ func (colReplica *collectionReplicaImpl) updateStatistics(segmentID UniqueID, nu
log.Printf("updating segment(%v) row nums: (%v)", segmentID, numRows)
ele.memorySize = 0
ele.numRows += numRows
ele.endTime = endTime
ele.endPositions = positions
return nil
}
}
@ -137,17 +134,14 @@ func (colReplica *collectionReplicaImpl) getSegmentStatisticsUpdates(segmentID U
for _, ele := range colReplica.segments {
if ele.segmentID == segmentID {
updates := &internalpb2.SegmentStatisticsUpdates{
SegmentID: segmentID,
MemorySize: ele.memorySize,
NumRows: ele.numRows,
IsNewSegment: ele.isNew,
CreateTime: ele.createTime,
EndTime: ele.endTime,
StartPositions: ele.startPositions,
EndPositions: ele.endPositions,
SegmentID: segmentID,
MemorySize: ele.memorySize,
NumRows: ele.numRows,
IsNewSegment: ele.isNew,
}
if ele.isNew {
updates.StartPositions = ele.startPositions
ele.isNew = false
}
return updates, nil

View File

@ -140,7 +140,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
tsMessages = append(tsMessages, msgstream.TsMsg(&dropColMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0))
var inMsg Msg = msgStream
ddNode.Operate([]*Msg{&inMsg})
}

View File

@ -9,6 +9,7 @@ import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
)
type filterDmNode struct {
@ -71,6 +72,7 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
timestampMin: msgStreamMsg.TimestampMin(),
timestampMax: msgStreamMsg.TimestampMax(),
},
startPositions: make([]*internalpb2.MsgPosition, 0),
}
iMsg.flushMessages = append(iMsg.flushMessages, ddMsg.flushMessages...)
@ -98,6 +100,7 @@ func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
}
}
iMsg.startPositions = append(iMsg.startPositions, msgStreamMsg.StartPositions()...)
iMsg.gcRecord = ddMsg.gcRecord
var res Msg = &iMsg
for _, child := range childs {

View File

@ -111,17 +111,8 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
currentPosition := make([]*internalpb2.MsgPosition, 0, len(Params.InsertChannelNames))
for _, name := range Params.InsertChannelNames {
currentPosition = append(currentPosition, &internalpb2.MsgPosition{
ChannelName: name,
MsgID: strconv.FormatInt(msg.Base.MsgID, 10),
Timestamp: msg.Base.GetTimestamp(),
})
}
if !ibNode.replica.hasSegment(currentSegID) {
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.Base.Timestamp, currentPosition)
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, iMsg.startPositions)
if err != nil {
log.Println("Error: add segment error", err)
}
@ -134,7 +125,7 @@ func (ibNode *insertBufferNode) Operate(in []*Msg) []*Msg {
}
}
err := ibNode.replica.updateStatistics(currentSegID, int64(len(msg.RowIDs)), msg.Base.Timestamp, currentPosition)
err := ibNode.replica.updateStatistics(currentSegID, int64(len(msg.RowIDs)))
if err != nil {
log.Println("Error: update Segment Row number wrong, ", err)
}

View File

@ -2,6 +2,7 @@ package datanode
import (
"github.com/zilliztech/milvus-distributed/internal/msgstream"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
)
@ -36,6 +37,7 @@ type (
flushMessages []*flushMsg
gcRecord *gcRecord
timeRange TimeRange
startPositions []*internalpb2.MsgPosition
}
deleteMsg struct {

View File

@ -63,9 +63,10 @@ func (inNode *InputNode) Operate([]*Msg) []*Msg {
}
var msgStreamMsg Msg = &MsgStreamMsg{
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
timestampMax: msgPack.EndTs,
tsMessages: msgPack.Msgs,
timestampMin: msgPack.BeginTs,
timestampMax: msgPack.EndTs,
startPositions: msgPack.StartPositions,
}
for _, child := range childs {

View File

@ -7,16 +7,18 @@ type Msg interface {
}
type MsgStreamMsg struct {
tsMessages []msgstream.TsMsg
timestampMin Timestamp
timestampMax Timestamp
tsMessages []msgstream.TsMsg
timestampMin Timestamp
timestampMax Timestamp
startPositions []*MsgPosition
}
func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp) *MsgStreamMsg {
func GenerateMsgStreamMsg(tsMessages []msgstream.TsMsg, timestampMin, timestampMax Timestamp, positions []*MsgPosition) *MsgStreamMsg {
return &MsgStreamMsg{
tsMessages: tsMessages,
timestampMin: timestampMin,
timestampMax: timestampMax,
tsMessages: tsMessages,
timestampMin: timestampMin,
timestampMax: timestampMax,
startPositions: positions,
}
}
@ -39,3 +41,7 @@ func (msMsg *MsgStreamMsg) TimestampMin() Timestamp {
func (msMsg *MsgStreamMsg) TimestampMax() Timestamp {
return msMsg.timestampMax
}
func (msMsg *MsgStreamMsg) StartPositions() []*MsgPosition {
return msMsg.startPositions
}

View File

@ -1,6 +1,10 @@
package flowgraph
import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
import (
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)
type Timestamp = typeutil.Timestamp
type NodeName = string
type MsgPosition = internalpb2.MsgPosition

View File

@ -158,7 +158,7 @@ func TestFlowGraphDDNode_Operate(t *testing.T) {
tsMessages = append(tsMessages, msgstream.TsMsg(&createPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&dropPartitionMsg))
tsMessages = append(tsMessages, msgstream.TsMsg(&flushMsg))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3))
msgStream := flowgraph.GenerateMsgStreamMsg(tsMessages, Timestamp(0), Timestamp(3), make([]*internalpb2.MsgPosition, 0))
var inMsg Msg = msgStream
ddNode.Operate([]*Msg{&inMsg})
}