influxdb/cmd/influxd/run/server.go

292 lines
7.1 KiB
Go
Raw Normal View History

2015-05-28 21:47:47 +00:00
package run
import (
"fmt"
2015-06-05 22:25:21 +00:00
"net"
2015-06-01 17:20:57 +00:00
"time"
2015-05-29 19:50:05 +00:00
2015-05-28 21:47:47 +00:00
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
2015-05-30 14:20:12 +00:00
"github.com/influxdb/influxdb/services/admin"
"github.com/influxdb/influxdb/services/collectd"
2015-06-03 22:39:37 +00:00
"github.com/influxdb/influxdb/services/continuous_querier"
2015-05-30 20:00:46 +00:00
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/hh"
2015-05-30 14:20:12 +00:00
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
2015-06-04 21:31:23 +00:00
"github.com/influxdb/influxdb/services/retention"
2015-06-03 13:06:36 +00:00
"github.com/influxdb/influxdb/services/udp"
2015-06-05 22:25:21 +00:00
"github.com/influxdb/influxdb/tcp"
2015-05-28 21:47:47 +00:00
"github.com/influxdb/influxdb/tsdb"
)
// Server represents a container for the metadata and storage data and services.
// It is built using a Config and it manages the startup and shutdown of all
// services in the proper order.
2015-05-28 21:47:47 +00:00
type Server struct {
err chan error
closing chan struct{}
2015-06-05 22:25:21 +00:00
Hostname string
BindAddress string
Listener net.Listener
MetaStore *meta.Store
TSDBStore *tsdb.Store
QueryExecutor *tsdb.QueryExecutor
2015-05-30 20:00:46 +00:00
PointsWriter *cluster.PointsWriter
ShardWriter *cluster.ShardWriter
HintedHandoff *hh.Service
2015-05-28 21:47:47 +00:00
Services []Service
2015-03-25 16:49:05 +00:00
}
2015-05-28 21:47:47 +00:00
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config) *Server {
2015-05-28 21:47:47 +00:00
// Construct base meta store and data store.
s := &Server{
2015-06-05 22:25:21 +00:00
err: make(chan error),
closing: make(chan struct{}),
Hostname: c.Meta.Hostname,
BindAddress: c.Meta.BindAddress,
2015-06-01 22:00:13 +00:00
MetaStore: meta.NewStore(c.Meta),
2015-05-29 19:50:05 +00:00
TSDBStore: tsdb.NewStore(c.Data.Dir),
2015-05-28 21:47:47 +00:00
}
2015-05-30 20:00:46 +00:00
// Initialize query executor.
s.QueryExecutor = tsdb.NewQueryExecutor(s.TSDBStore)
s.QueryExecutor.MetaStore = s.MetaStore
s.QueryExecutor.MetaStatementExecutor = &meta.StatementExecutor{Store: s.MetaStore}
2015-06-01 17:20:57 +00:00
// Set the shard writer
s.ShardWriter = cluster.NewShardWriter(time.Duration(c.Cluster.ShardWriterTimeout))
s.ShardWriter.MetaStore = s.MetaStore
2015-06-01 17:20:57 +00:00
// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter)
2015-05-30 20:00:46 +00:00
// Initialize points writer.
2015-06-01 17:20:57 +00:00
s.PointsWriter = cluster.NewPointsWriter()
s.PointsWriter.MetaStore = s.MetaStore
s.PointsWriter.TSDBStore = s.TSDBStore
2015-05-30 20:00:46 +00:00
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff
2015-05-30 20:00:46 +00:00
// Append services.
s.appendClusterService(c.Cluster)
s.appendAdminService(c.Admin)
s.appendContinuousQueryService(c.ContinuousQuery)
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
2015-06-03 13:06:36 +00:00
s.appendUDPService(c.UDP)
2015-06-04 21:31:23 +00:00
s.appendRetentionPolicyService(c.Retention)
for _, g := range c.Graphites {
2015-05-30 20:00:46 +00:00
s.appendGraphiteService(g)
2015-05-28 21:47:47 +00:00
}
return s
}
func (s *Server) appendClusterService(c cluster.Config) {
srv := cluster.NewService(c)
srv.TSDBStore = s.TSDBStore
2015-06-06 04:29:52 +00:00
srv.MetaStore = s.MetaStore
s.Services = append(s.Services, srv)
}
2015-05-28 21:47:47 +00:00
2015-06-04 21:31:23 +00:00
func (s *Server) appendRetentionPolicyService(c retention.Config) {
if !c.Enabled {
return
}
2015-06-04 21:31:23 +00:00
srv := retention.NewService(c)
srv.MetaStore = s.MetaStore
srv.TSDBStore = s.TSDBStore
s.Services = append(s.Services, srv)
}
func (s *Server) appendAdminService(c admin.Config) {
if !c.Enabled {
return
}
srv := admin.NewService(c)
s.Services = append(s.Services, srv)
}
func (s *Server) appendHTTPDService(c httpd.Config) {
if !c.Enabled {
return
}
srv := httpd.NewService(c)
2015-06-01 17:20:57 +00:00
srv.Handler.MetaStore = s.MetaStore
srv.Handler.QueryExecutor = s.QueryExecutor
srv.Handler.PointsWriter = s.PointsWriter
// If a ContinuousQuerier service has been started, attach it.
for _, srvc := range s.Services {
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
srv.Handler.ContinuousQuerier = cqsrvc
}
}
s.Services = append(s.Services, srv)
}
2015-05-28 21:47:47 +00:00
func (s *Server) appendCollectdService(c collectd.Config) {
if !c.Enabled {
return
}
srv := collectd.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}
func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
if !c.Enabled {
return
}
srv := opentsdb.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}
func (s *Server) appendGraphiteService(c graphite.Config) {
if !c.Enabled {
return
}
srv := graphite.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
2015-05-28 21:47:47 +00:00
}
2015-06-03 13:06:36 +00:00
func (s *Server) appendUDPService(c udp.Config) {
if !c.Enabled {
return
}
2015-06-03 13:06:36 +00:00
srv := udp.NewService(c)
2015-06-04 23:07:34 +00:00
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
2015-06-03 22:39:37 +00:00
}
func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
if !c.Enabled {
return
}
2015-06-03 22:39:37 +00:00
srv := continuous_querier.NewService(c)
srv.MetaStore = s.MetaStore
srv.QueryExecutor = s.QueryExecutor
2015-06-04 23:07:34 +00:00
srv.PointsWriter = s.PointsWriter
2015-06-03 13:06:36 +00:00
s.Services = append(s.Services, srv)
}
// Err returns an error channel that multiplexes all out of band errors received from all services.
func (s *Server) Err() <-chan error { return s.err }
2015-05-28 21:47:47 +00:00
// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {
2015-06-05 22:25:21 +00:00
// Resolve host to address.
_, port, err := net.SplitHostPort(s.BindAddress)
if err != nil {
return fmt.Errorf("split bind address: %s", err)
}
hostport := net.JoinHostPort(s.Hostname, port)
addr, err := net.ResolveTCPAddr("tcp", hostport)
if err != nil {
return fmt.Errorf("resolve tcp: addr=%s, err=%s", hostport, err)
}
s.MetaStore.Addr = addr
// Open shared TCP connection.
ln, err := net.Listen("tcp", s.BindAddress)
if err != nil {
return fmt.Errorf("listen: %s", err)
}
s.Listener = ln
// Multiplex listener.
mux := tcp.NewMux()
s.MetaStore.RaftListener = mux.Listen(meta.MuxRaftHeader)
s.MetaStore.ExecListener = mux.Listen(meta.MuxExecHeader)
s.Services[0].(*cluster.Service).Listener = mux.Listen(cluster.MuxHeader)
2015-06-06 03:58:54 +00:00
go mux.Serve(ln)
2015-06-05 22:25:21 +00:00
2015-05-28 21:47:47 +00:00
// Open meta store.
if err := s.MetaStore.Open(); err != nil {
return fmt.Errorf("open meta store: %s", err)
}
go s.monitorErrorChan(s.MetaStore.Err())
// Wait for the store to initialize.
<-s.MetaStore.Ready()
2015-05-28 21:47:47 +00:00
// Open TSDB store.
if err := s.TSDBStore.Open(); err != nil {
return fmt.Errorf("open tsdb store: %s", err)
}
// Open the hinted handoff service
if err := s.HintedHandoff.Open(); err != nil {
return fmt.Errorf("open hinted handoff: %s", err)
}
2015-05-28 21:47:47 +00:00
for _, service := range s.Services {
if err := service.Open(); err != nil {
return fmt.Errorf("open service: %s", err)
}
}
return nil
}(); err != nil {
s.Close()
return err
}
2015-05-28 21:47:47 +00:00
return nil
}
// Close shuts down the meta and data stores and all services.
func (s *Server) Close() error {
2015-06-05 22:25:21 +00:00
if s.Listener != nil {
s.Listener.Close()
}
2015-05-28 21:47:47 +00:00
if s.MetaStore != nil {
s.MetaStore.Close()
}
if s.TSDBStore != nil {
s.TSDBStore.Close()
}
if s.HintedHandoff != nil {
s.HintedHandoff.Close()
}
2015-05-28 21:47:47 +00:00
for _, service := range s.Services {
service.Close()
}
close(s.closing)
2015-05-28 21:47:47 +00:00
return nil
}
// monitorErrorChan reads an error channel and resends it through the server.
func (s *Server) monitorErrorChan(ch <-chan error) {
for {
select {
case err, ok := <-ch:
if !ok {
return
}
s.err <- err
case <-s.closing:
2015-04-14 19:43:25 +00:00
return
2014-12-31 19:42:53 +00:00
}
}
2014-12-31 19:42:53 +00:00
}
// Service represents a service attached to the server.
type Service interface {
Open() error
Close() error
}