From 080943a9f5620c3de5db5a732f350f995cd1ed45 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Fri, 13 Sep 2019 19:33:19 +0100 Subject: [PATCH] fix(tests): ensure NATS server port free --- cmd/influxd/launcher/launcher.go | 42 +++++++++++++++++++++++++++++--- go.mod | 2 +- nats/publisher.go | 7 +++--- nats/server.go | 28 +++++++++++++++------ nats/subscriber.go | 7 +++--- 5 files changed, 69 insertions(+), 17 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 2ba5f68708..56315d02f6 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -2,6 +2,7 @@ package launcher import ( "context" + "errors" "fmt" "io" "math" @@ -227,6 +228,7 @@ type Launcher struct { httpServer *nethttp.Server natsServer *nats.Server + natsPort int scheduler *taskbackend.TickScheduler taskControlService taskbackend.TaskControlService @@ -276,6 +278,11 @@ func (m *Launcher) URL() string { return fmt.Sprintf("http://127.0.0.1:%d", m.httpPort) } +// NatsURL returns the URL to connection to the NATS server. +func (m *Launcher) NatsURL() string { + return fmt.Sprintf("http://127.0.0.1:%d", m.natsPort) +} + // Engine returns a reference to the storage engine. It should only be called // for end-to-end testing purposes. func (m *Launcher) Engine() *storage.Engine { @@ -577,20 +584,49 @@ func (m *Launcher) run(ctx context.Context) (err error) { } // NATS streaming server - m.natsServer = nats.NewServer() + natsOpts := nats.NewDefaultServerOptions() + nextPort := int64(4222) + + // Welcome to ghetto land. It doesn't seem possible to tell NATS to initialise + // a random port. In some integration-style tests, this launcher gets initialised + // multiple times, and sometimes the port from the previous instantiation is + // still open. + // + // This atrocity checks if the port is free, and if it's not, moves on to the + // next one. + var total int + for { + l, err := net.Listen("tcp", fmt.Sprintf(":%d", nextPort)) + if err == nil { + if err := l.Close(); err != nil { + return err + } + break + } + time.Sleep(time.Second) + nextPort++ + total++ + if total > 50 { + return errors.New("unable to find free port for Nats server") + } + } + natsOpts.Port = int(nextPort) + m.natsServer = nats.NewServer(&natsOpts) + m.natsPort = int(nextPort) + if err := m.natsServer.Open(); err != nil { m.logger.Error("failed to start nats streaming server", zap.Error(err)) return err } - publisher := nats.NewAsyncPublisher("nats-publisher") + publisher := nats.NewAsyncPublisher(fmt.Sprintf("nats-publisher-%d", m.natsPort), m.NatsURL()) if err := publisher.Open(); err != nil { m.logger.Error("failed to connect to streaming server", zap.Error(err)) return err } // TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed. - subscriber := nats.NewQueueSubscriber("nats-subscriber") + subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", m.natsPort), m.NatsURL()) if err := subscriber.Open(); err != nil { m.logger.Error("failed to connect to streaming server", zap.Error(err)) return err diff --git a/go.mod b/go.mod index 53b0493e5f..927a461143 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.1 github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect - github.com/nats-io/gnatsd v1.3.0 // indirect + github.com/nats-io/gnatsd v1.3.0 github.com/nats-io/go-nats v1.7.0 // indirect github.com/nats-io/go-nats-streaming v0.4.0 github.com/nats-io/nats-streaming-server v0.11.2 diff --git a/nats/publisher.go b/nats/publisher.go index 62ba8c7925..945523e155 100644 --- a/nats/publisher.go +++ b/nats/publisher.go @@ -17,15 +17,16 @@ type AsyncPublisher struct { ClientID string Connection stan.Conn Logger *zap.Logger + Addr string } -func NewAsyncPublisher(clientID string) *AsyncPublisher { - return &AsyncPublisher{ClientID: clientID} +func NewAsyncPublisher(clientID string, addr string) *AsyncPublisher { + return &AsyncPublisher{ClientID: clientID, Addr: addr} } // Open creates and maintains a connection to NATS server func (p *AsyncPublisher) Open() error { - sc, err := stan.Connect(ServerName, p.ClientID) + sc, err := stan.Connect(ServerName, p.ClientID, stan.NatsURL(p.Addr)) if err != nil { return err } diff --git a/nats/server.go b/nats/server.go index 5bc3a77f67..7e0f139ef9 100644 --- a/nats/server.go +++ b/nats/server.go @@ -3,7 +3,8 @@ package nats import ( "errors" - stand "github.com/nats-io/nats-streaming-server/server" + "github.com/nats-io/gnatsd/server" + sserver "github.com/nats-io/nats-streaming-server/server" "github.com/nats-io/nats-streaming-server/stores" ) @@ -13,15 +14,18 @@ var ErrNoNatsConnection = errors.New("nats connection has not been established. // Server wraps a connection to a NATS streaming server type Server struct { - Server *stand.StanServer + serverOpts *server.Options + Server *sserver.StanServer } // Open starts a NATS streaming server func (s *Server) Open() error { - opts := stand.GetDefaultOptions() + // Streaming options + opts := sserver.GetDefaultOptions() opts.StoreType = stores.TypeMemory opts.ID = ServerName - server, err := stand.RunServerWithOpts(opts, nil) + + server, err := sserver.RunServerWithOpts(opts, s.serverOpts) if err != nil { return err } @@ -36,7 +40,17 @@ func (s *Server) Close() { s.Server.Shutdown() } -// NewServer creates and returns a new server struct from the provided config -func NewServer() *Server { - return &Server{} +// NewDefaultServerOptions returns the default NATS server options, allowing the +// caller to override specific fields. +func NewDefaultServerOptions() server.Options { + return sserver.DefaultNatsServerOptions +} + +// NewServer creates a new streaming server with the provided server options. +func NewServer(opts *server.Options) *Server { + if opts == nil { + o := NewDefaultServerOptions() + opts = &o + } + return &Server{serverOpts: opts} } diff --git a/nats/subscriber.go b/nats/subscriber.go index bd174ed54c..b30bd3ebb4 100644 --- a/nats/subscriber.go +++ b/nats/subscriber.go @@ -12,15 +12,16 @@ type Subscriber interface { type QueueSubscriber struct { ClientID string Connection stan.Conn + Addr string } -func NewQueueSubscriber(clientID string) *QueueSubscriber { - return &QueueSubscriber{ClientID: clientID} +func NewQueueSubscriber(clientID string, addr string) *QueueSubscriber { + return &QueueSubscriber{ClientID: clientID, Addr: addr} } // Open creates and maintains a connection to NATS server func (s *QueueSubscriber) Open() error { - sc, err := stan.Connect(ServerName, s.ClientID) + sc, err := stan.Connect(ServerName, s.ClientID, stan.NatsURL(s.Addr)) if err != nil { return err }