Fix expire allocation bug (#7053)

issue: #7052
Signed-off-by: sunby <bingyi.sun@zilliz.com>
pull/7063/head
sunby 2021-08-12 10:48:08 +08:00 committed by GitHub
parent 8bf4524f6f
commit 31182e52a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 2 deletions

View File

@ -403,14 +403,16 @@ func (s *SegmentManager) ExpireAllocations(channel string, ts Timestamp) error {
if segment == nil || segment.InsertChannel != channel {
continue
}
allocations := make([]*Allocation, 0, len(segment.allocations))
for i := 0; i < len(segment.allocations); i++ {
if segment.allocations[i].ExpireTime <= ts {
a := segment.allocations[i]
segment.allocations = append(segment.allocations[:i], segment.allocations[i+1:]...)
s.putAllocation(a)
} else {
allocations = append(allocations, segment.allocations[i])
}
}
s.meta.SetAllocations(segment.GetID(), segment.allocations)
s.meta.SetAllocations(segment.GetID(), allocations)
}
return nil
}

View File

@ -159,3 +159,44 @@ func TestAllocRowsLargerThanOneSegment(t *testing.T) {
assert.EqualValues(t, 1, allocations[0].NumOfRows)
assert.EqualValues(t, 1, allocations[1].NumOfRows)
}
func TestExpireAllocation(t *testing.T) {
Params.Init()
mockAllocator := newMockAllocator()
meta, err := newMemoryMeta(mockAllocator)
assert.Nil(t, err)
schema := newTestSchema()
collID, err := mockAllocator.allocID()
assert.Nil(t, err)
meta.AddCollection(&datapb.CollectionInfo{ID: collID, Schema: schema})
var mockPolicy = func(schema *schemapb.CollectionSchema) (int, error) {
return 10000000, nil
}
segmentManager := newSegmentManager(meta, mockAllocator, withCalUpperLimitPolicy(mockPolicy))
// alloc 100 times and expire
var maxts Timestamp
var id int64 = -1
for i := 0; i < 100; i++ {
allocs, err := segmentManager.AllocSegment(context.TODO(), collID, 0, "ch1", 100)
assert.Nil(t, err)
assert.EqualValues(t, 1, len(allocs))
if id == -1 {
id = allocs[0].SegmentID
} else {
assert.EqualValues(t, id, allocs[0].SegmentID)
}
if allocs[0].ExpireTime > maxts {
maxts = allocs[0].ExpireTime
}
}
segment := meta.GetSegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 100, len(segment.allocations))
segmentManager.ExpireAllocations("ch1", maxts)
segment = meta.GetSegment(id)
assert.NotNil(t, segment)
assert.EqualValues(t, 0, len(segment.allocations))
}