From eb5f4fbe8ae4db4de20e2c6d7b14e8f81b33fc93 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Tue, 30 Oct 2018 01:57:48 -0500 Subject: [PATCH] refactor(cmd/influxd): use kit and close all services --- cmd/influxd/main.go | 285 ++++++++++++++++++++------------------------ gather/scheduler.go | 8 +- nats/server.go | 5 + 3 files changed, 140 insertions(+), 158 deletions(-) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index af9591f70c..24eeff1f3c 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -6,10 +6,9 @@ import ( nethttp "net/http" _ "net/http/pprof" "os" - "os/signal" "os/user" "path/filepath" - "syscall" + "sync" "time" "github.com/influxdata/platform" @@ -17,7 +16,9 @@ import ( "github.com/influxdata/platform/chronograf/server" "github.com/influxdata/platform/gather" "github.com/influxdata/platform/http" + "github.com/influxdata/platform/kit/cli" "github.com/influxdata/platform/kit/prom" + "github.com/influxdata/platform/kit/signals" influxlogger "github.com/influxdata/platform/logger" "github.com/influxdata/platform/nats" "github.com/influxdata/platform/query" @@ -33,13 +34,58 @@ import ( _ "github.com/influxdata/platform/tsdb/tsi1" _ "github.com/influxdata/platform/tsdb/tsm1" "github.com/prometheus/client_golang/prometheus" - "github.com/spf13/cobra" - "github.com/spf13/viper" "go.uber.org/zap" ) func main() { - Execute() + dir, err := influxDir() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to determine influx directory: %v", err) + os.Exit(1) + } + + prog := &cli.Program{ + Name: "influxd", + Run: run, + Opts: []cli.Opt{ + { + DestP: &httpBindAddress, + Flag: "http-bind-address", + Default: ":9999", + Desc: "bind address for the REST HTTP API", + }, + { + DestP: &boltPath, + Flag: "bolt-path", + Default: filepath.Join(dir, "influxd.bolt"), + Desc: "path to boltdb database", + }, + { + DestP: &developerMode, + Flag: "developer-mode", + Default: false, + Desc: "serve assets from the local filesystem in developer mode", + }, + { + DestP: &natsPath, + Flag: "nats-path", + Default: filepath.Join(dir, "nats"), + Desc: "path to NATS queue for scraping tasks", + }, + { + DestP: &enginePath, + Flag: "engine-path", + Default: filepath.Join(dir, "engine"), + Desc: "path to persistent engine files", + }, + }, + } + + cmd := cli.NewCommand(prog) + if err := cmd.Execute(); err != nil { + fmt.Println(err) + os.Exit(1) + } } const ( @@ -79,60 +125,13 @@ func influxDir() (string, error) { } func init() { - dir, err := influxDir() - if err != nil { - fmt.Fprintf(os.Stderr, "Failed to determine influx directory: %v", err) - os.Exit(1) - } - - viper.SetEnvPrefix("INFLUX") - - platformCmd.Flags().StringVar(&httpBindAddress, "http-bind-address", ":9999", "bind address for the rest http api") - viper.BindEnv("HTTP_BIND_ADDRESS") - if h := viper.GetString("HTTP_BIND_ADDRESS"); h != "" { - httpBindAddress = h - } - - platformCmd.Flags().StringVar(&authorizationPath, "authorization-path", "", "path to a bootstrap token") - viper.BindEnv("TOKEN_PATH") - if h := viper.GetString("TOKEN_PATH"); h != "" { - authorizationPath = h - } - - platformCmd.Flags().StringVar(&boltPath, "bolt-path", filepath.Join(dir, "influxd.bolt"), "path to boltdb database") - viper.BindEnv("BOLT_PATH") - if h := viper.GetString("BOLT_PATH"); h != "" { - boltPath = h - } - - platformCmd.Flags().BoolVar(&developerMode, "developer-mode", false, "serve assets from the local filesystem in developer mode") - viper.BindEnv("DEV_MODE") - if h := viper.GetBool("DEV_MODE"); h { - developerMode = h - } - - // TODO(edd): do we need NATS for anything? - platformCmd.Flags().StringVar(&natsPath, "nats-path", filepath.Join(dir, "nats"), "path to persistent NATS files") - viper.BindEnv("NATS_PATH") - if h := viper.GetString("NATS_PATH"); h != "" { - natsPath = h - } - - platformCmd.Flags().StringVar(&enginePath, "engine-path", filepath.Join(dir, "engine"), "path to persistent engine files") - viper.BindEnv("ENGINE_PATH") - if h := viper.GetString("ENGINE_PATH"); h != "" { - enginePath = h - } } -var platformCmd = &cobra.Command{ - Use: "influxd", - Short: "influxdata platform", - Run: platformF, -} - -func platformF(cmd *cobra.Command, args []string) { +func run() error { ctx := context.Background() + // exit with SIGINT and SIGTERM + ctx = signals.WithStandardSignals(ctx) + // Create top level logger logger := influxlogger.New(os.Stdout) @@ -146,67 +145,38 @@ func platformF(cmd *cobra.Command, args []string) { if err := c.Open(ctx); err != nil { logger.Error("failed opening bolt", zap.Error(err)) - os.Exit(1) + return err } - defer c.Close() + defer func(logger *zap.Logger) { + logger = logger.With(zap.String("service", "bolt")) + logger.Info("stopping") + if err := c.Close(); err != nil { + logger.Info("failed closing bolt", zap.String("service", "bolt")) + } + }(logger) - var authSvc platform.AuthorizationService - { - authSvc = c + var ( + orgSvc platform.OrganizationService = c + authSvc platform.AuthorizationService = c + userSvc platform.UserService = c + viewSvc platform.ViewService = c + macroSvc platform.MacroService = c + bucketSvc platform.BucketService = c + sourceSvc platform.SourceService = c + sessionSvc platform.SessionService = c + basicAuthSvc platform.BasicAuthService = c + dashboardSvc platform.DashboardService = c + onboardingSvc platform.OnboardingService = c + userResourceSvc platform.UserResourceMappingService = c + scraperTargetSvc platform.ScraperTargetStoreService = c + ) + + chronografSvc, err := server.NewServiceV2(ctx, c.DB()) + if err != nil { + logger.Error("failed creating chronograf service", zap.Error(err)) + return err } - var bucketSvc platform.BucketService - { - bucketSvc = c - } - - var orgSvc platform.OrganizationService - { - orgSvc = c - } - - var userSvc platform.UserService - { - userSvc = c - } - - var userResourceSvc platform.UserResourceMappingService - { - userResourceSvc = c - } - - var dashboardSvc platform.DashboardService - { - dashboardSvc = c - } - - var viewSvc platform.ViewService - { - viewSvc = c - } - - var sourceSvc platform.SourceService - { - sourceSvc = c - } - - var macroSvc platform.MacroService - { - macroSvc = c - } - - var basicAuthSvc platform.BasicAuthService - { - basicAuthSvc = c - } - - var sessionSvc platform.SessionService - { - sessionSvc = c - } - - var onboardingSvc platform.OnboardingService = c - var storageQueryService query.ProxyQueryService var pointsWriter storage.PointsWriter { @@ -220,8 +190,14 @@ func platformF(cmd *cobra.Command, args []string) { if err := engine.Open(); err != nil { logger.Error("failed to open engine", zap.Error(err)) - os.Exit(1) + return err } + defer func() { + logger.Info("stopping", zap.String("service", "storage-engine")) + if err := engine.Close(); err != nil { + logger.Error("failed to close engine", zap.Error(err)) + } + }() pointsWriter = engine @@ -229,7 +205,7 @@ func platformF(cmd *cobra.Command, args []string) { engine, bucketSvc, orgSvc, logger.With(zap.String("service", "storage-reads"))) if err != nil { logger.Error("failed to create query service", zap.Error(err)) - os.Exit(1) + return err } storageQueryService = service @@ -240,62 +216,66 @@ func platformF(cmd *cobra.Command, args []string) { { boltStore, err := taskbolt.New(c.DB(), "tasks") if err != nil { - logger.Fatal("failed opening task bolt", zap.Error(err)) + logger.Error("failed opening task bolt", zap.Error(err)) + return err } - executor := taskexecutor.NewQueryServiceExecutor(logger.With(zap.String("svc", "task-executor")), queryService, boltStore) + executor := taskexecutor.NewQueryServiceExecutor(logger.With(zap.String("service", "task-executor")), queryService, boltStore) // TODO(lh): Replace NopLogWriter with real log writer scheduler := taskbackend.NewScheduler(boltStore, executor, taskbackend.NopLogWriter{}, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, time.Second), taskbackend.WithLogger(logger)) scheduler.Start(ctx) + defer func() { + logger.Info("stopping", zap.String("service", "task")) + scheduler.Stop() + }() // TODO(lh): Replace NopLogReader with real log reader - taskSvc = task.PlatformAdapter(coordinator.New(logger.With(zap.String("svc", "task-coordinator")), scheduler, boltStore), taskbackend.NopLogReader{}, scheduler) + taskSvc = task.PlatformAdapter(coordinator.New(logger.With(zap.String("service", "task-coordinator")), scheduler, boltStore), taskbackend.NopLogReader{}, scheduler) // TODO(lh): Add in `taskSvc = task.NewValidator(taskSvc)` once we have Authentication coming in the context. // see issue #563 } - var scraperTargetSvc platform.ScraperTargetStoreService = c - - chronografSvc, err := server.NewServiceV2(ctx, c.DB()) - if err != nil { - logger.Error("failed creating chronograf service", zap.Error(err)) - os.Exit(1) - } - - errc := make(chan error) - - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM, os.Interrupt) - // NATS streaming server natsServer := nats.NewServer(nats.Config{FilestoreDir: natsPath}) if err := natsServer.Open(); err != nil { logger.Error("failed to start nats streaming server", zap.Error(err)) - os.Exit(1) + return err } + defer func() { + logger.Info("stopping", zap.String("service", "nats")) + natsServer.Close() + }() publisher := nats.NewAsyncPublisher("nats-publisher") if err := publisher.Open(); err != nil { logger.Error("failed to connect to streaming server", zap.Error(err)) - os.Exit(1) + return err } // TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed. subscriber := nats.NewQueueSubscriber("nats-subscriber") if err := subscriber.Open(); err != nil { logger.Error("failed to connect to streaming server", zap.Error(err)) - os.Exit(1) + return err } scraperScheduler, err := gather.NewScheduler(10, logger, scraperTargetSvc, publisher, subscriber, 0, 0) if err != nil { logger.Error("failed to create scraper subscriber", zap.Error(err)) - os.Exit(1) + return err } - go func() { - errc <- scraperScheduler.Run(ctx) - }() + + var wg sync.WaitGroup + wg.Add(1) + go func(logger *zap.Logger) { + defer wg.Done() + logger = logger.With(zap.String("service", "scraper")) + if err := scraperScheduler.Run(ctx); err != nil { + logger.Error("failed scraper service", zap.Error(err)) + } + logger.Info("stopping") + }(logger) httpServer := &nethttp.Server{ Addr: httpBindAddress, @@ -325,7 +305,10 @@ func platformF(cmd *cobra.Command, args []string) { } // HTTP server - go func() { + wg.Add(1) + go func(logger *zap.Logger) { + defer wg.Done() + logger = logger.With(zap.String("service", "http")) platformHandler := http.NewPlatformHandler(handlerConfig) reg.MustRegister(platformHandler.PrometheusCollectors()...) @@ -334,24 +317,16 @@ func platformF(cmd *cobra.Command, args []string) { httpServer.Handler = h logger.Info("Listening", zap.String("transport", "http"), zap.String("addr", httpBindAddress)) - errc <- httpServer.ListenAndServe() - }() - - select { - case <-sigs: - case err := <-errc: - logger.Fatal("unable to start platform", zap.Error(err)) - } + if err := httpServer.ListenAndServe(); err != nethttp.ErrServerClosed { + logger.Error("failed http service", zap.Error(err)) + } + logger.Info("Stopping") + }(logger) + <-ctx.Done() cctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() httpServer.Shutdown(cctx) -} - -// Execute executes the idped command -func Execute() { - if err := platformCmd.Execute(); err != nil { - fmt.Println(err) - os.Exit(1) - } + wg.Wait() + return nil } diff --git a/gather/scheduler.go b/gather/scheduler.go index bad9d32b4d..5a6aa9c9d3 100644 --- a/gather/scheduler.go +++ b/gather/scheduler.go @@ -76,14 +76,16 @@ func NewScheduler( // Run will retrieve scraper targets from the target storage, // and publish them to nats job queue for gather. func (s *Scheduler) Run(ctx context.Context) error { - go func(s *Scheduler) { + go func(s *Scheduler, ctx context.Context) { for { select { - case <-time.After(s.Interval): + case <-ctx.Done(): + return + case <-time.After(s.Interval): // TODO: change to ticker because of garbage collection s.gather <- struct{}{} } } - }(s) + }(s, ctx) return s.run(ctx) } diff --git a/nats/server.go b/nats/server.go index 0e917b11ce..463a4b36cf 100644 --- a/nats/server.go +++ b/nats/server.go @@ -33,6 +33,11 @@ func (s *Server) Open() error { return nil } +// Close stops the embedded NATS server. +func (s *Server) Close() { + s.Server.Shutdown() +} + // Config is the configuration for the NATS streaming server type Config struct { // The directory where nats persists message information