feat: Adds 'cq' diagnostics to /debug/vars (#26874)
This PR adds continuous query diags to our /debug/vars endpoint
Example
```
"cq": {
"queryFail": 0,
"queryOk": 2
},
```
pull/26885/head
parent
879e34a0c2
commit
63498cab4e
|
|
@ -416,6 +416,9 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
|
|||
srv.QueryExecutor = s.QueryExecutor
|
||||
srv.Monitor = s.Monitor
|
||||
s.Services = append(s.Services, srv)
|
||||
if s.Monitor != nil {
|
||||
s.Monitor.RegisterDiagnosticsClient("cq", srv)
|
||||
}
|
||||
}
|
||||
|
||||
// Err returns an error channel that multiplexes all out of band errors received from all services.
|
||||
|
|
|
|||
|
|
@ -241,6 +241,7 @@ func (m *Monitor) Close() error {
|
|||
m.DeregisterDiagnosticsClient("system")
|
||||
m.DeregisterDiagnosticsClient("stats")
|
||||
m.DeregisterDiagnosticsClient("config")
|
||||
m.DeregisterDiagnosticsClient("cq")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -13,9 +13,11 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor"
|
||||
"github.com/influxdata/influxdb/services/continuous_querier"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxdb/toml"
|
||||
"github.com/influxdata/influxdb/tsdb"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest/observer"
|
||||
)
|
||||
|
|
@ -414,6 +416,26 @@ func TestMonitor_QuickClose(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMonitor_CQStatistics(t *testing.T) {
|
||||
s := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
|
||||
err := s.Open()
|
||||
require.NoError(t, err, "monitor open")
|
||||
defer s.Close()
|
||||
|
||||
s.RegisterDiagnosticsClient("cq", continuous_querier.NewService(continuous_querier.NewConfig()))
|
||||
stats, err := s.Statistics(nil)
|
||||
require.NoError(t, err, "cq statistics")
|
||||
|
||||
for _, stat := range stats {
|
||||
if stat.Name == "cq" {
|
||||
require.Equal(t, stat.Values, map[string]interface{}{
|
||||
"queryOk": 0,
|
||||
"queryFail": 0,
|
||||
}, "statistics")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatistic_ValueNames(t *testing.T) {
|
||||
statistic := monitor.Statistic{
|
||||
Statistic: models.Statistic{
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/influxdata/influxdb/logger"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
"github.com/influxdata/influxql"
|
||||
|
|
@ -169,6 +170,12 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
|
|||
}}
|
||||
}
|
||||
|
||||
// Required for Monitor interface. Currently does not return any
|
||||
// diagnostic values.
|
||||
func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) {
|
||||
return &diagnostics.Diagnostics{}, nil
|
||||
}
|
||||
|
||||
// Run runs the specified continuous query, or all CQs if none is specified.
|
||||
func (s *Service) Run(database, name string, t time.Time) error {
|
||||
var dbs []meta.DatabaseInfo
|
||||
|
|
|
|||
|
|
@ -2395,6 +2395,33 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
|||
uniqueKeys := make(map[string]int)
|
||||
|
||||
for _, s := range stats {
|
||||
if s.Name == "cq" {
|
||||
jv, err := parseCQStatistics(&s.Statistic)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
data, err := json.Marshal(jv)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if !first {
|
||||
_, err := fmt.Fprintln(w, ",")
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
first = false
|
||||
_, err = fmt.Fprintf(w, "\"cq\": %s", data)
|
||||
if err != nil {
|
||||
h.httpError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
val, err := json.Marshal(s)
|
||||
if err != nil {
|
||||
continue
|
||||
|
|
@ -2644,6 +2671,14 @@ func parseConfigDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{},
|
|||
return m, nil
|
||||
}
|
||||
|
||||
func parseCQStatistics(s *models.Statistic) (map[string]interface{}, error) {
|
||||
if len(s.Values) == 0 {
|
||||
return nil, fmt.Errorf("no cq statistics data available")
|
||||
}
|
||||
|
||||
return s.Values, nil
|
||||
}
|
||||
|
||||
// httpError writes an error to the client in a standard format.
|
||||
func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) {
|
||||
if code == http.StatusUnauthorized {
|
||||
|
|
|
|||
Loading…
Reference in New Issue