From f703f58d22f94f4e6803fcbd2316d82c2a3e5dc1 Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Mon, 26 Oct 2015 15:32:20 -0700
Subject: [PATCH] Add HH diagnostics

---
 cmd/influxd/run/server.go     |  1 +
 services/hh/node_processor.go | 16 ++++++++++++++++
 services/hh/queue.go          | 18 ++++++++++++++++++
 services/hh/service.go        | 31 +++++++++++++++++++++++++++++++
 4 files changed, 66 insertions(+)

diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index 9a7a5c6e05..6ae658b71b 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -129,6 +129,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
 
 	// Create the hinted handoff service
 	s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)
+	s.HintedHandoff.Monitor = s.Monitor
 
 	// Create the Subscriber service
 	s.Subscriber = subscriber.NewService(c.Subscriber)
diff --git a/services/hh/node_processor.go b/services/hh/node_processor.go
index 792265d4e6..2b658185d7 100644
--- a/services/hh/node_processor.go
+++ b/services/hh/node_processor.go
@@ -249,6 +249,22 @@ func (n *NodeProcessor) SendWrite() (int, error) {
 	return len(buf), nil
 }
 
+func (n *NodeProcessor) Head() string {
+	qp, err := n.queue.Position()
+	if err != nil {
+		return ""
+	}
+	return qp.head
+}
+
+func (n *NodeProcessor) Tail() string {
+	qp, err := n.queue.Position()
+	if err != nil {
+		return ""
+	}
+	return qp.tail
+}
+
 func marshalWrite(shardID uint64, points []models.Point) []byte {
 	b := make([]byte, 8)
 	binary.BigEndian.PutUint64(b, shardID)
diff --git a/services/hh/queue.go b/services/hh/queue.go
index ea80f9e2a7..19c004fccb 100644
--- a/services/hh/queue.go
+++ b/services/hh/queue.go
@@ -72,6 +72,10 @@ type queue struct {
 
 	// The segments that exist on disk
 	segments segments
 }
+type queuePos struct {
+	head string
+	tail string
+}
 
 type segments []*segment
@@ -214,6 +218,20 @@ func (l *queue) LastModified() (time.Time, error) {
 	return time.Time{}.UTC(), nil
 }
 
+func (l *queue) Position() (*queuePos, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+
+	qp := &queuePos{}
+	if l.head != nil {
+		qp.head = fmt.Sprintf("%s:%d", l.head.path, l.head.pos)
+	}
+	if l.tail != nil {
+		qp.tail = fmt.Sprintf("%s:%d", l.tail.path, l.tail.filePos())
+	}
+	return qp, nil
+}
+
 // diskUsage returns the total size on disk used by the queue
 func (l *queue) diskUsage() int64 {
 	var size int64
diff --git a/services/hh/service.go b/services/hh/service.go
index 29b6217432..6cef7fb648 100644
--- a/services/hh/service.go
+++ b/services/hh/service.go
@@ -15,6 +15,7 @@ import (
 	"github.com/influxdb/influxdb"
 	"github.com/influxdb/influxdb/meta"
 	"github.com/influxdb/influxdb/models"
+	"github.com/influxdb/influxdb/monitor"
 )
 
 var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
@@ -40,6 +41,11 @@ type Service struct {
 
 	shardWriter shardWriter
 	metastore   metaStore
+
+	Monitor interface {
+		RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
+		DeregisterDiagnosticsClient(name string)
+	}
 }
 
 type shardWriter interface {
@@ -76,6 +82,11 @@ func (s *Service) Open() error {
 	s.Logger.Printf("Starting hinted handoff service")
 	s.closing = make(chan struct{})
 
+	// Register diagnostics if a Monitor service is available.
+	if s.Monitor != nil {
+		s.Monitor.RegisterDiagnosticsClient("hh", s)
+	}
+
 	// Create the root directory if it doesn't already exist.
 	s.Logger.Printf("Using data dir: %v", s.cfg.Dir)
 	if err := os.MkdirAll(s.cfg.Dir, 0700); err != nil {
@@ -170,6 +181,26 @@ func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) err
 	return nil
 }
 
+// Diagnostics returns diagnostic information.
+func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	d := &monitor.Diagnostic{
+		Columns: []string{"node", "last modified", "head", "tail"},
+		Rows:    make([][]interface{}, 0, len(s.processors)),
+	}
+	for k, v := range s.processors {
+		lm, err := v.LastModified()
+		if err != nil {
+			return nil, err
+		}
+
+		d.Rows = append(d.Rows, []interface{}{k, lm, v.Head(), v.Tail()})
+	}
+	return d, nil
+}
+
 // purgeInactiveProcessors will cause the service to remove processors for inactive nodes.
 func (s *Service) purgeInactiveProcessors() {
 	defer s.wg.Done()