mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: longjiquan <jiquan.long@zilliz.com> Signed-off-by: longjiquan <jiquan.long@zilliz.com>pull/20469/head
parent
a1bc8c2bb2
commit
6faa579928
|
@ -98,7 +98,7 @@ func (ticker *channelsTimeTickerImpl) tick() error {
|
|||
|
||||
stats, err2 := ticker.getStatisticsFunc()
|
||||
if err2 != nil {
|
||||
log.Debug("Proxy channelsTimeTickerImpl failed to getStatistics", zap.Error(err2))
|
||||
log.Warn("failed to get tt statistics", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1850,7 +1850,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
|
|||
defer sp.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc()
|
||||
logger.Info(
|
||||
logger.Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("traceID", traceID),
|
||||
zap.Any("request", request))
|
||||
|
@ -1898,7 +1898,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get
|
|||
}
|
||||
}
|
||||
|
||||
logger.Info(
|
||||
logger.Debug(
|
||||
rpcDone(method),
|
||||
zap.String("traceID", traceID),
|
||||
zap.Any("request", request))
|
||||
|
@ -2463,8 +2463,8 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Insert")
|
||||
defer sp.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||
log.Info("Start processing insert request in Proxy", zap.String("traceID", traceID))
|
||||
defer log.Info("Finish processing insert request in Proxy", zap.String("traceID", traceID))
|
||||
log.Debug("Start processing insert request in Proxy", zap.String("traceID", traceID))
|
||||
defer log.Debug("Finish processing insert request in Proxy", zap.String("traceID", traceID))
|
||||
|
||||
if !node.checkHealthy() {
|
||||
return &milvuspb.MutationResult{
|
||||
|
@ -2537,7 +2537,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
zap.String("traceID", traceID))
|
||||
|
||||
if err := node.sched.dmQueue.Enqueue(it); err != nil {
|
||||
log.Debug("Failed to enqueue insert task: " + err.Error())
|
||||
log.Warn("Failed to enqueue insert task: " + err.Error())
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
|
||||
metrics.AbandonLabel).Inc()
|
||||
return constructFailedResponse(err), nil
|
||||
|
@ -2555,7 +2555,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
|
|||
zap.String("traceID", traceID))
|
||||
|
||||
if err := it.WaitToFinish(); err != nil {
|
||||
log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
|
||||
log.Warn("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
|
||||
metrics.FailLabel).Inc()
|
||||
return constructFailedResponse(err), nil
|
||||
|
@ -2591,8 +2591,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
|
|||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Delete")
|
||||
defer sp.Finish()
|
||||
traceID, _, _ := trace.InfoFromSpan(sp)
|
||||
log.Info("Start processing delete request in Proxy", zap.String("traceID", traceID))
|
||||
defer log.Info("Finish processing delete request in Proxy", zap.String("traceID", traceID))
|
||||
log.Debug("Start processing delete request in Proxy", zap.String("traceID", traceID))
|
||||
defer log.Debug("Finish processing delete request in Proxy", zap.String("traceID", traceID))
|
||||
|
||||
receiveSize := proto.Size(request)
|
||||
rateCol.Add(internalpb.RateType_DMLDelete.String(), float64(receiveSize))
|
||||
|
@ -2721,7 +2721,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
|
|||
travelTs := request.TravelTimestamp
|
||||
guaranteeTs := request.GuaranteeTimestamp
|
||||
|
||||
log.Ctx(ctx).Info(
|
||||
log.Ctx(ctx).Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.String("db", request.DbName),
|
||||
|
@ -2969,7 +2969,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
|
|||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method,
|
||||
metrics.TotalLabel).Inc()
|
||||
|
||||
log.Ctx(ctx).Info(
|
||||
log.Ctx(ctx).Debug(
|
||||
rpcReceived(method),
|
||||
zap.String("role", typeutil.ProxyRole),
|
||||
zap.String("db", request.DbName),
|
||||
|
@ -3594,14 +3594,14 @@ func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milv
|
|||
// TODO(wxyu): change name RequestType to Request
|
||||
drt, err := parseDummyRequestType(req.RequestType)
|
||||
if err != nil {
|
||||
log.Debug("Failed to parse dummy request type")
|
||||
log.Warn("Failed to parse dummy request type", zap.Error(err))
|
||||
return failedResponse, nil
|
||||
}
|
||||
|
||||
if drt.RequestType == "query" {
|
||||
drr, err := parseDummyQueryRequest(req.RequestType)
|
||||
if err != nil {
|
||||
log.Debug("Failed to parse dummy query request")
|
||||
log.Warn("Failed to parse dummy query request", zap.Error(err))
|
||||
return failedResponse, nil
|
||||
}
|
||||
|
||||
|
@ -3614,7 +3614,7 @@ func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milv
|
|||
|
||||
_, err = node.Query(ctx, request)
|
||||
if err != nil {
|
||||
log.Debug("Failed to execute dummy query")
|
||||
log.Warn("Failed to execute dummy query", zap.Error(err))
|
||||
return failedResponse, err
|
||||
}
|
||||
|
||||
|
@ -3722,7 +3722,7 @@ func (node *Proxy) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReque
|
|||
return metrics, nil
|
||||
}
|
||||
|
||||
log.Debug("Proxy.GetMetrics failed, request metric type is not implemented yet",
|
||||
log.Warn("Proxy.GetMetrics failed, request metric type is not implemented yet",
|
||||
zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
|
||||
zap.String("req", req.Request),
|
||||
zap.String("metric_type", metricType))
|
||||
|
@ -3799,7 +3799,7 @@ func (node *Proxy) GetProxyMetrics(ctx context.Context, req *milvuspb.GetMetrics
|
|||
return proxyMetrics, nil
|
||||
}
|
||||
|
||||
log.Debug("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
|
||||
log.Warn("Proxy.GetProxyMetrics failed, request metric type is not implemented yet",
|
||||
zap.Int64("node_id", Params.ProxyCfg.GetNodeID()),
|
||||
zap.String("req", req.Request),
|
||||
zap.String("metric_type", metricType))
|
||||
|
@ -3863,7 +3863,7 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
|
|||
|
||||
// GetReplicas gets replica info
|
||||
func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
|
||||
log.Info("received get replicas request")
|
||||
log.Debug("received get replicas request", zap.Int64("collection", req.GetCollectionID()), zap.Bool("with shard nodes", req.GetWithShardNodes()))
|
||||
resp := &milvuspb.GetReplicasResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
|
@ -3882,13 +3882,13 @@ func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReq
|
|||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err))
|
||||
log.Debug("received get replicas response", zap.Any("resp", resp), zap.Error(err))
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// GetCompactionState gets the compaction state of multiple segments
|
||||
func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) {
|
||||
log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
|
||||
log.Debug("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID()))
|
||||
resp := &milvuspb.GetCompactionStateResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
|
@ -3896,7 +3896,7 @@ func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetComp
|
|||
}
|
||||
|
||||
resp, err := node.dataCoord.GetCompactionState(ctx, req)
|
||||
log.Info("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
|
||||
log.Debug("received GetCompactionState response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
|
@ -3916,7 +3916,7 @@ func (node *Proxy) ManualCompaction(ctx context.Context, req *milvuspb.ManualCom
|
|||
|
||||
// GetCompactionStateWithPlans returns the compactions states with the given plan ID
|
||||
func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb.GetCompactionPlansRequest) (*milvuspb.GetCompactionPlansResponse, error) {
|
||||
log.Info("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
|
||||
log.Debug("received GetCompactionStateWithPlans request", zap.Int64("compactionID", req.GetCompactionID()))
|
||||
resp := &milvuspb.GetCompactionPlansResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
|
@ -3924,27 +3924,27 @@ func (node *Proxy) GetCompactionStateWithPlans(ctx context.Context, req *milvusp
|
|||
}
|
||||
|
||||
resp, err := node.dataCoord.GetCompactionStateWithPlans(ctx, req)
|
||||
log.Info("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
|
||||
log.Debug("received GetCompactionStateWithPlans response", zap.Int64("compactionID", req.GetCompactionID()), zap.Any("resp", resp), zap.Error(err))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// GetFlushState gets the flush state of multiple segments
|
||||
func (node *Proxy) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateRequest) (*milvuspb.GetFlushStateResponse, error) {
|
||||
log.Info("received get flush state request", zap.Any("request", req))
|
||||
log.Debug("received get flush state request", zap.Any("request", req))
|
||||
var err error
|
||||
resp := &milvuspb.GetFlushStateResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
log.Info("unable to get flush state because of closed server")
|
||||
log.Warn("unable to get flush state because of closed server")
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
resp, err = node.dataCoord.GetFlushState(ctx, req)
|
||||
if err != nil {
|
||||
log.Info("failed to get flush state response", zap.Error(err))
|
||||
log.Warn("failed to get flush state response", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
log.Info("received get flush state response", zap.Any("response", resp))
|
||||
log.Debug("received get flush state response", zap.Any("response", resp))
|
||||
return resp, err
|
||||
}
|
||||
|
||||
|
@ -4014,7 +4014,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi
|
|||
|
||||
// GetImportState checks import task state from RootCoord.
|
||||
func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
|
||||
log.Info("received get import state request", zap.Int64("taskID", req.GetTask()))
|
||||
log.Debug("received get import state request", zap.Int64("taskID", req.GetTask()))
|
||||
resp := &milvuspb.GetImportStateResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
|
@ -4034,7 +4034,7 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
log.Info("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
|
||||
log.Debug("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
|
||||
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return resp, nil
|
||||
|
@ -4042,7 +4042,7 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
|
|||
|
||||
// ListImportTasks get id array of all import tasks from rootcoord
|
||||
func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
|
||||
log.Info("received list import tasks request")
|
||||
log.Debug("received list import tasks request")
|
||||
resp := &milvuspb.ListImportTasksResponse{}
|
||||
if !node.checkHealthy() {
|
||||
resp.Status = unhealthyStatus()
|
||||
|
@ -4061,7 +4061,7 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
log.Info("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks))
|
||||
log.Debug("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks))
|
||||
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
|
||||
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return resp, err
|
||||
|
|
Loading…
Reference in New Issue