enhance: [2.4] Add streaming forward policy switch for delegator (#36330) (#36712)

Cherry-pick from master
pr: #36330
Related to #35303
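
The switch controls how the delegator forwards streaming (growing-scope) deletions to workers: with the default FilterByBF, delete primary keys are first matched against the segment bloom filters and forwarded only to segments that may contain them; with Direct, the whole delete batch is forwarded to every sealed and growing segment of the target partition, skipping BF filtering. A minimal milvus.yaml excerpt (key and value taken from the config diff below) switching to direct forwarding:

queryNode:
  streamingDeltaForwardPolicy: Direct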

---------

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia 2024-10-09 17:41:20 +08:00 committed by GitHub
parent 05f96f5298
commit 3a80d1f602
7 changed files with 625 additions and 227 deletions


@@ -454,6 +454,7 @@ queryNode:
enableCrossUserGrouping: false # Enable Cross user grouping when using user-task-polling policy. (Disable it if user's task can not merge each other)
maxPendingTaskPerUser: 1024 # Max pending task per user in scheduler
levelZeroForwardPolicy: FilterByBF # delegator level zero deletion forward policy, possible option["FilterByBF", "RemoteLoad"]
streamingDeltaForwardPolicy: FilterByBF # delegator streaming deletion forward policy, possible option["FilterByBF", "Direct"]
dataSync:
flowGraph:
maxQueueLength: 16 # The maximum size of task queue cache in flow graph in query node.
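
Both forward-policy items are marked refreshable in paramtable (see the paramtable diff at the bottom of this page), so the streaming policy can also be flipped at runtime. A minimal sketch of such an override, mirroring the pattern used by the new tests below:

paramtable.Get().Save(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key, "Direct")
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key)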


@@ -198,84 +198,10 @@ func (sd *shardDelegator) ProcessDelete(deleteData []*DeleteData, ts uint64) {
Data: cacheItems,
})
start := time.Now()
retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool())
// segment => delete data
delRecords := make(map[int64]DeleteData)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
segmentID2Hits := value.Segment2Hits
sd.forwardStreamingDeletion(context.Background(), deleteData)
pks := deleteData[value.DeleteDataIdx].PrimaryKeys
tss := deleteData[value.DeleteDataIdx].Timestamps
for segmentID, hits := range segmentID2Hits {
for i, hit := range hits {
if hit {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i])
delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
return true
})
bfCost := time.Since(start)
offlineSegments := typeutil.NewConcurrentSet[int64]()
sealed, growing, version := sd.distribution.PinOnlineSegments()
start = time.Now()
eg, ctx := errgroup.WithContext(context.Background())
for _, entry := range sealed {
entry := entry
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
if err != nil {
log.Warn("failed to get worker",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// skip if node down
// delete will be processed after loaded again
return nil
}
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, delRecords, entry.Segments, querypb.DataScope_Historical)...)
return nil
})
}
if len(growing) > 0 {
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
if err != nil {
log.Error("failed to get worker(local)",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// panic here, local worker shall not have error
panic(err)
}
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, delRecords, growing, querypb.DataScope_Streaming)...)
return nil
})
}
// no error is returned from applyDelete
_ = eg.Wait()
forwardDeleteCost := time.Since(start)
sd.distribution.Unpin(version)
offlineSegIDs := offlineSegments.Collect()
if len(offlineSegIDs) > 0 {
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
sd.markSegmentOffline(offlineSegIDs...)
}
metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds()))
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
metrics.QueryNodeProcessCost.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.DeleteLabel).
Observe(float64(tr.ElapseSpan().Milliseconds()))
}
type BatchApplyRet = struct {
@@ -320,7 +246,13 @@ func (sd *shardDelegator) applyBFInParallel(deleteDatas []*DeleteData, pool *con
}
// applyDelete handles delete records and applies them to the corresponding workers.
func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker cluster.Worker, delRecords map[int64]DeleteData, entries []SegmentEntry, scope querypb.DataScope) []int64 {
func (sd *shardDelegator) applyDelete(ctx context.Context,
nodeID int64,
worker cluster.Worker,
delRecords func(segmentID int64) (DeleteData, bool),
entries []SegmentEntry,
scope querypb.DataScope,
) []int64 {
offlineSegments := typeutil.NewConcurrentSet[int64]()
log := sd.getLogger(ctx)
@@ -333,7 +265,7 @@ func (sd *shardDelegator) applyDelete(ctx context.Context, nodeID int64, worker
var futures []*conc.Future[struct{}]
for _, segmentEntry := range entries {
segmentEntry := segmentEntry
delRecord, ok := delRecords[segmentEntry.SegmentID]
delRecord, ok := delRecords(segmentEntry.SegmentID)
log := log.With(
zap.Int64("segmentID", segmentEntry.SegmentID),
zap.Int64("workerID", nodeID),


@@ -1,146 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"context"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
L0ForwardPolicyDefault = ``
L0ForwardPolicyBF = `FilterByBF`
L0ForwardPolicyRemoteLoad = `RemoteLoad`
)
func (sd *shardDelegator) forwardL0Deletion(ctx context.Context,
info *querypb.SegmentLoadInfo,
req *querypb.LoadSegmentsRequest,
candidate *pkoracle.BloomFilterSet,
targetNodeID int64,
worker cluster.Worker,
) error {
switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy {
case L0ForwardPolicyDefault, L0ForwardPolicyBF:
return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker)
case L0ForwardPolicyRemoteLoad:
return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker)
default:
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy)
}
}
func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
info *querypb.SegmentLoadInfo,
candidate *pkoracle.BloomFilterSet,
targetNodeID int64,
worker cluster.Worker,
) error {
// after the L0 segment feature,
// growing segments should have streaming deletes loaded as well
deleteScope := querypb.DataScope_All
switch candidate.Type() {
case commonpb.SegmentState_Sealed:
deleteScope = querypb.DataScope_Historical
case commonpb.SegmentState_Growing:
deleteScope = querypb.DataScope_Streaming
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
)
err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
}
return nil
}
func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context,
info *querypb.SegmentLoadInfo,
req *querypb.LoadSegmentsRequest,
targetNodeID int64,
worker cluster.Worker,
) error {
info = typeutil.Clone(info)
// load l0 segment deltalogs
info.Deltalogs = sd.getLevel0Deltalogs(info.GetPartitionID())
return worker.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: targetNodeID,
},
DstNodeID: targetNodeID,
Infos: []*querypb.SegmentLoadInfo{
info,
},
CollectionID: info.GetCollectionID(),
LoadScope: querypb.LoadScope_Delta,
Schema: req.GetSchema(),
IndexInfoList: req.GetIndexInfoList(),
})
}
func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
level0Segments := sd.segmentManager.GetBy(
segments.WithLevel(datapb.SegmentLevel_L0),
segments.WithChannel(sd.vchannelName))
var deltalogs []*datapb.FieldBinlog
for _, segment := range level0Segments {
if segment.Partition() != common.AllPartitionsID && segment.Partition() != partitionID {
continue
}
segment := segment.(*segments.L0Segment)
deltalogs = append(deltalogs, segment.LoadInfo().GetDeltalogs()...)
}
return deltalogs
}


@@ -0,0 +1,334 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"context"
"fmt"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
ForwardPolicyDefault = ``
L0ForwardPolicyBF = `FilterByBF`
L0ForwardPolicyRemoteLoad = `RemoteLoad`
StreamingForwardPolicyBF = `FilterByBF`
StreamingForwardPolicyDirect = `Direct`
)
func (sd *shardDelegator) forwardL0Deletion(ctx context.Context,
info *querypb.SegmentLoadInfo,
req *querypb.LoadSegmentsRequest,
candidate *pkoracle.BloomFilterSet,
targetNodeID int64,
worker cluster.Worker,
) error {
switch policy := paramtable.Get().QueryNodeCfg.LevelZeroForwardPolicy.GetValue(); policy {
case ForwardPolicyDefault, L0ForwardPolicyBF:
return sd.forwardL0ByBF(ctx, info, candidate, targetNodeID, worker)
case L0ForwardPolicyRemoteLoad:
return sd.forwardL0RemoteLoad(ctx, info, req, targetNodeID, worker)
default:
return merr.WrapErrServiceInternal("Unknown l0 forward policy: %s", policy)
}
}
func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteData []*DeleteData) {
// TODO add `auto` policy
// using direct when streaming size is too large
// need some experimental data to support this policy
switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy {
case ForwardPolicyDefault, StreamingForwardPolicyBF:
sd.forwardStreamingByBF(ctx, deleteData)
case StreamingForwardPolicyDirect:
// forward streaming deletion without bf filtering
sd.forwardStreamingDirect(ctx, deleteData)
default:
log.Fatal("unsupported streaming forward policy", zap.String("policy", policy))
}
}
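// Policy summary for the two streaming paths below:
//   - FilterByBF (default): applyBFInParallel matches delete PKs against each segment's
//     bloom filter and only the hit (segment, pk, ts) records are forwarded.
//   - Direct: deletes are grouped by partition and the full batch is forwarded to every
//     sealed and growing segment of that partition, trading extra fan-out for skipping BF work.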
func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
info *querypb.SegmentLoadInfo,
candidate *pkoracle.BloomFilterSet,
targetNodeID int64,
worker cluster.Worker,
) error {
// after the L0 segment feature,
// growing segments should have streaming deletes loaded as well
deleteScope := querypb.DataScope_All
switch candidate.Type() {
case commonpb.SegmentState_Sealed:
deleteScope = querypb.DataScope_Historical
case commonpb.SegmentState_Growing:
deleteScope = querypb.DataScope_Streaming
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
)
err := worker.Delete(ctx, &querypb.DeleteRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
CollectionId: info.GetCollectionID(),
PartitionId: info.GetPartitionID(),
SegmentId: info.GetSegmentID(),
PrimaryKeys: storage.ParsePrimaryKeys2IDs(deleteData.Pks),
Timestamps: deleteData.Tss,
Scope: deleteScope,
})
if err != nil {
log.Warn("failed to apply delete when LoadSegment", zap.Error(err))
return err
}
}
return nil
}
func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context,
info *querypb.SegmentLoadInfo,
req *querypb.LoadSegmentsRequest,
targetNodeID int64,
worker cluster.Worker,
) error {
info = typeutil.Clone(info)
// load l0 segment deltalogs
info.Deltalogs = sd.getLevel0Deltalogs(info.GetPartitionID())
return worker.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: &commonpb.MsgBase{
TargetID: targetNodeID,
},
DstNodeID: targetNodeID,
Infos: []*querypb.SegmentLoadInfo{
info,
},
CollectionID: info.GetCollectionID(),
LoadScope: querypb.LoadScope_Delta,
Schema: req.GetSchema(),
IndexInfoList: req.GetIndexInfoList(),
})
}
func (sd *shardDelegator) getLevel0Deltalogs(partitionID int64) []*datapb.FieldBinlog {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
level0Segments := sd.segmentManager.GetBy(
segments.WithLevel(datapb.SegmentLevel_L0),
segments.WithChannel(sd.vchannelName))
var deltalogs []*datapb.FieldBinlog
for _, segment := range level0Segments {
if segment.Partition() != common.AllPartitionsID && segment.Partition() != partitionID {
continue
}
segment := segment.(*segments.L0Segment)
deltalogs = append(deltalogs, segment.LoadInfo().GetDeltalogs()...)
}
return deltalogs
}
func (sd *shardDelegator) forwardStreamingByBF(ctx context.Context, deleteData []*DeleteData) {
start := time.Now()
retMap := sd.applyBFInParallel(deleteData, segments.GetBFApplyPool())
// segment => delete data
delRecords := make(map[int64]DeleteData)
retMap.Range(func(key int, value *BatchApplyRet) bool {
startIdx := value.StartIdx
pk2SegmentIDs := value.Segment2Hits
pks := deleteData[value.DeleteDataIdx].PrimaryKeys
tss := deleteData[value.DeleteDataIdx].Timestamps
for segmentID, hits := range pk2SegmentIDs {
for i, hit := range hits {
if hit {
delRecord := delRecords[segmentID]
delRecord.PrimaryKeys = append(delRecord.PrimaryKeys, pks[startIdx+i])
delRecord.Timestamps = append(delRecord.Timestamps, tss[startIdx+i])
delRecord.RowCount++
delRecords[segmentID] = delRecord
}
}
}
return true
})
bfCost := time.Since(start)
offlineSegments := typeutil.NewConcurrentSet[int64]()
sealed, growing, version := sd.distribution.PinOnlineSegments()
start = time.Now()
eg, ctx := errgroup.WithContext(context.Background())
for _, entry := range sealed {
entry := entry
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
if err != nil {
log.Warn("failed to get worker",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// skip if node down
// delete will be processed after loaded again
return nil
}
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) {
data, ok := delRecords[segmentID]
return data, ok
}, entry.Segments, querypb.DataScope_Historical)...)
return nil
})
}
if len(growing) > 0 {
eg.Go(func() error {
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
if err != nil {
log.Error("failed to get worker(local)",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// panic here, local worker shall not have error
panic(err)
}
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) {
data, ok := delRecords[segmentID]
return data, ok
}, growing, querypb.DataScope_Streaming)...)
return nil
})
}
// no error is returned from applyDelete
_ = eg.Wait()
forwardDeleteCost := time.Since(start)
sd.distribution.Unpin(version)
offlineSegIDs := offlineSegments.Collect()
if len(offlineSegIDs) > 0 {
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
sd.markSegmentOffline(offlineSegIDs...)
}
metrics.QueryNodeApplyBFCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(bfCost.Milliseconds()))
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
}
func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData []*DeleteData) {
start := time.Now()
// group by partition id
groups := lo.GroupBy(deleteData, func(delData *DeleteData) int64 {
return delData.PartitionID
})
offlineSegments := typeutil.NewConcurrentSet[int64]()
eg, ctx := errgroup.WithContext(ctx)
for partitionID, group := range groups {
partitionID := partitionID
group := group
eg.Go(func() error {
partitions := []int64{partitionID}
// check if all partitions
if partitionID == common.AllPartitionsID {
partitions = []int64{}
}
sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...)
defer sd.distribution.Unpin(version)
for _, item := range group {
deleteData := *item
for _, entry := range sealed {
entry := entry
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
if err != nil {
log.Warn("failed to get worker",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// skip if node down
// delete will be processed after loaded again
continue
}
eg.Go(func() error {
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) {
return deleteData, true
}, entry.Segments, querypb.DataScope_Historical)...)
return nil
})
}
if len(growing) > 0 {
worker, err := sd.workerManager.GetWorker(ctx, paramtable.GetNodeID())
if err != nil {
log.Error("failed to get worker(local)",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Error(err),
)
// panic here, local worker shall not have error
panic(err)
}
eg.Go(func() error {
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) {
return deleteData, true
}, growing, querypb.DataScope_Streaming)...)
return nil
})
}
}
return nil
})
}
// no error is returned from applyDelete
_ = eg.Wait()
forwardDeleteCost := time.Since(start)
offlineSegIDs := offlineSegments.Collect()
if len(offlineSegIDs) > 0 {
log.Warn("failed to apply delete, mark segment offline", zap.Int64s("offlineSegments", offlineSegIDs))
sd.markSegmentOffline(offlineSegIDs...)
}
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
}


@@ -0,0 +1,267 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package delegator
import (
"context"
"testing"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/segcorepb"
"github.com/milvus-io/milvus/internal/querynodev2/cluster"
"github.com/milvus-io/milvus/internal/querynodev2/pkoracle"
"github.com/milvus-io/milvus/internal/querynodev2/segments"
"github.com/milvus-io/milvus/internal/querynodev2/tsafe"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type StreamingForwardSuite struct {
suite.Suite
collectionID int64
partitionIDs []int64
replicaID int64
vchannelName string
version int64
workerManager *cluster.MockManager
manager *segments.Manager
tsafeManager tsafe.Manager
loader *segments.MockLoader
mq *msgstream.MockMsgStream
delegator *shardDelegator
chunkManager storage.ChunkManager
rootPath string
}
func (s *StreamingForwardSuite) SetupSuite() {
paramtable.Init()
paramtable.SetNodeID(1)
}
func (s *StreamingForwardSuite) SetupTest() {
s.collectionID = 1000
s.partitionIDs = []int64{500, 501}
s.replicaID = 65535
s.vchannelName = "rootcoord-dml_1000_v0"
s.version = 2000
s.workerManager = &cluster.MockManager{}
s.manager = segments.NewManager()
s.tsafeManager = tsafe.NewTSafeReplica()
s.loader = &segments.MockLoader{}
s.loader.EXPECT().
Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything).
Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment {
return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment {
ms := &segments.MockSegment{}
ms.EXPECT().ID().Return(info.GetSegmentID())
ms.EXPECT().Type().Return(segments.SegmentTypeGrowing)
ms.EXPECT().Partition().Return(info.GetPartitionID())
ms.EXPECT().Collection().Return(info.GetCollectionID())
ms.EXPECT().Indexes().Return(nil)
ms.EXPECT().RowNum().Return(info.GetNumOfRows())
ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil)
return ms
})
}, nil)
// init schema
s.manager.Collection.PutOrRef(s.collectionID, &schemapb.CollectionSchema{
Name: "TestCollection",
Fields: []*schemapb.FieldSchema{
{
Name: "id",
FieldID: 100,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: true,
},
{
Name: "vector",
FieldID: 101,
IsPrimaryKey: false,
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
},
},
}, &segcorepb.CollectionIndexMeta{
MaxIndexRowCount: 100,
IndexMetas: []*segcorepb.FieldIndexMeta{
{
FieldID: 101,
CollectionID: s.collectionID,
IndexName: "binary_index",
TypeParams: []*commonpb.KeyValuePair{
{
Key: common.DimKey,
Value: "128",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: common.IndexTypeKey,
Value: "BIN_IVF_FLAT",
},
{
Key: common.MetricTypeKey,
Value: metric.JACCARD,
},
},
},
},
}, &querypb.LoadMetaInfo{
PartitionIDs: s.partitionIDs,
})
s.mq = &msgstream.MockMsgStream{}
s.rootPath = "delegator_test"
// init chunkManager
chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath)
s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background())
delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{
NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) {
return s.mq, nil
},
}, 10000, nil, s.chunkManager)
s.Require().NoError(err)
sd, ok := delegator.(*shardDelegator)
s.Require().True(ok)
s.delegator = sd
}
func (s *StreamingForwardSuite) TestBFStreamingForward() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key, StreamingForwardPolicyBF)
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key)
delegator := s.delegator
// Setup distribution
delegator.distribution.AddGrowing(SegmentEntry{
NodeID: 1,
SegmentID: 100,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 101,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 102,
})
delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil)
// Setup pk oracle
// empty bfs will not match
delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(100, 10, commonpb.SegmentState_Growing), 1)
delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(102, 10, commonpb.SegmentState_Sealed), 1)
// candidate key always matches
delegator.pkOracle.Register(pkoracle.NewCandidateKey(101, 10, commonpb.SegmentState_Sealed), 1)
deletedSegment := typeutil.NewConcurrentSet[int64]()
mockWorker := cluster.NewMockWorker(s.T())
s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil)
mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error {
s.T().Log(dr.GetSegmentId())
deletedSegment.Insert(dr.SegmentId)
s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData())
s.ElementsMatch([]uint64{10}, dr.GetTimestamps())
return nil
}).Maybe()
delegator.ProcessDelete([]*DeleteData{
{
PartitionID: -1,
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
Timestamps: []uint64{10},
RowCount: 1,
},
}, 100)
s.ElementsMatch([]int64{101}, deletedSegment.Collect())
}
func (s *StreamingForwardSuite) TestDirectStreamingForward() {
paramtable.Get().Save(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key, StreamingForwardPolicyDirect)
defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.Key)
delegator := s.delegator
// Setup distribution
delegator.distribution.AddGrowing(SegmentEntry{
NodeID: 1,
SegmentID: 100,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 101,
})
delegator.distribution.AddDistributions(SegmentEntry{
NodeID: 1,
SegmentID: 102,
})
delegator.distribution.SyncTargetVersion(1, []int64{100}, []int64{101, 102}, nil)
// Setup pk oracle
// empty bfs will not match
delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(100, 10, commonpb.SegmentState_Growing), 1)
delegator.pkOracle.Register(pkoracle.NewBloomFilterSet(102, 10, commonpb.SegmentState_Sealed), 1)
// candidate key always matches
delegator.pkOracle.Register(pkoracle.NewCandidateKey(101, 10, commonpb.SegmentState_Sealed), 1)
deletedSegment := typeutil.NewConcurrentSet[int64]()
mockWorker := cluster.NewMockWorker(s.T())
s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil)
mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error {
s.T().Log(dr.GetSegmentId())
deletedSegment.Insert(dr.SegmentId)
s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData())
s.ElementsMatch([]uint64{10}, dr.GetTimestamps())
return nil
})
delegator.ProcessDelete([]*DeleteData{
{
PartitionID: -1,
PrimaryKeys: []storage.PrimaryKey{storage.NewInt64PrimaryKey(10)},
Timestamps: []uint64{10},
RowCount: 1,
},
}, 100)
s.ElementsMatch([]int64{100, 101, 102}, deletedSegment.Collect())
}
func TestStreamingForward(t *testing.T) {
suite.Run(t, new(StreamingForwardSuite))
}
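
The two tests above pin the policy through paramtable and assert the fan-out difference: under FilterByBF only segment 101 (registered with an always-matching candidate key) receives the delete, while under Direct segments 100, 101 and 102 all do. To run only this suite (assuming the package lives at internal/querynodev2/delegator, as the imports suggest):

go test -run TestStreamingForward ./internal/querynodev2/delegator/...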


@@ -2382,8 +2382,9 @@ type queryNodeConfig struct {
MaxSegmentDeleteBuffer ParamItem `refreshable:"false"`
DeleteBufferBlockSize ParamItem `refreshable:"false"`
// level zero
LevelZeroForwardPolicy ParamItem `refreshable:"true"`
// delta forward
LevelZeroForwardPolicy ParamItem `refreshable:"true"`
StreamingDeltaForwardPolicy ParamItem `refreshable:"true"`
// loader
IoPoolSize ParamItem `refreshable:"false"`
@@ -2983,6 +2984,15 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
}
p.LevelZeroForwardPolicy.Init(base.mgr)
p.StreamingDeltaForwardPolicy = ParamItem{
Key: "queryNode.streamingDeltaForwardPolicy",
Version: "2.4.12",
Doc: "delegator streaming deletion forward policy, possible option[\"FilterByBF\", \"Direct\"]",
DefaultValue: "FilterByBF",
Export: true,
}
p.StreamingDeltaForwardPolicy.Init(base.mgr)
p.IoPoolSize = ParamItem{
Key: "queryNode.ioPoolSize",
Version: "2.3.0",