Monitor retention policy is configurable
parent
cfd0acdbc5
commit
214cfea53c
|
@ -95,6 +95,9 @@ reporting-disabled = false
|
|||
[monitor]
|
||||
store-enabled = true # Whether to record statistics internally.
|
||||
store-database = "_internal" # The destination database for recorded statistics
|
||||
store-retention-policy = "monitor" # The destination retention policy
|
||||
store-retention-duration = "168h" # How long to keep the data
|
||||
store-replication-factor = 1 # How many copies of the data to keep
|
||||
store-interval = "1m" # The interval at which to record statistics
|
||||
|
||||
###
|
||||
|
|
|
@ -14,22 +14,37 @@ const (
|
|||
// DefaultStoreDatabase is the name of the database where gathered information is written
|
||||
DefaultStoreDatabase = "_internal"
|
||||
|
||||
// DefaultStoreRetentionPolicy is the name of the retention policy for monitor data.
|
||||
DefaultStoreRetentionPolicy = "monitor"
|
||||
|
||||
// DefaultRetentionPolicyDuration is the duration the data is retained.
|
||||
DefaultStoreRetentionPolicyDuration = 168 * time.Hour
|
||||
|
||||
// DefaultStoreReplicationFactor is the default replication factor for the data.
|
||||
DefaultStoreReplicationFactor = 1
|
||||
|
||||
// DefaultStoreInterval is the period between storing gathered information.
|
||||
DefaultStoreInterval = time.Minute
|
||||
)
|
||||
|
||||
// Config represents the configuration for the monitor service.
|
||||
type Config struct {
|
||||
StoreEnabled bool `toml:"store-enabled"`
|
||||
StoreDatabase string `toml:"store-database"`
|
||||
StoreInterval toml.Duration `toml:"store-interval"`
|
||||
StoreEnabled bool `toml:"store-enabled"`
|
||||
StoreDatabase string `toml:"store-database"`
|
||||
StoreRetentionPolicy string `toml:"store-retention-policy"`
|
||||
StoreRetentionDuration toml.Duration `toml:"store-retention-duration"`
|
||||
StoreReplicationFactor int `toml:"store-replication-factor"`
|
||||
StoreInterval toml.Duration `toml:"store-interval"`
|
||||
}
|
||||
|
||||
// NewConfig returns an instance of Config with defaults.
|
||||
func NewConfig() Config {
|
||||
return Config{
|
||||
StoreEnabled: true,
|
||||
StoreDatabase: DefaultStoreDatabase,
|
||||
StoreInterval: toml.Duration(DefaultStoreInterval),
|
||||
StoreEnabled: true,
|
||||
StoreDatabase: DefaultStoreDatabase,
|
||||
StoreRetentionPolicy: DefaultStoreRetentionPolicy,
|
||||
StoreRetentionDuration: toml.Duration(DefaultStoreRetentionPolicyDuration),
|
||||
StoreReplicationFactor: DefaultStoreReplicationFactor,
|
||||
StoreInterval: toml.Duration(DefaultStoreInterval),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,9 @@ func TestConfig_Parse(t *testing.T) {
|
|||
if _, err := toml.Decode(`
|
||||
store-enabled=true
|
||||
store-database="the_db"
|
||||
store-retention-policy="the_rp"
|
||||
store-retention-duration="1h"
|
||||
store-replication-factor=1234
|
||||
store-interval="10m"
|
||||
`, &c); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -24,6 +27,12 @@ store-interval="10m"
|
|||
t.Fatalf("unexpected store-enabled: %v", c.StoreEnabled)
|
||||
} else if c.StoreDatabase != "the_db" {
|
||||
t.Fatalf("unexpected store-database: %s", c.StoreDatabase)
|
||||
} else if c.StoreRetentionPolicy != "the_rp" {
|
||||
t.Fatalf("unexpected store-retention-policy: %s", c.StoreRetentionPolicy)
|
||||
} else if time.Duration(c.StoreRetentionDuration) != 1*time.Hour {
|
||||
t.Fatalf("unexpected store-retention-duration: %s", c.StoreRetentionDuration)
|
||||
} else if c.StoreReplicationFactor != 1234 {
|
||||
t.Fatalf("unexpected store-replication-factor: %d", c.StoreReplicationFactor)
|
||||
} else if time.Duration(c.StoreInterval) != 10*time.Minute {
|
||||
t.Fatalf("unexpected store-interval: %s", c.StoreInterval)
|
||||
}
|
||||
|
|
|
@ -63,16 +63,20 @@ type Monitor struct {
|
|||
|
||||
diagRegistrations map[string]DiagsClient
|
||||
|
||||
storeEnabled bool
|
||||
storeDatabase string
|
||||
storeAddress string
|
||||
storeInterval time.Duration
|
||||
storeEnabled bool
|
||||
storeDatabase string
|
||||
storeRetentionPolicy string
|
||||
storeRetentionDuration time.Duration
|
||||
storeReplicationFactor int
|
||||
storeAddress string
|
||||
storeInterval time.Duration
|
||||
|
||||
MetaStore interface {
|
||||
ClusterID() (uint64, error)
|
||||
NodeID() uint64
|
||||
WaitForLeader(d time.Duration) error
|
||||
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error)
|
||||
}
|
||||
|
||||
PointsWriter interface {
|
||||
|
@ -85,12 +89,15 @@ type Monitor struct {
|
|||
// New returns a new instance of the monitor system.
|
||||
func New(c Config) *Monitor {
|
||||
return &Monitor{
|
||||
done: make(chan struct{}),
|
||||
diagRegistrations: make(map[string]DiagsClient),
|
||||
storeEnabled: c.StoreEnabled,
|
||||
storeDatabase: c.StoreDatabase,
|
||||
storeInterval: time.Duration(c.StoreInterval),
|
||||
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
|
||||
done: make(chan struct{}),
|
||||
diagRegistrations: make(map[string]DiagsClient),
|
||||
storeEnabled: c.StoreEnabled,
|
||||
storeDatabase: c.StoreDatabase,
|
||||
storeRetentionPolicy: c.StoreRetentionPolicy,
|
||||
storeRetentionDuration: time.Duration(c.StoreRetentionDuration),
|
||||
storeReplicationFactor: c.StoreReplicationFactor,
|
||||
storeInterval: time.Duration(c.StoreInterval),
|
||||
Logger: log.New(os.Stderr, "[monitor] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -255,8 +262,8 @@ func (m *Monitor) Diagnostics() (map[string]*Diagnostic, error) {
|
|||
// storeStatistics writes the statistics to an InfluxDB system.
|
||||
func (m *Monitor) storeStatistics() {
|
||||
defer m.wg.Done()
|
||||
m.Logger.Printf("storing statistics in database '%s', interval %s",
|
||||
m.storeDatabase, m.storeInterval)
|
||||
m.Logger.Printf("Storing statistics in database '%s' retention policy '%s', at interval %s",
|
||||
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)
|
||||
|
||||
if err := m.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
|
||||
m.Logger.Printf("failed to detect a cluster leader, terminating storage: %s", err.Error())
|
||||
|
@ -269,6 +276,16 @@ func (m *Monitor) storeStatistics() {
|
|||
return
|
||||
}
|
||||
|
||||
rpi := meta.NewRetentionPolicyInfo(m.storeRetentionPolicy)
|
||||
rpi.Duration = m.storeRetentionDuration
|
||||
rpi.ReplicaN = m.storeReplicationFactor
|
||||
|
||||
if _, err := m.MetaStore.CreateRetentionPolicyIfNotExists(m.storeDatabase, rpi); err != nil {
|
||||
m.Logger.Printf("failed to create retention policy '%s', terminating storage: %s",
|
||||
m.storeRetentionPolicy, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
tick := time.NewTicker(m.storeInterval)
|
||||
defer tick.Stop()
|
||||
for {
|
||||
|
@ -287,7 +304,7 @@ func (m *Monitor) storeStatistics() {
|
|||
|
||||
err = m.PointsWriter.WritePoints(&cluster.WritePointsRequest{
|
||||
Database: m.storeDatabase,
|
||||
RetentionPolicy: "",
|
||||
RetentionPolicy: m.storeRetentionPolicy,
|
||||
ConsistencyLevel: cluster.ConsistencyLevelOne,
|
||||
Points: points,
|
||||
})
|
||||
|
|
|
@ -45,6 +45,9 @@ func (m *mockMetastore) WaitForLeader(d time.Duration) error { return nil }
|
|||
func (m *mockMetastore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockMetastore) CreateRetentionPolicyIfNotExists(database string, rpi *meta.RetentionPolicyInfo) (*meta.RetentionPolicyInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func openMonitor(t *testing.T) *Monitor {
|
||||
monitor := New(NewConfig())
|
||||
|
|
Loading…
Reference in New Issue