From 235714755c5b813c692b93115d1d39e41114f019 Mon Sep 17 00:00:00 2001
From: Philip O'Toole
Date: Mon, 28 Sep 2015 18:33:31 -0700
Subject: [PATCH] HH processor-level stats

This change maintains stats on a per-shard and per-node basis.
---
 services/hh/processor.go | 46 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 42 insertions(+), 4 deletions(-)

diff --git a/services/hh/processor.go b/services/hh/processor.go
index af790ba258..769ddec95f 100644
--- a/services/hh/processor.go
+++ b/services/hh/processor.go
@@ -2,6 +2,7 @@ package hh
 
 import (
 	"encoding/binary"
+	"expvar"
 	"fmt"
 	"io/ioutil"
 	"log"
@@ -11,10 +12,17 @@ import (
 	"sync"
 	"time"
 
+	"github.com/influxdb/influxdb"
 	"github.com/influxdb/influxdb/models"
 	"github.com/influxdb/influxdb/tsdb"
 )
 
+const (
+	pointsHint  = "points_hint"
+	pointsWrite = "points_write"
+	bytesWrite  = "bytes_write"
+)
+
 type Processor struct {
 	mu sync.RWMutex
 
@@ -26,6 +34,10 @@ type Processor struct {
 	queues map[uint64]*queue
 	writer shardWriter
 	Logger *log.Logger
+
+	// Shard-level and node-level HH stats.
+	shardStatMaps map[uint64]*expvar.Map
+	nodeStatMaps  map[uint64]*expvar.Map
 }
 
 type ProcessorOptions struct {
@@ -35,10 +47,12 @@ func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
 	p := &Processor{
-		dir:    dir,
-		queues: map[uint64]*queue{},
-		writer: writer,
-		Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
+		dir:           dir,
+		queues:        map[uint64]*queue{},
+		writer:        writer,
+		Logger:        log.New(os.Stderr, "[handoff] ", log.LstdFlags),
+		shardStatMaps: make(map[uint64]*expvar.Map),
+		nodeStatMaps:  make(map[uint64]*expvar.Map),
 	}
 	p.setOptions(options)
 
@@ -101,6 +115,11 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
 		return nil, err
 	}
 	p.queues[nodeID] = queue
+
+	// Create node stats for this queue.
+	key := fmt.Sprintf("hh_processor:node:%d", nodeID)
+	tags := map[string]string{"nodeID": strconv.FormatUint(nodeID, 10)}
+	p.nodeStatMaps[nodeID] = influxdb.NewStatistics(key, "hh_processor", tags)
 	return queue, nil
 }
 
@@ -113,6 +132,10 @@ func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) error {
 		}
 	}
 
+	// Update stats
+	p.updateShardStats(shardID, pointsHint, int64(len(points)))
+	p.nodeStatMaps[ownerID].Add(pointsHint, int64(len(points)))
+
 	b := p.marshalWrite(shardID, points)
 	return queue.Append(b)
 }
@@ -159,6 +182,8 @@
 				res <- nil
 				break
 			}
+			p.updateShardStats(shardID, pointsWrite, int64(len(points)))
+			p.nodeStatMaps[nodeID].Add(pointsWrite, int64(len(points)))
 
 			// If we get here, the write succeeded so advance the queue to the next item
 			if err := q.Advance(); err != nil {
@@ -170,6 +195,8 @@
 
 			// Update how many bytes we've sent
 			limiter.Update(len(buf))
+			p.updateShardStats(shardID, bytesWrite, int64(len(buf)))
+			p.nodeStatMaps[nodeID].Add(bytesWrite, int64(len(buf)))
 
 			// Block to maintain the throughput rate
 			time.Sleep(limiter.Delay())
@@ -206,6 +233,17 @@ func (p *Processor) unmarshalWrite(b []byte) (uint64, []models.Point, error) {
 	return ownerID, points, err
 }
 
+func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
+	m, ok := p.shardStatMaps[shardID]
+	if !ok {
+		key := fmt.Sprintf("hh_processor:shard:%d", shardID)
+		tags := map[string]string{"shardID": strconv.FormatUint(shardID, 10)}
+		p.shardStatMaps[shardID] = influxdb.NewStatistics(key, "hh_processor", tags)
+		m = p.shardStatMaps[shardID]
+	}
+	m.Add(stat, inc)
+}
+
 func (p *Processor) PurgeOlderThan(when time.Duration) error {
 	p.mu.Lock()
 	defer p.mu.Unlock()
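
Note for reviewers: the stats pattern above boils down to one expvar.Map per
node/shard, created lazily on first use and published so it is readable over
HTTP. The sketch below is a minimal standalone illustration of that pattern,
not the actual influxdb code: newStatMap is a hypothetical stand-in for
influxdb.NewStatistics (whose tag handling isn't visible in this diff), and
only the standard library is used.

	package main

	import (
		"expvar"
		"fmt"
		"net/http"
	)

	// newStatMap publishes a fresh expvar.Map under key and returns it.
	// Hypothetical stand-in for influxdb.NewStatistics(key, name, tags).
	func newStatMap(key string) *expvar.Map {
		return expvar.NewMap(key) // registers the map at /debug/vars
	}

	var shardStatMaps = make(map[uint64]*expvar.Map)

	// updateShardStats mirrors the patch: create the per-shard map on
	// first use, then bump the named counter.
	func updateShardStats(shardID uint64, stat string, inc int64) {
		m, ok := shardStatMaps[shardID]
		if !ok {
			m = newStatMap(fmt.Sprintf("hh_processor:shard:%d", shardID))
			shardStatMaps[shardID] = m
		}
		m.Add(stat, inc)
	}

	func main() {
		updateShardStats(1, "points_hint", 100)
		updateShardStats(1, "points_write", 100)

		// expvar's init registers a handler on http.DefaultServeMux,
		// so the counters are now visible at GET /debug/vars.
		http.ListenAndServe(":8080", nil)
	}

Because expvar.Map.Add is safe for concurrent use, bumping a counter needs no
extra locking once the map exists; creating an entry in the plain Go map that
indexes them is the only step that requires coordination between goroutines.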