diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 5f45a75e27..e37945a6a4 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -1455,6 +1455,8 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "\"memstats\": %s", val) } + uniqueKeys := make(map[string]int) + for _, s := range stats { val, err := json.Marshal(s) if err != nil { @@ -1486,6 +1488,12 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) { } } key := buf.String() + v := uniqueKeys[key] + uniqueKeys[key] = v + 1 + if v > 0 { + fmt.Fprintf(buf, ":%d", v) + key = buf.String() + } if !first { fmt.Fprintln(w, ",") diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go index 52c403413d..a548627160 100644 --- a/services/httpd/handler_test.go +++ b/services/httpd/handler_test.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "reflect" + "sort" "strings" "sync/atomic" "testing" @@ -32,6 +33,8 @@ import ( "github.com/influxdata/influxdb/internal" "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/monitor" + "github.com/influxdata/influxdb/monitor/diagnostics" "github.com/influxdata/influxdb/prometheus/remote" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/httpd" @@ -1900,6 +1903,133 @@ func TestThrottler_Handler(t *testing.T) { }) } +func TestHandlerDebugVars(t *testing.T) { + stats := func(s ...*monitor.Statistic) ([]*monitor.Statistic, error) { + return s, nil + } + stat := func(name string, tags map[string]string, vals map[string]interface{}) *monitor.Statistic { + return &monitor.Statistic{ + Statistic: models.Statistic{ + Name: name, + Tags: tags, + Values: vals, + }, + } + } + tags := func(kv ...string) map[string]string { + if len(kv)%2 != 0 { + panic("expect even number of key/values") + } + res := make(map[string]string, len(kv)/2) + for i := 0; i < len(kv); i += 2 { + res[kv[i]] = kv[i+1] + } + return res + } + vals := func(kv ...interface{}) map[string]interface{} { + if len(kv)%2 != 0 { + panic("expect even number of key/values") + } + res := make(map[string]interface{}, len(kv)/2) + for i := 0; i < len(kv); i += 2 { + if key, ok := kv[i].(string); !ok { + panic("key must be string") + } else { + res[key] = kv[i+1] + } + } + return res + } + + var Ignored = []string{"memstats", "cmdline"} + read := func(t *testing.T, b *bytes.Buffer, del ...string) map[string]interface{} { + t.Helper() + res := make(map[string]interface{}) + if err := json.Unmarshal(b.Bytes(), &res); err != nil { + t.Fatal(err) + } + + for _, k := range del { + delete(res, k) + } + + return res + } + keys := func(m map[string]interface{}) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys + } + + // stats tests the results of serializing Monitor.Statistics + t.Run("stats", func(t *testing.T) { + t.Run("generates unique keys using known tags", func(t *testing.T) { + h := NewHandler(false) + h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) { + return stats( + stat("database", tags("database", "foo"), nil), + stat("hh", tags("path", "/mnt/foo/bar"), nil), + stat("httpd", tags("bind", "127.0.0.1:8088", "proto", "https"), nil), + stat("other", tags("foo", "bar"), nil), + stat("shard", tags("path", "/mnt/foo", "id", "111"), nil), + ) + } + req := MustNewRequest("GET", "/debug/vars", nil) + w := httptest.NewRecorder() + h.ServeHTTP(w, req) + got := keys(read(t, w.Body, Ignored...)) + exp := []string{"database:foo", "hh:/mnt/foo/bar", "httpd:https:127.0.0.1:8088", "other", "shard:/mnt/foo:111"} + if !cmp.Equal(got, exp) { + t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + + t.Run("generates numbered keys for collisions", func(t *testing.T) { + h := NewHandler(false) + h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) { + return stats( + stat("hh_processor", tags("db", "foo", "shardID", "10"), vals("queueSize", 100)), + stat("hh_processor", tags("db", "foo", "shardID", "15"), vals("queueSize", 500)), + stat("hh_processor", tags("db", "bar", "shardID", "20"), vals("queueSize", 200)), + stat("hh_processor", tags("db", "bar", "shardID", "25"), vals("queueSize", 700)), + ) + } + req := MustNewRequest("GET", "/debug/vars", nil) + w := httptest.NewRecorder() + h.ServeHTTP(w, req) + got := read(t, w.Body, Ignored...) + exp := map[string]interface{}{ + "hh_processor": map[string]interface{}{ + "name": "hh_processor", + "tags": map[string]interface{}{"db": "foo", "shardID": "10"}, + "values": map[string]interface{}{"queueSize": float64(100)}, + }, + "hh_processor:1": map[string]interface{}{ + "name": "hh_processor", + "tags": map[string]interface{}{"db": "foo", "shardID": "15"}, + "values": map[string]interface{}{"queueSize": float64(500)}, + }, + "hh_processor:2": map[string]interface{}{ + "name": "hh_processor", + "tags": map[string]interface{}{"db": "bar", "shardID": "20"}, + "values": map[string]interface{}{"queueSize": float64(200)}, + }, + "hh_processor:3": map[string]interface{}{ + "name": "hh_processor", + "tags": map[string]interface{}{"db": "bar", "shardID": "25"}, + "values": map[string]interface{}{"queueSize": float64(700)}, + }, + } + if !cmp.Equal(got, exp) { + t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp)) + } + }) + }) +} + // NewHandler represents a test wrapper for httpd.Handler. type Handler struct { *httpd.Handler @@ -1907,6 +2037,7 @@ type Handler struct { StatementExecutor HandlerStatementExecutor QueryAuthorizer HandlerQueryAuthorizer PointsWriter HandlerPointsWriter + Monitor *HandlerMonitor Store *internal.StorageStoreMock Controller *internal.FluxControllerMock } @@ -1973,6 +2104,7 @@ func NewHandlerWithConfig(config httpd.Config) *Handler { h.MetaClient = &internal.MetaClientMock{} h.Store = internal.NewStorageStoreMock() h.Controller = internal.NewFluxControllerMock() + h.Monitor = newHandlerMonitor() h.Handler.MetaClient = h.MetaClient h.Handler.Store = h.Store @@ -1980,6 +2112,7 @@ func NewHandlerWithConfig(config httpd.Config) *Handler { h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor h.Handler.QueryAuthorizer = &h.QueryAuthorizer h.Handler.PointsWriter = &h.PointsWriter + h.Handler.Monitor = h.Monitor h.Handler.Version = "0.0.0" h.Handler.BuildType = "OSS" h.Handler.Controller = h.Controller @@ -1992,6 +2125,33 @@ func NewHandlerWithConfig(config httpd.Config) *Handler { return h } +// HandlerMonitor is a mock implementation of Handler.Monitor. +type HandlerMonitor struct { + StatisticsFn func(tags map[string]string) ([]*monitor.Statistic, error) + DiagnosticsFn func() (map[string]*diagnostics.Diagnostics, error) +} + +// newHandlerMonitor returns a HandlerMonitor with default implementations +// for each function. +func newHandlerMonitor() *HandlerMonitor { + return &HandlerMonitor{ + StatisticsFn: func(_ map[string]string) ([]*monitor.Statistic, error) { + return nil, nil + }, + DiagnosticsFn: func() (map[string]*diagnostics.Diagnostics, error) { + return make(map[string]*diagnostics.Diagnostics), nil + }, + } +} + +func (m *HandlerMonitor) Statistics(tags map[string]string) ([]*monitor.Statistic, error) { + return m.StatisticsFn(tags) +} + +func (m *HandlerMonitor) Diagnostics() (map[string]*diagnostics.Diagnostics, error) { + return m.DiagnosticsFn() +} + // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor. type HandlerStatementExecutor struct { ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error