Merge pull request #18004 from influxdata/sgc/fix/unique_keys
fix(httpd): Fixes key collisions when serializing /debug/varspull/18013/head
commit
0cb09dd09f
|
@ -1455,6 +1455,8 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||||
fmt.Fprintf(w, "\"memstats\": %s", val)
|
fmt.Fprintf(w, "\"memstats\": %s", val)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uniqueKeys := make(map[string]int)
|
||||||
|
|
||||||
for _, s := range stats {
|
for _, s := range stats {
|
||||||
val, err := json.Marshal(s)
|
val, err := json.Marshal(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1486,6 +1488,12 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
key := buf.String()
|
key := buf.String()
|
||||||
|
v := uniqueKeys[key]
|
||||||
|
uniqueKeys[key] = v + 1
|
||||||
|
if v > 0 {
|
||||||
|
fmt.Fprintf(buf, ":%d", v)
|
||||||
|
key = buf.String()
|
||||||
|
}
|
||||||
|
|
||||||
if !first {
|
if !first {
|
||||||
fmt.Fprintln(w, ",")
|
fmt.Fprintln(w, ",")
|
||||||
|
|
|
@ -15,6 +15,7 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -32,6 +33,8 @@ import (
|
||||||
"github.com/influxdata/influxdb/internal"
|
"github.com/influxdata/influxdb/internal"
|
||||||
"github.com/influxdata/influxdb/logger"
|
"github.com/influxdata/influxdb/logger"
|
||||||
"github.com/influxdata/influxdb/models"
|
"github.com/influxdata/influxdb/models"
|
||||||
|
"github.com/influxdata/influxdb/monitor"
|
||||||
|
"github.com/influxdata/influxdb/monitor/diagnostics"
|
||||||
"github.com/influxdata/influxdb/prometheus/remote"
|
"github.com/influxdata/influxdb/prometheus/remote"
|
||||||
"github.com/influxdata/influxdb/query"
|
"github.com/influxdata/influxdb/query"
|
||||||
"github.com/influxdata/influxdb/services/httpd"
|
"github.com/influxdata/influxdb/services/httpd"
|
||||||
|
@ -1900,6 +1903,133 @@ func TestThrottler_Handler(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHandlerDebugVars(t *testing.T) {
|
||||||
|
stats := func(s ...*monitor.Statistic) ([]*monitor.Statistic, error) {
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
stat := func(name string, tags map[string]string, vals map[string]interface{}) *monitor.Statistic {
|
||||||
|
return &monitor.Statistic{
|
||||||
|
Statistic: models.Statistic{
|
||||||
|
Name: name,
|
||||||
|
Tags: tags,
|
||||||
|
Values: vals,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tags := func(kv ...string) map[string]string {
|
||||||
|
if len(kv)%2 != 0 {
|
||||||
|
panic("expect even number of key/values")
|
||||||
|
}
|
||||||
|
res := make(map[string]string, len(kv)/2)
|
||||||
|
for i := 0; i < len(kv); i += 2 {
|
||||||
|
res[kv[i]] = kv[i+1]
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
vals := func(kv ...interface{}) map[string]interface{} {
|
||||||
|
if len(kv)%2 != 0 {
|
||||||
|
panic("expect even number of key/values")
|
||||||
|
}
|
||||||
|
res := make(map[string]interface{}, len(kv)/2)
|
||||||
|
for i := 0; i < len(kv); i += 2 {
|
||||||
|
if key, ok := kv[i].(string); !ok {
|
||||||
|
panic("key must be string")
|
||||||
|
} else {
|
||||||
|
res[key] = kv[i+1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
var Ignored = []string{"memstats", "cmdline"}
|
||||||
|
read := func(t *testing.T, b *bytes.Buffer, del ...string) map[string]interface{} {
|
||||||
|
t.Helper()
|
||||||
|
res := make(map[string]interface{})
|
||||||
|
if err := json.Unmarshal(b.Bytes(), &res); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, k := range del {
|
||||||
|
delete(res, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
keys := func(m map[string]interface{}) []string {
|
||||||
|
keys := make([]string, 0, len(m))
|
||||||
|
for k := range m {
|
||||||
|
keys = append(keys, k)
|
||||||
|
}
|
||||||
|
sort.Strings(keys)
|
||||||
|
return keys
|
||||||
|
}
|
||||||
|
|
||||||
|
// stats tests the results of serializing Monitor.Statistics
|
||||||
|
t.Run("stats", func(t *testing.T) {
|
||||||
|
t.Run("generates unique keys using known tags", func(t *testing.T) {
|
||||||
|
h := NewHandler(false)
|
||||||
|
h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) {
|
||||||
|
return stats(
|
||||||
|
stat("database", tags("database", "foo"), nil),
|
||||||
|
stat("hh", tags("path", "/mnt/foo/bar"), nil),
|
||||||
|
stat("httpd", tags("bind", "127.0.0.1:8088", "proto", "https"), nil),
|
||||||
|
stat("other", tags("foo", "bar"), nil),
|
||||||
|
stat("shard", tags("path", "/mnt/foo", "id", "111"), nil),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
req := MustNewRequest("GET", "/debug/vars", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
h.ServeHTTP(w, req)
|
||||||
|
got := keys(read(t, w.Body, Ignored...))
|
||||||
|
exp := []string{"database:foo", "hh:/mnt/foo/bar", "httpd:https:127.0.0.1:8088", "other", "shard:/mnt/foo:111"}
|
||||||
|
if !cmp.Equal(got, exp) {
|
||||||
|
t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("generates numbered keys for collisions", func(t *testing.T) {
|
||||||
|
h := NewHandler(false)
|
||||||
|
h.Monitor.StatisticsFn = func(_ map[string]string) ([]*monitor.Statistic, error) {
|
||||||
|
return stats(
|
||||||
|
stat("hh_processor", tags("db", "foo", "shardID", "10"), vals("queueSize", 100)),
|
||||||
|
stat("hh_processor", tags("db", "foo", "shardID", "15"), vals("queueSize", 500)),
|
||||||
|
stat("hh_processor", tags("db", "bar", "shardID", "20"), vals("queueSize", 200)),
|
||||||
|
stat("hh_processor", tags("db", "bar", "shardID", "25"), vals("queueSize", 700)),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
req := MustNewRequest("GET", "/debug/vars", nil)
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
h.ServeHTTP(w, req)
|
||||||
|
got := read(t, w.Body, Ignored...)
|
||||||
|
exp := map[string]interface{}{
|
||||||
|
"hh_processor": map[string]interface{}{
|
||||||
|
"name": "hh_processor",
|
||||||
|
"tags": map[string]interface{}{"db": "foo", "shardID": "10"},
|
||||||
|
"values": map[string]interface{}{"queueSize": float64(100)},
|
||||||
|
},
|
||||||
|
"hh_processor:1": map[string]interface{}{
|
||||||
|
"name": "hh_processor",
|
||||||
|
"tags": map[string]interface{}{"db": "foo", "shardID": "15"},
|
||||||
|
"values": map[string]interface{}{"queueSize": float64(500)},
|
||||||
|
},
|
||||||
|
"hh_processor:2": map[string]interface{}{
|
||||||
|
"name": "hh_processor",
|
||||||
|
"tags": map[string]interface{}{"db": "bar", "shardID": "20"},
|
||||||
|
"values": map[string]interface{}{"queueSize": float64(200)},
|
||||||
|
},
|
||||||
|
"hh_processor:3": map[string]interface{}{
|
||||||
|
"name": "hh_processor",
|
||||||
|
"tags": map[string]interface{}{"db": "bar", "shardID": "25"},
|
||||||
|
"values": map[string]interface{}{"queueSize": float64(700)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !cmp.Equal(got, exp) {
|
||||||
|
t.Errorf("unexpected keys; -got/+exp\n%s", cmp.Diff(got, exp))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// NewHandler represents a test wrapper for httpd.Handler.
|
// NewHandler represents a test wrapper for httpd.Handler.
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
*httpd.Handler
|
*httpd.Handler
|
||||||
|
@ -1907,6 +2037,7 @@ type Handler struct {
|
||||||
StatementExecutor HandlerStatementExecutor
|
StatementExecutor HandlerStatementExecutor
|
||||||
QueryAuthorizer HandlerQueryAuthorizer
|
QueryAuthorizer HandlerQueryAuthorizer
|
||||||
PointsWriter HandlerPointsWriter
|
PointsWriter HandlerPointsWriter
|
||||||
|
Monitor *HandlerMonitor
|
||||||
Store *internal.StorageStoreMock
|
Store *internal.StorageStoreMock
|
||||||
Controller *internal.FluxControllerMock
|
Controller *internal.FluxControllerMock
|
||||||
}
|
}
|
||||||
|
@ -1973,6 +2104,7 @@ func NewHandlerWithConfig(config httpd.Config) *Handler {
|
||||||
h.MetaClient = &internal.MetaClientMock{}
|
h.MetaClient = &internal.MetaClientMock{}
|
||||||
h.Store = internal.NewStorageStoreMock()
|
h.Store = internal.NewStorageStoreMock()
|
||||||
h.Controller = internal.NewFluxControllerMock()
|
h.Controller = internal.NewFluxControllerMock()
|
||||||
|
h.Monitor = newHandlerMonitor()
|
||||||
|
|
||||||
h.Handler.MetaClient = h.MetaClient
|
h.Handler.MetaClient = h.MetaClient
|
||||||
h.Handler.Store = h.Store
|
h.Handler.Store = h.Store
|
||||||
|
@ -1980,6 +2112,7 @@ func NewHandlerWithConfig(config httpd.Config) *Handler {
|
||||||
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
|
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
|
||||||
h.Handler.QueryAuthorizer = &h.QueryAuthorizer
|
h.Handler.QueryAuthorizer = &h.QueryAuthorizer
|
||||||
h.Handler.PointsWriter = &h.PointsWriter
|
h.Handler.PointsWriter = &h.PointsWriter
|
||||||
|
h.Handler.Monitor = h.Monitor
|
||||||
h.Handler.Version = "0.0.0"
|
h.Handler.Version = "0.0.0"
|
||||||
h.Handler.BuildType = "OSS"
|
h.Handler.BuildType = "OSS"
|
||||||
h.Handler.Controller = h.Controller
|
h.Handler.Controller = h.Controller
|
||||||
|
@ -1992,6 +2125,33 @@ func NewHandlerWithConfig(config httpd.Config) *Handler {
|
||||||
return h
|
return h
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HandlerMonitor is a mock implementation of Handler.Monitor.
|
||||||
|
type HandlerMonitor struct {
|
||||||
|
StatisticsFn func(tags map[string]string) ([]*monitor.Statistic, error)
|
||||||
|
DiagnosticsFn func() (map[string]*diagnostics.Diagnostics, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newHandlerMonitor returns a HandlerMonitor with default implementations
|
||||||
|
// for each function.
|
||||||
|
func newHandlerMonitor() *HandlerMonitor {
|
||||||
|
return &HandlerMonitor{
|
||||||
|
StatisticsFn: func(_ map[string]string) ([]*monitor.Statistic, error) {
|
||||||
|
return nil, nil
|
||||||
|
},
|
||||||
|
DiagnosticsFn: func() (map[string]*diagnostics.Diagnostics, error) {
|
||||||
|
return make(map[string]*diagnostics.Diagnostics), nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *HandlerMonitor) Statistics(tags map[string]string) ([]*monitor.Statistic, error) {
|
||||||
|
return m.StatisticsFn(tags)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *HandlerMonitor) Diagnostics() (map[string]*diagnostics.Diagnostics, error) {
|
||||||
|
return m.DiagnosticsFn()
|
||||||
|
}
|
||||||
|
|
||||||
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
|
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
|
||||||
type HandlerStatementExecutor struct {
|
type HandlerStatementExecutor struct {
|
||||||
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
ExecuteStatementFn func(stmt influxql.Statement, ctx *query.ExecutionContext) error
|
||||||
|
|
Loading…
Reference in New Issue