diff --git a/trigger/pubsub/pubsub.go b/trigger/pubsub/pubsub.go index 5c24df0c..657f48e1 100644 --- a/trigger/pubsub/pubsub.go +++ b/trigger/pubsub/pubsub.go @@ -2,6 +2,7 @@ package pubsub import ( "encoding/json" + "fmt" "time" "cloud.google.com/go/pubsub" @@ -17,10 +18,8 @@ import ( type Subscriber struct { providers map[string]provider.Provider - project string - topic string - subscription string - disableAck bool + project string + disableAck bool client *pubsub.Client } @@ -32,24 +31,20 @@ type pubsubImplementer interface { } type Opts struct { - Project string - Topic string - Subscription string - Providers map[string]provider.Provider + ProjectID string + Providers map[string]provider.Provider } func NewSubscriber(opts *Opts) (*Subscriber, error) { - client, err := pubsub.NewClient(context.Background(), opts.Project) + client, err := pubsub.NewClient(context.Background(), opts.ProjectID) if err != nil { return nil, err } return &Subscriber{ - project: opts.Project, - topic: opts.Topic, - subscription: opts.Subscription, - providers: opts.Providers, - client: client, + project: opts.ProjectID, + providers: opts.Providers, + client: client, }, nil } @@ -59,12 +54,62 @@ type Message struct { Tag string `json:"tag,omitempty"` } +func (s *Subscriber) ensureTopic(ctx context.Context, id string) error { + topic := s.client.Topic(id) + exists, err := topic.Exists(ctx) + if err != nil { + return fmt.Errorf("failed to check whether topic exists, error: %s", err) + } + + if exists { + log.WithFields(log.Fields{ + "topic": id, + }).Info("trigger.pubsub: topic exists") + return nil + } + + _, err = s.client.CreateTopic(ctx, id) + return err +} + +func (s *Subscriber) ensureSubscription(ctx context.Context, subscriptionID, topicID string) error { + sub := s.client.Subscription(subscriptionID) + exists, err := sub.Exists(ctx) + if err != nil { + return fmt.Errorf("failed to check whether subscription exists, error: %s", err) + } + if exists { + log.WithFields(log.Fields{ + "subscription": subscriptionID, + "topic": topicID, + }).Info("trigger.pubsub: subscription exists") + return nil + } + + _, err = s.client.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{ + Topic: s.client.Topic(topicID), + AckDeadline: 10 * time.Second, + }) + return fmt.Errorf("failed to create subscription %s, error: %s", subscriptionID, err) +} + // Subscribe - initiate subscriber -func (s *Subscriber) Subscribe(ctx context.Context) error { - sub := s.client.Subscription(s.subscription) +func (s *Subscriber) Subscribe(ctx context.Context, topic, subscription string) error { + // ensuring that topic exists + err := s.ensureTopic(ctx, topic) + if err != nil { + return err + } + + err = s.ensureSubscription(ctx, subscription, topic) + if err != nil { + return err + } + + sub := s.client.Subscription(subscription) log.Info("trigger.pubsub: subscribing for events...") // err := sub.Receive(ctx, s.callback) - err := sub.Receive(ctx, s.callback) + err = sub.Receive(ctx, s.callback) if err != nil { log.WithFields(log.Fields{ "error": err,