mirror of https://github.com/milvus-io/milvus.git
Support multiple readers on one topic (#12155)
Signed-off-by: fishpenguin <kun.yu@zilliz.com>pull/12158/head
parent
a3d4cbdd4c
commit
f5bb180adb
|
@ -41,8 +41,12 @@ func newReader(c *client, readerOptions *ReaderOptions) (*reader, error) {
|
|||
if c.server == nil {
|
||||
return nil, newError(InvalidConfiguration, "rmq server in client is nil")
|
||||
}
|
||||
err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive)
|
||||
return reader, err
|
||||
name, err := c.server.CreateReader(readerOptions.Topic, reader.startMessageID, reader.startMessageIDInclusive)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
reader.name = name
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (r *reader) Topic() string {
|
||||
|
@ -50,7 +54,7 @@ func (r *reader) Topic() string {
|
|||
}
|
||||
|
||||
func (r *reader) Next(ctx context.Context) (Message, error) {
|
||||
cMsg, err := r.c.server.Next(ctx, r.topic, r.startMessageIDInclusive)
|
||||
cMsg, err := r.c.server.Next(ctx, r.topic, r.name, r.startMessageIDInclusive)
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
|
@ -63,14 +67,14 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
|
|||
}
|
||||
|
||||
func (r *reader) HasNext() bool {
|
||||
return r.c.server.HasNext(r.topic, r.startMessageIDInclusive)
|
||||
return r.c.server.HasNext(r.topic, r.name, r.startMessageIDInclusive)
|
||||
}
|
||||
|
||||
func (r *reader) Close() {
|
||||
r.c.server.CloseReader(r.topic)
|
||||
r.c.server.CloseReader(r.topic, r.name)
|
||||
}
|
||||
|
||||
func (r *reader) Seek(msgID UniqueID) error { //nolint:govet
|
||||
r.c.server.ReaderSeek(r.topic, msgID)
|
||||
r.c.server.ReaderSeek(r.topic, r.name, msgID)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -50,9 +50,9 @@ type RocksMQ interface {
|
|||
|
||||
Notify(topicName, groupName string)
|
||||
|
||||
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error
|
||||
ReaderSeek(topicName string, msgID UniqueID)
|
||||
Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error)
|
||||
HasNext(topicName string, messageIDInclusive bool) bool
|
||||
CloseReader(topicName string)
|
||||
CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error)
|
||||
ReaderSeek(topicName string, readerName string, msgID UniqueID)
|
||||
Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error)
|
||||
HasNext(topicName string, readerName string, messageIDInclusive bool) bool
|
||||
CloseReader(topicName string, readerName string)
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/kv"
|
||||
rocksdbkv "github.com/milvus-io/milvus/internal/kv/rocksdb"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/util/tsoutil"
|
||||
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||
|
||||
"github.com/tecbot/gorocksdb"
|
||||
|
@ -53,7 +54,8 @@ const (
|
|||
AckedSizeTitle = "acked_size/"
|
||||
LastRetTsTitle = "last_retention_ts/"
|
||||
|
||||
CurrentIDSuffix = "current_id"
|
||||
CurrentIDSuffix = "current_id"
|
||||
ReaderNamePrefix = "reader-"
|
||||
)
|
||||
|
||||
/**
|
||||
|
@ -115,6 +117,19 @@ func checkRetention() bool {
|
|||
return RocksmqRetentionTimeInMinutes != -1 && RocksmqRetentionSizeInMB != -1
|
||||
}
|
||||
|
||||
func getNowTs(idAllocator allocator.GIDAllocator) (int64, error) {
|
||||
err := idAllocator.UpdateID()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
newID, err := idAllocator.AllocOne()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
nowTs, _ := tsoutil.ParseTS(uint64(newID))
|
||||
return nowTs.Unix(), err
|
||||
}
|
||||
|
||||
var topicMu sync.Map = sync.Map{}
|
||||
|
||||
type rocksmq struct {
|
||||
|
@ -312,7 +327,7 @@ func (rmq *rocksmq) DestroyTopic(topicName string) error {
|
|||
|
||||
// clean up reader
|
||||
if val, ok := rmq.readers.LoadAndDelete(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
for _, reader := range val.([]*rocksmqReader) {
|
||||
reader.Close()
|
||||
}
|
||||
}
|
||||
|
@ -499,7 +514,7 @@ func (rmq *rocksmq) Produce(topicName string, messages []ProducerMessage) ([]Uni
|
|||
|
||||
// Notify reader
|
||||
if val, ok := rmq.readers.Load(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
for _, reader := range val.([]*rocksmqReader) {
|
||||
select {
|
||||
case reader.readerMutex <- struct{}{}:
|
||||
default:
|
||||
|
@ -915,65 +930,90 @@ func (rmq *rocksmq) updateAckedInfo(topicName, groupName string, ids []UniqueID)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) error {
|
||||
func (rmq *rocksmq) CreateReader(topicName string, startMsgID UniqueID, messageIDInclusive bool) (string, error) {
|
||||
readOpts := gorocksdb.NewDefaultReadOptions()
|
||||
readOpts.SetPrefixSameAsStart(true)
|
||||
iter := rmq.store.NewIterator(readOpts)
|
||||
fixChanName, err := fixChannelName(topicName)
|
||||
if err != nil {
|
||||
log.Debug("RocksMQ: fixChannelName " + topicName + " failed")
|
||||
return err
|
||||
return "", err
|
||||
}
|
||||
dataKey := path.Join(fixChanName, strconv.FormatInt(startMsgID, 10))
|
||||
iter.Seek([]byte(dataKey))
|
||||
if !iter.Valid() {
|
||||
log.Warn("iterator of startMsgID is invalid")
|
||||
}
|
||||
nowTs, err := getNowTs(rmq.idAllocator)
|
||||
if err != nil {
|
||||
return "", errors.New("Can't get current ts from rocksmq idAllocator")
|
||||
}
|
||||
readerName := ReaderNamePrefix + strconv.FormatInt(nowTs, 10)
|
||||
|
||||
reader := &rocksmqReader{
|
||||
store: rmq.store,
|
||||
topic: topicName,
|
||||
readerName: readerName,
|
||||
readOpts: readOpts,
|
||||
iter: iter,
|
||||
currentID: startMsgID,
|
||||
messageIDInclusive: messageIDInclusive,
|
||||
readerMutex: make(chan struct{}, 1),
|
||||
}
|
||||
rmq.readers.Store(topicName, reader)
|
||||
if vals, ok := rmq.readers.Load(topicName); ok {
|
||||
readers := vals.([]*rocksmqReader)
|
||||
readers = append(readers, reader)
|
||||
rmq.readers.Store(topicName, readers)
|
||||
} else {
|
||||
readers := make([]*rocksmqReader, 1)
|
||||
readers[0] = reader
|
||||
rmq.readers.Store(topicName, readers)
|
||||
}
|
||||
return readerName, nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) getReader(topicName, readerName string) *rocksmqReader {
|
||||
if vals, ok := rmq.readers.Load(topicName); ok {
|
||||
for _, v := range vals.([]*rocksmqReader) {
|
||||
if v.readerName == readerName {
|
||||
return v
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) ReaderSeek(topicName string, msgID UniqueID) {
|
||||
if val, ok := rmq.readers.Load(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
reader.Seek(msgID)
|
||||
}
|
||||
func (rmq *rocksmq) ReaderSeek(topicName string, readerName string, msgID UniqueID) {
|
||||
reader := rmq.getReader(topicName, readerName)
|
||||
if reader == nil {
|
||||
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
|
||||
return
|
||||
}
|
||||
reader.Seek(msgID)
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) Next(ctx context.Context, topicName string, messageIDInclusive bool) (ConsumerMessage, error) {
|
||||
if val, ok := rmq.readers.Load(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
return reader.Next(ctx, messageIDInclusive)
|
||||
}
|
||||
func (rmq *rocksmq) Next(ctx context.Context, topicName string, readerName string, messageIDInclusive bool) (ConsumerMessage, error) {
|
||||
reader := rmq.getReader(topicName, readerName)
|
||||
if reader == nil {
|
||||
return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName)
|
||||
}
|
||||
return ConsumerMessage{}, fmt.Errorf("reader of %s doesn't exist", topicName)
|
||||
return reader.Next(ctx, messageIDInclusive)
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) HasNext(topicName string, messageIDInclusive bool) bool {
|
||||
if val, ok := rmq.readers.Load(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
return reader.HasNext(messageIDInclusive)
|
||||
}
|
||||
func (rmq *rocksmq) HasNext(topicName string, readerName string, messageIDInclusive bool) bool {
|
||||
reader := rmq.getReader(topicName, readerName)
|
||||
if reader == nil {
|
||||
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
|
||||
return false
|
||||
}
|
||||
return false
|
||||
return reader.HasNext(messageIDInclusive)
|
||||
}
|
||||
|
||||
func (rmq *rocksmq) CloseReader(topicName string) {
|
||||
if val, ok := rmq.readers.Load(topicName); ok {
|
||||
if reader, rOk := val.(*rocksmqReader); rOk {
|
||||
reader.Close()
|
||||
}
|
||||
func (rmq *rocksmq) CloseReader(topicName string, readerName string) {
|
||||
reader := rmq.getReader(topicName, readerName)
|
||||
if reader == nil {
|
||||
log.Warn("reader not exist", zap.String("topic", topicName), zap.String("readerName", readerName))
|
||||
return
|
||||
}
|
||||
reader.Close()
|
||||
}
|
||||
|
|
|
@ -639,7 +639,7 @@ func TestRocksmq_Reader(t *testing.T) {
|
|||
defer rmq.DestroyTopic(channelName)
|
||||
loopNum := 100
|
||||
|
||||
err = rmq.CreateReader(channelName, 0, true)
|
||||
readerName, err := rmq.CreateReader(channelName, 0, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
pMsgs := make([]ProducerMessage, loopNum)
|
||||
|
@ -652,22 +652,22 @@ func TestRocksmq_Reader(t *testing.T) {
|
|||
assert.Nil(t, err)
|
||||
assert.Equal(t, len(ids), loopNum)
|
||||
|
||||
rmq.ReaderSeek(channelName, ids[0])
|
||||
rmq.ReaderSeek(channelName, readerName, ids[0])
|
||||
ctx := context.Background()
|
||||
for i := 0; i < loopNum; i++ {
|
||||
assert.Equal(t, true, rmq.HasNext(channelName, true))
|
||||
msg, err := rmq.Next(ctx, channelName, true)
|
||||
assert.Equal(t, true, rmq.HasNext(channelName, readerName, true))
|
||||
msg, err := rmq.Next(ctx, channelName, readerName, true)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, msg.MsgID, ids[i])
|
||||
}
|
||||
assert.False(t, rmq.HasNext(channelName, true))
|
||||
assert.False(t, rmq.HasNext(channelName, readerName, true))
|
||||
|
||||
rmq.ReaderSeek(channelName, ids[0])
|
||||
rmq.ReaderSeek(channelName, readerName, ids[0])
|
||||
for i := 0; i < loopNum-1; i++ {
|
||||
assert.Equal(t, true, rmq.HasNext(channelName, false))
|
||||
msg, err := rmq.Next(ctx, channelName, false)
|
||||
assert.Equal(t, true, rmq.HasNext(channelName, readerName, false))
|
||||
msg, err := rmq.Next(ctx, channelName, readerName, false)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, msg.MsgID, ids[i+1])
|
||||
}
|
||||
assert.False(t, rmq.HasNext(channelName, false))
|
||||
assert.False(t, rmq.HasNext(channelName, readerName, false))
|
||||
}
|
||||
|
|
|
@ -23,8 +23,9 @@ import (
|
|||
)
|
||||
|
||||
type rocksmqReader struct {
|
||||
store *gorocksdb.DB
|
||||
topic string
|
||||
store *gorocksdb.DB
|
||||
topic string
|
||||
readerName string
|
||||
|
||||
readOpts *gorocksdb.ReadOptions
|
||||
iter *gorocksdb.Iterator
|
||||
|
@ -71,7 +72,11 @@ func (rr *rocksmqReader) Next(ctx context.Context, messageIDInclusive bool) (Con
|
|||
case <-ctx.Done():
|
||||
log.Debug("Stop get next reader message!")
|
||||
return ConsumerMessage{}, nil
|
||||
case <-rr.readerMutex:
|
||||
case _, ok := <-rr.readerMutex:
|
||||
if !ok {
|
||||
log.Warn("reader Mutex closed")
|
||||
return ConsumerMessage{}, nil
|
||||
}
|
||||
dataKey := path.Join(fixChanName, strconv.FormatInt(rr.currentID, 10))
|
||||
if iter.Seek([]byte(dataKey)); !iter.Valid() {
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue