influxdb/services/hh/processor.go

219 lines
4.4 KiB
Go

package hh
import (
"encoding/binary"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/influxdb/influxdb/tsdb"
)
type Processor struct {
mu sync.RWMutex
dir string
maxSize int64
maxAge time.Duration
retryRateLimit int64
queues map[uint64]*queue
writer shardWriter
Logger *log.Logger
}
type ProcessorOptions struct {
MaxSize int64
RetryRateLimit int64
}
func NewProcessor(dir string, writer shardWriter, options ProcessorOptions) (*Processor, error) {
p := &Processor{
dir: dir,
queues: map[uint64]*queue{},
writer: writer,
Logger: log.New(os.Stderr, "[handoff] ", log.LstdFlags),
}
p.setOptions(options)
// Create the root directory if it doesn't already exist.
if err := os.MkdirAll(dir, 0700); err != nil {
return nil, fmt.Errorf("mkdir all: %s", err)
}
if err := p.loadQueues(); err != nil {
return p, err
}
return p, nil
}
func (p *Processor) setOptions(options ProcessorOptions) {
p.maxSize = DefaultMaxSize
if options.MaxSize != 0 {
p.maxSize = options.MaxSize
}
p.retryRateLimit = DefaultRetryRateLimit
if options.RetryRateLimit != 0 {
p.retryRateLimit = options.RetryRateLimit
}
}
func (p *Processor) loadQueues() error {
files, err := ioutil.ReadDir(p.dir)
if err != nil {
return err
}
for _, file := range files {
nodeID, err := strconv.ParseUint(file.Name(), 10, 64)
if err != nil {
return err
}
if _, err := p.addQueue(nodeID); err != nil {
return err
}
}
return nil
}
func (p *Processor) addQueue(nodeID uint64) (*queue, error) {
p.mu.Lock()
defer p.mu.Unlock()
path := filepath.Join(p.dir, strconv.FormatUint(nodeID, 10))
if err := os.MkdirAll(path, 0700); err != nil {
return nil, err
}
queue, err := newQueue(path, p.maxSize)
if err != nil {
return nil, err
}
if err := queue.Open(); err != nil {
return nil, err
}
p.queues[nodeID] = queue
return queue, nil
}
func (p *Processor) WriteShard(shardID, ownerID uint64, points []tsdb.Point) error {
queue, ok := p.queues[ownerID]
if !ok {
var err error
if queue, err = p.addQueue(ownerID); err != nil {
return err
}
}
b := p.marshalWrite(shardID, points)
return queue.Append(b)
}
func (p *Processor) Process() error {
p.mu.RLock()
defer p.mu.RUnlock()
res := make(chan error, len(p.queues))
for nodeID, q := range p.queues {
go func(nodeID uint64, q *queue) {
// Log how many writes we successfully sent at the end
var sent int
start := time.Now()
defer func(start time.Time) {
if sent > 0 {
p.Logger.Printf("%d queued writes sent to node %d in %s", sent, nodeID, time.Since(start))
}
}(start)
limiter := NewRateLimiter(p.retryRateLimit)
for {
// Get the current block from the queue
buf, err := q.Current()
if err != nil {
res <- nil
break
}
// unmarshal the byte slice back to shard ID and points
shardID, points, err := p.unmarshalWrite(buf)
if err != nil {
p.Logger.Printf("unmarshal write failed: %v", err)
if err := q.Advance(); err != nil {
res <- err
}
return
}
// Try to send the write to the node
if err := p.writer.WriteShard(shardID, nodeID, points); err != nil && tsdb.IsRetryable(err) {
p.Logger.Printf("remote write failed: %v", err)
res <- nil
break
}
// If we get here, the write succeeded so advance the queue to the next item
if err := q.Advance(); err != nil {
res <- err
return
}
sent += 1
// Update how many bytes we've sent
limiter.Update(len(buf))
// Block to maintain the throughput rate
time.Sleep(limiter.Delay())
}
}(nodeID, q)
}
for range p.queues {
err := <-res
if err != nil {
return err
}
}
return nil
}
func (p *Processor) marshalWrite(shardID uint64, points []tsdb.Point) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, shardID)
for _, p := range points {
b = append(b, []byte(p.String())...)
b = append(b, '\n')
}
return b
}
func (p *Processor) unmarshalWrite(b []byte) (uint64, []tsdb.Point, error) {
if len(b) < 8 {
return 0, nil, fmt.Errorf("too short: len = %d", len(b))
}
ownerID := binary.BigEndian.Uint64(b[:8])
points, err := tsdb.ParsePoints(b[8:])
return ownerID, points, err
}
func (p *Processor) PurgeOlderThan(when time.Duration) error {
p.mu.Lock()
defer p.mu.Unlock()
for _, queue := range p.queues {
if err := queue.PurgeOlderThan(time.Now().Add(-when)); err != nil {
return err
}
}
return nil
}