mirror of https://github.com/milvus-io/milvus.git
parent
8a9fb9b287
commit
8c947109a9
|
@ -401,6 +401,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
|
||||
// Produce produces messages for topic and updates page infos for retention
|
||||
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]UniqueID, error) {
|
||||
start := time.Now()
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return []UniqueID{}, fmt.Errorf("topic name = %s not exist", topicName)
|
||||
|
@ -412,6 +413,11 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
|
|||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
getLockTime := time.Since(start).Milliseconds()
|
||||
if getLockTime > 200 {
|
||||
log.Warn("rocksmq produce get lock slow", zap.Int64("elapse", getLockTime))
|
||||
}
|
||||
|
||||
msgLen := len(messages)
|
||||
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
|
||||
|
||||
|
@ -490,6 +496,8 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
|
|||
if err != nil {
|
||||
return []UniqueID{}, err
|
||||
}
|
||||
log.Debug("Rocksmq produce successfully ", zap.String("topic", topicName),
|
||||
zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return msgIDs, nil
|
||||
}
|
||||
|
||||
|
@ -534,6 +542,7 @@ func (rmq *rocksmq) updatePageInfo(topicName string, msgIDs []UniqueID, msgSizes
|
|||
// 2. Update current_id to the last consumed message
|
||||
// 3. Update ack informations in rocksdb
|
||||
func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
|
||||
start := time.Now()
|
||||
ll, ok := topicMu.Load(topicName)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("topic name = %s not exist", topicName)
|
||||
|
@ -624,7 +633,9 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
}
|
||||
|
||||
go rmq.updateAckedInfo(topicName, groupName, consumedIDs)
|
||||
|
||||
log.Debug("Rocksmq produce successfully ", zap.String("topic", topicName),
|
||||
zap.String("groupName", groupName),
|
||||
zap.Int64("elapsed", time.Since(start).Milliseconds()))
|
||||
return consumerMessage, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue