Add more elapse log in rocksmq produce and consume (#14446)

Signed-off-by: fishpenguin <kun.yu@zilliz.com>
pull/14692/head
yukun 2021-12-31 15:55:18 +08:00 committed by GitHub
parent d7da870cc4
commit c4025dba7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 4 deletions

View File

@ -581,9 +581,11 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
if getProduceTime > 200 {
log.Warn("rocksmq produce too slowly", zap.String("topic", topicName),
zap.Int64("get lock elapse", getLockTime),
zap.Int64("alloc elapse", allocTime),
zap.Int64("write elapse", writeTime),
zap.Int64("updatePage elapse", getProduceTime))
zap.Int64("alloc elapse", allocTime-getLockTime),
zap.Int64("write elapse", writeTime-allocTime),
zap.Int64("updatePage elapse", getProduceTime-writeTime),
zap.Int64("produce total elapse", getProduceTime),
)
}
return msgIDs, nil
}
@ -695,6 +697,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
if err := iter.Err(); err != nil {
return nil, err
}
iterTime := time.Since(start).Milliseconds()
// When already consume to last mes, an empty slice will be returned
if len(consumerMessage) == 0 {
@ -713,13 +716,17 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
zap.String("groupName", groupName), zap.Error(err))
return nil, err
}
updateAckedTime := time.Since(start).Milliseconds()
rmq.moveConsumePos(topicName, groupName, newID+1)
// TODO add this to monitor metrics
getConsumeTime := time.Since(start).Milliseconds()
if getConsumeTime > 200 {
log.Warn("rocksmq consume too slowly", zap.String("topic", topicName),
zap.Int64("get lock elapse", getLockTime), zap.Int64("consume elapse", getConsumeTime))
zap.Int64("get lock elapse", getLockTime),
zap.Int64("iterator elapse", iterTime-getLockTime),
zap.Int64("updateAckedInfo elapse", updateAckedTime-iterTime),
zap.Int64("total consume elapse", getConsumeTime))
}
return consumerMessage, nil
}