mirror of https://github.com/milvus-io/milvus.git
Update: Wrap common calls querycoord rootcoord clients (#24360)
Signed-off-by: Preetham <kamidipreetham@gmail.com>pull/24334/head
parent
0c8045d66c
commit
4ea0830c25
|
@ -117,46 +117,38 @@ func (c *Client) Register() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetComponentStates gets the component states of QueryCoord.
|
||||
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
||||
func wrapGrpcCall[T any](ctx context.Context, c *Client, call func(grpcClient querypb.QueryCoordClient) (*T, error)) (*T, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
|
||||
return call(client)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ComponentStates), err
|
||||
return ret.(*T), err
|
||||
}
|
||||
|
||||
// GetComponentStates gets the component states of QueryCoord.
|
||||
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.ComponentStates, error) {
|
||||
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
|
||||
})
|
||||
}
|
||||
|
||||
// GetTimeTickChannel gets the time tick channel of QueryCoord.
|
||||
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.StringResponse, error) {
|
||||
return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
// GetStatisticsChannel gets the statistics channel of QueryCoord.
|
||||
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.StringResponse, error) {
|
||||
return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
// ShowCollections shows the collections in the QueryCoord.
|
||||
|
@ -166,16 +158,9 @@ func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.ShowCollectionsResponse, error) {
|
||||
return client.ShowCollections(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.ShowCollectionsResponse), err
|
||||
}
|
||||
|
||||
// LoadCollection loads the data of the specified collections in the QueryCoord.
|
||||
|
@ -185,16 +170,9 @@ func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollection
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.LoadCollection(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ReleaseCollection release the data of the specified collections in the QueryCoord.
|
||||
|
@ -204,16 +182,9 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.ReleaseCollection(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ShowPartitions shows the partitions in the QueryCoord.
|
||||
|
@ -223,16 +194,9 @@ func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.ShowPartitionsResponse, error) {
|
||||
return client.ShowPartitions(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.ShowPartitionsResponse), err
|
||||
}
|
||||
|
||||
// LoadPartitions loads the data of the specified partitions in the QueryCoord.
|
||||
|
@ -242,16 +206,9 @@ func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.LoadPartitions(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ReleasePartitions release the data of the specified partitions in the QueryCoord.
|
||||
|
@ -261,16 +218,9 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.ReleasePartitions(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// SyncNewCreatedPartition notifies QueryCoord to sync new created partition if collection is loaded.
|
||||
|
@ -280,16 +230,9 @@ func (c *Client) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncN
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.SyncNewCreatedPartition(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// GetPartitionStates gets the states of the specified partition.
|
||||
|
@ -299,16 +242,9 @@ func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.GetPartitionStatesResponse, error) {
|
||||
return client.GetPartitionStates(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.GetPartitionStatesResponse), err
|
||||
}
|
||||
|
||||
// GetSegmentInfo gets the information of the specified segment from QueryCoord.
|
||||
|
@ -318,16 +254,9 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.GetSegmentInfoResponse, error) {
|
||||
return client.GetSegmentInfo(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.GetSegmentInfoResponse), err
|
||||
}
|
||||
|
||||
// LoadBalance migrate the sealed segments on the source node to the dst nodes.
|
||||
|
@ -337,16 +266,9 @@ func (c *Client) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.LoadBalance(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ShowConfigurations gets specified configurations para of QueryCoord
|
||||
|
@ -356,17 +278,9 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*internalpb.ShowConfigurationsResponse, error) {
|
||||
return client.ShowConfigurations(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret.(*internalpb.ShowConfigurationsResponse), err
|
||||
}
|
||||
|
||||
// GetMetrics gets the metrics information of QueryCoord.
|
||||
|
@ -376,16 +290,9 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.GetMetricsResponse, error) {
|
||||
return client.GetMetrics(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.GetMetricsResponse), err
|
||||
}
|
||||
|
||||
// GetReplicas gets the replicas of a certain collection.
|
||||
|
@ -395,16 +302,9 @@ func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.GetReplicasResponse, error) {
|
||||
return client.GetReplicas(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.GetReplicasResponse), err
|
||||
}
|
||||
|
||||
// GetShardLeaders gets the shard leaders of a certain collection.
|
||||
|
@ -414,29 +314,15 @@ func (c *Client) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.GetShardLeadersResponse, error) {
|
||||
return client.GetShardLeaders(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.GetShardLeadersResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.CheckHealthResponse, error) {
|
||||
return client.CheckHealth(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.CheckHealthResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
|
||||
|
@ -445,16 +331,9 @@ func (c *Client) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateResourceGroup(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
|
||||
|
@ -463,16 +342,9 @@ func (c *Client) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropResourceGroup(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) DescribeResourceGroup(ctx context.Context, req *querypb.DescribeResourceGroupRequest) (*querypb.DescribeResourceGroupResponse, error) {
|
||||
|
@ -481,16 +353,9 @@ func (c *Client) DescribeResourceGroup(ctx context.Context, req *querypb.Describ
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*querypb.DescribeResourceGroupResponse, error) {
|
||||
return client.DescribeResourceGroup(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*querypb.DescribeResourceGroupResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
|
||||
|
@ -499,16 +364,9 @@ func (c *Client) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.TransferNode(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) TransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) (*commonpb.Status, error) {
|
||||
|
@ -517,16 +375,9 @@ func (c *Client) TransferReplica(ctx context.Context, req *querypb.TransferRepli
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*commonpb.Status, error) {
|
||||
return client.TransferReplica(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) ListResourceGroups(ctx context.Context, req *milvuspb.ListResourceGroupsRequest) (*milvuspb.ListResourceGroupsResponse, error) {
|
||||
|
@ -535,14 +386,7 @@ func (c *Client) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client querypb.QueryCoordClient) (*milvuspb.ListResourceGroupsResponse, error) {
|
||||
return client.ListResourceGroups(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ListResourceGroupsResponse), err
|
||||
}
|
||||
|
|
|
@ -128,46 +128,38 @@ func (c *Client) Register() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetComponentStates TODO: timeout need to be propagated through ctx
|
||||
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
||||
func wrapGrpcCall[T any](ctx context.Context, c *Client, call func(grpcClient rootcoordpb.RootCoordClient) (*T, error)) (*T, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
|
||||
return call(client)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ComponentStates), err
|
||||
return ret.(*T), err
|
||||
}
|
||||
|
||||
// GetComponentStates TODO: timeout need to be propagated through ctx
|
||||
func (c *Client) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ComponentStates, error) {
|
||||
return client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
|
||||
})
|
||||
}
|
||||
|
||||
// GetTimeTickChannel get timetick channel name
|
||||
func (c *Client) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.StringResponse, error) {
|
||||
return client.GetTimeTickChannel(ctx, &internalpb.GetTimeTickChannelRequest{})
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
// GetStatisticsChannel just define a channel, not used currently
|
||||
func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.StringResponse, error) {
|
||||
return client.GetStatisticsChannel(ctx, &internalpb.GetStatisticsChannelRequest{})
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.StringResponse), err
|
||||
}
|
||||
|
||||
// CreateCollection create collection
|
||||
|
@ -177,16 +169,9 @@ func (c *Client) CreateCollection(ctx context.Context, in *milvuspb.CreateCollec
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateCollection(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// DropCollection drop collection
|
||||
|
@ -196,16 +181,9 @@ func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollection
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropCollection(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// HasCollection check collection existence
|
||||
|
@ -215,16 +193,9 @@ func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRe
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.BoolResponse, error) {
|
||||
return client.HasCollection(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.BoolResponse), err
|
||||
}
|
||||
|
||||
// DescribeCollection return collection info
|
||||
|
@ -234,16 +205,9 @@ func (c *Client) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCo
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
return client.DescribeCollection(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.DescribeCollectionResponse), err
|
||||
}
|
||||
|
||||
// describeCollectionInternal return collection info
|
||||
|
@ -253,16 +217,9 @@ func (c *Client) describeCollectionInternal(ctx context.Context, in *milvuspb.De
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
return client.DescribeCollectionInternal(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.DescribeCollectionResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) DescribeCollectionInternal(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
|
@ -281,16 +238,9 @@ func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectio
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
return client.ShowCollections(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ShowCollectionsResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
|
||||
|
@ -299,16 +249,9 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol
|
|||
request.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.AlterCollection(ctx, request)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// CreatePartition create partition
|
||||
|
@ -318,16 +261,9 @@ func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartiti
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreatePartition(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// DropPartition drop partition
|
||||
|
@ -337,16 +273,9 @@ func (c *Client) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRe
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropPartition(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// HasPartition check partition existence
|
||||
|
@ -356,16 +285,9 @@ func (c *Client) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequ
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.BoolResponse, error) {
|
||||
return client.HasPartition(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.BoolResponse), err
|
||||
}
|
||||
|
||||
// ShowPartitions list all partitions in collection
|
||||
|
@ -375,16 +297,9 @@ func (c *Client) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitions
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
return client.ShowPartitions(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ShowPartitionsResponse), err
|
||||
}
|
||||
|
||||
// showPartitionsInternal list all partitions in collection
|
||||
|
@ -394,16 +309,9 @@ func (c *Client) showPartitionsInternal(ctx context.Context, in *milvuspb.ShowPa
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
return client.ShowPartitionsInternal(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ShowPartitionsResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
|
@ -422,16 +330,9 @@ func (c *Client) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimest
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.AllocTimestampResponse, error) {
|
||||
return client.AllocTimestamp(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*rootcoordpb.AllocTimestampResponse), err
|
||||
}
|
||||
|
||||
// AllocID global ID allocator
|
||||
|
@ -441,16 +342,9 @@ func (c *Client) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.AllocIDResponse, error) {
|
||||
return client.AllocID(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*rootcoordpb.AllocIDResponse), err
|
||||
}
|
||||
|
||||
// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
|
||||
|
@ -460,16 +354,9 @@ func (c *Client) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Chann
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.UpdateChannelTimeTick(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ShowSegments list all segments
|
||||
|
@ -479,16 +366,9 @@ func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ShowSegmentsResponse, error) {
|
||||
return client.ShowSegments(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ShowSegmentsResponse), err
|
||||
}
|
||||
|
||||
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
|
||||
|
@ -498,16 +378,9 @@ func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.InvalidateCollectionMetaCache(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// ShowConfigurations gets specified configurations para of RootCoord
|
||||
|
@ -517,17 +390,9 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*internalpb.ShowConfigurationsResponse, error) {
|
||||
return client.ShowConfigurations(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ret.(*internalpb.ShowConfigurationsResponse), err
|
||||
}
|
||||
|
||||
// GetMetrics get metrics
|
||||
|
@ -537,16 +402,9 @@ func (c *Client) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest)
|
|||
in.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.GetMetricsResponse, error) {
|
||||
return client.GetMetrics(ctx, in)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.GetMetricsResponse), err
|
||||
}
|
||||
|
||||
// CreateAlias create collection alias
|
||||
|
@ -556,16 +414,9 @@ func (c *Client) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasReque
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateAlias(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// DropAlias drop collection alias
|
||||
|
@ -575,16 +426,9 @@ func (c *Client) DropAlias(ctx context.Context, req *milvuspb.DropAliasRequest)
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropAlias(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// AlterAlias alter collection alias
|
||||
|
@ -594,85 +438,43 @@ func (c *Client) AlterAlias(ctx context.Context, req *milvuspb.AlterAliasRequest
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.AlterAlias(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
|
||||
func (c *Client) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ImportResponse, error) {
|
||||
return client.Import(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ImportResponse), err
|
||||
}
|
||||
|
||||
// Check import task state from datanode
|
||||
func (c *Client) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.GetImportStateResponse, error) {
|
||||
return client.GetImportState(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.GetImportStateResponse), err
|
||||
}
|
||||
|
||||
// List id array of all import tasks
|
||||
func (c *Client) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ListImportTasksResponse, error) {
|
||||
return client.ListImportTasks(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ListImportTasksResponse), err
|
||||
}
|
||||
|
||||
// Report impot task state to rootcoord
|
||||
func (c *Client) ReportImport(ctx context.Context, req *rootcoordpb.ImportResult) (*commonpb.Status, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.ReportImport(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) CreateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateCredential(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) GetCredential(ctx context.Context, req *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
|
||||
|
@ -681,29 +483,15 @@ func (c *Client) GetCredential(ctx context.Context, req *rootcoordpb.GetCredenti
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*rootcoordpb.GetCredentialResponse, error) {
|
||||
return client.GetCredential(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*rootcoordpb.GetCredentialResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) UpdateCredential(ctx context.Context, req *internalpb.CredentialInfo) (*commonpb.Status, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.UpdateCredential(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
|
||||
|
@ -712,16 +500,9 @@ func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCrede
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.DeleteCredential(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
|
||||
|
@ -730,16 +511,9 @@ func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersR
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.ListCredUsersResponse, error) {
|
||||
return client.ListCredUsers(ctx, req)
|
||||
})
|
||||
if err != nil || ret == nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.ListCredUsersResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
|
||||
|
@ -748,16 +522,9 @@ func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.CreateRole(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
|
||||
|
@ -766,16 +533,9 @@ func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.DropRole(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
|
||||
|
@ -784,16 +544,9 @@ func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserR
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.OperateUserRole(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
|
||||
|
@ -802,16 +555,9 @@ func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.SelectRoleResponse, error) {
|
||||
return client.SelectRole(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.SelectRoleResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
|
||||
|
@ -820,16 +566,9 @@ func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.SelectUserResponse, error) {
|
||||
return client.SelectUser(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.SelectUserResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
|
||||
|
@ -838,16 +577,9 @@ func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePriv
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.OperatePrivilege(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
||||
func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
|
||||
|
@ -856,16 +588,9 @@ func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantReque
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.SelectGrantResponse, error) {
|
||||
return client.SelectGrant(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.SelectGrantResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) ListPolicy(ctx context.Context, req *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
|
||||
|
@ -874,29 +599,15 @@ func (c *Client) ListPolicy(ctx context.Context, req *internalpb.ListPolicyReque
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*internalpb.ListPolicyResponse, error) {
|
||||
return client.ListPolicy(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*internalpb.ListPolicyResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*milvuspb.CheckHealthResponse, error) {
|
||||
return client.CheckHealth(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*milvuspb.CheckHealthResponse), err
|
||||
}
|
||||
|
||||
func (c *Client) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
|
||||
|
@ -905,14 +616,7 @@ func (c *Client) RenameCollection(ctx context.Context, req *milvuspb.RenameColle
|
|||
req.GetBase(),
|
||||
commonpbutil.FillMsgBaseFromClient(paramtable.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
|
||||
)
|
||||
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
|
||||
if !funcutil.CheckCtxValid(ctx) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return wrapGrpcCall(ctx, c, func(client rootcoordpb.RootCoordClient) (*commonpb.Status, error) {
|
||||
return client.RenameCollection(ctx, req)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ret.(*commonpb.Status), err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue