730 lines
20 KiB
Go
730 lines
20 KiB
Go
package run
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"runtime/pprof"
|
|
"time"
|
|
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/dependencies/testing"
|
|
"github.com/influxdata/flux/execute/executetest"
|
|
"github.com/influxdata/influxdb"
|
|
"github.com/influxdata/influxdb/coordinator"
|
|
influxdb2 "github.com/influxdata/influxdb/flux/stdlib/influxdata/influxdb"
|
|
"github.com/influxdata/influxdb/logger"
|
|
"github.com/influxdata/influxdb/models"
|
|
"github.com/influxdata/influxdb/monitor"
|
|
prometheus2 "github.com/influxdata/influxdb/prometheus"
|
|
"github.com/influxdata/influxdb/query"
|
|
"github.com/influxdata/influxdb/query/control"
|
|
"github.com/influxdata/influxdb/services/collectd"
|
|
"github.com/influxdata/influxdb/services/continuous_querier"
|
|
"github.com/influxdata/influxdb/services/graphite"
|
|
"github.com/influxdata/influxdb/services/httpd"
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
"github.com/influxdata/influxdb/services/opentsdb"
|
|
"github.com/influxdata/influxdb/services/precreator"
|
|
"github.com/influxdata/influxdb/services/retention"
|
|
"github.com/influxdata/influxdb/services/snapshotter"
|
|
"github.com/influxdata/influxdb/services/storage"
|
|
"github.com/influxdata/influxdb/services/subscriber"
|
|
"github.com/influxdata/influxdb/services/udp"
|
|
reads "github.com/influxdata/influxdb/storage/flux"
|
|
"github.com/influxdata/influxdb/tcp"
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
|
|
// Initialize the engine package
|
|
_ "github.com/influxdata/influxdb/tsdb/engine"
|
|
|
|
// Initialize the index package
|
|
_ "github.com/influxdata/influxdb/tsdb/index"
|
|
client "github.com/influxdata/usage-client/v1"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
var startTime time.Time
|
|
|
|
func init() {
|
|
startTime = time.Now().UTC()
|
|
}
|
|
|
|
// BuildInfo represents the build details for the server code.
|
|
type BuildInfo struct {
|
|
Version string
|
|
Commit string
|
|
Branch string
|
|
Time string
|
|
}
|
|
|
|
type StartupProgress interface {
|
|
AddShard()
|
|
CompletedShard()
|
|
}
|
|
|
|
// Server represents a container for the metadata and storage data and services.
|
|
// It is built using a Config and it manages the startup and shutdown of all
|
|
// services in the proper order.
|
|
type Server struct {
|
|
buildInfo BuildInfo
|
|
|
|
err chan error
|
|
closing chan struct{}
|
|
|
|
BindAddress string
|
|
Listener net.Listener
|
|
|
|
Logger *zap.Logger
|
|
MuxLogger *log.Logger
|
|
|
|
MetaClient *meta.Client
|
|
|
|
TSDBStore *tsdb.Store
|
|
QueryExecutor *query.Executor
|
|
PointsWriter *coordinator.PointsWriter
|
|
Subscriber *subscriber.Service
|
|
|
|
Services []Service
|
|
|
|
Prometheus *prometheus.Registry
|
|
|
|
// These references are required for the tcp muxer.
|
|
SnapshotterService *snapshotter.Service
|
|
|
|
Monitor *monitor.Monitor
|
|
|
|
StartupProgressMetrics StartupProgress
|
|
|
|
// Server reporting and registration
|
|
reportingDisabled bool
|
|
|
|
// Profiling
|
|
CPUProfile string
|
|
CPUProfileWriteCloser io.WriteCloser
|
|
MemProfile string
|
|
MemProfileWriteCloser io.WriteCloser
|
|
|
|
// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
|
|
httpAPIAddr string
|
|
|
|
// httpUseTLS specifies if we should use a TLS connection to the http servers
|
|
httpUseTLS bool
|
|
|
|
config *Config
|
|
}
|
|
|
|
// updateTLSConfig stores with into the tls config pointed at by into but only if with is not nil
|
|
// and into is nil. Think of it as setting the default value.
|
|
func updateTLSConfig(into **tls.Config, with *tls.Config) {
|
|
if with != nil && into != nil && *into == nil {
|
|
*into = with
|
|
}
|
|
}
|
|
|
|
// NewServer returns a new instance of Server built from a config.
|
|
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
|
// First grab the base tls config we will use for all clients and servers
|
|
tlsConfig, err := c.TLS.Parse()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("tls configuration: %v", err)
|
|
}
|
|
|
|
// Update the TLS values on each of the configs to be the parsed one if
|
|
// not already specified (set the default).
|
|
updateTLSConfig(&c.HTTPD.TLS, tlsConfig)
|
|
updateTLSConfig(&c.Subscriber.TLS, tlsConfig)
|
|
for i := range c.OpenTSDBInputs {
|
|
updateTLSConfig(&c.OpenTSDBInputs[i].TLS, tlsConfig)
|
|
}
|
|
|
|
// We need to ensure that a meta directory always exists even if
|
|
// we don't start the meta store. node.json is always stored under
|
|
// the meta directory.
|
|
if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
|
|
return nil, fmt.Errorf("mkdir all: %s", err)
|
|
}
|
|
|
|
// 0.10-rc1 and prior would sometimes put the node.json at the root
|
|
// dir which breaks backup/restore and restarting nodes. This moves
|
|
// the file from the root so it's always under the meta dir.
|
|
oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
|
|
newPath := filepath.Join(c.Meta.Dir, "node.json")
|
|
|
|
if _, err := os.Stat(oldPath); err == nil {
|
|
if err := os.Rename(oldPath, newPath); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
_, err = influxdb.LoadNode(c.Meta.Dir)
|
|
if err != nil {
|
|
if !os.IsNotExist(err) {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err := raftDBExists(c.Meta.Dir); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// In 0.10.0 bind-address got moved to the top level. Check
|
|
// The old location to keep things backwards compatible
|
|
bind := c.BindAddress
|
|
|
|
s := &Server{
|
|
buildInfo: *buildInfo,
|
|
err: make(chan error),
|
|
closing: make(chan struct{}),
|
|
|
|
BindAddress: bind,
|
|
Prometheus: prometheus.NewRegistry(),
|
|
|
|
Logger: logger.New(os.Stderr),
|
|
MuxLogger: tcp.MuxLogger(os.Stderr),
|
|
|
|
MetaClient: meta.NewClient(c.Meta),
|
|
|
|
reportingDisabled: c.ReportingDisabled,
|
|
|
|
httpAPIAddr: c.HTTPD.BindAddress,
|
|
httpUseTLS: c.HTTPD.HTTPSEnabled,
|
|
|
|
config: c,
|
|
}
|
|
s.Monitor = monitor.New(s, c.Monitor)
|
|
s.config.registerDiagnostics(s.Monitor)
|
|
|
|
if err := s.MetaClient.Open(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
|
|
s.TSDBStore.EngineOptions.Config = c.Data
|
|
|
|
// Copy TSDB configuration.
|
|
s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
|
|
s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index
|
|
|
|
// Create the Subscriber service
|
|
s.Subscriber = subscriber.NewService(c.Subscriber)
|
|
|
|
// Initialize points writer.
|
|
s.PointsWriter = coordinator.NewPointsWriter()
|
|
s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
|
|
s.PointsWriter.TSDBStore = s.TSDBStore
|
|
|
|
// Initialize query executor.
|
|
s.QueryExecutor = query.NewExecutor()
|
|
s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
|
|
MetaClient: s.MetaClient,
|
|
TaskManager: s.QueryExecutor.TaskManager,
|
|
TSDBStore: s.TSDBStore,
|
|
ShardMapper: &coordinator.LocalShardMapper{
|
|
MetaClient: s.MetaClient,
|
|
TSDBStore: coordinator.LocalTSDBStore{Store: s.TSDBStore},
|
|
},
|
|
StrictErrorHandling: s.TSDBStore.EngineOptions.Config.StrictErrorHandling,
|
|
Monitor: s.Monitor,
|
|
PointsWriter: s.PointsWriter,
|
|
MaxSelectPointN: c.Coordinator.MaxSelectPointN,
|
|
MaxSelectSeriesN: c.Coordinator.MaxSelectSeriesN,
|
|
MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
|
|
}
|
|
s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
|
|
s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
|
|
s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries
|
|
s.QueryExecutor.TaskManager.LogTimedoutQueries = c.Coordinator.LogTimedOutQueries
|
|
|
|
// Initialize the monitor
|
|
s.Monitor.Version = s.buildInfo.Version
|
|
s.Monitor.Commit = s.buildInfo.Commit
|
|
s.Monitor.Branch = s.buildInfo.Branch
|
|
s.Monitor.BuildTime = s.buildInfo.Time
|
|
s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
|
|
return s, nil
|
|
}
|
|
|
|
// Statistics returns statistics for the services running in the Server.
|
|
func (s *Server) Statistics(tags map[string]string) []models.Statistic {
|
|
var statistics []models.Statistic
|
|
statistics = append(statistics, s.QueryExecutor.Statistics(tags)...)
|
|
statistics = append(statistics, s.TSDBStore.Statistics(tags)...)
|
|
statistics = append(statistics, s.PointsWriter.Statistics(tags)...)
|
|
statistics = append(statistics, s.Subscriber.Statistics(tags)...)
|
|
for _, srv := range s.Services {
|
|
if m, ok := srv.(monitor.Reporter); ok {
|
|
statistics = append(statistics, m.Statistics(tags)...)
|
|
}
|
|
}
|
|
metricFamily, err := s.Prometheus.Gather()
|
|
if err == nil {
|
|
statistics = append(statistics, prometheus2.PrometheusToStatistics(metricFamily, tags)...)
|
|
}
|
|
return statistics
|
|
}
|
|
|
|
func (s *Server) appendSnapshotterService() {
|
|
srv := snapshotter.NewService()
|
|
srv.TSDBStore = s.TSDBStore
|
|
srv.MetaClient = s.MetaClient
|
|
s.Services = append(s.Services, srv)
|
|
s.SnapshotterService = srv
|
|
}
|
|
|
|
// SetLogOutput sets the logger used for all messages. It must not be called
|
|
// after the Open method has been called.
|
|
func (s *Server) SetLogOutput(w io.Writer) {
|
|
s.Logger = logger.New(w)
|
|
s.MuxLogger = tcp.MuxLogger(w)
|
|
}
|
|
|
|
func (s *Server) SetStartupMetrics(sp StartupProgress) {
|
|
s.StartupProgressMetrics = sp
|
|
}
|
|
|
|
func (s *Server) appendMonitorService() {
|
|
s.Services = append(s.Services, s.Monitor)
|
|
}
|
|
|
|
func (s *Server) appendRetentionPolicyService(c retention.Config) {
|
|
if !c.Enabled {
|
|
return
|
|
}
|
|
srv := retention.NewService(c)
|
|
srv.SetOSSMetaClient(s.MetaClient)
|
|
srv.TSDBStore = s.TSDBStore
|
|
srv.DropShardMetaRef = retention.OSSDropShardMetaRef(s.MetaClient)
|
|
s.Services = append(s.Services, srv)
|
|
}
|
|
|
|
func (s *Server) appendHTTPDService(c httpd.Config) error {
|
|
if !c.Enabled {
|
|
return nil
|
|
}
|
|
srv := httpd.NewService(c)
|
|
srv.Handler.MetaClient = s.MetaClient
|
|
authorizer := meta.NewQueryAuthorizer(s.MetaClient)
|
|
srv.Handler.QueryAuthorizer = authorizer
|
|
srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
|
|
srv.Handler.QueryExecutor = s.QueryExecutor
|
|
srv.Handler.Monitor = s.Monitor
|
|
srv.Handler.PointsWriter = s.PointsWriter
|
|
srv.Handler.Version = s.buildInfo.Version
|
|
srv.Handler.BuildType = "OSS"
|
|
ss := storage.NewStore(s.TSDBStore, s.MetaClient)
|
|
srv.Handler.Store = ss
|
|
if s.config.HTTPD.FluxEnabled {
|
|
storageDep, err := influxdb2.NewDependencies(s.MetaClient, reads.NewReader(ss), authorizer, c.AuthEnabled, s.PointsWriter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
deps := []flux.Dependency{storageDep, testing.FrameworkConfig{}}
|
|
if s.config.HTTPD.FluxTesting {
|
|
deps = append(deps, executetest.NewDefaultTestFlagger())
|
|
}
|
|
|
|
srv.Handler.Controller, err = control.New(
|
|
s.config.FluxController,
|
|
s.Logger.With(zap.String("service", "flux-controller")),
|
|
deps,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.Prometheus.MustRegister(srv.Handler.Controller.PrometheusCollectors()...)
|
|
}
|
|
|
|
s.Services = append(s.Services, srv)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) appendCollectdService(c collectd.Config) {
|
|
if !c.Enabled {
|
|
return
|
|
}
|
|
srv := collectd.NewService(c)
|
|
srv.MetaClient = s.MetaClient
|
|
srv.PointsWriter = s.PointsWriter
|
|
s.Services = append(s.Services, srv)
|
|
}
|
|
|
|
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
|
|
if !c.Enabled {
|
|
return nil
|
|
}
|
|
srv, err := opentsdb.NewService(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
srv.PointsWriter = s.PointsWriter
|
|
srv.MetaClient = s.MetaClient
|
|
s.Services = append(s.Services, srv)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) appendGraphiteService(c graphite.Config) error {
|
|
if !c.Enabled {
|
|
return nil
|
|
}
|
|
srv, err := graphite.NewService(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
srv.PointsWriter = s.PointsWriter
|
|
srv.MetaClient = s.MetaClient
|
|
srv.Monitor = s.Monitor
|
|
s.Services = append(s.Services, srv)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) appendPrecreatorService(c precreator.Config) error {
|
|
if !c.Enabled {
|
|
return nil
|
|
}
|
|
srv := precreator.NewService(c)
|
|
srv.MetaClient = s.MetaClient
|
|
s.Services = append(s.Services, srv)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) appendUDPService(c udp.Config) {
|
|
if !c.Enabled {
|
|
return
|
|
}
|
|
srv := udp.NewService(c)
|
|
srv.PointsWriter = s.PointsWriter
|
|
srv.MetaClient = s.MetaClient
|
|
s.Services = append(s.Services, srv)
|
|
}
|
|
|
|
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
|
|
if !c.Enabled {
|
|
return
|
|
}
|
|
srv := continuous_querier.NewService(c)
|
|
srv.MetaClient = s.MetaClient
|
|
srv.QueryExecutor = s.QueryExecutor
|
|
srv.Monitor = s.Monitor
|
|
s.Services = append(s.Services, srv)
|
|
}
|
|
|
|
// Err returns an error channel that multiplexes all out of band errors received from all services.
|
|
func (s *Server) Err() <-chan error { return s.err }
|
|
|
|
// Open opens the meta and data store and all services.
|
|
func (s *Server) Open() error {
|
|
// Start profiling if requested.
|
|
if err := s.startProfile(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Open shared TCP connection.
|
|
ln, err := net.Listen("tcp", s.BindAddress)
|
|
if err != nil {
|
|
return fmt.Errorf("listen: %s", err)
|
|
}
|
|
s.Listener = ln
|
|
|
|
// Multiplex listener.
|
|
mux := tcp.NewMux(s.MuxLogger)
|
|
go mux.Serve(ln)
|
|
|
|
// Append services.
|
|
s.appendMonitorService()
|
|
s.appendPrecreatorService(s.config.Precreator)
|
|
s.appendSnapshotterService()
|
|
s.appendContinuousQueryService(s.config.ContinuousQuery)
|
|
if err := s.appendHTTPDService(s.config.HTTPD); err != nil {
|
|
return err
|
|
}
|
|
s.appendRetentionPolicyService(s.config.Retention)
|
|
for _, i := range s.config.GraphiteInputs {
|
|
if err := s.appendGraphiteService(i); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for _, i := range s.config.CollectdInputs {
|
|
s.appendCollectdService(i)
|
|
}
|
|
for _, i := range s.config.OpenTSDBInputs {
|
|
if err := s.appendOpenTSDBService(i); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
for _, i := range s.config.UDPInputs {
|
|
s.appendUDPService(i)
|
|
}
|
|
|
|
s.Subscriber.MetaClient = s.MetaClient
|
|
s.PointsWriter.MetaClient = s.MetaClient
|
|
s.Monitor.MetaClient = s.MetaClient
|
|
|
|
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
|
|
|
|
// Configure logging for all services and clients.
|
|
if s.config.Meta.LoggingEnabled {
|
|
s.MetaClient.WithLogger(s.Logger)
|
|
}
|
|
s.TSDBStore.WithLogger(s.Logger)
|
|
|
|
s.TSDBStore.WithStartupMetrics(s.StartupProgressMetrics)
|
|
|
|
if s.config.Data.QueryLogEnabled {
|
|
s.QueryExecutor.WithLogger(s.Logger)
|
|
} else if s.config.Coordinator.LogQueriesAfter > 0 || s.config.Coordinator.LogTimedOutQueries {
|
|
// If we need to do any logging, add a logger.
|
|
// The TaskManager properly handles both of the above configs
|
|
// so it only logs as is appropriate.
|
|
s.QueryExecutor.TaskManager.Logger = s.Logger
|
|
}
|
|
s.PointsWriter.WithLogger(s.Logger)
|
|
s.Subscriber.WithLogger(s.Logger)
|
|
for _, svc := range s.Services {
|
|
svc.WithLogger(s.Logger)
|
|
}
|
|
s.SnapshotterService.WithLogger(s.Logger)
|
|
s.Monitor.WithLogger(s.Logger)
|
|
|
|
// Open TSDB store.
|
|
if err := s.TSDBStore.Open(); err != nil {
|
|
return fmt.Errorf("open tsdb store: %s", err)
|
|
}
|
|
|
|
// Add the subscriber before opening the PointsWriter
|
|
s.PointsWriter.Subscriber = s.Subscriber
|
|
|
|
// Open the subscriber service
|
|
if err := s.Subscriber.Open(); err != nil {
|
|
return fmt.Errorf("open subscriber: %s", err)
|
|
}
|
|
|
|
// Open the points writer service
|
|
if err := s.PointsWriter.Open(); err != nil {
|
|
return fmt.Errorf("open points writer: %s", err)
|
|
}
|
|
|
|
for _, service := range s.Services {
|
|
if err := service.Open(); err != nil {
|
|
return fmt.Errorf("open service: %s", err)
|
|
}
|
|
}
|
|
|
|
// Start the reporting service, if not disabled.
|
|
if !s.reportingDisabled {
|
|
go s.startServerReporting()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close shuts down the meta and data stores and all services.
|
|
func (s *Server) Close() error {
|
|
s.stopProfile()
|
|
|
|
// Close the listener first to stop any new connections
|
|
if s.Listener != nil {
|
|
s.Listener.Close()
|
|
}
|
|
|
|
// Close services to allow any inflight requests to complete
|
|
// and prevent new requests from being accepted.
|
|
for _, service := range s.Services {
|
|
service.Close()
|
|
}
|
|
|
|
s.config.deregisterDiagnostics(s.Monitor)
|
|
|
|
if s.PointsWriter != nil {
|
|
s.PointsWriter.Close()
|
|
}
|
|
|
|
if s.QueryExecutor != nil {
|
|
s.QueryExecutor.Close()
|
|
}
|
|
|
|
// Close the TSDBStore, no more reads or writes at this point
|
|
if s.TSDBStore != nil {
|
|
s.TSDBStore.Close()
|
|
}
|
|
|
|
if s.Subscriber != nil {
|
|
s.Subscriber.Close()
|
|
}
|
|
|
|
if s.MetaClient != nil {
|
|
s.MetaClient.Close()
|
|
}
|
|
|
|
close(s.closing)
|
|
return nil
|
|
}
|
|
|
|
// startServerReporting starts periodic server reporting.
|
|
func (s *Server) startServerReporting() {
|
|
s.reportServer()
|
|
|
|
ticker := time.NewTicker(24 * time.Hour)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-s.closing:
|
|
return
|
|
case <-ticker.C:
|
|
s.reportServer()
|
|
}
|
|
}
|
|
}
|
|
|
|
// reportServer reports usage statistics about the system.
|
|
func (s *Server) reportServer() {
|
|
dbs := s.MetaClient.Databases()
|
|
numDatabases := len(dbs)
|
|
|
|
var (
|
|
numMeasurements int64
|
|
numSeries int64
|
|
)
|
|
|
|
for _, db := range dbs {
|
|
name := db.Name
|
|
// Use the context.Background() to avoid timing out on this.
|
|
n, err := s.TSDBStore.SeriesCardinality(context.Background(), name)
|
|
if err != nil {
|
|
s.Logger.Error(fmt.Sprintf("Unable to get series cardinality for database %s: %v", name, err))
|
|
} else {
|
|
numSeries += n
|
|
}
|
|
|
|
// Use the context.Background() to avoid timing out on this.
|
|
n, err = s.TSDBStore.MeasurementsCardinality(context.Background(), name)
|
|
if err != nil {
|
|
s.Logger.Error(fmt.Sprintf("Unable to get measurement cardinality for database %s: %v", name, err))
|
|
} else {
|
|
numMeasurements += n
|
|
}
|
|
}
|
|
|
|
clusterID := s.MetaClient.ClusterID()
|
|
cl := client.New("")
|
|
usage := client.Usage{
|
|
Product: "influxdb",
|
|
Data: []client.UsageData{
|
|
{
|
|
Values: client.Values{
|
|
"os": runtime.GOOS,
|
|
"arch": runtime.GOARCH,
|
|
"version": s.buildInfo.Version,
|
|
"cluster_id": fmt.Sprintf("%v", clusterID),
|
|
"num_series": numSeries,
|
|
"num_measurements": numMeasurements,
|
|
"num_databases": numDatabases,
|
|
"uptime": time.Since(startTime).Seconds(),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
s.Logger.Info("Sending usage statistics to usage.influxdata.com")
|
|
|
|
go cl.Save(usage)
|
|
}
|
|
|
|
// Service represents a service attached to the server.
|
|
type Service interface {
|
|
WithLogger(log *zap.Logger)
|
|
Open() error
|
|
Close() error
|
|
}
|
|
|
|
// prof stores the file locations of active profiles.
|
|
// StartProfile initializes the cpu and memory profile, if specified.
|
|
func (s *Server) startProfile() error {
|
|
if s.CPUProfile != "" {
|
|
f, err := os.Create(s.CPUProfile)
|
|
if err != nil {
|
|
return fmt.Errorf("cpuprofile: %v", err)
|
|
}
|
|
|
|
s.CPUProfileWriteCloser = f
|
|
if err := pprof.StartCPUProfile(s.CPUProfileWriteCloser); err != nil {
|
|
return err
|
|
}
|
|
|
|
s.Logger.Info("writing CPU profile", zap.String("location", s.CPUProfile))
|
|
}
|
|
|
|
if s.MemProfile != "" {
|
|
f, err := os.Create(s.MemProfile)
|
|
if err != nil {
|
|
return fmt.Errorf("memprofile: %v", err)
|
|
}
|
|
|
|
s.MemProfileWriteCloser = f
|
|
runtime.MemProfileRate = 4096
|
|
|
|
s.Logger.Info("writing mem profile", zap.String("location", s.MemProfile))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// StopProfile closes the cpu and memory profiles if they are running.
|
|
func (s *Server) stopProfile() error {
|
|
if s.CPUProfileWriteCloser != nil {
|
|
pprof.StopCPUProfile()
|
|
if err := s.CPUProfileWriteCloser.Close(); err != nil {
|
|
return err
|
|
}
|
|
s.Logger.Info("CPU profile stopped")
|
|
}
|
|
|
|
if s.MemProfileWriteCloser != nil {
|
|
pprof.Lookup("heap").WriteTo(s.MemProfileWriteCloser, 0)
|
|
if err := s.MemProfileWriteCloser.Close(); err != nil {
|
|
return err
|
|
}
|
|
s.Logger.Info("mem profile stopped")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) LogQueriesOnTermination() bool {
|
|
if s != nil && s.config != nil {
|
|
return s.config.Coordinator.TerminationQueryLog
|
|
} else {
|
|
return false
|
|
}
|
|
}
|
|
|
|
// monitorPointsWriter is a wrapper around `coordinator.PointsWriter` that helps
|
|
// to prevent a circular dependency between the `cluster` and `monitor` packages.
|
|
type monitorPointsWriter coordinator.PointsWriter
|
|
|
|
func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
|
|
writeCtx := tsdb.WriteContext{
|
|
UserId: tsdb.MonitorUser,
|
|
}
|
|
return (*coordinator.PointsWriter)(pw).WritePointsPrivileged(writeCtx, database, retentionPolicy, models.ConsistencyLevelAny, points)
|
|
}
|
|
|
|
func raftDBExists(dir string) error {
|
|
// Check to see if there is a raft db, if so, error out with a message
|
|
// to downgrade, export, and then import the meta data
|
|
raftFile := filepath.Join(dir, "raft.db")
|
|
if _, err := os.Stat(raftFile); err == nil {
|
|
return fmt.Errorf("detected %s. To proceed, you'll need to either 1) downgrade to v0.11.x, export your metadata, upgrade to the current version again, and then import the metadata or 2) delete the file, which will effectively reset your database. For more assistance with the upgrade, see: https://docs.influxdata.com/influxdb/v0.12/administration/upgrading/", raftFile)
|
|
}
|
|
return nil
|
|
}
|