mirror of https://github.com/milvus-io/milvus.git
enhance: make delegator delete buffer holding all delete from cp (#29626)
See also #29625 This PR: - Add a new implemention of `DeleteBuffer`: listDeleteBuffer - holds cacheBlock slice - `Put` method append new delete data into last block - when a block is full, append a new block into the list - Add `TryDiscard` method for `DeleteBuffer` interface - For doubleCacheBuffer, do nothing - For listDeleteBuffer, try to evict "old" blocks, which are blocks before the first block whose start ts is behind provided ts - Add checkpoint field for `UpdateVersion` sync action, which shall be used to discard old cache delete block --------- Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/29688/head
parent
79c06c5e73
commit
da7c3cbd88
|
@ -583,6 +583,7 @@ message SyncAction {
|
||||||
repeated int64 sealedInTarget = 8;
|
repeated int64 sealedInTarget = 8;
|
||||||
int64 TargetVersion = 9;
|
int64 TargetVersion = 9;
|
||||||
repeated int64 droppedInTarget = 10;
|
repeated int64 droppedInTarget = 10;
|
||||||
|
msg.MsgPosition checkpoint = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
message SyncDistributionRequest {
|
message SyncDistributionRequest {
|
||||||
|
|
|
@ -458,14 +458,21 @@ func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, lead
|
||||||
sealedSegments := ob.targetMgr.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
sealedSegments := ob.targetMgr.GetSealedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
||||||
growingSegments := ob.targetMgr.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
growingSegments := ob.targetMgr.GetGrowingSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
||||||
droppedSegments := ob.targetMgr.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
droppedSegments := ob.targetMgr.GetDroppedSegmentsByChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTarget)
|
||||||
|
channel := ob.targetMgr.GetDmChannel(leaderView.CollectionID, leaderView.Channel, meta.NextTargetFirst)
|
||||||
|
|
||||||
return &querypb.SyncAction{
|
action := &querypb.SyncAction{
|
||||||
Type: querypb.SyncType_UpdateVersion,
|
Type: querypb.SyncType_UpdateVersion,
|
||||||
GrowingInTarget: growingSegments.Collect(),
|
GrowingInTarget: growingSegments.Collect(),
|
||||||
SealedInTarget: lo.Keys(sealedSegments),
|
SealedInTarget: lo.Keys(sealedSegments),
|
||||||
DroppedInTarget: droppedSegments,
|
DroppedInTarget: droppedSegments,
|
||||||
TargetVersion: targetVersion,
|
TargetVersion: targetVersion,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if channel != nil {
|
||||||
|
action.Checkpoint = channel.GetSeekPosition()
|
||||||
|
}
|
||||||
|
|
||||||
|
return action
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
|
func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
|
||||||
|
|
|
@ -30,6 +30,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
|
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
|
||||||
|
@ -69,7 +70,7 @@ type ShardDelegator interface {
|
||||||
LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
|
LoadGrowing(ctx context.Context, infos []*querypb.SegmentLoadInfo, version int64) error
|
||||||
LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error
|
LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) error
|
||||||
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
|
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest, force bool) error
|
||||||
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64)
|
SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)
|
||||||
GetTargetVersion() int64
|
GetTargetVersion() int64
|
||||||
|
|
||||||
// control
|
// control
|
||||||
|
@ -497,7 +498,8 @@ func organizeSubTask[T any](ctx context.Context, req T, sealed []SnapshotItem, g
|
||||||
|
|
||||||
func executeSubTasks[T any, R interface {
|
func executeSubTasks[T any, R interface {
|
||||||
GetStatus() *commonpb.Status
|
GetStatus() *commonpb.Status
|
||||||
}](ctx context.Context, tasks []subTask[T], execute func(context.Context, T, cluster.Worker) (R, error), taskType string, log *log.MLogger) ([]R, error) {
|
}](ctx context.Context, tasks []subTask[T], execute func(context.Context, T, cluster.Worker) (R, error), taskType string, log *log.MLogger,
|
||||||
|
) ([]R, error) {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
@ -663,8 +665,8 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
||||||
return nil, fmt.Errorf("collection(%d) not found in manager", collectionID)
|
return nil, fmt.Errorf("collection(%d) not found in manager", collectionID)
|
||||||
}
|
}
|
||||||
|
|
||||||
maxSegmentDeleteBuffer := paramtable.Get().QueryNodeCfg.MaxSegmentDeleteBuffer.GetAsInt64()
|
sizePerBlock := paramtable.Get().QueryNodeCfg.DeleteBufferBlockSize.GetAsInt64()
|
||||||
log.Info("Init delta cache", zap.Int64("maxSegmentCacheBuffer", maxSegmentDeleteBuffer), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
|
log.Info("Init delete cache with list delete buffer", zap.Int64("sizePerBlock", sizePerBlock), zap.Time("startTime", tsoutil.PhysicalTime(startTs)))
|
||||||
|
|
||||||
sd := &shardDelegator{
|
sd := &shardDelegator{
|
||||||
collectionID: collectionID,
|
collectionID: collectionID,
|
||||||
|
@ -677,7 +679,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
|
||||||
lifetime: lifetime.NewLifetime(lifetime.Initializing),
|
lifetime: lifetime.NewLifetime(lifetime.Initializing),
|
||||||
distribution: NewDistribution(),
|
distribution: NewDistribution(),
|
||||||
level0Deletions: make(map[int64]*storage.DeleteData),
|
level0Deletions: make(map[int64]*storage.DeleteData),
|
||||||
deleteBuffer: deletebuffer.NewDoubleCacheDeleteBuffer[*deletebuffer.Item](startTs, maxSegmentDeleteBuffer),
|
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
|
||||||
pkOracle: pkoracle.NewPkOracle(),
|
pkOracle: pkoracle.NewPkOracle(),
|
||||||
tsafeManager: tsafeManager,
|
tsafeManager: tsafeManager,
|
||||||
latestTsafe: atomic.NewUint64(startTs),
|
latestTsafe: atomic.NewUint64(startTs),
|
||||||
|
|
|
@ -843,7 +843,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
|
func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64,
|
||||||
sealedInTarget []int64, droppedInTarget []int64,
|
sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition,
|
||||||
) {
|
) {
|
||||||
growings := sd.segmentManager.GetBy(
|
growings := sd.segmentManager.GetBy(
|
||||||
segments.WithType(segments.SegmentTypeGrowing),
|
segments.WithType(segments.SegmentTypeGrowing),
|
||||||
|
@ -875,6 +875,7 @@ func (sd *shardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []
|
||||||
zap.Int64s("growingSegments", redundantGrowingIDs))
|
zap.Int64s("growingSegments", redundantGrowingIDs))
|
||||||
}
|
}
|
||||||
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
|
sd.distribution.SyncTargetVersion(newVersion, growingInTarget, sealedInTarget, redundantGrowingIDs)
|
||||||
|
sd.deleteBuffer.TryDiscard(checkpoint.GetTimestamp())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sd *shardDelegator) GetTargetVersion() int64 {
|
func (sd *shardDelegator) GetTargetVersion() int64 {
|
||||||
|
|
|
@ -918,7 +918,7 @@ func (s *DelegatorDataSuite) TestSyncTargetVersion() {
|
||||||
s.manager.Segment.Put(segments.SegmentTypeGrowing, ms)
|
s.manager.Segment.Put(segments.SegmentTypeGrowing, ms)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4})
|
s.delegator.SyncTargetVersion(int64(5), []int64{1}, []int64{2}, []int64{3, 4}, &msgpb.MsgPosition{})
|
||||||
s.Equal(int64(5), s.delegator.GetTargetVersion())
|
s.Equal(int64(5), s.delegator.GetTargetVersion())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import (
|
||||||
"go.uber.org/atomic"
|
"go.uber.org/atomic"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||||
|
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
@ -244,7 +245,7 @@ func (s *DelegatorSuite) initSegments() {
|
||||||
Version: 2001,
|
Version: 2001,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{})
|
s.delegator.SyncTargetVersion(2001, []int64{1004}, []int64{1000, 1001, 1002, 1003}, []int64{}, &msgpb.MsgPosition{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DelegatorSuite) TestSearch() {
|
func (s *DelegatorSuite) TestSearch() {
|
||||||
|
|
|
@ -35,11 +35,12 @@ type DeleteBuffer[T timed] interface {
|
||||||
Put(T)
|
Put(T)
|
||||||
ListAfter(uint64) []T
|
ListAfter(uint64) []T
|
||||||
SafeTs() uint64
|
SafeTs() uint64
|
||||||
|
TryDiscard(uint64)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
|
func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBuffer[T] {
|
||||||
return &doubleCacheBuffer[T]{
|
return &doubleCacheBuffer[T]{
|
||||||
head: newDoubleCacheItem[T](startTs, maxSize),
|
head: newCacheBlock[T](startTs, maxSize),
|
||||||
maxSize: maxSize,
|
maxSize: maxSize,
|
||||||
ts: startTs,
|
ts: startTs,
|
||||||
}
|
}
|
||||||
|
@ -48,7 +49,7 @@ func NewDoubleCacheDeleteBuffer[T timed](startTs uint64, maxSize int64) DeleteBu
|
||||||
// doubleCacheBuffer implements DeleteBuffer with fixed sized double cache.
|
// doubleCacheBuffer implements DeleteBuffer with fixed sized double cache.
|
||||||
type doubleCacheBuffer[T timed] struct {
|
type doubleCacheBuffer[T timed] struct {
|
||||||
mut sync.RWMutex
|
mut sync.RWMutex
|
||||||
head, tail *doubleCacheItem[T]
|
head, tail *cacheBlock[T]
|
||||||
maxSize int64
|
maxSize int64
|
||||||
ts uint64
|
ts uint64
|
||||||
}
|
}
|
||||||
|
@ -57,6 +58,9 @@ func (c *doubleCacheBuffer[T]) SafeTs() uint64 {
|
||||||
return c.ts
|
return c.ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *doubleCacheBuffer[T]) TryDiscard(_ uint64) {
|
||||||
|
}
|
||||||
|
|
||||||
// Put implements DeleteBuffer.
|
// Put implements DeleteBuffer.
|
||||||
func (c *doubleCacheBuffer[T]) Put(entry T) {
|
func (c *doubleCacheBuffer[T]) Put(entry T) {
|
||||||
c.mut.Lock()
|
c.mut.Lock()
|
||||||
|
@ -86,18 +90,19 @@ func (c *doubleCacheBuffer[T]) ListAfter(ts uint64) []T {
|
||||||
// evict sets head as tail and evicts tail.
|
// evict sets head as tail and evicts tail.
|
||||||
func (c *doubleCacheBuffer[T]) evict(newTs uint64) {
|
func (c *doubleCacheBuffer[T]) evict(newTs uint64) {
|
||||||
c.tail = c.head
|
c.tail = c.head
|
||||||
c.head = newDoubleCacheItem[T](newTs, c.maxSize/2)
|
c.head = newCacheBlock[T](newTs, c.maxSize/2)
|
||||||
c.ts = c.tail.headTs
|
c.ts = c.tail.headTs
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDoubleCacheItem[T timed](ts uint64, maxSize int64) *doubleCacheItem[T] {
|
func newCacheBlock[T timed](ts uint64, maxSize int64, elements ...T) *cacheBlock[T] {
|
||||||
return &doubleCacheItem[T]{
|
return &cacheBlock[T]{
|
||||||
headTs: ts,
|
headTs: ts,
|
||||||
maxSize: maxSize,
|
maxSize: maxSize,
|
||||||
|
data: elements,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type doubleCacheItem[T timed] struct {
|
type cacheBlock[T timed] struct {
|
||||||
mut sync.RWMutex
|
mut sync.RWMutex
|
||||||
headTs uint64
|
headTs uint64
|
||||||
size int64
|
size int64
|
||||||
|
@ -108,7 +113,7 @@ type doubleCacheItem[T timed] struct {
|
||||||
|
|
||||||
// Cache adds entry into cache item.
|
// Cache adds entry into cache item.
|
||||||
// returns error if item is full
|
// returns error if item is full
|
||||||
func (c *doubleCacheItem[T]) Put(entry T) error {
|
func (c *cacheBlock[T]) Put(entry T) error {
|
||||||
c.mut.Lock()
|
c.mut.Lock()
|
||||||
defer c.mut.Unlock()
|
defer c.mut.Unlock()
|
||||||
|
|
||||||
|
@ -122,7 +127,7 @@ func (c *doubleCacheItem[T]) Put(entry T) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListAfter returns entries of which ts after provided value.
|
// ListAfter returns entries of which ts after provided value.
|
||||||
func (c *doubleCacheItem[T]) ListAfter(ts uint64) []T {
|
func (c *cacheBlock[T]) ListAfter(ts uint64) []T {
|
||||||
c.mut.RLock()
|
c.mut.RLock()
|
||||||
defer c.mut.RUnlock()
|
defer c.mut.RUnlock()
|
||||||
idx := sort.Search(len(c.data), func(idx int) bool {
|
idx := sort.Search(len(c.data), func(idx int) bool {
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deletebuffer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/cockroachdb/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewListDeleteBuffer[T timed](startTs uint64, sizePerBlock int64) DeleteBuffer[T] {
|
||||||
|
return &listDeleteBuffer[T]{
|
||||||
|
safeTs: startTs,
|
||||||
|
sizePerBlock: sizePerBlock,
|
||||||
|
list: []*cacheBlock[T]{newCacheBlock[T](startTs, sizePerBlock)},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// listDeleteBuffer implements DeleteBuffer with a list.
|
||||||
|
// head points to the earliest block.
|
||||||
|
// tail points to the latest block which shall be written into.
|
||||||
|
type listDeleteBuffer[T timed] struct {
|
||||||
|
mut sync.RWMutex
|
||||||
|
|
||||||
|
list []*cacheBlock[T]
|
||||||
|
|
||||||
|
safeTs uint64
|
||||||
|
sizePerBlock int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *listDeleteBuffer[T]) Put(entry T) {
|
||||||
|
b.mut.Lock()
|
||||||
|
defer b.mut.Unlock()
|
||||||
|
|
||||||
|
tail := b.list[len(b.list)-1]
|
||||||
|
err := tail.Put(entry)
|
||||||
|
if errors.Is(err, errBufferFull) {
|
||||||
|
b.list = append(b.list, newCacheBlock[T](entry.Timestamp(), b.sizePerBlock, entry))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *listDeleteBuffer[T]) ListAfter(ts uint64) []T {
|
||||||
|
b.mut.RLock()
|
||||||
|
defer b.mut.RUnlock()
|
||||||
|
|
||||||
|
var result []T
|
||||||
|
for _, block := range b.list {
|
||||||
|
result = append(result, block.ListAfter(ts)...)
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *listDeleteBuffer[T]) SafeTs() uint64 {
|
||||||
|
b.mut.RLock()
|
||||||
|
defer b.mut.RUnlock()
|
||||||
|
return b.safeTs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *listDeleteBuffer[T]) TryDiscard(ts uint64) {
|
||||||
|
b.mut.Lock()
|
||||||
|
defer b.mut.Unlock()
|
||||||
|
if len(b.list) == 1 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var nextHead int
|
||||||
|
for idx := len(b.list) - 1; idx >= 0; idx-- {
|
||||||
|
block := b.list[idx]
|
||||||
|
if block.headTs <= ts {
|
||||||
|
nextHead = idx
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if nextHead > 0 {
|
||||||
|
for idx := 0; idx < nextHead; idx++ {
|
||||||
|
b.list[idx] = nil
|
||||||
|
}
|
||||||
|
b.list = b.list[nextHead:]
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,114 @@
|
||||||
|
// Licensed to the LF AI & Data foundation under one
|
||||||
|
// or more contributor license agreements. See the NOTICE file
|
||||||
|
// distributed with this work for additional information
|
||||||
|
// regarding copyright ownership. The ASF licenses this file
|
||||||
|
// to you under the Apache License, Version 2.0 (the
|
||||||
|
// "License"); you may not use this file except in compliance
|
||||||
|
// with the License. You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package deletebuffer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/storage"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ListDeleteBufferSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ListDeleteBufferSuite) TestNewBuffer() {
|
||||||
|
buffer := NewListDeleteBuffer[*Item](10, 1000)
|
||||||
|
|
||||||
|
s.EqualValues(10, buffer.SafeTs())
|
||||||
|
|
||||||
|
ldb, ok := buffer.(*listDeleteBuffer[*Item])
|
||||||
|
s.True(ok)
|
||||||
|
s.Len(ldb.list, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ListDeleteBufferSuite) TestCache() {
|
||||||
|
buffer := NewListDeleteBuffer[*Item](10, 1000)
|
||||||
|
buffer.Put(&Item{
|
||||||
|
Ts: 11,
|
||||||
|
Data: []BufferItem{
|
||||||
|
{
|
||||||
|
PartitionID: 200,
|
||||||
|
DeleteData: storage.DeleteData{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
buffer.Put(&Item{
|
||||||
|
Ts: 12,
|
||||||
|
Data: []BufferItem{
|
||||||
|
{
|
||||||
|
PartitionID: 200,
|
||||||
|
DeleteData: storage.DeleteData{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Equal(2, len(buffer.ListAfter(11)))
|
||||||
|
s.Equal(1, len(buffer.ListAfter(12)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *ListDeleteBufferSuite) TestTryDiscard() {
|
||||||
|
buffer := NewListDeleteBuffer[*Item](10, 1)
|
||||||
|
buffer.Put(&Item{
|
||||||
|
Ts: 10,
|
||||||
|
Data: []BufferItem{
|
||||||
|
{
|
||||||
|
PartitionID: 200,
|
||||||
|
DeleteData: storage.DeleteData{
|
||||||
|
Pks: []storage.PrimaryKey{storage.NewInt64PrimaryKey(1)},
|
||||||
|
Tss: []uint64{10},
|
||||||
|
RowCount: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
buffer.Put(&Item{
|
||||||
|
Ts: 20,
|
||||||
|
Data: []BufferItem{
|
||||||
|
{
|
||||||
|
PartitionID: 200,
|
||||||
|
DeleteData: storage.DeleteData{
|
||||||
|
Pks: []storage.PrimaryKey{storage.NewInt64PrimaryKey(2)},
|
||||||
|
Tss: []uint64{20},
|
||||||
|
RowCount: 1,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
s.Equal(2, len(buffer.ListAfter(10)))
|
||||||
|
|
||||||
|
buffer.TryDiscard(10)
|
||||||
|
s.Equal(2, len(buffer.ListAfter(10)), "equal ts shall not discard block")
|
||||||
|
|
||||||
|
buffer.TryDiscard(9)
|
||||||
|
s.Equal(2, len(buffer.ListAfter(10)), "history ts shall not discard any block")
|
||||||
|
|
||||||
|
buffer.TryDiscard(20)
|
||||||
|
s.Equal(1, len(buffer.ListAfter(10)), "first block shall be discarded")
|
||||||
|
|
||||||
|
buffer.TryDiscard(20)
|
||||||
|
s.Equal(1, len(buffer.ListAfter(10)), "discard will not happen if there is only one block")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListDeleteBuffer(t *testing.T) {
|
||||||
|
suite.Run(t, new(ListDeleteBufferSuite))
|
||||||
|
}
|
|
@ -8,6 +8,8 @@ import (
|
||||||
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
|
internalpb "github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
mock "github.com/stretchr/testify/mock"
|
mock "github.com/stretchr/testify/mock"
|
||||||
|
|
||||||
|
msgpb "github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||||
|
|
||||||
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
|
querypb "github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
|
|
||||||
streamrpc "github.com/milvus-io/milvus/internal/util/streamrpc"
|
streamrpc "github.com/milvus-io/milvus/internal/util/streamrpc"
|
||||||
|
@ -724,9 +726,9 @@ func (_c *MockShardDelegator_SyncDistribution_Call) RunAndReturn(run func(contex
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
// SyncTargetVersion provides a mock function with given fields: newVersion, growingInTarget, sealedInTarget, droppedInTarget
|
// SyncTargetVersion provides a mock function with given fields: newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint
|
||||||
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64) {
|
func (_m *MockShardDelegator) SyncTargetVersion(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition) {
|
||||||
_m.Called(newVersion, growingInTarget, sealedInTarget, droppedInTarget)
|
_m.Called(newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion'
|
// MockShardDelegator_SyncTargetVersion_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncTargetVersion'
|
||||||
|
@ -739,13 +741,14 @@ type MockShardDelegator_SyncTargetVersion_Call struct {
|
||||||
// - growingInTarget []int64
|
// - growingInTarget []int64
|
||||||
// - sealedInTarget []int64
|
// - sealedInTarget []int64
|
||||||
// - droppedInTarget []int64
|
// - droppedInTarget []int64
|
||||||
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}) *MockShardDelegator_SyncTargetVersion_Call {
|
// - checkpoint *msgpb.MsgPosition
|
||||||
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, growingInTarget, sealedInTarget, droppedInTarget)}
|
func (_e *MockShardDelegator_Expecter) SyncTargetVersion(newVersion interface{}, growingInTarget interface{}, sealedInTarget interface{}, droppedInTarget interface{}, checkpoint interface{}) *MockShardDelegator_SyncTargetVersion_Call {
|
||||||
|
return &MockShardDelegator_SyncTargetVersion_Call{Call: _e.mock.On("SyncTargetVersion", newVersion, growingInTarget, sealedInTarget, droppedInTarget, checkpoint)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64)) *MockShardDelegator_SyncTargetVersion_Call {
|
func (_c *MockShardDelegator_SyncTargetVersion_Call) Run(run func(newVersion int64, growingInTarget []int64, sealedInTarget []int64, droppedInTarget []int64, checkpoint *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
|
||||||
_c.Call.Run(func(args mock.Arguments) {
|
_c.Call.Run(func(args mock.Arguments) {
|
||||||
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64))
|
run(args[0].(int64), args[1].([]int64), args[2].([]int64), args[3].([]int64), args[4].(*msgpb.MsgPosition))
|
||||||
})
|
})
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
@ -755,7 +758,7 @@ func (_c *MockShardDelegator_SyncTargetVersion_Call) Return() *MockShardDelegato
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64)) *MockShardDelegator_SyncTargetVersion_Call {
|
func (_c *MockShardDelegator_SyncTargetVersion_Call) RunAndReturn(run func(int64, []int64, []int64, []int64, *msgpb.MsgPosition)) *MockShardDelegator_SyncTargetVersion_Call {
|
||||||
_c.Call.Return(run)
|
_c.Call.Return(run)
|
||||||
return _c
|
return _c
|
||||||
}
|
}
|
||||||
|
|
|
@ -1377,7 +1377,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
||||||
pipeline.ExcludedSegments(droppedInfos)
|
pipeline.ExcludedSegments(droppedInfos)
|
||||||
}
|
}
|
||||||
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
|
shardDelegator.SyncTargetVersion(action.GetTargetVersion(), action.GetGrowingInTarget(),
|
||||||
action.GetSealedInTarget(), action.GetDroppedInTarget())
|
action.GetSealedInTarget(), action.GetDroppedInTarget(), action.GetCheckpoint())
|
||||||
default:
|
default:
|
||||||
return merr.Status(merr.WrapErrServiceInternal("unknown action type", action.GetType().String())), nil
|
return merr.Status(merr.WrapErrServiceInternal("unknown action type", action.GetType().String())), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -1771,6 +1771,7 @@ type queryNodeConfig struct {
|
||||||
|
|
||||||
// delete buffer
|
// delete buffer
|
||||||
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
|
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
|
||||||
|
DeleteBufferBlockSize ParamItem `refreshable:"false"`
|
||||||
|
|
||||||
// loader
|
// loader
|
||||||
IoPoolSize ParamItem `refreshable:"false"`
|
IoPoolSize ParamItem `refreshable:"false"`
|
||||||
|
@ -2121,6 +2122,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
|
||||||
}
|
}
|
||||||
p.MaxSegmentDeleteBuffer.Init(base.mgr)
|
p.MaxSegmentDeleteBuffer.Init(base.mgr)
|
||||||
|
|
||||||
|
p.DeleteBufferBlockSize = ParamItem{
|
||||||
|
Key: "queryNode.deleteBufferBlockSize",
|
||||||
|
Version: "2.3.5",
|
||||||
|
Doc: "delegator delete buffer block size when using list delete buffer",
|
||||||
|
DefaultValue: "1048576", // 1MB
|
||||||
|
}
|
||||||
|
p.DeleteBufferBlockSize.Init(base.mgr)
|
||||||
|
|
||||||
p.IoPoolSize = ParamItem{
|
p.IoPoolSize = ParamItem{
|
||||||
Key: "queryNode.ioPoolSize",
|
Key: "queryNode.ioPoolSize",
|
||||||
Version: "2.3.0",
|
Version: "2.3.0",
|
||||||
|
|
Loading…
Reference in New Issue