mirror of https://github.com/milvus-io/milvus.git
Fix msgs consumed before seek appear in output after seek (#11419)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>pull/11468/head
parent
d50543a030
commit
6d3952c9bf
|
@ -82,8 +82,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go c.consume(consumer)
|
||||
return consumer, nil
|
||||
}
|
||||
consumer, err := newConsumer(c, options)
|
||||
|
@ -113,8 +111,6 @@ func (c *client) Subscribe(options ConsumerOptions) (Consumer, error) {
|
|||
|
||||
// Take messages from RocksDB and put it into consumer.Chan(),
|
||||
// trigger by consumer.MsgMutex which trigger by producer
|
||||
c.wg.Add(1)
|
||||
go c.consume(consumer)
|
||||
c.consumerOptions = append(c.consumerOptions, options)
|
||||
|
||||
return consumer, nil
|
||||
|
|
|
@ -12,6 +12,8 @@
|
|||
package rocksmq
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -22,6 +24,8 @@ type consumer struct {
|
|||
consumerName string
|
||||
options ConsumerOptions
|
||||
|
||||
startOnce sync.Once
|
||||
|
||||
msgMutex chan struct{}
|
||||
messageCh chan ConsumerMessage
|
||||
}
|
||||
|
@ -95,6 +99,10 @@ func (c *consumer) MsgMutex() chan struct{} {
|
|||
}
|
||||
|
||||
func (c *consumer) Chan() <-chan ConsumerMessage {
|
||||
c.startOnce.Do(func() {
|
||||
c.client.wg.Add(1)
|
||||
go c.client.consume(c)
|
||||
})
|
||||
return c.messageCh
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue