refactor: clean up launcher code (#22330)

* Modify launcher shutdown to reduce field count
* Move tracing setup into its own method
* Move metastore setup into its own method
* Delete unused getter methods
* Fix imports
pull/22335/head
Daniel Moran 2021-08-30 14:46:25 -04:00 committed by GitHub
parent 4dd2d7cc7f
commit 6bb95ae6fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 207 additions and 210 deletions

View File

@ -5,7 +5,6 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
nethttp "net/http"
"os"
@ -77,12 +76,8 @@ import (
telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service"
"github.com/influxdata/influxdb/v2/telemetry"
"github.com/influxdata/influxdb/v2/tenant"
// needed for tsm1
_ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
// needed for tsi1
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
_ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" // needed for tsm1
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" // needed for tsi1
authv1 "github.com/influxdata/influxdb/v2/v1/authorization"
iqlcoordinator "github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/v1/services/meta"
@ -109,16 +104,22 @@ const (
JaegerTracing = "jaeger"
)
// labeledCloser pairs a shutdown callback with a human-readable label so
// Shutdown can log which subsystem is being stopped (and which one failed).
// Closers are appended to Launcher.closers as subsystems start, and run in
// reverse registration order during Shutdown.
type labeledCloser struct {
	label  string
	closer func(context.Context) error
}
// Launcher represents the main program execution.
type Launcher struct {
wg sync.WaitGroup
cancel func()
doneChan <-chan struct{}
closers []labeledCloser
flushers flushers
flagger feature.Flagger
boltClient *bolt.Client
kvStore kv.SchemaStore
kvStore kv.Store
kvService *kv.Service
sqlStore *sqlite.SqlStore
@ -129,17 +130,11 @@ type Launcher struct {
queryController *control.Controller
httpPort int
httpServer *nethttp.Server
tlsEnabled bool
natsServer *nats.Server
natsPort int
scheduler stoppingScheduler
executor *executor.Executor
taskControlService taskbackend.TaskControlService
jaegerTracerCloser io.Closer
log *zap.Logger
reg *prom.Registry
@ -163,11 +158,6 @@ func (m *Launcher) Registry() *prom.Registry {
return m.reg
}
// NatsURL returns the URL to connection to the NATS server.
func (m *Launcher) NatsURL() string {
return fmt.Sprintf("http://127.0.0.1:%d", m.natsPort)
}
// Engine returns a reference to the storage engine. It should only be called
// for end-to-end testing purposes.
func (m *Launcher) Engine() Engine {
@ -178,51 +168,18 @@ func (m *Launcher) Engine() Engine {
func (m *Launcher) Shutdown(ctx context.Context) error {
var errs []string
if err := m.httpServer.Shutdown(ctx); err != nil {
m.log.Error("Failed to close HTTP server", zap.Error(err))
// Shut down subsystems in the reverse order of their registration.
for i := len(m.closers); i > 0; i-- {
lc := m.closers[i-1]
m.log.Info("Stopping subsystem", zap.String("subsystem", lc.label))
if err := lc.closer(ctx); err != nil {
m.log.Error("Failed to stop subsystem", zap.String("subsystem", lc.label), zap.Error(err))
errs = append(errs, err.Error())
}
m.log.Info("Stopping", zap.String("service", "task"))
m.scheduler.Stop()
m.log.Info("Stopping", zap.String("service", "nats"))
m.natsServer.Close()
m.log.Info("Stopping", zap.String("service", "bolt"))
if err := m.boltClient.Close(); err != nil {
m.log.Error("Failed closing bolt", zap.Error(err))
errs = append(errs, err.Error())
}
m.log.Info("Stopping", zap.String("service", "sqlite"))
if err := m.sqlStore.Close(); err != nil {
m.log.Error("Failed closing sqlite", zap.Error(err))
errs = append(errs, err.Error())
}
m.log.Info("Stopping", zap.String("service", "query"))
if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled {
m.log.Error("Failed closing query service", zap.Error(err))
errs = append(errs, err.Error())
}
m.log.Info("Stopping", zap.String("service", "storage-engine"))
if err := m.engine.Close(); err != nil {
m.log.Error("Failed to close engine", zap.Error(err))
errs = append(errs, err.Error())
}
m.wg.Wait()
if m.jaegerTracerCloser != nil {
if err := m.jaegerTracerCloser.Close(); err != nil {
m.log.Error("Failed to closer Jaeger tracer", zap.Error(err))
errs = append(errs, err.Error())
}
}
// N.B. We ignore any errors here because Sync is known to fail with EINVAL
// when logging to Stdout on certain OS's.
//
@ -253,113 +210,17 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
zap.String("commit", info.Commit),
zap.String("build_date", info.Date),
)
switch opts.TracingType {
case LogTracing:
m.log.Info("Tracing via zap logging")
tracer := pzap.NewTracer(m.log, snowflake.NewIDGenerator())
opentracing.SetGlobalTracer(tracer)
case JaegerTracing:
m.log.Info("Tracing via Jaeger")
cfg, err := jaegerconfig.FromEnv()
if err != nil {
m.log.Error("Failed to get Jaeger client config from environment variables", zap.Error(err))
break
}
tracer, closer, err := cfg.NewTracer()
if err != nil {
m.log.Error("Failed to instantiate Jaeger tracer", zap.Error(err))
break
}
opentracing.SetGlobalTracer(tracer)
m.jaegerTracerCloser = closer
}
m.boltClient = bolt.NewClient(m.log.With(zap.String("service", "bolt")))
m.boltClient.Path = opts.BoltPath
if err := m.boltClient.Open(ctx); err != nil {
m.log.Error("Failed opening bolt", zap.Error(err))
return err
}
var flushers flushers
switch opts.StoreType {
case BoltStore:
m.log.Warn("Using --store=bolt is deprecated. Use --store=disk instead.")
fallthrough
case DiskStore:
kvStore := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), opts.BoltPath)
kvStore.WithDB(m.boltClient.DB())
m.kvStore = kvStore
// If a sqlite-path is not specified, store sqlite db in the same directory as bolt with the default filename.
if opts.SqLitePath == "" {
opts.SqLitePath = filepath.Dir(opts.BoltPath) + "/" + sqlite.DefaultFilename
}
sqlStore, err := sqlite.NewSqlStore(opts.SqLitePath, m.log.With(zap.String("service", "sqlite")))
if err != nil {
m.log.Error("Failed opening sqlite store", zap.Error(err))
return err
}
m.sqlStore = sqlStore
if opts.Testing {
flushers = append(flushers, kvStore, sqlStore)
}
case MemoryStore:
kvStore := inmem.NewKVStore()
m.kvStore = kvStore
sqlStore, err := sqlite.NewSqlStore(sqlite.InmemPath, m.log.With(zap.String("service", "sqlite")))
if err != nil {
m.log.Error("Failed opening sqlite store", zap.Error(err))
return err
}
m.sqlStore = sqlStore
if opts.Testing {
flushers = append(flushers, kvStore, sqlStore)
}
default:
err := fmt.Errorf("unknown store type %s; expected disk or memory", opts.StoreType)
m.log.Error("Failed opening metadata store", zap.Error(err))
return err
}
boltMigrator, err := migration.NewMigrator(
m.log.With(zap.String("service", "bolt migrations")),
m.kvStore,
all.Migrations[:]...,
)
if err != nil {
m.log.Error("Failed to initialize kv migrator", zap.Error(err))
return err
}
// apply migrations to the bolt metadata store
if err := boltMigrator.Up(ctx); err != nil {
m.log.Error("Failed to apply bolt migrations", zap.Error(err))
return err
}
sqliteMigrator := sqlite.NewMigrator(m.sqlStore, m.log.With(zap.String("service", "sqlite migrations")))
// apply migrations to the sqlite metadata store
if err := sqliteMigrator.Up(ctx, sqliteMigrations.All); err != nil {
m.log.Error("Failed to apply sqlite migrations", zap.Error(err))
return err
}
m.initTracing(opts)
m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry")))
m.reg.MustRegister(
prometheus.NewGoCollector(),
infprom.NewInfluxCollector(m.boltClient, info),
)
m.reg.MustRegister(m.boltClient)
m.reg.MustRegister(prometheus.NewGoCollector())
// Open KV and SQL stores.
procID, err := m.openMetaStores(ctx, opts)
if err != nil {
return err
}
m.reg.MustRegister(infprom.NewInfluxCollector(procID, info))
tenantStore := tenant.NewStore(m.kvStore)
ts := tenant.NewSystem(tenantStore, m.log.With(zap.String("store", "new")), m.reg, metric.WithSuffix("new"))
@ -430,7 +291,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
opts.StorageConfig,
storage.WithMetaClient(metaClient),
)
flushers = append(flushers, engine)
m.flushers = append(m.flushers, engine)
m.engine = engine
} else {
// check for 2.x data / state from a prior 2.x
@ -449,6 +310,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
m.log.Error("Failed to open engine", zap.Error(err))
return err
}
m.closers = append(m.closers, labeledCloser{
label: "engine",
closer: func(context.Context) error {
return m.engine.Close()
},
})
// The Engine's metrics must be registered after it opens.
m.reg.MustRegister(m.engine.PrometheusCollectors()...)
@ -489,6 +356,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
m.log.Error("Failed to create query controller", zap.Error(err))
return err
}
m.closers = append(m.closers, labeledCloser{
label: "query",
closer: func(ctx context.Context) error {
return m.queryController.Shutdown(ctx)
},
})
m.reg.MustRegister(m.queryController.PrometheusCollectors()...)
@ -537,6 +410,13 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
if err != nil {
m.log.Fatal("could not start task scheduler", zap.Error(err))
}
m.closers = append(m.closers, labeledCloser{
label: "task",
closer: func(context.Context) error {
sch.Stop()
return nil
},
})
m.reg.MustRegister(sm.PrometheusCollectors()...)
}
@ -549,7 +429,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
executor)
taskSvc = middleware.New(combinedTaskService, taskCoord)
m.taskControlService = combinedTaskService
if err := taskbackend.TaskNotifyCoordinatorOfExisting(
ctx,
taskSvc,
@ -627,21 +506,28 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
natsOpts := nats.NewDefaultServerOptions()
natsOpts.Port = opts.NatsPort
natsOpts.MaxPayload = opts.NatsMaxPayloadBytes
m.natsServer = nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats")))
if err := m.natsServer.Open(); err != nil {
natsServer := nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats")))
if err := natsServer.Open(); err != nil {
m.log.Error("Failed to start nats streaming server", zap.Error(err))
return err
}
m.closers = append(m.closers, labeledCloser{
label: "nats",
closer: func(context.Context) error {
natsServer.Close()
return nil
},
})
// If a random port was used, the opts will be updated to include the selected value.
m.natsPort = natsOpts.Port
publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", m.natsPort), m.NatsURL())
natsURL := fmt.Sprintf("http://127.0.0.1:%d", natsOpts.Port)
publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", natsOpts.Port), natsURL)
if err := publisher.Open(); err != nil {
m.log.Error("Failed to connect to streaming server", zap.Error(err))
return err
}
// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", m.natsPort), m.NatsURL())
subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", natsOpts.Port), natsURL)
if err := subscriber.Open(); err != nil {
m.log.Error("Failed to connect to streaming server", zap.Error(err))
return err
@ -1018,7 +904,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
}
// If we are in testing mode we allow all data to be flushed and removed.
if opts.Testing {
httpHandler = http.DebugFlush(ctx, httpHandler, flushers)
httpHandler = http.DebugFlush(ctx, httpHandler, m.flushers)
}
if !opts.ReportingDisabled {
@ -1031,13 +917,144 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
return nil
}
// initTracing installs the process-wide opentracing tracer selected by
// opts.TracingType. Setup errors are logged and the tracer is simply left
// uninstalled; they never crash the process.
func (m *Launcher) initTracing(opts *InfluxdOpts) {
	switch opts.TracingType {
	case LogTracing:
		m.log.Info("Tracing via zap logging")
		logTracer := pzap.NewTracer(m.log, snowflake.NewIDGenerator())
		opentracing.SetGlobalTracer(logTracer)

	case JaegerTracing:
		m.log.Info("Tracing via Jaeger")
		jaegerCfg, cfgErr := jaegerconfig.FromEnv()
		if cfgErr != nil {
			m.log.Error("Failed to get Jaeger client config from environment variables", zap.Error(cfgErr))
			return
		}
		jaegerTracer, jaegerCloser, tracerErr := jaegerCfg.NewTracer()
		if tracerErr != nil {
			m.log.Error("Failed to instantiate Jaeger tracer", zap.Error(tracerErr))
			return
		}
		// Register the tracer's closer so it is flushed/closed during Shutdown.
		lc := labeledCloser{
			label: "Jaeger tracer",
			closer: func(context.Context) error {
				return jaegerCloser.Close()
			},
		}
		m.closers = append(m.closers, lc)
		opentracing.SetGlobalTracer(jaegerTracer)
	}
}
// openMetaStores opens the embedded DBs used to store metadata about influxd resources, migrating them to
// the latest schema expected by the server.
// On success, a unique ID is returned to be used as an identifier for the influxd instance in telemetry.
func (m *Launcher) openMetaStores(ctx context.Context, opts *InfluxdOpts) (string, error) {
	// The KV store must also be flushable when running in testing mode, so it
	// is held as this combined interface type until assigned to m.kvStore below.
	type flushableKVStore interface {
		kv.SchemaStore
		http.Flusher
	}
	var kvStore flushableKVStore
	var sqlStore *sqlite.SqlStore

	// procID is only populated on the disk-backed path (from the bolt client's
	// ID); the in-memory path returns it empty.
	var procID string
	var err error
	switch opts.StoreType {
	case BoltStore:
		m.log.Warn("Using --store=bolt is deprecated. Use --store=disk instead.")
		fallthrough
	case DiskStore:
		boltClient := bolt.NewClient(m.log.With(zap.String("service", "bolt")))
		boltClient.Path = opts.BoltPath

		if err := boltClient.Open(ctx); err != nil {
			m.log.Error("Failed opening bolt", zap.Error(err))
			return "", err
		}
		// Closers run in reverse registration order during Shutdown, so bolt
		// (registered here) closes after the sqlite closer registered below.
		m.closers = append(m.closers, labeledCloser{
			label: "bolt",
			closer: func(context.Context) error {
				return boltClient.Close()
			},
		})
		m.reg.MustRegister(boltClient)
		procID = boltClient.ID().String()

		boltKV := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), opts.BoltPath)
		boltKV.WithDB(boltClient.DB())
		kvStore = boltKV

		// If a sqlite-path is not specified, store sqlite db in the same directory as bolt with the default filename.
		if opts.SqLitePath == "" {
			opts.SqLitePath = filepath.Dir(opts.BoltPath) + "/" + sqlite.DefaultFilename
		}
		sqlStore, err = sqlite.NewSqlStore(opts.SqLitePath, m.log.With(zap.String("service", "sqlite")))
		if err != nil {
			m.log.Error("Failed opening sqlite store", zap.Error(err))
			return "", err
		}

	case MemoryStore:
		kvStore = inmem.NewKVStore()
		sqlStore, err = sqlite.NewSqlStore(sqlite.InmemPath, m.log.With(zap.String("service", "sqlite")))
		if err != nil {
			m.log.Error("Failed opening sqlite store", zap.Error(err))
			return "", err
		}

	default:
		err := fmt.Errorf("unknown store type %s; expected disk or memory", opts.StoreType)
		m.log.Error("Failed opening metadata store", zap.Error(err))
		return "", err
	}

	// Both store types share a sqlite store; register its closer once here.
	m.closers = append(m.closers, labeledCloser{
		label: "sqlite",
		closer: func(context.Context) error {
			return sqlStore.Close()
		},
	})
	if opts.Testing {
		// In testing mode, both metadata stores are exposed for flushing via
		// the debug HTTP handler (http.DebugFlush).
		m.flushers = append(m.flushers, kvStore, sqlStore)
	}

	boltMigrator, err := migration.NewMigrator(
		m.log.With(zap.String("service", "bolt migrations")),
		kvStore,
		all.Migrations[:]...,
	)
	if err != nil {
		m.log.Error("Failed to initialize kv migrator", zap.Error(err))
		return "", err
	}
	// apply migrations to the bolt metadata store
	if err := boltMigrator.Up(ctx); err != nil {
		m.log.Error("Failed to apply bolt migrations", zap.Error(err))
		return "", err
	}

	sqliteMigrator := sqlite.NewMigrator(sqlStore, m.log.With(zap.String("service", "sqlite migrations")))
	// apply migrations to the sqlite metadata store
	if err := sqliteMigrator.Up(ctx, sqliteMigrations.All); err != nil {
		m.log.Error("Failed to apply sqlite migrations", zap.Error(err))
		return "", err
	}

	// Publish the fully-migrated stores on the Launcher only after every step
	// above has succeeded.
	m.kvStore = kvStore
	m.sqlStore = sqlStore
	return procID, nil
}
// runHTTP configures and launches a listener for incoming HTTP(S) requests.
// The listener is run in a separate goroutine. If it fails to start up, it
// will cancel the launcher.
func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogger *zap.Logger) error {
log := m.log.With(zap.String("service", "tcp-listener"))
m.httpServer = &nethttp.Server{
httpServer := &nethttp.Server{
Addr: opts.HttpBindAddress,
Handler: handler,
ReadHeaderTimeout: opts.HttpReadHeaderTimeout,
@ -1046,6 +1063,10 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge
IdleTimeout: opts.HttpIdleTimeout,
ErrorLog: zap.NewStdLog(httpLogger),
}
m.closers = append(m.closers, labeledCloser{
label: "HTTP server",
closer: httpServer.Shutdown,
})
ln, err := net.Listen("tcp", opts.HttpBindAddress)
if err != nil {
@ -1067,7 +1088,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge
defer m.wg.Done()
log.Info("Listening", zap.String("transport", "http"), zap.String("addr", opts.HttpBindAddress), zap.Int("port", m.httpPort))
if err := m.httpServer.Serve(ln); err != nethttp.ErrServerClosed {
if err := httpServer.Serve(ln); err != nethttp.ErrServerClosed {
log.Error("Failed to serve HTTP", zap.Error(err))
m.cancel()
}
@ -1117,7 +1138,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge
}
}
m.httpServer.TLSConfig = &tls.Config{
httpServer.TLSConfig = &tls.Config{
CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
PreferServerCipherSuites: !useStrictCiphers,
MinVersion: tlsMinVersion,
@ -1128,7 +1149,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge
defer m.wg.Done()
log.Info("Listening", zap.String("transport", "https"), zap.String("addr", opts.HttpBindAddress), zap.Int("port", m.httpPort))
if err := m.httpServer.ServeTLS(ln, opts.HttpTLSCert, opts.HttpTLSKey); err != nethttp.ErrServerClosed {
if err := httpServer.ServeTLS(ln, opts.HttpTLSCert, opts.HttpTLSKey); err != nethttp.ErrServerClosed {
log.Error("Failed to serve HTTPS", zap.Error(err))
m.cancel()
}
@ -1216,11 +1237,6 @@ func (m *Launcher) UserService() platform.UserService {
return m.apibackend.UserService
}
// UserResourceMappingService returns the internal user resource mapping service.
func (m *Launcher) UserResourceMappingService() platform.UserResourceMappingService {
return m.apibackend.UserResourceMappingService
}
// AuthorizationService returns the internal authorization service.
func (m *Launcher) AuthorizationService() platform.AuthorizationService {
return m.apibackend.AuthorizationService
@ -1235,26 +1251,11 @@ func (m *Launcher) SecretService() platform.SecretService {
return m.apibackend.SecretService
}
// TaskService returns the internal task service.
func (m *Launcher) TaskService() taskmodel.TaskService {
return m.apibackend.TaskService
}
// TaskControlService returns the internal store service.
func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}
// CheckService returns the internal check service.
func (m *Launcher) CheckService() platform.CheckService {
return m.apibackend.CheckService
}
// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
}
func (m *Launcher) DBRPMappingServiceV2() platform.DBRPMappingServiceV2 {
return m.apibackend.DBRPService
}

View File

@ -5,8 +5,6 @@ import (
"strconv"
"time"
platform2 "github.com/influxdata/influxdb/v2/kit/platform"
platform "github.com/influxdata/influxdb/v2"
"github.com/prometheus/client_golang/prometheus"
)
@ -18,9 +16,7 @@ type influxCollector struct {
}
// NewInfluxCollector returns a collector which exports influxdb process metrics.
func NewInfluxCollector(procID platform2.IDGenerator, build platform.BuildInfo) prometheus.Collector {
id := procID.ID().String()
func NewInfluxCollector(procID string, build platform.BuildInfo) prometheus.Collector {
return &influxCollector{
influxInfoDesc: prometheus.NewDesc(
"influxdb_info",
@ -38,7 +34,7 @@ func NewInfluxCollector(procID platform2.IDGenerator, build platform.BuildInfo)
"influxdb_uptime_seconds",
"influxdb process uptime in seconds",
nil, prometheus.Labels{
"id": id,
"id": procID,
},
),
start: time.Now(),