Merge pull request #15144 from influxdata/er-nats
fix(tests): ensure NATS server port freepull/15148/head
commit
5040dc7036
|
@ -2,6 +2,7 @@ package launcher
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
|
@ -227,6 +228,7 @@ type Launcher struct {
|
|||
httpServer *nethttp.Server
|
||||
|
||||
natsServer *nats.Server
|
||||
natsPort int
|
||||
|
||||
scheduler *taskbackend.TickScheduler
|
||||
taskControlService taskbackend.TaskControlService
|
||||
|
@ -276,6 +278,11 @@ func (m *Launcher) URL() string {
|
|||
return fmt.Sprintf("http://127.0.0.1:%d", m.httpPort)
|
||||
}
|
||||
|
||||
// NatsURL returns the URL to connection to the NATS server.
|
||||
func (m *Launcher) NatsURL() string {
|
||||
return fmt.Sprintf("http://127.0.0.1:%d", m.natsPort)
|
||||
}
|
||||
|
||||
// Engine returns a reference to the storage engine. It should only be called
|
||||
// for end-to-end testing purposes.
|
||||
func (m *Launcher) Engine() *storage.Engine {
|
||||
|
@ -577,20 +584,49 @@ func (m *Launcher) run(ctx context.Context) (err error) {
|
|||
}
|
||||
|
||||
// NATS streaming server
|
||||
m.natsServer = nats.NewServer()
|
||||
natsOpts := nats.NewDefaultServerOptions()
|
||||
nextPort := int64(4222)
|
||||
|
||||
// Welcome to ghetto land. It doesn't seem possible to tell NATS to initialise
|
||||
// a random port. In some integration-style tests, this launcher gets initialised
|
||||
// multiple times, and sometimes the port from the previous instantiation is
|
||||
// still open.
|
||||
//
|
||||
// This atrocity checks if the port is free, and if it's not, moves on to the
|
||||
// next one.
|
||||
var total int
|
||||
for {
|
||||
l, err := net.Listen("tcp", fmt.Sprintf(":%d", nextPort))
|
||||
if err == nil {
|
||||
if err := l.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
nextPort++
|
||||
total++
|
||||
if total > 50 {
|
||||
return errors.New("unable to find free port for Nats server")
|
||||
}
|
||||
}
|
||||
natsOpts.Port = int(nextPort)
|
||||
m.natsServer = nats.NewServer(&natsOpts)
|
||||
m.natsPort = int(nextPort)
|
||||
|
||||
if err := m.natsServer.Open(); err != nil {
|
||||
m.logger.Error("failed to start nats streaming server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
publisher := nats.NewAsyncPublisher("nats-publisher")
|
||||
publisher := nats.NewAsyncPublisher(fmt.Sprintf("nats-publisher-%d", m.natsPort), m.NatsURL())
|
||||
if err := publisher.Open(); err != nil {
|
||||
m.logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
|
||||
subscriber := nats.NewQueueSubscriber("nats-subscriber")
|
||||
subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", m.natsPort), m.NatsURL())
|
||||
if err := subscriber.Open(); err != nil {
|
||||
m.logger.Error("failed to connect to streaming server", zap.Error(err))
|
||||
return err
|
||||
|
|
2
go.mod
2
go.mod
|
@ -50,7 +50,7 @@ require (
|
|||
github.com/matttproud/golang_protobuf_extensions v1.0.1
|
||||
github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae
|
||||
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
|
||||
github.com/nats-io/gnatsd v1.3.0 // indirect
|
||||
github.com/nats-io/gnatsd v1.3.0
|
||||
github.com/nats-io/go-nats v1.7.0 // indirect
|
||||
github.com/nats-io/go-nats-streaming v0.4.0
|
||||
github.com/nats-io/nats-streaming-server v0.11.2
|
||||
|
|
|
@ -17,15 +17,16 @@ type AsyncPublisher struct {
|
|||
ClientID string
|
||||
Connection stan.Conn
|
||||
Logger *zap.Logger
|
||||
Addr string
|
||||
}
|
||||
|
||||
func NewAsyncPublisher(clientID string) *AsyncPublisher {
|
||||
return &AsyncPublisher{ClientID: clientID}
|
||||
func NewAsyncPublisher(clientID string, addr string) *AsyncPublisher {
|
||||
return &AsyncPublisher{ClientID: clientID, Addr: addr}
|
||||
}
|
||||
|
||||
// Open creates and maintains a connection to NATS server
|
||||
func (p *AsyncPublisher) Open() error {
|
||||
sc, err := stan.Connect(ServerName, p.ClientID)
|
||||
sc, err := stan.Connect(ServerName, p.ClientID, stan.NatsURL(p.Addr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3,7 +3,8 @@ package nats
|
|||
import (
|
||||
"errors"
|
||||
|
||||
stand "github.com/nats-io/nats-streaming-server/server"
|
||||
"github.com/nats-io/gnatsd/server"
|
||||
sserver "github.com/nats-io/nats-streaming-server/server"
|
||||
"github.com/nats-io/nats-streaming-server/stores"
|
||||
)
|
||||
|
||||
|
@ -13,15 +14,18 @@ var ErrNoNatsConnection = errors.New("nats connection has not been established.
|
|||
|
||||
// Server wraps a connection to a NATS streaming server
|
||||
type Server struct {
|
||||
Server *stand.StanServer
|
||||
serverOpts *server.Options
|
||||
Server *sserver.StanServer
|
||||
}
|
||||
|
||||
// Open starts a NATS streaming server
|
||||
func (s *Server) Open() error {
|
||||
opts := stand.GetDefaultOptions()
|
||||
// Streaming options
|
||||
opts := sserver.GetDefaultOptions()
|
||||
opts.StoreType = stores.TypeMemory
|
||||
opts.ID = ServerName
|
||||
server, err := stand.RunServerWithOpts(opts, nil)
|
||||
|
||||
server, err := sserver.RunServerWithOpts(opts, s.serverOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -36,7 +40,17 @@ func (s *Server) Close() {
|
|||
s.Server.Shutdown()
|
||||
}
|
||||
|
||||
// NewServer creates and returns a new server struct from the provided config
|
||||
func NewServer() *Server {
|
||||
return &Server{}
|
||||
// NewDefaultServerOptions returns the default NATS server options, allowing the
|
||||
// caller to override specific fields.
|
||||
func NewDefaultServerOptions() server.Options {
|
||||
return sserver.DefaultNatsServerOptions
|
||||
}
|
||||
|
||||
// NewServer creates a new streaming server with the provided server options.
|
||||
func NewServer(opts *server.Options) *Server {
|
||||
if opts == nil {
|
||||
o := NewDefaultServerOptions()
|
||||
opts = &o
|
||||
}
|
||||
return &Server{serverOpts: opts}
|
||||
}
|
||||
|
|
|
@ -12,15 +12,16 @@ type Subscriber interface {
|
|||
type QueueSubscriber struct {
|
||||
ClientID string
|
||||
Connection stan.Conn
|
||||
Addr string
|
||||
}
|
||||
|
||||
func NewQueueSubscriber(clientID string) *QueueSubscriber {
|
||||
return &QueueSubscriber{ClientID: clientID}
|
||||
func NewQueueSubscriber(clientID string, addr string) *QueueSubscriber {
|
||||
return &QueueSubscriber{ClientID: clientID, Addr: addr}
|
||||
}
|
||||
|
||||
// Open creates and maintains a connection to NATS server
|
||||
func (s *QueueSubscriber) Open() error {
|
||||
sc, err := stan.Connect(ServerName, s.ClientID)
|
||||
sc, err := stan.Connect(ServerName, s.ClientID, stan.NatsURL(s.Addr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue