enhance: change some logs (#29579)

related #29588

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
smellthemoon 2024-01-05 16:12:48 +08:00 committed by GitHub
parent 07057fcf7c
commit 1c1f2a1371
38 changed files with 89 additions and 88 deletions
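
The change applied across these files follows one pattern: reflection-based zap.Any fields are replaced with the field constructor that matches the value's static type (zap.String, zap.Int, zap.Int64, and so on), and zap.Any("error", err) becomes zap.Error(err). A minimal sketch of the before/after shape, assuming a stock zap logger (the values and messages below are illustrative, not taken from the Milvus code):

package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()

	segmentID := int64(42)    // hypothetical value, for illustration only
	channel := "by-dev-dml_0" // hypothetical channel name

	// Before: zap.Any accepts any value but resolves the field type via reflection.
	logger.Info("flush successfully", zap.Any("segmentID", segmentID))

	// After: typed fields skip reflection and document the expected type.
	logger.Info("flush successfully",
		zap.Int64("segmentID", segmentID),
		zap.String("channel", channel))

	// Errors use the dedicated constructor instead of zap.Any("error", err).
	var err error
	if err != nil {
		logger.Warn("failed to do post flush", zap.Int64("segmentID", segmentID), zap.Error(err))
	}
}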

@@ -256,8 +256,8 @@ func (ta *CachedAllocator) failRemainRequest() {
}
if len(ta.ToDoReqs) > 0 {
log.Warn("Allocator has some reqs to fail",
zap.Any("Role", ta.Role),
zap.Any("reqLen", len(ta.ToDoReqs)))
zap.String("Role", ta.Role),
zap.Int("reqLen", len(ta.ToDoReqs)))
}
for _, req := range ta.ToDoReqs {
if req != nil {

@@ -719,7 +719,7 @@ func (c *ChannelManagerImpl) watchChannelStatesLoop(ctx context.Context, revisio
return
case ackEvent := <-timeoutWatcher:
log.Info("receive timeout acks from state watcher",
zap.Any("state", ackEvent.ackType),
zap.Int("state", ackEvent.ackType),
zap.Int64("nodeID", ackEvent.nodeID), zap.String("channelName", ackEvent.channelName))
c.processAck(ackEvent)
case event, ok := <-etcdWatcher:

@@ -152,7 +152,7 @@ func (s *LevelZeroSegmentsViewSuite) TestTrigger() {
return v.ID
})
s.ElementsMatch(gotSegIDs, test.expectedSegs)
log.Info("trigger reason", zap.Any("trigger reason", reason))
log.Info("trigger reason", zap.String("trigger reason", reason))
}
})
}

@@ -553,7 +553,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
}
log.Info("time cost of generating compaction",
zap.Int64("plan ID", plan.PlanID),
zap.Any("time cost", time.Since(start).Milliseconds()),
zap.Int64("time cost", time.Since(start).Milliseconds()),
zap.Int64("collectionID", signal.collectionID),
zap.String("channel", channel),
zap.Int64("partitionID", partitionID),

@@ -553,7 +553,7 @@ func (s *SegmentManager) cleanupSealedSegment(ts Timestamp, channel string) {
}
if isEmptySealedSegment(segment, ts) {
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Any("segment", id))
log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id))
s.meta.SetState(id, commonpb.SegmentState_Dropped)
continue
}

@@ -784,7 +784,7 @@ func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.Dat
}
err = s.cluster.Flush(s.ctx, ttMsg.GetBase().GetSourceID(), ch, finfo)
if err != nil {
log.Warn("failed to handle flush", zap.Any("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
log.Warn("failed to handle flush", zap.Int64("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
return err
}
@@ -1011,7 +1011,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
log.Info("flush successfully", zap.Any("segmentID", segmentID))
err := s.postFlush(ctx, segmentID)
if err != nil {
log.Warn("failed to do post flush", zap.Any("segmentID", segmentID), zap.Error(err))
log.Warn("failed to do post flush", zap.Int64("segmentID", segmentID), zap.Error(err))
}
}
}

@@ -200,7 +200,7 @@ func (c *SessionManagerImpl) Compaction(nodeID int64, plan *datapb.CompactionPla
return err
}
log.Info("success to execute compaction", zap.Int64("node", nodeID), zap.Any("planID", plan.GetPlanID()))
log.Info("success to execute compaction", zap.Int64("node", nodeID), zap.Int64("planID", plan.GetPlanID()))
return nil
}

@@ -267,7 +267,7 @@ func TestBinlogIOInnerMethods(t *testing.T) {
assert.Equal(t, 12, len(kvs))
log.Debug("test paths",
zap.Any("kvs no.", len(kvs)),
zap.Int("kvs no.", len(kvs)),
zap.String("insert paths field0", pin[common.TimeStampField].GetBinlogs()[0].GetLogPath()))
})
}

@@ -120,7 +120,7 @@ func (node *DataNode) handleWatchInfo(e *event, key string, data []byte) {
e.info = watchInfo
e.vChanName = watchInfo.GetVchan().GetChannelName()
log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.Any("watch state", watchInfo.GetState().String()))
log.Info("DataNode is handling watchInfo PUT event", zap.String("key", key), zap.String("watch state", watchInfo.GetState().String()))
case deleteEventType:
e.vChanName = parseDeleteEventKey(key)
log.Info("DataNode is handling watchInfo DELETE event", zap.String("key", key))

@@ -98,19 +98,19 @@ func (cm *ConnectionManager) AddDependency(roleName string) error {
_, ok := cm.dependencies[roleName]
if ok {
log.Warn("Dependency is already added", zap.Any("roleName", roleName))
log.Warn("Dependency is already added", zap.String("roleName", roleName))
return nil
}
cm.dependencies[roleName] = struct{}{}
msess, rev, err := cm.session.GetSessions(roleName)
if err != nil {
log.Debug("ClientManager GetSessions failed", zap.Any("roleName", roleName))
log.Debug("ClientManager GetSessions failed", zap.String("roleName", roleName))
return err
}
if len(msess) == 0 {
log.Debug("No nodes are currently alive", zap.Any("roleName", roleName))
log.Debug("No nodes are currently alive", zap.String("roleName", roleName))
} else {
for _, value := range msess {
cm.buildConnections(value)
@@ -254,12 +254,12 @@ func (cm *ConnectionManager) receiveFinishTask() {
case serverID := <-cm.notify:
cm.taskMu.Lock()
task, ok := cm.buildTasks[serverID]
log.Debug("ConnectionManager", zap.Any("receive finish", serverID))
log.Debug("ConnectionManager", zap.Int64("receive finish", serverID))
if ok {
log.Debug("ConnectionManager", zap.Any("get task ok", serverID))
log.Debug("ConnectionManager", zap.Int64("get task ok", serverID))
log.Debug("ConnectionManager", zap.Any("task state", task.state))
if task.state == buildClientSuccess {
log.Debug("ConnectionManager", zap.Any("build success", serverID))
log.Debug("ConnectionManager", zap.Int64("build success", serverID))
cm.addConnection(task.sess.ServerID, task.result)
cm.buildClients(task.sess, task.result)
}
@@ -410,10 +410,10 @@ func (bct *buildClientTask) Run() {
}
err := retry.Do(bct.ctx, connectGrpcFunc, bct.retryOptions...)
log.Debug("ConnectionManager", zap.Any("build connection finish", bct.sess.ServerID))
log.Debug("ConnectionManager", zap.Int64("build connection finish", bct.sess.ServerID))
if err != nil {
log.Debug("BuildClientTask try connect failed",
zap.Any("roleName", bct.sess.ServerName), zap.Error(err))
zap.String("roleName", bct.sess.ServerName), zap.Error(err))
bct.state = buildClientFailed
return
}
@@ -425,7 +425,7 @@ func (bct *buildClientTask) Stop() {
}
func (bct *buildClientTask) finish() {
log.Debug("ConnectionManager", zap.Any("notify connection finish", bct.sess.ServerID))
log.Debug("ConnectionManager", zap.Int64("notify connection finish", bct.sess.ServerID))
bct.notify <- bct.sess.ServerID
}

@@ -164,7 +164,7 @@ func (s *Server) init() error {
// wait for grpc server loop start
err = <-s.grpcErrChan
if err != nil {
log.Error("IndexNode", zap.Any("grpc error", err))
log.Error("IndexNode", zap.Error(err))
return err
}

@@ -230,7 +230,7 @@ func (i *IndexNode) Start() error {
startErr = i.sched.Start()
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Info("IndexNode", zap.Any("State", i.lifetime.GetState().String()))
log.Info("IndexNode", zap.String("State", i.lifetime.GetState().String()))
})
log.Info("IndexNode start finished", zap.Error(startErr))

@@ -108,7 +108,7 @@ func (queue *IndexTaskQueue) AddActiveTask(t task) {
tName := t.Name()
_, ok := queue.activeTasks[tName]
if ok {
log.Debug("IndexNode task already in active task list", zap.Any("TaskID", tName))
log.Debug("IndexNode task already in active task list", zap.String("TaskID", tName))
}
queue.activeTasks[tName] = t

@@ -78,7 +78,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return nil, err
}
if exist {
log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.Any("SubscriptionName", options.SubscriptionName))
log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.String("SubscriptionName", options.SubscriptionName))
consumer, err := getExistedConsumer(c, options, con.MsgMutex)
if err != nil {
return nil, err

@@ -132,7 +132,7 @@ func (c *consumer) Close() {
// TODO should panic?
err := c.client.server.DestroyConsumerGroup(c.topic, c.consumerName)
if err != nil {
log.Warn("Consumer close failed", zap.Any("topicName", c.topic), zap.Any("groupName", c.consumerName), zap.Any("error", err))
log.Warn("Consumer close failed", zap.String("topicName", c.topic), zap.String("groupName", c.consumerName), zap.Error(err))
}
}

@@ -80,6 +80,6 @@ func (p *producer) Send(message *mqwrapper.ProducerMessage) (UniqueID, error) {
func (p *producer) Close() {
err := p.c.server.DestroyTopic(p.topic)
if err != nil {
log.Warn("Producer close failed", zap.Any("topicName", p.topic), zap.Any("error", err))
log.Warn("Producer close failed", zap.String("topicName", p.topic), zap.Error(err))
}
}

@@ -58,11 +58,11 @@ func removePath(rmqPath string) {
rocksdbPath := rmqPath
err := os.RemoveAll(rocksdbPath)
if err != nil {
log.Error("Failed to call os.removeAll.", zap.Any("path", rocksdbPath))
log.Error("Failed to call os.removeAll.", zap.String("path", rocksdbPath))
}
metaPath := rmqPath + "_meta_kv"
err = os.RemoveAll(metaPath)
if err != nil {
log.Error("Failed to call os.removeAll.", zap.Any("path", metaPath))
log.Error("Failed to call os.removeAll.", zap.String("path", metaPath))
}
}

@@ -307,7 +307,7 @@ func (rmq *rocksmq) Close() {
for _, consumer := range v.([]*Consumer) {
err := rmq.destroyConsumerGroupInternal(consumer.Topic, consumer.GroupName)
if err != nil {
log.Warn("Failed to destroy consumer group in rocksmq!", zap.Any("topic", consumer.Topic), zap.Any("groupName", consumer.GroupName), zap.Any("error", err))
log.Warn("Failed to destroy consumer group in rocksmq!", zap.String("topic", consumer.Topic), zap.String("groupName", consumer.GroupName), zap.Error(err))
}
}
return true

@@ -143,8 +143,8 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
// Quick Path, No page to check
if totalAckedSize == 0 {
log.Debug("All messages are not expired, skip retention because no ack", zap.Any("topic", topic),
zap.Any("time taken", time.Since(start).Milliseconds()))
log.Debug("All messages are not expired, skip retention because no ack", zap.String("topic", topic),
zap.Int64("time taken", time.Since(start).Milliseconds()))
return nil
}
pageReadOpts := gorocksdb.NewDefaultReadOptions()
@@ -232,13 +232,13 @@ func (ri *retentionInfo) expiredCleanUp(topic string) error {
}
if pageEndID == 0 {
log.Debug("All messages are not expired, skip retention", zap.Any("topic", topic), zap.Any("time taken", time.Since(start).Milliseconds()))
log.Debug("All messages are not expired, skip retention", zap.String("topic", topic), zap.Int64("time taken", time.Since(start).Milliseconds()))
return nil
}
expireTime := time.Since(start).Milliseconds()
log.Debug("Expired check by message size: ", zap.Any("topic", topic),
zap.Any("pageEndID", pageEndID), zap.Any("deletedAckedSize", deletedAckedSize),
zap.Any("pageCleaned", pageCleaned), zap.Any("time taken", expireTime))
log.Debug("Expired check by message size: ", zap.String("topic", topic),
zap.Int64("pageEndID", pageEndID), zap.Int64("deletedAckedSize", deletedAckedSize),
zap.Int64("pageCleaned", pageCleaned), zap.Int64("time taken", expireTime))
return ri.cleanData(topic, pageEndID)
}

@@ -122,7 +122,7 @@ func newMinioClient(ctx context.Context, cfg config) (*minio.Client, error) {
}
if !bucketExists {
if cfg.createBucket {
log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", cfg.bucketName))
log.Info("blob bucket not exist, create bucket.", zap.String("bucket name", cfg.bucketName))
err := minioClient.MakeBucket(ctx, cfg.bucketName, minio.MakeBucketOptions{})
if err != nil {
log.Warn("failed to create blob bucket", zap.String("bucket", cfg.bucketName), zap.Error(err))

@@ -100,8 +100,8 @@ func NewRotateLogger(logCfg *paramtable.AccessLogConfig, minioCfg *paramtable.Mi
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
log.Debug("remtepath", zap.Any("remote", logCfg.RemotePath.GetValue()))
log.Debug("maxBackups", zap.Any("maxBackups", logCfg.MaxBackups.GetValue()))
log.Debug("remtepath", zap.String("remote", logCfg.RemotePath.GetValue()))
log.Debug("maxBackups", zap.String("maxBackups", logCfg.MaxBackups.GetValue()))
handler, err := NewMinioHandler(ctx, minioCfg, logCfg.RemotePath.GetValue(), logCfg.MaxBackups.GetAsInt())
if err != nil {
return nil, err

@@ -470,7 +470,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, database, collectio
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
log.Debug("Reload collection from root coordinator ",
zap.String("collectionName", collectionName),
zap.Any("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
zap.Int64("time (milliseconds) take ", tr.ElapseSpan().Milliseconds()))
return collInfo.schema, nil
}
defer m.mu.RUnlock()

@@ -82,8 +82,8 @@ func (info *segInfo) Capacity(ts Timestamp) uint32 {
func (info *segInfo) Assign(ts Timestamp, count uint32) uint32 {
if info.IsExpired(ts) {
log.Debug("segInfo Assign IsExpired", zap.Any("ts", ts),
zap.Any("count", count))
log.Debug("segInfo Assign IsExpired", zap.Uint64("ts", ts),
zap.Uint32("count", count))
return 0
}
ret := uint32(0)
@@ -229,8 +229,8 @@ func (sa *segIDAssigner) pickCanDoFunc() {
}
}
log.Debug("Proxy segIDAssigner pickCanDoFunc", zap.Any("records", records),
zap.Any("len(newTodoReqs)", len(newTodoReqs)),
zap.Any("len(CanDoReqs)", len(sa.CanDoReqs)))
zap.Int("len(newTodoReqs)", len(newTodoReqs)),
zap.Int("len(CanDoReqs)", len(sa.CanDoReqs)))
sa.ToDoReqs = newTodoReqs
}
@@ -268,7 +268,7 @@ func (sa *segIDAssigner) checkSegReqEqual(req1, req2 *datapb.SegmentIDRequest) b
}
func (sa *segIDAssigner) reduceSegReqs() {
log.Debug("Proxy segIDAssigner reduceSegReqs", zap.Any("len(segReqs)", len(sa.segReqs)))
log.Debug("Proxy segIDAssigner reduceSegReqs", zap.Int("len(segReqs)", len(sa.segReqs)))
if len(sa.segReqs) == 0 {
return
}
@@ -298,9 +298,9 @@ func (sa *segIDAssigner) reduceSegReqs() {
afterCnt += req.Count
}
sa.segReqs = newSegReqs
log.Debug("Proxy segIDAssigner reduceSegReqs after reduce", zap.Any("len(segReqs)", len(sa.segReqs)),
zap.Any("BeforeCnt", beforeCnt),
zap.Any("AfterCnt", afterCnt))
log.Debug("Proxy segIDAssigner reduceSegReqs after reduce", zap.Int("len(segReqs)", len(sa.segReqs)),
zap.Uint32("BeforeCnt", beforeCnt),
zap.Uint32("AfterCnt", afterCnt))
}
func (sa *segIDAssigner) syncSegments() (bool, error) {

@@ -679,8 +679,8 @@ func (t *showCollectionsTask) Execute(ctx context.Context) error {
for _, collectionName := range t.CollectionNames {
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName)
if err != nil {
log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections"))
log.Debug("Failed to get collection id.", zap.String("collectionName", collectionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections"))
return err
}
collectionIDs = append(collectionIDs, collectionID)
@@ -726,14 +726,14 @@ func (t *showCollectionsTask) Execute(ctx context.Context) error {
collectionName, ok := IDs2Names[id]
if !ok {
log.Debug("Failed to get collection info. This collection may be not released",
zap.Any("collectionID", id),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections"))
zap.Int64("collectionID", id),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections"))
continue
}
collectionInfo, err := globalMetaCache.GetCollectionInfo(ctx, t.GetDbName(), collectionName, id)
if err != nil {
log.Debug("Failed to get collection info.", zap.Any("collectionName", collectionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showCollections"))
log.Debug("Failed to get collection info.", zap.String("collectionName", collectionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showCollections"))
return err
}
t.result.CollectionIds = append(t.result.CollectionIds, id)
@@ -1178,8 +1178,8 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error {
collectionName := t.CollectionName
collectionID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collectionName)
if err != nil {
log.Debug("Failed to get collection id.", zap.Any("collectionName", collectionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions"))
log.Debug("Failed to get collection id.", zap.String("collectionName", collectionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions"))
return err
}
IDs2Names := make(map[UniqueID]string)
@@ -1191,8 +1191,8 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error {
for _, partitionName := range t.PartitionNames {
partitionID, err := globalMetaCache.GetPartitionID(ctx, t.GetDbName(), collectionName, partitionName)
if err != nil {
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions"))
log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions"))
return err
}
partitionIDs = append(partitionIDs, partitionID)
@@ -1230,14 +1230,14 @@ func (t *showPartitionsTask) Execute(ctx context.Context) error {
for offset, id := range resp.PartitionIDs {
partitionName, ok := IDs2Names[id]
if !ok {
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions"))
log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions"))
return errors.New("failed to show partitions")
}
partitionInfo, err := globalMetaCache.GetPartitionInfo(ctx, t.GetDbName(), collectionName, partitionName)
if err != nil {
log.Debug("Failed to get partition id.", zap.Any("partitionName", partitionName),
zap.Any("requestID", t.Base.MsgID), zap.Any("requestType", "showPartitions"))
log.Debug("Failed to get partition id.", zap.String("partitionName", partitionName),
zap.Int64("requestID", t.Base.MsgID), zap.String("requestType", "showPartitions"))
return err
}
t.result.PartitionIDs = append(t.result.PartitionIDs, id)

@@ -263,10 +263,10 @@ func (queue *dmTaskQueue) PopActiveTask(taskID UniqueID) task {
defer queue.statsLock.Unlock()
delete(queue.activeTasks, taskID)
log.Debug("Proxy dmTaskQueue popPChanStats", zap.Any("taskID", t.ID()))
log.Debug("Proxy dmTaskQueue popPChanStats", zap.Int64("taskID", t.ID()))
queue.popPChanStats(t)
} else {
log.Warn("Proxy task not in active task list!", zap.Any("taskID", taskID))
log.Warn("Proxy task not in active task list!", zap.Int64("taskID", taskID))
}
return t
}

@@ -854,7 +854,7 @@ func reduceSearchResultData(ctx context.Context, subSearchResultData []*schemapb
zap.Int64("nq", sData.NumQueries),
zap.Int64("topk", sData.TopK),
zap.Int("length of pks", pkLength),
zap.Any("length of FieldsData", len(sData.FieldsData)))
zap.Int("length of FieldsData", len(sData.FieldsData)))
if err := checkSearchResultData(sData, nq, topk); err != nil {
log.Ctx(ctx).Warn("invalid search results", zap.Error(err))
return ret, err

@@ -120,7 +120,7 @@ func (m *manager) Remove(channels ...string) {
pipeline.Close()
delete(m.channel2Pipeline, channel)
} else {
log.Warn("pipeline to be removed doesn't existed", zap.Any("channel", channel))
log.Warn("pipeline to be removed doesn't existed", zap.String("channel", channel))
}
}
metrics.QueryNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()

@@ -626,7 +626,7 @@ func (c *Core) initBuiltinRoles() error {
for role, privilegesJSON := range rolePrivilegesMap {
err := c.meta.CreateRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: role})
if err != nil && !common.IsIgnorableError(err) {
log.Error("create a builtin role fail", zap.Any("error", err), zap.String("roleName", role))
log.Error("create a builtin role fail", zap.String("roleName", role), zap.Error(err))
return errors.Wrapf(err, "failed to create a builtin role: %s", role)
}
for _, privilege := range privilegesJSON[util.RoleConfigPrivileges] {
@@ -645,7 +645,7 @@ func (c *Core) initBuiltinRoles() error {
},
}, milvuspb.OperatePrivilegeType_Grant)
if err != nil && !common.IsIgnorableError(err) {
log.Error("grant privilege to builtin role fail", zap.Any("error", err), zap.String("roleName", role), zap.Any("privilege", privilege))
log.Error("grant privilege to builtin role fail", zap.String("roleName", role), zap.Any("privilege", privilege), zap.Error(err))
return errors.Wrapf(err, "failed to grant privilege: <%s, %s, %s> of db: %s to role: %s", privilege[util.RoleConfigObjectType], privilege[util.RoleConfigObjectName], privilege[util.RoleConfigPrivilege], privilege[util.RoleConfigDBName], role)
}
}
@@ -2039,7 +2039,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
// Here ir.GetState() == commonpb.ImportState_ImportPersisted
// Seal these import segments, so they can be auto-flushed later.
log.Info("an import task turns to persisted state, flush segments to be sealed",
zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments()))
zap.Int64("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments()))
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil {
log.Error("failed to call Flush on bulk insert segments",
zap.Int64("task ID", ir.GetTaskId()))

@@ -166,7 +166,7 @@ func (t *timetickSync) sendToChannel() bool {
// give warning every 2 second if not get ttMsg from source sessions
if maxCnt%10 == 0 {
log.Warn("session idle for long time", zap.Any("idle list", idleSessionList),
zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.GetAsInt64()*time.Millisecond.Milliseconds()*maxCnt))
zap.Int64("idle time", Params.ProxyCfg.TimeTickInterval.GetAsInt64()*time.Millisecond.Milliseconds()*maxCnt))
}
return false
}

@@ -118,7 +118,7 @@ func newMinioClient(ctx context.Context, c *config) (*minio.Client, error) {
}
if !bucketExists {
if c.createBucket {
log.Info("blob bucket not exist, create bucket.", zap.Any("bucket name", c.bucketName))
log.Info("blob bucket not exist, create bucket.", zap.String("bucket name", c.bucketName))
err := minIOClient.MakeBucket(ctx, c.bucketName, minio.MakeBucketOptions{})
if err != nil {
log.Warn("failed to create blob bucket", zap.String("bucket", c.bucketName), zap.Error(err))

@@ -77,7 +77,7 @@ func (inNode *InputNode) SetCloseMethod(gracefully bool) {
log.Info("input node close method set",
zap.String("node", inNode.Name()),
zap.Int64("collection", inNode.collectionID),
zap.Any("gracefully", gracefully))
zap.Bool("gracefully", gracefully))
}
// Operate consume a message pack from msgstream and return

@@ -180,7 +180,7 @@ func (p *BinlogAdapter) Read(segmentHolder *SegmentFilesHolder) error {
// read timestamps list
timestampLog := segmentHolder.fieldFiles[common.TimeStampField][i] // no need to check existence, already verified
log.Info("Binlog adapter: prepare to read timestamp binglog", zap.Any("logPath", timestampLog))
log.Info("Binlog adapter: prepare to read timestamp binglog", zap.String("logPath", timestampLog))
timestampList, err := p.readTimestamp(timestampLog)
if err != nil {
return err

@@ -485,15 +485,16 @@ func (s *Session) registerService() (<-chan *clientv3.LeaseKeepAliveResponse, er
// If we find previous session have same address as current , simply purge the old one so the recovery can be much faster
func (s *Session) handleRestart(key string) {
resp, err := s.etcdCli.Get(s.ctx, key)
log := log.With(zap.String("key", key))
if err != nil {
log.Warn("failed to read old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
log.Warn("failed to read old session from etcd, ignore", zap.Error(err))
return
}
for _, kv := range resp.Kvs {
session := &Session{}
err = json.Unmarshal(kv.Value, session)
if err != nil {
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Error(err))
return
}
@@ -502,7 +503,7 @@ func (s *Session) handleRestart(key string) {
zap.String("address", session.Address))
_, err := s.etcdCli.Delete(s.ctx, key)
if err != nil {
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Any("key", key), zap.Error(err))
log.Warn("failed to unmarshal old session from etcd, ignore", zap.Error(err))
return
}
}
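
Besides the field-type swaps, this hunk binds the key once with log.With, which is why the repeated zap.Any("key", key) fields are dropped from the Warn calls that follow. A rough sketch of that contextual-logger pattern, using hypothetical names rather than the actual session code:

package main

import "go.uber.org/zap"

// handleKey is an illustrative stand-in for a function that logs the same
// field on every message: bind it once with With, then omit it from each call.
func handleKey(base *zap.Logger, key string) {
	log := base.With(zap.String("key", key))
	log.Warn("failed to read old session from etcd, ignore")
	log.Warn("failed to unmarshal old session from etcd, ignore")
}

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync()
	handleKey(logger, "session/rootcoord") // hypothetical etcd key
}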

@@ -169,7 +169,7 @@ func (f *PmsFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, st
}
}
log.Warn("failed to clean up subscriptions", zap.String("pulsar web", f.PulsarWebAddress),
zap.String("topic", channel), zap.Any("subname", subname), zap.Error(err))
zap.String("topic", channel), zap.String("subname", subname), zap.Error(err))
}
}
return nil

@@ -124,7 +124,7 @@ func (kc *kafkaClient) getKafkaProducer() (*kafka.Producer, error) {
// authentication issues, etc.
// After a fatal error has been raised, any subsequent Produce*() calls will fail with
// the original error code.
log.Error("kafka error", zap.Any("error msg", ev.Error()))
log.Error("kafka error", zap.String("error msg", ev.Error()))
if ev.IsFatal() {
panic(ev)
}

@@ -135,7 +135,7 @@ func (kc *Consumer) Chan() <-chan mqwrapper.Message {
e, err := kc.c.ReadMessage(readTimeout)
if err != nil {
// if we failed to read message in 30 Seconds, print out a warn message since there should always be a tt
log.Warn("consume msg failed", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
log.Warn("consume msg failed", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Error(err))
} else {
if kc.skipMsg {
kc.skipMsg = false
@@ -217,7 +217,7 @@ func (kc *Consumer) GetLatestMsgID() (mqwrapper.MessageID, error) {
high = high - 1
}
log.Info("get latest msg ID ", zap.Any("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
log.Info("get latest msg ID ", zap.String("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
return &kafkaID{messageID: high}, nil
}
@@ -249,7 +249,7 @@ func (kc *Consumer) closeInternal() {
}
cost := time.Since(start).Milliseconds()
if cost > 200 {
log.Warn("close consumer costs too long time", zap.Any("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
log.Warn("close consumer costs too long time", zap.String("topic", kc.topic), zap.String("groupID", kc.groupID), zap.Int64("time(ms)", cost))
}
}

@@ -81,14 +81,14 @@ func (kp *kafkaProducer) Close() {
// flush in-flight msg within queue.
i := kp.p.Flush(10000)
if i > 0 {
log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.Any("topic", kp.topic))
log.Warn("There are still un-flushed outstanding events", zap.Int("event_num", i), zap.String("topic", kp.topic))
}
close(kp.deliveryChan)
cost := time.Since(start).Milliseconds()
if cost > 500 {
log.Debug("kafka producer is closed", zap.Any("topic", kp.topic), zap.Int64("time cost(ms)", cost))
log.Debug("kafka producer is closed", zap.String("topic", kp.topic), zap.Int64("time cost(ms)", cost))
}
})
}

@@ -51,7 +51,7 @@ func ProcessFuncParallel(total, maxParallel int, f ProcessFunc, fname string) er
t := time.Now()
defer func() {
log.Debug(fname, zap.Any("total", total), zap.Any("time cost", time.Since(t)))
log.Debug(fname, zap.Int("total", total), zap.Any("time cost", time.Since(t)))
}()
nPerBatch := (total + maxParallel - 1) / maxParallel
@@ -85,7 +85,7 @@ func ProcessFuncParallel(total, maxParallel int, f ProcessFunc, fname string) er
for idx := begin; idx < end; idx++ {
err = f(idx)
if err != nil {
log.Error(fname, zap.Error(err), zap.Any("idx", idx))
log.Error(fname, zap.Error(err), zap.Int("idx", idx))
break
}
}
@@ -146,8 +146,8 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
total := len(tasks)
nPerBatch := (total + maxParallel - 1) / maxParallel
log.Debug(fname, zap.Any("total", total))
log.Debug(fname, zap.Any("nPerBatch", nPerBatch))
log.Debug(fname, zap.Int("total", total))
log.Debug(fname, zap.Int("nPerBatch", nPerBatch))
quit := make(chan bool)
errc := make(chan error)
@@ -188,7 +188,7 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
for idx := begin; idx < end; idx++ {
err = tasks[idx]()
if err != nil {
log.Error(fname, zap.Error(err), zap.Any("idx", idx))
log.Error(fname, zap.Error(err), zap.Int("idx", idx))
break
}
}
@@ -212,7 +212,7 @@ func ProcessTaskParallel(maxParallel int, fname string, tasks ...TaskFunc) error
routineNum++
}
log.Debug(fname, zap.Any("NumOfGoRoutines", routineNum))
log.Debug(fname, zap.Int("NumOfGoRoutines", routineNum))
if routineNum <= 0 {
return nil