mirror of https://github.com/milvus-io/milvus.git
parent
a78cd3e2a7
commit
c4f9a6f22b
|
@ -57,6 +57,7 @@ func (rc *rmqClient) Subscribe(options client.ConsumerOptions) (client.Consumer,
|
|||
close(msgChannel)
|
||||
return
|
||||
}
|
||||
msg.Topic = options.Topic
|
||||
msgChannel <- &rmqMessage{msg: msg}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -606,6 +606,7 @@ func (ms *TtMsgStream) findTimeTick(consumer Consumer,
|
|||
}
|
||||
|
||||
// set msg info to tsMsg
|
||||
log.Debug(msg.Topic() + "--------" + string(msg.ID().Serialize()))
|
||||
tsMsg.SetPosition(&msgstream.MsgPosition{
|
||||
ChannelName: filepath.Base(msg.Topic()),
|
||||
MsgID: msg.ID().Serialize(),
|
||||
|
@ -701,7 +702,7 @@ func (ms *TtMsgStream) Seek(mp *internalpb.MsgPosition) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
consumer.Seek(seekMsgID)
|
||||
_ = consumer.Seek(seekMsgID)
|
||||
ms.addConsumer(consumer, seekChannel)
|
||||
|
||||
//TODO: May cause problem
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
package rocksmq
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zilliztech/milvus-distributed/internal/log"
|
||||
server "github.com/zilliztech/milvus-distributed/internal/util/rocksmq/server/rocksmq"
|
||||
)
|
||||
|
@ -9,6 +11,8 @@ type client struct {
|
|||
server RocksMQ
|
||||
producerOptions []ProducerOptions
|
||||
consumerOptions []ConsumerOptions
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func newClient(options ClientOptions) (*client, error) {
|
||||
|
@ -16,9 +20,12 @@ func newClient(options ClientOptions) (*client, error) {
|
|||
return nil, newError(InvalidConfiguration, "Server is nil")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &client{
|
||||
server: options.Server,
|
||||
producerOptions: []ProducerOptions{},
|
||||
context: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
@ -55,7 +62,7 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go consume(consumer)
|
||||
go consume(c.context, consumer)
|
||||
return consumer, nil
|
||||
}
|
||||
consumer, err := newConsumer(c, options)
|
||||
|
@ -79,26 +86,25 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
|
||||
// Take messages from RocksDB and put it into consumer.Chan(),
|
||||
// trigger by consumer.MsgMutex which trigger by producer
|
||||
go consume(consumer)
|
||||
go consume(c.context, consumer)
|
||||
c.consumerOptions = append(c.consumerOptions, options)
|
||||
|
||||
return consumer, nil
|
||||
}
|
||||
|
||||
func consume(consumer *consumer) {
|
||||
func consume(ctx context.Context, consumer *consumer) {
|
||||
for { //nolint:gosimple
|
||||
log.Debug(consumer.topic + "+" + consumer.consumerName)
|
||||
//if consumer.msgMutex == nil {
|
||||
// break
|
||||
//}
|
||||
select { //nolint:gosimple
|
||||
case <-ctx.Done():
|
||||
log.Debug("client finished")
|
||||
return
|
||||
case _, ok := <-consumer.MsgMutex():
|
||||
log.Debug("Before consume")
|
||||
if !ok {
|
||||
// consumer MsgMutex closed, goroutine exit
|
||||
log.Debug("consumer MsgMutex closed")
|
||||
return
|
||||
}
|
||||
log.Debug("Before consume")
|
||||
|
||||
for {
|
||||
msg, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, 1)
|
||||
|
@ -120,8 +126,7 @@ func consume(consumer *consumer) {
|
|||
Payload: msg[0].Payload,
|
||||
}
|
||||
}
|
||||
//default:
|
||||
// log.Debug("In default")
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -131,6 +136,8 @@ func (c *client) Close() {
|
|||
for _, opt := range c.consumerOptions {
|
||||
log.Debug("Close" + opt.Topic + "+" + opt.SubscriptionName)
|
||||
_ = c.server.DestroyConsumerGroup(opt.Topic, opt.SubscriptionName)
|
||||
//TODO(yukun): Should topic be closed?
|
||||
//_ = c.server.DestroyTopic(opt.Topic)
|
||||
}
|
||||
c.cancel()
|
||||
}
|
||||
|
|
|
@ -134,6 +134,8 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
log.Debug("RocksMQ: remove " + endKey + " failed.")
|
||||
return err
|
||||
}
|
||||
|
||||
delete(rmq.consumers, topicName)
|
||||
log.Debug("DestroyTopic: " + topicName)
|
||||
|
||||
return nil
|
||||
|
@ -185,6 +187,7 @@ func (rmq *rocksmq) DestroyConsumerGroup(topicName, groupName string) error {
|
|||
}
|
||||
for index, con := range rmq.consumers[topicName] {
|
||||
if con.GroupName == groupName {
|
||||
close(con.MsgMutex)
|
||||
rmq.consumers[topicName] = append(rmq.consumers[topicName][:index],
|
||||
rmq.consumers[topicName][index+1:]...)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue