diff --git a/CHANGELOG.md b/CHANGELOG.md
index 048d3ea2cc..b2ce15365e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -22,11 +22,11 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
 - [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements
 - [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI.
 - [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1
-- [#8348](https://github.com/influxd ata/influxdb/pull/8348): Add max concurrent compaction limits
+- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits
 - [#8366](https://github.com/influxdata/influxdb/pull/8366): Add TSI support tooling.
 - [#8350](https://github.com/influxdata/influxdb/pull/8350): Track HTTP client requests for /write and /query with /debug/requests.
 - [#8384](https://github.com/influxdata/influxdb/pull/8384): Write and compaction stability
-- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles single in archive.
+- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles and queries in a single archive.
 
 ### Bugfixes
 
diff --git a/internal/meta_client.go b/internal/meta_client.go
index eb6439ba1f..7f1f0c445d 100644
--- a/internal/meta_client.go
+++ b/internal/meta_client.go
@@ -34,6 +34,8 @@ type MetaClientMock struct {
 
     RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
 
+    AuthenticateFn    func(username, password string) (ui *meta.UserInfo, err error)
+    AdminUserExistsFn func() bool
     SetAdminPrivilegeFn func(username string, admin bool) error
     SetDataFn func(*meta.Data) error
     SetPrivilegeFn func(username, database string, p influxql.Privilege) error
@@ -43,6 +45,7 @@ type MetaClientMock struct {
     UpdateUserFn func(name, password string) error
     UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
     UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error)
+    UserFn func(username string) (*meta.UserInfo, error)
     UsersFn func() []meta.UserInfo
 }
 
@@ -150,7 +153,13 @@ func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Pr
     return c.UserPrivilegesFn(username)
 }
 
-func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
+func (c *MetaClientMock) Authenticate(username, password string) (*meta.UserInfo, error) {
+    return c.AuthenticateFn(username, password)
+}
+func (c *MetaClientMock) AdminUserExists() bool { return c.AdminUserExistsFn() }
+
+func (c *MetaClientMock) User(username string) (*meta.UserInfo, error) { return c.UserFn(username) }
+func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
 
 func (c *MetaClientMock) Open() error { return c.OpenFn() }
 func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
diff --git a/services/httpd/handler.go b/services/httpd/handler.go
index be6fdd0821..a859dd3ef2 100644
--- a/services/httpd/handler.go
+++ b/services/httpd/handler.go
@@ -74,6 +74,7 @@ type Handler struct {
 
     MetaClient interface {
         Database(name string) *meta.DatabaseInfo
+        Databases() []meta.DatabaseInfo
         Authenticate(username, password string) (ui *meta.UserInfo, err error)
         User(username string) (*meta.UserInfo, error)
         AdminUserExists() bool
@@ -251,7 +252,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
     w.Header().Add("X-Influxdb-Version", h.Version)
 
     if strings.HasPrefix(r.URL.Path, "/debug/pprof") && h.Config.PprofEnabled {
-        handleProfiles(w, r)
+        h.handleProfiles(w, r)
     } else if strings.HasPrefix(r.URL.Path, "/debug/vars") {
         h.serveExpvar(w, r)
     } else if strings.HasPrefix(r.URL.Path, "/debug/requests") {
diff --git a/services/httpd/handler_test.go b/services/httpd/handler_test.go
index 3b4820240e..3151e13f1e 100644
--- a/services/httpd/handler_test.go
+++ b/services/httpd/handler_test.go
@@ -14,6 +14,8 @@ import (
     "testing"
     "time"
 
+    "github.com/influxdata/influxdb/internal"
+
     "github.com/dgrijalva/jwt-go"
     "github.com/influxdata/influxdb/influxql"
     "github.com/influxdata/influxdb/models"
@@ -605,7 +607,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
 // NewHandler represents a test wrapper for httpd.Handler.
 type Handler struct {
     *httpd.Handler
-    MetaClient        HandlerMetaStore
+    MetaClient        *internal.MetaClientMock
     StatementExecutor HandlerStatementExecutor
     QueryAuthorizer   HandlerQueryAuthorizer
 }
@@ -619,7 +621,10 @@ func NewHandler(requireAuthentication bool) *Handler {
     h := &Handler{
         Handler: httpd.NewHandler(config),
     }
-    h.Handler.MetaClient = &h.MetaClient
+
+    h.MetaClient = &internal.MetaClientMock{}
+
+    h.Handler.MetaClient = h.MetaClient
     h.Handler.QueryExecutor = influxql.NewQueryExecutor()
     h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
     h.Handler.QueryAuthorizer = &h.QueryAuthorizer
@@ -627,39 +632,6 @@ func NewHandler(requireAuthentication bool) *Handler {
     return h
 }
 
-// HandlerMetaStore is a mock implementation of Handler.MetaClient.
-type HandlerMetaStore struct {
-    PingFn            func(d time.Duration) error
-    DatabaseFn        func(name string) *meta.DatabaseInfo
-    AuthenticateFn    func(username, password string) (ui *meta.UserInfo, err error)
-    UserFn            func(username string) (*meta.UserInfo, error)
-    AdminUserExistsFn func() bool
-}
-
-func (s *HandlerMetaStore) Ping(b bool) error {
-    if s.PingFn == nil {
-        // Default behaviour is to assume there is a leader.
-        return nil
-    }
-    return s.Ping(b)
-}
-
-func (s *HandlerMetaStore) Database(name string) *meta.DatabaseInfo {
-    return s.DatabaseFn(name)
-}
-
-func (s *HandlerMetaStore) Authenticate(username, password string) (ui *meta.UserInfo, err error) {
-    return s.AuthenticateFn(username, password)
-}
-
-func (s *HandlerMetaStore) AdminUserExists() bool {
-    return s.AdminUserExistsFn()
-}
-
-func (s *HandlerMetaStore) User(username string) (*meta.UserInfo, error) {
-    return s.UserFn(username)
-}
-
 // HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
 type HandlerStatementExecutor struct {
     ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
diff --git a/services/httpd/pprof.go b/services/httpd/pprof.go
index 40fd14e48a..6bfec3d33a 100644
--- a/services/httpd/pprof.go
+++ b/services/httpd/pprof.go
@@ -4,16 +4,21 @@ import (
     "archive/tar"
     "bytes"
     "compress/gzip"
+    "fmt"
     "io"
     "net/http"
     httppprof "net/http/pprof"
     "runtime/pprof"
+    "sort"
     "strconv"
+    "text/tabwriter"
     "time"
+
+    "github.com/influxdata/influxdb/models"
 )
 
 // handleProfiles determines which profile to return to the requester.
-func handleProfiles(w http.ResponseWriter, r *http.Request) {
+func (h *Handler) handleProfiles(w http.ResponseWriter, r *http.Request) {
     switch r.URL.Path {
     case "/debug/pprof/cmdline":
         httppprof.Cmdline(w, r)
@@ -22,7 +27,7 @@ func handleProfiles(w http.ResponseWriter, r *http.Request) {
     case "/debug/pprof/symbol":
         httppprof.Symbol(w, r)
     case "/debug/pprof/all":
-        archiveProfiles(w, r)
+        h.archiveProfilesAndQueries(w, r)
     default:
         httppprof.Index(w, r)
     }
@@ -35,13 +40,19 @@ type prof struct {
     Debug int64
 }
 
-// archiveProfiles collects the following profiles:
+// archiveProfilesAndQueries collects the following profiles:
 // - goroutine profile
 // - heap profile
 // - blocking profile
 // - (optionally) CPU profile
 //
-// All profiles are added to a tar archive and then compressed, before being
+// It also collects the following query results:
+//
+// - SHOW SHARDS
+// - SHOW STATS
+// - SHOW DIAGNOSTICS
+//
+// All information is added to a tar archive and then compressed, before being
 // returned to the requester as an archive file. Where profiles support debug
 // parameters, the profile is collected with debug=1. To optionally include a
 // CPU profile, the requester should provide a `cpu` query parameter, and can
@@ -55,8 +66,8 @@ type prof struct {
 // The value after the `cpu` query parameter is not actually important, as long
 // as there is something there.
 //
-func archiveProfiles(w http.ResponseWriter, r *http.Request) {
-    var all = []*prof{
+func (h *Handler) archiveProfilesAndQueries(w http.ResponseWriter, r *http.Request) {
+    var allProfs = []*prof{
         {Name: "goroutine", Debug: 1},
         {Name: "block", Debug: 1},
         {Name: "heap", Debug: 1},
@@ -72,17 +83,19 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
         if profile.Debug <= 0 {
             profile.Debug = 30
         }
-        all = append([]*prof{profile}, all...) // CPU profile first.
+        allProfs = append([]*prof{profile}, allProfs...) // CPU profile first.
     }
 
     var (
         resp bytes.Buffer // Temporary buffer for entire archive.
-        buf  bytes.Buffer // Temporary buffer for each profile.
+        buf  bytes.Buffer // Temporary buffer for each profile/query result.
     )
 
     gz := gzip.NewWriter(&resp)
     tw := tar.NewWriter(gz)
-    for _, profile := range all {
+
+    // Collect and write out profiles.
+    for _, profile := range allProfs {
         if profile.Name == "cpu" {
             if err := pprof.StartCPUProfile(&buf); err != nil {
                 http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -123,6 +136,76 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
         buf.Reset()
     }
 
+    // Collect and write out the queries.
+    var allQueries = []struct {
+        name string
+        fn   func() ([]*models.Row, error)
+    }{
+        {"shards", h.showShards},
+        {"stats", h.showStats},
+        {"diagnostics", h.showDiagnostics},
+    }
+
+    tabW := tabwriter.NewWriter(&buf, 8, 8, 1, '\t', 0)
+    for _, query := range allQueries {
+        rows, err := query.fn()
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+        }
+
+        for i, row := range rows {
+            var out []byte
+            // Write the columns
+            for _, col := range row.Columns {
+                out = append(out, []byte(col+"\t")...)
+            }
+            out = append(out, []byte("\n")...)
+            if _, err := tabW.Write(out); err != nil {
+                http.Error(w, err.Error(), http.StatusInternalServerError)
+            }
+
+            // Write all the values
+            for _, val := range row.Values {
+                out = out[:0]
+                for _, v := range val {
+                    out = append(out, []byte(fmt.Sprintf("%v\t", v))...)
+                }
+                out = append(out, []byte("\n")...)
+                if _, err := tabW.Write(out); err != nil {
+                    http.Error(w, err.Error(), http.StatusInternalServerError)
+                }
+            }
+
+            // Write a final newline
+            if i < len(rows)-1 {
+                if _, err := tabW.Write([]byte("\n")); err != nil {
+                    http.Error(w, err.Error(), http.StatusInternalServerError)
+                }
+            }
+        }
+
+        if err := tabW.Flush(); err != nil {
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+        }
+
+        err = tw.WriteHeader(&tar.Header{
+            Name: query.name + ".txt",
+            Mode: 0600,
+            Size: int64(buf.Len()),
+        })
+        if err != nil {
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+        }
+
+        // Write the query file's data.
+        if _, err := tw.Write(buf.Bytes()); err != nil {
+            http.Error(w, err.Error(), http.StatusInternalServerError)
+        }
+
+        // Reset the buffer for the next query.
+        buf.Reset()
+    }
+
     // Close the tar writer.
     if err := tw.Close(); err != nil {
         http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -139,6 +222,107 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
     io.Copy(w, &resp) // Nothing we can really do about an error at this point.
 }
 
+// showShards generates the same values that a StatementExecutor would if a
+// SHOW SHARDS query was executed.
+func (h *Handler) showShards() ([]*models.Row, error) {
+    dis := h.MetaClient.Databases()
+
+    rows := []*models.Row{}
+    for _, di := range dis {
+        row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
+        for _, rpi := range di.RetentionPolicies {
+            for _, sgi := range rpi.ShardGroups {
+                // Shards associated with deleted shard groups are effectively deleted.
+                // Don't list them.
+                if sgi.Deleted() {
+                    continue
+                }
+
+                for _, si := range sgi.Shards {
+                    ownerIDs := make([]uint64, len(si.Owners))
+                    for i, owner := range si.Owners {
+                        ownerIDs[i] = owner.NodeID
+                    }
+
+                    row.Values = append(row.Values, []interface{}{
+                        si.ID,
+                        di.Name,
+                        rpi.Name,
+                        sgi.ID,
+                        sgi.StartTime.UTC().Format(time.RFC3339),
+                        sgi.EndTime.UTC().Format(time.RFC3339),
+                        sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
+                        joinUint64(ownerIDs),
+                    })
+                }
+            }
+        }
+        rows = append(rows, row)
+    }
+    return rows, nil
+}
+
+// showDiagnostics generates the same values that a StatementExecutor would if a
+// SHOW DIAGNOSTICS query was executed.
+func (h *Handler) showDiagnostics() ([]*models.Row, error) {
+    diags, err := h.Monitor.Diagnostics()
+    if err != nil {
+        return nil, err
+    }
+
+    // Get a sorted list of diagnostics keys.
+    sortedKeys := make([]string, 0, len(diags))
+    for k := range diags {
+        sortedKeys = append(sortedKeys, k)
+    }
+    sort.Strings(sortedKeys)
+
+    rows := make([]*models.Row, 0, len(diags))
+    for _, k := range sortedKeys {
+        row := &models.Row{Name: k}
+
+        row.Columns = diags[k].Columns
+        row.Values = diags[k].Rows
+        rows = append(rows, row)
+    }
+    return rows, nil
+}
+
+// showStats generates the same values that a StatementExecutor would if a
+// SHOW STATS query was executed.
+func (h *Handler) showStats() ([]*models.Row, error) {
+    stats, err := h.Monitor.Statistics(nil)
+    if err != nil {
+        return nil, err
+    }
+
+    var rows []*models.Row
+    for _, stat := range stats {
+        row := &models.Row{Name: stat.Name, Tags: stat.Tags}
+
+        values := make([]interface{}, 0, len(stat.Values))
+        for _, k := range stat.ValueNames() {
+            row.Columns = append(row.Columns, k)
+            values = append(values, stat.Values[k])
+        }
+        row.Values = [][]interface{}{values}
+        rows = append(rows, row)
+    }
+    return rows, nil
+}
+
+// joinUint64 returns a comma-delimited string of uint64 numbers.
+func joinUint64(a []uint64) string {
+    var buf bytes.Buffer
+    for i, x := range a {
+        buf.WriteString(strconv.FormatUint(x, 10))
+        if i < len(a)-1 {
+            buf.WriteRune(',')
+        }
+    }
+    return buf.String()
+}
+
 // Taken from net/http/pprof/pprof.go
 func sleep(w http.ResponseWriter, d time.Duration) {
     var clientGone <-chan bool
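
For reference, a minimal client-side sketch (not part of the patch) of fetching the combined archive from the new /debug/pprof/all endpoint. It assumes an InfluxDB instance listening on the default HTTP bind address localhost:8086 with pprof enabled; the optional cpu parameter opts in to a CPU profile and, as the doc comment above notes, its value is not inspected.

package main

import (
	"io"
	"net/http"
	"os"
)

func main() {
	// Request the gzipped tar archive of profiles and query results.
	// Host and port are assumptions; adjust for your deployment.
	resp, err := http.Get("http://localhost:8086/debug/pprof/all?cpu=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Save the archive to disk for later inspection.
	out, err := os.Create("profiles.tar.gz")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
}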
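
Similarly, a hedged sketch of how a test might configure the consolidated internal.MetaClientMock that handler_test.go now uses in place of HandlerMetaStore. The field names and signatures come from the internal/meta_client.go hunk above; the helper name, credentials, and return values are purely illustrative.

package example

import (
	"errors"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxdb/services/meta"
)

// newAuthenticatingMock wires up the authentication-related callbacks the
// handler exercises. Everything hard-coded here is illustrative only.
func newAuthenticatingMock() *internal.MetaClientMock {
	c := &internal.MetaClientMock{}
	c.AdminUserExistsFn = func() bool { return true }
	c.UserFn = func(username string) (*meta.UserInfo, error) {
		return &meta.UserInfo{Name: username, Admin: true}, nil
	}
	c.AuthenticateFn = func(username, password string) (*meta.UserInfo, error) {
		if username == "admin" && password == "secret" {
			return &meta.UserInfo{Name: "admin", Admin: true}, nil
		}
		return nil, errors.New("authentication failed")
	}
	return c
}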