fix: add logging to NATS streaming server (#21375)
parent
c8b76dce91
commit
4343d5ce07
|
@ -8,8 +8,9 @@
|
|||
### Bug Fixes
|
||||
|
||||
1. [21345](https://github.com/influxdata/influxdb/pull/21345): Deprecate the unsupported `PostSetupUser` API.
|
||||
1. [21356](https://github.com/influxdata/influxdb/pull/21356): disable MergeFiltersRule until it is more stable
|
||||
1. [21356](https://github.com/influxdata/influxdb/pull/21356): Disable MergeFiltersRule until it is more stable.
|
||||
1. [21369](https://github.com/influxdata/influxdb/pull/21369): Add limits to the `/api/v2/delete` endpoint for start and stop times with error messages.
|
||||
1. [21375](https://github.com/influxdata/influxdb/pull/21375): Add logging to NATS streaming server to help debug startup failures.
|
||||
|
||||
## v2.0.6 [2021-04-29]
|
||||
|
||||
|
|
|
@ -584,7 +584,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
natsOpts := nats.NewDefaultServerOptions()
|
||||
natsOpts.Port = opts.NatsPort
|
||||
natsOpts.MaxPayload = opts.NatsMaxPayloadBytes
|
||||
m.natsServer = nats.NewServer(&natsOpts)
|
||||
m.natsServer = nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats")))
|
||||
if err := m.natsServer.Open(); err != nil {
|
||||
m.log.Error("Failed to start nats streaming server", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
package nats
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
natsserver "github.com/nats-io/gnatsd/server"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var _ natsserver.Logger = (*zapLoggerAdapter)(nil)
|
||||
|
||||
// zapLogger
|
||||
type zapLoggerAdapter struct {
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Noticef(format string, v ...interface{}) {
|
||||
z.log.Debug(fmt.Sprintf(format, v...), zap.String("nats_level", "notice"))
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Debugf(format string, v ...interface{}) {
|
||||
z.log.Debug(fmt.Sprintf(format, v...), zap.String("nats_level", "debug"))
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Tracef(format string, v ...interface{}) {
|
||||
z.log.Debug(fmt.Sprintf(format, v...), zap.String("nats_level", "trace"))
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Fatalf(format string, v ...interface{}) {
|
||||
z.log.Fatal(fmt.Sprintf(format, v...), zap.String("nats_level", "fatal"))
|
||||
}
|
||||
|
||||
func (z *zapLoggerAdapter) Errorf(format string, v ...interface{}) {
|
||||
z.log.Error(fmt.Sprintf(format, v...), zap.String("nats_level", "error"))
|
||||
}
|
|
@ -6,6 +6,7 @@ import (
|
|||
"github.com/nats-io/gnatsd/server"
|
||||
sserver "github.com/nats-io/nats-streaming-server/server"
|
||||
"github.com/nats-io/nats-streaming-server/stores"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const ServerName = "platform"
|
||||
|
@ -23,6 +24,7 @@ var ErrNoNatsConnection = errors.New("nats connection has not been established.
|
|||
type Server struct {
|
||||
serverOpts *server.Options
|
||||
Server *sserver.StanServer
|
||||
logger server.Logger
|
||||
}
|
||||
|
||||
// Open starts a NATS streaming server
|
||||
|
@ -31,6 +33,7 @@ func (s *Server) Open() error {
|
|||
opts := sserver.GetDefaultOptions()
|
||||
opts.StoreType = stores.TypeMemory
|
||||
opts.ID = ServerName
|
||||
opts.CustomLogger = s.logger
|
||||
|
||||
server, err := sserver.RunServerWithOpts(opts, s.serverOpts)
|
||||
if err != nil {
|
||||
|
@ -54,10 +57,10 @@ func NewDefaultServerOptions() server.Options {
|
|||
}
|
||||
|
||||
// NewServer creates a new streaming server with the provided server options.
|
||||
func NewServer(opts *server.Options) *Server {
|
||||
func NewServer(opts *server.Options, log *zap.Logger) *Server {
|
||||
if opts == nil {
|
||||
o := NewDefaultServerOptions()
|
||||
opts = &o
|
||||
}
|
||||
return &Server{serverOpts: opts}
|
||||
return &Server{serverOpts: opts, logger: &zapLoggerAdapter{log}}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue