diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 8f074cf0ce..2ea1f0c6ef 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -34,8 +34,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -var Params *paramtable.ComponentParam = paramtable.Get() - // Client is the grpc client of QueryNode. type Client struct { grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient] @@ -47,7 +45,7 @@ func NewClient(ctx context.Context, addr string) (*Client, error) { if addr == "" { return nil, fmt.Errorf("addr is empty") } - clientParams := &Params.QueryNodeGrpcClientCfg + clientParams := paramtable.Get().QueryNodeGrpcClientCfg client := &Client{ addr: addr, grpcClient: &grpcclient.ClientBase[querypb.QueryNodeClient]{ @@ -99,46 +97,38 @@ func (c *Client) getAddr() (string, error) { return c.addr, nil } -// GetComponentStates gets the component states of QueryNode. -func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { +func wrapGrpcCall[T any](ctx context.Context, c *Client, call func(grpcClient querypb.QueryNodeClient) (*T, error)) (*T, error) { ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { if !funcutil.CheckCtxValid(ctx) { return nil, ctx.Err() } - return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + return call(client) }) if err != nil || ret == nil { return nil, err } - return ret.(*milvuspb.ComponentStates), err + return ret.(*T), err +} + +// GetComponentStates gets the component states of QueryNode. +func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.ComponentStates, error) { + return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + }) } // GetTimeTickChannel gets the time tick channel of QueryNode. func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.StringResponse, error) { return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{}) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*milvuspb.StringResponse), err } // GetStatisticsChannel gets the statistics channel of QueryNode. func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.StringResponse, error) { return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{}) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*milvuspb.StringResponse), err } // WatchDmChannels watches the channels about data manipulation. @@ -147,16 +137,9 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChanne commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.WatchDmChannels(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // UnsubDmChannel unsubscribes the channels about data manipulation. @@ -165,16 +148,9 @@ func (c *Client) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannel commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.UnsubDmChannel(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // LoadSegments loads the segments to search. @@ -183,16 +159,9 @@ func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequ commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.LoadSegments(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // ReleaseCollection releases the data of the specified collection in QueryNode. @@ -201,16 +170,9 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.ReleaseCollection(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // LoadPartitions updates partitions meta info in QueryNode. @@ -219,16 +181,9 @@ func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.LoadPartitions(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // ReleasePartitions releases the data of the specified partitions in QueryNode. @@ -237,16 +192,9 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.ReleasePartitions(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // ReleaseSegments releases the data of the specified segments in QueryNode. @@ -255,44 +203,23 @@ func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmen commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.ReleaseSegments(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // Search performs replica search tasks in QueryNode. func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*internalpb.SearchResults, error) { return client.Search(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*internalpb.SearchResults), err } // Query performs replica query tasks in QueryNode. func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*internalpb.RetrieveResults, error) { return client.Query(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*internalpb.RetrieveResults), err } // GetSegmentInfo gets the information of the specified segments in QueryNode. @@ -301,16 +228,9 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.GetSegmentInfoResponse, error) { return client.GetSegmentInfo(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*querypb.GetSegmentInfoResponse), err } // SyncReplicaSegments syncs replica node segments information to shard leaders. @@ -319,16 +239,9 @@ func (c *Client) SyncReplicaSegments(ctx context.Context, req *querypb.SyncRepli commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.SyncReplicaSegments(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // ShowConfigurations gets specified configurations para of QueryNode @@ -337,17 +250,9 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*internalpb.ShowConfigurationsResponse, error) { return client.ShowConfigurations(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - - return ret.(*internalpb.ShowConfigurationsResponse), err } // GetMetrics gets the metrics information of QueryNode. @@ -356,29 +261,15 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.GetMetricsResponse, error) { return client.GetMetrics(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*milvuspb.GetMetricsResponse), err } func (c *Client) GetStatistics(ctx context.Context, request *querypb.GetStatisticsRequest) (*internalpb.GetStatisticsResponse, error) { - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*internalpb.GetStatisticsResponse, error) { return client.GetStatistics(ctx, request) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*internalpb.GetStatisticsResponse), err } func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) { @@ -386,16 +277,9 @@ func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDi commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*querypb.GetDataDistributionResponse, error) { return client.GetDataDistribution(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*querypb.GetDataDistributionResponse), err } func (c *Client) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) { @@ -403,16 +287,9 @@ func (c *Client) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu commonpbutil.UpdateMsgBase( req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID())) - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.SyncDistribution(ctx, req) }) - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err } // Delete is used to forward delete message between delegator and workers. @@ -422,15 +299,7 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo req.GetBase(), commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID()), ) - ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) { - if !funcutil.CheckCtxValid(ctx) { - return nil, ctx.Err() - } + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*commonpb.Status, error) { return client.Delete(ctx, req) }) - - if err != nil || ret == nil { - return nil, err - } - return ret.(*commonpb.Status), err }