mirror of https://github.com/milvus-io/milvus.git
parent
1e4afda824
commit
85bc2a54f0
|
@ -597,6 +597,7 @@ func (rtms *RmqTtMsgStream) Seek(mp *msgstream.MsgPosition) error {
|
|||
rtms.addConsumer(consumer, seekChannel)
|
||||
|
||||
if len(consumer.Chan()) == 0 {
|
||||
log.Debug("len(consumer.Chan()) = 0")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
package rocksmq
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
server "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
|
||||
)
|
||||
|
@ -44,6 +42,22 @@ func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
|
|||
|
||||
func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
||||
// Create a consumer
|
||||
//for _, con := range c.consumers {
|
||||
// log.Debug(con.Topic() + "---------------" + con.Subscription())
|
||||
// if con.Topic() == options.Topic && con.Subscription() == options.SubscriptionName {
|
||||
// log.Debug("consumer existed")
|
||||
// return con, nil
|
||||
// }
|
||||
//}
|
||||
if exist, con := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName); exist {
|
||||
log.Debug("EXISTED")
|
||||
consumer, err := newConsumer1(c, options, con.MsgMutex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go consume(consumer)
|
||||
return consumer, nil
|
||||
}
|
||||
consumer, err := newConsumer(c, options)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -65,42 +79,58 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
|
||||
// Take messages from RocksDB and put it into consumer.Chan(),
|
||||
// trigger by consumer.MsgMutex which trigger by producer
|
||||
go func() {
|
||||
for { //nolint:gosimple
|
||||
select {
|
||||
case _, ok := <-consumer.MsgMutex():
|
||||
if !ok {
|
||||
// consumer MsgMutex closed, goroutine exit
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1)
|
||||
if err != nil {
|
||||
log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
|
||||
"," + consumer.consumerName + "): " + err.Error())
|
||||
break
|
||||
}
|
||||
|
||||
if len(msg) != 1 {
|
||||
log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
|
||||
"," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) +
|
||||
") is not 1")
|
||||
break
|
||||
}
|
||||
|
||||
consumer.messageCh <- ConsumerMessage{
|
||||
MsgID: msg[0].MsgID,
|
||||
Payload: msg[0].Payload,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
go consume(consumer)
|
||||
c.consumerOptions = append(c.consumerOptions, options)
|
||||
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func consume(consumer *consumer) {
|
||||
for { //nolint:gosimple
|
||||
log.Debug(consumer.topic + "+" + consumer.consumerName)
|
||||
//if consumer.msgMutex == nil {
|
||||
// break
|
||||
//}
|
||||
select { //nolint:gosimple
|
||||
case _, ok := <-consumer.MsgMutex():
|
||||
if !ok {
|
||||
// consumer MsgMutex closed, goroutine exit
|
||||
log.Debug("consumer MsgMutex closed")
|
||||
return
|
||||
}
|
||||
log.Debug("Before consume")
|
||||
|
||||
for {
|
||||
msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1)
|
||||
if err != nil {
|
||||
log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
|
||||
"," + consumer.consumerName + "): " + err.Error())
|
||||
break
|
||||
}
|
||||
|
||||
if len(msg) != 1 {
|
||||
//log.Debug("Consumer's goroutine cannot consume from (" + consumer.topic +
|
||||
// "," + consumer.consumerName + "): message len(" + strconv.Itoa(len(msg)) +
|
||||
// ") is not 1")
|
||||
break
|
||||
}
|
||||
|
||||
consumer.messageCh <- ConsumerMessage{
|
||||
MsgID: msg[0].MsgID,
|
||||
Payload: msg[0].Payload,
|
||||
}
|
||||
}
|
||||
//default:
|
||||
// log.Debug("In default")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *client) Close() {
|
||||
// TODO: free resources
|
||||
for _, opt := range c.consumerOptions {
|
||||
log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName)
|
||||
_ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName)
|
||||
//_ = c.server.DestroyTopic(opt.Topic)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,34 @@ func newConsumer(c *client, options ConsumerOptions) (*consumer, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func newConsumer1(c *client, options ConsumerOptions, msgMutex chan struct{}) (*consumer, error) {
|
||||
if c == nil {
|
||||
return nil, newError(InvalidConfiguration, "client is nil")
|
||||
}
|
||||
|
||||
if options.Topic == "" {
|
||||
return nil, newError(InvalidConfiguration, "Topic is empty")
|
||||
}
|
||||
|
||||
if options.SubscriptionName == "" {
|
||||
return nil, newError(InvalidConfiguration, "SubscriptionName is empty")
|
||||
}
|
||||
|
||||
messageCh := options.MessageChannel
|
||||
if options.MessageChannel == nil {
|
||||
messageCh = make(chan ConsumerMessage, 1)
|
||||
}
|
||||
|
||||
return &consumer{
|
||||
topic: options.Topic,
|
||||
client: c,
|
||||
consumerName: options.SubscriptionName,
|
||||
options: options,
|
||||
msgMutex: msgMutex,
|
||||
messageCh: messageCh,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *consumer) Subscription() string {
|
||||
return c.consumerName
|
||||
}
|
||||
|
|
|
@ -26,4 +26,5 @@ type RocksMQ interface {
|
|||
Produce(topicName string, messages []ProducerMessage) error
|
||||
Consume(topicName string, groupName string, n int) ([]ConsumerMessage, error)
|
||||
Seek(topicName string, groupName string, msgID UniqueID) error
|
||||
ExistConsumerGroup(topicName string, groupName string) (bool, *Consumer)
|
||||
}
|
||||
|
|
|
@ -134,10 +134,23 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
log.Debug("RocksMQ: remove " + endKey + " failed.")
|
||||
return err
|
||||
}
|
||||
log.Debug("DestroyTopic: " + topicName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) ExistConsumerGroup(topicName, groupName string) (bool, *Consumer) {
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
if rmq.checkKeyExist(key) {
|
||||
for _, con := range rmq.consumers[topicName] {
|
||||
if con.GroupName == groupName {
|
||||
return true, con
|
||||
}
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) CreateConsumerGroup(topicName, groupName string) error {
|
||||
key := groupName + "/" + topicName + "/current_id"
|
||||
if rmq.checkKeyExist(key) {
|
||||
|
@ -170,6 +183,14 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
log.Debug("RocksMQ: remove " + key + " failed.")
|
||||
return err
|
||||
}
|
||||
for index, con := range rmq.consumers[topicName] {
|
||||
if con.GroupName == groupName {
|
||||
rmq.consumers[topicName] = append(rmq.consumers[topicName][:index],
|
||||
rmq.consumers[topicName][index+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("DestroyConsumerGroup: " + topicName + "+" + groupName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue