Don't Panic on Missing Dirs
Issue: 1753 Figure out what's missing, create it, and keep going.pull/1874/head
parent
f957036405
commit
771ef1cec3
|
@ -31,7 +31,16 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
if config == nil {
|
||||
config = NewConfig()
|
||||
}
|
||||
initializing := !fileExists(config.BrokerDir()) && !fileExists(config.DataDir())
|
||||
|
||||
var initBroker, initServer bool
|
||||
if initBroker = !fileExists(config.BrokerDir()); initBroker {
|
||||
log.Printf("Broker directory missing. Need to create a broker.")
|
||||
}
|
||||
|
||||
if initServer = !fileExists(config.DataDir()); initServer {
|
||||
log.Printf("Data directory missing. Need to create data directory.")
|
||||
}
|
||||
initServer = initServer || initBroker
|
||||
|
||||
// Parse join urls from the --join flag.
|
||||
var joinURLs []*url.URL
|
||||
|
@ -42,7 +51,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
}
|
||||
|
||||
// Open broker, initialize or join as necessary.
|
||||
b := openBroker(config.BrokerDir(), config.BrokerURL(), initializing, joinURLs, logWriter)
|
||||
b := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, logWriter)
|
||||
|
||||
// Start the broker handler.
|
||||
var h *Handler
|
||||
|
@ -65,7 +74,7 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
}
|
||||
|
||||
// Open server, initialize or join as necessary.
|
||||
s := openServer(config, b, initializing, configExists, joinURLs, logWriter)
|
||||
s := openServer(config, b, initServer, initBroker, configExists, joinURLs, logWriter)
|
||||
s.SetAuthenticationEnabled(config.Authentication.Enabled)
|
||||
|
||||
// Enable retention policy enforcement if requested.
|
||||
|
@ -155,8 +164,11 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
|
|||
|
||||
// unless disabled, start the loop to report anonymous usage stats every 24h
|
||||
if !config.ReportingDisabled {
|
||||
clusterID := b.Broker.Log().Config().ClusterID
|
||||
go s.StartReportingLoop(version, clusterID)
|
||||
// Make sure we have a config object b4 we try to use it.
|
||||
if configObj := b.Broker.Log().Config(); configObj != nil {
|
||||
clusterID := configObj.ClusterID
|
||||
go s.StartReportingLoop(version, clusterID)
|
||||
}
|
||||
}
|
||||
|
||||
return b.Broker, s
|
||||
|
@ -169,7 +181,7 @@ func writePIDFile(path string) {
|
|||
}
|
||||
|
||||
// Ensure the required directory structure exists.
|
||||
err := os.MkdirAll(filepath.Dir(path), 0700)
|
||||
err := os.MkdirAll(filepath.Dir(path), 0755)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
@ -204,11 +216,6 @@ func parseConfig(path, hostname string) *Config {
|
|||
|
||||
// creates and initializes a broker.
|
||||
func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL, w io.Writer) *influxdb.Broker {
|
||||
// Ignore if there's no existing broker and we're not initializing or joining.
|
||||
if !fileExists(path) && !initializing && len(joinURLs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Create broker.
|
||||
b := influxdb.NewBroker()
|
||||
b.SetLogOutput(w)
|
||||
|
@ -217,7 +224,7 @@ func openBroker(path string, u *url.URL, initializing bool, joinURLs []*url.URL,
|
|||
log.Fatalf("failed to open broker: %s", err)
|
||||
}
|
||||
|
||||
// If this is a new broker then we can initialie two ways:
|
||||
// If this is a new broker then we can initialize two ways:
|
||||
// 1) Start a brand new cluster.
|
||||
// 2) Join an existing cluster.
|
||||
if initializing {
|
||||
|
@ -253,12 +260,7 @@ func joinBroker(b *influxdb.Broker, joinURLs []*url.URL) {
|
|||
}
|
||||
|
||||
// creates and initializes a server.
|
||||
func openServer(config *Config, b *influxdb.Broker, initializing, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
// Ignore if there's no existing server and we're not initializing or joining.
|
||||
if !fileExists(config.Data.Dir) && !initializing && len(joinURLs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []*url.URL, w io.Writer) *influxdb.Server {
|
||||
// Create and open the server.
|
||||
s := influxdb.NewServer()
|
||||
s.SetLogOutput(w)
|
||||
|
@ -272,14 +274,15 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
|
|||
}
|
||||
|
||||
// If the server is uninitialized then initialize or join it.
|
||||
if initializing {
|
||||
if initServer {
|
||||
if len(joinURLs) == 0 {
|
||||
initializeServer(config.DataURL(), s, b, w)
|
||||
initializeServer(config.DataURL(), s, b, w, initBroker)
|
||||
} else {
|
||||
joinServer(s, config.DataURL(), joinURLs)
|
||||
openServerClient(s, joinURLs, w)
|
||||
}
|
||||
} else if !configExists {
|
||||
}
|
||||
|
||||
if !configExists {
|
||||
// We are spining up a server that has no config,
|
||||
// but already has an initialized data directory
|
||||
joinURLs = []*url.URL{b.URL()}
|
||||
|
@ -297,12 +300,14 @@ func openServer(config *Config, b *influxdb.Broker, initializing, configExists b
|
|||
}
|
||||
|
||||
// initializes a new server that does not yet have an ID.
|
||||
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer) {
|
||||
func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
|
||||
// TODO: Create replica using the messaging client.
|
||||
|
||||
// Create replica on broker.
|
||||
if err := b.CreateReplica(1, u); err != nil {
|
||||
log.Fatalf("replica creation error: %s", err)
|
||||
if initBroker {
|
||||
// Create replica on broker.
|
||||
if err := b.CreateReplica(1, u); err != nil {
|
||||
log.Fatalf("replica creation error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Create messaging client.
|
||||
|
@ -315,9 +320,11 @@ func initializeServer(u *url.URL, s *influxdb.Server, b *influxdb.Broker, w io.W
|
|||
log.Fatalf("set client error: %s", err)
|
||||
}
|
||||
|
||||
// Initialize the server.
|
||||
if err := s.Initialize(b.URL()); err != nil {
|
||||
log.Fatalf("server initialization error: %s", err)
|
||||
if initBroker {
|
||||
// Initialize the server.
|
||||
if err := s.Initialize(b.URL()); err != nil {
|
||||
log.Fatalf("server initialization error: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -770,7 +770,7 @@ func (t *topic) open() error {
|
|||
assert(t.file == nil, "topic already open: %d", t.id)
|
||||
|
||||
// Ensure the parent directory exists.
|
||||
if err := os.MkdirAll(filepath.Dir(t.path), 0700); err != nil {
|
||||
if err := os.MkdirAll(filepath.Dir(t.path), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -229,7 +229,7 @@ func (l *Log) Open(path string) error {
|
|||
}
|
||||
|
||||
// Create directory, if not exists.
|
||||
if err := os.MkdirAll(path, 0700); err != nil {
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
l.path = path
|
||||
|
|
|
@ -164,10 +164,10 @@ func (s *Server) Open(path string) error {
|
|||
s.path = path
|
||||
|
||||
// Create required directories.
|
||||
if err := os.MkdirAll(path, 0700); err != nil {
|
||||
if err := os.MkdirAll(path, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(path, "shards"), 0700); err != nil {
|
||||
if err := os.MkdirAll(filepath.Join(path, "shards"), 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue