diff --git a/internal/querynodev2/delegator/delegator.go b/internal/querynodev2/delegator/delegator.go index 1defa7fc8c..fa4cd1d9f2 100644 --- a/internal/querynodev2/delegator/delegator.go +++ b/internal/querynodev2/delegator/delegator.go @@ -185,7 +185,6 @@ func (sd *shardDelegator) modifySearchRequest(req *querypb.SearchRequest, scope nodeReq.Scope = scope nodeReq.Req.Base.TargetID = targetID nodeReq.SegmentIDs = segmentIDs - nodeReq.FromShardLeader = true nodeReq.DmlChannels = []string{sd.vchannelName} return nodeReq } @@ -195,7 +194,6 @@ func (sd *shardDelegator) modifyQueryRequest(req *querypb.QueryRequest, scope qu nodeReq.Scope = scope nodeReq.Req.Base.TargetID = targetID nodeReq.SegmentIDs = segmentIDs - nodeReq.FromShardLeader = true nodeReq.DmlChannels = []string{sd.vchannelName} return nodeReq } @@ -315,7 +313,6 @@ func (sd *shardDelegator) Search(ctx context.Context, req *querypb.SearchRequest Req: newRequest, DmlChannels: req.GetDmlChannels(), TotalChannelNum: req.GetTotalChannelNum(), - FromShardLeader: true, } searchReq.Req.GuaranteeTimestamp = req.GetReq().GetGuaranteeTimestamp() searchReq.Req.TimeoutTimestamp = req.GetReq().GetTimeoutTimestamp() diff --git a/internal/querynodev2/delegator/delegator_test.go b/internal/querynodev2/delegator/delegator_test.go index 6e1c97da2a..2dcd9ac5e0 100644 --- a/internal/querynodev2/delegator/delegator_test.go +++ b/internal/querynodev2/delegator/delegator_test.go @@ -274,7 +274,7 @@ func (s *DelegatorSuite) TestSearch() { worker1.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). Run(func(_ context.Context, req *querypb.SearchRequest) { s.EqualValues(1, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + if req.GetScope() == querypb.DataScope_Streaming { s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1004}, req.GetSegmentIDs()) @@ -287,7 +287,7 @@ func (s *DelegatorSuite) TestSearch() { worker2.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). Run(func(_ context.Context, req *querypb.SearchRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -342,7 +342,7 @@ func (s *DelegatorSuite) TestSearch() { worker2.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). Run(func(_ context.Context, req *querypb.SearchRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -382,7 +382,7 @@ func (s *DelegatorSuite) TestSearch() { worker2.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). Run(func(_ context.Context, req *querypb.SearchRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -495,7 +495,7 @@ func (s *DelegatorSuite) TestQuery() { worker1.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). Run(func(_ context.Context, req *querypb.QueryRequest) { s.EqualValues(1, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + if req.GetScope() == querypb.DataScope_Streaming { s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1004}, req.GetSegmentIDs()) @@ -508,7 +508,7 @@ func (s *DelegatorSuite) TestQuery() { worker2.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). Run(func(_ context.Context, req *querypb.QueryRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -562,7 +562,7 @@ func (s *DelegatorSuite) TestQuery() { worker2.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). Run(func(_ context.Context, req *querypb.QueryRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -599,7 +599,7 @@ func (s *DelegatorSuite) TestQuery() { worker2.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). Run(func(_ context.Context, req *querypb.QueryRequest) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -692,7 +692,7 @@ func (s *DelegatorSuite) TestQueryStream() { worker1.EXPECT().QueryStreamSegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest"), mock.Anything). Run(func(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) { s.EqualValues(1, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + if req.GetScope() == querypb.DataScope_Streaming { s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1004}, req.GetSegmentIDs()) @@ -715,7 +715,7 @@ func (s *DelegatorSuite) TestQueryStream() { worker2.EXPECT().QueryStreamSegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest"), mock.Anything). Run(func(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) @@ -848,7 +848,7 @@ func (s *DelegatorSuite) TestQueryStream() { worker2.EXPECT().QueryStreamSegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest"), mock.Anything). Run(func(ctx context.Context, req *querypb.QueryRequest, srv streamrpc.QueryStreamServer) { s.EqualValues(2, req.Req.GetBase().GetTargetID()) - s.True(req.GetFromShardLeader()) + s.Equal(querypb.DataScope_Historical, req.GetScope()) s.EqualValues([]string{s.vchannelName}, req.GetDmlChannels()) s.ElementsMatch([]int64{1002, 1003}, req.GetSegmentIDs()) diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 39304b923a..ab224ba076 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -193,7 +193,6 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque }() log.Debug("start do query with channel", - zap.Bool("fromShardLeader", req.GetFromShardLeader()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), ) // add cancel when error occurs @@ -218,9 +217,8 @@ func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryReque } // reduce result - tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, traceID = %s, fromShardLeader = %t, vChannel = %s, segmentIDs = %v", + tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, traceID = %s, vChannel = %s, segmentIDs = %v", traceID, - req.GetFromShardLeader(), channel, req.GetSegmentIDs(), )) @@ -268,7 +266,6 @@ func (node *QueryNode) queryChannelStream(ctx context.Context, req *querypb.Quer }() log.Debug("start do streaming query with channel", - zap.Bool("fromShardLeader", req.GetFromShardLeader()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), ) @@ -353,7 +350,6 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq }() log.Debug("start to search channel", - zap.Bool("fromShardLeader", req.GetFromShardLeader()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), ) searchCtx, cancel := context.WithCancel(ctx) @@ -376,9 +372,8 @@ func (node *QueryNode) searchChannel(ctx context.Context, req *querypb.SearchReq } // reduce result - tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, traceID = %s, fromShardLeader = %t, vChannel = %s, segmentIDs = %v", + tr.CtxElapse(ctx, fmt.Sprintf("start reduce query result, traceID = %s, vChannel = %s, segmentIDs = %v", traceID, - req.GetFromShardLeader(), channel, req.GetSegmentIDs(), )) diff --git a/internal/querynodev2/segments/mock_data.go b/internal/querynodev2/segments/mock_data.go index aa6fcfd6b6..b767013bf2 100644 --- a/internal/querynodev2/segments/mock_data.go +++ b/internal/querynodev2/segments/mock_data.go @@ -1388,11 +1388,10 @@ func checkSearchResult(ctx context.Context, nq int64, plan *SearchPlan, searchRe func genSearchPlanAndRequests(collection *Collection, segments []int64, indexType string, nq int64) (*SearchRequest, error) { iReq, _ := genSearchRequest(nq, indexType, collection) queryReq := &querypb.SearchRequest{ - Req: iReq, - DmlChannels: []string{"dml"}, - SegmentIDs: segments, - FromShardLeader: true, - Scope: querypb.DataScope_Historical, + Req: iReq, + DmlChannels: []string{"dml"}, + SegmentIDs: segments, + Scope: querypb.DataScope_Historical, } return NewSearchRequest(context.Background(), collection, queryReq, queryReq.Req.GetPlaceholderGroup()) } diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1ea9b779a7..1ad47bb10c 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -702,15 +702,9 @@ func (node *QueryNode) SearchSegments(ctx context.Context, req *querypb.SearchRe // Search performs replica search tasks. func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { - if req.FromShardLeader { - // for compatible with rolling upgrade from version before v2.2.9 - return node.SearchSegments(ctx, req) - } - log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetReq().GetCollectionID()), zap.Strings("channels", req.GetDmlChannels()), - zap.Bool("fromShardLeader", req.GetFromShardLeader()), zap.Int64("nq", req.GetReq().GetNq()), ) @@ -746,7 +740,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( Req: req.Req, DmlChannels: []string{ch}, SegmentIDs: req.SegmentIDs, - FromShardLeader: req.FromShardLeader, Scope: req.Scope, TotalChannelNum: req.TotalChannelNum, } @@ -827,10 +820,7 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ } }() - log.Debug("start do query segments", - zap.Bool("fromShardLeader", req.GetFromShardLeader()), - zap.Int64s("segmentIDs", req.GetSegmentIDs()), - ) + log.Debug("start do query segments", zap.Int64s("segmentIDs", req.GetSegmentIDs())) // add cancel when error occurs queryCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -856,9 +846,8 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ return resp, nil } - tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromShardLeader = %t, vChannel = %s, segmentIDs = %v", + tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, vChannel = %s, segmentIDs = %v", traceID, - req.GetFromShardLeader(), channel, req.GetSegmentIDs(), )) @@ -875,11 +864,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ // Query performs replica query tasks. func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { - if req.FromShardLeader { - // for compatible with rolling upgrade from version before v2.2.9 - return node.QuerySegments(ctx, req) - } - log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetReq().GetCollectionID()), zap.Strings("shards", req.GetDmlChannels()), @@ -907,11 +891,10 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i for i, ch := range req.GetDmlChannels() { ch := ch req := &querypb.QueryRequest{ - Req: req.Req, - DmlChannels: []string{ch}, - SegmentIDs: req.SegmentIDs, - FromShardLeader: req.FromShardLeader, - Scope: req.Scope, + Req: req.Req, + DmlChannels: []string{ch}, + SegmentIDs: req.SegmentIDs, + Scope: req.Scope, } idx := i @@ -945,10 +928,8 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(node.GetNodeID()), metrics.QueryLabel, metrics.ReduceShards). Observe(float64(reduceLatency.Milliseconds())) - if !req.FromShardLeader { - collector.Rate.Add(metricsinfo.NQPerSecond, 1) - metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req))) - } + collector.Rate.Add(metricsinfo.NQPerSecond, 1) + metrics.QueryNodeExecuteCounter.WithLabelValues(strconv.FormatInt(node.GetNodeID(), 10), metrics.QueryLabel).Add(float64(proto.Size(req))) relatedDataSize := lo.Reduce(toMergeResults, func(acc int64, result *internalpb.RetrieveResults, _ int) int64 { return acc + result.GetCostAggregation().GetTotalRelatedDataSize() }, 0) @@ -988,11 +969,10 @@ func (node *QueryNode) QueryStream(req *querypb.QueryRequest, srv querypb.QueryN for _, ch := range req.GetDmlChannels() { ch := ch req := &querypb.QueryRequest{ - Req: req.Req, - DmlChannels: []string{ch}, - SegmentIDs: req.SegmentIDs, - FromShardLeader: req.FromShardLeader, - Scope: req.Scope, + Req: req.Req, + DmlChannels: []string{ch}, + SegmentIDs: req.SegmentIDs, + Scope: req.Scope, } runningGp.Go(func() error { @@ -1045,10 +1025,7 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp } defer node.lifetime.Done() - log.Debug("start do query with channel", - zap.Bool("fromShardLeader", req.GetFromShardLeader()), - zap.Int64s("segmentIDs", req.GetSegmentIDs()), - ) + log.Debug("start do query with channel", zap.Int64s("segmentIDs", req.GetSegmentIDs())) tr := timerecord.NewTimeRecorder("queryChannel") @@ -1059,9 +1036,8 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp return nil } - tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromShardLeader = %t, vChannel = %s, segmentIDs = %v", + tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, vChannel = %s, segmentIDs = %v", traceID, - req.GetFromShardLeader(), channel, req.GetSegmentIDs(), )) diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index f8ecfb104a..4fcc2ee0b2 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -1184,8 +1184,8 @@ func (suite *ServiceSuite) TestSearch_Normal() { creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) req := &querypb.SearchRequest{ - Req: creq, - FromShardLeader: false, + Req: creq, + DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1208,8 +1208,8 @@ func (suite *ServiceSuite) TestSearch_Concurrent() { future := conc.Go(func() (*internalpb.SearchResults, error) { creq, err := suite.genCSearchRequest(30, schemapb.DataType_FloatVector, 107, defaultMetricType) req := &querypb.SearchRequest{ - Req: creq, - FromShardLeader: false, + Req: creq, + DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1234,8 +1234,8 @@ func (suite *ServiceSuite) TestSearch_Failed() { schema := segments.GenTestCollectionSchema(suite.collectionName, schemapb.DataType_Int64, false) creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, "invalidMetricType") req := &querypb.SearchRequest{ - Req: creq, - FromShardLeader: false, + Req: creq, + DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1305,7 +1305,6 @@ func (suite *ServiceSuite) TestSearchSegments_Unhealthy() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) req := &querypb.SearchRequest{ - FromShardLeader: true, DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1324,7 +1323,7 @@ func (suite *ServiceSuite) TestSearchSegments_Failed() { Req: &internalpb.SearchRequest{ CollectionID: -1, // not exist collection id }, - FromShardLeader: true, + DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1353,8 +1352,8 @@ func (suite *ServiceSuite) TestSearchSegments_Normal() { creq, err := suite.genCSearchRequest(10, schemapb.DataType_FloatVector, 107, defaultMetricType) req := &querypb.SearchRequest{ - Req: creq, - FromShardLeader: true, + Req: creq, + DmlChannels: []string{suite.vchannel}, TotalChannelNum: 2, } @@ -1397,9 +1396,9 @@ func (suite *ServiceSuite) TestQuery_Normal() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: false, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } rsp, err := suite.node.Query(ctx, req) @@ -1416,9 +1415,9 @@ func (suite *ServiceSuite) TestQuery_Failed() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: false, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } // Delegator not found @@ -1443,8 +1442,8 @@ func (suite *ServiceSuite) TestQuerySegments_Failed() { Req: &internalpb.RetrieveRequest{ CollectionID: -1, }, - FromShardLeader: true, - DmlChannels: []string{suite.vchannel}, + + DmlChannels: []string{suite.vchannel}, } rsp, err := suite.node.QuerySegments(ctx, req) @@ -1478,9 +1477,9 @@ func (suite *ServiceSuite) TestQueryStream_Normal() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: false, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } client := streamrpc.NewLocalQueryClient(ctx) @@ -1513,9 +1512,9 @@ func (suite *ServiceSuite) TestQueryStream_Failed() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: false, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } queryFunc := func(wg *sync.WaitGroup, req *querypb.QueryRequest, client *streamrpc.LocalQueryClient) { @@ -1591,9 +1590,9 @@ func (suite *ServiceSuite) TestQuerySegments_Normal() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: true, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } rsp, err := suite.node.QuerySegments(ctx, req) @@ -1613,9 +1612,9 @@ func (suite *ServiceSuite) TestQueryStreamSegments_Normal() { creq, err := suite.genCQueryRequest(10, IndexFaissIDMap, schema) suite.NoError(err) req := &querypb.QueryRequest{ - Req: creq, - FromShardLeader: true, - DmlChannels: []string{suite.vchannel}, + Req: creq, + + DmlChannels: []string{suite.vchannel}, } client := streamrpc.NewLocalQueryClient(ctx)