enhance: Add buffer forwarder for stream delta loading (#40559)

See also #40558
Related to #35303 & #38066 as well

This PR:
- Add `BufferForwarder` to limit memory usage when forwarding stream deletes (see the sketch below)
- Add a `UseLoad` flag to determine whether `Delete` shall use `segment.Delete` or `segment.LoadDeltaData`
- Fix the delegator accidentally using an always-true candidate while loading streaming delta
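
For orientation, here is a minimal, self-contained sketch of the buffered-forward idea: accumulate deleted primary keys until a byte budget is exceeded, push the batch through a sync callback, then flush whatever remains at the end. All names here are illustrative only; the actual implementation is the `BufferForwarder` added below.

```go
package main

import "fmt"

// forwarder is an illustrative stand-in for BufferForwarder: it buffers
// primary keys until a byte budget is exceeded, then invokes sync and resets.
type forwarder struct {
	budget, used int64
	batch        []int64
	sync         func(batch []int64) error
}

func (f *forwarder) buffer(pk int64) error {
	f.batch = append(f.batch, pk)
	f.used += 8 // an int64 primary key occupies 8 bytes
	if f.used > f.budget {
		return f.flush()
	}
	return nil
}

func (f *forwarder) flush() error {
	if len(f.batch) == 0 {
		return nil
	}
	if err := f.sync(f.batch); err != nil {
		return err
	}
	f.batch, f.used = f.batch[:0], 0
	return nil
}

func main() {
	f := &forwarder{budget: 16, sync: func(b []int64) error {
		fmt.Println("forward batch", b) // stands in for worker.Delete
		return nil
	}}
	for pk := int64(0); pk < 5; pk++ {
		_ = f.buffer(pk)
	}
	_ = f.flush() // push the leftover entries
}
```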

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2025-03-17 15:24:10 +08:00 committed by GitHub
parent b3edb76516
commit 94a859c028
14 changed files with 993 additions and 734 deletions


@@ -473,6 +473,7 @@ queryNode:
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"]
streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"]
forwardBatchSize: 4194304 # the batch size delegator uses for forwarding stream delete in loading procedure
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.


@@ -0,0 +1,101 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"context"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/v2/proto/querypb"
"github.com/milvus-io/milvus/pkg/v2/util/commonpbutil"
)
// BufferForwarder is a util object that buffers delta data;
// when the buffered size exceeds the configured limit,
// it forwards the buffered data to the worker via the Delete API.
type BufferForwarder struct {
bufferSize int64
buffer *storage.DeltaData
doSync func(pks storage.PrimaryKeys, tss []uint64) error
}
// NewBufferedForwarder creates a BufferForwarder with max size
// and `doSync` op function
func NewBufferedForwarder(bufferSize int64, doSync func(pks storage.PrimaryKeys, tss []uint64) error) *BufferForwarder {
return &BufferForwarder{
bufferSize: bufferSize,
buffer: storage.NewDeltaData(1024),
doSync: doSync,
}
}
// deleteViaWorker is the util func for doSync impl which calls worker.Delete
func deleteViaWorker(ctx context.Context,
worker cluster.Worker,
workerID int64,
info *querypb.SegmentLoadInfo,
deleteScope querypb.DataScope,
) func(pks storage.PrimaryKeys, tss []uint64) error {
return func(pks storage.PrimaryKeys, tss []uint64) error {
ids, err := storage.ParsePrimaryKeysBatch2IDs(pks)
if err != nil {
return err
}
return worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(workerID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: ids,
Timestamps: tss,
Scope: deleteScope,
UseLoad: true,
})
}
}
func (bf *BufferForwarder) Buffer(pk storage.PrimaryKey, ts uint64) error {
if err := bf.buffer.Append(pk, ts); err != nil {
return err
}
if bf.buffer.MemSize() > bf.bufferSize {
if err := bf.sync(); err != nil {
return err
}
}
return nil
}
func (bf *BufferForwarder) sync() error {
if bf.buffer.DeleteRowCount() == 0 {
return nil
}
if err := bf.doSync(bf.buffer.DeletePks(), bf.buffer.DeleteTimestamps()); err != nil {
return err
}
bf.buffer.Reset()
return nil
}
func (bf *BufferForwarder) Flush() error {
return bf.sync()
}
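
To show how the type above is meant to be driven, here is a short usage sketch in the same `delegator` package. The `forwardAll` helper and its arguments are hypothetical; the real call sites appear in the delegator changes further below, where the buffer size comes from `queryNode.forwardBatchSize` and `doSync` is produced by `deleteViaWorker` or wraps `segment.Delete`.

```go
// forwardAll is a hypothetical helper: Buffer appends each (pk, ts) pair and
// transparently triggers doSync once the buffered DeltaData exceeds bufferSize,
// while Flush pushes whatever remains once iteration finishes.
func forwardAll(pks []storage.PrimaryKey, tss []uint64, bufferSize int64,
	doSync func(pks storage.PrimaryKeys, tss []uint64) error,
) error {
	bf := NewBufferedForwarder(bufferSize, doSync)
	for i, pk := range pks {
		if err := bf.Buffer(pk, tss[i]); err != nil {
			return err
		}
	}
	return bf.Flush()
}
```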


@@ -0,0 +1,87 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"testing"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus/internal/storage"
)
type BufferForwarderSuite struct {
suite.Suite
}
func (s *BufferForwarderSuite) TestNormalSync() {
s.Run("large_buffer", func() {
counter := atomic.NewInt64(0)
buf := NewBufferedForwarder(16*1024*1024, func(pks storage.PrimaryKeys, tss []uint64) error {
counter.Add(1)
return nil
})
err := buf.Buffer(storage.NewInt64PrimaryKey(100), 100)
s.NoError(err)
err = buf.Flush()
s.NoError(err)
s.EqualValues(1, counter.Load())
})
s.Run("small_buffer", func() {
counter := atomic.NewInt64(0)
buf := NewBufferedForwarder(1, func(pks storage.PrimaryKeys, tss []uint64) error {
counter.Add(1)
return nil
})
err := buf.Buffer(storage.NewInt64PrimaryKey(100), 100)
s.NoError(err)
err = buf.Flush()
s.NoError(err)
s.EqualValues(1, counter.Load())
})
}
func (s *BufferForwarderSuite) TestSyncFailure() {
s.Run("large_buffer", func() {
buf := NewBufferedForwarder(16*1024*1024, func(pks storage.PrimaryKeys, tss []uint64) error {
return errors.New("mocked")
})
err := buf.Buffer(storage.NewInt64PrimaryKey(100), 100)
s.NoError(err)
err = buf.Flush()
s.Error(err)
})
s.Run("small_buffer", func() {
buf := NewBufferedForwarder(1, func(pks storage.PrimaryKeys, tss []uint64) error {
return errors.New("mocked")
})
err := buf.Buffer(storage.NewInt64PrimaryKey(100), 100)
s.Error(err)
})
}
func TestBufferedForwarder(t *testing.T) {
suite.Run(t, new(BufferForwarderSuite))
}


@@ -541,10 +541,8 @@ func (sd *shardDelegator) LoadL0(ctx context.Context, infos []*querypb.SegmentLo
return nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
// TODO: this could be large; hosting all L0 deletes on the delegator might be dangerous, consider mmapping it on a local segment and stream-processing it
func (sd *shardDelegator) rangeHitL0Deletions(partitionID int64, candidate pkoracle.Candidate, fn func(pk storage.PrimaryKey, ts uint64) error) error {
level0Segments := sd.deleteBuffer.ListL0()
deltaData := storage.NewDeltaData(0)
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
@@ -561,16 +559,38 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
deltaData.Append(segmentPks[idx+i], segmentTss[idx+i])
if err := fn(segmentPks[idx+i], segmentTss[idx+i]); err != nil {
return err
}
}
}
}
}
}
return nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
deltaData := storage.NewDeltaData(0)
sd.rangeHitL0Deletions(partitionID, candidate, func(pk storage.PrimaryKey, ts uint64) error {
deltaData.Append(pk, ts)
return nil
})
return deltaData.DeletePks(), deltaData.DeleteTimestamps()
}
func (sd *shardDelegator) StreamForwardLevel0Deletions(bufferedForwarder *BufferForwarder, partitionID int64, candidate pkoracle.Candidate) error {
err := sd.rangeHitL0Deletions(partitionID, candidate, func(pk storage.PrimaryKey, ts uint64) error {
return bufferedForwarder.Buffer(pk, ts)
})
if err != nil {
return err
}
return bufferedForwarder.Flush()
}
func (sd *shardDelegator) RefreshLevel0DeletionStats() {
level0Segments := sd.deleteBuffer.ListL0()
totalSize := int64(0)
@@ -621,10 +641,6 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
// apply buffered delete for new segments
// no goroutines here since qnv2 has no load merging logic
for _, info := range infos {
log := log.With(
zap.Int64("segmentID", info.GetSegmentID()),
zap.Time("startPosition", tsoutil.PhysicalTime(info.GetStartPosition().GetTimestamp())),
)
candidate := idCandidates[info.GetSegmentID()]
// after L0 segment feature
// growing segments should have load stream delete as well
@@ -636,24 +652,12 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
deleteScope = querypb.DataScope_Streaming
}
deleteData := &storage.DeleteData{}
// start position is dml position for segment
// if this position is before deleteBuffer's safe ts, it means some delete shall be read from msgstream
// if info.GetStartPosition().GetTimestamp() < sd.deleteBuffer.SafeTs() {
// log.Info("load delete from stream...")
// streamDeleteData, err := sd.readDeleteFromMsgstream(ctx, info.GetStartPosition(), sd.deleteBuffer.SafeTs(), candidate)
// if err != nil {
// log.Warn("failed to read delete data from msgstream", zap.Error(err))
// return err
// }
bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(),
deleteViaWorker(ctx, worker, targetNodeID, info, deleteScope))
// deleteData.Merge(streamDeleteData)
// log.Info("load delete from stream done")
// }
// list buffered delete
deleteRecords := sd.deleteBuffer.ListAfter(info.GetStartPosition().GetTimestamp())
tsHitDeleteRows := int64(0)
start := time.Now()
for _, entry := range deleteRecords {
for _, record := range entry.Data {
tsHitDeleteRows += int64(len(record.DeleteData.Pks))
@@ -672,32 +676,18 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
hits := candidate.BatchPkExist(lc)
for i, hit := range hits {
if hit {
deleteData.Append(pks[idx+i], record.DeleteData.Tss[idx+i])
err := bufferedForwarder.Buffer(pks[idx+i], record.DeleteData.Tss[idx+i])
if err != nil {
return err
}
}
}
}
}
}
// if delete count not empty, apply
if deleteData.RowCount > 0 {
log.Info("forward delete to worker...",
zap.Int64("tsHitDeleteRowNum", tsHitDeleteRows),
zap.Int64("bfHitDeleteRowNum", deleteData.RowCount),
zap.Int64("bfCost", time.Since(start).Milliseconds()),
)
err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
err := bufferedForwarder.Flush()
if err != nil {
return err
}
}


@@ -121,7 +121,7 @@ func (b *listDeleteBuffer[T]) Put(entry T) {
tail := b.list[len(b.list)-1]
err := tail.Put(entry)
if errors.Is(err, errBufferFull) {
b.list = append(b.list, newCacheBlock[T](entry.Timestamp(), b.sizePerBlock, entry))
b.list = append(b.list, newCacheBlock(entry.Timestamp(), b.sizePerBlock, entry))
}
// update metrics


@@ -99,13 +99,18 @@ func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.
}
func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error {
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if deletedPks == nil || deletedPks.Len() == 0 {
return nil
bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(),
func(pks storage.PrimaryKeys, tss []uint64) error {
return segment.Delete(ctx, pks, tss)
})
if err := sd.rangeHitL0Deletions(segment.Partition(), segment, func(pk storage.PrimaryKey, ts uint64) error {
return bufferedForwarder.Buffer(pk, ts)
}); err != nil {
return err
}
log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", deletedPks.Len()))
return segment.Delete(ctx, deletedPks, deletedTss)
return bufferedForwarder.Flush()
}
func (sd *shardDelegator) addL0ForGrowingLoad(ctx context.Context, segment segments.Segment) error {
@@ -130,30 +135,10 @@ func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
deleteScope = querypb.DataScope_Streaming
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
if deletedPks != nil && deletedPks.Len() > 0 {
log.Info("forward L0 delete to worker...",
zap.Int("deleteRowNum", deletedPks.Len()),
)
pks, err := storage.ParsePrimaryKeysBatch2IDs(deletedPks)
if err != nil {
return err
}
err = worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: pks,
Timestamps: deletedTss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
}
return nil
bufferedForwarder := NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(),
deleteViaWorker(ctx, worker, targetNodeID, info, deleteScope))
return sd.StreamForwardLevel0Deletions(bufferedForwarder, candidate.Partition(), candidate)
}
func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context,


@@ -418,8 +418,8 @@ func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() {
s.Require().NoError(err)
s.delegator.deleteBuffer.RegisterL0(l0Segment)
seg.EXPECT().ID().Return(10000)
seg.EXPECT().Partition().Return(100)
seg.EXPECT().BatchPkExist(mock.Anything).Return(lo.RepeatBy(n, func(i int) bool { return true }))
seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error {
s.Equal(deltaData.DeletePks(), pk)
s.Equal(deltaData.DeleteTimestamps(), u)


@@ -1392,8 +1392,17 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
}
pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys())
var err error
for _, segment := range segments {
err := segment.Delete(ctx, pks, req.GetTimestamps())
if req.GetUseLoad() {
var dd *storage.DeltaData
dd, err = storage.NewDeltaDataWithData(pks, req.GetTimestamps())
if err == nil {
err = segment.LoadDeltaData(ctx, dd)
}
} else {
err = segment.Delete(ctx, pks, req.GetTimestamps())
}
if err != nil {
log.Warn("segment delete failed", zap.Error(err))
return merr.Status(err), nil


@@ -2207,6 +2207,38 @@ func (suite *ServiceSuite) TestDelete_Int64() {
suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode)
}
func (suite *ServiceSuite) TestDelete_Int64_UseLoad() {
ctx := context.Background()
// prepare
suite.TestWatchDmChannelsInt64()
suite.TestLoadSegments_Int64()
// data
req := &querypb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgID: rand.Int63(),
TargetID: suite.node.session.ServerID,
},
CollectionId: suite.collectionID,
PartitionId: suite.partitionIDs[0],
SegmentId: suite.validSegmentIDs[0],
VchannelName: suite.vchannel,
Timestamps: []uint64{0},
Scope: querypb.DataScope_Historical,
UseLoad: true,
}
// type int
req.PrimaryKeys = &schemapb.IDs{
IdField: &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: []int64{111},
},
},
}
status, err := suite.node.Delete(ctx, req)
suite.NoError(merr.CheckRPCCall(status, err))
}
func (suite *ServiceSuite) TestDelete_VarChar() {
ctx := context.Background()
// prepare


@@ -100,6 +100,12 @@ func (dd *DeltaData) MemSize() int64 {
return result
}
func (dd *DeltaData) Reset() {
dd.deletePks.Reset()
dd.deleteTimestamps = dd.deleteTimestamps[:0]
dd.delRowCount = 0
}
func NewDeltaData(cap int64) *DeltaData {
return &DeltaData{
deleteTimestamps: make([]Timestamp, 0, cap),
@@ -116,6 +122,23 @@ func NewDeltaDataWithPkType(cap int64, pkType schemapb.DataType) (*DeltaData, er
return result, nil
}
func NewDeltaDataWithData(pks PrimaryKeys, tss []uint64) (*DeltaData, error) {
if pks.Len() != len(tss) {
return nil, merr.WrapErrParameterInvalidMsg("length of pks and tss not equal")
}
dd := &DeltaData{
deletePks: pks,
deleteTimestamps: tss,
delRowCount: int64(pks.Len()),
}
dd.typeInitOnce.Do(func() {
dd.pkType = pks.Type()
})
return dd, nil
}
type DeleteLog struct {
Pk PrimaryKey `json:"pk"`
Ts uint64 `json:"ts"`
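
A quick sketch of how the new `DeltaData` helpers compose (hypothetical function, same `storage` package): `Append` grows a reusable buffer and `Reset` clears it for the next batch, which is what `BufferForwarder.sync` relies on, while `NewDeltaDataWithData` above wraps already-parsed keys without copying for the `UseLoad` path.

```go
// deltaRoundTrip is a hypothetical helper: append a couple of deletions,
// hand the buffered view to a forward callback, then Reset for reuse.
func deltaRoundTrip(forward func(pks PrimaryKeys, tss []uint64) error) error {
	dd := NewDeltaData(2)
	if err := dd.Append(NewInt64PrimaryKey(100), 10); err != nil {
		return err
	}
	if err := dd.Append(NewInt64PrimaryKey(101), 11); err != nil {
		return err
	}
	if err := forward(dd.DeletePks(), dd.DeleteTimestamps()); err != nil {
		return err
	}
	dd.Reset() // DeleteRowCount() is 0 again and the backing slices are reused
	return nil
}
```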


@@ -32,6 +32,7 @@ type PrimaryKeys interface {
Size() int64
Len() int
MustMerge(pks PrimaryKeys)
Reset()
}
type Int64PrimaryKeys struct {
@@ -92,6 +93,10 @@ func (pks *Int64PrimaryKeys) MustMerge(another PrimaryKeys) {
pks.values = append(pks.values, aPks.values...)
}
func (pks *Int64PrimaryKeys) Reset() {
pks.values = pks.values[:0]
}
type VarcharPrimaryKeys struct {
values []string
size int64
@@ -156,3 +161,8 @@ func (pks *VarcharPrimaryKeys) MustMerge(another PrimaryKeys) {
pks.values = append(pks.values, aPks.values...)
pks.size += aPks.size
}
func (pks *VarcharPrimaryKeys) Reset() {
pks.values = pks.values[:0]
pks.size = 0
}


@@ -790,6 +790,7 @@ message DeleteRequest {
schema.IDs primary_keys = 6;
repeated uint64 timestamps = 7;
DataScope scope = 8;
bool use_load = 9;
}
message DeleteBatchRequest {

File diff suppressed because it is too large


@@ -2669,6 +2669,7 @@ type queryNodeConfig struct {
// delta forward
LevelZeroForwardPolicy ParamItem `refreshable:"true"`
StreamingDeltaForwardPolicy ParamItem `refreshable:"true"`
ForwardBatchSize ParamItem `refreshable:"true"`
// loader
IoPoolSize ParamItem `refreshable:"false"`
@@ -3277,6 +3278,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
}
p.StreamingDeltaForwardPolicy.Init(base.mgr)
p.ForwardBatchSize = ParamItem{
Key: "queryNode.forwardBatchSize",
Version: "2.5.7",
Doc: "the batch size delegator uses for forwarding stream delete in loading procedure",
DefaultValue: "4194304", // 4MB
Export: true,
}
p.ForwardBatchSize.Init(base.mgr)
p.IoPoolSize = ParamItem{
Key: "queryNode.ioPoolSize",
Version: "2.3.0",
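
Closing the loop on the new `ForwardBatchSize` item above: callers read it through the usual `paramtable` accessor, as the delegator changes earlier in this diff do. A minimal sketch of that pattern, assuming the `delegator` package's existing `paramtable` and `storage` imports (`newForwarder` is a hypothetical convenience wrapper, not part of this PR):

```go
// newForwarder reads the refreshable queryNode.forwardBatchSize limit
// (default 4194304 bytes, i.e. 4 MiB) and builds a BufferForwarder
// around the given sync callback.
func newForwarder(doSync func(pks storage.PrimaryKeys, tss []uint64) error) *BufferForwarder {
	return NewBufferedForwarder(paramtable.Get().QueryNodeCfg.ForwardBatchSize.GetAsInt64(), doSync)
}
```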