Merge pull request #5511 from seiflotfy/master
Initialize MetaClient before setting it up as a data and/or meta nodepull/5513/head
commit
5a9953eec1
|
@ -35,6 +35,7 @@
|
|||
- [#5455](https://github.com/influxdata/influxdb/issues/5455): panic: runtime error: slice bounds out of range when loading corrupted wal segment
|
||||
- [#5478](https://github.com/influxdata/influxdb/issues/5478): panic: interface conversion: interface is float64, not int64
|
||||
- [#5475](https://github.com/influxdata/influxdb/issues/5475): Ensure appropriate exit code returned for non-interactive use of CLI.
|
||||
- [#5479](https://github.com/influxdata/influxdb/issues/5479): Bringing up a node as a meta only node causes panic
|
||||
|
||||
## v0.9.6 [2015-12-09]
|
||||
|
||||
|
|
|
@ -363,114 +363,107 @@ func (s *Server) Err() <-chan error { return s.err }
|
|||
|
||||
// Open opens the meta and data store and all services.
|
||||
func (s *Server) Open() error {
|
||||
if err := func() error {
|
||||
// Start profiling, if set.
|
||||
startProfile(s.CPUProfile, s.MemProfile)
|
||||
// Start profiling, if set.
|
||||
startProfile(s.CPUProfile, s.MemProfile)
|
||||
|
||||
// Open shared TCP connection.
|
||||
ln, err := net.Listen("tcp", s.BindAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen: %s", err)
|
||||
// Open shared TCP connection.
|
||||
ln, err := net.Listen("tcp", s.BindAddress)
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen: %s", err)
|
||||
}
|
||||
s.Listener = ln
|
||||
|
||||
// Multiplex listener.
|
||||
mux := tcp.NewMux()
|
||||
go mux.Serve(ln)
|
||||
|
||||
if s.MetaService != nil {
|
||||
s.MetaService.RaftListener = mux.Listen(meta.MuxHeader)
|
||||
// Open meta service.
|
||||
if err := s.MetaService.Open(); err != nil {
|
||||
return fmt.Errorf("open meta service: %s", err)
|
||||
}
|
||||
s.Listener = ln
|
||||
go s.monitorErrorChan(s.MetaService.Err())
|
||||
}
|
||||
|
||||
// Multiplex listener.
|
||||
mux := tcp.NewMux()
|
||||
go mux.Serve(ln)
|
||||
|
||||
if s.MetaService != nil {
|
||||
s.MetaService.RaftListener = mux.Listen(meta.MuxHeader)
|
||||
|
||||
// Open meta service.
|
||||
if err := s.MetaService.Open(); err != nil {
|
||||
return fmt.Errorf("open meta service: %s", err)
|
||||
}
|
||||
go s.monitorErrorChan(s.MetaService.Err())
|
||||
}
|
||||
|
||||
if s.TSDBStore != nil {
|
||||
if err := s.initializeDataNode(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append services.
|
||||
s.appendClusterService(s.config.Cluster)
|
||||
s.appendPrecreatorService(s.config.Precreator)
|
||||
s.appendSnapshotterService()
|
||||
s.appendCopierService()
|
||||
s.appendAdminService(s.config.Admin)
|
||||
s.appendContinuousQueryService(s.config.ContinuousQuery)
|
||||
s.appendHTTPDService(s.config.HTTPD)
|
||||
s.appendCollectdService(s.config.Collectd)
|
||||
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, g := range s.config.UDPs {
|
||||
s.appendUDPService(g)
|
||||
}
|
||||
s.appendRetentionPolicyService(s.config.Retention)
|
||||
for _, g := range s.config.Graphites {
|
||||
if err := s.appendGraphiteService(g); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.Subscriber.MetaClient = s.MetaClient
|
||||
s.ShardMapper.MetaClient = s.MetaClient
|
||||
s.QueryExecutor.MetaClient = s.MetaClient
|
||||
s.ShardWriter.MetaClient = s.MetaClient
|
||||
s.HintedHandoff.MetaClient = s.MetaClient
|
||||
s.Subscriber.MetaClient = s.MetaClient
|
||||
s.PointsWriter.MetaClient = s.MetaClient
|
||||
s.Monitor.MetaClient = s.MetaClient
|
||||
|
||||
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
|
||||
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
|
||||
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
|
||||
|
||||
// Open TSDB store.
|
||||
if err := s.TSDBStore.Open(); err != nil {
|
||||
return fmt.Errorf("open tsdb store: %s", err)
|
||||
}
|
||||
|
||||
// Open the hinted handoff service
|
||||
if err := s.HintedHandoff.Open(); err != nil {
|
||||
return fmt.Errorf("open hinted handoff: %s", err)
|
||||
}
|
||||
|
||||
// Open the subcriber service
|
||||
if err := s.Subscriber.Open(); err != nil {
|
||||
return fmt.Errorf("open subscriber: %s", err)
|
||||
}
|
||||
|
||||
// Open the points writer service
|
||||
if err := s.PointsWriter.Open(); err != nil {
|
||||
return fmt.Errorf("open points writer: %s", err)
|
||||
}
|
||||
|
||||
// Open the monitor service
|
||||
if err := s.Monitor.Open(); err != nil {
|
||||
return fmt.Errorf("open monitor: %v", err)
|
||||
}
|
||||
|
||||
for _, service := range s.Services {
|
||||
if err := service.Open(); err != nil {
|
||||
return fmt.Errorf("open service: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the reporting service, if not disabled.
|
||||
if !s.reportingDisabled {
|
||||
go s.startServerReporting()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}(); err != nil {
|
||||
// initialize MetaClient.
|
||||
if err = s.initializeMetaClient(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s.TSDBStore != nil {
|
||||
// Append services.
|
||||
s.appendClusterService(s.config.Cluster)
|
||||
s.appendPrecreatorService(s.config.Precreator)
|
||||
s.appendSnapshotterService()
|
||||
s.appendCopierService()
|
||||
s.appendAdminService(s.config.Admin)
|
||||
s.appendContinuousQueryService(s.config.ContinuousQuery)
|
||||
s.appendHTTPDService(s.config.HTTPD)
|
||||
s.appendCollectdService(s.config.Collectd)
|
||||
if err := s.appendOpenTSDBService(s.config.OpenTSDB); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, g := range s.config.UDPs {
|
||||
s.appendUDPService(g)
|
||||
}
|
||||
s.appendRetentionPolicyService(s.config.Retention)
|
||||
for _, g := range s.config.Graphites {
|
||||
if err := s.appendGraphiteService(g); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
s.Subscriber.MetaClient = s.MetaClient
|
||||
s.ShardMapper.MetaClient = s.MetaClient
|
||||
s.QueryExecutor.MetaClient = s.MetaClient
|
||||
s.ShardWriter.MetaClient = s.MetaClient
|
||||
s.HintedHandoff.MetaClient = s.MetaClient
|
||||
s.Subscriber.MetaClient = s.MetaClient
|
||||
s.PointsWriter.MetaClient = s.MetaClient
|
||||
s.Monitor.MetaClient = s.MetaClient
|
||||
|
||||
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
|
||||
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
|
||||
s.CopierService.Listener = mux.Listen(copier.MuxHeader)
|
||||
|
||||
// Open TSDB store.
|
||||
if err := s.TSDBStore.Open(); err != nil {
|
||||
return fmt.Errorf("open tsdb store: %s", err)
|
||||
}
|
||||
|
||||
// Open the hinted handoff service
|
||||
if err := s.HintedHandoff.Open(); err != nil {
|
||||
return fmt.Errorf("open hinted handoff: %s", err)
|
||||
}
|
||||
|
||||
// Open the subcriber service
|
||||
if err := s.Subscriber.Open(); err != nil {
|
||||
return fmt.Errorf("open subscriber: %s", err)
|
||||
}
|
||||
|
||||
// Open the points writer service
|
||||
if err := s.PointsWriter.Open(); err != nil {
|
||||
return fmt.Errorf("open points writer: %s", err)
|
||||
}
|
||||
|
||||
// Open the monitor service
|
||||
if err := s.Monitor.Open(); err != nil {
|
||||
return fmt.Errorf("open monitor: %v", err)
|
||||
}
|
||||
|
||||
for _, service := range s.Services {
|
||||
if err := service.Open(); err != nil {
|
||||
return fmt.Errorf("open service: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start the reporting service, if not disabled.
|
||||
if !s.reportingDisabled {
|
||||
go s.startServerReporting()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -547,15 +540,19 @@ func (s *Server) reportServer() {
|
|||
|
||||
numMeasurements := 0
|
||||
numSeries := 0
|
||||
for _, di := range dis {
|
||||
d := s.TSDBStore.DatabaseIndex(di.Name)
|
||||
if d == nil {
|
||||
// No data in this store for this database.
|
||||
continue
|
||||
|
||||
// Only needed in the case of a data node
|
||||
if s.TSDBStore != nil {
|
||||
for _, di := range dis {
|
||||
d := s.TSDBStore.DatabaseIndex(di.Name)
|
||||
if d == nil {
|
||||
// No data in this store for this database.
|
||||
continue
|
||||
}
|
||||
m, s := d.MeasurementSeriesCounts()
|
||||
numMeasurements += m
|
||||
numSeries += s
|
||||
}
|
||||
m, s := d.MeasurementSeriesCounts()
|
||||
numMeasurements += m
|
||||
numSeries += s
|
||||
}
|
||||
|
||||
clusterID := s.MetaClient.ClusterID()
|
||||
|
@ -603,8 +600,8 @@ func (s *Server) monitorErrorChan(ch <-chan error) {
|
|||
}
|
||||
}
|
||||
|
||||
// initializeDataNode will set the MetaClient and join the node to the cluster if needed
|
||||
func (s *Server) initializeDataNode() error {
|
||||
// initializeMetaClient will set the MetaClient and join the node to the cluster if needed
|
||||
func (s *Server) initializeMetaClient() error {
|
||||
// if the node ID is > 0 then we just need to initialize the metaclient
|
||||
if s.Node.ID > 0 {
|
||||
s.MetaClient = meta.NewClient(s.Node.MetaServers, s.metaUseTLS)
|
||||
|
@ -626,17 +623,20 @@ func (s *Server) initializeDataNode() error {
|
|||
}
|
||||
s.MetaClient = meta.NewClient([]string{s.MetaService.HTTPAddr()}, s.metaUseTLS)
|
||||
} else {
|
||||
// join this data node to the cluster
|
||||
// join this node to the cluster
|
||||
s.MetaClient = meta.NewClient(s.joinPeers, s.metaUseTLS)
|
||||
}
|
||||
if err := s.MetaClient.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
if s.TSDBStore != nil {
|
||||
n, err := s.MetaClient.CreateDataNode(s.httpAPIAddr, s.tcpAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.Node.ID = n.ID
|
||||
}
|
||||
s.Node.ID = n.ID
|
||||
metaNodes, err := s.MetaClient.MetaNodes()
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue