CQ statistics written to monitor database, addresses #8188

* off by default, enabled by `query-stats-enabled`
* writes to cq_query measurement of configured monitor database
* see CHANGELOG for schema of individual points
pull/8471/head
Stuart Carnie 2017-06-10 09:20:38 +08:00
parent 4957b3d8be
commit 2de52834f0
7 changed files with 215 additions and 39 deletions

View File

@ -1,5 +1,24 @@
## v1.3.0 [unreleased]
### Release Notes
#### Continuous Query Statistics
When enabled, each time a continuous query is completed, a number of details regarding the execution are written to the `cq_query` measurement of the internal monitor database (`_internal` by default). The tags and fields of interest are:
| tag / field | description |
|:----------- |:----------------------------------------------------- |
| `db` | name of database |
| `cq` | name of continuous query |
| `duration` | query execution time in nanoseconds |
| `startTime` | lower bound of time range |
| `endTime` | upper bound of time range |
| `written` | number of points written to the target measurement |
* `startTime` and `endTime` are UNIX timestamps, in nanoseconds.
* The number of points written is also included in CQ log messages.
### Removals
The admin UI is removed and unusable in this release. The `[admin]` configuration section will be ignored.
@ -16,6 +35,10 @@ The following new configuration options are available.
* `max-body-size` was added with a default of 25,000,000, but can be disabled by setting it to 0.
Specifies the maximum size (in bytes) of a client request body. When a client sends data that exceeds
the configured maximum size, a `413 Request Entity Too Large` HTTP response is returned.
#### `[continuous_queries]` Section
* `query-stats-enabled` was added with a default of `false`. When set to `true`, continuous query execution statistics are written to the default monitor store.
### Features
@ -43,6 +66,7 @@ The following new configuration options are available.
- [#8390](https://github.com/influxdata/influxdb/issues/8390): Add nanosecond duration literal support.
- [#8394](https://github.com/influxdata/influxdb/pull/8394): Optimize top() and bottom() using an incremental aggregator.
- [#7129](https://github.com/influxdata/influxdb/issues/7129): Maintain the tags of points selected by top() or bottom() when writing the results.
- [#8188](https://github.com/influxdata/influxdb/issues/8188): Write CQ stats to _internal
### Bugfixes

View File

@ -338,6 +338,7 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
srv := continuous_querier.NewService(c)
srv.MetaClient = s.MetaClient
srv.QueryExecutor = s.QueryExecutor
srv.Monitor = s.Monitor
s.Services = append(s.Services, srv)
}

View File

@ -429,5 +429,8 @@
# Controls whether queries are logged when executed by the CQ service.
# log-enabled = true
# Controls whether continuous query execution statistics are written to the self-monitoring data store.
# query-stats-enabled = false
# interval for how often continuous queries will be checked if they need to run
# run-interval = "1s"

View File

@ -117,6 +117,9 @@ func (m *Monitor) Open() error {
// If enabled, record stats in a InfluxDB system.
if m.storeEnabled {
hostname, _ := os.Hostname()
m.SetGlobalTag("hostname", hostname)
// Start periodic writes to system.
m.wg.Add(1)
go m.storeStatistics()
@ -125,6 +128,32 @@ func (m *Monitor) Open() error {
return nil
}
// Enabled reports whether the monitor will record statistics to the
// configured store (i.e. whether store-enabled is set).
func (m *Monitor) Enabled() bool { return m.storeEnabled }
// WritePoints writes the given points to the monitor store, merging any
// registered global tags (e.g. "hostname", set in Open) into each point
// first. It is a no-op when the store is disabled. Underlying write failures
// are logged by writePoints and never returned to the caller.
func (m *Monitor) WritePoints(p models.Points) error {
	if !m.storeEnabled {
		return nil
	}
	// NOTE(review): m.globalTags is read here without holding m.mu — confirm
	// all writers (SetGlobalTag) run before concurrent WritePoints calls.
	if len(m.globalTags) > 0 {
		for _, pp := range p {
			// Point tags win on conflict only per Merge semantics — verify.
			pp.SetTags(pp.Tags().Merge(m.globalTags))
		}
	}
	return m.writePoints(p)
}
// writePoints performs the actual write of statistics points to the store
// database/retention policy, holding the read lock to keep the destination
// settings stable for the duration of the write. Write errors are logged and
// swallowed (statistics recording is best-effort); the return is always nil.
func (m *Monitor) writePoints(p models.Points) error {
	m.mu.RLock()
	defer m.mu.RUnlock()
	if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, p); err != nil {
		m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err))
	}
	return nil
}
// Close closes the monitor system.
func (m *Monitor) Close() error {
if !m.open() {
@ -386,9 +415,6 @@ func (m *Monitor) storeStatistics() {
m.Logger.Info(fmt.Sprintf("Storing statistics in database '%s' retention policy '%s', at interval %s",
m.storeDatabase, m.storeRetentionPolicy, m.storeInterval))
hostname, _ := os.Hostname()
m.SetGlobalTag("hostname", hostname)
// Wait until an even interval to start recording monitor statistics.
// If we are interrupted before the interval for some reason, exit early.
if err := m.waitUntilInterval(m.storeInterval); err != nil {
@ -424,14 +450,7 @@ func (m *Monitor) storeStatistics() {
points = append(points, pt)
}
func() {
m.mu.RLock()
defer m.mu.RUnlock()
if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil {
m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err))
}
}()
m.writePoints(points)
case <-m.done:
m.Logger.Info(fmt.Sprintf("terminating storage of statistics"))
return

View File

@ -16,12 +16,16 @@ const (
// Config represents a configuration for the continuous query service.
type Config struct {
// Enables logging in CQ service to display when CQ's are processed and how many points are wrote.
// Enables logging in CQ service to display when CQ's are processed and how many points were written.
LogEnabled bool `toml:"log-enabled"`
// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
Enabled bool `toml:"enabled"`
// QueryStatsEnabled enables logging of individual query execution statistics to the self-monitoring data
// store. The default is false.
QueryStatsEnabled bool `toml:"query-stats-enabled"`
// Run interval for checking continuous queries. This should be set to the least common factor
// of the interval for running continuous queries. If you only aggregate continuous queries
// every minute, this should be set to 1 minute. The default is set to '1s' so the interval
@ -32,9 +36,10 @@ type Config struct {
// NewConfig returns a new instance of Config with defaults.
// NewConfig returns a new instance of Config with defaults.
//
// The diff rendering had merged the old and new versions of this literal,
// producing duplicate keys (LogEnabled, Enabled, RunInterval each twice),
// which is not valid Go; this is the coherent post-change version.
func NewConfig() Config {
	return Config{
		LogEnabled:        true,
		Enabled:           true,
		QueryStatsEnabled: false, // stats are opt-in, matching the documented default
		RunInterval:       toml.Duration(DefaultRunInterval),
	}
}
@ -62,7 +67,8 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
}
return diagnostics.RowFromMap(map[string]interface{}{
"enabled": true,
"run-interval": c.RunInterval,
"enabled": true,
"query-stats-enabled": c.QueryStatsEnabled,
"run-interval": c.RunInterval,
}), nil
}

View File

@ -67,17 +67,29 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
return false
}
// Monitor is the subset of the monitor service the CQ service uses to write
// per-query execution statistics to the self-monitoring store.
type Monitor interface {
	// Enabled reports whether the monitor store is accepting writes.
	Enabled() bool
	// WritePoints writes statistics points to the monitor store.
	WritePoints(models.Points) error
}

// nullMonitor is a no-op Monitor used as the default so the service never
// has to nil-check its Monitor field.
type nullMonitor int

func (nullMonitor) Enabled() bool                   { return false }
func (nullMonitor) WritePoints(models.Points) error { return nil }
// Service manages continuous query execution.
type Service struct {
MetaClient metaClient
QueryExecutor *influxql.QueryExecutor
Monitor Monitor
Config *Config
RunInterval time.Duration
// RunCh can be used by clients to signal service to run CQs.
RunCh chan *RunRequest
Logger zap.Logger
loggingEnabled bool
stats *Statistics
RunCh chan *RunRequest
Logger zap.Logger
loggingEnabled bool
queryStatsEnabled bool
stats *Statistics
// lastRuns maps CQ name to last time it was run.
mu sync.RWMutex
lastRuns map[string]time.Time
@ -88,13 +100,15 @@ type Service struct {
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
s := &Service{
Config: &c,
RunInterval: time.Duration(c.RunInterval),
RunCh: make(chan *RunRequest),
loggingEnabled: c.LogEnabled,
Logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
lastRuns: map[string]time.Time{},
Config: &c,
Monitor: nullMonitor(0),
RunInterval: time.Duration(c.RunInterval),
RunCh: make(chan *RunRequest),
loggingEnabled: c.LogEnabled,
queryStatsEnabled: c.QueryStatsEnabled,
Logger: zap.New(zap.NullEncoder()),
stats: &Statistics{},
lastRuns: map[string]time.Time{},
}
return s
@ -141,6 +155,11 @@ type Statistics struct {
QueryFail int64
}
// statistic holds success/failure counters.
// NOTE(review): not referenced anywhere in the visible hunks — confirm it is
// actually used elsewhere in this file, otherwise it is dead code.
type statistic struct {
	ok   uint64
	fail uint64
}
// Statistics returns statistics for periodic monitoring.
func (s *Service) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
@ -342,25 +361,49 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti
}
var start time.Time
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime))
if s.loggingEnabled || s.queryStatsEnabled {
start = time.Now()
}
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime))
}
// Do the actual processing of the query & writing of results.
if err := s.runContinuousQueryAndWriteResult(cq); err != nil {
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String()))
return false, err
res := s.runContinuousQueryAndWriteResult(cq)
if res.Err != nil {
s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", res.Err, cq.q.String()))
return false, res.Err
}
var execDuration time.Duration
if s.loggingEnabled || s.queryStatsEnabled {
execDuration = time.Since(start)
}
// extract number of points written from SELECT ... INTO result
var written int64 = -1
if len(res.Series) == 1 && len(res.Series[0].Values) == 1 {
s := res.Series[0]
written = s.Values[0][1].(int64)
}
if s.loggingEnabled {
s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start)))
s.Logger.Info(fmt.Sprintf("finished continuous query %s, %d points(s) written (%v to %v) in %s", cq.Info.Name, written, startTime, endTime, execDuration))
}
if s.queryStatsEnabled && s.Monitor.Enabled() {
tags := map[string]string{"db": dbi.Name, "cq": cq.Info.Name}
fields := map[string]interface{}{"duration": int64(execDuration), "written": written, "startTime": startTime.UnixNano(), "endTime": endTime.UnixNano()}
p, _ := models.NewPoint("cq_query", models.NewTags(tags), fields, time.Now())
s.Monitor.WritePoints(models.Points{p})
}
return true, nil
}
// runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *influxql.Result {
// Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor.
q := &influxql.Query{
Statements: influxql.Statements([]influxql.Statement{cq.q}),
@ -379,10 +422,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
if !ok {
panic("result channel was closed")
}
if res.Err != nil {
return res.Err
}
return nil
return res
}
// ContinuousQuery is a local wrapper / helper around continuous queries.

View File

@ -9,6 +9,7 @@ import (
"time"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/uber-go/zap"
)
@ -376,6 +377,80 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
}
}
// TestService_ExecuteContinuousQuery_LogsToMonitor verifies that, with query
// stats enabled and an enabled Monitor, a completed CQ run writes exactly one
// point whose "written" field equals the count reported by the query result.
func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) {
	const expWritten = int64(50)

	svc := NewTestService(t)
	svc.queryStatsEnabled = true

	// Stub the executor to report expWritten points written by the CQ.
	svc.QueryExecutor.StatementExecutor = &StatementExecutor{
		ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
			row := &models.Row{
				Name:    "result",
				Columns: []string{"time", "written"},
				Values:  [][]interface{}{{time.Time{}, expWritten}},
			}
			ctx.Results <- &influxql.Result{Series: []*models.Row{row}}
			return nil
		},
	}

	// Capture the single stats point handed to the monitor.
	var captured models.Point
	svc.Monitor = &monitor{
		EnabledFn: func() bool { return true },
		WritePointsFn: func(p models.Points) error {
			if len(p) != 1 {
				t.Fatalf("expected point")
			}
			captured = p[0]
			return nil
		},
	}

	db := svc.MetaClient.Databases()[0]
	cq := db.ContinuousQueries[0]
	now := time.Now().Truncate(10 * time.Minute)
	if ok, err := svc.ExecuteContinuousQuery(&db, &cq, now); !ok || err != nil {
		t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err)
	}

	if captured == nil {
		t.Fatal("expected Monitor.WritePoints call")
	}
	fields, _ := captured.Fields()
	if got, ok := fields["written"].(int64); !ok || got != expWritten {
		t.Errorf("unexpected value for written; exp=%d, got=%d", expWritten, got)
	}
}
// TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault verifies
// that with the default config (query stats disabled) a CQ run never writes
// statistics, even when the Monitor itself reports enabled.
func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) {
	svc := NewTestService(t)

	// An empty result is enough; only the absence of a monitor write matters.
	svc.QueryExecutor.StatementExecutor = &StatementExecutor{
		ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
			ctx.Results <- &influxql.Result{}
			return nil
		},
	}

	// Any write to the monitor fails the test.
	svc.Monitor = &monitor{
		EnabledFn: func() bool { return true },
		WritePointsFn: func(p models.Points) error {
			t.Fatalf("unexpected Monitor.WritePoints call")
			return nil
		},
	}

	db := svc.MetaClient.Databases()[0]
	cq := db.ContinuousQueries[0]
	now := time.Now().Truncate(10 * time.Minute)
	if ok, err := svc.ExecuteContinuousQuery(&db, &cq, now); !ok || err != nil {
		t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err)
	}
}
// NewTestService returns a new *Service with default mock object members.
func NewTestService(t *testing.T) *Service {
s := NewService(NewConfig())
@ -534,3 +609,11 @@ func wait(c chan struct{}, d time.Duration) (err error) {
}
return
}
// monitor is a test double for the CQ service's Monitor interface; each
// method delegates to the caller-supplied function, which must be non-nil.
type monitor struct {
	EnabledFn     func() bool
	WritePointsFn func(models.Points) error
}

func (m *monitor) Enabled() bool                     { return m.EnabledFn() }
func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) }