Add unittest in segment_replica (#6197)

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/6206/head
XuanYang-cn 2021-06-30 10:26:12 +08:00 committed by GitHub
parent ae072b4f1d
commit 357af89d55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 85 additions and 10 deletions

2
go.mod
View File

@ -13,7 +13,7 @@ require (
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/frankban/quicktest v1.10.2 // indirect
github.com/go-basic/ipv4 v1.0.0
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/protobuf v1.3.2
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.4.3

View File

@ -91,6 +91,7 @@ func newReplica(rc types.RootCoord, collID UniqueID) Replica {
return replica
}
// segmentFlushed transfers a segment from *New* or *Normal* into *Flushed*.
func (replica *SegmentReplica) segmentFlushed(segID UniqueID) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -117,7 +118,7 @@ func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) {
var seg Segment = *replica.newSegments[segID]
seg.isNew.Store(false)
seg.isFlushed.Store(false)
seg.isFlushed.Store(true)
replica.flushedSegments[segID] = &seg
delete(replica.newSegments, segID)
@ -126,7 +127,7 @@ func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) {
func (replica *SegmentReplica) normal2FlushedSegment(segID UniqueID) {
var seg Segment = *replica.normalSegments[segID]
seg.isFlushed.Store(false)
seg.isFlushed.Store(true)
replica.flushedSegments[segID] = &seg
delete(replica.normalSegments, segID)
@ -147,6 +148,7 @@ func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (coll
return 0, 0, fmt.Errorf("Cannot find segment, id = %v", segID)
}
// addNewSegment adds a *New* and *NotFlushed* new segment
func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID, channelName string,
startPos, endPos *internalpb.MsgPosition) error {
@ -183,6 +185,7 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
return nil
}
// addNormalSegment adds a *NotNew* and *NotFlushed* segment
func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID UniqueID, channelName string, numOfRows int64, cp *segmentCheckPoint) error {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -216,6 +219,9 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
replica.normalSegments[segID] = seg
return nil
}
// listNewSegmentsStartPositions gets all *New Segments* start positions and
// transfer segments states from *New* to *Normal*.
func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.SegmentStartPosition {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
@ -228,11 +234,13 @@ func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.Segment
StartPosition: seg.startPos,
})
// transfer states
replica.new2NormalSegment(id)
}
return result
}
// listSegmentsCheckPoints gets check points from both *New* and *Normal* segments.
func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
@ -250,6 +258,7 @@ func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentChe
return result
}
// updateSegmentEndPosition updates *New* or *Normal* segment's end position.
func (replica *SegmentReplica) updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition) {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
@ -273,6 +282,7 @@ func (replica *SegmentReplica) removeSegment(segID UniqueID) error {
return nil
}
// hasSegment checks whether this replica has a segment according to segment ID.
func (replica *SegmentReplica) hasSegment(segID UniqueID) bool {
replica.segMu.RLock()
defer replica.segMu.RUnlock()
@ -284,7 +294,7 @@ func (replica *SegmentReplica) hasSegment(segID UniqueID) bool {
return inNew || inNormal || inFlush
}
// `updateStatistics` updates the number of rows of a segment in replica.
// updateStatistics updates the number of rows of a segment in replica.
func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) error {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -305,7 +315,7 @@ func (replica *SegmentReplica) updateStatistics(segID UniqueID, numRows int64) e
return fmt.Errorf("There's no segment %v", segID)
}
// `getSegmentStatisticsUpdates` gives current segment's statistics updates.
// getSegmentStatisticsUpdates gives current segment's statistics updates.
func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*internalpb.SegmentStatisticsUpdates, error) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -327,13 +337,12 @@ func (replica *SegmentReplica) getSegmentStatisticsUpdates(segID UniqueID) (*int
}
// --- collection ---
func (replica *SegmentReplica) getCollectionID() UniqueID {
return replica.collectionID
}
// getCollectionSchema will get collection schema from rootcoord for a certain time.
// If you want the latest collection schema, ts should be 0
// getCollectionSchema gets collection schema from rootcoord for a certain timestamp.
// If you want the latest collection schema, ts should be 0.
func (replica *SegmentReplica) getCollectionSchema(collID UniqueID, ts Timestamp) (*schemapb.CollectionSchema, error) {
replica.segMu.Lock()
defer replica.segMu.Unlock()
@ -359,7 +368,7 @@ func (replica *SegmentReplica) validCollection(collID UniqueID) bool {
return collID == replica.collectionID
}
// Auto flush or mannul flush
// updateSegmentCheckPoint is called when auto flush or mannul flush is done.
func (replica *SegmentReplica) updateSegmentCheckPoint(segID UniqueID) {
replica.segMu.Lock()
defer replica.segMu.Unlock()

View File

@ -40,6 +40,73 @@ func TestSegmentReplica(t *testing.T) {
rc := &RootCoordFactory{}
collID := UniqueID(1)
t.Run("Test segmentFlushed", func(t *testing.T) {
testReplica := &SegmentReplica{
newSegments: make(map[UniqueID]*Segment),
normalSegments: make(map[UniqueID]*Segment),
flushedSegments: make(map[UniqueID]*Segment),
}
type Test struct {
inisNew bool
inisFlushed bool
inSegID UniqueID
expectedisNew bool
expectedisFlushed bool
expectedSegID UniqueID
}
tests := []Test{
// new segment
{true, false, 1, false, true, 1},
{true, false, 2, false, true, 2},
{true, false, 3, false, true, 3},
// normal segment
{false, false, 10, false, true, 10},
{false, false, 20, false, true, 20},
{false, false, 30, false, true, 30},
// flushed segment
{false, true, 100, false, true, 100},
{false, true, 200, false, true, 200},
{false, true, 300, false, true, 300},
}
newSeg := func(sr *SegmentReplica, isNew, isFlushed bool, id UniqueID) {
ns := &Segment{segmentID: id}
ns.isNew.Store(isNew)
ns.isFlushed.Store(isFlushed)
if isNew && !isFlushed {
sr.newSegments[id] = ns
return
}
if !isNew && !isFlushed {
sr.normalSegments[id] = ns
return
}
if !isNew && isFlushed {
sr.flushedSegments[id] = ns
return
}
}
for _, te := range tests {
// prepare case
newSeg(testReplica, te.inisNew, te.inisFlushed, te.inSegID)
testReplica.segmentFlushed(te.inSegID)
flushedSeg := testReplica.flushedSegments[te.inSegID]
assert.Equal(t, te.expectedSegID, flushedSeg.segmentID)
assert.Equal(t, te.expectedisNew, flushedSeg.isNew.Load().(bool))
assert.Equal(t, te.expectedisFlushed, flushedSeg.isFlushed.Load().(bool))
}
})
t.Run("Test inner function segment", func(t *testing.T) {
replica := newSegmentReplica(rc, collID)
assert.False(t, replica.hasSegment(0))
@ -121,6 +188,5 @@ func TestSegmentReplica(t *testing.T) {
assert.Equal(t, int64(10), replica.normalSegments[UniqueID(0)].checkPoint.numRows)
replica.updateSegmentCheckPoint(1)
assert.Equal(t, int64(20), replica.normalSegments[UniqueID(1)].checkPoint.numRows)
})
}