From 2de52834f0aff9169b8585f58a1e438753b39859 Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Sat, 10 Jun 2017 09:20:38 +0800 Subject: [PATCH] CQ statistics written to monitor database, addresses #8188 * off by default, enabled by `query-stats-enabled` * writes to cq_query measurement of configured monitor database * see CHANGELOG for schema of individual points --- CHANGELOG.md | 24 ++++++ cmd/influxd/run/server.go | 1 + etc/config.sample.toml | 3 + monitor/service.go | 41 +++++++--- services/continuous_querier/config.go | 18 +++-- services/continuous_querier/service.go | 84 +++++++++++++++------ services/continuous_querier/service_test.go | 83 ++++++++++++++++++++ 7 files changed, 215 insertions(+), 39 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83055093a1..ea0c0f01dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,24 @@ ## v1.3.0 [unreleased] +### Release Notes + +#### Continuous Query Statistics + +When enabled, each time a continuous query is completed, a number of details regarding the execution are written to the `cq_query` measurement of the internal monitor database (`_internal` by default). The tags and fields of interest are: + +| tag / field | description | +|:----------- |:----------------------------------------------------- | +| `db` | tag: name of database | +| `cq` | tag: name of continuous query | +| `duration` | field: query execution time in nanoseconds | +| `startTime` | field: lower bound of time range | +| `endTime` | field: upper bound of time range | +| `written` | field: number of points written to the target measurement (`-1` if the count is unavailable) | + + +* `startTime` and `endTime` are UNIX timestamps, in nanoseconds. +* The number of points written is also included in CQ log messages. + ### Removals The admin UI is removed and unusable in this release. The `[admin]` configuration section will be ignored. @@ -16,6 +35,10 @@ The following new configuration options are available. * `max-body-size` was added with a default of 25,000,000, but can be disabled by setting it to 0.
Specifies the maximum size (in bytes) of a client request body. When a client sends data that exceeds the configured maximum size, a `413 Request Entity Too Large` HTTP response is returned. + +#### `[continuous_queries]` Section + +* `query-stats-enabled` was added with a default of `false`. When set to `true`, continuous query execution statistics are written to the `cq_query` measurement of the monitor database. ### Features @@ -43,6 +66,7 @@ The following new configuration options are available. - [#8390](https://github.com/influxdata/influxdb/issues/8390): Add nanosecond duration literal support. - [#8394](https://github.com/influxdata/influxdb/pull/8394): Optimize top() and bottom() using an incremental aggregator. - [#7129](https://github.com/influxdata/influxdb/issues/7129): Maintain the tags of points selected by top() or bottom() when writing the results. +- [#8188](https://github.com/influxdata/influxdb/issues/8188): Write CQ stats to `_internal`. ### Bugfixes diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 2f6d4ab52c..5f2dd46b93 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -338,6 +338,7 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) { srv := continuous_querier.NewService(c) srv.MetaClient = s.MetaClient srv.QueryExecutor = s.QueryExecutor + srv.Monitor = s.Monitor s.Services = append(s.Services, srv) } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 74c23bee31..f888169248 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -429,5 +429,8 @@ # Controls whether queries are logged when executed by the CQ service. # log-enabled = true + # Controls whether continuous query execution statistics are written to the self-monitoring data store.
+ # query-stats-enabled = false + # interval for how often continuous queries will be checked if they need to run # run-interval = "1s" diff --git a/monitor/service.go b/monitor/service.go index 69ca20e8ec..d8994d6e69 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -117,6 +117,9 @@ func (m *Monitor) Open() error { // If enabled, record stats in a InfluxDB system. if m.storeEnabled { + hostname, _ := os.Hostname() + m.SetGlobalTag("hostname", hostname) + // Start periodic writes to system. m.wg.Add(1) go m.storeStatistics() @@ -125,6 +128,32 @@ func (m *Monitor) Open() error { return nil } +func (m *Monitor) Enabled() bool { return m.storeEnabled } + +func (m *Monitor) WritePoints(p models.Points) error { + if !m.storeEnabled { + return nil + } + + if len(m.globalTags) > 0 { + for _, pp := range p { + pp.SetTags(pp.Tags().Merge(m.globalTags)) + } + } + + return m.writePoints(p) +} + +func (m *Monitor) writePoints(p models.Points) error { + m.mu.RLock() + defer m.mu.RUnlock() + + if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, p); err != nil { + m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err)) + } + return nil +} + // Close closes the monitor system. func (m *Monitor) Close() error { if !m.open() { @@ -386,9 +415,6 @@ func (m *Monitor) storeStatistics() { m.Logger.Info(fmt.Sprintf("Storing statistics in database '%s' retention policy '%s', at interval %s", m.storeDatabase, m.storeRetentionPolicy, m.storeInterval)) - hostname, _ := os.Hostname() - m.SetGlobalTag("hostname", hostname) - // Wait until an even interval to start recording monitor statistics. // If we are interrupted before the interval for some reason, exit early. 
if err := m.waitUntilInterval(m.storeInterval); err != nil { @@ -424,14 +450,7 @@ func (m *Monitor) storeStatistics() { points = append(points, pt) } - func() { - m.mu.RLock() - defer m.mu.RUnlock() - - if err := m.PointsWriter.WritePoints(m.storeDatabase, m.storeRetentionPolicy, points); err != nil { - m.Logger.Info(fmt.Sprintf("failed to store statistics: %s", err)) - } - }() + m.writePoints(points) case <-m.done: m.Logger.Info(fmt.Sprintf("terminating storage of statistics")) return diff --git a/services/continuous_querier/config.go b/services/continuous_querier/config.go index 4a6bcf026a..abf4c05ed1 100644 --- a/services/continuous_querier/config.go +++ b/services/continuous_querier/config.go @@ -16,12 +16,16 @@ const ( // Config represents a configuration for the continuous query service. type Config struct { - // Enables logging in CQ service to display when CQ's are processed and how many points are wrote. + // Enables logging in CQ service to display when CQ's are processed and how many points were written. LogEnabled bool `toml:"log-enabled"` // If this flag is set to false, both the brokers and data nodes should ignore any CQ processing. Enabled bool `toml:"enabled"` + // QueryStatsEnabled enables logging of individual query execution statistics to the self-monitoring data + // store. The default is false. + QueryStatsEnabled bool `toml:"query-stats-enabled"` + // Run interval for checking continuous queries. This should be set to the least common factor // of the interval for running continuous queries. If you only aggregate continuous queries // every minute, this should be set to 1 minute. The default is set to '1s' so the interval @@ -32,9 +36,10 @@ type Config struct { // NewConfig returns a new instance of Config with defaults. 
func NewConfig() Config { return Config{ - LogEnabled: true, - Enabled: true, - RunInterval: toml.Duration(DefaultRunInterval), + LogEnabled: true, + Enabled: true, + QueryStatsEnabled: false, + RunInterval: toml.Duration(DefaultRunInterval), } } @@ -62,7 +67,8 @@ func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) { } return diagnostics.RowFromMap(map[string]interface{}{ - "enabled": true, - "run-interval": c.RunInterval, + "enabled": true, + "query-stats-enabled": c.QueryStatsEnabled, + "run-interval": c.RunInterval, }), nil } diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 10f7e90615..4608d595ef 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -67,17 +67,29 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool { return false } +type Monitor interface { + Enabled() bool + WritePoints(models.Points) error +} + +type nullMonitor int + +func (nullMonitor) Enabled() bool { return false } +func (nullMonitor) WritePoints(models.Points) error { return nil } + // Service manages continuous query execution. type Service struct { MetaClient metaClient QueryExecutor *influxql.QueryExecutor + Monitor Monitor Config *Config RunInterval time.Duration // RunCh can be used by clients to signal service to run CQs. - RunCh chan *RunRequest - Logger zap.Logger - loggingEnabled bool - stats *Statistics + RunCh chan *RunRequest + Logger zap.Logger + loggingEnabled bool + queryStatsEnabled bool + stats *Statistics // lastRuns maps CQ name to last time it was run. mu sync.RWMutex lastRuns map[string]time.Time @@ -88,13 +100,15 @@ type Service struct { // NewService returns a new instance of Service. 
func NewService(c Config) *Service { s := &Service{ - Config: &c, - RunInterval: time.Duration(c.RunInterval), - RunCh: make(chan *RunRequest), - loggingEnabled: c.LogEnabled, - Logger: zap.New(zap.NullEncoder()), - stats: &Statistics{}, - lastRuns: map[string]time.Time{}, + Config: &c, + Monitor: nullMonitor(0), + RunInterval: time.Duration(c.RunInterval), + RunCh: make(chan *RunRequest), + loggingEnabled: c.LogEnabled, + queryStatsEnabled: c.QueryStatsEnabled, + Logger: zap.New(zap.NullEncoder()), + stats: &Statistics{}, + lastRuns: map[string]time.Time{}, } return s @@ -141,6 +155,11 @@ type Statistics struct { QueryFail int64 } +type statistic struct { + ok uint64 + fail uint64 +} + // Statistics returns statistics for periodic monitoring. func (s *Service) Statistics(tags map[string]string) []models.Statistic { return []models.Statistic{{ @@ -342,25 +361,49 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } var start time.Time - if s.loggingEnabled { - s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime)) + if s.loggingEnabled || s.queryStatsEnabled { start = time.Now() } + if s.loggingEnabled { + s.Logger.Info(fmt.Sprintf("executing continuous query %s (%v to %v)", cq.Info.Name, startTime, endTime)) + } + // Do the actual processing of the query & writing of results. - if err := s.runContinuousQueryAndWriteResult(cq); err != nil { - s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", err, cq.q.String())) - return false, err + res := s.runContinuousQueryAndWriteResult(cq) + if res.Err != nil { + s.Logger.Info(fmt.Sprintf("error: %s. running: %s\n", res.Err, cq.q.String())) + return false, res.Err + } + + var execDuration time.Duration + if s.loggingEnabled || s.queryStatsEnabled { + execDuration = time.Since(start) + } + + // extract number of points written from SELECT ... 
INTO result + var written int64 = -1 + if len(res.Series) == 1 && len(res.Series[0].Values) == 1 { + s := res.Series[0] + written = s.Values[0][1].(int64) } if s.loggingEnabled { - s.Logger.Info(fmt.Sprintf("finished continuous query %s (%v to %v) in %s", cq.Info.Name, startTime, endTime, time.Since(start))) + s.Logger.Info(fmt.Sprintf("finished continuous query %s, %d points(s) written (%v to %v) in %s", cq.Info.Name, written, startTime, endTime, execDuration)) } + + if s.queryStatsEnabled && s.Monitor.Enabled() { + tags := map[string]string{"db": dbi.Name, "cq": cq.Info.Name} + fields := map[string]interface{}{"duration": int64(execDuration), "written": written, "startTime": startTime.UnixNano(), "endTime": endTime.UnixNano()} + p, _ := models.NewPoint("cq_query", models.NewTags(tags), fields, time.Now()) + s.Monitor.WritePoints(models.Points{p}) + } + return true, nil } // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in -func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { +func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) *influxql.Result { // Wrap the CQ's inner SELECT statement in a Query for the QueryExecutor. q := &influxql.Query{ Statements: influxql.Statements([]influxql.Statement{cq.q}), @@ -379,10 +422,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { if !ok { panic("result channel was closed") } - if res.Err != nil { - return res.Err - } - return nil + return res } // ContinuousQuery is a local wrapper / helper around continuous queries. 
diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 72158f876e..9abbaf955e 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/uber-go/zap" ) @@ -376,6 +377,80 @@ func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) { } } +func TestService_ExecuteContinuousQuery_LogsToMonitor(t *testing.T) { + s := NewTestService(t) + const writeN = int64(50) + + s.QueryExecutor.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error { + ctx.Results <- &influxql.Result{ + Series: []*models.Row{{ + Name: "result", + Columns: []string{"time", "written"}, + Values: [][]interface{}{{time.Time{}, writeN}}, + }}, + } + return nil + }, + } + s.queryStatsEnabled = true + var point models.Point + s.Monitor = &monitor{ + EnabledFn: func() bool { return true }, + WritePointsFn: func(p models.Points) error { + if len(p) != 1 { + t.Fatalf("expected point") + } + point = p[0] + return nil + }, + } + + dbis := s.MetaClient.Databases() + dbi := dbis[0] + cqi := dbi.ContinuousQueries[0] + + now := time.Now().Truncate(10 * time.Minute) + if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil { + t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err) + } + + if point == nil { + t.Fatal("expected Monitor.WritePoints call") + } + + f, _ := point.Fields() + if got, ok := f["written"].(int64); !ok || got != writeN { + t.Errorf("unexpected value for written; exp=%d, got=%d", writeN, got) + } +} + +func TestService_ExecuteContinuousQuery_LogToMonitor_DisabledByDefault(t *testing.T) { + s := NewTestService(t) + s.QueryExecutor.StatementExecutor = &StatementExecutor{ + ExecuteStatementFn: func(stmt 
influxql.Statement, ctx influxql.ExecutionContext) error { + ctx.Results <- &influxql.Result{} + return nil + }, + } + s.Monitor = &monitor{ + EnabledFn: func() bool { return true }, + WritePointsFn: func(p models.Points) error { + t.Fatalf("unexpected Monitor.WritePoints call") + return nil + }, + } + + dbis := s.MetaClient.Databases() + dbi := dbis[0] + cqi := dbi.ContinuousQueries[0] + + now := time.Now().Truncate(10 * time.Minute) + if ok, err := s.ExecuteContinuousQuery(&dbi, &cqi, now); !ok || err != nil { + t.Fatalf("ExecuteContinuousQuery failed, ok=%t, err=%v", ok, err) + } +} + // NewTestService returns a new *Service with default mock object members. func NewTestService(t *testing.T) *Service { s := NewService(NewConfig()) @@ -534,3 +609,11 @@ func wait(c chan struct{}, d time.Duration) (err error) { } return } + +type monitor struct { + EnabledFn func() bool + WritePointsFn func(models.Points) error +} + +func (m *monitor) Enabled() bool { return m.EnabledFn() } +func (m *monitor) WritePoints(p models.Points) error { return m.WritePointsFn(p) }