enhance: Batch forward delete when using DirectForward (#37076)

Relatedt #36887

DirectFoward streaming delete will cause memory usage explode if the
segments number was large. This PR add batching delete API and using it
for direct forward implementation.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
pull/37124/head
congqixia 2024-10-24 10:39:28 +08:00 committed by GitHub
parent f78f61129a
commit f43527ef6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 653 additions and 45 deletions

View File

@ -332,3 +332,16 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr
return client.Delete(ctx, req)
})
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest, _ ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(c.nodeID),
)
return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.DeleteBatchResponse, error) {
return client.DeleteBatch(ctx, req)
})
}

View File

@ -108,6 +108,9 @@ func Test_NewClient(t *testing.T) {
r20, err := client.SearchSegments(ctx, nil)
retCheck(retNotNil, r20, err)
r21, err := client.DeleteBatch(ctx, nil)
retCheck(retNotNil, r21, err)
// stream rpc
client, err := client.QueryStream(ctx, nil)
retCheck(retNotNil, client, err)

View File

@ -383,3 +383,9 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu
func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
return s.querynode.Delete(ctx, req)
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return s.querynode.DeleteBatch(ctx, req)
}

View File

@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -270,6 +271,15 @@ func Test_NewServer(t *testing.T) {
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("DeleteBatch", func(t *testing.T) {
mockQN.EXPECT().DeleteBatch(mock.Anything, mock.Anything).Return(&querypb.DeleteBatchResponse{
Status: merr.Success(),
}, nil)
resp, err := server.DeleteBatch(ctx, &querypb.DeleteBatchRequest{})
assert.NoError(t, merr.CheckRPCCall(resp, err))
})
err = server.Stop()
assert.NoError(t, err)
}

View File

@ -89,6 +89,65 @@ func (_c *MockQueryNode_Delete_Call) RunAndReturn(run func(context.Context, *que
return _c
}
// DeleteBatch provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNode) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for DeleteBatch")
}
var r0 *querypb.DeleteBatchResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNode_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
type MockQueryNode_DeleteBatch_Call struct {
*mock.Call
}
// DeleteBatch is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.DeleteBatchRequest
func (_e *MockQueryNode_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNode_DeleteBatch_Call {
return &MockQueryNode_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)}
}
func (_c *MockQueryNode_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNode_DeleteBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
})
return _c
}
func (_c *MockQueryNode_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNode_DeleteBatch_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNode_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNode_DeleteBatch_Call {
_c.Call.Return(run)
return _c
}
// GetAddress provides a mock function with given fields:
func (_m *MockQueryNode) GetAddress() string {
ret := _m.Called()

View File

@ -150,6 +150,80 @@ func (_c *MockQueryNodeClient_Delete_Call) RunAndReturn(run func(context.Context
return _c
}
// DeleteBatch provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
if len(ret) == 0 {
panic("no return value specified for DeleteBatch")
}
var r0 *querypb.DeleteBatchResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)); ok {
return rf(ctx, in, opts...)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) *querypb.DeleteBatchResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeClient_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
type MockQueryNodeClient_DeleteBatch_Call struct {
*mock.Call
}
// DeleteBatch is a helper method to define mock.On call
// - ctx context.Context
// - in *querypb.DeleteBatchRequest
// - opts ...grpc.CallOption
func (_e *MockQueryNodeClient_Expecter) DeleteBatch(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_DeleteBatch_Call {
return &MockQueryNodeClient_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch",
append([]interface{}{ctx, in}, opts...)...)}
}
func (_c *MockQueryNodeClient_DeleteBatch_Call) Run(run func(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_DeleteBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
variadicArgs := make([]grpc.CallOption, len(args)-2)
for i, a := range args[2:] {
if a != nil {
variadicArgs[i] = a.(grpc.CallOption)
}
}
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest), variadicArgs...)
})
return _c
}
func (_c *MockQueryNodeClient_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeClient_DeleteBatch_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeClient_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest, ...grpc.CallOption) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeClient_DeleteBatch_Call {
_c.Call.Return(run)
return _c
}
// GetComponentStates provides a mock function with given fields: ctx, in, opts
func (_m *MockQueryNodeClient) GetComponentStates(ctx context.Context, in *milvuspb.GetComponentStatesRequest, opts ...grpc.CallOption) (*milvuspb.ComponentStates, error) {
_va := make([]interface{}, len(opts))

View File

@ -172,6 +172,10 @@ service QueryNode {
}
rpc Delete(DeleteRequest) returns (common.Status) {
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
}
}
// --------------------QueryCoord grpc request and response proto------------------
@ -777,6 +781,25 @@ message DeleteRequest {
DataScope scope = 8;
}
message DeleteBatchRequest {
common.MsgBase base = 1;
int64 collection_id = 2;
int64 partition_id = 3;
string vchannel_name = 4;
repeated int64 segment_ids = 5;
schema.IDs primary_keys = 6;
repeated uint64 timestamps = 7;
DataScope scope = 8;
}
// DeleteBatchResponse returns failed/missing segment ids
// cannot just using common.Status to handle partial failure logic
message DeleteBatchResponse {
common.Status status = 1;
repeated int64 failed_ids = 2;
repeated int64 missing_ids = 3;
}
message ActivateCheckerRequest {
common.MsgBase base = 1;
int32 checkerID = 2;

View File

@ -88,6 +88,65 @@ func (_c *MockQueryNodeServer_Delete_Call) RunAndReturn(run func(context.Context
return _c
}
// DeleteBatch provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) DeleteBatch(_a0 context.Context, _a1 *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for DeleteBatch")
}
var r0 *querypb.DeleteBatchResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockQueryNodeServer_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
type MockQueryNodeServer_DeleteBatch_Call struct {
*mock.Call
}
// DeleteBatch is a helper method to define mock.On call
// - _a0 context.Context
// - _a1 *querypb.DeleteBatchRequest
func (_e *MockQueryNodeServer_Expecter) DeleteBatch(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_DeleteBatch_Call {
return &MockQueryNodeServer_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", _a0, _a1)}
}
func (_c *MockQueryNodeServer_DeleteBatch_Call) Run(run func(_a0 context.Context, _a1 *querypb.DeleteBatchRequest)) *MockQueryNodeServer_DeleteBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
})
return _c
}
func (_c *MockQueryNodeServer_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockQueryNodeServer_DeleteBatch_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockQueryNodeServer_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockQueryNodeServer_DeleteBatch_Call {
_c.Call.Return(run)
return _c
}
// GetComponentStates provides a mock function with given fields: _a0, _a1
func (_m *MockQueryNodeServer) GetComponentStates(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
ret := _m.Called(_a0, _a1)

View File

@ -73,6 +73,65 @@ func (_c *MockWorker_Delete_Call) RunAndReturn(run func(context.Context, *queryp
return _c
}
// DeleteBatch provides a mock function with given fields: ctx, req
func (_m *MockWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
ret := _m.Called(ctx, req)
if len(ret) == 0 {
panic("no return value specified for DeleteBatch")
}
var r0 *querypb.DeleteBatchResponse
var r1 error
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)); ok {
return rf(ctx, req)
}
if rf, ok := ret.Get(0).(func(context.Context, *querypb.DeleteBatchRequest) *querypb.DeleteBatchResponse); ok {
r0 = rf(ctx, req)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*querypb.DeleteBatchResponse)
}
}
if rf, ok := ret.Get(1).(func(context.Context, *querypb.DeleteBatchRequest) error); ok {
r1 = rf(ctx, req)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// MockWorker_DeleteBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteBatch'
type MockWorker_DeleteBatch_Call struct {
*mock.Call
}
// DeleteBatch is a helper method to define mock.On call
// - ctx context.Context
// - req *querypb.DeleteBatchRequest
func (_e *MockWorker_Expecter) DeleteBatch(ctx interface{}, req interface{}) *MockWorker_DeleteBatch_Call {
return &MockWorker_DeleteBatch_Call{Call: _e.mock.On("DeleteBatch", ctx, req)}
}
func (_c *MockWorker_DeleteBatch_Call) Run(run func(ctx context.Context, req *querypb.DeleteBatchRequest)) *MockWorker_DeleteBatch_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(*querypb.DeleteBatchRequest))
})
return _c
}
func (_c *MockWorker_DeleteBatch_Call) Return(_a0 *querypb.DeleteBatchResponse, _a1 error) *MockWorker_DeleteBatch_Call {
_c.Call.Return(_a0, _a1)
return _c
}
func (_c *MockWorker_DeleteBatch_Call) RunAndReturn(run func(context.Context, *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)) *MockWorker_DeleteBatch_Call {
_c.Call.Return(run)
return _c
}
// GetStatistics provides a mock function with given fields: ctx, req
func (_m *MockWorker) GetStatistics(ctx context.Context, req *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) {
ret := _m.Called(ctx, req)

View File

@ -39,6 +39,7 @@ type Worker interface {
LoadSegments(context.Context, *querypb.LoadSegmentsRequest) error
ReleaseSegments(context.Context, *querypb.ReleaseSegmentsRequest) error
Delete(ctx context.Context, req *querypb.DeleteRequest) error
DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error)
SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error)
QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error)
QueryStreamSegments(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) error
@ -141,6 +142,52 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e
return nil
}
func (w *remoteWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("workerID", req.GetBase().GetTargetID()),
)
client := w.getClient()
resp, err := client.DeleteBatch(ctx, req)
if err := merr.CheckRPCCall(resp, err); err != nil {
if errors.Is(err, merr.ErrServiceUnimplemented) {
log.Warn("invoke legacy querynode DeleteBatch method, fallback to ")
return w.splitDeleteBatch(ctx, req)
}
return nil, err
}
return resp, nil
}
func (w *remoteWorker) splitDeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
sReq := &querypb.DeleteRequest{
CollectionId: req.GetCollectionId(),
PartitionId: req.GetPartitionId(),
VchannelName: req.GetVchannelName(),
PrimaryKeys: req.GetPrimaryKeys(),
Timestamps: req.GetTimestamps(),
Scope: req.GetScope(),
}
// do fallback without parallel, to protect the mem limit
var missingIDs []int64
var failedIDs []int64
for _, segmentID := range req.GetSegmentIds() {
sReq.SegmentId = segmentID
err := w.Delete(ctx, sReq)
switch {
case errors.Is(err, merr.ErrSegmentNotFound):
missingIDs = append(missingIDs, segmentID)
case err != nil:
failedIDs = append(failedIDs, segmentID)
default:
}
}
return &querypb.DeleteBatchResponse{
Status: merr.Success(),
FailedIds: failedIDs,
MissingIds: missingIDs,
}, nil
}
func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
client := w.getClient()
ret, err := client.SearchSegments(ctx, req)

View File

@ -193,6 +193,70 @@ func (s *RemoteWorkerSuite) TestDelete() {
})
}
func (s *RemoteWorkerSuite) TestDeleteBatch() {
s.Run("normal_run", func() {
defer func() { s.mockClient.ExpectedCalls = nil }()
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
Return(&querypb.DeleteBatchResponse{Status: merr.Success()}, nil).Once()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
SegmentIds: []int64{100, 200},
})
s.NoError(merr.CheckRPCCall(resp, err))
})
s.Run("client_return_error", func() {
defer func() { s.mockClient.ExpectedCalls = nil }()
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
Return(nil, merr.WrapErrServiceInternal("mocked")).Once()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
SegmentIds: []int64{100, 200},
})
s.Error(merr.CheckRPCCall(resp, err))
})
s.Run("client_return_fail_status", func() {
defer func() { s.mockClient.ExpectedCalls = nil }()
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
Return(&querypb.DeleteBatchResponse{
Status: merr.Status(merr.WrapErrServiceUnavailable("mocked")),
}, nil).Once()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
SegmentIds: []int64{100, 200},
})
s.Error(merr.CheckRPCCall(resp, err))
})
s.Run("batch_delete_unimplemented", func() {
defer func() { s.mockClient.ExpectedCalls = nil }()
s.mockClient.EXPECT().DeleteBatch(mock.Anything, mock.AnythingOfType("*querypb.DeleteBatchRequest")).
Return(nil, merr.WrapErrServiceUnimplemented(status.Errorf(codes.Unimplemented, "mocked grpc unimplemented")))
s.mockClient.EXPECT().Delete(mock.Anything, mock.Anything).Return(merr.Success(), nil).Times(2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := s.worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
SegmentIds: []int64{100, 200},
})
s.NoError(merr.CheckRPCCall(resp, err))
})
}
func (s *RemoteWorkerSuite) TestSearch() {
s.Run("normal_run", func() {
defer func() { s.mockClient.ExpectedCalls = nil }()

View File

@ -19,8 +19,10 @@ package delegator
import (
"context"
"fmt"
"runtime"
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -36,8 +38,10 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -276,8 +280,6 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
sealed, growing, version := sd.distribution.PinOnlineSegments(partitions...)
defer sd.distribution.Unpin(version)
for _, item := range group {
deleteData := *item
for _, entry := range sealed {
entry := entry
worker, err := sd.workerManager.GetWorker(ctx, entry.NodeID)
@ -296,9 +298,7 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
})
eg.Go(func() error {
offlineSegments.Upsert(sd.applyDelete(ctx, entry.NodeID, worker, func(segmentID int64) (DeleteData, bool) {
return deleteData, true
}, segments, querypb.DataScope_Historical)...)
offlineSegments.Upsert(sd.applyDeleteBatch(ctx, entry.NodeID, worker, group, segments, querypb.DataScope_Historical)...)
return nil
})
}
@ -314,13 +314,10 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
panic(err)
}
eg.Go(func() error {
offlineSegments.Upsert(sd.applyDelete(ctx, paramtable.GetNodeID(), worker, func(segmentID int64) (DeleteData, bool) {
return deleteData, true
}, growing, querypb.DataScope_Streaming)...)
offlineSegments.Upsert(sd.applyDeleteBatch(ctx, paramtable.GetNodeID(), worker, group, growing, querypb.DataScope_Streaming)...)
return nil
})
}
}
return nil
})
}
@ -336,3 +333,72 @@ func (sd *shardDelegator) forwardStreamingDirect(ctx context.Context, deleteData
metrics.QueryNodeForwardDeleteCost.WithLabelValues("ProcessDelete", fmt.Sprint(paramtable.GetNodeID())).Observe(float64(forwardDeleteCost.Milliseconds()))
}
// applyDeleteBatch handles delete record and apply them to corresponding workers in batch.
func (sd *shardDelegator) applyDeleteBatch(ctx context.Context,
nodeID int64,
worker cluster.Worker,
data []*DeleteData,
entries []SegmentEntry,
scope querypb.DataScope,
) []int64 {
offlineSegments := typeutil.NewConcurrentSet[int64]()
log := sd.getLogger(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0) * 4)
defer pool.Release()
var futures []*conc.Future[struct{}]
for _, delData := range data {
delData := delData
segmentIDs := lo.Map(entries, func(entry SegmentEntry, _ int) int64 {
return entry.SegmentID
})
future := pool.Submit(func() (struct{}, error) {
log.Debug("delegator plan to applyDelete via worker")
err := retry.Handle(ctx, func() (bool, error) {
if sd.Stopped() {
return false, merr.WrapErrChannelNotAvailable(sd.vchannelName, "channel is unsubscribing")
}
resp, err := worker.DeleteBatch(ctx, &querypb.DeleteBatchRequest{
Base: commonpbutil.NewMsgBase(commonpbutil.WithTargetID(nodeID)),
CollectionId: sd.collectionID,
PartitionId: delData.PartitionID,
VchannelName: sd.vchannelName,
SegmentIds: segmentIDs,
PrimaryKeys: storage.ParsePrimaryKeys2IDs(delData.PrimaryKeys),
Timestamps: delData.Timestamps,
Scope: scope,
})
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("try to delete data on non-exist node")
// cancel other request
cancel()
return false, err
}
// grpc/network error
if err != nil {
return true, err
}
if len(resp.GetMissingIds()) > 0 {
log.Warn("try to delete data of released segment", zap.Int64s("ids", resp.GetMissingIds()))
}
if len(resp.GetFailedIds()) > 0 {
log.Warn("apply delete for segment failed, marking it offline")
offlineSegments.Upsert(resp.GetFailedIds()...)
}
return false, nil
}, retry.Attempts(10))
return struct{}{}, err
})
futures = append(futures, future)
}
conc.AwaitAll(futures...)
return offlineSegments.Collect()
}

View File

@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -243,12 +244,11 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() {
deletedSegment := typeutil.NewConcurrentSet[int64]()
mockWorker := cluster.NewMockWorker(s.T())
s.workerManager.EXPECT().GetWorker(mock.Anything, int64(1)).Return(mockWorker, nil)
mockWorker.EXPECT().Delete(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteRequest) error {
s.T().Log(dr.GetSegmentId())
deletedSegment.Insert(dr.SegmentId)
mockWorker.EXPECT().DeleteBatch(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, dr *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
deletedSegment.Upsert(dr.GetSegmentIds()...)
s.ElementsMatch([]int64{10}, dr.GetPrimaryKeys().GetIntId().GetData())
s.ElementsMatch([]uint64{10}, dr.GetTimestamps())
return nil
return &querypb.DeleteBatchResponse{Status: merr.Success()}, nil
})
delegator.ProcessDelete([]*DeleteData{

View File

@ -53,6 +53,10 @@ func (w *LocalWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) er
return merr.CheckRPCCall(status, err)
}
func (w *LocalWorker) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
return w.node.DeleteBatch(ctx, req)
}
func (w *LocalWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
return w.node.SearchSegments(ctx, req)
}

View File

@ -87,6 +87,22 @@ func (f SegmentIDFilter) SegmentIDs() ([]int64, bool) {
return []int64{int64(f)}, true
}
type SegmentIDsFilter struct {
segmentIDs typeutil.Set[int64]
}
func (f SegmentIDsFilter) Filter(segment Segment) bool {
return f.segmentIDs.Contain(segment.ID())
}
func (f SegmentIDsFilter) SegmentType() (SegmentType, bool) {
return commonpb.SegmentState_SegmentStateNone, false
}
func (f SegmentIDsFilter) SegmentIDs() ([]int64, bool) {
return f.segmentIDs.Collect(), true
}
type SegmentTypeFilter SegmentType
func (f SegmentTypeFilter) Filter(segment Segment) bool {
@ -133,6 +149,12 @@ func WithID(id int64) SegmentFilter {
return SegmentIDFilter(id)
}
func WithIDs(ids ...int64) SegmentFilter {
return SegmentIDsFilter{
segmentIDs: typeutil.NewSet(ids...),
}
}
func WithLevel(level datapb.SegmentLevel) SegmentFilter {
return SegmentFilterFunc(func(segment Segment) bool {
return segment.Level() == level

View File

@ -19,6 +19,7 @@ package querynodev2
import (
"context"
"fmt"
"runtime"
"strconv"
"sync"
"time"
@ -48,6 +49,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/merr"
@ -1404,6 +1406,84 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
return merr.Success(), nil
}
// DeleteBatch is the API to apply same delete data into multiple segments.
// it's basically same as `Delete` but cost less memory pressure.
func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", req.GetCollectionId()),
zap.String("channel", req.GetVchannelName()),
zap.Int64s("segmentIDs", req.GetSegmentIds()),
zap.String("scope", req.GetScope().String()),
)
// check node healthy
if err := node.lifetime.Add(merr.IsHealthy); err != nil {
return &querypb.DeleteBatchResponse{
Status: merr.Status(err),
}, nil
}
defer node.lifetime.Done()
// log.Debug("QueryNode received worker delete detail", zap.Stringer("info", &deleteRequestStringer{DeleteRequest: req}))
filters := []segments.SegmentFilter{
segments.WithIDs(req.GetSegmentIds()...),
}
// do not add filter for Unknown & All scope, for backward cap
switch req.GetScope() {
case querypb.DataScope_Historical:
filters = append(filters, segments.WithType(segments.SegmentTypeSealed))
case querypb.DataScope_Streaming:
filters = append(filters, segments.WithType(segments.SegmentTypeGrowing))
}
segs := node.manager.Segment.GetBy(filters...)
hitIDs := lo.Map(segs, func(segment segments.Segment, _ int) int64 {
return segment.ID()
})
// calculate missing ids, continue to delete existing ones.
missingIDs := typeutil.NewSet(req.GetSegmentIds()...).Complement(typeutil.NewSet(hitIDs...))
if missingIDs.Len() > 0 {
log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect()))
}
pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
// control the execution batch parallel with P number
// maybe it shall be lower in case of heavy CPU usage may impacting search/query
pool := conc.NewPool[struct{}](runtime.GOMAXPROCS(0))
futures := make([]*conc.Future[struct{}], 0, len(segs))
errSet := typeutil.NewConcurrentSet[int64]()
for _, segment := range segs {
segment := segment
futures = append(futures, pool.Submit(func() (struct{}, error) {
// TODO @silverxia, add interface to use same data struct for segment delete
// current implementation still copys pks into protobuf(or arrow) struct
err := segment.Delete(ctx, pks, req.GetTimestamps())
if err != nil {
errSet.Insert(segment.ID())
log.Warn("segment delete failed",
zap.Int64("segmentID", segment.ID()),
zap.Error(err))
return struct{}{}, err
}
return struct{}{}, nil
}))
}
// ignore error returned, since error segment is recorded into error set
_ = conc.AwaitAll(futures...)
// return merr.Success(), nil
return &querypb.DeleteBatchResponse{
Status: merr.Success(),
FailedIds: errSet.Collect(),
}, nil
}
type deleteRequestStringer struct {
*querypb.DeleteRequest
}

View File

@ -130,6 +130,10 @@ func (m *GrpcQueryNodeClient) Delete(ctx context.Context, in *querypb.DeleteRequ
return &commonpb.Status{}, m.Err
}
func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
return &querypb.DeleteBatchResponse{}, m.Err
}
func (m *GrpcQueryNodeClient) Close() error {
return m.Err
}

View File

@ -148,6 +148,10 @@ func (qn *qnServerWrapper) Delete(ctx context.Context, in *querypb.DeleteRequest
return qn.QueryNode.Delete(ctx, in)
}
func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBatchRequest, opts ...grpc.CallOption) (*querypb.DeleteBatchResponse, error) {
return qn.QueryNode.DeleteBatch(ctx, in)
}
func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient {
return &qnServerWrapper{
QueryNode: qn,

View File

@ -246,6 +246,17 @@ func (s *QnWrapperSuite) TestDelete() {
s.NoError(err)
}
func (s *QnWrapperSuite) TestDeleteBatch() {
s.qn.EXPECT().DeleteBatch(mock.Anything, mock.Anything).
Return(&querypb.DeleteBatchResponse{
Status: merr.Status(nil),
}, nil)
resp, err := s.client.DeleteBatch(context.Background(), &querypb.DeleteBatchRequest{})
err = merr.CheckRPCCall(resp, err)
s.NoError(err)
}
// Race caused by mock parameter check on once
/*
func (s *QnWrapperSuite) TestQueryStream() {