Merge pull request #4333 from influxdb/retry_mon_create
Retry monitor storage creation and only on Leaderpull/4335/head
commit
a542d5509f
|
@ -36,6 +36,7 @@
|
|||
- [#4263](https://github.com/influxdb/influxdb/issues/4263): derivative does not work when data is missing
|
||||
- [#4293](https://github.com/influxdb/influxdb/pull/4293): Ensure shell is invoked when touching PID file. Thanks @christopherjdickson
|
||||
- [#4296](https://github.com/influxdb/influxdb/pull/4296): Reject line protocol ending with '-'. Fixes [#4272](https://github.com/influxdb/influxdb/issues/4272)
|
||||
- [#4333](https://github.com/influxdb/influxdb/pull/4333): Retry monitor storage creation and only on Leader.
|
||||
|
||||
## v0.9.4 [2015-09-14]
|
||||
|
||||
|
|
|
@ -75,6 +75,7 @@ type Monitor struct {
|
|||
|
||||
diagRegistrations map[string]DiagsClient
|
||||
|
||||
storeCreated bool
|
||||
storeEnabled bool
|
||||
storeDatabase string
|
||||
storeRetentionPolicy string
|
||||
|
@ -87,6 +88,7 @@ type Monitor struct {
|
|||
ClusterID() (uint64, error)
|
||||
NodeID() uint64
|
||||
WaitForLeader(d time.Duration) error
|
||||
IsLeader() bool
|
||||
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
SetDefaultRetentionPolicy(database, name string) error
|
||||
|
@ -294,6 +296,42 @@ func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
|
|||
return diags, nil
|
||||
}
|
||||
|
||||
// createInternalStorage ensures the internal storage has been created.
|
||||
func (m *Monitor) createInternalStorage() {
|
||||
if !m.MetaStore.IsLeader() || m.storeCreated {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil {
|
||||
m.Logger.Printf("failed to create database '%s', failed to create storage: %s",
|
||||
m.storeDatabase, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
rpi := meta.NewRetentionPolicyInfo(MonitorRetentionPolicy)
|
||||
rpi.Duration = MonitorRetentionPolicyDuration
|
||||
rpi.ReplicaN = 1
|
||||
if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
|
||||
m.Logger.Printf("failed to create retention policy '%s', failed to create internal storage: %s",
|
||||
rpi.Name, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.MetaStore.SetDefaultRetentionPolicy(m.storeDatabase, rpi.Name); err != nil {
|
||||
m.Logger.Printf("failed to set default retention policy on '%s', failed to create internal storage: %s",
|
||||
m.storeDatabase, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != meta.ErrRetentionPolicyNotFound {
|
||||
m.Logger.Printf("failed to delete retention policy 'default', failed to created internal storage: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Mark storage creation complete.
|
||||
m.storeCreated = true
|
||||
}
|
||||
|
||||
// storeStatistics writes the statistics to an InfluxDB system.
|
||||
func (m *Monitor) storeStatistics() {
|
||||
defer m.wg.Done()
|
||||
|
@ -315,37 +353,13 @@ func (m *Monitor) storeStatistics() {
|
|||
"hostname": hostname,
|
||||
}
|
||||
|
||||
if _, err := m.MetaStore.CreateDatabaseIfNotExists(m.storeDatabase); err != nil {
|
||||
m.Logger.Printf("failed to create database '%s', terminating storage: %s",
|
||||
m.storeDatabase, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
rpi := meta.NewRetentionPolicyInfo(MonitorRetentionPolicy)
|
||||
rpi.Duration = MonitorRetentionPolicyDuration
|
||||
rpi.ReplicaN = 1
|
||||
if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
|
||||
m.Logger.Printf("failed to create retention policy '%s', terminating storage: %s",
|
||||
rpi.Name, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.MetaStore.SetDefaultRetentionPolicy(m.storeDatabase, rpi.Name); err != nil {
|
||||
m.Logger.Printf("failed to set default retention policy on '%s', terminating storage: %s",
|
||||
m.storeDatabase, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if err := m.MetaStore.DropRetentionPolicy(m.storeDatabase, "default"); err != nil && err != meta.ErrRetentionPolicyNotFound {
|
||||
m.Logger.Printf("failed to delete retention policy 'default', terminating storage: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
tick := time.NewTicker(m.storeInterval)
|
||||
defer tick.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-tick.C:
|
||||
m.createInternalStorage()
|
||||
|
||||
stats, err := m.Statistics(clusterTags)
|
||||
if err != nil {
|
||||
m.Logger.Printf("failed to retrieve registered statistics: %s", err)
|
||||
|
|
|
@ -42,6 +42,7 @@ type mockMetastore struct{}
|
|||
func (m *mockMetastore) ClusterID() (uint64, error) { return 1, nil }
|
||||
func (m *mockMetastore) NodeID() uint64 { return 2 }
|
||||
func (m *mockMetastore) WaitForLeader(d time.Duration) error { return nil }
|
||||
func (m *mockMetastore) IsLeader() bool { return true }
|
||||
func (m *mockMetastore) SetDefaultRetentionPolicy(database, name string) error { return nil }
|
||||
func (m *mockMetastore) DropRetentionPolicy(database, name string) error { return nil }
|
||||
func (m *mockMetastore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
|
||||
|
|
Loading…
Reference in New Issue