Add HH diagnostics

pull/4516/head
Philip O'Toole 2015-10-26 15:32:20 -07:00
parent 87299caad1
commit f703f58d22
4 changed files with 66 additions and 0 deletions

View File

@ -129,6 +129,7 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
// Create the hinted handoff service
s.HintedHandoff = hh.NewService(c.HintedHandoff, s.ShardWriter, s.MetaStore)
s.HintedHandoff.Monitor = s.Monitor
// Create the Subscriber service
s.Subscriber = subscriber.NewService(c.Subscriber)

View File

@ -249,6 +249,22 @@ func (n *NodeProcessor) SendWrite() (int, error) {
return len(buf), nil
}
func (n *NodeProcessor) Head() string {
qp, err := n.queue.Position()
if err != nil {
return ""
}
return qp.head
}
func (n *NodeProcessor) Tail() string {
qp, err := n.queue.Position()
if err != nil {
return ""
}
return qp.tail
}
func marshalWrite(shardID uint64, points []models.Point) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, shardID)

View File

@ -72,6 +72,10 @@ type queue struct {
// The segments that exist on disk
segments segments
}
type queuePos struct {
head string
tail string
}
type segments []*segment
@ -214,6 +218,20 @@ func (l *queue) LastModified() (time.Time, error) {
return time.Time{}.UTC(), nil
}
func (l *queue) Position() (*queuePos, error) {
l.mu.RLock()
defer l.mu.RUnlock()
qp := &queuePos{}
if l.head != nil {
qp.head = fmt.Sprintf("%s:%d", l.head.path, l.head.pos)
}
if l.tail != nil {
qp.tail = fmt.Sprintf("%s:%d", l.tail.path, l.tail.filePos())
}
return qp, nil
}
// diskUsage returns the total size on disk used by the queue
func (l *queue) diskUsage() int64 {
var size int64

View File

@ -15,6 +15,7 @@ import (
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/monitor"
)
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
@ -40,6 +41,11 @@ type Service struct {
shardWriter shardWriter
metastore metaStore
Monitor interface {
RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
DeregisterDiagnosticsClient(name string)
}
}
type shardWriter interface {
@ -76,6 +82,11 @@ func (s *Service) Open() error {
s.Logger.Printf("Starting hinted handoff service")
s.closing = make(chan struct{})
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient("hh", s)
}
// Create the root directory if it doesn't already exist.
s.Logger.Printf("Using data dir: %v", s.cfg.Dir)
if err := os.MkdirAll(s.cfg.Dir, 0700); err != nil {
@ -170,6 +181,26 @@ func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) err
return nil
}
// Diagnostics returns diagnostic information.
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
s.mu.RLock()
defer s.mu.RUnlock()
d := &monitor.Diagnostic{
Columns: []string{"node", "last modified", "head", "tail"},
Rows: make([][]interface{}, 0, len(s.processors)),
}
for k, v := range s.processors {
lm, err := v.LastModified()
if err != nil {
return nil, err
}
d.Rows = append(d.Rows, []interface{}{k, lm, v.Head(), v.Tail()})
}
return d, nil
}
// purgeInactiveProcessors will cause the service to remove processors for inactive nodes.
func (s *Service) purgeInactiveProcessors() {
defer s.wg.Done()