naming changed
parent
1ca5749ca7
commit
6e07105e56
|
@ -15,7 +15,8 @@ import (
|
||||||
log "github.com/Sirupsen/logrus"
|
log "github.com/Sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscriber struct {
|
// PubsubSubscriber is Google Cloud pubsub based subscriber
|
||||||
|
type PubsubSubscriber struct {
|
||||||
providers map[string]provider.Provider
|
providers map[string]provider.Provider
|
||||||
|
|
||||||
project string
|
project string
|
||||||
|
@ -30,18 +31,20 @@ type pubsubImplementer interface {
|
||||||
Receive(ctx context.Context, f func(context.Context, *Message)) error
|
Receive(ctx context.Context, f func(context.Context, *Message)) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Opts - subscriber options
|
||||||
type Opts struct {
|
type Opts struct {
|
||||||
ProjectID string
|
ProjectID string
|
||||||
Providers map[string]provider.Provider
|
Providers map[string]provider.Provider
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSubscriber(opts *Opts) (*Subscriber, error) {
|
// NewPubsubSubscriber - create new pubsub subscriber
|
||||||
|
func NewPubsubSubscriber(opts *Opts) (*PubsubSubscriber, error) {
|
||||||
client, err := pubsub.NewClient(context.Background(), opts.ProjectID)
|
client, err := pubsub.NewClient(context.Background(), opts.ProjectID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Subscriber{
|
return &PubsubSubscriber{
|
||||||
project: opts.ProjectID,
|
project: opts.ProjectID,
|
||||||
providers: opts.Providers,
|
providers: opts.Providers,
|
||||||
client: client,
|
client: client,
|
||||||
|
@ -54,7 +57,7 @@ type Message struct {
|
||||||
Tag string `json:"tag,omitempty"`
|
Tag string `json:"tag,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) ensureTopic(ctx context.Context, id string) error {
|
func (s *PubsubSubscriber) ensureTopic(ctx context.Context, id string) error {
|
||||||
topic := s.client.Topic(id)
|
topic := s.client.Topic(id)
|
||||||
exists, err := topic.Exists(ctx)
|
exists, err := topic.Exists(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -72,7 +75,7 @@ func (s *Subscriber) ensureTopic(ctx context.Context, id string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) ensureSubscription(ctx context.Context, subscriptionID, topicID string) error {
|
func (s *PubsubSubscriber) ensureSubscription(ctx context.Context, subscriptionID, topicID string) error {
|
||||||
sub := s.client.Subscription(subscriptionID)
|
sub := s.client.Subscription(subscriptionID)
|
||||||
exists, err := sub.Exists(ctx)
|
exists, err := sub.Exists(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -93,8 +96,8 @@ func (s *Subscriber) ensureSubscription(ctx context.Context, subscriptionID, top
|
||||||
return fmt.Errorf("failed to create subscription %s, error: %s", subscriptionID, err)
|
return fmt.Errorf("failed to create subscription %s, error: %s", subscriptionID, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe - initiate subscriber
|
// Subscribe - initiate PubsubSubscriber
|
||||||
func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string) error {
|
func (s *PubsubSubscriber) Subscribe(ctx context.Context, topic, subscription string) error {
|
||||||
// ensuring that topic exists
|
// ensuring that topic exists
|
||||||
err := s.ensureTopic(ctx, topic)
|
err := s.ensureTopic(ctx, topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -110,7 +113,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string)
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
"topic": topic,
|
"topic": topic,
|
||||||
"subscription": subscription,
|
"subscription": subscription,
|
||||||
}).Info("trigger.pubsub: subscribing for events...")
|
}).Info("trigger.pubsub: subscribing for events...")
|
||||||
err = sub.Receive(ctx, s.callback)
|
err = sub.Receive(ctx, s.callback)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithFields(log.Fields{
|
log.WithFields(log.Fields{
|
||||||
|
@ -120,7 +123,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Subscriber) callback(ctx context.Context, msg *pubsub.Message) {
|
func (s *PubsubSubscriber) callback(ctx context.Context, msg *pubsub.Message) {
|
||||||
// disable ack, useful for testing
|
// disable ack, useful for testing
|
||||||
if !s.disableAck {
|
if !s.disableAck {
|
||||||
defer msg.Ack()
|
defer msg.Ack()
|
||||||
|
|
|
@ -35,7 +35,7 @@ func fakeDoneFunc(id string, done bool) {
|
||||||
func TestCallback(t *testing.T) {
|
func TestCallback(t *testing.T) {
|
||||||
|
|
||||||
fp := &fakeProvider{}
|
fp := &fakeProvider{}
|
||||||
sub := &Subscriber{disableAck: true, providers: map[string]provider.Provider{
|
sub := &PubsubSubscriber{disableAck: true, providers: map[string]provider.Provider{
|
||||||
fp.GetName(): fp,
|
fp.GetName(): fp,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue