diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 23d15c28bf..4fb5afb088 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -5,7 +5,6 @@ import ( "crypto/tls" "errors" "fmt" - "io" "net" nethttp "net/http" "os" @@ -77,12 +76,8 @@ import ( telegrafservice "github.com/influxdata/influxdb/v2/telegraf/service" "github.com/influxdata/influxdb/v2/telemetry" "github.com/influxdata/influxdb/v2/tenant" - - // needed for tsm1 - _ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" - - // needed for tsi1 - _ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" + _ "github.com/influxdata/influxdb/v2/tsdb/engine/tsm1" // needed for tsm1 + _ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1" // needed for tsi1 authv1 "github.com/influxdata/influxdb/v2/v1/authorization" iqlcoordinator "github.com/influxdata/influxdb/v2/v1/coordinator" "github.com/influxdata/influxdb/v2/v1/services/meta" @@ -109,18 +104,24 @@ const ( JaegerTracing = "jaeger" ) +type labeledCloser struct { + label string + closer func(context.Context) error +} + // Launcher represents the main program execution. type Launcher struct { wg sync.WaitGroup cancel func() doneChan <-chan struct{} + closers []labeledCloser + flushers flushers flagger feature.Flagger - boltClient *bolt.Client - kvStore kv.SchemaStore - kvService *kv.Service - sqlStore *sqlite.SqlStore + kvStore kv.Store + kvService *kv.Service + sqlStore *sqlite.SqlStore // storage engine engine Engine @@ -129,19 +130,13 @@ type Launcher struct { queryController *control.Controller httpPort int - httpServer *nethttp.Server tlsEnabled bool - natsServer *nats.Server - natsPort int + scheduler stoppingScheduler + executor *executor.Executor - scheduler stoppingScheduler - executor *executor.Executor - taskControlService taskbackend.TaskControlService - - jaegerTracerCloser io.Closer - log *zap.Logger - reg *prom.Registry + log *zap.Logger + reg *prom.Registry apibackend *http.APIBackend } @@ -163,11 +158,6 @@ func (m *Launcher) Registry() *prom.Registry { return m.reg } -// NatsURL returns the URL to connection to the NATS server. -func (m *Launcher) NatsURL() string { - return fmt.Sprintf("http://127.0.0.1:%d", m.natsPort) -} - // Engine returns a reference to the storage engine. It should only be called // for end-to-end testing purposes. func (m *Launcher) Engine() Engine { @@ -178,51 +168,18 @@ func (m *Launcher) Engine() Engine { func (m *Launcher) Shutdown(ctx context.Context) error { var errs []string - if err := m.httpServer.Shutdown(ctx); err != nil { - m.log.Error("Failed to close HTTP server", zap.Error(err)) - errs = append(errs, err.Error()) - } - - m.log.Info("Stopping", zap.String("service", "task")) - - m.scheduler.Stop() - - m.log.Info("Stopping", zap.String("service", "nats")) - m.natsServer.Close() - - m.log.Info("Stopping", zap.String("service", "bolt")) - if err := m.boltClient.Close(); err != nil { - m.log.Error("Failed closing bolt", zap.Error(err)) - errs = append(errs, err.Error()) - } - - m.log.Info("Stopping", zap.String("service", "sqlite")) - if err := m.sqlStore.Close(); err != nil { - m.log.Error("Failed closing sqlite", zap.Error(err)) - errs = append(errs, err.Error()) - } - - m.log.Info("Stopping", zap.String("service", "query")) - if err := m.queryController.Shutdown(ctx); err != nil && err != context.Canceled { - m.log.Error("Failed closing query service", zap.Error(err)) - errs = append(errs, err.Error()) - } - - m.log.Info("Stopping", zap.String("service", "storage-engine")) - if err := m.engine.Close(); err != nil { - m.log.Error("Failed to close engine", zap.Error(err)) - errs = append(errs, err.Error()) - } - - m.wg.Wait() - - if m.jaegerTracerCloser != nil { - if err := m.jaegerTracerCloser.Close(); err != nil { - m.log.Error("Failed to closer Jaeger tracer", zap.Error(err)) + // Shut down subsystems in the reverse order of their registration. + for i := len(m.closers); i > 0; i-- { + lc := m.closers[i-1] + m.log.Info("Stopping subsystem", zap.String("subsystem", lc.label)) + if err := lc.closer(ctx); err != nil { + m.log.Error("Failed to stop subsystem", zap.String("subsystem", lc.label), zap.Error(err)) errs = append(errs, err.Error()) } } + m.wg.Wait() + // N.B. We ignore any errors here because Sync is known to fail with EINVAL // when logging to Stdout on certain OS's. // @@ -253,113 +210,17 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { zap.String("commit", info.Commit), zap.String("build_date", info.Date), ) - - switch opts.TracingType { - case LogTracing: - m.log.Info("Tracing via zap logging") - tracer := pzap.NewTracer(m.log, snowflake.NewIDGenerator()) - opentracing.SetGlobalTracer(tracer) - - case JaegerTracing: - m.log.Info("Tracing via Jaeger") - cfg, err := jaegerconfig.FromEnv() - if err != nil { - m.log.Error("Failed to get Jaeger client config from environment variables", zap.Error(err)) - break - } - tracer, closer, err := cfg.NewTracer() - if err != nil { - m.log.Error("Failed to instantiate Jaeger tracer", zap.Error(err)) - break - } - opentracing.SetGlobalTracer(tracer) - m.jaegerTracerCloser = closer - } - - m.boltClient = bolt.NewClient(m.log.With(zap.String("service", "bolt"))) - m.boltClient.Path = opts.BoltPath - - if err := m.boltClient.Open(ctx); err != nil { - m.log.Error("Failed opening bolt", zap.Error(err)) - return err - } - - var flushers flushers - switch opts.StoreType { - case BoltStore: - m.log.Warn("Using --store=bolt is deprecated. Use --store=disk instead.") - fallthrough - case DiskStore: - kvStore := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), opts.BoltPath) - kvStore.WithDB(m.boltClient.DB()) - m.kvStore = kvStore - - // If a sqlite-path is not specified, store sqlite db in the same directory as bolt with the default filename. - if opts.SqLitePath == "" { - opts.SqLitePath = filepath.Dir(opts.BoltPath) + "/" + sqlite.DefaultFilename - } - sqlStore, err := sqlite.NewSqlStore(opts.SqLitePath, m.log.With(zap.String("service", "sqlite"))) - if err != nil { - m.log.Error("Failed opening sqlite store", zap.Error(err)) - return err - } - m.sqlStore = sqlStore - - if opts.Testing { - flushers = append(flushers, kvStore, sqlStore) - } - - case MemoryStore: - kvStore := inmem.NewKVStore() - m.kvStore = kvStore - - sqlStore, err := sqlite.NewSqlStore(sqlite.InmemPath, m.log.With(zap.String("service", "sqlite"))) - if err != nil { - m.log.Error("Failed opening sqlite store", zap.Error(err)) - return err - } - m.sqlStore = sqlStore - - if opts.Testing { - flushers = append(flushers, kvStore, sqlStore) - } - - default: - err := fmt.Errorf("unknown store type %s; expected disk or memory", opts.StoreType) - m.log.Error("Failed opening metadata store", zap.Error(err)) - return err - } - - boltMigrator, err := migration.NewMigrator( - m.log.With(zap.String("service", "bolt migrations")), - m.kvStore, - all.Migrations[:]..., - ) - if err != nil { - m.log.Error("Failed to initialize kv migrator", zap.Error(err)) - return err - } - - // apply migrations to the bolt metadata store - if err := boltMigrator.Up(ctx); err != nil { - m.log.Error("Failed to apply bolt migrations", zap.Error(err)) - return err - } - - sqliteMigrator := sqlite.NewMigrator(m.sqlStore, m.log.With(zap.String("service", "sqlite migrations"))) - - // apply migrations to the sqlite metadata store - if err := sqliteMigrator.Up(ctx, sqliteMigrations.All); err != nil { - m.log.Error("Failed to apply sqlite migrations", zap.Error(err)) - return err - } + m.initTracing(opts) m.reg = prom.NewRegistry(m.log.With(zap.String("service", "prom_registry"))) - m.reg.MustRegister( - prometheus.NewGoCollector(), - infprom.NewInfluxCollector(m.boltClient, info), - ) - m.reg.MustRegister(m.boltClient) + m.reg.MustRegister(prometheus.NewGoCollector()) + + // Open KV and SQL stores. + procID, err := m.openMetaStores(ctx, opts) + if err != nil { + return err + } + m.reg.MustRegister(infprom.NewInfluxCollector(procID, info)) tenantStore := tenant.NewStore(m.kvStore) ts := tenant.NewSystem(tenantStore, m.log.With(zap.String("store", "new")), m.reg, metric.WithSuffix("new")) @@ -430,7 +291,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { opts.StorageConfig, storage.WithMetaClient(metaClient), ) - flushers = append(flushers, engine) + m.flushers = append(m.flushers, engine) m.engine = engine } else { // check for 2.x data / state from a prior 2.x @@ -449,6 +310,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { m.log.Error("Failed to open engine", zap.Error(err)) return err } + m.closers = append(m.closers, labeledCloser{ + label: "engine", + closer: func(context.Context) error { + return m.engine.Close() + }, + }) // The Engine's metrics must be registered after it opens. m.reg.MustRegister(m.engine.PrometheusCollectors()...) @@ -489,6 +356,12 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { m.log.Error("Failed to create query controller", zap.Error(err)) return err } + m.closers = append(m.closers, labeledCloser{ + label: "query", + closer: func(ctx context.Context) error { + return m.queryController.Shutdown(ctx) + }, + }) m.reg.MustRegister(m.queryController.PrometheusCollectors()...) @@ -537,6 +410,13 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { if err != nil { m.log.Fatal("could not start task scheduler", zap.Error(err)) } + m.closers = append(m.closers, labeledCloser{ + label: "task", + closer: func(context.Context) error { + sch.Stop() + return nil + }, + }) m.reg.MustRegister(sm.PrometheusCollectors()...) } @@ -549,7 +429,6 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { executor) taskSvc = middleware.New(combinedTaskService, taskCoord) - m.taskControlService = combinedTaskService if err := taskbackend.TaskNotifyCoordinatorOfExisting( ctx, taskSvc, @@ -627,21 +506,28 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { natsOpts := nats.NewDefaultServerOptions() natsOpts.Port = opts.NatsPort natsOpts.MaxPayload = opts.NatsMaxPayloadBytes - m.natsServer = nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats"))) - if err := m.natsServer.Open(); err != nil { + natsServer := nats.NewServer(&natsOpts, m.log.With(zap.String("service", "nats"))) + if err := natsServer.Open(); err != nil { m.log.Error("Failed to start nats streaming server", zap.Error(err)) return err } + m.closers = append(m.closers, labeledCloser{ + label: "nats", + closer: func(context.Context) error { + natsServer.Close() + return nil + }, + }) // If a random port was used, the opts will be updated to include the selected value. - m.natsPort = natsOpts.Port - publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", m.natsPort), m.NatsURL()) + natsURL := fmt.Sprintf("http://127.0.0.1:%d", natsOpts.Port) + publisher := nats.NewAsyncPublisher(m.log, fmt.Sprintf("nats-publisher-%d", natsOpts.Port), natsURL) if err := publisher.Open(); err != nil { m.log.Error("Failed to connect to streaming server", zap.Error(err)) return err } // TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed. - subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", m.natsPort), m.NatsURL()) + subscriber := nats.NewQueueSubscriber(fmt.Sprintf("nats-subscriber-%d", natsOpts.Port), natsURL) if err := subscriber.Open(); err != nil { m.log.Error("Failed to connect to streaming server", zap.Error(err)) return err @@ -1018,7 +904,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { } // If we are in testing mode we allow all data to be flushed and removed. if opts.Testing { - httpHandler = http.DebugFlush(ctx, httpHandler, flushers) + httpHandler = http.DebugFlush(ctx, httpHandler, m.flushers) } if !opts.ReportingDisabled { @@ -1031,13 +917,144 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { return nil } +// initTracing sets up the global tracer for the influxd process. +// Any errors encountered during setup are logged, but don't crash the process. +func (m *Launcher) initTracing(opts *InfluxdOpts) { + switch opts.TracingType { + case LogTracing: + m.log.Info("Tracing via zap logging") + opentracing.SetGlobalTracer(pzap.NewTracer(m.log, snowflake.NewIDGenerator())) + + case JaegerTracing: + m.log.Info("Tracing via Jaeger") + cfg, err := jaegerconfig.FromEnv() + if err != nil { + m.log.Error("Failed to get Jaeger client config from environment variables", zap.Error(err)) + return + } + tracer, closer, err := cfg.NewTracer() + if err != nil { + m.log.Error("Failed to instantiate Jaeger tracer", zap.Error(err)) + return + } + m.closers = append(m.closers, labeledCloser{ + label: "Jaeger tracer", + closer: func(context.Context) error { + return closer.Close() + }, + }) + opentracing.SetGlobalTracer(tracer) + } +} + +// openMetaStores opens the embedded DBs used to store metadata about influxd resources, migrating them to +// the latest schema expected by the server. +// On success, a unique ID is returned to be used as an identifier for the influxd instance in telemetry. +func (m *Launcher) openMetaStores(ctx context.Context, opts *InfluxdOpts) (string, error) { + type flushableKVStore interface { + kv.SchemaStore + http.Flusher + } + var kvStore flushableKVStore + var sqlStore *sqlite.SqlStore + + var procID string + var err error + switch opts.StoreType { + case BoltStore: + m.log.Warn("Using --store=bolt is deprecated. Use --store=disk instead.") + fallthrough + case DiskStore: + boltClient := bolt.NewClient(m.log.With(zap.String("service", "bolt"))) + boltClient.Path = opts.BoltPath + + if err := boltClient.Open(ctx); err != nil { + m.log.Error("Failed opening bolt", zap.Error(err)) + return "", err + } + m.closers = append(m.closers, labeledCloser{ + label: "bolt", + closer: func(context.Context) error { + return boltClient.Close() + }, + }) + m.reg.MustRegister(boltClient) + procID = boltClient.ID().String() + + boltKV := bolt.NewKVStore(m.log.With(zap.String("service", "kvstore-bolt")), opts.BoltPath) + boltKV.WithDB(boltClient.DB()) + kvStore = boltKV + + // If a sqlite-path is not specified, store sqlite db in the same directory as bolt with the default filename. + if opts.SqLitePath == "" { + opts.SqLitePath = filepath.Dir(opts.BoltPath) + "/" + sqlite.DefaultFilename + } + sqlStore, err = sqlite.NewSqlStore(opts.SqLitePath, m.log.With(zap.String("service", "sqlite"))) + if err != nil { + m.log.Error("Failed opening sqlite store", zap.Error(err)) + return "", err + } + + case MemoryStore: + kvStore = inmem.NewKVStore() + sqlStore, err = sqlite.NewSqlStore(sqlite.InmemPath, m.log.With(zap.String("service", "sqlite"))) + if err != nil { + m.log.Error("Failed opening sqlite store", zap.Error(err)) + return "", err + } + + default: + err := fmt.Errorf("unknown store type %s; expected disk or memory", opts.StoreType) + m.log.Error("Failed opening metadata store", zap.Error(err)) + return "", err + } + + m.closers = append(m.closers, labeledCloser{ + label: "sqlite", + closer: func(context.Context) error { + return sqlStore.Close() + }, + }) + if opts.Testing { + m.flushers = append(m.flushers, kvStore, sqlStore) + } + + boltMigrator, err := migration.NewMigrator( + m.log.With(zap.String("service", "bolt migrations")), + kvStore, + all.Migrations[:]..., + ) + if err != nil { + m.log.Error("Failed to initialize kv migrator", zap.Error(err)) + return "", err + } + + // apply migrations to the bolt metadata store + if err := boltMigrator.Up(ctx); err != nil { + m.log.Error("Failed to apply bolt migrations", zap.Error(err)) + return "", err + } + + sqliteMigrator := sqlite.NewMigrator(sqlStore, m.log.With(zap.String("service", "sqlite migrations"))) + + // apply migrations to the sqlite metadata store + if err := sqliteMigrator.Up(ctx, sqliteMigrations.All); err != nil { + m.log.Error("Failed to apply sqlite migrations", zap.Error(err)) + return "", err + } + + m.kvStore = kvStore + m.sqlStore = sqlStore + return procID, nil +} + // runHTTP configures and launches a listener for incoming HTTP(S) requests. // The listener is run in a separate goroutine. If it fails to start up, it // will cancel the launcher. func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogger *zap.Logger) error { log := m.log.With(zap.String("service", "tcp-listener")) - m.httpServer = &nethttp.Server{ + httpServer := &nethttp.Server{ Addr: opts.HttpBindAddress, Handler: handler, ReadHeaderTimeout: opts.HttpReadHeaderTimeout, @@ -1046,6 +1063,10 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge IdleTimeout: opts.HttpIdleTimeout, ErrorLog: zap.NewStdLog(httpLogger), } + m.closers = append(m.closers, labeledCloser{ + label: "HTTP server", + closer: httpServer.Shutdown, + }) ln, err := net.Listen("tcp", opts.HttpBindAddress) if err != nil { @@ -1067,7 +1088,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge defer m.wg.Done() log.Info("Listening", zap.String("transport", "http"), zap.String("addr", opts.HttpBindAddress), zap.Int("port", m.httpPort)) - if err := m.httpServer.Serve(ln); err != nethttp.ErrServerClosed { + if err := httpServer.Serve(ln); err != nethttp.ErrServerClosed { log.Error("Failed to serve HTTP", zap.Error(err)) m.cancel() } @@ -1117,7 +1138,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge } } - m.httpServer.TLSConfig = &tls.Config{ + httpServer.TLSConfig = &tls.Config{ CurvePreferences: []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256}, PreferServerCipherSuites: !useStrictCiphers, MinVersion: tlsMinVersion, @@ -1128,7 +1149,7 @@ func (m *Launcher) runHTTP(opts *InfluxdOpts, handler nethttp.Handler, httpLogge defer m.wg.Done() log.Info("Listening", zap.String("transport", "https"), zap.String("addr", opts.HttpBindAddress), zap.Int("port", m.httpPort)) - if err := m.httpServer.ServeTLS(ln, opts.HttpTLSCert, opts.HttpTLSKey); err != nethttp.ErrServerClosed { + if err := httpServer.ServeTLS(ln, opts.HttpTLSCert, opts.HttpTLSKey); err != nethttp.ErrServerClosed { log.Error("Failed to serve HTTPS", zap.Error(err)) m.cancel() } @@ -1216,11 +1237,6 @@ func (m *Launcher) UserService() platform.UserService { return m.apibackend.UserService } -// UserResourceMappingService returns the internal user resource mapping service. -func (m *Launcher) UserResourceMappingService() platform.UserResourceMappingService { - return m.apibackend.UserResourceMappingService -} - // AuthorizationService returns the internal authorization service. func (m *Launcher) AuthorizationService() platform.AuthorizationService { return m.apibackend.AuthorizationService @@ -1235,26 +1251,11 @@ func (m *Launcher) SecretService() platform.SecretService { return m.apibackend.SecretService } -// TaskService returns the internal task service. -func (m *Launcher) TaskService() taskmodel.TaskService { - return m.apibackend.TaskService -} - -// TaskControlService returns the internal store service. -func (m *Launcher) TaskControlService() taskbackend.TaskControlService { - return m.taskControlService -} - // CheckService returns the internal check service. func (m *Launcher) CheckService() platform.CheckService { return m.apibackend.CheckService } -// KeyValueService returns the internal key-value service. -func (m *Launcher) KeyValueService() *kv.Service { - return m.kvService -} - func (m *Launcher) DBRPMappingServiceV2() platform.DBRPMappingServiceV2 { return m.apibackend.DBRPService } diff --git a/prometheus/influx.go b/prometheus/influx.go index 79bfc8713e..ff14dec1ae 100644 --- a/prometheus/influx.go +++ b/prometheus/influx.go @@ -5,8 +5,6 @@ import ( "strconv" "time" - platform2 "github.com/influxdata/influxdb/v2/kit/platform" - platform "github.com/influxdata/influxdb/v2" "github.com/prometheus/client_golang/prometheus" ) @@ -18,9 +16,7 @@ type influxCollector struct { } // NewInfluxCollector returns a collector which exports influxdb process metrics. -func NewInfluxCollector(procID platform2.IDGenerator, build platform.BuildInfo) prometheus.Collector { - id := procID.ID().String() - +func NewInfluxCollector(procID string, build platform.BuildInfo) prometheus.Collector { return &influxCollector{ influxInfoDesc: prometheus.NewDesc( "influxdb_info", @@ -38,7 +34,7 @@ func NewInfluxCollector(procID platform2.IDGenerator, build platform.BuildInfo) "influxdb_uptime_seconds", "influxdb process uptime in seconds", nil, prometheus.Labels{ - "id": id, + "id": procID, }, ), start: time.Now(),