Merge pull request #1936 from influxdb/server_stats2

Implement server stats and self-monitoring
pull/1968/head v0.9.0-rc12
Todd Persen 2015-03-15 20:48:29 -07:00
commit 86ede78836
12 changed files with 399 additions and 4 deletions

View File

@ -9,6 +9,9 @@
- [#1955](https://github.com/influxdb/influxdb/pull/1955): Prohibit creation of databases with no name. Thanks @dullgiulio
- [#1952](https://github.com/influxdb/influxdb/pull/1952): Handle delete statement with an error. Thanks again to @dullgiulio
### Features
- [#1936](https://github.com/influxdb/influxdb/pull/1936): Implement "SHOW STATS" and self-monitoring
## v0.9.0-rc11 [2015-03-13]
### Bugfixes

View File

@ -110,6 +110,13 @@ type Config struct {
RaftTracing bool `toml:"raft-tracing"`
} `toml:"logging"`
Statistics struct {
Enabled bool `toml:"enabled"`
Database string `toml:"database"`
RetentionPolicy string `toml:"retention-policy"`
WriteInterval Duration `toml:"write-interval"`
}
ContinuousQuery struct {
// when continuous queries are run we'll automatically recompute previous intervals
// in case lagged data came in. Set to zero if you never have lagged data. We do
@ -166,6 +173,11 @@ func NewConfig() *Config {
c.ContinuousQuery.Disable = false
c.ReportingDisabled = false
c.Statistics.Enabled = false
c.Statistics.Database = "_internal"
c.Statistics.RetentionPolicy = "default"
c.Statistics.WriteInterval = Duration(1 * time.Minute)
// Detect hostname (or set to localhost).
if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
c.Hostname = "localhost"

View File

@ -177,6 +177,12 @@ file = "influxdb.log"
write-tracing = true
raft-tracing = true
[statistics]
enabled = true
database = "_internal"
retention-policy = "default"
write-interval = "1m"
# Configure the admin server
[admin]
enabled = true

View File

@ -180,6 +180,27 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
log.Fatalf("failed to start %s Graphite server: %s", c.Protocol, err.Error())
}
}
// Start up self-monitoring if enabled.
if config.Statistics.Enabled {
database := config.Statistics.Database
policy := config.Statistics.RetentionPolicy
interval := time.Duration(config.Statistics.WriteInterval)
// Ensure database exists.
if err := s.CreateDatabaseIfNotExists(database); err != nil {
log.Fatalf("failed to create database %s for internal statistics: %s", database, err.Error())
}
// Ensure retention policy exists.
rp := influxdb.NewRetentionPolicy(policy)
if err := s.CreateRetentionPolicyIfNotExists(database, rp); err != nil {
log.Fatalf("failed to create retention policy for internal statistics: %s", err.Error())
}
s.StartSelfMonitoring(database, policy, interval)
log.Printf("started self-monitoring at interval of %s", interval)
}
}
// unless disabled, start the loop to report anonymous usage stats every 24h
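Each flush of the self-monitoring loop writes a single point: the measurement is the stats object's name ("server", from NewStats("server") below), the point is tagged with the server's raft ID, and every counter becomes an integer field. An illustrative sketch, assuming an initialized *influxdb.Server s; the field names are counters incremented elsewhere in this diff, and the values are made up:

point := influxdb.Point{
	Name: "server",
	Tags: map[string]string{"raftID": "1"},
	Fields: map[string]interface{}{
		"batchWriteRx":       42,   // write batches received
		"pointWriteRx":       1024, // points received
		"broadcastMessageTx": 7,    // broadcast messages sent
	},
}
// The point goes through the normal write path, so the statistics are
// queryable like any other data in the _internal database.
if _, err := s.WriteSeries("_internal", "default", []influxdb.Point{point}); err != nil {
	log.Printf("stats write failed: %s", err)
}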

View File

@ -88,3 +88,12 @@ dir = "/tmp/influxdb/development/state"
file = "/var/log/influxdb/influxd.log" # Leave blank to redirect logs to stderr.
write-tracing = false # If true, enables detailed logging of the write system.
raft-tracing = false # If true, enables detailed logging of Raft consensus.
# InfluxDB can store statistics about itself. This is useful for monitoring purposes.
# This feature is disabled by default, but if enabled, these statistics can be queried
# as any other data.
[statistics]
enabled = false
database = "_internal" # The database to which the data is written.
retention-policy = "default" # The retention policy within the database.
write-interval = "1m" # Period between writing the data.

View File

@ -78,6 +78,7 @@ func (*ShowFieldKeysStatement) node() {}
func (*ShowRetentionPoliciesStatement) node() {}
func (*ShowMeasurementsStatement) node() {}
func (*ShowSeriesStatement) node() {}
func (*ShowStatsStatement) node() {}
func (*ShowTagKeysStatement) node() {}
func (*ShowTagValuesStatement) node() {}
func (*ShowUsersStatement) node() {}
@ -169,6 +170,7 @@ func (*ShowFieldKeysStatement) stmt() {}
func (*ShowMeasurementsStatement) stmt() {}
func (*ShowRetentionPoliciesStatement) stmt() {}
func (*ShowSeriesStatement) stmt() {}
func (*ShowStatsStatement) stmt() {}
func (*ShowTagKeysStatement) stmt() {}
func (*ShowTagValuesStatement) stmt() {}
func (*ShowUsersStatement) stmt() {}
@ -1365,6 +1367,27 @@ func (s *ShowRetentionPoliciesStatement) RequiredPrivileges() ExecutionPrivilege
return ExecutionPrivileges{{Name: "", Privilege: ReadPrivilege}}
}
// ShowStatsStatement represents a command for displaying stats for a given server.
type ShowStatsStatement struct {
// Hostname or IP of the server for stats.
Host string
}
// String returns a string representation of a ShowStatsStatement.
func (s *ShowStatsStatement) String() string {
	var buf bytes.Buffer
	_, _ = buf.WriteString("SHOW STATS")
	// Include the ON clause so the output re-parses as valid InfluxQL.
	if s.Host != "" {
		_, _ = buf.WriteString(" ON '")
		_, _ = buf.WriteString(s.Host)
		_, _ = buf.WriteString("'")
	}
	return buf.String()
}
// RequiredPrivileges returns the privilege(s) required to execute a ShowStatsStatement
func (s *ShowStatsStatement) RequiredPrivileges() ExecutionPrivileges {
return ExecutionPrivileges{{Name: "", Privilege: AllPrivileges}}
}
// ShowTagKeysStatement represents a command for listing tag keys.
type ShowTagKeysStatement struct {
// Data source that fields are extracted from.
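A quick round-trip of the new statement type; a small sketch using the String form above (which emits the ON clause):

package main

import (
	"fmt"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	stmt := &influxql.ShowStatsStatement{Host: "servera"}
	fmt.Println(stmt.String()) // SHOW STATS ON 'servera'

	stmt.Host = ""
	fmt.Println(stmt.String()) // SHOW STATS
}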

View File

@ -119,6 +119,8 @@ func (p *Parser) parseShowStatement() (Statement, error) {
return nil, newParseError(tokstr(tok, lit), []string{"POLICIES"}, pos)
case SERIES:
return p.parseShowSeriesStatement()
case STATS:
return p.parseShowStatsStatement()
case TAG:
tok, pos, lit := p.scanIgnoreWhitespace()
if tok == KEYS {
@ -1172,6 +1174,21 @@ func (p *Parser) parseRetentionPolicy() (name string, dfault bool, err error) {
return
}
// parseShowStatsStatement parses a string and returns a ShowStatsStatement.
// This function assumes the "SHOW STATS" tokens have already been consumed.
func (p *Parser) parseShowStatsStatement() (*ShowStatsStatement, error) {
stmt := &ShowStatsStatement{}
var err error
if tok, _, _ := p.scanIgnoreWhitespace(); tok == ON {
stmt.Host, err = p.parseString()
} else {
p.unscan()
}
return stmt, err
}
// parseDropContinuousQueriesStatement parses a string and returns a DropContinuousQueryStatement.
// This function assumes the "DROP CONTINUOUS" tokens have already been consumed.
func (p *Parser) parseDropContinuousQueryStatement() (*DropContinuousQueryStatement, error) {
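Parsing goes the other way. A sketch using the package's reader-based parser (names as in this diff):

package main

import (
	"fmt"
	"strings"

	"github.com/influxdb/influxdb/influxql"
)

func main() {
	p := influxql.NewParser(strings.NewReader(`SHOW STATS ON '192.167.1.44'`))
	stmt, err := p.ParseStatement()
	if err != nil {
		panic(err)
	}
	// The host from the ON clause lands in the statement's Host field.
	fmt.Printf("%T %v\n", stmt, stmt.(*influxql.ShowStatsStatement).Host)
	// *influxql.ShowStatsStatement 192.167.1.44
}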

View File

@ -769,6 +769,26 @@ func TestParser_ParseStatement(t *testing.T) {
stmt: newAlterRetentionPolicyStatement("policy1", "testdb", -1, 4, false),
},
// SHOW STATS
{
s: `SHOW STATS`,
stmt: &influxql.ShowStatsStatement{
Host: "",
},
},
{
s: `SHOW STATS ON 'servera'`,
stmt: &influxql.ShowStatsStatement{
Host: "servera",
},
},
{
s: `SHOW STATS ON '192.167.1.44'`,
stmt: &influxql.ShowStatsStatement{
Host: "192.167.1.44",
},
},
// Errors
{s: ``, err: `found EOF, expected SELECT at line 1, char 1`},
{s: `SELECT`, err: `found EOF, expected identifier, string, number, bool at line 1, char 8`},
@ -798,6 +818,7 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`},
{s: `SHOW FOO`, err: `found FOO, expected CONTINUOUS, DATABASES, FIELD, MEASUREMENTS, RETENTION, SERIES, SERVERS, TAG, USERS at line 1, char 6`},
{s: `SHOW STATS ON`, err: `found EOF, expected string at line 1, char 15`},
{s: `DROP CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 17`},
{s: `DROP CONTINUOUS QUERY`, err: `found EOF, expected identifier at line 1, char 23`},
{s: `CREATE CONTINUOUS`, err: `found EOF, expected QUERY at line 1, char 19`},

View File

@ -106,6 +106,7 @@ const (
SERVERS
SHOW
SLIMIT
STATS
SOFFSET
TAG
TO
@ -208,6 +209,7 @@ var tokens = [...]string{
SHOW: "SHOW",
SLIMIT: "SLIMIT",
SOFFSET: "SOFFSET",
STATS: "STATS",
TAG: "TAG",
TO: "TO",
USER: "USER",

View File

@ -72,6 +72,7 @@ type Server struct {
shards map[uint64]*Shard // shards by shard id
stats *Stats
Logger *log.Logger
WriteTrace bool // Detailed logging of write path
@ -104,6 +105,7 @@ func NewServer() *Server {
users: make(map[string]*User),
shards: make(map[uint64]*Shard),
stats: NewStats("server"),
Logger: log.New(os.Stderr, "[server] ", log.LstdFlags),
}
// Server will always return with authentication enabled.
@ -321,6 +323,32 @@ func (s *Server) load() error {
})
}
// StartSelfMonitoring launches a goroutine that periodically writes this
// server's statistics to the given database and retention policy.
func (s *Server) StartSelfMonitoring(database, retention string, interval time.Duration) error {
if interval == 0 {
return fmt.Errorf("statistics check interval must be non-zero")
}
go func() {
tick := time.NewTicker(interval)
for {
<-tick.C
// Create the data point and write it.
point := Point{
Name: s.stats.Name(),
Tags: map[string]string{"raftID": strconv.FormatUint(s.id, 10)},
Fields: make(map[string]interface{}),
}
s.stats.Walk(func(k string, v int64) {
point.Fields[k] = int(v)
})
if _, err := s.WriteSeries(database, retention, []Point{point}); err != nil {
	s.Logger.Printf("failed to write stats point: %s", err.Error())
}
}
}()
return nil
}
// StartRetentionPolicyEnforcement launches retention policy enforcement.
func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error {
if checkInterval == 0 {
@ -458,6 +486,8 @@ func (s *Server) Client() MessagingClient {
// This function waits until the message has been processed by the server.
// Returns the broker log index of the message or an error.
func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, error) {
s.stats.Inc("broadcastMessageTx")
// Encode the command.
data, err := json.Marshal(c)
if err != nil {
@ -764,7 +794,12 @@ func (s *Server) CreateDatabaseIfNotExists(name string) error {
if s.DatabaseExists(name) {
return nil
}
return s.CreateDatabase(name)
// There's a small chance the database was created between the check above and this call, so tolerate ErrDatabaseExists.
if err := s.CreateDatabase(name); err != nil && err != ErrDatabaseExists {
return err
}
return nil
}
func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) {
@ -943,6 +978,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
nodeIndex++
}
}
s.stats.Add("shardsCreated", int64(len(g.Shards)))
// Retention policy has a new shard group, so update the policy.
rp.shardGroups = append(rp.shardGroups, g)
@ -1023,6 +1059,7 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
// Remove from metastore.
rp.removeShardGroupByID(c.ID)
err = s.meta.mustUpdate(m.Index, func(tx *metatx) error {
s.stats.Add("shardsDeleted", int64(len(g.Shards)))
return tx.saveDatabase(db)
})
return
@ -1274,6 +1311,13 @@ func (s *Server) RetentionPolicies(database string) ([]*RetentionPolicy, error)
return a, nil
}
// RetentionPolicyExists returns true if a retention policy exists for a given database.
func (s *Server) RetentionPolicyExists(database, retention string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.DatabaseExists(database) && s.databases[database].policies[retention] != nil
}
// CreateRetentionPolicy creates a retention policy for a database.
func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) error {
// Enforce duration of at least retentionPolicyMinDuration
@ -1292,6 +1336,18 @@ func (s *Server) CreateRetentionPolicy(database string, rp *RetentionPolicy) err
return err
}
// CreateRetentionPolicyIfNotExists creates a retention policy for a database, unless it already exists.
func (s *Server) CreateRetentionPolicyIfNotExists(database string, rp *RetentionPolicy) error {
// Ensure retention policy exists.
if !s.RetentionPolicyExists(database, rp.Name) {
// There's a small chance the policy was created after the check above, so tolerate ErrRetentionPolicyExists.
if err := s.CreateRetentionPolicy(database, rp); err != nil && err != ErrRetentionPolicyExists {
return err
}
}
return nil
}
func calculateShardGroupDuration(d time.Duration) time.Duration {
const (
day = time.Hour * 24
@ -1508,7 +1564,15 @@ type Point struct {
// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (uint64, error) {
func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (idx uint64, err error) {
s.stats.Inc("batchWriteRx")
s.stats.Add("pointWriteRx", int64(len(points)))
defer func() {
if err != nil {
s.stats.Inc("batchWriteRxError")
}
}()
if s.WriteTrace {
log.Printf("received write for database '%s', retention policy '%s', with %d points",
database, retentionPolicy, len(points))
@ -1613,7 +1677,6 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
}
// Write data for each shard to the Broker.
var err error
var maxIndex uint64
for i, d := range shardData {
assert(len(d) > 0, "raw series data required: topic=%d", i)
@ -1626,6 +1689,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
if err != nil {
return maxIndex, err
}
s.stats.Inc("writeSeriesMessageTx")
if index > maxIndex {
maxIndex = index
}
@ -1634,7 +1698,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
}
}
return maxIndex, err
return maxIndex, nil
}
// createMeasurementsIfNotExists walks the "points" and ensures that all new Series are created, and all
@ -1937,6 +2001,8 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re
res = s.executeShowTagValuesStatement(stmt, database, user)
case *influxql.ShowFieldKeysStatement:
res = s.executeShowFieldKeysStatement(stmt, database, user)
case *influxql.ShowStatsStatement:
res = s.executeShowStatsStatement(stmt, user)
case *influxql.GrantStatement:
res = s.executeGrantStatement(stmt, user)
case *influxql.RevokeStatement:
@ -2442,6 +2508,17 @@ func (s *Server) executeShowContinuousQueriesStatement(stmt *influxql.ShowContin
return &Result{Series: rows}
}
func (s *Server) executeShowStatsStatement(stmt *influxql.ShowStatsStatement, user *User) *Result {
	row := &influxql.Row{Columns: []string{}, Name: s.stats.Name()}
	// Build a single value row aligned with the columns, one column per counter.
	values := []interface{}{}
	s.stats.Walk(func(k string, v int64) {
		row.Columns = append(row.Columns, k)
		values = append(values, v)
	})
	row.Values = [][]interface{}{values}
	return &Result{Series: []*influxql.Row{row}}
}
// filterMeasurementsByExpr filters a list of measurements by a tags expression.
func filterMeasurementsByExpr(measurements Measurements, expr influxql.Expr) (Measurements, error) {
// Create a list to hold result measurements.
@ -2816,6 +2893,7 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) {
// All messages must be processed under lock.
func() {
s.stats.Inc("broadcastMessageRx")
s.mu.Lock()
defer s.mu.Unlock()
@ -3279,6 +3357,7 @@ func (s *Server) shouldRunContinuousQuery(cq *ContinuousQuery) bool {
// runContinuousQuery will execute a continuous query
// TODO: make this fan out to the cluster instead of running all the queries on this single data node
func (s *Server) runContinuousQuery(cq *ContinuousQuery) {
s.stats.Inc("continuousQueryExecuted")
cq.mu.Lock()
defer cq.mu.Unlock()
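End to end, the new statement can be parsed and run against a live server. A sketch, assuming an initialized *influxdb.Server s and a user with sufficient privileges (SHOW STATS requires all privileges, per RequiredPrivileges above):

q, err := influxql.NewParser(strings.NewReader(`SHOW STATS`)).ParseQuery()
if err != nil {
	log.Fatal(err)
}
results := s.ExecuteQuery(q, "", user)
// The single result row is named after the stats object ("server"); its
// columns are the counter names and its one value row holds the current
// counts, one entry per counter.
_ = results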

stats.go (new file, 114 lines)
View File

@ -0,0 +1,114 @@
package influxdb
import (
"sync"
)
// Int represents a 64-bit signed integer that can be updated atomically.
type Int struct {
mu sync.RWMutex
i int64
}
// NewInt returns a new Int
func NewInt(v int64) *Int {
return &Int{i: v}
}
// Add atomically adds the given delta to the Int.
func (i *Int) Add(delta int64) {
i.mu.Lock()
defer i.mu.Unlock()
i.i += delta
}
// Stats represents a collection of metrics, as key-value pairs.
type Stats struct {
name string
m map[string]*Int
mu sync.RWMutex
}
// NewStats returns a Stats object with the given name.
func NewStats(name string) *Stats {
return &Stats{
name: name,
m: make(map[string]*Int),
}
}
// Add adds delta to the stat indicated by key, creating it if necessary.
func (s *Stats) Add(key string, delta int64) {
s.mu.RLock()
i, ok := s.m[key]
s.mu.RUnlock()
if !ok {
// check again under the write lock
s.mu.Lock()
i, ok = s.m[key]
if !ok {
i = new(Int)
s.m[key] = i
}
s.mu.Unlock()
}
i.Add(delta)
}
// Inc simply increments the given key by 1.
func (s *Stats) Inc(key string) {
s.Add(key, 1)
}
// Get returns the value for a given key, or zero if the key doesn't exist.
func (s *Stats) Get(key string) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	i, ok := s.m[key]
	if !ok {
		return 0
	}
	i.mu.RLock()
	defer i.mu.RUnlock()
	return i.i
}
// Set sets a value for the given key.
func (s *Stats) Set(key string, v int64) {
s.mu.Lock()
defer s.mu.Unlock()
s.m[key] = NewInt(v)
}
// Name returns the name of the Stats object.
func (s *Stats) Name() string {
return s.name
}
// Walk calls f for each entry in the stats. The stats map is read-locked
// for the duration of the walk, and each entry's own lock is held while
// its value is read, so f always sees a consistent value.
func (s *Stats) Walk(f func(string, int64)) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for k, v := range s.m {
		v.mu.RLock()
		f(k, v.i)
		v.mu.RUnlock()
	}
}
// Diff returns the difference between two sets of stats, computed over the
// keys of s. Keys present only in other are ignored; keys missing from
// other count as zero.
func (s *Stats) Diff(other *Stats) *Stats {
diff := NewStats(s.name)
s.Walk(func(k string, v int64) {
diff.Set(k, v-other.Get(k))
})
return diff
}
// Snapshot returns a copy of the stats object. Addition and removal of stats keys
// is blocked during the creation of the snapshot, but existing entries may be
// concurrently updated.
func (s *Stats) Snapshot() *Stats {
	snap := NewStats(s.name)
	s.Walk(func(k string, v int64) {
		snap.Set(k, v)
	})
	return snap
}
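A small usage sketch tying the pieces together: concurrent increments, then a snapshot-and-diff to turn cumulative counters into per-interval deltas (a pattern the self-monitoring writer could build on):

package main

import (
	"fmt"
	"sync"

	"github.com/influxdb/influxdb"
)

func main() {
	s := influxdb.NewStats("server")

	// Concurrent increments are safe: Add uses a read lock on the fast
	// path and only takes the write lock to create a missing key.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Inc("pointWriteRx")
		}()
	}
	wg.Wait()

	prev := s.Snapshot() // freeze current values
	s.Add("pointWriteRx", 25)

	// Diff(prev) yields only the activity since the snapshot.
	delta := s.Snapshot().Diff(prev)
	fmt.Println(s.Get("pointWriteRx"), delta.Get("pointWriteRx")) // 125 25
}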

stats_test.go (new file, 88 lines)
View File

@ -0,0 +1,88 @@
package influxdb_test
import (
"testing"
"github.com/influxdb/influxdb"
)
func TestStats_SetAndGet(t *testing.T) {
s := influxdb.NewStats("foo")
s.Set("a", 100)
if s.Get("a") != 100 {
t.Fatalf("stats set failed, expected 100, got %d", s.Get("a"))
}
}
func TestStats_Add(t *testing.T) {
s := influxdb.NewStats("foo")
s.Add("a", 200)
if s.Get("a") != 200 {
t.Fatalf("stats set failed, expected 200, got %d", s.Get("a"))
}
}
func TestStats_Inc(t *testing.T) {
s := influxdb.NewStats("foo")
s.Set("a", 100)
s.Inc("a")
if s.Get("a") != 101 {
t.Fatalf("stats Inc failed, expected 101, got %d", s.Get("a"))
}
s.Inc("b")
if s.Get("b") != 1 {
t.Fatalf("stats Inc failed, expected 1, got %d", s.Get("b"))
}
}
func TestStats_AddNegative(t *testing.T) {
s := influxdb.NewStats("foo")
s.Add("a", -200)
if s.Get("a") != -200 {
t.Fatalf("stats set failed, expected -200, got %d", s.Get("a"))
}
}
func TestStats_SetAndAdd(t *testing.T) {
s := influxdb.NewStats("foo")
s.Set("a", 100)
s.Add("a", 200)
if s.Get("a") != 300 {
t.Fatalf("stats set failed, expected 300, got %d", s.Get("a"))
}
}
func TestStats_Diff(t *testing.T) {
foo := influxdb.NewStats("server")
bar := influxdb.NewStats("server")
foo.Set("a", 100)
foo.Set("b", 600)
bar.Set("a", 450)
bar.Set("b", 525)
qux := bar.Diff(foo)
if qux.Name() != "server" {
t.Fatalf("stats diff has unexpected name: %s", qux.Name())
}
if qux.Get("a") != 350 || qux.Get("b") != -75 {
t.Fatalf("stats diff returned unexpected result: %v", qux)
}
}
func TestStats_Snapshot(t *testing.T) {
foo := influxdb.NewStats("server")
foo.Set("a", 100)
foo.Set("b", 600)
bar := foo.Snapshot()
if bar.Name() != "server" || bar.Get("a") != 100 || bar.Get("b") != 600 {
t.Fatalf("stats snapshot returned unexpected result: %s", bar)
}
}