2021-04-19 05:42:47 +00:00
|
|
|
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2022-03-03 13:57:56 +00:00
|
|
|
package client
|
2021-03-13 03:46:50 +00:00
|
|
|
|
2021-03-19 11:33:21 +00:00
|
|
|
import (
|
2021-04-08 12:05:33 +00:00
|
|
|
"reflect"
|
2021-09-22 09:21:00 +00:00
|
|
|
"sync"
|
2021-03-27 09:39:12 +00:00
|
|
|
|
2021-09-22 09:21:00 +00:00
|
|
|
"go.uber.org/zap"
|
2023-04-06 11:14:32 +00:00
|
|
|
|
|
|
|
"github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
|
|
|
|
"github.com/milvus-io/milvus/pkg/log"
|
|
|
|
"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
|
2021-03-19 11:33:21 +00:00
|
|
|
)
|
|
|
|
|
2021-03-13 03:46:50 +00:00
|
|
|
type client struct {
|
2021-03-19 11:33:21 +00:00
|
|
|
server RocksMQ
|
|
|
|
producerOptions []ProducerOptions
|
|
|
|
consumerOptions []ConsumerOptions
|
2021-09-22 09:21:00 +00:00
|
|
|
wg *sync.WaitGroup
|
2021-10-22 09:25:12 +00:00
|
|
|
closeCh chan struct{}
|
|
|
|
closeOnce sync.Once
|
2021-03-13 03:46:50 +00:00
|
|
|
}
|
|
|
|
|
2022-03-03 13:57:56 +00:00
|
|
|
func newClient(options Options) (*client, error) {
|
|
|
|
|
2021-03-19 11:33:21 +00:00
|
|
|
if options.Server == nil {
|
2021-10-19 09:50:49 +00:00
|
|
|
return nil, newError(InvalidConfiguration, "options.Server is nil")
|
2021-03-13 03:46:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
c := &client{
|
2021-03-19 11:33:21 +00:00
|
|
|
server: options.Server,
|
|
|
|
producerOptions: []ProducerOptions{},
|
2021-09-22 09:21:00 +00:00
|
|
|
wg: &sync.WaitGroup{},
|
2021-10-22 09:25:12 +00:00
|
|
|
closeCh: make(chan struct{}),
|
2021-03-13 03:46:50 +00:00
|
|
|
}
|
|
|
|
return c, nil
|
|
|
|
}
|
|
|
|
|
2021-10-29 13:45:01 +00:00
|
|
|
// CreateProducer create a rocksmq producer
|
2021-03-13 03:46:50 +00:00
|
|
|
func (c *client) CreateProducer(options ProducerOptions) (Producer, error) {
|
|
|
|
// Create a producer
|
|
|
|
producer, err := newProducer(c, options)
|
2022-03-03 13:57:56 +00:00
|
|
|
|
2021-03-13 03:46:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-04-08 12:05:33 +00:00
|
|
|
if reflect.ValueOf(c.server).IsNil() {
|
2021-10-19 09:52:42 +00:00
|
|
|
return nil, newError(0, "Rmq server is nil")
|
2021-04-08 12:05:33 +00:00
|
|
|
}
|
2021-03-13 03:46:50 +00:00
|
|
|
// Create a topic in rocksmq, ignore if topic exists
|
2021-03-19 11:33:21 +00:00
|
|
|
err = c.server.CreateTopic(options.Topic)
|
2021-03-13 03:46:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-03-19 11:33:21 +00:00
|
|
|
c.producerOptions = append(c.producerOptions, options)
|
2021-03-13 03:46:50 +00:00
|
|
|
|
|
|
|
return producer, nil
|
|
|
|
}
|
|
|
|
|
2021-11-01 15:02:33 +00:00
|
|
|
// Subscribe create a rocksmq consumer and start consume in a goroutine
|
2021-03-13 03:46:50 +00:00
|
|
|
func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|
|
|
// Create a consumer
|
2021-04-08 12:05:33 +00:00
|
|
|
if reflect.ValueOf(c.server).IsNil() {
|
2021-10-19 09:52:42 +00:00
|
|
|
return nil, newError(0, "Rmq server is nil")
|
2021-04-08 12:05:33 +00:00
|
|
|
}
|
2023-05-19 08:23:24 +00:00
|
|
|
|
2021-12-17 15:44:42 +00:00
|
|
|
exist, con, err := c.server.ExistConsumerGroup(options.Topic, options.SubscriptionName)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if exist {
|
2021-09-22 09:21:00 +00:00
|
|
|
log.Debug("ConsumerGroup already existed", zap.Any("topic", options.Topic), zap.Any("SubscriptionName", options.SubscriptionName))
|
2021-09-30 12:51:40 +00:00
|
|
|
consumer, err := getExistedConsumer(c, options, con.MsgMutex)
|
2021-03-25 10:49:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2022-10-25 05:23:30 +00:00
|
|
|
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
|
2021-09-30 12:51:40 +00:00
|
|
|
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
2021-03-25 10:49:41 +00:00
|
|
|
return consumer, nil
|
|
|
|
}
|
2021-03-13 03:46:50 +00:00
|
|
|
consumer, err := newConsumer(c, options)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a consumergroup in rocksmq, raise error if consumergroup exists
|
2021-03-19 11:33:21 +00:00
|
|
|
err = c.server.CreateConsumerGroup(options.Topic, options.SubscriptionName)
|
2021-03-13 03:46:50 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2021-03-19 11:33:21 +00:00
|
|
|
// Register self in rocksmq server
|
|
|
|
cons := &server.Consumer{
|
|
|
|
Topic: consumer.topic,
|
|
|
|
GroupName: consumer.consumerName,
|
|
|
|
MsgMutex: consumer.msgMutex,
|
|
|
|
}
|
|
|
|
c.server.RegisterConsumer(cons)
|
|
|
|
|
2023-05-19 08:23:24 +00:00
|
|
|
if options.SubscriptionInitialPosition == mqwrapper.SubscriptionPositionLatest {
|
|
|
|
err = c.server.SeekToLatest(options.Topic, options.SubscriptionName)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-03-19 11:33:21 +00:00
|
|
|
// Take messages from RocksDB and put it into consumer.Chan(),
|
|
|
|
// trigger by consumer.MsgMutex which trigger by producer
|
2021-03-25 10:49:41 +00:00
|
|
|
c.consumerOptions = append(c.consumerOptions, options)
|
|
|
|
|
|
|
|
return consumer, nil
|
|
|
|
}
|
|
|
|
|
2021-09-22 09:21:00 +00:00
|
|
|
func (c *client) consume(consumer *consumer) {
|
|
|
|
defer c.wg.Done()
|
2021-03-29 09:48:15 +00:00
|
|
|
for {
|
|
|
|
select {
|
2021-10-22 09:25:12 +00:00
|
|
|
case <-c.closeCh:
|
2021-03-27 09:39:12 +00:00
|
|
|
return
|
2021-11-24 02:25:15 +00:00
|
|
|
case _, ok := <-consumer.initCh:
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
c.deliver(consumer, 100)
|
2021-03-25 10:49:41 +00:00
|
|
|
case _, ok := <-consumer.MsgMutex():
|
|
|
|
if !ok {
|
|
|
|
// consumer MsgMutex closed, goroutine exit
|
2021-10-19 09:54:37 +00:00
|
|
|
log.Debug("Consumer MsgMutex closed")
|
2021-03-25 10:49:41 +00:00
|
|
|
return
|
|
|
|
}
|
2021-11-24 02:25:15 +00:00
|
|
|
c.deliver(consumer, 100)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-03-25 10:49:41 +00:00
|
|
|
|
2021-12-30 12:08:15 +00:00
|
|
|
func (c *client) deliver(consumer *consumer, batchMax int) {
|
2021-11-24 02:25:15 +00:00
|
|
|
for {
|
|
|
|
n := cap(consumer.messageCh) - len(consumer.messageCh)
|
2021-12-30 12:08:15 +00:00
|
|
|
if n == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if n > batchMax { // batch min size
|
|
|
|
n = batchMax
|
2021-11-24 02:25:15 +00:00
|
|
|
}
|
|
|
|
msgs, err := consumer.client.server.Consume(consumer.topic, consumer.consumerName, n)
|
|
|
|
if err != nil {
|
|
|
|
log.Warn("Consumer's goroutine cannot consume from (" + consumer.topic + "," + consumer.consumerName + "): " + err.Error())
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// no more msgs
|
|
|
|
if len(msgs) == 0 {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
for _, msg := range msgs {
|
|
|
|
select {
|
|
|
|
case consumer.messageCh <- Message{
|
2023-03-29 10:10:02 +00:00
|
|
|
MsgID: msg.MsgID,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
Properties: msg.Properties,
|
|
|
|
Topic: consumer.Topic()}:
|
2021-11-24 02:25:15 +00:00
|
|
|
case <-c.closeCh:
|
|
|
|
return
|
2021-03-19 11:33:21 +00:00
|
|
|
}
|
|
|
|
}
|
2021-03-25 10:49:41 +00:00
|
|
|
}
|
2021-03-13 03:46:50 +00:00
|
|
|
}
|
|
|
|
|
2021-11-08 13:09:07 +00:00
|
|
|
// Close close the channel to notify rocksmq to stop operation and close rocksmq server
|
2021-03-13 03:46:50 +00:00
|
|
|
func (c *client) Close() {
|
2021-10-22 09:25:12 +00:00
|
|
|
c.closeOnce.Do(func() {
|
|
|
|
close(c.closeCh)
|
|
|
|
c.wg.Wait()
|
|
|
|
})
|
2021-03-13 03:46:50 +00:00
|
|
|
}
|