Fill SourceID (#20000)

Signed-off-by: lixinguo <xinguo.li@zilliz.com>

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
pull/20041/head
smellthemoon 2022-10-24 23:43:30 +08:00 committed by GitHub
parent aa6b8af80d
commit 2f19e6595a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 463 additions and 90 deletions

View File

@ -40,6 +40,8 @@ import (
// ClientParams is the parameters of client singleton
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
var _ types.DataCoord = (*Client)(nil)
// Client is the datacoord grpc client
@ -164,7 +166,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// Flush flushes a collection's data
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.FlushResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -216,7 +221,10 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
// error is returned only when some communication issue occurs
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -241,7 +249,10 @@ func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta
// error is returned only when some communication issue occurs
func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -266,7 +277,10 @@ func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
// error is returned only when some communication issue occurs
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -291,7 +305,10 @@ func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol
// error is returned only when some communication issue occurs
func (c *Client) GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -328,7 +345,10 @@ func (c *Client) GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringRes
// error is returned only when some communication issue occurs
func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -358,7 +378,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) {
// use Call here on purpose
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.Call(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -380,7 +403,10 @@ func (c *Client) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
// error is returned only when some communication issue occurs
func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInfoRequest) (*datapb.GetRecoveryInfoResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -404,7 +430,10 @@ func (c *Client) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
// error is returned only when some communication issue occurs
func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -427,7 +456,10 @@ func (c *Client) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS
// error is returned only when some communication issue occurs
func (c *Client) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -443,7 +475,10 @@ func (c *Client) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
// ShowConfigurations gets specified configurations para of DataCoord
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -460,7 +495,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics gets all metrics of datacoord
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -546,7 +584,10 @@ func (c *Client) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR
// DropVirtualChannel drops virtual channel in datacoord.
func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -562,7 +603,10 @@ func (c *Client) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual
// SetSegmentState sets the state of a given segment.
func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -578,7 +622,10 @@ func (c *Client) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStat
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -594,7 +641,10 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*da
// UpdateSegmentStatistics is the client side caller of UpdateSegmentStatistics.
func (c *Client) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -610,7 +660,10 @@ func (c *Client) UpdateSegmentStatistics(ctx context.Context, req *datapb.Update
// AcquireSegmentLock acquire the reference lock of the segments.
func (c *Client) AcquireSegmentLock(ctx context.Context, req *datapb.AcquireSegmentLockRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -626,7 +679,10 @@ func (c *Client) AcquireSegmentLock(ctx context.Context, req *datapb.AcquireSegm
// ReleaseSegmentLock release the reference lock of the segments.
func (c *Client) ReleaseSegmentLock(ctx context.Context, req *datapb.ReleaseSegmentLockRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -642,7 +698,10 @@ func (c *Client) ReleaseSegmentLock(ctx context.Context, req *datapb.ReleaseSegm
// SaveImportSegment is the DataCoord client side code for SaveImportSegment call.
func (c *Client) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSegmentRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -657,7 +716,10 @@ func (c *Client) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe
func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsImportingStateRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -672,7 +734,10 @@ func (c *Client) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI
func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -688,7 +753,10 @@ func (c *Client) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmen
// BroadcastAlteredCollection is the DataCoord client side code for BroadcastAlteredCollection call.
func (c *Client) BroadcastAlteredCollection(ctx context.Context, req *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -24,6 +24,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -33,6 +34,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client for DataNode
type Client struct {
grpcClient grpcclient.GrpcClient[datapb.DataNodeClient]
@ -129,6 +132,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// Deprecated
// WatchDmChannels create consumers on dmChannels to reveive Incremental data
func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -142,14 +149,22 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *datapb.WatchDmChannel
}
// FlushSegments notifies DataNode to flush the segments req provids. The flush tasks are async to this
// rpc, DataNode will flush the segments in the background.
//
// rpc, DataNode will flush the segments in the background.
//
// Return UnexpectedError code in status:
// If DataNode isn't in HEALTHY: states not HEALTHY or dynamic checks not HEALTHY
// If DataNode doesn't find the correspounding segmentID in its memeory replica
//
// If DataNode isn't in HEALTHY: states not HEALTHY or dynamic checks not HEALTHY
// If DataNode doesn't find the correspounding segmentID in its memeory replica
//
// Return Success code in status and trigers background flush:
// Log an info log if a segment is under flushing
//
// Log an info log if a segment is under flushing
func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -164,6 +179,10 @@ func (c *Client) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsReq
// ShowConfigurations gets specified configurations para of DataNode
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -179,6 +198,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics returns metrics
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -206,6 +229,10 @@ func (c *Client) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*c
}
func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -220,6 +247,10 @@ func (c *Client) GetCompactionState(ctx context.Context, req *datapb.CompactionS
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -233,6 +264,10 @@ func (c *Client) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*co
}
func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -247,6 +282,10 @@ func (c *Client) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegme
// AddImportSegment is the DataNode client side code for AddImportSegment call.
func (c *Client) AddImportSegment(ctx context.Context, req *datapb.AddImportSegmentRequest) (*datapb.AddImportSegmentResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.DataNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client datapb.DataNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -39,6 +39,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client of IndexCoord.
type Client struct {
grpcClient grpcclient.GrpcClient[indexpb.IndexCoordClient]
@ -246,7 +248,10 @@ func (c *Client) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
// ShowConfigurations gets specified configurations para of IndexCoord
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.IndexCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -263,7 +268,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics gets the metrics info of IndexCoord.
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.IndexCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -34,6 +35,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client of IndexNode.
type Client struct {
grpcClient grpcclient.GrpcClient[indexpb.IndexNodeClient]
@ -183,6 +186,10 @@ func (c *Client) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReques
// ShowConfigurations gets specified configurations para of IndexNode
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.IndexNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -198,6 +205,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics gets the metrics info of IndexNode.
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.IndexNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client indexpb.IndexNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -34,6 +34,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client for Proxy
type Client struct {
grpcClient grpcclient.GrpcClient[proxypb.ProxyClient]
@ -126,7 +128,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// InvalidateCollectionMetaCache invalidate collection meta cache
func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -141,7 +146,10 @@ func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, req *proxypb
func (c *Client) InvalidateCredentialCache(ctx context.Context, req *proxypb.InvalidateCredCacheRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -156,7 +164,10 @@ func (c *Client) InvalidateCredentialCache(ctx context.Context, req *proxypb.Inv
func (c *Client) UpdateCredentialCache(ctx context.Context, req *proxypb.UpdateCredCacheRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -171,7 +182,10 @@ func (c *Client) UpdateCredentialCache(ctx context.Context, req *proxypb.UpdateC
func (c *Client) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -188,7 +202,10 @@ func (c *Client) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refres
// because it only obtains the metrics of Proxy, not including the topological metrics of Query cluster and Data cluster.
func (c *Client) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -204,7 +221,10 @@ func (c *Client) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
// SetRates notifies Proxy to limit rates of requests.
func (c *Client) SetRates(ctx context.Context, req *proxypb.SetRatesRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.ProxyCfg.GetNodeID(), commonpbutil.WithTargetID(c.grpcClient.GetNodeID())),
)
ret, err := c.grpcClient.ReCall(ctx, func(client proxypb.ProxyClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -38,6 +38,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client of QueryCoord.
type Client struct {
grpcClient grpcclient.GrpcClient[querypb.QueryCoordClient]
@ -159,7 +161,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// ShowCollections shows the collections in the QueryCoord.
func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -175,7 +180,10 @@ func (c *Client) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
// LoadCollection loads the data of the specified collections in the QueryCoord.
func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -191,7 +199,10 @@ func (c *Client) LoadCollection(ctx context.Context, req *querypb.LoadCollection
// ReleaseCollection release the data of the specified collections in the QueryCoord.
func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -207,7 +218,10 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
// ShowPartitions shows the partitions in the QueryCoord.
func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -223,7 +237,10 @@ func (c *Client) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
// LoadPartitions loads the data of the specified partitions in the QueryCoord.
func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -239,7 +256,10 @@ func (c *Client) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
// ReleasePartitions release the data of the specified partitions in the QueryCoord.
func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -255,7 +275,10 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
// GetPartitionStates gets the states of the specified partition.
func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -271,7 +294,10 @@ func (c *Client) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti
// GetSegmentInfo gets the information of the specified segment from QueryCoord.
func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -287,7 +313,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
// LoadBalance migrate the sealed segments on the source node to the dst nodes.
func (c *Client) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -303,7 +332,10 @@ func (c *Client) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
// ShowConfigurations gets specified configurations para of QueryCoord
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -320,7 +352,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics gets the metrics information of QueryCoord.
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -336,7 +371,10 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
// GetReplicas gets the replicas of a certain collection.
func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -352,7 +390,10 @@ func (c *Client) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
// GetShardLeaders gets the shard leaders of a certain collection.
func (c *Client) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.sess.ServerID))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -34,6 +35,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client is the grpc client of QueryNode.
type Client struct {
grpcClient grpcclient.GrpcClient[querypb.QueryNodeClient]
@ -140,6 +143,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// WatchDmChannels watches the channels about data manipulation.
func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChannelsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -154,6 +161,10 @@ func (c *Client) WatchDmChannels(ctx context.Context, req *querypb.WatchDmChanne
// UnsubDmChannel unsubscribes the channels about data manipulation.
func (c *Client) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -168,6 +179,10 @@ func (c *Client) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannel
// LoadSegments loads the segments to search.
func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -182,6 +197,10 @@ func (c *Client) LoadSegments(ctx context.Context, req *querypb.LoadSegmentsRequ
// ReleaseCollection releases the data of the specified collection in QueryNode.
func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -196,6 +215,10 @@ func (c *Client) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
// ReleasePartitions releases the data of the specified partitions in QueryNode.
func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -210,6 +233,10 @@ func (c *Client) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
// ReleaseSegments releases the data of the specified segments in QueryNode.
func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -252,6 +279,10 @@ func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*interna
// GetSegmentInfo gets the information of the specified segments in QueryNode.
func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -266,6 +297,10 @@ func (c *Client) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
// SyncReplicaSegments syncs replica node segments information to shard leaders.
func (c *Client) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -280,6 +315,10 @@ func (c *Client) SyncReplicaSegments(ctx context.Context, req *querypb.SyncRepli
// ShowConfigurations gets specified configurations para of QueryNode
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -295,6 +334,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics gets the metrics information of QueryNode.
func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.ReCall(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -321,6 +364,10 @@ func (c *Client) GetStatistics(ctx context.Context, request *querypb.GetStatisti
}
func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDistributionRequest) (*querypb.GetDataDistributionResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -334,6 +381,10 @@ func (c *Client) GetDataDistribution(ctx context.Context, req *querypb.GetDataDi
}
func (c *Client) SyncDistribution(ctx context.Context, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.QueryNodeCfg.GetNodeID()))
ret, err := c.grpcClient.Call(ctx, func(client querypb.QueryNodeClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -39,6 +39,8 @@ import (
var ClientParams paramtable.GrpcClientConfig
var Params paramtable.ComponentParam
// Client grpc client
type Client struct {
grpcClient grpcclient.GrpcClient[rootcoordpb.RootCoordClient]
@ -165,7 +167,10 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
// CreateCollection create collection
func (c *Client) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -181,7 +186,10 @@ func (c *Client) CreateCollection(ctx context.Context, in *milvuspb.CreateCollec
// DropCollection drop collection
func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -197,7 +205,10 @@ func (c *Client) DropCollection(ctx context.Context, in *milvuspb.DropCollection
// HasCollection check collection existence
func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -213,7 +224,10 @@ func (c *Client) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRe
// DescribeCollection return collection info
func (c *Client) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -229,7 +243,10 @@ func (c *Client) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCo
// ShowCollections list all collection names
func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -244,7 +261,10 @@ func (c *Client) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectio
func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
request = typeutil.Clone(request)
commonpbutil.UpdateMsgBase(request.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
request.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -260,7 +280,10 @@ func (c *Client) AlterCollection(ctx context.Context, request *milvuspb.AlterCol
// CreatePartition create partition
func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -276,7 +299,10 @@ func (c *Client) CreatePartition(ctx context.Context, in *milvuspb.CreatePartiti
// DropPartition drop partition
func (c *Client) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -292,7 +318,10 @@ func (c *Client) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRe
// HasPartition check partition existence
func (c *Client) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -308,7 +337,10 @@ func (c *Client) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequ
// ShowPartitions list all partitions in collection
func (c *Client) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -324,7 +356,10 @@ func (c *Client) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitions
// AllocTimestamp global timestamp allocator
func (c *Client) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -340,7 +375,10 @@ func (c *Client) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimest
// AllocID global ID allocator
func (c *Client) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -356,7 +394,10 @@ func (c *Client) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*
// UpdateChannelTimeTick used to handle ChannelTimeTickMsg
func (c *Client) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -372,7 +413,10 @@ func (c *Client) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Chann
// ShowSegments list all segments
func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -388,7 +432,10 @@ func (c *Client) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequ
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -404,7 +451,10 @@ func (c *Client) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.
// ShowConfigurations gets specified configurations para of RootCoord
func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -421,7 +471,10 @@ func (c *Client) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon
// GetMetrics get metrics
func (c *Client) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
in = typeutil.Clone(in)
commonpbutil.UpdateMsgBase(in.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
in.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -437,7 +490,10 @@ func (c *Client) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest)
// CreateAlias create collection alias
func (c *Client) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -453,7 +509,10 @@ func (c *Client) CreateAlias(ctx context.Context, req *milvuspb.CreateAliasReque
// DropAlias drop collection alias
func (c *Client) DropAlias(ctx context.Context, req *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -469,7 +528,10 @@ func (c *Client) DropAlias(ctx context.Context, req *milvuspb.DropAliasRequest)
// AlterAlias alter collection alias
func (c *Client) AlterAlias(ctx context.Context, req *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -553,7 +615,10 @@ func (c *Client) CreateCredential(ctx context.Context, req *internalpb.Credentia
func (c *Client) GetCredential(ctx context.Context, req *rootcoordpb.GetCredentialRequest) (*rootcoordpb.GetCredentialResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -581,7 +646,10 @@ func (c *Client) UpdateCredential(ctx context.Context, req *internalpb.Credentia
func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCredentialRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -596,7 +664,10 @@ func (c *Client) DeleteCredential(ctx context.Context, req *milvuspb.DeleteCrede
func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersRequest) (*milvuspb.ListCredUsersResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -611,7 +682,10 @@ func (c *Client) ListCredUsers(ctx context.Context, req *milvuspb.ListCredUsersR
func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -626,7 +700,10 @@ func (c *Client) CreateRole(ctx context.Context, req *milvuspb.CreateRoleRequest
func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -641,7 +718,10 @@ func (c *Client) DropRole(ctx context.Context, req *milvuspb.DropRoleRequest) (*
func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserRoleRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -656,7 +736,10 @@ func (c *Client) OperateUserRole(ctx context.Context, req *milvuspb.OperateUserR
func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest) (*milvuspb.SelectRoleResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -671,7 +754,10 @@ func (c *Client) SelectRole(ctx context.Context, req *milvuspb.SelectRoleRequest
func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest) (*milvuspb.SelectUserResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -686,7 +772,10 @@ func (c *Client) SelectUser(ctx context.Context, req *milvuspb.SelectUserRequest
func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePrivilegeRequest) (*commonpb.Status, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -701,7 +790,10 @@ func (c *Client) OperatePrivilege(ctx context.Context, req *milvuspb.OperatePriv
func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantRequest) (*milvuspb.SelectGrantResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
@ -716,7 +808,10 @@ func (c *Client) SelectGrant(ctx context.Context, req *milvuspb.SelectGrantReque
func (c *Client) ListPolicy(ctx context.Context, req *internalpb.ListPolicyRequest) (*internalpb.ListPolicyResponse, error) {
req = typeutil.Clone(req)
commonpbutil.UpdateMsgBase(req.GetBase(), commonpbutil.FillMsgBaseFromClient(c.grpcClient.GetNodeID()))
commonpbutil.UpdateMsgBase(
req.GetBase(),
commonpbutil.FillMsgBaseFromClient(Params.RootCoordCfg.GetNodeID(), commonpbutil.WithTargetID(c.sess.ServerID)),
)
ret, err := c.grpcClient.ReCall(ctx, func(client rootcoordpb.RootCoordClient) (any, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()

View File

@ -50,16 +50,27 @@ func WithSourceID(sourceID int64) MsgBaseOptions {
}
}
func WithTargetID(targetID int64) MsgBaseOptions {
return func(msgBase *commonpb.MsgBase) {
msgBase.TargetID = targetID
}
}
func GetNowTimestamp() uint64 {
return uint64(time.Now().Unix())
}
func FillMsgBaseFromClient(targetID int64) MsgBaseOptions {
func FillMsgBaseFromClient(sourceID int64, options ...MsgBaseOptions) MsgBaseOptions {
return func(msgBase *commonpb.MsgBase) {
if msgBase.Timestamp == 0 {
msgBase.Timestamp = GetNowTimestamp()
}
msgBase.TargetID = targetID
if msgBase.SourceID == 0 {
msgBase.SourceID = sourceID
}
for _, op := range options {
op(msgBase)
}
}
}
@ -88,5 +99,4 @@ func UpdateMsgBase(msgBase *commonpb.MsgBase, options ...MsgBaseOptions) *common
op(msgBaseRt)
}
return msgBaseRt
}

View File

@ -111,7 +111,7 @@ func (p *ComponentParam) KafkaEnable() bool {
return p.KafkaCfg.Address != ""
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- common ---
type commonConfig struct {
Base *BaseTable
@ -453,7 +453,7 @@ func (p *commonConfig) initSessionRetryTimes() {
p.SessionRetryTimes = p.Base.ParseInt64WithDefault("common.session.retryTimes", 30)
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- rootcoord ---
type rootCoordConfig struct {
Base *BaseTable
@ -461,6 +461,8 @@ type rootCoordConfig struct {
Address string
Port int
NodeID atomic.Value
DmlChannelNum int64
MaxPartitionNum int64
MinSegmentSizeToEnableIndex int64
@ -485,9 +487,22 @@ func (p *rootCoordConfig) init(base *BaseTable) {
p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 24*60*60)
p.ImportTaskSubPath = "importtask"
p.EnableActiveStandby = p.Base.ParseBool("rootCoord.enableActiveStandby", false)
p.NodeID.Store(UniqueID(0))
}
///////////////////////////////////////////////////////////////////////////////
func (p *rootCoordConfig) SetNodeID(id UniqueID) {
p.NodeID.Store(id)
}
func (p *rootCoordConfig) GetNodeID() UniqueID {
val := p.NodeID.Load()
if val != nil {
return val.(UniqueID)
}
return 0
}
// /////////////////////////////////////////////////////////////////////////////
// --- proxy ---
type proxyConfig struct {
Base *BaseTable
@ -666,7 +681,7 @@ func (p *proxyConfig) initMaxRoleNum() {
p.MaxRoleNum = int(maxRoleNum)
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- querycoord ---
type queryCoordConfig struct {
Base *BaseTable
@ -855,7 +870,7 @@ func (p *queryCoordConfig) GetNodeID() UniqueID {
return 0
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- querynode ---
type queryNodeConfig struct {
Base *BaseTable
@ -1102,7 +1117,7 @@ func (p *queryNodeConfig) initDiskCapacity() {
p.DiskCapacityLimit = diskSize * 1024 * 1024 * 1024
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- datacoord ---
type dataCoordConfig struct {
Base *BaseTable
@ -1319,7 +1334,7 @@ func (p *dataCoordConfig) GetNodeID() UniqueID {
return 0
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- datanode ---
type dataNodeConfig struct {
Base *BaseTable
@ -1401,7 +1416,7 @@ func (p *dataNodeConfig) GetNodeID() UniqueID {
return 0
}
///////////////////////////////////////////////////////////////////////////////
// /////////////////////////////////////////////////////////////////////////////
// --- indexcoord ---
type indexCoordConfig struct {
Base *BaseTable
@ -1414,6 +1429,8 @@ type indexCoordConfig struct {
WithCredential bool
IndexNodeID int64
NodeID atomic.Value
MinSegmentNumRowsToEnableIndex int64
GCInterval time.Duration
@ -1434,6 +1451,7 @@ func (p *indexCoordConfig) init(base *BaseTable) {
p.initWithCredential()
p.initIndexNodeID()
p.initEnableActiveStandby()
p.NodeID.Store(UniqueID(0))
}
func (p *indexCoordConfig) initMinSegmentNumRowsToEnableIndex() {
@ -1464,7 +1482,19 @@ func (p *indexCoordConfig) initEnableActiveStandby() {
p.EnableActiveStandby = p.Base.ParseBool("indexCoord.enableActiveStandby", false)
}
///////////////////////////////////////////////////////////////////////////////
func (p *indexCoordConfig) SetNodeID(id UniqueID) {
p.NodeID.Store(id)
}
func (p *indexCoordConfig) GetNodeID() UniqueID {
val := p.NodeID.Load()
if val != nil {
return val.(UniqueID)
}
return 0
}
// /////////////////////////////////////////////////////////////////////////////
// --- indexnode ---
type indexNodeConfig struct {
Base *BaseTable