2015-05-28 21:47:47 +00:00
|
|
|
package run
|
2014-12-06 01:02:30 +00:00
|
|
|
|
|
|
|
import (
|
2015-02-12 00:10:18 +00:00
|
|
|
"fmt"
|
2015-06-11 04:50:20 +00:00
|
|
|
"log"
|
2015-06-05 22:25:21 +00:00
|
|
|
"net"
|
2015-06-11 19:12:14 +00:00
|
|
|
"os"
|
2016-02-02 19:10:29 +00:00
|
|
|
"path/filepath"
|
2015-06-11 04:50:20 +00:00
|
|
|
"runtime"
|
2015-06-11 19:12:14 +00:00
|
|
|
"runtime/pprof"
|
2015-06-01 17:20:57 +00:00
|
|
|
"time"
|
2015-05-29 19:50:05 +00:00
|
|
|
|
2016-02-10 17:26:18 +00:00
|
|
|
"github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/cluster"
|
2016-02-12 22:10:02 +00:00
|
|
|
"github.com/influxdata/influxdb/models"
|
2016-02-10 17:26:18 +00:00
|
|
|
"github.com/influxdata/influxdb/monitor"
|
|
|
|
"github.com/influxdata/influxdb/services/admin"
|
|
|
|
"github.com/influxdata/influxdb/services/collectd"
|
|
|
|
"github.com/influxdata/influxdb/services/continuous_querier"
|
|
|
|
"github.com/influxdata/influxdb/services/copier"
|
|
|
|
"github.com/influxdata/influxdb/services/graphite"
|
|
|
|
"github.com/influxdata/influxdb/services/hh"
|
|
|
|
"github.com/influxdata/influxdb/services/httpd"
|
|
|
|
"github.com/influxdata/influxdb/services/meta"
|
|
|
|
"github.com/influxdata/influxdb/services/opentsdb"
|
|
|
|
"github.com/influxdata/influxdb/services/precreator"
|
|
|
|
"github.com/influxdata/influxdb/services/retention"
|
|
|
|
"github.com/influxdata/influxdb/services/snapshotter"
|
|
|
|
"github.com/influxdata/influxdb/services/subscriber"
|
|
|
|
"github.com/influxdata/influxdb/services/udp"
|
|
|
|
"github.com/influxdata/influxdb/tcp"
|
|
|
|
"github.com/influxdata/influxdb/tsdb"
|
2016-02-10 20:50:12 +00:00
|
|
|
client "github.com/influxdata/usage-client/v1"
|
2015-11-07 07:30:27 +00:00
|
|
|
// Initialize the engine packages
|
2016-02-10 17:26:18 +00:00
|
|
|
_ "github.com/influxdata/influxdb/tsdb/engine"
|
2014-12-06 01:02:30 +00:00
|
|
|
)
|
|
|
|
|
2016-03-09 18:17:12 +00:00
|
|
|
// startTime records, in UTC, the moment this package was initialized. It is
// the reference point for the uptime value included in usage reports.
var startTime time.Time

// init captures the process start time once, at package initialization.
func init() {
	now := time.Now()
	startTime = now.UTC()
}
|
|
|
|
|
2015-09-09 19:36:32 +00:00
|
|
|
// BuildInfo describes the build that produced the running server binary.
type BuildInfo struct {
	// Version is the release version string.
	Version string
	// Commit identifies the source commit of the build.
	Commit string
	// Branch is the source branch of the build.
	Branch string
	// Time is the build time, kept as an opaque string.
	Time string
}
|
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
// Server represents a container for the metadata and storage data and services.
|
|
|
|
// It is built using a Config and it manages the startup and shutdown of all
|
|
|
|
// services in the proper order.
|
2015-05-28 21:47:47 +00:00
|
|
|
type Server struct {
|
2015-09-09 19:36:32 +00:00
|
|
|
buildInfo BuildInfo
|
2015-06-11 04:50:20 +00:00
|
|
|
|
2015-06-05 20:40:18 +00:00
|
|
|
err chan error
|
|
|
|
closing chan struct{}
|
|
|
|
|
2015-06-05 22:25:21 +00:00
|
|
|
BindAddress string
|
|
|
|
Listener net.Listener
|
|
|
|
|
2015-12-23 17:46:22 +00:00
|
|
|
Node *influxdb.Node
|
|
|
|
|
|
|
|
MetaClient *meta.Client
|
|
|
|
MetaService *meta.Service
|
|
|
|
|
2016-02-19 20:38:02 +00:00
|
|
|
TSDBStore *tsdb.Store
|
|
|
|
QueryExecutor *cluster.QueryExecutor
|
|
|
|
PointsWriter *cluster.PointsWriter
|
|
|
|
ShardWriter *cluster.ShardWriter
|
|
|
|
HintedHandoff *hh.Service
|
|
|
|
Subscriber *subscriber.Service
|
2015-05-28 21:47:47 +00:00
|
|
|
|
|
|
|
Services []Service
|
2015-06-08 19:07:05 +00:00
|
|
|
|
|
|
|
// These references are required for the tcp muxer.
|
|
|
|
ClusterService *cluster.Service
|
|
|
|
SnapshotterService *snapshotter.Service
|
2015-09-03 16:48:37 +00:00
|
|
|
CopierService *copier.Service
|
2015-06-11 04:50:20 +00:00
|
|
|
|
2015-09-02 22:07:30 +00:00
|
|
|
Monitor *monitor.Monitor
|
2015-09-01 03:28:24 +00:00
|
|
|
|
2015-10-15 20:04:21 +00:00
|
|
|
// Server reporting and registration
|
2015-06-11 04:50:20 +00:00
|
|
|
reportingDisabled bool
|
2015-06-11 19:12:14 +00:00
|
|
|
|
|
|
|
// Profiling
|
|
|
|
CPUProfile string
|
|
|
|
MemProfile string
|
2016-01-06 22:34:34 +00:00
|
|
|
|
|
|
|
// joinPeers are the metaservers specified at run time to join this server to
|
|
|
|
joinPeers []string
|
|
|
|
|
|
|
|
// metaUseTLS specifies if we should use a TLS connection to the meta servers
|
|
|
|
metaUseTLS bool
|
|
|
|
|
|
|
|
// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
|
|
|
|
httpAPIAddr string
|
|
|
|
|
|
|
|
// httpUseTLS specifies if we should use a TLS connection to the http servers
|
|
|
|
httpUseTLS bool
|
|
|
|
|
|
|
|
// tcpAddr is the host:port combination for the TCP listener that services mux onto
|
|
|
|
tcpAddr string
|
2016-01-07 20:29:24 +00:00
|
|
|
|
|
|
|
config *Config
|
2015-03-25 16:49:05 +00:00
|
|
|
}
|
|
|
|
|
2015-05-28 21:47:47 +00:00
|
|
|
// NewServer returns a new instance of Server built from a config.
|
2015-09-09 19:36:32 +00:00
|
|
|
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
|
2016-02-02 19:10:29 +00:00
|
|
|
// We need to ensure that a meta directory always exists even if
|
|
|
|
// we don't start the meta store. node.json is always stored under
|
|
|
|
// the meta directory.
|
|
|
|
if err := os.MkdirAll(c.Meta.Dir, 0777); err != nil {
|
|
|
|
return nil, fmt.Errorf("mkdir all: %s", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// 0.10-rc1 and prior would sometimes put the node.json at the root
|
|
|
|
// dir which breaks backup/restore and restarting nodes. This moves
|
|
|
|
// the file from the root so it's always under the meta dir.
|
|
|
|
oldPath := filepath.Join(filepath.Dir(c.Meta.Dir), "node.json")
|
|
|
|
newPath := filepath.Join(c.Meta.Dir, "node.json")
|
|
|
|
|
|
|
|
if _, err := os.Stat(oldPath); err == nil {
|
|
|
|
if err := os.Rename(oldPath, newPath); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-01-06 22:34:34 +00:00
|
|
|
}
|
2016-01-23 00:02:54 +00:00
|
|
|
|
2016-02-15 20:50:21 +00:00
|
|
|
node, err := influxdb.LoadNode(c.Meta.Dir)
|
2015-12-30 13:15:00 +00:00
|
|
|
if err != nil {
|
2016-01-25 23:13:06 +00:00
|
|
|
if !os.IsNotExist(err) {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-02-24 16:39:27 +00:00
|
|
|
|
|
|
|
node = influxdb.NewNode(c.Meta.Dir)
|
2015-12-30 13:15:00 +00:00
|
|
|
}
|
|
|
|
|
2016-01-06 22:34:34 +00:00
|
|
|
// In 0.10.0 bind-address got moved to the top level. Check
|
|
|
|
// The old location to keep things backwards compatible
|
|
|
|
bind := c.BindAddress
|
2016-01-23 00:02:54 +00:00
|
|
|
if c.Meta.BindAddress != "" {
|
2016-01-06 22:34:34 +00:00
|
|
|
bind = c.Meta.BindAddress
|
|
|
|
}
|
|
|
|
|
|
|
|
if !c.Data.Enabled && !c.Meta.Enabled {
|
|
|
|
return nil, fmt.Errorf("must run as either meta node or data node or both")
|
|
|
|
}
|
|
|
|
|
2015-05-28 21:47:47 +00:00
|
|
|
s := &Server{
|
2015-09-09 19:36:32 +00:00
|
|
|
buildInfo: *buildInfo,
|
|
|
|
err: make(chan error),
|
|
|
|
closing: make(chan struct{}),
|
2015-06-05 22:25:21 +00:00
|
|
|
|
2016-01-06 22:34:34 +00:00
|
|
|
BindAddress: bind,
|
2015-06-05 22:25:21 +00:00
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
Node: node,
|
|
|
|
MetaClient: meta.NewClient(),
|
2015-06-11 04:50:20 +00:00
|
|
|
|
2015-09-02 22:45:11 +00:00
|
|
|
Monitor: monitor.New(c.Monitor),
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2015-09-02 22:45:11 +00:00
|
|
|
reportingDisabled: c.ReportingDisabled,
|
2016-01-06 22:34:34 +00:00
|
|
|
joinPeers: c.Meta.JoinPeers,
|
|
|
|
metaUseTLS: c.Meta.HTTPSEnabled,
|
|
|
|
|
2016-02-17 03:16:19 +00:00
|
|
|
httpAPIAddr: c.HTTPD.BindAddress,
|
2016-01-06 22:34:34 +00:00
|
|
|
httpUseTLS: c.HTTPD.HTTPSEnabled,
|
2016-02-17 03:16:19 +00:00
|
|
|
tcpAddr: bind,
|
2016-01-07 20:29:24 +00:00
|
|
|
|
|
|
|
config: c,
|
2015-09-01 03:28:24 +00:00
|
|
|
}
|
|
|
|
|
2015-12-23 17:46:22 +00:00
|
|
|
if c.Meta.Enabled {
|
|
|
|
s.MetaService = meta.NewService(c.Meta)
|
2016-02-18 20:26:50 +00:00
|
|
|
s.MetaService.Version = s.buildInfo.Version
|
2016-02-19 21:16:35 +00:00
|
|
|
s.MetaService.Node = s.Node
|
2015-08-10 23:21:27 +00:00
|
|
|
}
|
2015-12-23 17:46:22 +00:00
|
|
|
|
|
|
|
if c.Data.Enabled {
|
|
|
|
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
|
|
|
|
s.TSDBStore.EngineOptions.Config = c.Data
|
|
|
|
|
|
|
|
// Copy TSDB configuration.
|
|
|
|
s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
|
|
|
|
s.TSDBStore.EngineOptions.MaxWALSize = c.Data.MaxWALSize
|
|
|
|
s.TSDBStore.EngineOptions.WALFlushInterval = time.Duration(c.Data.WALFlushInterval)
|
|
|
|
s.TSDBStore.EngineOptions.WALPartitionFlushDelay = time.Duration(c.Data.WALPartitionFlushDelay)
|
|
|
|
|
|
|
|
// Set the shard writer
|
2016-01-26 22:53:50 +00:00
|
|
|
s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout),
|
|
|
|
c.Cluster.MaxRemoteWriteConnections)
|
2015-12-23 17:46:22 +00:00
|
|
|
|
|
|
|
// Create the hinted handoff service
|
|
|
|
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaClient)
|
|
|
|
s.HintedHandoff.Monitor = s.Monitor
|
|
|
|
|
|
|
|
// Create the Subscriber service
|
|
|
|
s.Subscriber = subscriber.NewService(c.Subscriber)
|
|
|
|
|
|
|
|
// Initialize points writer.
|
|
|
|
s.PointsWriter = cluster.NewPointsWriter()
|
|
|
|
s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
|
|
|
|
s.PointsWriter.TSDBStore = s.TSDBStore
|
|
|
|
s.PointsWriter.ShardWriter = s.ShardWriter
|
|
|
|
s.PointsWriter.HintedHandoff = s.HintedHandoff
|
|
|
|
s.PointsWriter.Subscriber = s.Subscriber
|
|
|
|
s.PointsWriter.Node = s.Node
|
|
|
|
|
2016-02-19 05:53:44 +00:00
|
|
|
// Initialize meta executor.
|
|
|
|
metaExecutor := cluster.NewMetaExecutor()
|
|
|
|
metaExecutor.MetaClient = s.MetaClient
|
2016-02-19 14:00:17 +00:00
|
|
|
metaExecutor.Node = s.Node
|
2016-02-19 05:53:44 +00:00
|
|
|
|
2016-02-12 22:10:02 +00:00
|
|
|
// Initialize query executor.
|
|
|
|
s.QueryExecutor = cluster.NewQueryExecutor()
|
|
|
|
s.QueryExecutor.MetaClient = s.MetaClient
|
|
|
|
s.QueryExecutor.TSDBStore = s.TSDBStore
|
|
|
|
s.QueryExecutor.Monitor = s.Monitor
|
|
|
|
s.QueryExecutor.PointsWriter = s.PointsWriter
|
2016-02-19 05:53:44 +00:00
|
|
|
s.QueryExecutor.MetaExecutor = metaExecutor
|
2016-02-12 22:10:02 +00:00
|
|
|
if c.Data.QueryLogEnabled {
|
|
|
|
s.QueryExecutor.LogOutput = os.Stderr
|
|
|
|
}
|
2015-12-23 17:46:22 +00:00
|
|
|
|
|
|
|
// Initialize the monitor
|
|
|
|
s.Monitor.Version = s.buildInfo.Version
|
|
|
|
s.Monitor.Commit = s.buildInfo.Commit
|
|
|
|
s.Monitor.Branch = s.buildInfo.Branch
|
|
|
|
s.Monitor.BuildTime = s.buildInfo.Time
|
2016-02-12 22:10:02 +00:00
|
|
|
s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
|
|
|
|
2015-06-09 20:28:40 +00:00
|
|
|
return s, nil
|
2015-05-30 16:11:23 +00:00
|
|
|
}
|
2015-05-29 20:12:00 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (s *Server) appendClusterService(c cluster.Config) {
|
|
|
|
srv := cluster.NewService(c)
|
2015-06-02 20:45:52 +00:00
|
|
|
srv.TSDBStore = s.TSDBStore
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-05-30 16:11:23 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
2015-06-08 19:07:05 +00:00
|
|
|
s.ClusterService = srv
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) appendSnapshotterService() {
|
|
|
|
srv := snapshotter.NewService()
|
|
|
|
srv.TSDBStore = s.TSDBStore
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2016-01-28 00:39:54 +00:00
|
|
|
srv.Node = s.Node
|
2015-06-08 19:07:05 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
s.SnapshotterService = srv
|
2015-05-30 16:11:23 +00:00
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2015-09-03 16:48:37 +00:00
|
|
|
func (s *Server) appendCopierService() {
|
|
|
|
srv := copier.NewService()
|
|
|
|
srv.TSDBStore = s.TSDBStore
|
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
s.CopierService = srv
|
|
|
|
}
|
|
|
|
|
2015-06-04 21:31:23 +00:00
|
|
|
func (s *Server) appendRetentionPolicyService(c retention.Config) {
|
2015-06-05 04:44:00 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2015-06-04 21:31:23 +00:00
|
|
|
srv := retention.NewService(c)
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-06-04 21:31:23 +00:00
|
|
|
srv.TSDBStore = s.TSDBStore
|
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
}
|
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (s *Server) appendAdminService(c admin.Config) {
|
2015-06-05 20:40:18 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2016-02-20 02:36:47 +00:00
|
|
|
c.Version = s.buildInfo.Version
|
2015-05-30 16:11:23 +00:00
|
|
|
srv := admin.NewService(c)
|
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
}
|
2015-05-29 20:12:00 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (s *Server) appendHTTPDService(c httpd.Config) {
|
2015-06-04 15:24:08 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2015-05-30 16:11:23 +00:00
|
|
|
srv := httpd.NewService(c)
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.Handler.MetaClient = s.MetaClient
|
2016-02-22 23:02:25 +00:00
|
|
|
srv.Handler.QueryAuthorizer = meta.NewQueryAuthorizer(s.MetaClient)
|
2015-06-01 17:20:57 +00:00
|
|
|
srv.Handler.QueryExecutor = s.QueryExecutor
|
|
|
|
srv.Handler.PointsWriter = s.PointsWriter
|
2015-09-09 19:36:32 +00:00
|
|
|
srv.Handler.Version = s.buildInfo.Version
|
2015-06-05 23:19:44 +00:00
|
|
|
|
|
|
|
// If a ContinuousQuerier service has been started, attach it.
|
|
|
|
for _, srvc := range s.Services {
|
|
|
|
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
|
|
|
|
srv.Handler.ContinuousQuerier = cqsrvc
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2015-05-30 16:11:23 +00:00
|
|
|
func (s *Server) appendCollectdService(c collectd.Config) {
|
2015-06-04 15:24:08 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2015-05-30 16:11:23 +00:00
|
|
|
srv := collectd.NewService(c)
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-06-09 02:44:42 +00:00
|
|
|
srv.PointsWriter = s.PointsWriter
|
2015-05-30 16:11:23 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
}
|
|
|
|
|
2015-06-09 22:10:32 +00:00
|
|
|
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
|
2015-06-04 15:24:08 +00:00
|
|
|
if !c.Enabled {
|
2015-06-09 22:10:32 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
srv, err := opentsdb.NewService(c)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2015-06-04 15:24:08 +00:00
|
|
|
}
|
2015-06-08 16:20:54 +00:00
|
|
|
srv.PointsWriter = s.PointsWriter
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-05-30 16:11:23 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
2015-06-09 22:10:32 +00:00
|
|
|
return nil
|
2015-05-30 16:11:23 +00:00
|
|
|
}
|
|
|
|
|
2015-06-09 20:28:40 +00:00
|
|
|
func (s *Server) appendGraphiteService(c graphite.Config) error {
|
2015-06-04 15:24:08 +00:00
|
|
|
if !c.Enabled {
|
2015-06-09 20:28:40 +00:00
|
|
|
return nil
|
2015-06-04 15:24:08 +00:00
|
|
|
}
|
2015-06-09 20:28:40 +00:00
|
|
|
srv, err := graphite.NewService(c)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-06-08 22:35:38 +00:00
|
|
|
srv.PointsWriter = s.PointsWriter
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-09-02 22:07:30 +00:00
|
|
|
srv.Monitor = s.Monitor
|
2015-05-30 16:11:23 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
2015-06-09 20:28:40 +00:00
|
|
|
return nil
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
|
|
|
|
2015-06-10 21:53:12 +00:00
|
|
|
func (s *Server) appendPrecreatorService(c precreator.Config) error {
|
2015-06-10 20:05:48 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return nil
|
|
|
|
}
|
2015-06-10 21:53:12 +00:00
|
|
|
srv, err := precreator.NewService(c)
|
2015-06-10 20:05:48 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-06-10 20:05:48 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-06-03 13:06:36 +00:00
|
|
|
func (s *Server) appendUDPService(c udp.Config) {
|
2015-06-04 15:24:08 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2015-06-03 13:06:36 +00:00
|
|
|
srv := udp.NewService(c)
|
2015-06-04 23:07:34 +00:00
|
|
|
srv.PointsWriter = s.PointsWriter
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-06-08 08:45:47 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
2015-06-03 22:39:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
|
2015-06-04 22:53:00 +00:00
|
|
|
if !c.Enabled {
|
|
|
|
return
|
|
|
|
}
|
2015-06-03 22:39:37 +00:00
|
|
|
srv := continuous_querier.NewService(c)
|
2015-12-23 15:48:25 +00:00
|
|
|
srv.MetaClient = s.MetaClient
|
2015-06-03 22:39:37 +00:00
|
|
|
srv.QueryExecutor = s.QueryExecutor
|
2015-06-03 13:06:36 +00:00
|
|
|
s.Services = append(s.Services, srv)
|
|
|
|
}
|
|
|
|
|
2015-06-05 20:40:18 +00:00
|
|
|
// Err returns an error channel that multiplexes all out of band errors received from all services.
|
|
|
|
func (s *Server) Err() <-chan error { return s.err }
|
|
|
|
|
2015-05-28 21:47:47 +00:00
|
|
|
// Open opens the meta and data store and all services.
|
|
|
|
func (s *Server) Open() error {
|
2016-02-02 16:32:42 +00:00
|
|
|
// Start profiling, if set.
|
|
|
|
startProfile(s.CPUProfile, s.MemProfile)
|
2015-06-11 19:12:14 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Open shared TCP connection.
|
|
|
|
ln, err := net.Listen("tcp", s.BindAddress)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("listen: %s", err)
|
|
|
|
}
|
|
|
|
s.Listener = ln
|
2015-06-05 22:25:21 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Multiplex listener.
|
|
|
|
mux := tcp.NewMux()
|
|
|
|
go mux.Serve(ln)
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
if s.MetaService != nil {
|
|
|
|
s.MetaService.RaftListener = mux.Listen(meta.MuxHeader)
|
|
|
|
// Open meta service.
|
|
|
|
if err := s.MetaService.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open meta service: %s", err)
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
2016-02-02 16:32:42 +00:00
|
|
|
go s.monitorErrorChan(s.MetaService.Err())
|
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// initialize MetaClient.
|
|
|
|
if err = s.initializeMetaClient(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-01-07 20:29:24 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
if s.TSDBStore != nil {
|
|
|
|
// Append services.
|
|
|
|
s.appendClusterService(s.config.Cluster)
|
|
|
|
s.appendPrecreatorService(s.config.Precreator)
|
|
|
|
s.appendSnapshotterService()
|
|
|
|
s.appendCopierService()
|
|
|
|
s.appendAdminService(s.config.Admin)
|
|
|
|
s.appendContinuousQueryService(s.config.ContinuousQuery)
|
|
|
|
s.appendHTTPDService(s.config.HTTPD)
|
|
|
|
s.appendCollectdService(s.config.Collectd)
|
|
|
|
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for _, g := range s.config.UDPs {
|
|
|
|
s.appendUDPService(g)
|
|
|
|
}
|
|
|
|
s.appendRetentionPolicyService(s.config.Retention)
|
|
|
|
for _, g := range s.config.Graphites {
|
|
|
|
if err := s.appendGraphiteService(g); err != nil {
|
2016-01-07 20:29:24 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-02-02 16:32:42 +00:00
|
|
|
}
|
2015-10-08 16:45:23 +00:00
|
|
|
|
2016-02-19 20:38:02 +00:00
|
|
|
s.QueryExecutor.Node = s.Node
|
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
s.Subscriber.MetaClient = s.MetaClient
|
|
|
|
s.ShardWriter.MetaClient = s.MetaClient
|
|
|
|
s.HintedHandoff.MetaClient = s.MetaClient
|
|
|
|
s.Subscriber.MetaClient = s.MetaClient
|
|
|
|
s.PointsWriter.MetaClient = s.MetaClient
|
|
|
|
s.Monitor.MetaClient = s.MetaClient
|
|
|
|
|
|
|
|
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
|
|
|
|
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
|
|
|
|
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
|
|
|
|
|
|
|
|
// Open TSDB store.
|
|
|
|
if err := s.TSDBStore.Open(); err != nil {
|
2016-02-24 13:34:19 +00:00
|
|
|
// Provide helpful error if user needs to upgrade shards to
|
|
|
|
// tsm1.
|
|
|
|
if serr, ok := err.(tsdb.ShardError); ok && serr.Err == tsdb.ErrUnknownEngineFormat {
|
|
|
|
return influxdb.ErrUpgradeEngine
|
|
|
|
}
|
2016-02-02 16:32:42 +00:00
|
|
|
return fmt.Errorf("open tsdb store: %s", err)
|
|
|
|
}
|
2015-11-05 22:27:00 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Open the hinted handoff service
|
|
|
|
if err := s.HintedHandoff.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open hinted handoff: %s", err)
|
|
|
|
}
|
2015-11-05 22:27:00 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Open the subcriber service
|
|
|
|
if err := s.Subscriber.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open subscriber: %s", err)
|
|
|
|
}
|
2016-01-06 22:34:34 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Open the points writer service
|
|
|
|
if err := s.PointsWriter.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open points writer: %s", err)
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Open the monitor service
|
|
|
|
if err := s.Monitor.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open monitor: %v", err)
|
2015-06-11 07:03:10 +00:00
|
|
|
}
|
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
for _, service := range s.Services {
|
|
|
|
if err := service.Open(); err != nil {
|
|
|
|
return fmt.Errorf("open service: %s", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// Start the reporting service, if not disabled.
|
|
|
|
if !s.reportingDisabled {
|
|
|
|
go s.startServerReporting()
|
2015-03-26 21:54:10 +00:00
|
|
|
}
|
2015-05-28 21:47:47 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close shuts down the meta and data stores and all services.
|
|
|
|
func (s *Server) Close() error {
|
2015-06-12 14:10:46 +00:00
|
|
|
stopProfile()
|
2015-06-11 19:12:14 +00:00
|
|
|
|
2015-09-04 22:29:59 +00:00
|
|
|
// Close the listener first to stop any new connections
|
2015-06-05 22:25:21 +00:00
|
|
|
if s.Listener != nil {
|
|
|
|
s.Listener.Close()
|
|
|
|
}
|
2015-09-04 22:29:59 +00:00
|
|
|
|
2015-09-08 17:05:39 +00:00
|
|
|
// Close services to allow any inflight requests to complete
|
|
|
|
// and prevent new requests from being accepted.
|
2015-09-04 22:29:59 +00:00
|
|
|
for _, service := range s.Services {
|
|
|
|
service.Close()
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
2015-09-04 22:29:59 +00:00
|
|
|
|
|
|
|
if s.Monitor != nil {
|
|
|
|
s.Monitor.Close()
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
2015-09-04 22:29:59 +00:00
|
|
|
|
2015-11-05 22:27:00 +00:00
|
|
|
if s.PointsWriter != nil {
|
|
|
|
s.PointsWriter.Close()
|
|
|
|
}
|
|
|
|
|
2015-06-03 03:31:04 +00:00
|
|
|
if s.HintedHandoff != nil {
|
|
|
|
s.HintedHandoff.Close()
|
|
|
|
}
|
2015-09-04 22:29:59 +00:00
|
|
|
|
|
|
|
// Close the TSDBStore, no more reads or writes at this point
|
|
|
|
if s.TSDBStore != nil {
|
|
|
|
s.TSDBStore.Close()
|
2015-09-02 22:45:11 +00:00
|
|
|
}
|
2015-09-04 22:29:59 +00:00
|
|
|
|
2015-10-08 16:45:23 +00:00
|
|
|
if s.Subscriber != nil {
|
|
|
|
s.Subscriber.Close()
|
|
|
|
}
|
|
|
|
|
2015-09-04 22:29:59 +00:00
|
|
|
// Finally close the meta-store since everything else depends on it
|
2015-12-23 15:48:25 +00:00
|
|
|
if s.MetaService != nil {
|
|
|
|
s.MetaService.Close()
|
2015-05-28 21:47:47 +00:00
|
|
|
}
|
2015-07-15 17:14:11 +00:00
|
|
|
|
2016-01-06 22:34:34 +00:00
|
|
|
if s.MetaClient != nil {
|
|
|
|
s.MetaClient.Close()
|
|
|
|
}
|
|
|
|
|
2015-06-05 20:40:18 +00:00
|
|
|
close(s.closing)
|
2015-05-28 21:47:47 +00:00
|
|
|
return nil
|
2015-03-26 21:54:10 +00:00
|
|
|
}
|
|
|
|
|
2015-06-11 04:50:20 +00:00
|
|
|
// startServerReporting starts periodic server reporting.
|
|
|
|
func (s *Server) startServerReporting() {
|
|
|
|
for {
|
2015-07-27 21:24:49 +00:00
|
|
|
select {
|
|
|
|
case <-s.closing:
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
}
|
2015-06-11 04:50:20 +00:00
|
|
|
s.reportServer()
|
|
|
|
<-time.After(24 * time.Hour)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// reportServer reports anonymous statistics about the system.
|
|
|
|
func (s *Server) reportServer() {
|
2015-12-23 15:48:25 +00:00
|
|
|
dis, err := s.MetaClient.Databases()
|
2015-06-11 04:50:20 +00:00
|
|
|
if err != nil {
|
2015-06-11 15:51:48 +00:00
|
|
|
log.Printf("failed to retrieve databases for reporting: %s", err.Error())
|
2015-06-11 04:50:20 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
numDatabases := len(dis)
|
|
|
|
|
|
|
|
numMeasurements := 0
|
|
|
|
numSeries := 0
|
2016-02-02 16:32:42 +00:00
|
|
|
|
|
|
|
// Only needed in the case of a data node
|
|
|
|
if s.TSDBStore != nil {
|
|
|
|
for _, di := range dis {
|
|
|
|
d := s.TSDBStore.DatabaseIndex(di.Name)
|
|
|
|
if d == nil {
|
|
|
|
// No data in this store for this database.
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
m, s := d.MeasurementSeriesCounts()
|
|
|
|
numMeasurements += m
|
|
|
|
numSeries += s
|
2015-06-11 04:50:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-01-08 19:25:07 +00:00
|
|
|
clusterID := s.MetaClient.ClusterID()
|
2015-06-11 04:50:20 +00:00
|
|
|
if err != nil {
|
2015-06-11 15:51:48 +00:00
|
|
|
log.Printf("failed to retrieve cluster ID for reporting: %s", err.Error())
|
2015-06-11 04:50:20 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2015-11-06 15:30:57 +00:00
|
|
|
cl := client.New("")
|
|
|
|
usage := client.Usage{
|
|
|
|
Product: "influxdb",
|
|
|
|
Data: []client.UsageData{
|
|
|
|
{
|
|
|
|
Values: client.Values{
|
|
|
|
"os": runtime.GOOS,
|
|
|
|
"arch": runtime.GOARCH,
|
|
|
|
"version": s.buildInfo.Version,
|
2016-01-06 22:34:34 +00:00
|
|
|
"server_id": fmt.Sprintf("%v", s.Node.ID),
|
2015-11-20 20:43:35 +00:00
|
|
|
"cluster_id": fmt.Sprintf("%v", clusterID),
|
2015-11-06 15:30:57 +00:00
|
|
|
"num_series": numSeries,
|
|
|
|
"num_measurements": numMeasurements,
|
|
|
|
"num_databases": numDatabases,
|
2016-03-09 18:17:12 +00:00
|
|
|
"uptime": time.Since(startTime).Seconds(),
|
2015-11-06 15:30:57 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
2015-06-11 04:50:20 +00:00
|
|
|
|
|
|
|
log.Printf("Sending anonymous usage statistics to m.influxdb.com")
|
|
|
|
|
2015-11-06 15:30:57 +00:00
|
|
|
go cl.Save(usage)
|
2015-06-11 04:50:20 +00:00
|
|
|
}
|
|
|
|
|
2015-06-05 20:40:18 +00:00
|
|
|
// monitorErrorChan reads an error channel and resends it through the server.
|
|
|
|
func (s *Server) monitorErrorChan(ch <-chan error) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case err, ok := <-ch:
|
|
|
|
if !ok {
|
|
|
|
return
|
2015-04-03 19:49:54 +00:00
|
|
|
}
|
2015-06-05 20:40:18 +00:00
|
|
|
s.err <- err
|
|
|
|
case <-s.closing:
|
2015-04-14 19:43:25 +00:00
|
|
|
return
|
2014-12-31 19:42:53 +00:00
|
|
|
}
|
2014-12-16 06:05:01 +00:00
|
|
|
}
|
2014-12-31 19:42:53 +00:00
|
|
|
}
|
2014-12-06 01:02:30 +00:00
|
|
|
|
2016-02-02 16:32:42 +00:00
|
|
|
// initializeMetaClient will set the MetaClient and join the node to the cluster if needed
|
|
|
|
func (s *Server) initializeMetaClient() error {
|
2016-01-06 22:34:34 +00:00
|
|
|
// It's the first time starting up and we need to either join
|
|
|
|
// the cluster or initialize this node as the first member
|
|
|
|
if len(s.joinPeers) == 0 {
|
|
|
|
// start up a new single node cluster
|
|
|
|
if s.MetaService == nil {
|
|
|
|
return fmt.Errorf("server not set to join existing cluster must run also as a meta node")
|
2015-08-07 16:27:45 +00:00
|
|
|
}
|
2016-02-12 22:10:02 +00:00
|
|
|
s.MetaClient.SetMetaServers([]string{s.MetaService.HTTPAddr()})
|
|
|
|
s.MetaClient.SetTLS(s.metaUseTLS)
|
2016-01-06 22:34:34 +00:00
|
|
|
} else {
|
2016-02-02 16:32:42 +00:00
|
|
|
// join this node to the cluster
|
2016-02-12 22:10:02 +00:00
|
|
|
s.MetaClient.SetMetaServers(s.joinPeers)
|
|
|
|
s.MetaClient.SetTLS(s.metaUseTLS)
|
2016-01-06 22:34:34 +00:00
|
|
|
}
|
|
|
|
if err := s.MetaClient.Open(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-02-15 20:50:21 +00:00
|
|
|
|
2016-02-23 15:54:29 +00:00
|
|
|
// if the node ID is > 0 then we need to initialize the metaclient
|
2016-02-15 20:50:21 +00:00
|
|
|
if s.Node.ID > 0 {
|
|
|
|
s.MetaClient.WaitForDataChanged()
|
|
|
|
}
|
|
|
|
|
2016-02-19 21:16:35 +00:00
|
|
|
if s.config.Data.Enabled {
|
2016-02-23 15:54:29 +00:00
|
|
|
// If we've already created a data node for our id, we're done
|
|
|
|
if _, err := s.MetaClient.DataNode(s.Node.ID); err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-02-19 21:16:35 +00:00
|
|
|
n, err := s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
|
|
|
|
for err != nil {
|
|
|
|
log.Printf("Unable to create data node. retry in 1s: %s", err.Error())
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
n, err = s.MetaClient.CreateDataNode(s.HTTPAddr(), s.TCPAddr())
|
|
|
|
}
|
|
|
|
s.Node.ID = n.ID
|
2016-02-12 14:06:05 +00:00
|
|
|
|
2016-02-19 21:16:35 +00:00
|
|
|
if err := s.Node.Save(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-01-06 22:34:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2015-08-07 16:27:45 +00:00
|
|
|
|
2016-02-17 03:16:19 +00:00
|
|
|
// HTTPAddr returns the HTTP address used by other nodes for HTTP queries and writes.
|
|
|
|
func (s *Server) HTTPAddr() string {
|
|
|
|
return s.remoteAddr(s.httpAPIAddr)
|
|
|
|
}
|
|
|
|
|
|
|
|
// TCPAddr returns the TCP address used by other nodes for cluster communication.
|
|
|
|
func (s *Server) TCPAddr() string {
|
|
|
|
return s.remoteAddr(s.tcpAddr)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) remoteAddr(addr string) string {
|
|
|
|
hostname := s.config.Hostname
|
|
|
|
if hostname == "" {
|
|
|
|
hostname = meta.DefaultHostname
|
|
|
|
}
|
|
|
|
remote, err := meta.DefaultHost(hostname, addr)
|
|
|
|
if err != nil {
|
|
|
|
return addr
|
|
|
|
}
|
|
|
|
return remote
|
|
|
|
}
|
|
|
|
|
2016-02-15 20:50:21 +00:00
|
|
|
// MetaServers returns the meta node HTTP addresses used by this server.
|
|
|
|
func (s *Server) MetaServers() []string {
|
|
|
|
return s.MetaClient.MetaServers()
|
2015-08-07 16:27:45 +00:00
|
|
|
}
|
|
|
|
|
2015-06-05 20:40:18 +00:00
|
|
|
// Service is the lifecycle contract for anything attached to the server: the
// server opens each registered service during Open and closes it during Close.
type Service interface {
	// Open starts the service.
	Open() error
	// Close stops the service.
	Close() error
}
|
2015-06-11 19:12:14 +00:00
|
|
|
|
|
|
|
// prof holds the open output files of the profiles that are currently
// active. A nil file means the corresponding profile is not running.
var prof struct {
	cpu, mem *os.File
}
|
|
|
|
|
|
|
|
// StartProfile initializes the cpu and memory profile, if specified.
|
2015-06-12 14:10:46 +00:00
|
|
|
func startProfile(cpuprofile, memprofile string) {
|
2015-06-11 19:12:14 +00:00
|
|
|
if cpuprofile != "" {
|
|
|
|
f, err := os.Create(cpuprofile)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("cpuprofile: %v", err)
|
|
|
|
}
|
2015-06-12 14:10:46 +00:00
|
|
|
log.Printf("writing CPU profile to: %s\n", cpuprofile)
|
2015-06-11 19:12:14 +00:00
|
|
|
prof.cpu = f
|
|
|
|
pprof.StartCPUProfile(prof.cpu)
|
|
|
|
}
|
|
|
|
|
|
|
|
if memprofile != "" {
|
|
|
|
f, err := os.Create(memprofile)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("memprofile: %v", err)
|
|
|
|
}
|
2015-06-12 14:10:46 +00:00
|
|
|
log.Printf("writing mem profile to: %s\n", memprofile)
|
2015-06-11 19:12:14 +00:00
|
|
|
prof.mem = f
|
2015-06-12 16:07:01 +00:00
|
|
|
runtime.MemProfileRate = 4096
|
2015-06-11 19:12:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// StopProfile closes the cpu and memory profiles if they are running.
|
2015-06-12 14:10:46 +00:00
|
|
|
func stopProfile() {
|
2015-06-11 19:12:14 +00:00
|
|
|
if prof.cpu != nil {
|
|
|
|
pprof.StopCPUProfile()
|
|
|
|
prof.cpu.Close()
|
2015-06-12 14:10:46 +00:00
|
|
|
log.Println("CPU profile stopped")
|
2015-06-11 19:12:14 +00:00
|
|
|
}
|
|
|
|
if prof.mem != nil {
|
|
|
|
pprof.Lookup("heap").WriteTo(prof.mem, 0)
|
|
|
|
prof.mem.Close()
|
2015-06-12 14:10:46 +00:00
|
|
|
log.Println("mem profile stopped")
|
2015-06-11 19:12:14 +00:00
|
|
|
}
|
|
|
|
}
|
2015-07-28 22:04:03 +00:00
|
|
|
|
|
|
|
// tcpaddr is a minimal address value: a host string that reports "tcp" as its
// network.
type tcpaddr struct {
	host string
}

// Network always reports "tcp".
func (a *tcpaddr) Network() string {
	return "tcp"
}

// String returns the host exactly as stored.
func (a *tcpaddr) String() string {
	return a.host
}
|
2016-02-12 22:10:02 +00:00
|
|
|
|
|
|
|
// monitorPointsWriter is a wrapper around `cluster.PointsWriter` that helps
|
|
|
|
// to prevent a circular dependency between the `cluster` and `monitor` packages.
|
|
|
|
type monitorPointsWriter cluster.PointsWriter
|
|
|
|
|
|
|
|
func (pw *monitorPointsWriter) WritePoints(database, retentionPolicy string, points models.Points) error {
|
|
|
|
return (*cluster.PointsWriter)(pw).WritePoints(&cluster.WritePointsRequest{
|
|
|
|
Database: database,
|
|
|
|
RetentionPolicy: retentionPolicy,
|
|
|
|
ConsistencyLevel: cluster.ConsistencyLevelOne,
|
|
|
|
Points: points,
|
|
|
|
})
|
|
|
|
}
|