mirror of https://github.com/milvus-io/milvus.git
Pass smoke test (#5797)
* pass smoke test Signed-off-by: yefu.chen <yefu.chen@zilliz.com>pull/5808/head
parent
d6f7fbf19f
commit
71ed21f9bf
|
@ -12,14 +12,12 @@
|
|||
package rocksmq
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"errors"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/allocator"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
@ -69,7 +67,7 @@ type rocksmq struct {
|
|||
store *gorocksdb.DB
|
||||
kv kv.BaseKV
|
||||
idAllocator allocator.GIDAllocator
|
||||
channelMu map[string]*sync.Mutex
|
||||
channelMu sync.Map
|
||||
|
||||
consumers sync.Map
|
||||
}
|
||||
|
@ -94,7 +92,7 @@ func NewRocksMQ(name string, idAllocator allocator.GIDAllocator) (*rocksmq, erro
|
|||
kv: mkv,
|
||||
idAllocator: idAllocator,
|
||||
}
|
||||
rmq.channelMu = make(map[string]*sync.Mutex)
|
||||
rmq.channelMu = sync.Map{}
|
||||
rmq.consumers = sync.Map{}
|
||||
return rmq, nil
|
||||
}
|
||||
|
@ -125,8 +123,8 @@ func (rmq *rocksmq) CreateTopic(topicName string) error {
|
|||
log.Debug("RocksMQ: save " + endKey + " failed.")
|
||||
return err
|
||||
}
|
||||
rmq.channelMu.Store(topicName, new(sync.Mutex))
|
||||
|
||||
rmq.channelMu[topicName] = new(sync.Mutex)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -224,8 +222,17 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error {
|
||||
rmq.channelMu[topicName].Lock()
|
||||
defer rmq.channelMu[topicName].Unlock()
|
||||
ll, ok := rmq.channelMu.Load(topicName)
|
||||
if !ok {
|
||||
return fmt.Errorf("topic name = %s not exist", topicName)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return fmt.Errorf("get mutex failed, topic name = %s", topicName)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
msgLen := len(messages)
|
||||
idStart, idEnd, err := rmq.idAllocator.Alloc(uint32(msgLen))
|
||||
|
||||
|
@ -296,8 +303,17 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) error
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error) {
|
||||
rmq.channelMu[topicName].Lock()
|
||||
defer rmq.channelMu[topicName].Unlock()
|
||||
ll, ok := rmq.channelMu.Load(topicName)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("topic name = %s not exist", topicName)
|
||||
}
|
||||
lock, ok := ll.(*sync.Mutex)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("get mutex failed, topic name = %s", topicName)
|
||||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
metaKey := groupName + "/" + topicName + "/current_id"
|
||||
currentID, err := rmq.kv.Load(metaKey)
|
||||
if err != nil {
|
||||
|
|
|
@ -12,7 +12,7 @@ pytest-print==0.2.1
|
|||
pytest-level==0.1.1
|
||||
pytest-xdist==2.2.1
|
||||
pytest-parallel
|
||||
pymilvus==2.0a1.dev22
|
||||
pymilvus==2.0a1.dev35
|
||||
pytest-rerunfailures==9.1.1
|
||||
git+https://github.com/Projectplace/pytest-tags
|
||||
ndg-httpsclient
|
||||
|
|
Loading…
Reference in New Issue