mirror of https://github.com/milvus-io/milvus.git
fix: double buffer was invalid when put entry which size larger than max size (#31549)
relate: https://github.com/milvus-io/milvus/issues/31548 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/31559/head
parent
368180bce4
commit
7c234f23c3
|
@ -64,8 +64,7 @@ func (c *doubleCacheBuffer[T]) Put(entry T) {
|
|||
|
||||
err := c.head.Put(entry)
|
||||
if errors.Is(err, errBufferFull) {
|
||||
c.evict(entry.Timestamp())
|
||||
c.head.Put(entry)
|
||||
c.evict(entry.Timestamp(), entry)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -84,9 +83,14 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
|
|||
}
|
||||
|
||||
// evict sets head as tail and evicts tail.
|
||||
func (c *doubleCacheBuffer[T]) evict(newTs uint64) {
|
||||
func (c *doubleCacheBuffer[T]) evict(newTs uint64, entry T) {
|
||||
c.tail = c.head
|
||||
c.head = newDoubleCacheItem[T](newTs, c.maxSize/2)
|
||||
c.head = &doubleCacheItem[T]{
|
||||
headTs: newTs,
|
||||
maxSize: c.maxSize / 2,
|
||||
size: entry.Size(),
|
||||
data: []T{entry},
|
||||
}
|
||||
c.ts = c.tail.headTs
|
||||
}
|
||||
|
||||
|
|
|
@ -78,6 +78,58 @@ func (s *DoubleCacheBufferSuite) TestCache() {
|
|||
s.Equal(1, len(buffer.ListAfter(12)))
|
||||
}
|
||||
|
||||
func (s *DoubleCacheBufferSuite) TestPut() {
|
||||
buffer := NewDoubleCacheDeleteBuffer[*Item](10, 1)
|
||||
buffer.Put(&Item{
|
||||
Ts: 11,
|
||||
Data: []BufferItem{
|
||||
{
|
||||
PartitionID: 200,
|
||||
DeleteData: storage.DeleteData{
|
||||
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test1")},
|
||||
Tss: []uint64{11},
|
||||
RowCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
buffer.Put(&Item{
|
||||
Ts: 12,
|
||||
Data: []BufferItem{
|
||||
{
|
||||
PartitionID: 200,
|
||||
DeleteData: storage.DeleteData{
|
||||
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test2")},
|
||||
Tss: []uint64{12},
|
||||
RowCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
s.Equal(2, len(buffer.ListAfter(11)))
|
||||
s.Equal(1, len(buffer.ListAfter(12)))
|
||||
|
||||
buffer.Put(&Item{
|
||||
Ts: 13,
|
||||
Data: []BufferItem{
|
||||
{
|
||||
PartitionID: 200,
|
||||
DeleteData: storage.DeleteData{
|
||||
Pks: []storage.PrimaryKey{storage.NewVarCharPrimaryKey("test3")},
|
||||
Tss: []uint64{13},
|
||||
RowCount: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
s.Equal(2, len(buffer.ListAfter(11)))
|
||||
s.Equal(2, len(buffer.ListAfter(12)))
|
||||
s.Equal(1, len(buffer.ListAfter(13)))
|
||||
}
|
||||
|
||||
func TestDoubleCacheDeleteBuffer(t *testing.T) {
|
||||
suite.Run(t, new(DoubleCacheBufferSuite))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue