commit
a196644167
|
@ -10,6 +10,7 @@
|
||||||
- [#4196](https://github.com/influxdb/influxdb/pull/4196): Export tsdb.Iterator
|
- [#4196](https://github.com/influxdb/influxdb/pull/4196): Export tsdb.Iterator
|
||||||
- [#4198](https://github.com/influxdb/influxdb/pull/4198): Add basic cluster-service stats
|
- [#4198](https://github.com/influxdb/influxdb/pull/4198): Add basic cluster-service stats
|
||||||
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
|
- [#4262](https://github.com/influxdb/influxdb/pull/4262): Allow configuration of UDP retention policy
|
||||||
|
- [#4265](https://github.com/influxdb/influxdb/pull/4265): Add statistics for Hinted-Handoff
|
||||||
|
|
||||||
### Bugfixes
|
### Bugfixes
|
||||||
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
|
- [#4166](https://github.com/influxdb/influxdb/pull/4166): Fix parser error on invalid SHOW
|
||||||
|
|
|
@ -2,6 +2,7 @@ package hh
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
@ -11,10 +12,17 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb"
|
||||||
"github.com/influxdb/influxdb/models"
|
"github.com/influxdb/influxdb/models"
|
||||||
"github.com/influxdb/influxdb/tsdb"
|
"github.com/influxdb/influxdb/tsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
pointsHint = "points_hint"
|
||||||
|
pointsWrite = "points_write"
|
||||||
|
bytesWrite = "bytes_write"
|
||||||
|
)
|
||||||
|
|
||||||
type Processor struct {
|
type Processor struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
|
|
||||||
|
@ -26,6 +34,10 @@ type Processor struct {
|
||||||
queues map[uint64]*queue
|
queues map[uint64]*queue
|
||||||
writer shardWriter
|
writer shardWriter
|
||||||
Logger *log.Logger
|
Logger *log.Logger
|
||||||
|
|
||||||
|
// Shard-level and node-level HH stats.
|
||||||
|
shardStatMaps map[uint64]*expvar.Map
|
||||||
|
nodeStatMaps map[uint64]*expvar.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
type ProcessorOptions struct {
|
type ProcessorOptions struct {
|
||||||
|
@ -35,10 +47,12 @@ type ProcessorOptions struct {
|
||||||
|
|
||||||
func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
|
func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
|
||||||
p := &Processor{
|
p := &Processor{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
queues: map[uint64]*queue{},
|
queues: map[uint64]*queue{},
|
||||||
writer: writer,
|
writer: writer,
|
||||||
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
||||||
|
shardStatMaps: make(map[uint64]*expvar.Map),
|
||||||
|
nodeStatMaps: make(map[uint64]*expvar.Map),
|
||||||
}
|
}
|
||||||
p.setOptions(options)
|
p.setOptions(options)
|
||||||
|
|
||||||
|
@ -101,6 +115,11 @@ func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p.queues[nodeID] = queue
|
p.queues[nodeID] = queue
|
||||||
|
|
||||||
|
// Create node stats for this queue.
|
||||||
|
key := fmt.Sprintf("hh_processor:node:%d", nodeID)
|
||||||
|
tags := map[string]string{"nodeID": strconv.FormatUint(nodeID, 10)}
|
||||||
|
p.nodeStatMaps[nodeID] = influxdb.NewStatistics(key, "hh_processor", tags)
|
||||||
return queue, nil
|
return queue, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,6 +132,10 @@ func (p *Processor) WriteShard(shardID, ownerID uint64, points []models.Point) e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update stats
|
||||||
|
p.updateShardStats(shardID, pointsHint, int64(len(points)))
|
||||||
|
p.nodeStatMaps[ownerID].Add(pointsHint, int64(len(points)))
|
||||||
|
|
||||||
b := p.marshalWrite(shardID, points)
|
b := p.marshalWrite(shardID, points)
|
||||||
return queue.Append(b)
|
return queue.Append(b)
|
||||||
}
|
}
|
||||||
|
@ -159,6 +182,8 @@ func (p *Processor) Process() error {
|
||||||
res <- nil
|
res <- nil
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
p.updateShardStats(shardID, pointsWrite, int64(len(points)))
|
||||||
|
p.nodeStatMaps[nodeID].Add(pointsWrite, int64(len(points)))
|
||||||
|
|
||||||
// If we get here, the write succeeded so advance the queue to the next item
|
// If we get here, the write succeeded so advance the queue to the next item
|
||||||
if err := q.Advance(); err != nil {
|
if err := q.Advance(); err != nil {
|
||||||
|
@ -170,6 +195,8 @@ func (p *Processor) Process() error {
|
||||||
|
|
||||||
// Update how many bytes we've sent
|
// Update how many bytes we've sent
|
||||||
limiter.Update(len(buf))
|
limiter.Update(len(buf))
|
||||||
|
p.updateShardStats(shardID, bytesWrite, int64(len(buf)))
|
||||||
|
p.nodeStatMaps[nodeID].Add(bytesWrite, int64(len(buf)))
|
||||||
|
|
||||||
// Block to maintain the throughput rate
|
// Block to maintain the throughput rate
|
||||||
time.Sleep(limiter.Delay())
|
time.Sleep(limiter.Delay())
|
||||||
|
@ -206,6 +233,17 @@ func (p *Processor) unmarshalWrite(b []byte) (uint64, []models.Point, error) {
|
||||||
return ownerID, points, err
|
return ownerID, points, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *Processor) updateShardStats(shardID uint64, stat string, inc int64) {
|
||||||
|
m, ok := p.shardStatMaps[shardID]
|
||||||
|
if !ok {
|
||||||
|
key := fmt.Sprintf("hh_processor:shard:%d", shardID)
|
||||||
|
tags := map[string]string{"shardID": strconv.FormatUint(shardID, 10)}
|
||||||
|
p.shardStatMaps[shardID] = influxdb.NewStatistics(key, "hh_processor", tags)
|
||||||
|
m = p.shardStatMaps[shardID]
|
||||||
|
}
|
||||||
|
m.Add(stat, inc)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *Processor) PurgeOlderThan(when time.Duration) error {
|
func (p *Processor) PurgeOlderThan(when time.Duration) error {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
|
|
|
@ -1,25 +1,36 @@
|
||||||
package hh
|
package hh
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"expvar"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdb/influxdb"
|
||||||
"github.com/influxdb/influxdb/models"
|
"github.com/influxdb/influxdb/models"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
|
var ErrHintedHandoffDisabled = fmt.Errorf("hinted handoff disabled")
|
||||||
|
|
||||||
|
const (
|
||||||
|
writeShardReq = "write_shard_req"
|
||||||
|
writeShardReqPoints = "write_shard_req_points"
|
||||||
|
processReq = "process_req"
|
||||||
|
processReqFail = "process_req_fail"
|
||||||
|
)
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
closing chan struct{}
|
closing chan struct{}
|
||||||
|
|
||||||
Logger *log.Logger
|
statMap *expvar.Map
|
||||||
cfg Config
|
Logger *log.Logger
|
||||||
|
cfg Config
|
||||||
|
|
||||||
ShardWriter shardWriter
|
ShardWriter shardWriter
|
||||||
|
|
||||||
|
@ -36,9 +47,13 @@ type shardWriter interface {
|
||||||
|
|
||||||
// NewService returns a new instance of Service.
|
// NewService returns a new instance of Service.
|
||||||
func NewService(c Config, w shardWriter) *Service {
|
func NewService(c Config, w shardWriter) *Service {
|
||||||
|
key := strings.Join([]string{"hh", c.Dir}, ":")
|
||||||
|
tags := map[string]string{"path": c.Dir}
|
||||||
|
|
||||||
s := &Service{
|
s := &Service{
|
||||||
cfg: c,
|
cfg: c,
|
||||||
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
statMap: influxdb.NewStatistics(key, "hh", tags),
|
||||||
|
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
|
||||||
}
|
}
|
||||||
processor, err := NewProcessor(c.Dir, w, ProcessorOptions{
|
processor, err := NewProcessor(c.Dir, w, ProcessorOptions{
|
||||||
MaxSize: c.MaxSize,
|
MaxSize: c.MaxSize,
|
||||||
|
@ -93,6 +108,8 @@ func (s *Service) SetLogger(l *log.Logger) {
|
||||||
|
|
||||||
// WriteShard queues the points write for shardID to node ownerID to handoff queue
|
// WriteShard queues the points write for shardID to node ownerID to handoff queue
|
||||||
func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error {
|
func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) error {
|
||||||
|
s.statMap.Add(writeShardReq, 1)
|
||||||
|
s.statMap.Add(writeShardReqPoints, int64(len(points)))
|
||||||
if !s.cfg.Enabled {
|
if !s.cfg.Enabled {
|
||||||
return ErrHintedHandoffDisabled
|
return ErrHintedHandoffDisabled
|
||||||
}
|
}
|
||||||
|
@ -109,7 +126,9 @@ func (s *Service) retryWrites() {
|
||||||
case <-s.closing:
|
case <-s.closing:
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
s.statMap.Add(processReq, 1)
|
||||||
if err := s.HintedHandoff.Process(); err != nil && err != io.EOF {
|
if err := s.HintedHandoff.Process(); err != nil && err != io.EOF {
|
||||||
|
s.statMap.Add(processReqFail, 1)
|
||||||
s.Logger.Printf("retried write failed: %v", err)
|
s.Logger.Printf("retried write failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue