keep alive dialer

pull/22/head
Karolis Rusenas 2017-06-27 15:41:01 +01:00
parent 680773bdeb
commit 8d3fe8c90f
1 changed files with 15 additions and 2 deletions

View File

@ -7,6 +7,9 @@ import (
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
"google.golang.org/api/option"
"google.golang.org/grpc"
"net"
"github.com/rusenask/keel/provider"
"github.com/rusenask/keel/types"
@ -37,9 +40,19 @@ type Opts struct {
Providers map[string]provider.Provider
}
// WithKeepAliveDialer - required so connections aren't dropped
// https://github.com/GoogleCloudPlatform/google-cloud-go/issues/500
func WithKeepAliveDialer() grpc.DialOption {
return grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
d := net.Dialer{Timeout: timeout, KeepAlive: time.Duration(10 * time.Second)}
return d.Dial("tcp", addr)
})
}
// NewPubsubSubscriber - create new pubsub subscriber
func NewPubsubSubscriber(opts *Opts) (*PubsubSubscriber, error) {
client, err := pubsub.NewClient(context.Background(), opts.ProjectID)
clientOption := option.WithGRPCDialOption(WithKeepAliveDialer())
client, err := pubsub.NewClient(context.Background(), opts.ProjectID, clientOption)
if err != nil {
return nil, err
}
@ -113,7 +126,7 @@ func (s *PubsubSubscriber) Subscribe(ctx context.Context, topic, subscription st
log.WithFields(log.Fields{
"topic": topic,
"subscription": subscription,
}).Info("trigger.pubsub: subscribing for events...")
}).Info("trigger.pubsub: subscribing for events...")
err = sub.Receive(ctx, s.callback)
if err != nil {
log.WithFields(log.Fields{