Refactor log for Query (#26310)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
pull/26343/head
Enwei Jiao 2023-08-14 18:57:32 +08:00 committed by GitHub
parent a9cbf43353
commit 7d61355ab0
18 changed files with 42 additions and 48 deletions
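
Summary of the change: failures that are wrapped and returned to the caller (or otherwise handled locally) are downgraded from log.Error to log.Warn, a handful of operationally useful log.Debug messages are promoted to log.Info, and a few stale debug logs and TODO comments are dropped. The snippet below is a minimal, self-contained sketch of that convention, not code from the repository: it logs with go.uber.org/zap directly instead of Milvus's log wrapper, and loadPartitions/fetchPartitions are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/zap"
)

// Stand-in for Milvus's structured logger; any *zap.Logger works for the sketch.
var log = zap.NewExample()

// loadPartitions shows the post-refactor convention: the error is logged at
// Warn (not Error) because it is wrapped and propagated to the caller, who
// decides how to react; Error is reserved for failures nobody else will handle.
func loadPartitions(collectionID int64) error {
	partitionIDs, err := fetchPartitions(collectionID)
	if err != nil {
		msg := "failed to get partitions from RootCoord"
		log.Warn(msg, zap.Int64("collectionID", collectionID), zap.Error(err))
		return fmt.Errorf("%s: %w", msg, err)
	}
	// Progress that operators care about is logged at Info rather than Debug.
	log.Info("partitions fetched",
		zap.Int64("collectionID", collectionID),
		zap.Int64s("partitionIDs", partitionIDs))
	return nil
}

// fetchPartitions simulates a broker call that can fail.
func fetchPartitions(collectionID int64) ([]int64, error) {
	if collectionID%2 == 0 {
		return nil, errors.New("rootcoord unavailable")
	}
	return []int64{collectionID*10 + 1, collectionID*10 + 2}, nil
}

func main() {
	if err := loadPartitions(100); err != nil {
		// The caller owns the final decision; here it just reports the failure.
		log.Warn("load aborted", zap.Error(err))
	}
}
```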


@@ -76,7 +76,7 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) {
defer wg.Done()
resp, err := handler.getDistribution(ctx)
if err != nil {
- log.Error("SyncAll come across err when getting data distribution", zap.Error(err))
+ log.Warn("SyncAll come across err when getting data distribution", zap.Error(err))
} else {
handler.handleDistResp(resp)
}


@@ -116,7 +116,7 @@ func (job *LoadCollectionJob) Execute() error {
partitionIDs, err := job.broker.GetPartitions(job.ctx, req.GetCollectionID())
if err != nil {
msg := "failed to get partitions from RootCoord"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
loadedPartitionIDs := lo.Map(job.meta.CollectionManager.GetPartitionsByCollection(req.GetCollectionID()),
@@ -150,7 +150,7 @@ func (job *LoadCollectionJob) Execute() error {
replicas, err = utils.SpawnReplicasWithRG(job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber())
if err != nil {
msg := "failed to spawn replica for collection"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
for _, replica := range replicas {
@@ -192,7 +192,7 @@ func (job *LoadCollectionJob) Execute() error {
err = job.meta.CollectionManager.PutCollection(collection, partitions...)
if err != nil {
msg := "failed to store collection and partitions"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Start load collection %d", collection.CollectionID)))
@@ -323,7 +323,7 @@ func (job *LoadPartitionJob) Execute() error {
replicas, err = utils.SpawnReplicasWithRG(job.meta, req.GetCollectionID(), req.GetResourceGroups(), req.GetReplicaNumber())
if err != nil {
msg := "failed to spawn replica for collection"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
for _, replica := range replicas {
@@ -366,14 +366,14 @@ func (job *LoadPartitionJob) Execute() error {
err = job.meta.CollectionManager.PutCollection(collection, partitions...)
if err != nil {
msg := "failed to store collection and partitions"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
} else { // collection exists, put partitions only
err = job.meta.CollectionManager.PutPartition(partitions...)
if err != nil {
msg := "failed to store partitions"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}
}


@@ -92,7 +92,7 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
err = job.meta.CollectionManager.PutPartition(partition)
if err != nil {
msg := "failed to store partitions"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}


@@ -199,7 +199,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
err = m.upgradeRecover(broker)
if err != nil {
- log.Error("upgrade recover failed", zap.Error(err))
+ log.Warn("upgrade recover failed", zap.Error(err))
return err
}
return nil


@@ -86,7 +86,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = errors.New(resp.GetStatus().GetReason())
- log.Error("failed to get collection schema", zap.Int64("collectionID", collectionID), zap.Error(err))
+ log.Warn("failed to get collection schema", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
return resp.GetSchema(), nil
@@ -135,13 +135,13 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
}
recoveryInfo, err := broker.dataCoord.GetRecoveryInfo(ctx, getRecoveryInfoRequest)
if err != nil {
- log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
+ log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
return nil, nil, err
}
if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = errors.New(recoveryInfo.GetStatus().GetReason())
- log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
+ log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
return nil, nil, err
}
@@ -161,13 +161,13 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti
}
recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest)
if err != nil {
- log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
+ log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
return nil, nil, err
}
if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = errors.New(recoveryInfo.GetStatus().GetReason())
- log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
+ log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err))
return nil, nil, err
}
@@ -184,7 +184,7 @@ func (broker *CoordinatorBroker) GetSegmentInfo(ctx context.Context, ids ...Uniq
}
resp, err := broker.dataCoord.GetSegmentInfo(ctx, req)
if err != nil {
- log.Error("failed to get segment info from DataCoord",
+ log.Warn("failed to get segment info from DataCoord",
zap.Int64s("segments", ids),
zap.Error(err))
return nil, err
@@ -208,7 +208,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
SegmentIDs: []int64{segmentID},
})
if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
- log.Error("failed to get segment index info",
+ log.Warn("failed to get segment index info",
zap.Int64("collection", collectionID),
zap.Int64("segment", segmentID),
zap.Error(err))


@@ -100,8 +100,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
zap.Int64s("PartitionIDs", partitionIDs))
segments, channels, err := mgr.PullNextTargetV2(mgr.broker, collectionID, partitionIDs...)
if err != nil {
- log.Error("failed to get next targets for collection",
- zap.Error(err))
+ log.Warn("failed to get next targets for collection", zap.Error(err))
return err
}


@@ -244,12 +244,12 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
schema, err := o.broker.GetCollectionSchema(ctx, leaderView.CollectionID)
if err != nil {
- log.Error("sync distribution failed, cannot get schema of collection", zap.Error(err))
+ log.Warn("sync distribution failed, cannot get schema of collection", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
- log.Error("sync distribution failed, cannot get partitions of collection", zap.Error(err))
+ log.Warn("sync distribution failed, cannot get partitions of collection", zap.Error(err))
return false
}
@@ -271,12 +271,12 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
}
resp, err := o.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
- log.Error("failed to sync distribution", zap.Error(err))
+ log.Warn("failed to sync distribution", zap.Error(err))
return false
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
- log.Error("failed to sync distribution", zap.String("reason", resp.GetReason()))
+ log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
return false
}


@@ -219,7 +219,7 @@ func (ob *TargetObserver) updateNextTarget(collectionID int64) error {
log.Info("observer trigger update next target")
err := ob.targetMgr.UpdateCollectionNextTarget(collectionID)
if err != nil {
- log.Error("failed to update next target for collection",
+ log.Warn("failed to update next target for collection",
zap.Error(err))
return err
}


@@ -310,7 +310,7 @@ func (s *Server) initMeta() error {
log.Info("recover meta...")
err := s.meta.CollectionManager.Recover(s.broker)
if err != nil {
- log.Error("failed to recover collections")
+ log.Warn("failed to recover collections")
return err
}
collections := s.meta.GetAll()
@@ -323,13 +323,13 @@ func (s *Server) initMeta() error {
err = s.meta.ReplicaManager.Recover(collections)
if err != nil {
- log.Error("failed to recover replicas")
+ log.Warn("failed to recover replicas")
return err
}
err = s.meta.ResourceManager.Recover()
if err != nil {
- log.Error("failed to recover resource groups")
+ log.Warn("failed to recover resource groups")
return err
}
@@ -619,7 +619,7 @@ func (s *Server) watchNodes(revision int64) {
case event, ok := <-eventChan:
if !ok {
// ErrCompacted is handled inside SessionWatcher
- log.Error("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
+ log.Warn("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
go s.Stop()
if s.session.TriggerKill {
if p, err := os.FindProcess(os.Getpid()); err == nil {


@@ -277,7 +277,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
err := releaseJob.Wait()
if err != nil {
msg := "failed to release collection"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(errors.Wrap(err, msg)), nil
}
@@ -402,7 +402,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
err := releaseJob.Wait()
if err != nil {
msg := "failed to release partitions"
- log.Error(msg, zap.Error(err))
+ log.Warn(msg, zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return merr.Status(errors.Wrap(err, msg)), nil
}


@@ -545,7 +545,7 @@ func (scheduler *taskScheduler) schedule(node int64) {
zap.Int("toRemoveNum", len(toRemove)),
)
- log.Debug("process tasks related to node done",
+ log.Info("process tasks related to node done",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),


@@ -122,12 +122,12 @@ func AssignNodesToReplicas(m *meta.Meta, rgName string, replicas ...*meta.Replic
nodeGroup, err := m.ResourceManager.GetNodes(rgName)
if err != nil {
- log.Error("failed to get nodes", zap.Error(err))
+ log.Warn("failed to get nodes", zap.Error(err))
return err
}
if len(nodeGroup) < len(replicaIDs) {
- log.Error(meta.ErrNodeNotEnough.Error())
+ log.Warn(meta.ErrNodeNotEnough.Error(), zap.Error(meta.ErrNodeNotEnough))
return meta.ErrNodeNotEnough
}


@@ -255,8 +255,7 @@ func (d *distribution) SyncTargetVersion(newVersion int64, growingInTarget []int
for _, segmentID := range sealedInTarget {
entry, ok := d.sealedSegments[segmentID]
if !ok {
- log.Error("readable sealed segment lost, make it unserviceable",
- zap.Int64("segmentID", segmentID))
+ log.Warn("readable sealed segment lost, make it unserviceable", zap.Int64("segmentID", segmentID))
available = false
continue
}


@@ -63,7 +63,7 @@ func (m *manager) Add(collectionID UniqueID, channel string) (Pipeline, error) {
m.mu.Lock()
defer m.mu.Unlock()
- log.Debug("start create pipeine",
+ log.Info("start create pipeine",
zap.Int64("collectionID", collectionID),
zap.String("channel", channel),
)


@@ -189,7 +189,7 @@ func SelectSearchResultData(dataArray []*schemapb.SearchResultData, resultOffset
if sel == -1 {
// A bad case happens where knowhere returns distance == +/-maxFloat32
// by mistake.
- log.Error("a bad distance is found, something is wrong here!", zap.Float32("score", distance))
+ log.Warn("a bad distance is found, something is wrong here!", zap.Float32("score", distance))
} else if typeutil.ComparePK(
typeutil.GetPK(dataArray[i].GetIds(), idx),
typeutil.GetPK(dataArray[sel].GetIds(), resultDataIdx)) {


@@ -184,7 +184,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
}
segment, err := NewSegment(collection, segmentID, partitionID, collectionID, shard, segmentType, version, info.GetStartPosition(), info.GetDeltaPosition())
if err != nil {
- log.Error("load segment failed when create new segment",
+ log.Warn("load segment failed when create new segment",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
@@ -205,7 +205,7 @@ func (loader *segmentLoader) Load(ctx context.Context,
tr := timerecord.NewTimeRecorder("loadDurationPerSegment")
err := loader.loadSegment(ctx, segment, loadInfo)
if err != nil {
- log.Error("load segment failed when load data into memory",
+ log.Warn("load segment failed when load data into memory",
zap.Int64("partitionID", partitionID),
zap.Int64("segmentID", segmentID),
zap.Error(err),
@@ -864,7 +864,7 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
if fieldIndexInfo, ok := vecFieldID2IndexInfo[fieldID]; ok {
neededMemSize, neededDiskSize, err := GetIndexResourceUsage(fieldIndexInfo)
if err != nil {
- log.Error("failed to get index size",
+ log.Warn("failed to get index size",
zap.Int64("collectionID", loadInfo.CollectionID),
zap.Int64("segmentID", loadInfo.SegmentID),
zap.Int64("indexBuildID", fieldIndexInfo.BuildID),


@@ -172,9 +172,6 @@ func (node *QueryNode) Register() error {
}
}
})
- // TODO Reset the logger
- // paramtable.Get().initLogCfg()
return nil
}
@@ -477,13 +474,13 @@ func (node *QueryNode) initHook() error {
if path == "" {
return fmt.Errorf("fail to set the plugin path")
}
- log.Debug("start to load plugin", zap.String("path", path))
+ log.Info("start to load plugin", zap.String("path", path))
p, err := plugin.Open(path)
if err != nil {
return fmt.Errorf("fail to open the plugin, error: %s", err.Error())
}
- log.Debug("plugin open")
+ log.Info("plugin open")
h, err := p.Lookup("QueryNodePlugin")
if err != nil {
@@ -520,11 +517,11 @@ func (node *QueryNode) handleQueryHookEvent() {
realKey := strings.TrimPrefix(event.Key, paramtable.Get().HookCfg.QueryNodePluginTuningConfig.KeyPrefix)
if event.EventType == config.CreateType || event.EventType == config.UpdateType {
if err := node.queryHook.InitTuningConfig(map[string]string{realKey: event.Value}); err != nil {
- log.Error("failed to refresh hook tuning config", zap.Error(err))
+ log.Warn("failed to refresh hook tuning config", zap.Error(err))
}
} else if event.EventType == config.DeleteType {
if err := node.queryHook.DeleteTuningConfig(realKey); err != nil {
- log.Error("failed to delete hook tuning config", zap.Error(err))
+ log.Warn("failed to delete hook tuning config", zap.Error(err))
}
}
}


@@ -75,7 +75,6 @@ func (node *QueryNode) GetComponentStates(ctx context.Context) (*milvuspb.Compon
StateCode: code,
}
stats.State = info
- log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
return stats, nil
}
@@ -1125,7 +1124,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
}
log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
- zap.Int64("nodeId", paramtable.GetNodeID()),
+ zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.String("metricType", metricType))