Add support for shards, stats and diagnostics
parent
8f8ff0ec61
commit
1cbbaa9317
|
@ -22,11 +22,12 @@ The admin UI is removed and unusable in this release. The `[admin]` configuratio
|
|||
- [#8302](https://github.com/influxdata/influxdb/pull/8302): Write throughput/concurrency improvements
|
||||
- [#8273](https://github.com/influxdata/influxdb/issues/8273): Remove the admin UI.
|
||||
- [#8327](https://github.com/influxdata/influxdb/pull/8327): Update to go1.8.1
|
||||
- [#8348](https://github.com/influxd ata/influxdb/pull/8348): Add max concurrent compaction limits
|
||||
- [#8348](https://github.com/influxdata/influxdb/pull/8348): Add max concurrent compaction limits
|
||||
- [#8366](https://github.com/influxdata/influxdb/pull/8366): Add TSI support tooling.
|
||||
- [#8350](https://github.com/influxdata/influxdb/pull/8350): Track HTTP client requests for /write and /query with /debug/requests.
|
||||
<<<<<<< 8f8ff0ec612e6e77ff618f572b302e069de8e5c3
|
||||
- [#8384](https://github.com/influxdata/influxdb/pull/8384): Write and compaction stability
|
||||
- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles single in archive.
|
||||
- [#7862](https://github.com/influxdata/influxdb/pull/7861): Add new profile endpoint for gathering all debug profiles and querues in single archive.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ type MetaClientMock struct {
|
|||
|
||||
RetentionPolicyFn func(database, name string) (rpi *meta.RetentionPolicyInfo, err error)
|
||||
|
||||
AuthenticateFn func(username, password string) (ui *meta.UserInfo, err error)
|
||||
AdminUserExistsFn func() bool
|
||||
SetAdminPrivilegeFn func(username string, admin bool) error
|
||||
SetDataFn func(*meta.Data) error
|
||||
SetPrivilegeFn func(username, database string, p influxql.Privilege) error
|
||||
|
@ -43,6 +45,7 @@ type MetaClientMock struct {
|
|||
UpdateUserFn func(name, password string) error
|
||||
UserPrivilegeFn func(username, database string) (*influxql.Privilege, error)
|
||||
UserPrivilegesFn func(username string) (map[string]influxql.Privilege, error)
|
||||
UserFn func(username string) (*meta.UserInfo, error)
|
||||
UsersFn func() []meta.UserInfo
|
||||
}
|
||||
|
||||
|
@ -150,7 +153,13 @@ func (c *MetaClientMock) UserPrivileges(username string) (map[string]influxql.Pr
|
|||
return c.UserPrivilegesFn(username)
|
||||
}
|
||||
|
||||
func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
|
||||
func (c *MetaClientMock) Authenticate(username, password string) (*meta.UserInfo, error) {
|
||||
return c.AuthenticateFn(username, password)
|
||||
}
|
||||
func (c *MetaClientMock) AdminUserExists() bool { return c.AdminUserExistsFn() }
|
||||
|
||||
func (c *MetaClientMock) User(username string) (*meta.UserInfo, error) { return c.UserFn(username) }
|
||||
func (c *MetaClientMock) Users() []meta.UserInfo { return c.UsersFn() }
|
||||
|
||||
func (c *MetaClientMock) Open() error { return c.OpenFn() }
|
||||
func (c *MetaClientMock) Data() meta.Data { return c.DataFn() }
|
||||
|
|
|
@ -74,6 +74,7 @@ type Handler struct {
|
|||
|
||||
MetaClient interface {
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
Databases() []meta.DatabaseInfo
|
||||
Authenticate(username, password string) (ui *meta.UserInfo, err error)
|
||||
User(username string) (*meta.UserInfo, error)
|
||||
AdminUserExists() bool
|
||||
|
@ -251,7 +252,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
w.Header().Add("X-Influxdb-Version", h.Version)
|
||||
|
||||
if strings.HasPrefix(r.URL.Path, "/debug/pprof") && h.Config.PprofEnabled {
|
||||
handleProfiles(w, r)
|
||||
h.handleProfiles(w, r)
|
||||
} else if strings.HasPrefix(r.URL.Path, "/debug/vars") {
|
||||
h.serveExpvar(w, r)
|
||||
} else if strings.HasPrefix(r.URL.Path, "/debug/requests") {
|
||||
|
|
|
@ -14,6 +14,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/internal"
|
||||
|
||||
"github.com/dgrijalva/jwt-go"
|
||||
"github.com/influxdata/influxdb/influxql"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
@ -605,7 +607,7 @@ func TestHandler_XForwardedFor(t *testing.T) {
|
|||
// NewHandler represents a test wrapper for httpd.Handler.
|
||||
type Handler struct {
|
||||
*httpd.Handler
|
||||
MetaClient HandlerMetaStore
|
||||
MetaClient *internal.MetaClientMock
|
||||
StatementExecutor HandlerStatementExecutor
|
||||
QueryAuthorizer HandlerQueryAuthorizer
|
||||
}
|
||||
|
@ -619,7 +621,10 @@ func NewHandler(requireAuthentication bool) *Handler {
|
|||
h := &Handler{
|
||||
Handler: httpd.NewHandler(config),
|
||||
}
|
||||
h.Handler.MetaClient = &h.MetaClient
|
||||
|
||||
h.MetaClient = &internal.MetaClientMock{}
|
||||
|
||||
h.Handler.MetaClient = h.MetaClient
|
||||
h.Handler.QueryExecutor = influxql.NewQueryExecutor()
|
||||
h.Handler.QueryExecutor.StatementExecutor = &h.StatementExecutor
|
||||
h.Handler.QueryAuthorizer = &h.QueryAuthorizer
|
||||
|
@ -627,39 +632,6 @@ func NewHandler(requireAuthentication bool) *Handler {
|
|||
return h
|
||||
}
|
||||
|
||||
// HandlerMetaStore is a mock implementation of Handler.MetaClient.
|
||||
type HandlerMetaStore struct {
|
||||
PingFn func(d time.Duration) error
|
||||
DatabaseFn func(name string) *meta.DatabaseInfo
|
||||
AuthenticateFn func(username, password string) (ui *meta.UserInfo, err error)
|
||||
UserFn func(username string) (*meta.UserInfo, error)
|
||||
AdminUserExistsFn func() bool
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) Ping(b bool) error {
|
||||
if s.PingFn == nil {
|
||||
// Default behaviour is to assume there is a leader.
|
||||
return nil
|
||||
}
|
||||
return s.Ping(b)
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) Database(name string) *meta.DatabaseInfo {
|
||||
return s.DatabaseFn(name)
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) Authenticate(username, password string) (ui *meta.UserInfo, err error) {
|
||||
return s.AuthenticateFn(username, password)
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) AdminUserExists() bool {
|
||||
return s.AdminUserExistsFn()
|
||||
}
|
||||
|
||||
func (s *HandlerMetaStore) User(username string) (*meta.UserInfo, error) {
|
||||
return s.UserFn(username)
|
||||
}
|
||||
|
||||
// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
|
||||
type HandlerStatementExecutor struct {
|
||||
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
|
||||
|
|
|
@ -4,16 +4,21 @@ import (
|
|||
"archive/tar"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
httppprof "net/http/pprof"
|
||||
"runtime/pprof"
|
||||
"sort"
|
||||
"strconv"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/models"
|
||||
)
|
||||
|
||||
// handleProfiles determines which profile to return to the requester.
|
||||
func handleProfiles(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Handler) handleProfiles(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/debug/pprof/cmdline":
|
||||
httppprof.Cmdline(w, r)
|
||||
|
@ -22,7 +27,7 @@ func handleProfiles(w http.ResponseWriter, r *http.Request) {
|
|||
case "/debug/pprof/symbol":
|
||||
httppprof.Symbol(w, r)
|
||||
case "/debug/pprof/all":
|
||||
archiveProfiles(w, r)
|
||||
h.archiveProfilesAndQueries(w, r)
|
||||
default:
|
||||
httppprof.Index(w, r)
|
||||
}
|
||||
|
@ -35,13 +40,19 @@ type prof struct {
|
|||
Debug int64
|
||||
}
|
||||
|
||||
// archiveProfiles collects the following profiles:
|
||||
// archiveProfilesAndQueries collects the following profiles:
|
||||
// - goroutine profile
|
||||
// - heap profile
|
||||
// - blocking profile
|
||||
// - (optionally) CPU profile
|
||||
//
|
||||
// All profiles are added to a tar archive and then compressed, before being
|
||||
// It also collects the following query results:
|
||||
//
|
||||
// - SHOW SHARDS
|
||||
// - SHOW STATS
|
||||
// - SHOW DIAGNOSTICS
|
||||
//
|
||||
// All information is added to a tar archive and then compressed, before being
|
||||
// returned to the requester as an archive file. Where profiles support debug
|
||||
// parameters, the profile is collected with debug=1. To optionally include a
|
||||
// CPU profile, the requester should provide a `cpu` query parameter, and can
|
||||
|
@ -55,8 +66,8 @@ type prof struct {
|
|||
// The value after the `cpu` query parameter is not actually important, as long
|
||||
// as there is something there.
|
||||
//
|
||||
func archiveProfiles(w http.ResponseWriter, r *http.Request) {
|
||||
var all = []*prof{
|
||||
func (h *Handler) archiveProfilesAndQueries(w http.ResponseWriter, r *http.Request) {
|
||||
var allProfs = []*prof{
|
||||
{Name: "goroutine", Debug: 1},
|
||||
{Name: "block", Debug: 1},
|
||||
{Name: "heap", Debug: 1},
|
||||
|
@ -72,17 +83,19 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
|
|||
if profile.Debug <= 0 {
|
||||
profile.Debug = 30
|
||||
}
|
||||
all = append([]*prof{profile}, all...) // CPU profile first.
|
||||
allProfs = append([]*prof{profile}, allProfs...) // CPU profile first.
|
||||
}
|
||||
|
||||
var (
|
||||
resp bytes.Buffer // Temporary buffer for entire archive.
|
||||
buf bytes.Buffer // Temporary buffer for each profile.
|
||||
buf bytes.Buffer // Temporary buffer for each profile/query result.
|
||||
)
|
||||
|
||||
gz := gzip.NewWriter(&resp)
|
||||
tw := tar.NewWriter(gz)
|
||||
for _, profile := range all {
|
||||
|
||||
// Collect and write out profiles.
|
||||
for _, profile := range allProfs {
|
||||
if profile.Name == "cpu" {
|
||||
if err := pprof.StartCPUProfile(&buf); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -123,6 +136,76 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
|
|||
buf.Reset()
|
||||
}
|
||||
|
||||
// Collect and write out the queries.
|
||||
var allQueries = []struct {
|
||||
name string
|
||||
fn func() ([]*models.Row, error)
|
||||
}{
|
||||
{"shards", h.showShards},
|
||||
{"stats", h.showStats},
|
||||
{"diagnostics", h.showDiagnostics},
|
||||
}
|
||||
|
||||
tabW := tabwriter.NewWriter(&buf, 8, 8, 1, '\t', 0)
|
||||
for _, query := range allQueries {
|
||||
rows, err := query.fn()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
for i, row := range rows {
|
||||
var out []byte
|
||||
// Write the columns
|
||||
for _, col := range row.Columns {
|
||||
out = append(out, []byte(col+"\t")...)
|
||||
}
|
||||
out = append(out, []byte("\n")...)
|
||||
if _, err := tabW.Write(out); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Write all the values
|
||||
for _, val := range row.Values {
|
||||
out = out[:0]
|
||||
for _, v := range val {
|
||||
out = append(out, []byte(fmt.Sprintf("%v\t", v))...)
|
||||
}
|
||||
out = append(out, []byte("\n")...)
|
||||
if _, err := tabW.Write(out); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
|
||||
// Write a final newline
|
||||
if i < len(rows)-1 {
|
||||
if _, err := tabW.Write([]byte("\n")); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := tabW.Flush(); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
err = tw.WriteHeader(&tar.Header{
|
||||
Name: query.name + ".txt",
|
||||
Mode: 0600,
|
||||
Size: int64(buf.Len()),
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Write the query file's data.
|
||||
if _, err := tw.Write(buf.Bytes()); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
|
||||
// Reset the buffer for the next query.
|
||||
buf.Reset()
|
||||
}
|
||||
|
||||
// Close the tar writer.
|
||||
if err := tw.Close(); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
|
@ -139,6 +222,107 @@ func archiveProfiles(w http.ResponseWriter, r *http.Request) {
|
|||
io.Copy(w, &resp) // Nothing we can really do about an error at this point.
|
||||
}
|
||||
|
||||
// showShards generates the same values that a StatementExecutor would if a
|
||||
// SHOW SHARDS query was executed.
|
||||
func (h *Handler) showShards() ([]*models.Row, error) {
|
||||
dis := h.MetaClient.Databases()
|
||||
|
||||
rows := []*models.Row{}
|
||||
for _, di := range dis {
|
||||
row := &models.Row{Columns: []string{"id", "database", "retention_policy", "shard_group", "start_time", "end_time", "expiry_time", "owners"}, Name: di.Name}
|
||||
for _, rpi := range di.RetentionPolicies {
|
||||
for _, sgi := range rpi.ShardGroups {
|
||||
// Shards associated with deleted shard groups are effectively deleted.
|
||||
// Don't list them.
|
||||
if sgi.Deleted() {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, si := range sgi.Shards {
|
||||
ownerIDs := make([]uint64, len(si.Owners))
|
||||
for i, owner := range si.Owners {
|
||||
ownerIDs[i] = owner.NodeID
|
||||
}
|
||||
|
||||
row.Values = append(row.Values, []interface{}{
|
||||
si.ID,
|
||||
di.Name,
|
||||
rpi.Name,
|
||||
sgi.ID,
|
||||
sgi.StartTime.UTC().Format(time.RFC3339),
|
||||
sgi.EndTime.UTC().Format(time.RFC3339),
|
||||
sgi.EndTime.Add(rpi.Duration).UTC().Format(time.RFC3339),
|
||||
joinUint64(ownerIDs),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// showDiagnostics generates the same values that a StatementExecutor would if a
|
||||
// SHOW DIAGNOSTICS query was executed.
|
||||
func (h *Handler) showDiagnostics() ([]*models.Row, error) {
|
||||
diags, err := h.Monitor.Diagnostics()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get a sorted list of diagnostics keys.
|
||||
sortedKeys := make([]string, 0, len(diags))
|
||||
for k := range diags {
|
||||
sortedKeys = append(sortedKeys, k)
|
||||
}
|
||||
sort.Strings(sortedKeys)
|
||||
|
||||
rows := make([]*models.Row, 0, len(diags))
|
||||
for _, k := range sortedKeys {
|
||||
row := &models.Row{Name: k}
|
||||
|
||||
row.Columns = diags[k].Columns
|
||||
row.Values = diags[k].Rows
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// showStats generates the same values that a StatementExecutor would if a
|
||||
// SHOW STATS query was executed.
|
||||
func (h *Handler) showStats() ([]*models.Row, error) {
|
||||
stats, err := h.Monitor.Statistics(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var rows []*models.Row
|
||||
for _, stat := range stats {
|
||||
row := &models.Row{Name: stat.Name, Tags: stat.Tags}
|
||||
|
||||
values := make([]interface{}, 0, len(stat.Values))
|
||||
for _, k := range stat.ValueNames() {
|
||||
row.Columns = append(row.Columns, k)
|
||||
values = append(values, stat.Values[k])
|
||||
}
|
||||
row.Values = [][]interface{}{values}
|
||||
rows = append(rows, row)
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// joinUint64 returns a comma-delimited string of uint64 numbers.
|
||||
func joinUint64(a []uint64) string {
|
||||
var buf bytes.Buffer
|
||||
for i, x := range a {
|
||||
buf.WriteString(strconv.FormatUint(x, 10))
|
||||
if i < len(a)-1 {
|
||||
buf.WriteRune(',')
|
||||
}
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// Taken from net/http/pprof/pprof.go
|
||||
func sleep(w http.ResponseWriter, d time.Duration) {
|
||||
var clientGone <-chan bool
|
||||
|
|
Loading…
Reference in New Issue