diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index e6e38b0c3b..5555ceb4e0 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -416,6 +416,9 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) { srv.QueryExecutor = s.QueryExecutor srv.Monitor = s.Monitor s.Services = append(s.Services, srv) + if s.Monitor != nil { + s.Monitor.RegisterDiagnosticsClient("cq", srv) + } } // Err returns an error channel that multiplexes all out of band errors received from all services. diff --git a/monitor/service.go b/monitor/service.go index 0283e796a7..6e205f56d1 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -241,6 +241,7 @@ func (m *Monitor) Close() error { m.DeregisterDiagnosticsClient("system") m.DeregisterDiagnosticsClient("stats") m.DeregisterDiagnosticsClient("config") + m.DeregisterDiagnosticsClient("cq") return nil } diff --git a/monitor/service_test.go b/monitor/service_test.go index eccb2d6a28..90b4e87c9b 100644 --- a/monitor/service_test.go +++ b/monitor/service_test.go @@ -13,9 +13,11 @@ import ( "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" + "github.com/influxdata/influxdb/services/continuous_querier" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/toml" "github.com/influxdata/influxdb/tsdb" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" ) @@ -414,6 +416,26 @@ func TestMonitor_QuickClose(t *testing.T) { } } +func TestMonitor_CQStatistics(t *testing.T) { + s := monitor.New(nil, monitor.Config{}, &tsdb.Config{}) + err := s.Open() + require.NoError(t, err, "monitor open") + defer s.Close() + + s.RegisterDiagnosticsClient("cq", continuous_querier.NewService(continuous_querier.NewConfig())) + stats, err := s.Statistics(nil) + require.NoError(t, err, "cq statistics") + + for _, stat := range stats { + if stat.Name == "cq" { + require.Equal(t, stat.Values, map[string]interface{}{ + "queryOk": 0, + "queryFail": 0, + }, "statistics") + } + } +} + func TestStatistic_ValueNames(t *testing.T) { statistic := monitor.Statistic{ Statistic: models.Statistic{ diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 9cb540e193..90518d8271 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxql" @@ -169,6 +170,12 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic { }} } +// Required for Monitor interface. Currently does not return any +// diagnostic values. +func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) { + return &diagnostics.Diagnostics{}, nil +} + // Run runs the specified continuous query, or all CQs if none is specified. func (s *Service) Run(database, name string, t time.Time) error { var dbs []meta.DatabaseInfo diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 59cc9fce87..bf7b1f489d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -2395,6 +2395,33 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { uniqueKeys := make(map[string]int) for _, s := range stats { + if s.Name == "cq" { + jv, err := parseCQStatistics(&s.Statistic) + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + data, err := json.Marshal(jv) + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + + if !first { + _, err := fmt.Fprintln(w, ",") + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + } + first = false + _, err = fmt.Fprintf(w, "\"cq\": %s", data) + if err != nil { + h.httpError(w, err.Error(), http.StatusInternalServerError) + return + } + continue + } val, err := json.Marshal(s) if err != nil { continue @@ -2644,6 +2671,14 @@ func parseConfigDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{}, return m, nil } +func parseCQStatistics(s *models.Statistic) (map[string]interface{}, error) { + if len(s.Values) == 0 { + return nil, fmt.Errorf("no cq statistics data available") + } + + return s.Values, nil +} + // httpError writes an error to the client in a standard format. func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) { if code == http.StatusUnauthorized {