2018-08-29 23:15:39 +00:00
|
|
|
package nats
|
|
|
|
|
|
|
|
import (
|
|
|
|
stan "github.com/nats-io/go-nats-streaming"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Subscriber interface {
|
|
|
|
// Subscribe listens to a channel, handling messages with Handler
|
|
|
|
Subscribe(subject, group string, handler Handler) error
|
|
|
|
}
|
|
|
|
|
|
|
|
type QueueSubscriber struct {
|
|
|
|
ClientID string
|
|
|
|
Connection stan.Conn
|
2019-09-13 18:33:19 +00:00
|
|
|
Addr string
|
2018-08-29 23:15:39 +00:00
|
|
|
}
|
|
|
|
|
2019-09-13 18:33:19 +00:00
|
|
|
func NewQueueSubscriber(clientID string, addr string) *QueueSubscriber {
|
|
|
|
return &QueueSubscriber{ClientID: clientID, Addr: addr}
|
2018-08-29 23:15:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Open creates and maintains a connection to NATS server
|
|
|
|
func (s *QueueSubscriber) Open() error {
|
2019-09-13 18:33:19 +00:00
|
|
|
sc, err := stan.Connect(ServerName, s.ClientID, stan.NatsURL(s.Addr))
|
2018-08-29 23:15:39 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.Connection = sc
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type messageHandler struct {
|
|
|
|
handler Handler
|
|
|
|
sub subscription
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mh *messageHandler) handle(m *stan.Msg) {
|
|
|
|
mh.handler.Process(mh.sub, &message{m: m})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *QueueSubscriber) Subscribe(subject, group string, handler Handler) error {
|
|
|
|
if s.Connection == nil {
|
|
|
|
return ErrNoNatsConnection
|
|
|
|
}
|
|
|
|
|
|
|
|
mh := messageHandler{handler: handler}
|
|
|
|
sub, err := s.Connection.QueueSubscribe(subject, group, mh.handle, stan.DurableName(group), stan.SetManualAckMode(), stan.MaxInflight(25))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
mh.sub = subscription{sub: sub}
|
|
|
|
return nil
|
|
|
|
}
|