influxdb/cmd/influxd/run/server.go

package run
import (
"fmt"
"log"
"net"
"os"
"reflect"
"runtime"
"runtime/pprof"
"time"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/monitor"
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
"github.com/influxdb/influxdb/services/continuous_querier"
"github.com/influxdb/influxdb/services/copier"
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/hh"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/precreator"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/snapshotter"
"github.com/influxdb/influxdb/services/subscriber"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/usage-client/v1"
// Initialize the engine packages
_ "github.com/influxdb/influxdb/tsdb/engine"
)
// BuildInfo represents the build details for the server code.
type BuildInfo struct {
Version string
Commit string
Branch string
Time string
}
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
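//
// A minimal usage sketch (assuming a *Config named c has already been loaded,
// for example by the influxd "run" command, which is not shown here):
//
//	s, err := NewServer(c, &BuildInfo{Version: "dev"})
//	if err != nil {
//		log.Fatal(err)
//	}
//	if err := s.Open(); err != nil {
//		log.Fatal(err)
//	}
//	defer s.Close()
//	go func() {
//		for err := range s.Err() {
//			log.Printf("server error: %s", err)
//		}
//	}()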
type Server struct {
buildInfo BuildInfo
err chan error
closing chan struct{}
BindAddress string
Listener net.Listener
Node *influxdb.Node
MetaClient *meta.Client
MetaService *meta.Service
TSDBStore *tsdb.Store
QueryExecutor *tsdb.QueryExecutor
PointsWriter *cluster.PointsWriter
ShardWriter *cluster.ShardWriter
ShardMapper *cluster.ShardMapper
HintedHandoff *hh.Service
Subscriber *subscriber.Service
Services []Service
// These references are required for the tcp muxer.
ClusterService *cluster.Service
SnapshotterService *snapshotter.Service
CopierService *copier.Service
Monitor *monitor.Monitor
// Server reporting and registration
reportingDisabled bool
// Profiling
CPUProfile string
MemProfile string
// joinPeers are the metaservers specified at run time to join this server to
joinPeers []string
// metaUseTLS specifies if we should use a TLS connection to the meta servers
metaUseTLS bool
// httpAPIAddr is the host:port combination for the main HTTP API for querying and writing data
httpAPIAddr string
// httpUseTLS specifies if we should use a TLS connection to the http servers
httpUseTLS bool
// tcpAddr is the host:port combination for the TCP listener that services mux onto
tcpAddr string
}
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Load the node information. Before 0.10 this was in the meta directory,
// so use that if the top-level directory isn't specified.
dir := c.Dir
if dir == "" {
dir = c.Meta.Dir
}
node, err := influxdb.NewNode(dir)
if err != nil {
return nil, err
}
// In 0.10.0 bind-address moved to the top level. Check the old
// location to keep things backwards compatible.
bind := c.BindAddress
if bind == "" {
bind = c.Meta.BindAddress
}
if !c.Data.Enabled && !c.Meta.Enabled {
return nil, fmt.Errorf("must run as a meta node, a data node, or both")
}
s := &Server{
buildInfo: *buildInfo,
err: make(chan error),
closing: make(chan struct{}),
BindAddress: bind,
Node: node,
Monitor: monitor.New(c.Monitor),
reportingDisabled: c.ReportingDisabled,
joinPeers: c.Meta.JoinPeers,
metaUseTLS: c.Meta.HTTPSEnabled,
httpAPIAddr: c.HTTPD.BindAddress,
httpUseTLS: c.HTTPD.HTTPSEnabled,
tcpAddr: c.BindAddress,
}
// Before 0.10.0 the TCP bind address was in meta; get it from there
// if it isn't set at the top level.
if s.tcpAddr == "" {
s.tcpAddr = c.Meta.BindAddress
}
if c.Meta.Enabled {
s.MetaService = meta.NewService(c.Meta)
}
if c.Data.Enabled {
s.TSDBStore = tsdb.NewStore(c.Data.Dir)
s.TSDBStore.EngineOptions.Config = c.Data
// Copy TSDB configuration.
s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
s.TSDBStore.EngineOptions.MaxWALSize = c.Data.MaxWALSize
s.TSDBStore.EngineOptions.WALFlushInterval = time.Duration(c.Data.WALFlushInterval)
s.TSDBStore.EngineOptions.WALPartitionFlushDelay = time.Duration(c.Data.WALPartitionFlushDelay)
// Set the shard mapper
s.ShardMapper = cluster.NewShardMapper(time.Duration(c.Cluster.ShardMapperTimeout))
s.ShardMapper.ForceRemoteMapping = c.Cluster.ForceRemoteShardMapping
s.ShardMapper.TSDBStore = s.TSDBStore
s.ShardMapper.Node = node
// Initialize query executor.
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
s.QueryExecutor.MonitorStatementExecutor = &monitor.StatementExecutor{Monitor: s.Monitor}
s.QueryExecutor.ShardMapper = s.ShardMapper
s.QueryExecutor.QueryLogEnabled = c.Data.QueryLogEnabled
// Set the shard writer
s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout))
// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaClient)
s.HintedHandoff.Monitor = s.Monitor
// Create the Subscriber service
s.Subscriber = subscriber.NewService(c.Subscriber)
// Initialize points writer.
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.WriteTimeout = time.Duration(c.Cluster.WriteTimeout)
s.PointsWriter.TSDBStore = s.TSDBStore
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff
s.PointsWriter.Subscriber = s.Subscriber
s.PointsWriter.Node = s.Node
// needed for executing INTO queries.
s.QueryExecutor.IntoWriter = s.PointsWriter
// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
s.Monitor.Commit = s.buildInfo.Commit
s.Monitor.Branch = s.buildInfo.Branch
s.Monitor.BuildTime = s.buildInfo.Time
s.Monitor.PointsWriter = s.PointsWriter
// Append services.
s.appendClusterService(c.Cluster)
if err := s.appendPrecreatorService(c.Precreator); err != nil {
return nil, err
}
s.appendSnapshotterService()
s.appendCopierService()
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
if err := s.appendOpenTSDBService(c.OpenTSDB); err != nil {
return nil, err
}
for _, g := range c.UDPs {
s.appendUDPService(g)
}
s.appendRetentionPolicyService(c.Retention)
for _, g := range c.Graphites {
if err := s.appendGraphiteService(g); err != nil {
return nil, err
}
}
}
return s, nil
}
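// appendClusterService adds the cluster RPC service and keeps a reference
// to it for the tcp muxer.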
func (s *Server) appendClusterService(c cluster.Config) {
srv := cluster.NewService(c)
srv.TSDBStore = s.TSDBStore
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
s.ClusterService = srv
}
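// appendSnapshotterService adds the snapshotter service and keeps a
// reference to it for the tcp muxer.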
func (s *Server) appendSnapshotterService() {
srv := snapshotter.NewService()
srv.TSDBStore = s.TSDBStore
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
s.SnapshotterService = srv
}
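// appendCopierService adds the copier service and keeps a reference to it
// for the tcp muxer.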
func (s *Server) appendCopierService() {
srv := copier.NewService()
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
s.CopierService = srv
}
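// appendRetentionPolicyService adds the retention policy enforcement
// service, if enabled.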
func (s *Server) appendRetentionPolicyService(c retention.Config) {
if !c.Enabled {
return
}
srv := retention.NewService(c)
srv.MetaClient = s.MetaClient
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
}
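// appendAdminService adds the admin UI service, if enabled.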
func (s *Server) appendAdminService(c admin.Config) {
if !c.Enabled {
return
}
srv := admin.NewService(c)
s.Services = append(s.Services, srv)
}
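// appendHTTPDService adds the HTTP API service, if enabled, and attaches
// any previously appended continuous query service to its handler.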
func (s *Server) appendHTTPDService(c httpd.Config) {
if !c.Enabled {
return
}
srv := httpd.NewService(c)
srv.Handler.MetaClient = s.MetaClient
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter
srv.Handler.Version = s.buildInfo.Version
// If a ContinuousQuerier service has been started, attach it.
for _, srvc := range s.Services {
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
srv.Handler.ContinuousQuerier = cqsrvc
}
}
s.Services = append(s.Services, srv)
}
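// appendCollectdService adds the collectd input service, if enabled.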
func (s *Server) appendCollectdService(c collectd.Config) {
if !c.Enabled {
return
}
srv := collectd.NewService(c)
2015-12-23 15:48:25 +00:00
srv.MetaClient = s.MetaClient
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}
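// appendOpenTSDBService adds the OpenTSDB input service, if enabled.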
func (s *Server) appendOpenTSDBService(c opentsdb.Config) error {
if !c.Enabled {
return nil
}
srv, err := opentsdb.NewService(c)
if err != nil {
return err
}
srv.PointsWriter = s.PointsWriter
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
return nil
}
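// appendGraphiteService adds a Graphite input service for the given
// config, if enabled.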
func (s *Server) appendGraphiteService(c graphite.Config) error {
if !c.Enabled {
return nil
}
srv, err := graphite.NewService(c)
if err != nil {
return err
}
srv.PointsWriter = s.PointsWriter
srv.MetaClient = s.MetaClient
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
return nil
}
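// appendPrecreatorService adds the shard precreation service, if enabled.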
func (s *Server) appendPrecreatorService(c precreator.Config) error {
if !c.Enabled {
return nil
}
srv, err := precreator.NewService(c)
if err != nil {
return err
}
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
return nil
}
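// appendUDPService adds a UDP input service for the given config, if enabled.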
func (s *Server) appendUDPService(c udp.Config) {
if !c.Enabled {
return
}
srv := udp.NewService(c)
srv.PointsWriter = s.PointsWriter
srv.MetaClient = s.MetaClient
s.Services = append(s.Services, srv)
}
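// appendContinuousQueryService adds the continuous query service, if enabled.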
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
if !c.Enabled {
return
}
srv := continuous_querier.NewService(c)
srv.MetaClient = s.MetaClient
srv.QueryExecutor = s.QueryExecutor
s.Services = append(s.Services, srv)
}
// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }
// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {
// Start profiling, if set.
startProfile(s.CPUProfile, s.MemProfile)
// Open shared TCP connection.
ln, err := net.Listen("tcp", s.BindAddress)
if err != nil {
return fmt.Errorf("listen: %s", err)
}
s.Listener = ln
// Multiplex listener.
mux := tcp.NewMux()
go mux.Serve(ln)
if s.MetaService != nil {
s.MetaService.RaftListener = mux.Listen(meta.MuxHeader)
// Open meta service.
if err := s.MetaService.Open(); err != nil {
return fmt.Errorf("open meta service: %s", err)
}
go s.monitorErrorChan(s.MetaService.Err())
}
if s.TSDBStore != nil {
if err := s.initializeDataNode(); err != nil {
return err
}
s.ShardMapper.MetaClient = s.MetaClient
s.QueryExecutor.MetaClient = s.MetaClient
s.ShardWriter.MetaClient = s.MetaClient
s.HintedHandoff.MetaClient = s.MetaClient
s.Subscriber.MetaClient = s.MetaClient
s.PointsWriter.MetaClient = s.MetaClient
s.Monitor.MetaClient = s.MetaClient
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
// Open TSDB store.
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
// Open the hinted handoff service
if err := s.HintedHandoff.Open(); err != nil {
return fmt.Errorf("open hinted handoff: %s", err)
}
// Open the subscriber service
if err := s.Subscriber.Open(); err != nil {
return fmt.Errorf("open subscriber: %s", err)
}
// Open the points writer service
if err := s.PointsWriter.Open(); err != nil {
return fmt.Errorf("open points writer: %s", err)
}
// Open the monitor service
if err := s.Monitor.Open(); err != nil {
return fmt.Errorf("open monitor: %v", err)
}
for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
}
}
}
// Start the reporting service, if not disabled.
if !s.reportingDisabled {
go s.startServerReporting()
}
return nil
}(); err != nil {
return err
}
return nil
}
// Close shuts down the meta and data stores and all services.
func (s *Server) Close() error {
stopProfile()
// Close the listener first to stop any new connections
if s.Listener != nil {
s.Listener.Close()
}
// Close services to allow any inflight requests to complete
// and prevent new requests from being accepted.
for _, service := range s.Services {
service.Close()
}
if s.Monitor != nil {
s.Monitor.Close()
}
if s.PointsWriter != nil {
s.PointsWriter.Close()
}
if s.HintedHandoff != nil {
s.HintedHandoff.Close()
}
// Close the TSDBStore; no more reads or writes happen after this point
if s.TSDBStore != nil {
s.TSDBStore.Close()
}
if s.Subscriber != nil {
s.Subscriber.Close()
}
// Finally close the meta-store since everything else depends on it
if s.MetaService != nil {
s.MetaService.Close()
}
if s.MetaClient != nil {
s.MetaClient.Close()
}
close(s.closing)
return nil
}
// startServerReporting starts periodic server reporting.
func (s *Server) startServerReporting() {
for {
select {
case <-s.closing:
return
default:
}
if err := s.MetaClient.WaitForLeader(30 * time.Second); err != nil {
log.Printf("no leader available for reporting: %s", err.Error())
time.Sleep(time.Second)
continue
}
s.reportServer()
<-time.After(24 * time.Hour)
}
}
// reportServer reports anonymous statistics about the system.
func (s *Server) reportServer() {
dis, err := s.MetaClient.Databases()
if err != nil {
log.Printf("failed to retrieve databases for reporting: %s", err.Error())
return
}
numDatabases := len(dis)
numMeasurements := 0
numSeries := 0
for _, di := range dis {
d := s.TSDBStore.DatabaseIndex(di.Name)
if d == nil {
// No data in this store for this database.
continue
}
m, sc := d.MeasurementSeriesCounts()
numMeasurements += m
numSeries += sc
}
clusterID, err := s.MetaClient.ClusterID()
if err != nil {
log.Printf("failed to retrieve cluster ID for reporting: %s", err.Error())
return
}
cl := client.New("")
usage := client.Usage{
Product: "influxdb",
Data: []client.UsageData{
{
Values: client.Values{
"os": runtime.GOOS,
"arch": runtime.GOARCH,
"version": s.buildInfo.Version,
"server_id": fmt.Sprintf("%v", s.Node.ID),
"cluster_id": fmt.Sprintf("%v", clusterID),
"num_series": numSeries,
"num_measurements": numMeasurements,
"num_databases": numDatabases,
},
},
},
}
log.Printf("Sending anonymous usage statistics to m.influxdb.com")
go cl.Save(usage)
}
// monitorErrorChan reads an error channel and resends it through the server.
func (s *Server) monitorErrorChan(ch <-chan error) {
for {
select {
case err, ok := <-ch:
if !ok {
return
}
s.err <- err
case <-s.closing:
return
}
}
}
// initializeDataNode will set the MetaClient and join the node to the cluster if needed
func (s *Server) initializeDataNode() error {
// If the node ID is > 0 then we just need to initialize the meta client.
if s.Node.ID > 0 {
s.MetaClient = meta.NewClient(s.Node.MetaServers, s.metaUseTLS)
if err := s.MetaClient.Open(); err != nil {
return err
}
go s.updateMetaNodeInformation()
return nil
}
// It's the first time starting up and we need to either join
// the cluster or initialize this node as the first member
if len(s.joinPeers) == 0 {
// start up a new single node cluster
if s.MetaService == nil {
return fmt.Errorf("a server that is not joining an existing cluster must also run as a meta node")
}
s.MetaClient = meta.NewClient([]string{s.MetaService.HTTPAddr()}, s.metaUseTLS)
} else {
// join this data node to the cluster
s.MetaClient = meta.NewClient(s.joinPeers, s.metaUseTLS)
}
if err := s.MetaClient.Open(); err != nil {
return err
}
n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
if err != nil {
return err
}
s.Node.ID = n.ID
metaNodes, err := s.MetaClient.MetaNodes()
if err != nil {
return err
}
for _, n := range metaNodes {
s.Node.MetaServers = append(s.Node.MetaServers, n.Host)
}
if err := s.Node.Save(); err != nil {
return err
}
go s.updateMetaNodeInformation()
return nil
}
// updateMetaNodeInformation will continuously run and save the node.json file
// if the list of metaservers in the cluster changes
func (s *Server) updateMetaNodeInformation() {
for {
c := s.MetaClient.WaitForDataChanged()
select {
case <-c:
nodes, _ := s.MetaClient.MetaNodes()
var nodeAddrs []string
for _, n := range nodes {
nodeAddrs = append(nodeAddrs, n.Host)
}
if !reflect.DeepEqual(nodeAddrs, s.Node.MetaServers) {
s.Node.MetaServers = nodeAddrs
if err := s.Node.Save(); err != nil {
log.Printf("error saving node information: %s\n", err.Error())
} else {
log.Printf("updated node metaservers with: %v\n", s.Node.MetaServers)
}
}
case <-s.closing:
return
}
}
}
// Service represents a service attached to the server.
type Service interface {
Open() error
Close() error
}
// prof stores the file locations of active profiles.
var prof struct {
cpu *os.File
mem *os.File
}
// startProfile initializes the CPU and memory profiles, if specified.
func startProfile(cpuprofile, memprofile string) {
if cpuprofile != "" {
f, err := os.Create(cpuprofile)
if err != nil {
log.Fatalf("cpuprofile: %v", err)
}
log.Printf("writing CPU profile to: %s\n", cpuprofile)
prof.cpu = f
pprof.StartCPUProfile(prof.cpu)
}
if memprofile != "" {
f, err := os.Create(memprofile)
if err != nil {
log.Fatalf("memprofile: %v", err)
}
log.Printf("writing mem profile to: %s\n", memprofile)
prof.mem = f
runtime.MemProfileRate = 4096
}
}
// stopProfile closes the CPU and memory profiles if they are running.
func stopProfile() {
if prof.cpu != nil {
pprof.StopCPUProfile()
prof.cpu.Close()
log.Println("CPU profile stopped")
}
if prof.mem != nil {
pprof.Lookup("heap").WriteTo(prof.mem, 0)
prof.mem.Close()
log.Println("mem profile stopped")
}
}
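// tcpaddr is a minimal net.Addr implementation that reports a fixed host
// as a TCP address.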
type tcpaddr struct{ host string }
func (a *tcpaddr) Network() string { return "tcp" }
func (a *tcpaddr) String() string { return a.host }