mirror of https://github.com/milvus-io/milvus.git
parent
c486007796
commit
9303a6414d
|
@ -49,6 +49,8 @@ const (
|
|||
AckedTsTitle = "acked_ts/"
|
||||
AckedSizeTitle = "acked_size/"
|
||||
LastRetTsTitle = "last_retention_ts/"
|
||||
|
||||
CurrentIDSuffix = "current_id"
|
||||
)
|
||||
|
||||
/**
|
||||
|
@ -80,6 +82,13 @@ func combKey(channelName string, id UniqueID) (string, error) {
|
|||
return fixName + "/" + strconv.FormatInt(id, 10), nil
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct current id
|
||||
*/
|
||||
func constructCurrentID(topicName, groupName string) string {
|
||||
return groupName + "/" + topicName + "/" + CurrentIDSuffix
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct table name and fixed channel name to be a key with length of FixedChannelNameLen,
|
||||
* used for meta infos
|
||||
|
@ -308,7 +317,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
if rmq.checkKeyExist(key) {
|
||||
if vals, ok := rmq.consumers.Load(topicName); ok {
|
||||
for _, v := range vals.([]*Consumer) {
|
||||
|
@ -322,7 +331,7 @@ func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Cons
|
|||
}
|
||||
|
||||
func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
if rmq.checkKeyExist(key) {
|
||||
log.Debug("RocksMQ: " + key + " existed.")
|
||||
return nil
|
||||
|
@ -363,7 +372,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
}
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
|
||||
err := rmq.kv.Remove(key)
|
||||
if err != nil {
|
||||
|
@ -540,7 +549,7 @@ func (rmq *rocksmq) Consume(topicName string, groupName string, n int) ([]Consum
|
|||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
metaKey := groupName + "/" + topicName + "/current_id"
|
||||
metaKey := constructCurrentID(topicName, groupName)
|
||||
currentID, err := rmq.kv.Load(metaKey)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: load " + metaKey + " failed")
|
||||
|
@ -622,7 +631,7 @@ func (rmq *rocksmq) Seek(topicName string, groupName string, msgID UniqueID) err
|
|||
/* Step I: Check if key exists */
|
||||
rmq.storeMu.Lock()
|
||||
defer rmq.storeMu.Unlock()
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
key := constructCurrentID(topicName, groupName)
|
||||
if !rmq.checkKeyExist(key) {
|
||||
log.Debug("RocksMQ: channel " + key + " not exists")
|
||||
return fmt.Errorf("ConsumerGroup %s, channel %s not exists", groupName, topicName)
|
||||
|
|
Loading…
Reference in New Issue