HH processor-level stats
This change maintains stats on a per-shard and per-node basis.pull/4265/head
parent
14db3ce9f5
commit
235714755c
|
@ -2,6 +2,7 @@ package hh
|
|||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
|
@ -11,10 +12,17 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdb/influxdb"
|
||||
"github.com/influxdb/influxdb/models"
|
||||
"github.com/influxdb/influxdb/tsdb"
|
||||
)
|
||||
|
||||
const (
|
||||
pointsHint = "points_hint"
|
||||
pointsWrite = "points_write"
|
||||
bytesWrite = "bytes_write"
|
||||
)
|
||||
|
||||
type Processor struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
|
@ -26,6 +34,10 @@ type Processor struct {
|
|||
queues map[uint64]*queue
|
||||
writer shardWriter
|
||||
Logger *log.Logger
|
||||
|
||||
// Shard-level and node-level HH stats.
|
||||
shardStatMaps map[uint64]*expvar.Map
|
||||
nodeStatMaps map[uint64]*expvar.Map
|
||||
}
|
||||
|
||||
type ProcessorOptions struct {
|
||||
|
@ -35,10 +47,12 @@ type ProcessorOptions struct {
|
|||
|
||||
func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
|
||||
p := &Processor{
|
||||
dir: dir,
|
||||
queues: map[uint64]*queue{},
|
||||
writer: writer,
|
||||
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
||||
dir: dir,
|
||||
queues: map[uint64]*queue{},
|
||||
writer: writer,
|
||||
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
||||
shardStatMaps: make(map[uint64]*expvar.Map),
|
||||
nodeStatMaps: make(map[uint64]*expvar.Map),
|
||||
}
|
||||
p.setOptions(options)
|
||||
|
||||
|
@ -101,6 +115,11 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
|
|||
return nil, err
|
||||
}
|
||||
p.queues[nodeID] = queue
|
||||
|
||||
// Create node stats for this queue.
|
||||
key := fmt.Sprintf("hh_processor:node:%d", nodeID)
|
||||
tags := map[string]string{"nodeID": strconv.FormatUint(nodeID, 10)}
|
||||
p.nodeStatMaps[nodeID] = influxdb.NewStatistics(key, "hh_processor", tags)
|
||||
return queue, nil
|
||||
}
|
||||
|
||||
|
@ -113,6 +132,10 @@ func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) e
|
|||
}
|
||||
}
|
||||
|
||||
// Update stats
|
||||
p.updateShardStats(shardID, pointsHint, int64(len(points)))
|
||||
p.nodeStatMaps[ownerID].Add(pointsHint, int64(len(points)))
|
||||
|
||||
b := p.marshalWrite(shardID, points)
|
||||
return queue.Append(b)
|
||||
}
|
||||
|
@ -159,6 +182,8 @@ func (p *Processor) Process() error {
|
|||
res <- nil
|
||||
break
|
||||
}
|
||||
p.updateShardStats(shardID, pointsWrite, int64(len(points)))
|
||||
p.nodeStatMaps[nodeID].Add(pointsWrite, int64(len(points)))
|
||||
|
||||
// If we get here, the write succeeded so advance the queue to the next item
|
||||
if err := q.Advance(); err != nil {
|
||||
|
@ -170,6 +195,8 @@ func (p *Processor) Process() error {
|
|||
|
||||
// Update how many bytes we've sent
|
||||
limiter.Update(len(buf))
|
||||
p.updateShardStats(shardID, bytesWrite, int64(len(buf)))
|
||||
p.nodeStatMaps[nodeID].Add(bytesWrite, int64(len(buf)))
|
||||
|
||||
// Block to maintain the throughput rate
|
||||
time.Sleep(limiter.Delay())
|
||||
|
@ -206,6 +233,17 @@ func (p *Processor) unmarshalWrite(b []byte) (uint64, []models.Point, error) {
|
|||
return ownerID, points, err
|
||||
}
|
||||
|
||||
func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
|
||||
m, ok := p.shardStatMaps[shardID]
|
||||
if !ok {
|
||||
key := fmt.Sprintf("hh_processor:shard:%d", shardID)
|
||||
tags := map[string]string{"shardID": strconv.FormatUint(shardID, 10)}
|
||||
p.shardStatMaps[shardID] = influxdb.NewStatistics(key, "hh_processor", tags)
|
||||
m = p.shardStatMaps[shardID]
|
||||
}
|
||||
m.Add(stat, inc)
|
||||
}
|
||||
|
||||
func (p *Processor) PurgeOlderThan(when time.Duration) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
|
Loading…
Reference in New Issue