diff --git a/trigger/pubsub/pubsub.go b/trigger/pubsub/pubsub.go index 06f87296..aea90ec0 100644 --- a/trigger/pubsub/pubsub.go +++ b/trigger/pubsub/pubsub.go @@ -7,6 +7,9 @@ import ( "cloud.google.com/go/pubsub" "golang.org/x/net/context" + "google.golang.org/api/option" + "google.golang.org/grpc" + "net" "github.com/rusenask/keel/provider" "github.com/rusenask/keel/types" @@ -37,9 +40,19 @@ type Opts struct { Providers map[string]provider.Provider } +// WithKeepAliveDialer - required so connections aren't dropped +// https://github.com/GoogleCloudPlatform/google-cloud-go/issues/500 +func WithKeepAliveDialer() grpc.DialOption { + return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) { + d := net.Dialer{Timeout: timeout, KeepAlive: time.Duration(10 * time.Second)} + return d.Dial("tcp", addr) + }) +} + // NewPubsubSubscriber - create new pubsub subscriber func NewPubsubSubscriber(opts *Opts) (*PubsubSubscriber, error) { - client, err := pubsub.NewClient(context.Background(), opts.ProjectID) + clientOption := option.WithGRPCDialOption(WithKeepAliveDialer()) + client, err := pubsub.NewClient(context.Background(), opts.ProjectID, clientOption) if err != nil { return nil, err } @@ -113,7 +126,7 @@ func (s *PubsubSubscriber) Subscribe(ctx context.Context, topic, subscription st log.WithFields(log.Fields{ "topic": topic, "subscription": subscription, - }).Info("trigger.pubsub: subscribing for events...") + }).Info("trigger.pubsub: subscribing for events...") err = sub.Receive(ctx, s.callback) if err != nil { log.WithFields(log.Fields{