diff --git a/CHANGELOG.md b/CHANGELOG.md index da8770c2bd..aca70a1120 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - [#4196](https://github.com/influxdb/influxdb/pull/4196): Export tsdb.Iterator - [#4198](https://github.com/influxdb/influxdb/pull/4198): Add basic cluster-service stats - [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy +- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff ### Bugfixes - [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW diff --git a/services/hh/processor.go b/services/hh/processor.go index af790ba258..769ddec95f 100644 --- a/services/hh/processor.go +++ b/services/hh/processor.go @@ -2,6 +2,7 @@ package hh import ( "encoding/binary" + "expvar" "fmt" "io/ioutil" "log" @@ -11,10 +12,17 @@ import ( "sync" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/models" "github.com/influxdb/influxdb/tsdb" ) +const ( + pointsHint = "points_hint" + pointsWrite = "points_write" + bytesWrite = "bytes_write" +) + type Processor struct { mu sync.RWMutex @@ -26,6 +34,10 @@ type Processor struct { queues map[uint64]*queue writer shardWriter Logger *log.Logger + + // Shard-level and node-level HH stats. + shardStatMaps map[uint64]*expvar.Map + nodeStatMaps map[uint64]*expvar.Map } type ProcessorOptions struct { @@ -35,10 +47,12 @@ type ProcessorOptions struct { func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) { p := &Processor{ - dir: dir, - queues: map[uint64]*queue{}, - writer: writer, - Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags), + dir: dir, + queues: map[uint64]*queue{}, + writer: writer, + Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags), + shardStatMaps: make(map[uint64]*expvar.Map), + nodeStatMaps: make(map[uint64]*expvar.Map), } p.setOptions(options) @@ -101,6 +115,11 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) { return nil, err } p.queues[nodeID] = queue + + // Create node stats for this queue. + key := fmt.Sprintf("hh_processor:node:%d", nodeID) + tags := map[string]string{"nodeID": strconv.FormatUint(nodeID, 10)} + p.nodeStatMaps[nodeID] = influxdb.NewStatistics(key, "hh_processor", tags) return queue, nil } @@ -113,6 +132,10 @@ func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) e } } + // Update stats + p.updateShardStats(shardID, pointsHint, int64(len(points))) + p.nodeStatMaps[ownerID].Add(pointsHint, int64(len(points))) + b := p.marshalWrite(shardID, points) return queue.Append(b) } @@ -159,6 +182,8 @@ func (p *Processor) Process() error { res <- nil break } + p.updateShardStats(shardID, pointsWrite, int64(len(points))) + p.nodeStatMaps[nodeID].Add(pointsWrite, int64(len(points))) // If we get here, the write succeeded so advance the queue to the next item if err := q.Advance(); err != nil { @@ -170,6 +195,8 @@ func (p *Processor) Process() error { // Update how many bytes we've sent limiter.Update(len(buf)) + p.updateShardStats(shardID, bytesWrite, int64(len(buf))) + p.nodeStatMaps[nodeID].Add(bytesWrite, int64(len(buf))) // Block to maintain the throughput rate time.Sleep(limiter.Delay()) @@ -206,6 +233,17 @@ func (p *Processor) unmarshalWrite(b []byte) (uint64, []models.Point, error) { return ownerID, points, err } +func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) { + m, ok := p.shardStatMaps[shardID] + if !ok { + key := fmt.Sprintf("hh_processor:shard:%d", shardID) + tags := map[string]string{"shardID": strconv.FormatUint(shardID, 10)} + p.shardStatMaps[shardID] = influxdb.NewStatistics(key, "hh_processor", tags) + m = p.shardStatMaps[shardID] + } + m.Add(stat, inc) +} + func (p *Processor) PurgeOlderThan(when time.Duration) error { p.mu.Lock() defer p.mu.Unlock() diff --git a/services/hh/service.go b/services/hh/service.go index febab4e0bc..cf9424a123 100644 --- a/services/hh/service.go +++ b/services/hh/service.go @@ -1,25 +1,36 @@ package hh import ( + "expvar" "fmt" "io" "log" "os" + "strings" "sync" "time" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/models" ) var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled") +const ( + writeShardReq = "write_shard_req" + writeShardReqPoints = "write_shard_req_points" + processReq = "process_req" + processReqFail = "process_req_fail" +) + type Service struct { mu sync.RWMutex wg sync.WaitGroup closing chan struct{} - Logger *log.Logger - cfg Config + statMap *expvar.Map + Logger *log.Logger + cfg Config ShardWriter shardWriter @@ -36,9 +47,13 @@ type shardWriter interface { // NewService returns a new instance of Service. func NewService(c Config, w shardWriter) *Service { + key := strings.Join([]string{"hh", c.Dir}, ":") + tags := map[string]string{"path": c.Dir} + s := &Service{ - cfg: c, - Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags), + cfg: c, + statMap: influxdb.NewStatistics(key, "hh", tags), + Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags), } processor, err := NewProcessor(c.Dir, w, ProcessorOptions{ MaxSize: c.MaxSize, @@ -93,6 +108,8 @@ func (s *Service) SetLogger(l *log.Logger) { // WriteShard queues the points write for shardID to node ownerID to handoff queue func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error { + s.statMap.Add(writeShardReq, 1) + s.statMap.Add(writeShardReqPoints, int64(len(points))) if !s.cfg.Enabled { return ErrHintedHandoffDisabled } @@ -109,7 +126,9 @@ func (s *Service) retryWrites() { case <-s.closing: return case <-ticker.C: + s.statMap.Add(processReq, 1) if err := s.HintedHandoff.Process(); err != nil && err != io.EOF { + s.statMap.Add(processReqFail, 1) s.Logger.Printf("retried write failed: %v", err) } }