mirror of https://github.com/milvus-io/milvus.git
78 lines
1.9 KiB
Go
78 lines
1.9 KiB
Go
package rmq
|
|
|
|
import (
|
|
"github.com/cockroachdb/errors"
|
|
|
|
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client"
|
|
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
|
|
"github.com/milvus-io/milvus/pkg/streaming/util/message"
|
|
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
|
|
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
|
|
)
|
|
|
|
// Compile-time assertion that *scannerImpl implements walimpls.ScannerImpls.
var _ walimpls.ScannerImpls = (*scannerImpl)(nil)
|
|
|
|
// newScanner creates a new scanner.
|
|
func newScanner(
|
|
scannerName string,
|
|
exclude *rmqID,
|
|
consumer client.Consumer,
|
|
) *scannerImpl {
|
|
s := &scannerImpl{
|
|
ScannerHelper: helper.NewScannerHelper(scannerName),
|
|
exclude: exclude,
|
|
consumer: consumer,
|
|
msgChannel: make(chan message.ImmutableMessage, 1),
|
|
}
|
|
go s.executeConsume()
|
|
return s
|
|
}
|
|
|
|
// scannerImpl is the implementation of ScannerImpls for rmq.
|
|
type scannerImpl struct {
|
|
*helper.ScannerHelper
|
|
exclude *rmqID
|
|
consumer client.Consumer
|
|
msgChannel chan message.ImmutableMessage
|
|
}
|
|
|
|
// Chan returns the channel of message.
|
|
func (s *scannerImpl) Chan() <-chan message.ImmutableMessage {
|
|
return s.msgChannel
|
|
}
|
|
|
|
// Close the scanner, release the underlying resources.
|
|
// Return the error same with `Error`
|
|
func (s *scannerImpl) Close() error {
|
|
err := s.ScannerHelper.Close()
|
|
s.consumer.Close()
|
|
return err
|
|
}
|
|
|
|
// executeConsume consumes the message from the consumer.
|
|
func (s *scannerImpl) executeConsume() {
|
|
defer close(s.msgChannel)
|
|
for {
|
|
select {
|
|
case <-s.Context().Done():
|
|
s.Finish(nil)
|
|
return
|
|
case msg, ok := <-s.consumer.Chan():
|
|
if !ok {
|
|
s.Finish(errors.New("mq consumer unexpected channel closed"))
|
|
return
|
|
}
|
|
msgID := rmqID(msg.ID().(*server.RmqID).MessageID)
|
|
// record the last message id to avoid repeated consume message.
|
|
// and exclude message id should be filterred.
|
|
if s.exclude == nil || !s.exclude.EQ(msgID) {
|
|
s.msgChannel <- message.NewImmutableMesasge(
|
|
msgID,
|
|
msg.Payload(),
|
|
msg.Properties(),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|