From d143682d7d28fbb6c7d8d7c93f04945f043a3520 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Mon, 19 Jun 2023 13:28:41 +0800 Subject: [PATCH] Refactor logs in proxy package. (#24936) Signed-off-by: Enwei Jiao --- internal/proxy/meta_cache.go | 4 +- internal/proxy/multi_rate_limiter.go | 10 ---- internal/proxy/privilege_interceptor.go | 12 ++-- internal/proxy/proxy.go | 40 +++++--------- internal/proxy/task_calc_distance.go | 73 +++++++++---------------- internal/proxy/task_delete.go | 6 +- internal/proxy/task_insert.go | 18 +++--- internal/proxy/task_query.go | 6 +- internal/proxy/task_scheduler.go | 6 +- internal/proxy/task_search.go | 13 ----- internal/proxy/task_statistic.go | 17 +++--- internal/proxy/task_upsert.go | 28 +++++----- 12 files changed, 85 insertions(+), 148 deletions(-) diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 475dd70c9c..5fe459a659 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -372,7 +372,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m return nil, err } metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) - log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.Any("collectionName", collectionName)) + log.Debug("proxy", zap.Any("GetPartitions:partitions after update", partitions), zap.String("collectionName", collectionName)) ret := make(map[string]typeutil.UniqueID) partInfo := m.collInfo[collectionName].partInfo for k, v := range partInfo { @@ -414,7 +414,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, return nil, err } metrics.ProxyUpdateCacheLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(tr.ElapseSpan().Milliseconds())) - log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName)) + log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.String("collectionName", collectionName)) partInfo, ok = m.collInfo[collectionName].partInfo[partitionName] if !ok { return nil, merr.WrapErrPartitionNotFound(partitionName) diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index e8ae26b2cc..3b1f189e99 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -259,16 +259,6 @@ func setRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, collecti } } -// printRates logs the rate info. -func (rl *rateLimiter) printRates(rates []*internalpb.Rate) { - //fmt.Printf("RateLimiter set rates:\n---------------------------------\n") - //for _, r := range rates { - // fmt.Printf("%s -> %v\n", r.GetRt().String(), r.GetR()) - //} - //fmt.Printf("---------------------------------\n") - log.Debug("RateLimiter setRates", zap.Any("rates", rates)) -} - // registerLimiters register limiter for all rate types. func (rl *rateLimiter) registerLimiters(globalLevel bool) { log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0) diff --git a/internal/proxy/privilege_interceptor.go b/internal/proxy/privilege_interceptor.go index 6e164fb131..6351170d38 100644 --- a/internal/proxy/privilege_interceptor.go +++ b/internal/proxy/privilege_interceptor.go @@ -68,7 +68,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context log.Debug("PrivilegeInterceptor", zap.String("type", reflect.TypeOf(req).String())) privilegeExt, err := funcutil.GetPrivilegeExtObj(req) if err != nil { - log.Debug("GetPrivilegeExtObj err", zap.Error(err)) + log.Warn("GetPrivilegeExtObj err", zap.Error(err)) return ctx, nil } username, err := GetCurUserFromContext(ctx) @@ -96,7 +96,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context objectPrivilege := privilegeExt.ObjectPrivilege.String() policyInfo := strings.Join(globalMetaCache.GetPrivilegeInfo(ctx), ",") - logWithCurrentRequestInfo := log.With(zap.String("username", username), zap.Strings("role_names", roleNames), + log := log.With(zap.String("username", username), zap.Strings("role_names", roleNames), zap.String("object_type", objectType), zap.String("object_privilege", objectPrivilege), zap.Int32("object_index", objectNameIndex), zap.String("object_name", objectName), zap.Int32("object_indexs", objectNameIndexs), zap.Strings("object_names", objectNames), @@ -109,7 +109,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context casbinModel := templateModel.Copy() e, err := casbin.NewEnforcer(casbinModel, a) if err != nil { - logWithCurrentRequestInfo.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err)) + log.Warn("NewEnforcer fail", zap.String("policy", policy), zap.Error(err)) return ctx, err } for _, roleName := range roleNames { @@ -126,7 +126,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context // handle the api which refers one resource permitObject, err := permitFunc(objectName) if err != nil { - logWithCurrentRequestInfo.Warn("fail to execute permit func", zap.String("name", objectName), zap.Error(err)) + log.Warn("fail to execute permit func", zap.String("name", objectName), zap.Error(err)) return ctx, err } if permitObject { @@ -140,7 +140,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context for _, name := range objectNames { p, err := permitFunc(name) if err != nil { - logWithCurrentRequestInfo.Warn("fail to execute permit func", zap.String("name", name), zap.Error(err)) + log.Warn("fail to execute permit func", zap.String("name", name), zap.Error(err)) return ctx, err } if !p { @@ -154,7 +154,7 @@ func PrivilegeInterceptor(ctx context.Context, req interface{}) (context.Context } } - logWithCurrentRequestInfo.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames)) + log.Info("permission deny", zap.String("policy", policy), zap.Strings("roles", roleNames)) return ctx, status.Error(codes.PermissionDenied, fmt.Sprintf("%s: permission deny", objectPrivilege)) } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index a5c91c7a4b..504bbb7202 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -189,7 +189,6 @@ func (node *Proxy) Init() error { log.Info("init session for Proxy done") node.factory.Init(Params) - log.Debug("init parameters for factory", zap.String("role", typeutil.ProxyRole), zap.Any("parameters", Params.GetAll())) accesslog.SetupAccseeLog(&Params.ProxyCfg.AccessLog, &Params.MinioCfg) log.Debug("init access log for Proxy done") @@ -200,67 +199,58 @@ func (node *Proxy) Init() error { } log.Info("Proxy init rateCollector done", zap.Int64("nodeID", paramtable.GetNodeID())) - log.Debug("create id allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) idAllocator, err := allocator.NewIDAllocator(node.ctx, node.rootCoord, paramtable.GetNodeID()) if err != nil { log.Warn("failed to create id allocator", - zap.Error(err), - zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) + zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()), + zap.Error(err)) return err } node.rowIDAllocator = idAllocator log.Debug("create id allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) - log.Debug("create timestamp allocator", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) tsoAllocator, err := newTimestampAllocator(node.rootCoord, paramtable.GetNodeID()) if err != nil { log.Warn("failed to create timestamp allocator", - zap.Error(err), - zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) + zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()), + zap.Error(err)) return err } node.tsoAllocator = tsoAllocator log.Debug("create timestamp allocator done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) - log.Debug("create segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) segAssigner, err := newSegIDAssigner(node.ctx, node.dataCoord, node.lastTick) if err != nil { log.Warn("failed to create segment id assigner", - zap.Error(err), - zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) + zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID()), + zap.Error(err)) return err } node.segAssigner = segAssigner node.segAssigner.PeerID = paramtable.GetNodeID() log.Debug("create segment id assigner done", zap.String("role", typeutil.ProxyRole), zap.Int64("ProxyID", paramtable.GetNodeID())) - log.Debug("create channels manager", zap.String("role", typeutil.ProxyRole)) dmlChannelsFunc := getDmlChannelsFunc(node.ctx, node.rootCoord) chMgr := newChannelsMgrImpl(dmlChannelsFunc, defaultInsertRepackFunc, node.factory) node.chMgr = chMgr log.Debug("create channels manager done", zap.String("role", typeutil.ProxyRole)) - log.Debug("create task scheduler", zap.String("role", typeutil.ProxyRole)) node.sched, err = newTaskScheduler(node.ctx, node.tsoAllocator, node.factory) if err != nil { - log.Warn("failed to create task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to create task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("create task scheduler done", zap.String("role", typeutil.ProxyRole)) syncTimeTickInterval := Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond) / 2 - log.Debug("create channels time ticker", - zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval)) node.chTicker = newChannelsTimeTicker(node.ctx, Params.ProxyCfg.TimeTickInterval.GetAsDuration(time.Millisecond)/2, []string{}, node.sched.getPChanStatistics, tsoAllocator) - log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole)) + log.Debug("create channels time ticker done", zap.String("role", typeutil.ProxyRole), zap.Duration("syncTimeTickInterval", syncTimeTickInterval)) - log.Debug("create metrics cache manager", zap.String("role", typeutil.ProxyRole)) node.metricsCacheManager = metricsinfo.NewMetricsCacheManager() log.Debug("create metrics cache manager done", zap.String("role", typeutil.ProxyRole)) - log.Debug("init meta cache", zap.String("role", typeutil.ProxyRole)) if err := InitMetaCache(node.ctx, node.rootCoord, node.queryCoord, node.shardMgr); err != nil { - log.Warn("failed to init meta cache", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to init meta cache", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("init meta cache done", zap.String("role", typeutil.ProxyRole)) @@ -351,30 +341,26 @@ func (node *Proxy) sendChannelsTimeTickLoop() { // Start starts a proxy node. func (node *Proxy) Start() error { - log.Debug("start task scheduler", zap.String("role", typeutil.ProxyRole)) if err := node.sched.Start(); err != nil { - log.Warn("failed to start task scheduler", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to start task scheduler", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("start task scheduler done", zap.String("role", typeutil.ProxyRole)) - log.Debug("start id allocator", zap.String("role", typeutil.ProxyRole)) if err := node.rowIDAllocator.Start(); err != nil { - log.Warn("failed to start id allocator", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to start id allocator", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("start id allocator done", zap.String("role", typeutil.ProxyRole)) - log.Debug("start segment id assigner", zap.String("role", typeutil.ProxyRole)) if err := node.segAssigner.Start(); err != nil { - log.Warn("failed to start segment id assigner", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to start segment id assigner", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("start segment id assigner done", zap.String("role", typeutil.ProxyRole)) - log.Debug("start channels time ticker", zap.String("role", typeutil.ProxyRole)) if err := node.chTicker.start(); err != nil { - log.Warn("failed to start channels time ticker", zap.Error(err), zap.String("role", typeutil.ProxyRole)) + log.Warn("failed to start channels time ticker", zap.String("role", typeutil.ProxyRole), zap.Error(err)) return err } log.Debug("start channels time ticker done", zap.String("role", typeutil.ProxyRole)) diff --git a/internal/proxy/task_calc_distance.go b/internal/proxy/task_calc_distance.go index 6d803c16d9..70418d6d5e 100644 --- a/internal/proxy/task_calc_distance.go +++ b/internal/proxy/task_calc_distance.go @@ -176,24 +176,21 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi return t.arrangeVectorsByIntID(inputIds, dict, retrievedVectors) } + log := log.Ctx(ctx) log.Debug("CalcDistance received", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole), zap.String("metric", metric)) vectorsLeft := request.GetOpLeft().GetDataArray() opLeft := request.GetOpLeft().GetIdArray() if opLeft != nil { - log.Debug("OpLeft IdArray not empty, Get vectors by id", - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Debug("OpLeft IdArray not empty, Get vectors by id", zap.String("role", typeutil.ProxyRole)) result, err := t.queryFunc(opLeft) if err != nil { - log.Debug("Failed to get left vectors by id", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Warn("Failed to get left vectors by id", + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -204,15 +201,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("OpLeft IdArray not empty, Get vectors by id done", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) vectorsLeft, err = arrangeFunc(opLeft, result.FieldsData) if err != nil { log.Debug("Failed to re-arrange left vectors", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -223,14 +218,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("Re-arrange left vectors done", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) } if vectorsLeft == nil { msg := "Left vectors array is empty" log.Debug(msg, - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) return &milvuspb.CalcDistanceResults{ @@ -245,15 +238,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi opRight := request.GetOpRight().GetIdArray() if opRight != nil { log.Debug("OpRight IdArray not empty, Get vectors by id", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) result, err := t.queryFunc(opRight) if err != nil { log.Debug("Failed to get right vectors by id", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -264,15 +255,13 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("OpRight IdArray not empty, Get vectors by id done", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) vectorsRight, err = arrangeFunc(opRight, result.FieldsData) if err != nil { log.Debug("Failed to re-arrange right vectors", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -283,15 +272,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("Re-arrange right vectors done", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) } if vectorsRight == nil { msg := "Right vectors array is empty" - log.Debug(msg, - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Warn(msg, zap.String("role", typeutil.ProxyRole)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -303,9 +289,7 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi if vectorsLeft.GetDim() != vectorsRight.GetDim() { msg := "Vectors dimension is not equal" - log.Debug(msg, - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Debug(msg, zap.String("role", typeutil.ProxyRole)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -318,14 +302,14 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi if vectorsLeft.GetFloatVector() != nil && vectorsRight.GetFloatVector() != nil { distances, err := distance.CalcFloatDistance(vectorsLeft.GetDim(), vectorsLeft.GetFloatVector().GetData(), vectorsRight.GetFloatVector().GetData(), metric) if err != nil { - log.Debug("Failed to CalcFloatDistance", - zap.Error(err), + log.Warn("Failed to CalcFloatDistance", zap.Int64("leftDim", vectorsLeft.GetDim()), zap.Int("leftLen", len(vectorsLeft.GetFloatVector().GetData())), zap.Int64("rightDim", vectorsRight.GetDim()), zap.Int("rightLen", len(vectorsRight.GetFloatVector().GetData())), zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -336,7 +320,6 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("CalcFloatDistance done", - zap.Error(err), zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) @@ -354,13 +337,12 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi hamming, err := distance.CalcHammingDistance(vectorsLeft.GetDim(), vectorsLeft.GetBinaryVector(), vectorsRight.GetBinaryVector()) if err != nil { log.Debug("Failed to CalcHammingDistance", - zap.Error(err), zap.Int64("leftDim", vectorsLeft.GetDim()), zap.Int("leftLen", len(vectorsLeft.GetBinaryVector())), zap.Int64("rightDim", vectorsRight.GetDim()), zap.Int("rightLen", len(vectorsRight.GetBinaryVector())), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -371,9 +353,7 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } if metric == distance.HAMMING { - log.Debug("CalcHammingDistance done", - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Debug("CalcHammingDistance done", zap.String("role", typeutil.ProxyRole)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success, Reason: ""}, @@ -388,10 +368,9 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi if metric == distance.TANIMOTO { tanimoto, err := distance.CalcTanimotoCoefficient(vectorsLeft.GetDim(), hamming) if err != nil { - log.Debug("Failed to CalcTanimotoCoefficient", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Warn("Failed to CalcTanimotoCoefficient", + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ @@ -402,7 +381,6 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi } log.Debug("CalcTanimotoCoefficient done", - zap.String("traceID", t.traceID), zap.String("role", typeutil.ProxyRole)) return &milvuspb.CalcDistanceResults{ @@ -421,10 +399,9 @@ func (t *calcDistanceTask) Execute(ctx context.Context, request *milvuspb.CalcDi err = errors.New("cannot calculate distance between binary vectors and float vectors") } - log.Debug("Failed to CalcDistance", - zap.Error(err), - zap.String("traceID", t.traceID), - zap.String("role", typeutil.ProxyRole)) + log.Warn("Failed to CalcDistance", + zap.String("role", typeutil.ProxyRole), + zap.Error(err)) return &milvuspb.CalcDistanceResults{ Status: &commonpb.Status{ diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 24118ddcb7..f528c289d1 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -312,10 +312,10 @@ func (dt *deleteTask) Execute(ctx context.Context) (err error) { } log.Debug("send delete request to virtual channels", - zap.String("collection", dt.deleteMsg.GetCollectionName()), - zap.Int64("collection_id", collID), + zap.String("collectionName", dt.deleteMsg.GetCollectionName()), + zap.Int64("collectionID", collID), zap.Strings("virtual_channels", channelNames), - zap.Int64("task_id", dt.ID()), + zap.Int64("taskID", dt.ID()), zap.Duration("prepare duration", tr.RecordSpan())) err = stream.Produce(msgPack) diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index da06aa7738..6c0219ee90 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -110,13 +110,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error { collectionName := it.insertMsg.CollectionName if err := validateCollectionName(collectionName); err != nil { - log.Info("valid collection name failed", zap.String("collectionName", collectionName), zap.Error(err)) + log.Warn("valid collection name failed", zap.String("collectionName", collectionName), zap.Error(err)) return err } schema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName) if err != nil { - log.Error("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err)) + log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err)) return err } it.schema = schema @@ -161,7 +161,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.insertMsg, true) log := log.Ctx(ctx).With(zap.String("collectionName", collectionName)) if err != nil { - log.Error("check primary field data and hash primary key failed", + log.Warn("check primary field data and hash primary key failed", zap.Error(err)) return err } @@ -183,7 +183,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { fieldSchema, _ := typeutil.GetPartitionKeyFieldSchema(it.schema) it.partitionKeys, err = getPartitionKeyFieldData(fieldSchema, it.insertMsg) if err != nil { - log.Info("get partition keys from insert request failed", zap.String("collection name", collectionName), zap.Error(err)) + log.Warn("get partition keys from insert request failed", zap.String("collection name", collectionName), zap.Error(err)) return err } } else { @@ -196,7 +196,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { } if err := validatePartitionTag(partitionTag, true); err != nil { - log.Info("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err)) + log.Warn("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err)) return err } } @@ -233,7 +233,7 @@ func (it *insertTask) Execute(ctx context.Context) error { channelNames, err := it.chMgr.getVChannels(collID) if err != nil { - log.Ctx(ctx).Error("get vChannels failed", + log.Ctx(ctx).Warn("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err)) it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError @@ -242,9 +242,9 @@ func (it *insertTask) Execute(ctx context.Context) error { } log.Ctx(ctx).Debug("send insert request to virtual channels", - zap.String("collection", it.insertMsg.GetCollectionName()), + zap.String("collectionName", it.insertMsg.GetCollectionName()), zap.String("partition", it.insertMsg.GetPartitionName()), - zap.Int64("collection_id", collID), + zap.Int64("collectionID", collID), zap.Strings("virtual_channels", channelNames), zap.Int64("task_id", it.ID()), zap.Duration("get cache duration", getCacheDur), @@ -258,7 +258,7 @@ func (it *insertTask) Execute(ctx context.Context) error { msgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.insertMsg, it.result, it.idAllocator, it.segIDAssigner) } if err != nil { - log.Error("assign segmentID and repack insert data failed", + log.Warn("assign segmentID and repack insert data failed", zap.Int64("collectionID", collID), zap.Error(err)) it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 06db426e37..9de6f97e24 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -260,7 +260,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, collectionName) if err != nil { - log.Warn("Failed to get collection id.") + log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err)) return err } t.CollectionID = collID @@ -268,7 +268,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { t.partitionKeyMode, err = isPartitionKeyMode(ctx, collectionName) if err != nil { - log.Warn("check partition key mode failed", zap.Int64("collectionID", t.CollectionID)) + log.Warn("check partition key mode failed", zap.Int64("collectionID", t.CollectionID), zap.Error(err)) return err } if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 { @@ -485,7 +485,7 @@ func (t *queryTask) queryShard(ctx context.Context, nodeID int64, qn types.Query return errInvalidShardLeaders } if result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - log.Warn("QueryNode query result error") + log.Warn("QueryNode query result error", zap.Any("errorCode", result.GetStatus().GetErrorCode()), zap.String("reason", result.GetStatus().GetReason())) return fmt.Errorf("fail to Query, QueryNode ID = %d, reason=%s", nodeID, result.GetStatus().GetReason()) } diff --git a/internal/proxy/task_scheduler.go b/internal/proxy/task_scheduler.go index 0435a81bc3..0467dfd4e9 100644 --- a/internal/proxy/task_scheduler.go +++ b/internal/proxy/task_scheduler.go @@ -449,7 +449,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { }() if err != nil { span.RecordError(err) - log.Ctx(ctx).Error("Failed to pre-execute task: " + err.Error()) + log.Ctx(ctx).Warn("Failed to pre-execute task: " + err.Error()) return } @@ -457,7 +457,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { err = t.Execute(ctx) if err != nil { span.RecordError(err) - log.Ctx(ctx).Error("Failed to execute task: ", zap.Error(err)) + log.Ctx(ctx).Warn("Failed to execute task: ", zap.Error(err)) return } @@ -466,7 +466,7 @@ func (sched *taskScheduler) processTask(t task, q taskQueue) { if err != nil { span.RecordError(err) - log.Ctx(ctx).Error("Failed to post-execute task: ", zap.Error(err)) + log.Ctx(ctx).Warn("Failed to post-execute task: ", zap.Error(err)) return } } diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index af8fee34e1..fc54a4ad15 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -878,22 +878,9 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb ret.Results.Scores[k] *= -1 } } - // printSearchResultData(ret.Results, "proxy reduce result") return ret, nil } -// func printSearchResultData(data *schemapb.SearchResultData, header string) { -// size := len(data.GetIds().GetIntId().GetData()) -// if size != len(data.Scores) { -// log.Error("SearchResultData length mis-match") -// } -// log.Debug("==== SearchResultData ====", -// zap.String("header", header), zap.Int64("nq", data.NumQueries), zap.Int64("topk", data.TopK)) -// for i := 0; i < size; i++ { -// log.Debug("", zap.Int("i", i), zap.Int64("id", data.GetIds().GetIntId().Data[i]), zap.Float32("score", data.Scores[i])) -// } -// } - func (t *searchTask) TraceCtx() context.Context { return t.ctx } diff --git a/internal/proxy/task_statistic.go b/internal/proxy/task_statistic.go index 3c7e24b309..fac4b2289a 100644 --- a/internal/proxy/task_statistic.go +++ b/internal/proxy/task_statistic.go @@ -140,28 +140,26 @@ func (g *getStatisticsTask) PreExecute(ctx context.Context) error { // check if collection/partitions are loaded into query node loaded, unloaded, err := checkFullLoaded(ctx, g.qc, g.collectionName, partIDs) + log := log.Ctx(ctx) if err != nil { g.fromDataCoord = true g.unloadedPartitionIDs = partIDs - log.Ctx(ctx).Debug("checkFullLoaded failed, try get statistics from DataCoord", - zap.Error(err)) + log.Info("checkFullLoaded failed, try get statistics from DataCoord", zap.Error(err)) return nil } if len(unloaded) > 0 { g.fromDataCoord = true g.unloadedPartitionIDs = unloaded - log.Debug("some partitions has not been loaded, try get statistics from DataCoord", + log.Info("some partitions has not been loaded, try get statistics from DataCoord", zap.String("collection", g.collectionName), - zap.Int64s("unloaded partitions", unloaded), - zap.Error(err)) + zap.Int64s("unloaded partitions", unloaded)) } if len(loaded) > 0 { g.fromQueryNode = true g.loadedPartitionIDs = loaded - log.Debug("some partitions has been loaded, try get statistics from QueryNode", + log.Info("some partitions has been loaded, try get statistics from QueryNode", zap.String("collection", g.collectionName), - zap.Int64s("loaded partitions", loaded), - zap.Error(err)) + zap.Int64s("loaded partitions", loaded)) } return nil } @@ -228,8 +226,7 @@ func (g *getStatisticsTask) PostExecute(ctx context.Context) error { Stats: result, } - log.Info("get statistics post execute done", - zap.Any("result", result)) + log.Debug("get statistics post execute done", zap.Any("result", result)) return nil } diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 0b79191a60..3610b860a2 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -176,14 +176,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.upsertMsg.InsertMsg, false) log := log.Ctx(ctx).With(zap.String("collectionName", it.upsertMsg.InsertMsg.CollectionName)) if err != nil { - log.Error("check primary field data and hash primary key failed when upsert", + log.Warn("check primary field data and hash primary key failed when upsert", zap.Error(err)) return err } // set field ID to insert field data err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema) if err != nil { - log.Error("insert set fieldID to fieldData failed when upsert", + log.Warn("insert set fieldID to fieldData failed when upsert", zap.Error(err)) return err } @@ -192,7 +192,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { fieldSchema, _ := typeutil.GetPartitionKeyFieldSchema(it.schema) it.partitionKeys, err = getPartitionKeyFieldData(fieldSchema, it.upsertMsg.InsertMsg) if err != nil { - log.Info("get partition keys from insert request failed", + log.Warn("get partition keys from insert request failed", zap.String("collectionName", collectionName), zap.Error(err)) return err @@ -200,7 +200,7 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { } else { partitionTag := it.upsertMsg.InsertMsg.PartitionName if err = validatePartitionTag(partitionTag, true); err != nil { - log.Error("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err)) + log.Warn("valid partition name failed", zap.String("partition name", partitionTag), zap.Error(err)) return err } } @@ -241,12 +241,12 @@ func (it *upsertTask) deletePreExecute(ctx context.Context) error { // partition name could be defaultPartitionName or name specified by sdk partName := it.upsertMsg.DeleteMsg.PartitionName if err := validatePartitionTag(partName, true); err != nil { - log.Info("Invalid partition name", zap.String("partitionName", partName), zap.Error(err)) + log.Warn("Invalid partition name", zap.String("partitionName", partName), zap.Error(err)) return err } partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName) if err != nil { - log.Info("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName), zap.Error(err)) + log.Warn("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName), zap.Error(err)) return err } it.upsertMsg.DeleteMsg.PartitionID = partID @@ -279,7 +279,7 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { schema, err := globalMetaCache.GetCollectionSchema(ctx, collectionName) if err != nil { - log.Info("Failed to get collection schema", + log.Warn("Failed to get collection schema", zap.String("collectionName", collectionName), zap.Error(err)) return err @@ -337,20 +337,20 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { } err = it.insertPreExecute(ctx) if err != nil { - log.Info("Fail to insertPreExecute", zap.Error(err)) + log.Warn("Fail to insertPreExecute", zap.Error(err)) return err } err = it.deletePreExecute(ctx) if err != nil { - log.Info("Fail to deletePreExecute", zap.Error(err)) + log.Warn("Fail to deletePreExecute", zap.Error(err)) return err } it.result.DeleteCnt = it.upsertMsg.DeleteMsg.NumRows it.result.InsertCnt = int64(it.upsertMsg.InsertMsg.NumRows) if it.result.DeleteCnt != it.result.InsertCnt { - log.Error("DeleteCnt and InsertCnt are not the same when upsert", + log.Info("DeleteCnt and InsertCnt are not the same when upsert", zap.Int64("DeleteCnt", it.result.DeleteCnt), zap.Int64("InsertCnt", it.result.InsertCnt)) } @@ -380,7 +380,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP getMsgStreamDur := tr.RecordSpan() channelNames, err := it.chMgr.getVChannels(collID) if err != nil { - log.Error("get vChannels failed when insertExecute", + log.Warn("get vChannels failed when insertExecute", zap.Error(err)) it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError it.result.Status.Reason = err.Error() @@ -404,7 +404,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP insertMsgPack, err = repackInsertDataWithPartitionKey(it.TraceCtx(), channelNames, it.partitionKeys, it.upsertMsg.InsertMsg, it.result, it.idAllocator, it.segIDAssigner) } if err != nil { - log.Error("assign segmentID and repack insert data failed when insertExecute", + log.Warn("assign segmentID and repack insert data failed when insertExecute", zap.Error(err)) it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError it.result.Status.Reason = err.Error() @@ -519,13 +519,13 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) { } err = it.insertExecute(ctx, msgPack) if err != nil { - log.Info("Fail to insertExecute", zap.Error(err)) + log.Warn("Fail to insertExecute", zap.Error(err)) return err } err = it.deleteExecute(ctx, msgPack) if err != nil { - log.Info("Fail to deleteExecute", zap.Error(err)) + log.Warn("Fail to deleteExecute", zap.Error(err)) return err }