milvus/pkg/streaming/walimpls/impls/rmq/scanner.go

78 lines
1.9 KiB
Go

package rmq
import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/client"
"github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
)
var _ walimpls.ScannerImpls = (*scannerImpl)(nil)
// newScanner creates a new scanner.
func newScanner(
scannerName string,
exclude *rmqID,
consumer client.Consumer,
) *scannerImpl {
s := &scannerImpl{
ScannerHelper: helper.NewScannerHelper(scannerName),
exclude: exclude,
consumer: consumer,
msgChannel: make(chan message.ImmutableMessage, 1),
}
go s.executeConsume()
return s
}
// scannerImpl is the implementation of ScannerImpls for rmq.
type scannerImpl struct {
*helper.ScannerHelper
exclude *rmqID
consumer client.Consumer
msgChannel chan message.ImmutableMessage
}
// Chan returns the channel of message.
func (s *scannerImpl) Chan() <-chan message.ImmutableMessage {
return s.msgChannel
}
// Close the scanner, release the underlying resources.
// Return the error same with `Error`
func (s *scannerImpl) Close() error {
err := s.ScannerHelper.Close()
s.consumer.Close()
return err
}
// executeConsume consumes the message from the consumer.
func (s *scannerImpl) executeConsume() {
defer close(s.msgChannel)
for {
select {
case <-s.Context().Done():
s.Finish(nil)
return
case msg, ok := <-s.consumer.Chan():
if !ok {
s.Finish(errors.New("mq consumer unexpected channel closed"))
return
}
msgID := rmqID(msg.ID().(*server.RmqID).MessageID)
// record the last message id to avoid repeated consume message.
// and exclude message id should be filterred.
if s.exclude == nil || !s.exclude.EQ(msgID) {
s.msgChannel <- message.NewImmutableMesasge(
msgID,
msg.Payload(),
msg.Properties(),
)
}
}
}
}