mirror of https://github.com/milvus-io/milvus.git
Set delBuf's start-end position of compacted segment (#20614)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com> Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/20572/head
parent
50791bd932
commit
2d307c1909
|
@ -296,6 +296,7 @@ func (ddb *DelDataBuf) mergeDelDataBuf(buf *DelDataBuf) {
|
|||
|
||||
tr := TimeRange{timestampMax: buf.TimestampTo, timestampMin: buf.TimestampFrom}
|
||||
ddb.updateTimeRange(tr)
|
||||
ddb.updateStartAndEndPosition(buf.startPos, buf.endPos)
|
||||
|
||||
ddb.delData.Pks = append(ddb.delData.Pks, buf.delData.Pks...)
|
||||
ddb.delData.Tss = append(ddb.delData.Tss, buf.delData.Tss...)
|
||||
|
|
|
@ -27,6 +27,7 @@ import (
|
|||
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
)
|
||||
|
||||
func genTestCollectionSchema(dim int64) *schemapb.CollectionSchema {
|
||||
|
@ -162,10 +163,12 @@ func Test_CompactSegBuff(t *testing.T) {
|
|||
//2. set up deleteDataBuf for seg1 and seg2
|
||||
delDataBuf1 := newDelDataBuf()
|
||||
delDataBuf1.EntriesNum++
|
||||
delDataBuf1.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
|
||||
delBufferManager.Store(segID1, delDataBuf1)
|
||||
heap.Push(delBufferManager.delBufHeap, delDataBuf1.item)
|
||||
delDataBuf2 := newDelDataBuf()
|
||||
delDataBuf2.EntriesNum++
|
||||
delDataBuf2.updateStartAndEndPosition(nil, &internalpb.MsgPosition{Timestamp: 50})
|
||||
delBufferManager.Store(segID2, delDataBuf2)
|
||||
heap.Push(delBufferManager.delBufHeap, delDataBuf2.item)
|
||||
|
||||
|
@ -180,4 +183,16 @@ func Test_CompactSegBuff(t *testing.T) {
|
|||
assert.False(t, seg1Exist)
|
||||
assert.False(t, seg2Exist)
|
||||
assert.Equal(t, int64(2), delBufferManager.GetEntriesNum(compactedToSegID))
|
||||
|
||||
//5. test roll and evict (https://github.com/milvus-io/milvus/issues/20501)
|
||||
delBufferManager.channel.rollDeleteBuffer(compactedToSegID)
|
||||
_, segCompactedToExist := delBufferManager.Load(compactedToSegID)
|
||||
assert.False(t, segCompactedToExist)
|
||||
delBufferManager.channel.evictHistoryDeleteBuffer(compactedToSegID, &internalpb.MsgPosition{
|
||||
Timestamp: 100,
|
||||
})
|
||||
cp := delBufferManager.channel.getChannelCheckpoint(&internalpb.MsgPosition{
|
||||
Timestamp: 200,
|
||||
})
|
||||
assert.Equal(t, Timestamp(200), cp.Timestamp) // evict all buffer, use ttPos as cp
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue