mirror of https://github.com/milvus-io/milvus.git
Make newSegment transfer state after SaveBinlogPath succuess (#19858)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/19815/head
parent
59bcbf0cf6
commit
a1cdc55bcb
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/flowgraph"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -434,7 +435,10 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
|
|||
fpMut.Lock()
|
||||
flushPacks = append(flushPacks, pack)
|
||||
fpMut.Unlock()
|
||||
colRep.listNewSegmentsStartPositions()
|
||||
startPos := colRep.listNewSegmentsStartPositions()
|
||||
colRep.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
|
||||
return pos.GetSegmentID()
|
||||
}))
|
||||
colRep.listSegmentsCheckPoints()
|
||||
if pack.flushed || pack.dropped {
|
||||
colRep.segmentFlushed(pack.segmentID)
|
||||
|
|
|
@ -38,6 +38,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/metautil"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
// flushManager defines a flush manager signature
|
||||
|
@ -688,8 +689,9 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
|
|||
}
|
||||
}
|
||||
|
||||
startPos := dsService.replica.listNewSegmentsStartPositions()
|
||||
// start positions for all new segments
|
||||
for _, pos := range dsService.replica.listNewSegmentsStartPositions() {
|
||||
for _, pos := range startPos {
|
||||
segment, has := segmentPack[pos.GetSegmentID()]
|
||||
if !has {
|
||||
segment = &datapb.DropVirtualChannelSegment{
|
||||
|
@ -726,6 +728,9 @@ func dropVirtualChannelFunc(dsService *dataSyncService, opts ...retry.Option) fl
|
|||
if rsp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("data service DropVirtualChannel failed, reason = %s", rsp.GetStatus().GetReason())
|
||||
}
|
||||
dsService.replica.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
|
||||
return pos.GetSegmentID()
|
||||
}))
|
||||
return nil
|
||||
}, opts...)
|
||||
if err != nil {
|
||||
|
@ -827,6 +832,10 @@ func flushNotifyFunc(dsService *dataSyncService, opts ...retry.Option) notifyMet
|
|||
if rsp.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return fmt.Errorf("data service save bin log path failed, reason = %s", rsp.Reason)
|
||||
}
|
||||
|
||||
dsService.replica.transferNewSegments(lo.Map(startPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
|
||||
return pos.GetSegmentID()
|
||||
}))
|
||||
return nil
|
||||
}, opts...)
|
||||
if err != nil {
|
||||
|
|
|
@ -65,6 +65,7 @@ type Replica interface {
|
|||
listPartitionSegments(partID UniqueID) []UniqueID
|
||||
filterSegments(channelName string, partitionID UniqueID) []*Segment
|
||||
listNewSegmentsStartPositions() []*datapb.SegmentStartPosition
|
||||
transferNewSegments(segmentIDs []UniqueID)
|
||||
listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint
|
||||
updateSegmentEndPosition(segID UniqueID, endPos *internalpb.MsgPosition)
|
||||
updateSegmentCheckPoint(segID UniqueID)
|
||||
|
@ -525,18 +526,24 @@ func (replica *SegmentReplica) listNewSegmentsStartPositions() []*datapb.Segment
|
|||
|
||||
result := make([]*datapb.SegmentStartPosition, 0, len(replica.newSegments))
|
||||
for id, seg := range replica.newSegments {
|
||||
|
||||
result = append(result, &datapb.SegmentStartPosition{
|
||||
SegmentID: id,
|
||||
StartPosition: seg.startPos,
|
||||
})
|
||||
|
||||
// transfer states
|
||||
replica.new2NormalSegment(id)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// transferNewSegments make new segment transfer to normal segments.
|
||||
func (replica *SegmentReplica) transferNewSegments(segmentIDs []UniqueID) {
|
||||
replica.segMu.Lock()
|
||||
defer replica.segMu.Unlock()
|
||||
|
||||
for _, segmentID := range segmentIDs {
|
||||
replica.new2NormalSegment(segmentID)
|
||||
}
|
||||
}
|
||||
|
||||
// listSegmentsCheckPoints gets check points from both *New* and *Normal* segments.
|
||||
func (replica *SegmentReplica) listSegmentsCheckPoints() map[UniqueID]segmentCheckPoint {
|
||||
replica.segMu.RLock()
|
||||
|
|
|
@ -26,6 +26,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/bits-and-blooms/bloom/v3"
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
@ -1003,8 +1004,13 @@ func TestInnerFunctionSegment(t *testing.T) {
|
|||
assert.Equal(t, "insert-01", segPos[0].StartPosition.ChannelName)
|
||||
assert.Equal(t, Timestamp(100), segPos[0].StartPosition.Timestamp)
|
||||
|
||||
assert.Equal(t, 0, len(replica.newSegments))
|
||||
assert.Equal(t, 2, len(replica.normalSegments))
|
||||
// not change until transferNewSegment called
|
||||
assert.Equal(t, 1, len(replica.newSegments))
|
||||
assert.Equal(t, 1, len(replica.normalSegments))
|
||||
|
||||
replica.transferNewSegments(lo.Map(segPos, func(pos *datapb.SegmentStartPosition, _ int) UniqueID {
|
||||
return pos.GetSegmentID()
|
||||
}))
|
||||
|
||||
cps := replica.listSegmentsCheckPoints()
|
||||
assert.Equal(t, 2, len(cps))
|
||||
|
|
Loading…
Reference in New Issue