Tidy replica (#5920)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/5926/head
XuanYang-cn 2021-06-21 16:00:22 +08:00 committed by GitHub
parent 60171c5927
commit 441300140d
9 changed files with 708 additions and 613 deletions

View File

@ -1,285 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"context"
"fmt"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
)
type Replica interface {
getCollectionID() UniqueID
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
// segment
addSegment(segmentID, collID, partitionID UniqueID, channelName string) error
removeSegment(segmentID UniqueID) error
hasSegment(segmentID UniqueID) bool
updateStatistics(segmentID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
getSegmentByID(segmentID UniqueID) (*Segment, error)
getChannelName(segID UniqueID) (string, error)
setStartPositions(segmentID UniqueID, startPos []*internalpb.MsgPosition) error
setEndPositions(segmentID UniqueID, endPos []*internalpb.MsgPosition) error
getAllStartPositions() []*datapb.SegmentStartPosition
getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition)
listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64)
}
// Segment is the data structure of segments in the data node replica.
type Segment struct {
segmentID UniqueID
collectionID UniqueID
partitionID UniqueID
numRows int64
memorySize int64
isNew atomic.Value // bool
channelName string
field2Paths map[UniqueID][]string // fieldID to binlog paths, only auto-flushed paths will be buffered.
}
// CollectionSegmentReplica is the in-memory replica of persistent segment data in the data node.
// It implements the `Replica` interface.
type CollectionSegmentReplica struct {
mu sync.RWMutex
collectionID UniqueID
collSchema *schemapb.CollectionSchema
segments map[UniqueID]*Segment
metaService *metaService
posMu sync.Mutex
startPositions map[UniqueID][]*internalpb.MsgPosition
endPositions map[UniqueID][]*internalpb.MsgPosition
}
var _ Replica = &CollectionSegmentReplica{}
func newReplica(ms types.MasterService, collectionID UniqueID) Replica {
metaService := newMetaService(ms, collectionID)
segments := make(map[UniqueID]*Segment)
var replica Replica = &CollectionSegmentReplica{
segments: segments,
collectionID: collectionID,
metaService: metaService,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
}
return replica
}
func (replica *CollectionSegmentReplica) getChannelName(segID UniqueID) (string, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
seg, ok := replica.segments[segID]
if !ok {
return "", fmt.Errorf("Cannot find segment, id = %v", segID)
}
return seg.channelName, nil
}
func (replica *CollectionSegmentReplica) getSegmentByID(segmentID UniqueID) (*Segment, error) {
replica.mu.RLock()
defer replica.mu.RUnlock()
if seg, ok := replica.segments[segmentID]; ok {
return seg, nil
}
return nil, fmt.Errorf("Cannot find segment, id = %v", segmentID)
}
// `addSegment` adds a new segment into the replica when the data node first sees the segment
func (replica *CollectionSegmentReplica) addSegment(
segmentID UniqueID,
collID UniqueID,
partitionID UniqueID,
channelName string) error {
replica.mu.Lock()
defer replica.mu.Unlock()
log.Debug("Add Segment", zap.Int64("Segment ID", segmentID))
seg := &Segment{
segmentID: segmentID,
collectionID: collID,
partitionID: partitionID,
channelName: channelName,
field2Paths: make(map[UniqueID][]string),
}
seg.isNew.Store(true)
replica.segments[segmentID] = seg
return nil
}
func (replica *CollectionSegmentReplica) getAllStartPositions() []*datapb.SegmentStartPosition {
replica.mu.RLock()
defer replica.mu.RUnlock()
result := make([]*datapb.SegmentStartPosition, 0, len(replica.segments))
for id, seg := range replica.segments {
if seg.isNew.Load().(bool) {
pos, ok := replica.startPositions[id]
if !ok {
log.Warn("Segment has no start positions")
continue
}
result = append(result, &datapb.SegmentStartPosition{
SegmentID: id,
StartPosition: pos[0],
})
seg.isNew.Store(false)
}
}
return result
}
func (replica *CollectionSegmentReplica) removeSegment(segmentID UniqueID) error {
replica.mu.Lock()
delete(replica.segments, segmentID)
replica.mu.Unlock()
replica.posMu.Lock()
delete(replica.startPositions, segmentID)
delete(replica.endPositions, segmentID)
replica.posMu.Unlock()
return nil
}
func (replica *CollectionSegmentReplica) hasSegment(segmentID UniqueID) bool {
replica.mu.RLock()
defer replica.mu.RUnlock()
_, ok := replica.segments[segmentID]
return ok
}
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *CollectionSegmentReplica) updateStatistics(segmentID UniqueID, numRows int64) error {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
log.Debug("updating segment", zap.Int64("Segment ID", segmentID), zap.Int64("numRows", numRows))
seg.memorySize = 0
seg.numRows += numRows
return nil
}
return fmt.Errorf("There's no segment %v", segmentID)
}
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
func (replica *CollectionSegmentReplica) getSegmentStatisticsUpdates(segmentID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.mu.Lock()
defer replica.mu.Unlock()
if seg, ok := replica.segments[segmentID]; ok {
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segmentID,
MemorySize: seg.memorySize,
NumRows: seg.numRows,
}
return updates, nil
}
return nil, fmt.Errorf("Error, there's no segment %v", segmentID)
}
// --- collection ---
func (replica *CollectionSegmentReplica) getCollectionID() UniqueID {
return replica.collectionID
}
// getCollectionSchema gets the collection schema from master service as of a given timestamp.
// If you want the latest collection schema, pass ts = 0
func (replica *CollectionSegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
replica.mu.Lock()
defer replica.mu.Unlock()
if !replica.validCollection(collID) {
log.Error("Illegal Collection for the replica")
return nil, fmt.Errorf("Not supported collection %v", collID)
}
sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts)
if err != nil {
log.Error("Grpc error", zap.Error(err))
return nil, err
}
return sch, nil
}
func (replica *CollectionSegmentReplica) validCollection(collID UniqueID) bool {
return collID == replica.collectionID
}
// setStartPositions sets a segment's `Start Position`, i.e. the `startPositions` of the MsgPack in which the segment is first seen
func (replica *CollectionSegmentReplica) setStartPositions(segID UniqueID, startPositions []*internalpb.MsgPosition) error {
replica.posMu.Lock()
defer replica.posMu.Unlock()
replica.startPositions[segID] = startPositions
return nil
}
// setEndPositions sets a segment's `End Position`, i.e. the `endPositions` of the latest MsgPack buffered before the segment needs to be flushed
func (replica *CollectionSegmentReplica) setEndPositions(segID UniqueID, endPositions []*internalpb.MsgPosition) error {
replica.posMu.Lock()
defer replica.posMu.Unlock()
replica.endPositions[segID] = endPositions
return nil
}
// getSegmentPositions returns a segment's stored start and end positions.
// To be noted: the start and end positions are NOT the start and end of one single MsgPack; they come from different MsgPacks!
// See the setStartPositions and setEndPositions comments.
func (replica *CollectionSegmentReplica) getSegmentPositions(segID UniqueID) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
replica.posMu.Lock()
defer replica.posMu.Unlock()
startPos := replica.startPositions[segID]
endPos := replica.endPositions[segID]
return startPos, endPos
}
func (replica *CollectionSegmentReplica) listOpenSegmentCheckPointAndNumRows(segs []UniqueID) (map[UniqueID]internalpb.MsgPosition, map[UniqueID]int64) {
replica.posMu.Lock()
defer replica.posMu.Unlock()
r1 := make(map[UniqueID]internalpb.MsgPosition)
r2 := make(map[UniqueID]int64)
for _, seg := range segs {
r1[seg] = *replica.endPositions[seg][0]
r2[seg] = replica.segments[seg].numRows
}
return r1, r2
}
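
For orientation, a minimal sketch (not part of this patch) of how the pre-refactor Replica API above was typically driven: start positions are recorded once per segment, end positions are overwritten per MsgPack, and both live in the replica's side maps rather than on the segment itself. The helper name, channel name, and control flow are illustrative assumptions.

// exampleOldReplicaUsage is a hypothetical illustration, not from this commit.
func exampleOldReplicaUsage(replica Replica, segID, collID, partID UniqueID,
	startPos, endPos []*internalpb.MsgPosition) error {
	// Register the segment and its start positions exactly once, the first
	// time it shows up in an insert message.
	if !replica.hasSegment(segID) {
		if err := replica.addSegment(segID, collID, partID, "insert-01"); err != nil {
			return err
		}
		if err := replica.setStartPositions(segID, startPos); err != nil {
			return err
		}
	}
	// Every buffered MsgPack overwrites the stored end positions.
	if err := replica.setEndPositions(segID, endPos); err != nil {
		return err
	}
	// At flush time both are read back; note they come from different
	// MsgPacks, not from a single one.
	starts, ends := replica.getSegmentPositions(segID)
	_, _ = starts, ends
	return nil
}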

View File

@ -1,94 +0,0 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
)
func newCollectionSegmentReplica(ms types.MasterService, collectionID UniqueID) *CollectionSegmentReplica {
metaService := newMetaService(ms, collectionID)
segments := make(map[UniqueID]*Segment)
replica := &CollectionSegmentReplica{
segments: segments,
collectionID: collectionID,
metaService: metaService,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
}
return replica
}
func TestReplica_Collection(t *testing.T) {
// collID := UniqueID(100)
}
func TestReplica_Segment(t *testing.T) {
mockMaster := &MasterServiceFactory{}
collID := UniqueID(1)
t.Run("Test segment", func(t *testing.T) {
replica := newReplica(mockMaster, collID)
assert.False(t, replica.hasSegment(0))
err := replica.addSegment(0, 1, 2, "insert-01")
assert.NoError(t, err)
assert.True(t, replica.hasSegment(0))
seg, err := replica.getSegmentByID(0)
assert.NoError(t, err)
assert.NotNil(t, seg)
assert.Equal(t, UniqueID(1), seg.collectionID)
assert.Equal(t, UniqueID(2), seg.partitionID)
assert.Equal(t, int64(0), seg.numRows)
err = replica.updateStatistics(0, 100)
assert.NoError(t, err)
assert.Equal(t, int64(100), seg.numRows)
update, err := replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
assert.Equal(t, UniqueID(0), update.SegmentID)
assert.Equal(t, int64(100), update.NumRows)
update, err = replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
err = replica.removeSegment(0)
assert.NoError(t, err)
assert.False(t, replica.hasSegment(0))
})
t.Run("Test errors", func(t *testing.T) {
replica := newReplica(mockMaster, collID)
require.False(t, replica.hasSegment(0))
seg, err := replica.getSegmentByID(0)
assert.Error(t, err)
assert.Nil(t, seg)
err = replica.updateStatistics(0, 0)
assert.Error(t, err)
update, err := replica.getSegmentStatisticsUpdates(0)
assert.Error(t, err)
assert.Nil(t, update)
})
}

View File

@ -339,18 +339,6 @@ func (node *DataNode) ReadyToFlush() error {
return nil
}
func (node *DataNode) getSegmentPositionPair(segmentID UniqueID, chanName string) ([]*internalpb.MsgPosition, []*internalpb.MsgPosition) {
node.chanMut.Lock()
defer node.chanMut.Unlock()
sync, ok := node.vchan2SyncService[chanName]
if !ok {
return nil, nil
}
starts, ends := sync.replica.getSegmentPositions(segmentID)
return starts, ends
}
// FlushSegments packs flush messages into the flowgraph through flushChan.
// If DataNode receives a flush request for a segment that is already pending flush, the new message should be ignored.
// So once DataNode accepts a call to flush segment A, it must guarantee that A gets flushed.
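
As a hedged illustration of that contract (not part of this patch): a manual flush is handed to the insert buffer over flushChan and answered on the request's own dmlFlushedCh once the binlog paths are saved. The helper and its field values are illustrative; the flushMsg fields match the ones used in the tests later in this diff.

// exampleManualFlush is a hypothetical illustration, not from this commit.
func exampleManualFlush(flushChan chan<- *flushMsg, segID, collID UniqueID) []*datapb.ID2PathList {
	done := make(chan []*datapb.ID2PathList, 1)
	flushChan <- &flushMsg{
		msgID:        1,
		timestamp:    100,
		segmentID:    segID,
		collectionID: collID,
		dmlFlushedCh: done,
	}
	// Blocks until the flowgraph reports the flushed segment's binlog paths.
	return <-done
}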

View File

@ -134,8 +134,7 @@ func TestDataNode(t *testing.T) {
sync, ok := node1.vchan2SyncService[dmChannelName]
assert.True(t, ok)
sync.replica.addSegment(0, 1, 1, dmChannelName)
// sync.replica.addSegment(1, 1, 1, dmChannelName) unable to deal with this.
sync.replica.addNewSegment(0, 1, 1, dmChannelName, nil, nil)
req := &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{},
@ -187,9 +186,6 @@ func TestDataNode(t *testing.T) {
err = insertMsgStream.Broadcast(&timeTickMsgPack)
assert.NoError(t, err)
_, err = sync.replica.getSegmentByID(0)
assert.NoError(t, err)
defer func() {
<-node1.ctx.Done()
node1.Stop()

View File

@ -185,8 +185,9 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
zap.Int64("SegmentID", us.GetID()),
zap.Int64("NumOfRows", us.GetNumOfRows()),
)
dsService.replica.addSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel())
dsService.replica.updateStatistics(us.GetID(), us.GetNumOfRows())
dsService.replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID, us.GetInsertChannel(),
us.GetNumOfRows(), &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()})
}
dsService.fg.AddNode(dmStreamNode)
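
A minimal sketch (not part of this patch) of the recovery path above: unflushed segments carried in the VchannelInfo are seeded into the replica as normal segments, with their row count and DML position forming the initial checkpoint. The GetUnflushedSegments accessor is an assumption inferred from the loop variable `us`.

// exampleRecoverUnflushed is a hypothetical illustration, not from this commit.
func exampleRecoverUnflushed(replica Replica, vchanInfo *datapb.VchannelInfo) {
	for _, us := range vchanInfo.GetUnflushedSegments() { // assumed accessor
		cp := &segmentCheckPoint{us.GetNumOfRows(), *us.GetDmlPosition()}
		_ = replica.addNormalSegment(us.GetID(), us.CollectionID, us.PartitionID,
			us.GetInsertChannel(), us.GetNumOfRows(), cp)
	}
}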

View File

@ -162,22 +162,13 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collID := msg.GetCollectionID()
partitionID := msg.GetPartitionID()
// log.Debug("InsertBufferNode Operating Segment",
// zap.Int64("ID", currentSegID),
// zap.Int("NumOfRows", len(msg.RowIDs)),
// )
if !ibNode.replica.hasSegment(currentSegID) {
err := ibNode.replica.addSegment(currentSegID, collID, partitionID, msg.GetChannelID())
err := ibNode.replica.addNewSegment(currentSegID, collID, partitionID, msg.GetChannelID(),
iMsg.startPositions[0], iMsg.endPositions[0])
if err != nil {
log.Error("add segment wrong", zap.Error(err))
}
// set msg pack start positions
// this is the start position of the current segment, not the start position of the current MsgPack,
// so setStartPositions is called only once, when a new segment is first seen
ibNode.replica.setStartPositions(currentSegID, iMsg.startPositions)
ibNode.setSegmentCheckPoint(currentSegID, segmentCheckPoint{0, *iMsg.startPositions[0]})
}
segNum := uniqueSeg[currentSegID]
@ -473,7 +464,7 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
ibNode.insertBuffer.insertData[currentSegID] = idata
// store current endPositions as Segment->EndPosition
ibNode.replica.setEndPositions(currentSegID, iMsg.endPositions)
ibNode.replica.updateSegmentEndPosition(currentSegID, iMsg.endPositions[0])
}
if len(iMsg.insertMessages) > 0 {
@ -524,15 +515,14 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
log.Debug("segment is empty")
continue
}
fu.checkPoint = ibNode.listSegmentCheckPoints()
fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
fu.flushed = false
if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("data service save bin log path failed", zap.Error(err))
}
}
// iMsg is Flush() msg from dataservice
// 1. insertBuffer(not empty) -> binLogs -> minIO/S3
// iMsg is Flush() msg from data coordinator
select {
case fmsg := <-ibNode.flushChan:
currentSegID := fmsg.segmentID
@ -547,12 +537,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
collID: fmsg.collectionID,
segID: currentSegID,
field2Path: map[UniqueID]string{},
checkPoint: ibNode.listSegmentCheckPoints(),
checkPoint: ibNode.replica.listSegmentsCheckPoints(),
flushed: true,
})
ibNode.removeSegmentCheckPoint(fmsg.segmentID)
ibNode.replica.segmentFlushed(currentSegID)
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
} else {
} else { //insertBuffer(not empty) -> binLogs -> minIO/S3
log.Debug(".. Buffer not empty, flushing ..")
finishCh := make(chan segmentFlushUnit, 1)
@ -587,13 +577,12 @@ func (ibNode *insertBufferNode) Operate(in []flowgraph.Msg) []flowgraph.Msg {
fu := <-finishCh
close(finishCh)
if fu.field2Path != nil {
fu.checkPoint = ibNode.listSegmentCheckPoints()
fu.checkPoint = ibNode.replica.listSegmentsCheckPoints()
fu.flushed = true
if err := ibNode.dsSaveBinlog(&fu); err != nil {
log.Debug("Data service save binlog path failed", zap.Error(err))
} else {
// this segment has been flushed, so it is no longer an `open segment`; remove it from the checkpoints
ibNode.removeSegmentCheckPoint(fu.segID)
ibNode.replica.segmentFlushed(fu.segID)
}
}
fmsg.dmlFlushedCh <- []*datapb.ID2PathList{{ID: currentSegID, Paths: []string{}}}
@ -714,39 +703,15 @@ func flushSegment(
return
}
_, ep := ibNode.replica.getSegmentPositions(segID)
sta, _ := ibNode.replica.getSegmentStatisticsUpdates(segID)
ibNode.setSegmentCheckPoint(segID, segmentCheckPoint{sta.NumRows, *ep[0]})
startPos := ibNode.replica.getAllStartPositions()
ibNode.replica.updateSegmentCheckPoint(segID)
startPos := ibNode.replica.listNewSegmentsStartPositions()
flushUnit <- segmentFlushUnit{collID: collID, segID: segID, field2Path: field2Path, startPositions: startPos}
clearFn(true)
}
func (ibNode *insertBufferNode) setSegmentCheckPoint(segID UniqueID, chk segmentCheckPoint) {
ibNode.segmentCheckPointLock.Lock()
defer ibNode.segmentCheckPointLock.Unlock()
ibNode.segmentCheckPoints[segID] = chk
}
func (ibNode *insertBufferNode) removeSegmentCheckPoint(segID UniqueID) {
ibNode.segmentCheckPointLock.Lock()
defer ibNode.segmentCheckPointLock.Unlock()
delete(ibNode.segmentCheckPoints, segID)
}
func (ibNode *insertBufferNode) listSegmentCheckPoints() map[UniqueID]segmentCheckPoint {
ibNode.segmentCheckPointLock.Lock()
defer ibNode.segmentCheckPointLock.Unlock()
segs := make(map[UniqueID]segmentCheckPoint)
for k, v := range ibNode.segmentCheckPoints {
segs[k] = v
}
return segs
}
func (ibNode *insertBufferNode) writeHardTimeTick(ts Timestamp) error {
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
// timeTickMsg := msgstream.TimeTickMsg{
BaseMsg: msgstream.BaseMsg{
BeginTimestamp: ts,
EndTimestamp: ts,
@ -826,13 +791,7 @@ func (ibNode *insertBufferNode) getCollMetabySegID(segmentID UniqueID, ts Timest
}
func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID UniqueID) (collID, partitionID UniqueID, err error) {
seg, err := ibNode.replica.getSegmentByID(segmentID)
if err != nil {
return
}
collID = seg.collectionID
partitionID = seg.partitionID
return
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(

View File

@ -53,7 +53,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
replica := newReplica(mockMaster, collMeta.ID)
err = replica.addSegment(1, collMeta.ID, 0, insertChannelName)
err = replica.addNewSegment(1, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
msFactory := msgstream.NewPmsFactory()
@ -141,9 +141,9 @@ func TestFlushSegment(t *testing.T) {
replica := newReplica(mockMaster, collMeta.ID)
err := replica.addSegment(segmentID, collMeta.ID, 0, insertChannelName)
err := replica.addNewSegment(segmentID, collMeta.ID, 0, insertChannelName, &internalpb.MsgPosition{}, &internalpb.MsgPosition{})
require.NoError(t, err)
replica.setEndPositions(segmentID, []*internalpb.MsgPosition{{ChannelName: "TestChannel"}})
replica.updateSegmentEndPosition(segmentID, &internalpb.MsgPosition{ChannelName: "TestChannel"})
finishCh := make(chan segmentFlushUnit, 1)
@ -265,11 +265,11 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
mockMaster := &MasterServiceFactory{}
colRep := &CollectionSegmentReplica{
segments: make(map[UniqueID]*Segment),
collectionID: collMeta.ID,
startPositions: make(map[UniqueID][]*internalpb.MsgPosition),
endPositions: make(map[UniqueID][]*internalpb.MsgPosition),
colRep := &SegmentReplica{
collectionID: collMeta.ID,
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
}
colRep.metaService = newMetaService(mockMaster, collMeta.ID)
@ -291,170 +291,196 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
flushChan := make(chan *flushMsg, 100)
iBNode := newInsertBufferNode(ctx, colRep, msFactory, NewAllocatorFactory(), flushChan, saveBinlog, "string")
// Auto-flush threshold (insertBuffer.maxSize) is set to 2 rows
inMsg := genInsertMsg("datanode-03-test-autoflush")
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(100)
inMsg.insertMessages = append(inMsg.insertMessages, dataFactory.GetMsgStreamInsertMsgs(32000)...)
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
inMsg.insertMessages = dataFactory.GetMsgStreamInsertMsgs(2)
var iMsg flowgraph.Msg = &inMsg
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(colRep.endPositions), 2)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(123))
assert.Equal(t, len(iBNode.segmentCheckPoints), 2)
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(100))
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
assert.Equal(t, iBNode.insertBuffer.size(2), int32(50+16000))
assert.Equal(t, len(flushUnit), 0)
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 2
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(123))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(iBNode.segmentCheckPoints), 3)
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(100))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
t.Run("Pure auto flush", func(t *testing.T) {
iBNode.insertBuffer.maxSize = 2
assert.Equal(t, len(flushUnit), 1)
assert.Equal(t, flushUnit[0].segID, int64(2))
assert.Equal(t, len(flushUnit[0].checkPoint), 3)
assert.Equal(t, flushUnit[0].checkPoint[1].numRows, int64(0))
assert.Equal(t, flushUnit[0].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[0].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[0].checkPoint[1].pos.Timestamp, Timestamp(100))
assert.Equal(t, flushUnit[0].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[0].checkPoint[3].pos.Timestamp, Timestamp(123))
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 1
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 100}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 123}}
assert.Greater(t, len(flushUnit[0].field2Path), 0)
assert.False(t, flushUnit[0].flushed)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 2)
assert.Equal(t, iBNode.insertBuffer.size(1), int32(50+16000))
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
type Test struct {
expectedSegID UniqueID
expectedNumOfRows int64
expectedStartPosTs Timestamp
expectedEndPosTs Timestamp
expectedCpNumOfRows int64
expectedCpPosTs Timestamp
}
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(iBNode.segmentCheckPoints), 3)
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
beforeAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
{1, 1, 100, 123, 0, 100},
{2, 1, 100, 123, 0, 100},
}
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
iBNode.Operate([]flowgraph.Msg{iMsg})
require.Equal(t, 2, len(colRep.newSegments))
require.Equal(t, 0, len(colRep.normalSegments))
assert.Equal(t, 0, len(flushUnit))
dmlFlushedCh := make(chan []*datapb.ID2PathList, 1)
for i, test := range beforeAutoFlushTests {
seg, ok := colRep.newSegments[UniqueID(i+1)]
assert.True(t, ok)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
}
flushChan <- &flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
dmlFlushedCh: dmlFlushedCh,
}
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = int64(i%2) + 2
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 200}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
iMsg = &inMsg
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
// Trigger auto flush
iBNode.Operate([]flowgraph.Msg{iMsg})
require.Equal(t, 0, len(colRep.newSegments))
require.Equal(t, 3, len(colRep.normalSegments))
flushSeg := <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(1))
assert.NotNil(t, flushSeg[0].Paths)
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(iBNode.segmentCheckPoints), 2)
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, 1, len(flushUnit))
assert.Equal(t, 3, len(flushUnit[0].checkPoint))
assert.Less(t, 0, len(flushUnit[0].field2Path))
assert.False(t, flushUnit[0].flushed)
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
afterAutoFlushTests := []Test{
// segID, numOfRow, startTs, endTs, cp.numOfRow, cp.Ts
{1, 1, 100, 123, 0, 100},
{2, 2, 100, 234, 2, 234},
{3, 1, 200, 234, 0, 200},
}
flushChan <- &flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
dmlFlushedCh: dmlFlushedCh,
}
iBNode.Operate([]flowgraph.Msg{iMsg})
flushSeg = <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(3))
assert.NotNil(t, flushSeg[0].Paths)
assert.Equal(t, len(colRep.endPositions), 3)
assert.Equal(t, colRep.endPositions[1][0].Timestamp, Timestamp(345))
assert.Equal(t, colRep.endPositions[2][0].Timestamp, Timestamp(234))
assert.Equal(t, colRep.endPositions[3][0].Timestamp, Timestamp(234))
assert.Equal(t, len(iBNode.segmentCheckPoints), 1)
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
for i, test := range afterAutoFlushTests {
seg, ok := colRep.normalSegments[UniqueID(i+1)]
assert.True(t, ok)
assert.Equal(t, test.expectedSegID, seg.segmentID)
assert.Equal(t, test.expectedNumOfRows, seg.numRows)
assert.Equal(t, test.expectedStartPosTs, seg.startPos.GetTimestamp())
assert.Equal(t, test.expectedCpNumOfRows, seg.checkPoint.numRows)
assert.Equal(t, test.expectedCpPosTs, seg.checkPoint.pos.GetTimestamp())
assert.Equal(t, test.expectedCpNumOfRows, flushUnit[0].checkPoint[UniqueID(i+1)].numRows)
assert.Equal(t, test.expectedCpPosTs, flushUnit[0].checkPoint[UniqueID(i+1)].pos.Timestamp)
if i == 1 {
assert.Equal(t, test.expectedSegID, flushUnit[0].segID)
assert.Equal(t, int32(0), iBNode.insertBuffer.size(UniqueID(i+1)))
} else {
assert.Equal(t, int32(1), iBNode.insertBuffer.size(UniqueID(i+1)))
}
}
})
t.Run("Auto with manul flush", func(t *testing.T) {
t.Skipf("Skip, fix later")
for i := range inMsg.insertMessages {
inMsg.insertMessages[i].SegmentID = 1
}
inMsg.startPositions = []*internalpb.MsgPosition{{Timestamp: 234}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
iBNode.Operate([]flowgraph.Msg{iMsg})
assert.Equal(t, len(iBNode.segmentCheckPoints), 3)
assert.Equal(t, iBNode.segmentCheckPoints[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit), 2)
assert.Equal(t, flushUnit[1].segID, int64(1))
assert.Equal(t, len(flushUnit[1].checkPoint), 3)
assert.Equal(t, flushUnit[1].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[1].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[1].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[1].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[1].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[1].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.False(t, flushUnit[1].flushed)
assert.Greater(t, len(flushUnit[1].field2Path), 0)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
dmlFlushedCh := make(chan []*datapb.ID2PathList, 1)
flushChan <- &flushMsg{
msgID: 3,
timestamp: 456,
segmentID: UniqueID(1),
collectionID: UniqueID(1),
dmlFlushedCh: dmlFlushedCh,
}
inMsg.insertMessages = []*msgstream.InsertMsg{}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 345}}
inMsg.endPositions = []*internalpb.MsgPosition{{Timestamp: 456}}
iBNode.Operate([]flowgraph.Msg{iMsg})
flushSeg := <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(1))
assert.NotNil(t, flushSeg[0].Paths)
assert.Equal(t, len(iBNode.segmentCheckPoints), 2)
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[3].numRows, int64(0))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, iBNode.segmentCheckPoints[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit), 3)
assert.Equal(t, flushUnit[2].segID, int64(1))
assert.Equal(t, len(flushUnit[2].checkPoint), 3)
assert.Equal(t, flushUnit[2].checkPoint[1].numRows, int64(50+16000+100+32000))
assert.Equal(t, flushUnit[2].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[2].checkPoint[3].numRows, int64(0))
assert.Equal(t, flushUnit[2].checkPoint[1].pos.Timestamp, Timestamp(345))
assert.Equal(t, flushUnit[2].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[2].checkPoint[3].pos.Timestamp, Timestamp(123))
assert.Equal(t, len(flushUnit[2].field2Path), 0)
assert.NotNil(t, flushUnit[2].field2Path)
assert.True(t, flushUnit[2].flushed)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 1)
assert.Equal(t, iBNode.insertBuffer.size(3), int32(50+16000))
flushChan <- &flushMsg{
msgID: 4,
timestamp: 567,
segmentID: UniqueID(3),
collectionID: UniqueID(3),
dmlFlushedCh: dmlFlushedCh,
}
iBNode.Operate([]flowgraph.Msg{iMsg})
flushSeg = <-dmlFlushedCh
assert.NotNil(t, flushSeg)
assert.Equal(t, len(flushSeg), 1)
assert.Equal(t, flushSeg[0].ID, int64(3))
assert.NotNil(t, flushSeg[0].Paths)
assert.Equal(t, len(iBNode.segmentCheckPoints), 1)
assert.Equal(t, iBNode.segmentCheckPoints[2].numRows, int64(100+32000))
assert.Equal(t, iBNode.segmentCheckPoints[2].pos.Timestamp, Timestamp(234))
assert.Equal(t, len(flushUnit), 4)
assert.Equal(t, flushUnit[3].segID, int64(3))
assert.Equal(t, len(flushUnit[3].checkPoint), 2)
assert.Equal(t, flushUnit[3].checkPoint[3].numRows, int64(50+16000))
assert.Equal(t, flushUnit[3].checkPoint[2].numRows, int64(100+32000))
assert.Equal(t, flushUnit[3].checkPoint[3].pos.Timestamp, Timestamp(234))
assert.Equal(t, flushUnit[3].checkPoint[2].pos.Timestamp, Timestamp(234))
assert.Greater(t, len(flushUnit[3].field2Path), 0)
assert.NotNil(t, flushUnit[3].field2Path)
assert.True(t, flushUnit[3].flushed)
assert.Equal(t, len(iBNode.insertBuffer.insertData), 0)
})
}

View File

@ -0,0 +1,378 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"context"
"fmt"
"sync"
"sync/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/types"
)
type Replica interface {
getCollectionID() UniqueID
getCollectionSchema(collectionID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error)
getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error)
addNewSegment(segID, collID, partitionID UniqueID, channelName string, startPos, endPos *internalpb.MsgPosition) error
addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
updateSegmentCheckPoint(segID UniqueID)
hasSegment(segID UniqueID) bool
updateStatistics(segID UniqueID, numRows int64) error
getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error)
segmentFlushed(segID UniqueID)
}
// Segment is the data structure of segments in the data node replica.
type Segment struct {
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
numRows int64
memorySize int64
isNew atomic.Value // bool
isFlushed atomic.Value // bool
channelName string
checkPoint segmentCheckPoint
startPos *internalpb.MsgPosition // TODO readonly
endPos *internalpb.MsgPosition
}
// SegmentReplica is the in-memory replica of persistent segment data in the data node.
// It implements the `Replica` interface.
type SegmentReplica struct {
collectionID UniqueID
collSchema *schemapb.CollectionSchema
segMu sync.RWMutex
newSegments map[UniqueID]*Segment
normalSegments map[UniqueID]*Segment
flushedSegments map[UniqueID]*Segment
metaService *metaService
}
var _ Replica = &SegmentReplica{}
func newReplica(ms types.MasterService, collID UniqueID) Replica {
metaService := newMetaService(ms, collID)
var replica Replica = &SegmentReplica{
collectionID: collID,
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
metaService: metaService,
}
return replica
}
func (replica *SegmentReplica) segmentFlushed(segID UniqueID) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
if _, ok := replica.newSegments[segID]; ok {
replica.new2FlushedSegment(segID)
}
if _, ok := replica.normalSegments[segID]; ok {
replica.normal2FlushedSegment(segID)
}
}
func (replica *SegmentReplica) new2NormalSegment(segID UniqueID) {
var seg Segment = *replica.newSegments[segID]
seg.isNew.Store(false)
replica.normalSegments[segID] = &seg
delete(replica.newSegments, segID)
}
func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) {
var seg Segment = *replica.newSegments[segID]
seg.isNew.Store(false)
seg.isFlushed.Store(true)
replica.flushedSegments[segID] = &seg
delete(replica.newSegments, segID)
}
func (replica *SegmentReplica) normal2FlushedSegment(segID UniqueID) {
var seg Segment = *replica.normalSegments[segID]
seg.isFlushed.Store(true)
replica.flushedSegments[segID] = &seg
delete(replica.normalSegments, segID)
}
func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
if seg, ok := replica.newSegments[segID]; ok {
return seg.collectionID, seg.partitionID, nil
}
if seg, ok := replica.normalSegments[segID]; ok {
return seg.collectionID, seg.partitionID, nil
}
return 0, 0, fmt.Errorf("Cannot find segment, id = %v", segID)
}
func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID, channelName string,
startPos, endPos *internalpb.MsgPosition) error {
replica.segMu.Lock()
defer replica.segMu.Unlock()
if collID != replica.collectionID {
log.Warn("Mismatch collection", zap.Int64("ID", collID))
return fmt.Errorf("Mismatch collection, ID=%d", collID)
}
log.Debug("Add new segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName),
)
seg := &Segment{
collectionID: collID,
partitionID: partitionID,
segmentID: segID,
channelName: channelName,
checkPoint: segmentCheckPoint{0, *startPos},
startPos: startPos,
endPos: endPos,
}
seg.isNew.Store(true)
seg.isFlushed.Store(false)
replica.newSegments[segID] = seg
return nil
}
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error {
replica.segMu.Lock()
defer replica.segMu.Unlock()
if collID != replica.collectionID {
log.Warn("Mismatch collection", zap.Int64("ID", collID))
return fmt.Errorf("Mismatch collection, ID=%d", collID)
}
log.Debug("Add Normal segment",
zap.Int64("segment ID", segID),
zap.Int64("collection ID", collID),
zap.Int64("partition ID", partitionID),
zap.String("channel name", channelName),
)
seg := &Segment{
collectionID: collID,
partitionID: partitionID,
segmentID: segID,
channelName: channelName,
numRows: numOfRows,
checkPoint: *cp,
endPos: &cp.pos,
}
seg.isNew.Store(false)
seg.isFlushed.Store(false)
replica.normalSegments[segID] = seg
return nil
}
func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
result := make([]*datapb.SegmentStartPosition, 0, len(replica.newSegments))
for id, seg := range replica.newSegments {
result = append(result, &datapb.SegmentStartPosition{
SegmentID: id,
StartPosition: seg.startPos,
})
replica.new2NormalSegment(id)
}
return result
}
func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
result := make(map[UniqueID]segmentCheckPoint)
for id, seg := range replica.newSegments {
result[id] = seg.checkPoint
}
for id, seg := range replica.normalSegments {
result[id] = seg.checkPoint
}
return result
}
func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
seg, ok := replica.newSegments[segID]
if ok {
seg.endPos = endPos
return
}
seg, ok = replica.normalSegments[segID]
if ok {
seg.endPos = endPos
return
}
log.Warn("No match segment", zap.Int64("ID", segID))
}
func (replica *SegmentReplica) removeSegment(segID UniqueID) error {
return nil
}
func (replica *SegmentReplica) hasSegment(segID UniqueID) bool {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
_, inNew := replica.newSegments[segID]
_, inNormal := replica.normalSegments[segID]
_, inFlush := replica.flushedSegments[segID]
return inNew || inNormal || inFlush
}
// `updateStatistics` updates the number of rows of a segment in replica.
func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) error {
replica.segMu.Lock()
defer replica.segMu.Unlock()
log.Debug("updating segment", zap.Int64("Segment ID", segID), zap.Int64("numRows", numRows))
if seg, ok := replica.newSegments[segID]; ok {
seg.memorySize = 0
seg.numRows += numRows
return nil
}
if seg, ok := replica.normalSegments[segID]; ok {
seg.memorySize = 0
seg.numRows += numRows
return nil
}
return fmt.Errorf("There's no segment %v", segID)
}
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
updates := &internalpb.SegmentStatisticsUpdates{
SegmentID: segID,
}
if seg, ok := replica.newSegments[segID]; ok {
updates.NumRows = seg.numRows
return updates, nil
}
if seg, ok := replica.normalSegments[segID]; ok {
updates.NumRows = seg.numRows
return updates, nil
}
return nil, fmt.Errorf("Error, there's no segment %v", segID)
}
// --- collection ---
func (replica *SegmentReplica) getCollectionID() UniqueID {
return replica.collectionID
}
// getCollectionSchema gets the collection schema from master service as of a given timestamp.
// If you want the latest collection schema, pass ts = 0
func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
if !replica.validCollection(collID) {
log.Error("Mismatch collection for the replica",
zap.Int64("Want", replica.collectionID),
zap.Int64("Actual", collID),
)
return nil, fmt.Errorf("Not supported collection %v", collID)
}
sch, err := replica.metaService.getCollectionSchema(context.Background(), collID, ts)
if err != nil {
log.Error("Grpc error", zap.Error(err))
return nil, err
}
return sch, nil
}
func (replica *SegmentReplica) validCollection(collID UniqueID) bool {
return collID == replica.collectionID
}
// Auto flush or manual flush
func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
if seg, ok := replica.newSegments[segID]; ok {
seg.checkPoint = segmentCheckPoint{seg.numRows, *seg.endPos}
return
}
if seg, ok := replica.normalSegments[segID]; ok {
seg.checkPoint = segmentCheckPoint{seg.numRows, *seg.endPos}
return
}
log.Warn("There's no segment", zap.Int64("ID", segID))
}
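
Putting the new API together, a minimal sketch (not part of this patch) of a segment's lifecycle through SegmentReplica: new, then normal, then flushed. segmentCheckPoint, defined elsewhere in the package, pairs a row count with a MsgPosition; the helper, IDs, channel name, and row counts are illustrative.

// exampleSegmentLifecycle is a hypothetical illustration, not from this commit.
func exampleSegmentLifecycle(replica Replica, segID, collID, partID UniqueID,
	startPos, endPos *internalpb.MsgPosition) error {
	// 1. A segment first seen in the stream lands in newSegments; its
	//    checkpoint starts at startPos with zero rows.
	if err := replica.addNewSegment(segID, collID, partID, "insert-01", startPos, endPos); err != nil {
		return err
	}
	// 2. Reporting start positions promotes new segments to normalSegments.
	_ = replica.listNewSegmentsStartPositions()
	// 3. Buffered MsgPacks advance the row count and the end position.
	_ = replica.updateStatistics(segID, 10)
	replica.updateSegmentEndPosition(segID, endPos)
	// 4. After binlogs are saved, the checkpoint catches up to endPos.
	replica.updateSegmentCheckPoint(segID)
	// 5. A flush finally moves the segment into flushedSegments.
	replica.segmentFlushed(segID)
	return nil
}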

View File

@ -0,0 +1,126 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package datanode
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
)
func newSegmentReplica(ms types.MasterService, collID UniqueID) *SegmentReplica {
metaService := newMetaService(ms, collID)
var replica = &SegmentReplica{
collectionID: collID,
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
metaService: metaService,
}
return replica
}
func TestSegmentReplica(t *testing.T) {
mockMaster := &MasterServiceFactory{}
collID := UniqueID(1)
t.Run("Test inner function segment", func(t *testing.T) {
replica := newSegmentReplica(mockMaster, collID)
assert.False(t, replica.hasSegment(0))
startPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(100)}
endPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(200)}
err := replica.addNewSegment(0, 1, 2, "insert-01", startPos, endPos)
assert.NoError(t, err)
assert.True(t, replica.hasSegment(0))
assert.Equal(t, 1, len(replica.newSegments))
seg, ok := replica.newSegments[UniqueID(0)]
assert.True(t, ok)
require.NotNil(t, seg)
assert.Equal(t, UniqueID(0), seg.segmentID)
assert.Equal(t, UniqueID(1), seg.collectionID)
assert.Equal(t, UniqueID(2), seg.partitionID)
assert.Equal(t, "insert-01", seg.channelName)
assert.Equal(t, Timestamp(100), seg.startPos.Timestamp)
assert.Equal(t, Timestamp(200), seg.endPos.Timestamp)
assert.Equal(t, startPos.ChannelName, seg.checkPoint.pos.ChannelName)
assert.Equal(t, startPos.Timestamp, seg.checkPoint.pos.Timestamp)
assert.Equal(t, int64(0), seg.numRows)
assert.True(t, seg.isNew.Load().(bool))
assert.False(t, seg.isFlushed.Load().(bool))
err = replica.updateStatistics(0, 10)
assert.NoError(t, err)
assert.Equal(t, int64(10), seg.numRows)
cpPos := &internalpb.MsgPosition{ChannelName: "insert-01", Timestamp: Timestamp(10)}
cp := &segmentCheckPoint{int64(10), *cpPos}
err = replica.addNormalSegment(1, 1, 2, "insert-01", int64(10), cp)
assert.NoError(t, err)
assert.True(t, replica.hasSegment(1))
assert.Equal(t, 1, len(replica.normalSegments))
seg, ok = replica.normalSegments[UniqueID(1)]
assert.True(t, ok)
require.NotNil(t, seg)
assert.Equal(t, UniqueID(1), seg.segmentID)
assert.Equal(t, UniqueID(1), seg.collectionID)
assert.Equal(t, UniqueID(2), seg.partitionID)
assert.Equal(t, "insert-01", seg.channelName)
assert.Equal(t, cpPos.ChannelName, seg.checkPoint.pos.ChannelName)
assert.Equal(t, cpPos.Timestamp, seg.checkPoint.pos.Timestamp)
assert.Equal(t, int64(10), seg.numRows)
assert.False(t, seg.isNew.Load().(bool))
assert.False(t, seg.isFlushed.Load().(bool))
err = replica.updateStatistics(1, 10)
assert.NoError(t, err)
assert.Equal(t, int64(20), seg.numRows)
segPos := replica.listNewSegmentsStartPositions()
assert.Equal(t, 1, len(segPos))
assert.Equal(t, UniqueID(0), segPos[0].SegmentID)
assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName)
assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp)
assert.Equal(t, 0, len(replica.newSegments))
assert.Equal(t, 2, len(replica.normalSegments))
cps := replica.listSegmentsCheckPoints()
assert.Equal(t, 2, len(cps))
assert.Equal(t, startPos.Timestamp, cps[UniqueID(0)].pos.Timestamp)
assert.Equal(t, int64(0), cps[UniqueID(0)].numRows)
assert.Equal(t, cp.pos.Timestamp, cps[UniqueID(1)].pos.Timestamp)
assert.Equal(t, int64(10), cps[UniqueID(1)].numRows)
updates, err := replica.getSegmentStatisticsUpdates(0)
assert.NoError(t, err)
assert.Equal(t, int64(10), updates.NumRows)
updates, err = replica.getSegmentStatisticsUpdates(1)
assert.NoError(t, err)
assert.Equal(t, int64(20), updates.NumRows)
replica.updateSegmentCheckPoint(0)
assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows)
replica.updateSegmentCheckPoint(1)
assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
})
}